singular value decomposition in Spark ML

2016-08-04 Thread Sandy Ryza
Hi,

Is SVD or PCA in Spark ML (i.e. spark.ml parity with the mllib
RowMatrix.computeSVD API) slated for any upcoming release?

Many thanks for any guidance!
-Sandy


Re: Content based window operation on Time-series data

2015-12-17 Thread Sandy Ryza
Hi Arun,

A Java API was actually recently added to the library.  It will be
available in the next release.

-Sandy

On Thu, Dec 10, 2015 at 12:16 AM, Arun Verma 
wrote:

> Thank you for your reply. It is a Scala and Python library. Does a similar
> library exist for Java?
>
> On Wed, Dec 9, 2015 at 10:26 PM, Sean Owen  wrote:
>
>> CC Sandy as his https://github.com/cloudera/spark-timeseries might be
>> of use here.
>>
>> On Wed, Dec 9, 2015 at 4:54 PM, Arun Verma 
>> wrote:
>> > Hi all,
>> >
>> > We have RDD(main) of sorted time-series data. We want to split it into
>> > different RDDs according to window size and then perform some
>> aggregation
>> > operation like max, min etc. over each RDD in parallel.
>> >
>> > If the window size is w, then the ith RDD has data from (startTime + (i-1)*w) to
>> > (startTime + i*w), where startTime is the time-stamp of the 1st entry in the main
>> > RDD, and splitting stops once (startTime + (i-1)*w) is greater than the time-stamp
>> > of the last entry of the main RDD.
>> >
>> > For now, I am using DataFrames and Spark version 1.5.2. The code below runs
>> > sequentially over the data, so execution time is high and resource
>> > utilization is low. Code snippet is given below:
>> > /*
>> > * aggregator is max
>> > * df - DataFrame holding sorted time-series data
>> > * start - time-stamp of the first entry of DataFrame df
>> > * end - time-stamp of the last entry of DataFrame df
>> > * bucketLengthSec - window size
>> > * stepResults - output (JSON) for a particular block/window
>> > * appendResults - accumulated output (JSON) up to this block/window
>> > */
>> > nextStart = start + bucketLengthSec;
>> > while (start <= end) {
>> >     row = df.filter(df.col("timeStamp")
>> >             .between(start, nextStart))
>> >         .agg(max(df.col("timeStamp")), max(df.col("value")))
>> >         .first();
>> >     if (row.get(0) != null) {
>> >         stepResults = new JSONObject();
>> >         stepResults.put("x", Long.parseLong(row.get(0).toString()));
>> >         stepResults.put("y", row.get(1));
>> >         appendResults.add(stepResults);
>> >     }
>> >     start = nextStart;
>> >     nextStart = start + bucketLengthSec;
>> > }
>> >
>> >
>> > --
>> > Thanks and Regards,
>> > Arun Verma
>>
>
>
>
> --
> Thanks and Regards,
> Arun Verma
>
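
A parallel alternative to the sequential per-window loop quoted above, as a rough
sketch: it assumes a DataFrame df with "timeStamp" and "value" columns, and startTime /
bucketLengthSec are the same quantities as in the snippet, here passed in as parameters.
Bucketing each row by its window index lets a single groupBy aggregate all windows at
once instead of filtering the DataFrame once per window:

import org.apache.spark.sql.DataFrame;
import static org.apache.spark.sql.functions.*;

public class WindowedMax {
  // Max of timeStamp and value per window, computed in one distributed job.
  public static DataFrame windowedMax(DataFrame df, long startTime, long bucketLengthSec) {
    return df
        .withColumn("bucket",
            floor(df.col("timeStamp").minus(lit(startTime)).divide(lit(bucketLengthSec))))
        .groupBy("bucket")
        .agg(max(df.col("timeStamp")).alias("x"), max(df.col("value")).alias("y"))
        .orderBy("bucket");
  }
}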


Re: PySpark Lost Executors

2015-11-19 Thread Sandy Ryza
Hi Ross,

This is most likely occurring because YARN is killing containers for
exceeding physical memory limits.  You can make this less likely to happen
by bumping spark.yarn.executor.memoryOverhead to something higher than 10%
of your spark.executor.memory.

-Sandy
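
For example, in a spark-submit invocation (the memory values and script name are
illustrative and should be tuned to your job; with spark.executor.memory=8g the
default overhead would be about 800 MB, so this roughly doubles it):

spark-submit --master yarn-client \
  --executor-memory 8g \
  --conf spark.yarn.executor.memoryOverhead=1536 \
  your_job.py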

On Thu, Nov 19, 2015 at 8:14 AM,  wrote:

> Hmm I guess I do not - I get 'application_1445957755572_0176 does not
> have any log files.’ Where can I enable log aggregation?
>
> On Nov 19, 2015, at 11:07 AM, Ted Yu  wrote:
>
> Do you have YARN log aggregation enabled ?
>
> You can try retrieving log for the container using the following command:
>
> yarn logs -applicationId application_1445957755572_0176
>  -containerId container_1445957755572_0176_01_03
>
> Cheers
>
> On Thu, Nov 19, 2015 at 8:02 AM,  wrote:
>
>> I am running Spark 1.5.2 on Yarn. My job consists of a number of SparkSQL
>> transforms on a JSON data set that I load into a data frame. The data set
>> is not large (~100GB) and most stages execute without any issues. However,
>> some more complex stages tend to lose executors/nodes regularly. What would
>> cause this to happen? The logs don’t give too much information -
>>
>> 15/11/19 15:53:43 ERROR YarnScheduler: Lost executor 2 on
>> ip-10-0-0-136.ec2.internal: Yarn deallocated the executor 2 (container
>> container_1445957755572_0176_01_03)
>> 15/11/19 15:53:43 WARN TaskSetManager: Lost task 142.0 in stage 33.0 (TID
>> 8331, ip-10-0-0-136.ec2.internal): ExecutorLostFailure (executor 2 lost)
>> 15/11/19 15:53:43 WARN TaskSetManager: Lost task 133.0 in stage 33.0 (TID
>> 8322, ip-10-0-0-136.ec2.internal): ExecutorLostFailure (executor 2 lost)
>> 15/11/19 15:53:43 WARN TaskSetManager: Lost task 79.0 in stage 33.0 (TID
>> 8268, ip-10-0-0-136.ec2.internal): ExecutorLostFailure (executor 2 lost)
>> 15/11/19 15:53:43 WARN TaskSetManager: Lost task 141.0 in stage 33.0 (TID
>> 8330, ip-10-0-0-136.ec2.internal): ExecutorLostFailure (executor 2 lost)
>> 15/11/19 15:53:43 WARN TaskSetManager: Lost task 123.0 in stage 33.0 (TID
>> 8312, ip-10-0-0-136.ec2.internal): ExecutorLostFailure (executor 2 lost)
>> 15/11/19 15:53:43 WARN TaskSetManager: Lost task 162.0 in stage 33.0 (TID
>> 8351, ip-10-0-0-136.ec2.internal): ExecutorLostFailure (executor 2 lost)
>> 15/11/19 15:53:43 WARN TaskSetManager: Lost task 153.0 in stage 33.0 (TID
>> 8342, ip-10-0-0-136.ec2.internal): ExecutorLostFailure (executor 2 lost)
>> 15/11/19 15:53:43 WARN TaskSetManager: Lost task 120.0 in stage 33.0 (TID
>> 8309, ip-10-0-0-136.ec2.internal): ExecutorLostFailure (executor 2 lost)
>> 15/11/19 15:53:43 WARN TaskSetManager: Lost task 149.0 in stage 33.0 (TID
>> 8338, ip-10-0-0-136.ec2.internal): ExecutorLostFailure (executor 2 lost)
>> 15/11/19 15:53:43 WARN TaskSetManager: Lost task 134.0 in stage 33.0 (TID
>> 8323, ip-10-0-0-136.ec2.internal): ExecutorLostFailure (executor 2 lost)
>> [Stage 33:===> (117 + 50)
>> / 200]15/11/19 15:53:46 WARN ReliableDeliverySupervisor: Association with
>> remote system [akka.tcp://sparkExecutor@ip-10-0-0-136.ec2.internal:60275]
>> has failed, address is now gated for [5000] ms. Reason: [Disassociated]
>>
>>  - Followed by a list of lost tasks on each executor.
>
>
>
>


Re: SequenceFile and object reuse

2015-11-18 Thread Sandy Ryza
Hi Jeff,

Many access patterns simply take the result of hadoopFile and use it to
create some other object, and thus have no need for each input record to
refer to a different object.  In those cases, the current API is more
performant than an alternative that would create an object for each record,
because it avoids the unnecessary overhead of creating Java objects.  As
you've pointed out, this is at the expense of making the code more verbose
when caching.

-Sandy
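
A rough sketch of the copy-before-caching pattern described above, assuming a
SequenceFile of Text/IntWritable pairs (adjust the key and value types to your data):

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
import scala.Tuple2;

public class SequenceFileCopyExample {
  // Returns an RDD that is safe to cache, sort, or aggregate: each re-used
  // Writable is copied into an immutable Java value first.
  public static JavaPairRDD<String, Integer> loadSafe(JavaSparkContext sc, String path) {
    JavaPairRDD<Text, IntWritable> raw =
        sc.sequenceFile(path, Text.class, IntWritable.class);
    return raw.mapToPair(t -> new Tuple2<>(t._1().toString(), t._2().get()));
  }
}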

On Fri, Nov 13, 2015 at 10:29 AM, jeff saremi 
wrote:

> So we tried reading a SequenceFile in Spark and realized that all our
> records ended up becoming the same.
> Then one of us found this:
>
> Note: Because Hadoop's RecordReader class re-uses the same Writable object
> for each record, directly caching the returned RDD or directly passing it
> to an aggregation or shuffle operation will create many references to the
> same object. If you plan to directly cache, sort, or aggregate Hadoop
> writable objects, you should first copy them using a map function.
>
> Is there anyone who can shed some light on this bizarre behavior and the
> decisions behind it?
> I would also like to know whether anyone has been able to read a binary file
> without incurring the additional map() suggested above? What format
> did you use?
>
> thanks
> Jeff
>


Re: Is the resources specified in configuration shared by all jobs?

2015-11-04 Thread Sandy Ryza
Hi Nisrina,

The resources you specify are shared by all jobs that run inside the
application.

-Sandy
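
As a small illustration: both count() calls below are separate Spark jobs, but they run
inside the same application and share whatever executors were allocated at submit time
(e.g. --num-executors 3 --executor-memory 5g); neither job gets its own 3 x 5g.

import java.util.Arrays;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;

public class SharedResources {
  public static void main(String[] args) {
    JavaSparkContext sc = new JavaSparkContext(new SparkConf().setAppName("SharedResources"));
    // Job 1 and job 2 run on the same executor pool, one after the other here.
    long evens = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5, 6)).filter(x -> x % 2 == 0).count();
    long odds  = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5, 6)).filter(x -> x % 2 != 0).count();
    System.out.println(evens + " even, " + odds + " odd");
    sc.stop();
  }
}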

On Wed, Nov 4, 2015 at 9:24 AM, Nisrina Luthfiyati <
nisrina.luthfiy...@gmail.com> wrote:

> Hi all,
>
> I'm running some spark jobs in Java on top of YARN by submitting one
> application jar that starts multiple jobs.
> My question is, if I'm setting some resource configurations, either when
> submitting the app or in spark-defaults.conf, would these configs apply to
> each job or to the entire application?
>
> For example, if I launch it with:
>
> spark-submit --class org.some.className \
> --master yarn-client \
> --num-executors 3 \
> --executor-memory 5g \
> someJar.jar \
>
> , would the 3 executors x 5G memory be allocated to each job, or would all
> jobs share the resources?
>
> Thank you!
> Nisrina
>
>


Re: Spark tunning increase number of active tasks

2015-10-31 Thread Sandy Ryza
Hi Xiaochuan,

The most likely cause of the "Lost container" issue is that YARN is killing
containers for exceeding memory limits.  If this is the case, you should be
able to find instances of "exceeding memory limits" in the application
logs.

http://blog.cloudera.com/blog/2015/03/how-to-tune-your-apache-spark-jobs-part-2/
has a more detailed explanation of why this happens.

-Sandy
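
One way to check for those messages, assuming YARN log aggregation is enabled (the
application id is illustrative):

yarn logs -applicationId application_1446300000000_0001 | grep -i "memory limits"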

On Sat, Oct 31, 2015 at 4:29 AM, Jörn Franke  wrote:

> Maybe Hortonworks support can help you much better.
>
> Otherwise you may want to change the yarn scheduler configuration and
> preemption. Do you use something like speculative execution?
>
> How do you start execution of the programs? Maybe you are already using
> all cores of the master...
>
> On 30 Oct 2015, at 23:32, YI, XIAOCHUAN  wrote:
>
> Hi
>
> Our team has a 40 node Hortonworks Hadoop cluster 2.2.4.2-2 (36 data
> nodes) with Apache Spark 1.2 and 1.4 installed.
>
> Each node has 64G RAM and 8 cores.
>
>
>
> We are only able to use <= 72 executors with executor-cores=2
>
> So we only get 144 active tasks when running pyspark programs.
>
> [Stage 1:===>(596 + 144) /
> 2042]
>
> If we use a larger number for --num-executors, the pyspark program exits with
> errors:
>
> ERROR YarnScheduler: Lost executor 113 on hag017.example.com: remote Rpc
> client disassociated
>
>
>
> I tried spark 1.4 and conf.set("dynamicAllocation.enabled", "true").
> However it does not help us to increase the number of active tasks.
>
> I expect a larger number of active tasks with the cluster we have.
>
> Could anyone advise on this? Thank you very much!
>
>
>
> Shaun
>
>
>
>


Re: Spark 1.5 on CDH 5.4.0

2015-10-22 Thread Sandy Ryza
Hi Deenar,

The version of Spark you have may not be compiled with YARN support.  If
you inspect the contents of the assembly jar, does
org.apache.spark.deploy.yarn.ExecutorLauncher exist?  If not, you'll need
to find a version that does have the YARN classes.  You can also build your
own using the -Pyarn flag.

-Sandy
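
A quick way to check the assembly (the jar path is illustrative):

jar tf lib/spark-assembly-1.5.1-hadoop2.6.0.jar | grep ExecutorLauncher
# should list org/apache/spark/deploy/yarn/ExecutorLauncher.class

and one way to build a YARN-enabled assembly from source:

build/mvn -Pyarn -Phadoop-2.6 -DskipTests clean package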

On Thu, Oct 22, 2015 at 9:04 AM, Deenar Toraskar 
wrote:

> Hi I have got the prebuilt version of Spark 1.5 for Hadoop 2.6 (
> http://www.apache.org/dyn/closer.lua/spark/spark-1.5.1/spark-1.5.1-bin-hadoop2.6.tgz)
> working with CDH 5.4.0 in local mode on a cluster with Kerberos. It works
> well including connecting to the Hive metastore. I am facing an issue
> running spark jobs in yarn-client/yarn-cluster mode. The executors fail to
> start as java cannot find ExecutorLauncher.
>
> Error: Could not find or load main class org.apache.spark.deploy.yarn.ExecutorLauncher
> client token: N/A
> diagnostics: Application application_1443531450011_13437 failed 2 times due to AM
> Container for appattempt_1443531450011_13437_02 exited with exitCode: 1
> Stack trace: ExitCodeException exitCode=1:
> at org.apache.hadoop.util.Shell.runCommand(Shell.java:538)
> at org.apache.hadoop.util.Shell.run(Shell.java:455)
> at org.apache.hadoop.util.Shell$ShellCommandExecutor.execute(Shell.java:715)
> at org.apache.hadoop.yarn.server.nodemanager.LinuxContainerExecutor.launchContainer(LinuxContainerExecutor.java:293)
> at org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch.call(ContainerLaunch.java:302)
> at org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch.call(ContainerLaunch.java:82)
> at java.util.concurrent.FutureTask.run(FutureTask.java:262)
> at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
> at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
> at java.lang.Thread.run(Thread.java:745)
>
> Any ideas as to what might be going wrong? Also how can I turn on more detailed
> logging to see what command line is being run by Yarn to launch containers?
>
> Regards
> Deenar
>


Re: Custom Hadoop InputSplit, Spark partitions, spark executors/task and Yarn containers

2015-09-23 Thread Sandy Ryza
Hi Anfernee,

That's correct: each InputSplit will map to exactly one Spark partition.

On YARN, each Spark executor maps to a single YARN container.  Each
executor can run multiple tasks over its lifetime, both parallel and
sequentially.

If you enable dynamic allocation, after the stage including the InputSplits
gets submitted, Spark will try to request an appropriate number of
executors.

The memory in the YARN resource requests is --executor-memory + what's set
for spark.yarn.executor.memoryOverhead, which defaults to 10% of
--executor-memory.

-Sandy
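
As a worked example with illustrative numbers: with --executor-memory 4g, the YARN
container request is roughly 4096 MB + max(384 MB, 0.10 * 4096 MB) = 4096 + 410 =
about 4506 MB, which YARN then typically rounds up to a multiple of
yarn.scheduler.minimum-allocation-mb.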

On Wed, Sep 23, 2015 at 2:38 PM, Anfernee Xu  wrote:

> Hi Spark experts,
>
> I'm coming across these terminologies and having some confusions, could
> you please help me understand them better?
>
> For instance I have implemented a Hadoop InputFormat to load my external
> data in Spark, in turn my custom InputFormat will create a bunch of
> InputSplit's, my questions is about
>
> # Each InputSplit will exactly map to a Spark partition, is that correct?
>
> # If I run on Yarn, how does Spark executor/task map to Yarn container?
>
> # because I already have a bunch of InputSplits, do I still need to
> specify the number of executors to get processing parallelized?
>
> # How does --executor-memory map to the memory requirement in Yarn's
> resource request?
>
> --
> --Anfernee
>


Re: Spark on Yarn vs Standalone

2015-09-21 Thread Sandy Ryza
/hadoop-yarn/containers/application_1442869100946_0001/container_1442869100946_0001_01_56
>> org.apache.spark.executor.CoarseGrainedExecutorBackend --driver-url
>> akka.tcp://sparkDriver@10.0.24.153:52690/user/CoarseGrainedScheduler
>> --executor-id 55 --hostname ip-10-0-28-96.ec2.internal --cores 8 --app-id
>> application_1442869100946_0001 --user-class-path
>> file:/mnt/yarn/usercache/hadoop/appcache/application_1442869100946_0001/container_1442869100946_0001_01_56/__app__.jar
>> 1>
>> /var/log/hadoop-yarn/containers/application_1442869100946_0001/container_1442869100946_0001_01_56/stdout
>> 2>
>> /var/log/hadoop-yarn/containers/application_1442869100946_0001/container_1442869100946_0001_01_56/stderr
>>
>>
>>
>> Is it possible to get what scratch space is used for?
>>
>> What spark setting should I try to adjust to solve the issue?
>>
>> On Thu, Sep 10, 2015 at 2:52 PM, Sandy Ryza <sandy.r...@cloudera.com>
>> wrote:
>>
>>> YARN will never kill processes for being unresponsive.
>>>
>>> It may kill processes for occupying more memory than it allows.  To get
>>> around this, you can either bump spark.yarn.executor.memoryOverhead or turn
>>> off the memory checks entirely with yarn.nodemanager.pmem-check-enabled.
>>>
>>> -Sandy
>>>
>>> On Tue, Sep 8, 2015 at 10:48 PM, Alexander Pivovarov <
>>> apivova...@gmail.com> wrote:
>>>
>>>> The problem which we have now is skewed data (2360 tasks done in 5 min, 3
>>>> tasks in 40 min and 1 task in 2 hours)
>>>>
>>>> Some people from the team worry that the executor which runs the
>>>> longest task can be killed by YARN (because executor might be unresponsive
>>>> because of GC or it might occupy more memory than Yarn allows)
>>>>
>>>>
>>>>
>>>> On Tue, Sep 8, 2015 at 3:02 PM, Sandy Ryza <sandy.r...@cloudera.com>
>>>> wrote:
>>>>
>>>>> Those settings seem reasonable to me.
>>>>>
>>>>> Are you observing performance that's worse than you would expect?
>>>>>
>>>>> -Sandy
>>>>>
>>>>> On Mon, Sep 7, 2015 at 11:22 AM, Alexander Pivovarov <
>>>>> apivova...@gmail.com> wrote:
>>>>>
>>>>>> Hi Sandy
>>>>>>
>>>>>> Thank you for your reply
>>>>>> Currently we use r3.2xlarge boxes (vCPU: 8, Mem: 61 GiB)
>>>>>> with emr setting for Spark "maximizeResourceAllocation": "true"
>>>>>>
>>>>>> It is automatically converted to Spark settings
>>>>>> spark.executor.memory47924M
>>>>>> spark.yarn.executor.memoryOverhead 5324
>>>>>>
>>>>>> we also set spark.default.parallelism = slave_count * 16
>>>>>>
>>>>>> Does it look good for you? (we run single heavy job on cluster)
>>>>>>
>>>>>> Alex
>>>>>>
>>>>>> On Mon, Sep 7, 2015 at 11:03 AM, Sandy Ryza <sandy.r...@cloudera.com>
>>>>>> wrote:
>>>>>>
>>>>>>> Hi Alex,
>>>>>>>
>>>>>>> If they're both configured correctly, there's no reason that Spark
>>>>>>> Standalone should provide performance or memory improvement over Spark 
>>>>>>> on
>>>>>>> YARN.
>>>>>>>
>>>>>>> -Sandy
>>>>>>>
>>>>>>> On Fri, Sep 4, 2015 at 1:24 PM, Alexander Pivovarov <
>>>>>>> apivova...@gmail.com> wrote:
>>>>>>>
>>>>>>>> Hi Everyone
>>>>>>>>
>>>>>>>> We are trying the latest aws emr-4.0.0 and Spark and my question is
>>>>>>>> about YARN vs Standalone mode.
>>>>>>>> Our usecase is
>>>>>>>> - start 100-150 nodes cluster every week,
>>>>>>>> - run one heavy spark job (5-6 hours)
>>>>>>>> - save data to s3
>>>>>>>> - stop cluster
>>>>>>>>
>>>>>>>> Officially aws emr-4.0.0 comes with Spark on Yarn
>>>>>>>> It's probably possible to hack emr by creating bootstrap script
>>>>>>>> which stops yarn and starts master and slaves on each computer  (to 
>>>>>>>> start
>>>>>>>> Spark in standalone mode)
>>>>>>>>
>>>>>>>> My questions are
>>>>>>>> - Does Spark standalone provide significant performance / memory
>>>>>>>> improvement in comparison to YARN mode?
>>>>>>>> - Is it worth hacking the official emr Spark on Yarn and switching Spark
>>>>>>>> to Standalone mode?
>>>>>>>>
>>>>>>>>
>>>>>>>> I already created comparison table and want you to check if my
>>>>>>>> understanding is correct
>>>>>>>>
>>>>>>>> Lets say r3.2xlarge computer has 52GB ram available for Spark Executor JVMs
>>>>>>>>
>>>>>>>> standalone to yarn comparison
>>>>>>>>
>>>>>>>>                                                        STDLN     YARN
>>>>>>>> can executor allocate up to 52GB ram                 -  yes   |  yes
>>>>>>>> will executor be unresponsive after using all
>>>>>>>> 52GB ram because of GC                               -  yes   |  yes
>>>>>>>> additional JVMs on slave except of spark executor    -  workr |  node mngr
>>>>>>>> are additional JVMs lightweight                      -  yes   |  yes
>>>>>>>>
>>>>>>>>
>>>>>>>> Thank you
>>>>>>>>
>>>>>>>> Alex
>>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>


Re: Spark on Yarn vs Standalone

2015-09-10 Thread Sandy Ryza
YARN will never kill processes for being unresponsive.

It may kill processes for occupying more memory than it allows.  To get
around this, you can either bump spark.yarn.executor.memoryOverhead or turn
off the memory checks entirely with yarn.nodemanager.pmem-check-enabled.

-Sandy
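
For example (the overhead value is illustrative):

spark-submit <your usual arguments> --conf spark.yarn.executor.memoryOverhead=4096

or, to disable the physical-memory check, in yarn-site.xml on the NodeManagers:

<property>
  <name>yarn.nodemanager.pmem-check-enabled</name>
  <value>false</value>
</property>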

On Tue, Sep 8, 2015 at 10:48 PM, Alexander Pivovarov <apivova...@gmail.com>
wrote:

> The problem which we have now is skewed data (2360 tasks done in 5 min, 3
> tasks in 40 min and 1 task in 2 hours)
>
> Some people from the team worry that the executor which runs the longest
> task can be killed by YARN (because executor might be unresponsive because
> of GC or it might occupy more memory than Yarn allows)
>
>
>
> On Tue, Sep 8, 2015 at 3:02 PM, Sandy Ryza <sandy.r...@cloudera.com>
> wrote:
>
>> Those settings seem reasonable to me.
>>
>> Are you observing performance that's worse than you would expect?
>>
>> -Sandy
>>
>> On Mon, Sep 7, 2015 at 11:22 AM, Alexander Pivovarov <
>> apivova...@gmail.com> wrote:
>>
>>> Hi Sandy
>>>
>>> Thank you for your reply
>>> Currently we use r3.2xlarge boxes (vCPU: 8, Mem: 61 GiB)
>>> with emr setting for Spark "maximizeResourceAllocation": "true"
>>>
>>> It is automatically converted to Spark settings
>>> spark.executor.memory47924M
>>> spark.yarn.executor.memoryOverhead 5324
>>>
>>> we also set spark.default.parallelism = slave_count * 16
>>>
>>> Does it look good for you? (we run single heavy job on cluster)
>>>
>>> Alex
>>>
>>> On Mon, Sep 7, 2015 at 11:03 AM, Sandy Ryza <sandy.r...@cloudera.com>
>>> wrote:
>>>
>>>> Hi Alex,
>>>>
>>>> If they're both configured correctly, there's no reason that Spark
>>>> Standalone should provide performance or memory improvement over Spark on
>>>> YARN.
>>>>
>>>> -Sandy
>>>>
>>>> On Fri, Sep 4, 2015 at 1:24 PM, Alexander Pivovarov <
>>>> apivova...@gmail.com> wrote:
>>>>
>>>>> Hi Everyone
>>>>>
>>>>> We are trying the latest aws emr-4.0.0 and Spark and my question is
>>>>> about YARN vs Standalone mode.
>>>>> Our usecase is
>>>>> - start 100-150 nodes cluster every week,
>>>>> - run one heavy spark job (5-6 hours)
>>>>> - save data to s3
>>>>> - stop cluster
>>>>>
>>>>> Officially aws emr-4.0.0 comes with Spark on Yarn
>>>>> It's probably possible to hack emr by creating bootstrap script which
>>>>> stops yarn and starts master and slaves on each computer  (to start Spark
>>>>> in standalone mode)
>>>>>
>>>>> My questions are
>>>>> - Does Spark standalone provide significant performance / memory
>>>>> improvement in comparison to YARN mode?
>>>>> - Is it worth hacking the official emr Spark on Yarn and switching Spark to
>>>>> Standalone mode?
>>>>>
>>>>>
>>>>> I already created comparison table and want you to check if my
>>>>> understanding is correct
>>>>>
>>>>> Lets say r3.2xlarge computer has 52GB ram available for Spark Executor JVMs
>>>>>
>>>>> standalone to yarn comparison
>>>>>
>>>>>                                                        STDLN     YARN
>>>>> can executor allocate up to 52GB ram                 -  yes   |  yes
>>>>> will executor be unresponsive after using all
>>>>> 52GB ram because of GC                               -  yes   |  yes
>>>>> additional JVMs on slave except of spark executor    -  workr |  node mngr
>>>>> are additional JVMs lightweight                      -  yes   |  yes
>>>>>
>>>>>
>>>>> Thank you
>>>>>
>>>>> Alex
>>>>>
>>>>
>>>>
>>>
>>
>


Driver OOM after upgrading to 1.5

2015-09-09 Thread Sandy Ryza
I just upgraded the spark-timeseries project
(https://github.com/cloudera/spark-timeseries) to run on top of
1.5, and I'm noticing that tests are failing with OOMEs.

I ran a jmap -histo on the process and discovered the top heap items to be:
   1:163428   22236064  
   2:163428   21112648  
   3: 12638   14459192  
   4: 12638   13455904  
   5: 105397642528  

Not sure whether this is suspicious.  Any ideas?

-Sandy


Re: Driver OOM after upgrading to 1.5

2015-09-09 Thread Sandy Ryza
Java 7.

FWIW I was just able to get it to work by increasing MaxPermSize to 256m.

-Sandy
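
For a driver launched through spark-submit (test runners use their own JVM, so the
equivalent setting lives in the build tool there), one way to pass this would be:

spark-submit --conf spark.driver.extraJavaOptions=-XX:MaxPermSize=256m <your usual arguments>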

On Wed, Sep 9, 2015 at 11:37 AM, Reynold Xin <r...@databricks.com> wrote:

> Java 7 / 8?
>
> On Wed, Sep 9, 2015 at 10:10 AM, Sandy Ryza <sandy.r...@cloudera.com>
> wrote:
>
>> I just upgraded the spark-timeseries
>> <https://github.com/cloudera/spark-timeseries> project to run on top of
>> 1.5, and I'm noticing that tests are failing with OOMEs.
>>
>> I ran a jmap -histo on the process and discovered the top heap items to
>> be:
>>1:163428   22236064  
>>2:163428   21112648  
>>3: 12638   14459192  
>>4: 12638   13455904  
>>5: 105397642528  
>>
>> Not sure whether this is suspicious.  Any ideas?
>>
>> -Sandy
>>
>
>


Re: Spark on Yarn vs Standalone

2015-09-08 Thread Sandy Ryza
Those settings seem reasonable to me.

Are you observing performance that's worse than you would expect?

-Sandy

On Mon, Sep 7, 2015 at 11:22 AM, Alexander Pivovarov <apivova...@gmail.com>
wrote:

> Hi Sandy
>
> Thank you for your reply
> Currently we use r3.2xlarge boxes (vCPU: 8, Mem: 61 GiB)
> with emr setting for Spark "maximizeResourceAllocation": "true"
>
> It is automatically converted to Spark settings
> spark.executor.memory47924M
> spark.yarn.executor.memoryOverhead 5324
>
> we also set spark.default.parallelism = slave_count * 16
>
> Does it look good for you? (we run single heavy job on cluster)
>
> Alex
>
> On Mon, Sep 7, 2015 at 11:03 AM, Sandy Ryza <sandy.r...@cloudera.com>
> wrote:
>
>> Hi Alex,
>>
>> If they're both configured correctly, there's no reason that Spark
>> Standalone should provide performance or memory improvement over Spark on
>> YARN.
>>
>> -Sandy
>>
>> On Fri, Sep 4, 2015 at 1:24 PM, Alexander Pivovarov <apivova...@gmail.com
>> > wrote:
>>
>>> Hi Everyone
>>>
>>> We are trying the latest aws emr-4.0.0 and Spark and my question is
>>> about YARN vs Standalone mode.
>>> Our usecase is
>>> - start 100-150 nodes cluster every week,
>>> - run one heavy spark job (5-6 hours)
>>> - save data to s3
>>> - stop cluster
>>>
>>> Officially aws emr-4.0.0 comes with Spark on Yarn
>>> It's probably possible to hack emr by creating bootstrap script which
>>> stops yarn and starts master and slaves on each computer  (to start Spark
>>> in standalone mode)
>>>
>>> My questions are
>>> - Does Spark standalone provide significant performance / memory
>>> improvement in comparison to YARN mode?
>>> - Is it worth hacking the official emr Spark on Yarn and switching Spark to
>>> Standalone mode?
>>>
>>>
>>> I already created comparison table and want you to check if my
>>> understanding is correct
>>>
>>> Lets say r3.2xlarge computer has 52GB ram available for Spark Executor JVMs
>>>
>>> standalone to yarn comparison
>>>
>>>                                                        STDLN     YARN
>>> can executor allocate up to 52GB ram                 -  yes   |  yes
>>> will executor be unresponsive after using all
>>> 52GB ram because of GC                               -  yes   |  yes
>>> additional JVMs on slave except of spark executor    -  workr |  node mngr
>>> are additional JVMs lightweight                      -  yes   |  yes
>>>
>>>
>>> Thank you
>>>
>>> Alex
>>>
>>
>>
>


Re: Spark on Yarn vs Standalone

2015-09-07 Thread Sandy Ryza
Hi Alex,

If they're both configured correctly, there's no reason that Spark
Standalone should provide performance or memory improvement over Spark on
YARN.

-Sandy

On Fri, Sep 4, 2015 at 1:24 PM, Alexander Pivovarov 
wrote:

> Hi Everyone
>
> We are trying the latest aws emr-4.0.0 and Spark and my question is about
> YARN vs Standalone mode.
> Our usecase is
> - start 100-150 nodes cluster every week,
> - run one heavy spark job (5-6 hours)
> - save data to s3
> - stop cluster
>
> Officially aws emr-4.0.0 comes with Spark on Yarn
> It's probably possible to hack emr by creating bootstrap script which
> stops yarn and starts master and slaves on each computer  (to start Spark
> in standalone mode)
>
> My questions are
> - Does Spark standalone provide significant performance / memory
> improvement in comparison to YARN mode?
> - Is it worth hacking the official emr Spark on Yarn and switching Spark to
> Standalone mode?
>
>
> I already created comparison table and want you to check if my
> understanding is correct
>
> Lets say r3.2xlarge computer has 52GB ram available for Spark Executor JVMs
>
> standalone to yarn comparison
>
>                                                        STDLN     YARN
> can executor allocate up to 52GB ram                 -  yes   |  yes
> will executor be unresponsive after using all
> 52GB ram because of GC                               -  yes   |  yes
> additional JVMs on slave except of spark executor    -  workr |  node mngr
> are additional JVMs lightweight                      -  yes   |  yes
>
>
> Thank you
>
> Alex
>


Re: Spark Effects of Driver Memory, Executor Memory, Driver Memory Overhead and Executor Memory Overhead on success of job runs

2015-08-31 Thread Sandy Ryza
Hi Timothy,

For your first question, you would need to look in the logs and provide
additional information about why your job is failing.  The SparkContext
shutting down could happen for a variety of reasons.

In the situation where you give more memory, but less memory overhead, and
the job completes less quickly, have you checked to see whether YARN is
killing any containers?  It could be that the job completes more slowly
because, without the memory overhead, YARN kills containers while it's
running.  So it needs to run some tasks multiple times.

-Sandy

On Sat, Aug 29, 2015 at 6:57 PM, timothy22000 
wrote:

> I am doing some memory tuning on my Spark job on YARN and I notice
> different
> settings would give different results and affect the outcome of the Spark
> job run. However, I am confused and do not understand completely why it
> happens and would appreciate if someone can provide me with some guidance
> and explanation.
>
> I will provide some background information and describe the cases that I
> have experienced and post my questions after them below.
>
> My environment settings were as below:
>
>  - Memory 20G, 20 VCores per node (3 nodes in total)
>  - Hadoop 2.6.0
>  - Spark 1.4.0
>
> My code recursively filters an RDD to make it smaller (removing examples as
> part of an algorithm), then does mapToPair and collect to gather the
> results
> and save them within a list.
>
>  First Case
>
> /bin/spark-submit --class  --master yarn-cluster
> --driver-memory 7g --executor-memory 1g --num-executors 3 --executor-cores 1
> --jars 
> If I run my program with any driver memory less than 11g, I will get the
> error below which is the SparkContext being stopped or a similar error
> which
> is a method being called on a stopped SparkContext. From what I have
> gathered, this is related to memory not being enough.
>
>
>  >
>
> Second Case
>
>
> /bin/spark-submit --class  --master yarn-cluster
> --driver-memory 7g --executor-memory 3g --num-executors 3 --executor-cores 1
> --jars 
>
> If I run the program with the same driver memory but higher executor
> memory,
> the job runs longer (about 3-4 minutes) than the first case and then it
> will
> encounter a different error from earlier which is a Container
> requesting/using more memory than allowed and is being killed because of
> that. Although I find it weird since the executor memory is increased and
> this error occurs instead of the error in the first case.
>
>  >
>
> Third Case
>
>
> /bin/spark-submit --class  --master yarn-cluster
> --driver-memory 11g --executor-memory 1g --num-executors 3 --executor-cores 1
> --jars 
>
> Any setting with driver memory greater than 10g will lead to the job being
> able to run successfully.
>
> Fourth Case
>
>
> /bin/spark-submit --class  --master yarn-cluster
> --driver-memory 2g --executor-memory 1g --conf
> spark.yarn.executor.memoryOverhead=1024 --conf
> spark.yarn.driver.memoryOverhead=1024 --num-executors 3 --executor-cores 1
> --jars 
> The job will run successfully with this setting (driver memory 2g and
> executor memory 1g but increasing the driver memory overhead(1g) and the
> executor memory overhead(1g).
>
> Questions
>
>
>  1. Why is a different error thrown and the job runs longer (for the second
> case) between the first and second case with only the executor memory being
> increased? Are the two errors linked in some way?
>
>  2. Both the third and fourth case succeeds and I understand that it is
> because I am giving more memory which solves the memory problems. However,
> in the third case,
>
> spark.driver.memory + spark.yarn.driver.memoryOverhead = the memory in which
> YARN will create a JVM
> = 11g + (driverMemory * 0.07, with minimum of 384m)
> = 11g + 1.154g
> = 12.154g
>
> So, from the formula, I can see that my job requires MEMORY_TOTAL of around
> 12.154g to run successfully which explains why I need more than 10g for the
> driver memory setting.
>
> But for the fourth case,
>
> spark.driver.memory + spark.yarn.driver.memoryOverhead = the memory in which
> YARN will create a JVM
> = 2g + (driverMemory * 0.07, with minimum of 384m)
> = 2g + 0.524g
> = 2.524g
>
> It seems that just by increasing the memory overhead by a small amount of
> 1024(1g) it leads to the successful run of the job with driver memory of
> only 2g and the MEMORY_TOTAL is only 2.524g! Whereas without the overhead
> configuration, driver memory less than 11g fails but it doesn't make sense
> from the formula which is why I am confused.
>
> Why increasing the memory overhead (for both driver and executor) allows my
> job to complete successfully with a lower MEMORY_TOTAL (12.154g vs 2.524g)?
> Is there some other internal things at work here that I am missing?
>
> I would really appreciate any help offered as it 

Re: How to avoid executor time out on yarn spark while dealing with large shuffle skewed data?

2015-08-20 Thread Sandy Ryza
What version of Spark are you using?  Have you set any shuffle configs?

On Wed, Aug 19, 2015 at 11:46 AM, unk1102 umesh.ka...@gmail.com wrote:

 I have one Spark job which seems to run fine, but after one hour or so
 executors start getting lost because of a timeout, with an error like the
 following:

 cluster.yarnScheduler : Removing an executor 14 65 timeout exceeds
 60 seconds

 and because of the above error a couple of chained errors start to come, like
 FetchFailedException, Rpc client disassociated, Connection reset by peer,
 IOException etc.

 Please see the following UI page. I have noticed that when shuffle read/write
 starts to increase beyond 10 GB, executors start getting lost because of the
 timeout. How do I clear this stacked-up 10 GB in the shuffle read/write
 section? I don't cache anything, so why is Spark not clearing that memory?
 Please
 guide.

 IMG_20150819_231418358.jpg
 
 http://apache-spark-user-list.1001560.n3.nabble.com/file/n24345/IMG_20150819_231418358.jpg
 




 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/How-to-avoid-executor-time-out-on-yarn-spark-while-dealing-with-large-shuffle-skewed-data-tp24345.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




Re: How to avoid executor time out on yarn spark while dealing with large shuffle skewed data?

2015-08-20 Thread Sandy Ryza
Moving this back onto user@

Regarding GC, can you look in the web UI and see whether the GC time
metric dominates the amount of time spent on each task (or at least the
tasks that aren't completing)?

Also, have you tried bumping your spark.yarn.executor.memoryOverhead?  YARN
may be killing your executors for using too much off-heap space.  You can
see whether this is happening by looking in the Spark AM or YARN
NodeManager logs.

-Sandy

On Thu, Aug 20, 2015 at 7:39 AM, Umesh Kacha umesh.ka...@gmail.com wrote:

 Hi, thanks much for the response. Yes, I tried the default setting of 0.2 too;
 it was also running into the timeout. If it is spending time in GC, then why is it
 not throwing a GC error? I don't see any such error. Yarn logs are not helpful at
 all. What is Tungsten and how do I use it? Spark is doing great, I believe; my
 job runs successfully and 60% of tasks complete, and only after the first executor
 gets lost do things start to go wrong.
 On Aug 20, 2015 7:59 PM, Sandy Ryza sandy.r...@cloudera.com wrote:

 What sounds most likely is that you're hitting heavy garbage collection.
 Did you hit issues when the shuffle memory fraction was at its default of
 0.2?  A potential danger with setting the shuffle storage to 0.7 is that it
 allows shuffle objects to get into the GC old generation, which triggers
 more stop-the-world garbage collections.

 Have you tried enabling Tungsten / unsafe?

 Unfortunately, Spark is still not that great at dealing with
 heavily-skewed shuffle data, because its reduce-side aggregation still
 operates on Java objects instead of binary data.

 -Sandy

 On Thu, Aug 20, 2015 at 7:21 AM, Umesh Kacha umesh.ka...@gmail.com
 wrote:

 Hi Sandy, thanks much for the response. I am using Spark 1.4.1 and I have
 set spark.shuffle.storage to 0.7, as my spark job involves 4 groupby queries
 executed using hiveContext.sql; my data set is skewed, so I believe there will be
 more shuffling. I don't know what's wrong: the spark job runs fine for
 almost an hour, and when the shuffle read/shuffle write column in the UI starts to
 show more than 10 gb, executors start getting lost because of the timeout, and
 slowly other executors start getting lost. Please guide.
 On Aug 20, 2015 7:38 PM, Sandy Ryza sandy.r...@cloudera.com wrote:

 What version of Spark are you using?  Have you set any shuffle configs?

 On Wed, Aug 19, 2015 at 11:46 AM, unk1102 umesh.ka...@gmail.com
 wrote:

 I have one Spark job which seems to run fine but after one hour or so
 executor start getting lost because of time out something like the
 following
 error

 cluster.yarnScheduler : Removing an executor 14 65 timeout exceeds
 60 seconds

 and because of above error couple of chained errors starts to come like
 FetchFailedException, Rpc client disassociated, Connection reset by
 peer,
 IOException etc

 Please see the following UI page I have noticed when shuffle read/write
 starts to increase more than 10 GB executors starts getting lost
 because of
 timeout. How do I clear this stacked memory of 10 GB in shuffle
 read/write
 section I dont cache anything why Spark is not clearing those memory.
 Please
 guide.

 IMG_20150819_231418358.jpg
 
 http://apache-spark-user-list.1001560.n3.nabble.com/file/n24345/IMG_20150819_231418358.jpg
 




 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/How-to-avoid-executor-time-out-on-yarn-spark-while-dealing-with-large-shuffle-skewed-data-tp24345.html
 Sent from the Apache Spark User List mailing list archive at
 Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org






Re: How to avoid executor time out on yarn spark while dealing with large shuffle skewed data?

2015-08-20 Thread Sandy Ryza
GC wouldn't necessarily result in errors - it could just be slowing down
your job and causing the executor JVMs to stall.  If you click on a stage
in the UI, you should end up on a page with all the metrics concerning the
tasks that ran in that stage.  GC Time is one of these task metrics.

-Sandy

On Thu, Aug 20, 2015 at 8:54 AM, Umesh Kacha umesh.ka...@gmail.com wrote:

 Hi, where do I see GC time in the UI? I have set spark.yarn.executor.memoryOverhead
 to 3500, which seems to be good enough I believe. So you mean only GC could
 be the reason behind the timeout? I checked the Yarn logs and did not see any GC error
 there. Please guide. Thanks much.

 On Thu, Aug 20, 2015 at 8:14 PM, Sandy Ryza sandy.r...@cloudera.com
 wrote:

 Moving this back onto user@

 Regarding GC, can you look in the web UI and see whether the GC time
 metric dominates the amount of time spent on each task (or at least the
 tasks that aren't completing)?

 Also, have you tried bumping your spark.yarn.executor.memoryOverhead?
 YARN may be killing your executors for using too much off-heap space.  You
 can see whether this is happening by looking in the Spark AM or YARN
 NodeManager logs.

 -Sandy

 On Thu, Aug 20, 2015 at 7:39 AM, Umesh Kacha umesh.ka...@gmail.com
 wrote:

 Hi thanks much for the response. Yes I tried default settings too 0.2 it
 was also going into timeout if it is spending time in GC then why it is not
 throwing GC error I don't see any such error. Yarn logs are not helpful at
 all. What is tungsten how do I use it? Spark is doing great I believe my
 job runs successfully and 60% tasks completes only after first executor
 gets lost things are messing.
 On Aug 20, 2015 7:59 PM, Sandy Ryza sandy.r...@cloudera.com wrote:

 What sounds most likely is that you're hitting heavy garbage
 collection.  Did you hit issues when the shuffle memory fraction was at its
 default of 0.2?  A potential danger with setting the shuffle storage to 0.7
 is that it allows shuffle objects to get into the GC old generation, which
 triggers more stop-the-world garbage collections.

 Have you tried enabling Tungsten / unsafe?

 Unfortunately, Spark is still not that great at dealing with
 heavily-skewed shuffle data, because its reduce-side aggregation still
 operates on Java objects instead of binary data.

 -Sandy

 On Thu, Aug 20, 2015 at 7:21 AM, Umesh Kacha umesh.ka...@gmail.com
 wrote:

 Hi Sandy thanks much for the response. I am using Spark 1.4.1 and I
 have set spark.shuffle.storage as 0.7 as my spark job involves 4 groupby
 queries executed using hiveContext.sql my data set is skewed so will be
 more shuffling I believe I don't know what's wrong spark job runs fine for
 almost an hour and when shuffle read shuffle write column in UI starts to
 show more than 10 gb executor starts to getting lost because of timeout 
 and
 slowly other executor starts getting lost. Please guide.
 On Aug 20, 2015 7:38 PM, Sandy Ryza sandy.r...@cloudera.com wrote:

 What version of Spark are you using?  Have you set any shuffle
 configs?

 On Wed, Aug 19, 2015 at 11:46 AM, unk1102 umesh.ka...@gmail.com
 wrote:

 I have one Spark job which seems to run fine but after one hour or so
 executor start getting lost because of time out something like the
 following
 error

 cluster.yarnScheduler : Removing an executor 14 65 timeout
 exceeds
 60 seconds

 and because of above error couple of chained errors starts to come
 like
 FetchFailedException, Rpc client disassociated, Connection reset by
 peer,
 IOException etc

 Please see the following UI page I have noticed when shuffle
 read/write
 starts to increase more than 10 GB executors starts getting lost
 because of
 timeout. How do I clear this stacked memory of 10 GB in shuffle
 read/write
 section I dont cache anything why Spark is not clearing those
 memory. Please
 guide.

 IMG_20150819_231418358.jpg
 
 http://apache-spark-user-list.1001560.n3.nabble.com/file/n24345/IMG_20150819_231418358.jpg
 




 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/How-to-avoid-executor-time-out-on-yarn-spark-while-dealing-with-large-shuffle-skewed-data-tp24345.html
 Sent from the Apache Spark User List mailing list archive at
 Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org








Re: Executors on multiple nodes

2015-08-16 Thread Sandy Ryza
Hi Mohit,

It depends on whether dynamic allocation is turned on.  If not, the number
of executors is specified by the user with the --num-executors option.  If
dynamic allocation is turned on, refer to the doc for details:
https://spark.apache.org/docs/1.4.0/job-scheduling.html#dynamic-resource-allocation
.

-Sandy
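
For example (executor counts and sizes are illustrative, and app.jar stands in for your
application):

# static allocation: the executor count is fixed up front
spark-submit --master yarn-client --num-executors 10 --executor-cores 2 --executor-memory 4g app.jar

# dynamic allocation: Spark grows and shrinks the executor count with the workload
# (requires the YARN external shuffle service on the NodeManagers)
spark-submit --master yarn-client \
  --conf spark.dynamicAllocation.enabled=true \
  --conf spark.shuffle.service.enabled=true \
  --conf spark.dynamicAllocation.minExecutors=2 \
  --conf spark.dynamicAllocation.maxExecutors=20 \
  app.jar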


On Sat, Aug 15, 2015 at 6:40 AM, Mohit Anchlia mohitanch...@gmail.com
wrote:

 I am running on Yarn and do have a question on how spark runs executors on
 different data nodes. Is that primarily decided based on number of
 receivers?

 What do I need to do to ensure that multiple nodes are being used for data
 processing?



Re: Boosting spark.yarn.executor.memoryOverhead

2015-08-11 Thread Sandy Ryza
Hi Eric,

This is likely because you are putting the parameter after the primary
resource (latest_msmtdt_by_gridid_and_source.py), which makes it a
parameter to your application instead of a parameter to Spark/

-Sandy
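
In other words, the --conf flag needs to come before the primary resource, e.g.:

spark-submit --jars examples.jar \
  --conf spark.yarn.executor.memoryOverhead=1024 \
  latest_msmtdt_by_gridid_and_source.py host table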

On Wed, Aug 12, 2015 at 4:40 AM, Eric Bless eric.bl...@yahoo.com.invalid
wrote:

 Previously I was getting a failure which included the message
 Container killed by YARN for exceeding memory limits. 2.1 GB of 2 GB
 physical memory used. Consider boosting spark.yarn.executor.memoryOverhead.

 So I attempted the following -
 spark-submit --jars examples.jar latest_msmtdt_by_gridid_and_source.py
 --conf spark.yarn.executor.memoryOverhead=1024 host table

 This resulted in -
 Application application_1438983806434_24070 failed 2 times due to AM
 Container for appattempt_1438983806434_24070_02 exited with exitCode:
 -1000

 Am I specifying the spark.yarn.executor.memoryOverhead incorrectly?




Re: Spark on YARN

2015-08-08 Thread Sandy Ryza
Hi Jem,

Do they fail with any particular exception?  Does YARN just never end up
giving them resources?  Does an application master start?  If so, what are
in its logs?  If not, anything suspicious in the YARN ResourceManager logs?

-Sandy

On Fri, Aug 7, 2015 at 1:48 AM, Jem Tucker jem.tuc...@gmail.com wrote:

 Hi,

 I am running spark on YARN on the CDH 5.3.2 stack. I have created a new
 user to own and run a testing environment; however, when using this user,
 applications I submit to YARN never begin to run, even if they are the
 exact same applications that succeed with another user.

 Has anyone seen anything like this before?

 Thanks,

 Jem



Re: [General Question] [Hadoop + Spark at scale] Spark Rack Awareness ?

2015-07-19 Thread Sandy Ryza
Hi Mike,

Spark is rack-aware in its task scheduling.  Currently Spark doesn't honor
any locality preferences when scheduling executors, but this is being
addressed in SPARK-4352, after which executor-scheduling will be rack-aware
as well.

-Sandy

On Sat, Jul 18, 2015 at 6:25 PM, Mike Frampton mike_framp...@hotmail.com
wrote:

 I wanted to ask a general question about Hadoop/Yarn and Apache Spark
 integration. I know that
 Hadoop on a physical cluster has rack awareness, i.e. it attempts to
 minimise network traffic
 by keeping replicated blocks within a rack.

 I wondered whether, when Spark is configured to use Yarn as a cluster
 manager, it is able to
 use this feature to also minimise network traffic to a degree.

 Sorry if this question is not quite accurate, but I think you can
 generally see what I mean?



Re: What else is need to setup native support of BLAS/LAPACK with Spark?

2015-07-17 Thread Sandy Ryza
Can you try setting the spark.yarn.jar property to make sure it points to
the jar you're thinking of?

-Sandy
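
For example, on the command line or in spark-defaults.conf (the HDFS path is
illustrative):

--conf spark.yarn.jar=hdfs:///user/spark/jars/spark-assembly-1.5.0-SNAPSHOT-hadoop2.6.0.jar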

On Fri, Jul 17, 2015 at 11:32 AM, Arun Ahuja aahuj...@gmail.com wrote:

 Yes, it's a YARN cluster and using spark-submit to run.  I have SPARK_HOME
 set to the directory above and using the spark-submit script from there.

 bin/spark-submit --master yarn-client --executor-memory 10g --driver-memory 
 8g --num-executors 400 --executor-cores 1 --class 
 org.hammerlab.guacamole.Guacamole --conf spark.default.parallelism=4000 
 --conf spark.storage.memoryFraction=0.15

 ​

 libgfortran.so.3 is also there

 ls  /usr/lib64/libgfortran.so.3
 /usr/lib64/libgfortran.so.3

 These are jniloader files in the jar

 jar tf 
 /hpc/users/ahujaa01/src/spark/assembly/target/scala-2.10/spark-assembly-1.5.0-SNAPSHOT-hadoop2.6.0.jar
  | grep jniloader
 META-INF/maven/com.github.fommil/jniloader/
 META-INF/maven/com.github.fommil/jniloader/pom.xml
 META-INF/maven/com.github.fommil/jniloader/pom.properties

 ​

 Thanks,
 Arun

 On Fri, Jul 17, 2015 at 1:30 PM, Sean Owen so...@cloudera.com wrote:

 Make sure /usr/lib64 contains libgfortran.so.3; that's really the issue.

 I'm pretty sure the answer is 'yes', but, make sure the assembly has
 jniloader too. I don't see why it wouldn't, but, that's needed.

 What is your env like -- local, standalone, YARN? how are you running?
 Just want to make sure you are using this assembly across your cluster.

 On Fri, Jul 17, 2015 at 6:26 PM, Arun Ahuja aahuj...@gmail.com wrote:

 Hi Sean,

 Thanks for the reply! I did double-check that the jar is one I think I
 am running:


 jar tf 
 /hpc/users/ahujaa01/src/spark/assembly/target/scala-2.10/spark-assembly-1.5.0-SNAPSHOT-hadoop2.6.0.jar
  | grep netlib | grep Native
 com/github/fommil/netlib/NativeRefARPACK.class
 com/github/fommil/netlib/NativeRefBLAS.class
 com/github/fommil/netlib/NativeRefLAPACK.class
 com/github/fommil/netlib/NativeSystemARPACK.class
 com/github/fommil/netlib/NativeSystemBLAS.class
 com/github/fommil/netlib/NativeSystemLAPACK.class

 Also, I checked the gfortran version on the cluster nodes and it is
 available and is 5.1

 $ gfortran --version
 GNU Fortran (GCC) 5.1.0
 Copyright (C) 2015 Free Software Foundation, Inc.

 and still see:

 15/07/17 13:20:53 WARN BLAS: Failed to load implementation from: 
 com.github.fommil.netlib.NativeSystemBLAS
 15/07/17 13:20:53 WARN BLAS: Failed to load implementation from: 
 com.github.fommil.netlib.NativeRefBLAS
 15/07/17 13:20:53 WARN LAPACK: Failed to load implementation from: 
 com.github.fommil.netlib.NativeSystemLAPACK
 15/07/17 13:20:53 WARN LAPACK: Failed to load implementation from: 
 com.github.fommil.netlib.NativeRefLAPACK

 ​

 Does anything need to be adjusted in my application POM?

 Thanks,
 Arun

 On Thu, Jul 16, 2015 at 5:26 PM, Sean Owen so...@cloudera.com wrote:

 Yes, that's most of the work, just getting the native libs into the
 assembly. netlib can find them from there even if you don't have BLAS
 libs on your OS, since it includes a reference implementation as a
 fallback.

 One common reason it won't load is not having libgfortran installed on
 your OSes though. It has to be 4.6+ too. That can't be shipped even in
 netlib and has to exist on your hosts.

 The other thing I'd double-check is whether you are really using this
  assembly you built for your job -- like, it's actually the
 assembly the executors are using.


 On Tue, Jul 7, 2015 at 8:47 PM, Arun Ahuja aahuj...@gmail.com wrote:
  Is there more documentation on what is needed to set up BLAS/LAPACK
  native
  support with Spark?
 
  I’ve built spark with the -Pnetlib-lgpl flag and see that the netlib
 classes
  are in the assembly jar.
 
  jar tvf spark-assembly-1.5.0-SNAPSHOT-hadoop2.6.0.jar  | grep netlib
 | grep
  Native
6625 Tue Jul 07 15:22:08 EDT 2015
  com/github/fommil/netlib/NativeRefARPACK.class
   21123 Tue Jul 07 15:22:08 EDT 2015
  com/github/fommil/netlib/NativeRefBLAS.class
  178334 Tue Jul 07 15:22:08 EDT 2015
  com/github/fommil/netlib/NativeRefLAPACK.class
6640 Tue Jul 07 15:22:10 EDT 2015
  com/github/fommil/netlib/NativeSystemARPACK.class
   21138 Tue Jul 07 15:22:10 EDT 2015
  com/github/fommil/netlib/NativeSystemBLAS.class
  178349 Tue Jul 07 15:22:10 EDT 2015
  com/github/fommil/netlib/NativeSystemLAPACK.class
 
  Also I see the following in /usr/lib64
 
  ls /usr/lib64/libblas.
  libblas.a libblas.solibblas.so.3  libblas.so.3.2
  libblas.so.3.2.1
 
  ls /usr/lib64/liblapack
  liblapack.a liblapack_pic.a liblapack.so
 liblapack.so.3
  liblapack.so.3.2liblapack.so.3.2.1
 
  But I stil see the following in the Spark logs:
 
  15/07/07 15:36:25 WARN BLAS: Failed to load implementation from:
  com.github.fommil.netlib.NativeSystemBLAS
  15/07/07 15:36:25 WARN BLAS: Failed to load implementation from:
  com.github.fommil.netlib.NativeRefBLAS
  15/07/07 15:36:26 WARN LAPACK: Failed to load implementation 

Re: Unable to use dynamicAllocation if spark.executor.instances is set in spark-defaults.conf

2015-07-15 Thread Sandy Ryza
Hi Jonathan,

This is a problem that has come up for us as well, because we'd like
dynamic allocation to be turned on by default in some setups, but not break
existing users with these properties.  I'm hoping to figure out a way to
reconcile these by Spark 1.5.

-Sandy

On Wed, Jul 15, 2015 at 3:18 PM, Kelly, Jonathan jonat...@amazon.com
wrote:

   Would there be any problem in having spark.executor.instances (or
 --num-executors) be completely ignored (i.e., even for non-zero values) if
 spark.dynamicAllocation.enabled is true (i.e., rather than throwing an
 exception)?

  I can see how the exception would be helpful if, say, you tried to pass
 both -c spark.executor.instances (or --num-executors) *and* -c
 spark.dynamicAllocation.enabled=true to spark-submit on the command line
 (as opposed to having one of them in spark-defaults.conf and one of them in
 the spark-submit args), but currently there doesn't seem to be any way to
 distinguish between arguments that were actually passed to spark-submit and
 settings that simply came from spark-defaults.conf.

  If there were a way to distinguish them, I think the ideal situation
 would be for the validation exception to be thrown only if
 spark.executor.instances and spark.dynamicAllocation.enabled=true were both
 passed via spark-submit args or were both present in spark-defaults.conf,
 but passing spark.dynamicAllocation.enabled=true to spark-submit would take
 precedence over spark.executor.instances configured in spark-defaults.conf,
 and vice versa.


  Jonathan Kelly

 Elastic MapReduce - SDE

 Blackfoot (SEA33) 06.850.F0

   From: Jonathan Kelly jonat...@amazon.com
 Date: Tuesday, July 14, 2015 at 4:23 PM
To: user@spark.apache.org
 Subject: Unable to use dynamicAllocation if spark.executor.instances is
 set in spark-defaults.conf

   I've set up my cluster with a pre-calculated value for
 spark.executor.instances in spark-defaults.conf such that I can run a job
 and have it maximize the utilization of the cluster resources by default.
 However, if I want to run a job with dynamicAllocation (by passing -c
 spark.dynamicAllocation.enabled=true to spark-submit), I get this exception:

  Exception in thread main java.lang.IllegalArgumentException:
 Explicitly setting the number of executors is not compatible with
 spark.dynamicAllocation.enabled!
 at
 org.apache.spark.deploy.yarn.ClientArguments.parseArgs(ClientArguments.scala:192)
 at
 org.apache.spark.deploy.yarn.ClientArguments.init(ClientArguments.scala:59)
 at
 org.apache.spark.scheduler.cluster.YarnClientSchedulerBackend.start(YarnClientSchedulerBackend.scala:54)
  …

  The exception makes sense, of course, but ideally I would like it to
 ignore what I've put in spark-defaults.conf for spark.executor.instances if
 I've enabled dynamicAllocation. The most annoying thing about this is that
 if I have spark.executor.instances present in spark-defaults.conf, I cannot
 figure out any way to spark-submit a job with
 spark.dynamicAllocation.enabled=true without getting this error. That is,
 even if I pass -c spark.executor.instances=0 -c
 spark.dynamicAllocation.enabled=true, I still get this error because the
 validation in ClientArguments.parseArgs() that's checking for this
 condition simply checks for the presence of spark.executor.instances rather
 than whether or not its value is greater than 0.

  Should the check be changed to allow spark.executor.instances to be set
 to 0 if spark.dynamicAllocation.enabled is true? That would be an OK
 compromise, but I'd really prefer to be able to enable dynamicAllocation
 simply by setting spark.dynamicAllocation.enabled=true rather than by also
 having to set spark.executor.instances to 0.


  Thanks,

 Jonathan



Re: How to restrict disk space for spark caches on yarn?

2015-07-13 Thread Sandy Ryza
To clear one thing up: the space taken up by data that Spark caches on disk
is not related to YARN's local resource / application cache concept.
The latter is a way that YARN provides for distributing bits to worker
nodes.  The former is just usage of disk by Spark, which happens to be in a
local directory that YARN gives it.  Based on its title, if YARN-882 were
resolved, it would do nothing to limit the amount of on-disk cache space
Spark could use.

-Sandy

On Mon, Jul 13, 2015 at 6:57 AM, Peter Rudenko petro.rude...@gmail.com
wrote:

  Hi Andrew, here's what i found. Maybe would be relevant for people with
 the same issue:

 1) There's 3 types of local resources in YARN (public, private,
 application). More about it here:
 http://hortonworks.com/blog/management-of-application-dependencies-in-yarn/

 2) Spark cache is of application type of resource.

 3) Currently it's not possible to specify quota for application resources (
 https://issues.apache.org/jira/browse/YARN-882)

 4) The only it's possible to specify these 2 settings:
 yarn.nodemanager.disk-health-checker.max-disk-utilization-per-disk-percentage
 - The maximum percentage of disk space utilization allowed after which a
 disk is marked as bad. Values can range from 0.0 to 100.0. If the value is
 greater than or equal to 100, the nodemanager will check for full disk.
 This applies to yarn-nodemanager.local-dirs and yarn.nodemanager.log-dirs.

 yarn.nodemanager.disk-health-checker.min-free-space-per-disk-mb - The
 minimum space that must be available on a disk for it to be used. This
 applies to yarn-nodemanager.local-dirs and yarn.nodemanager.log-dirs.

 5) Yarn's cache cleanup doesn't clean app resources:
 https://github.com/apache/hadoop/blob/8d58512d6e6d9fe93784a9de2af0056bcc316d96/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java#L511

 As I understand it, application resources are cleaned when a spark application
 terminates correctly (using sc.stop()). But in my case, when it filled all
 disk space it got stuck and couldn't stop correctly. After I restarted
 yarn, I don't know how to easily trigger cache cleanup except manually on
 all the nodes.

 Thanks,
 Peter Rudenko

 On 2015-07-10 20:07, Andrew Or wrote:

 Hi Peter,

  AFAIK Spark assumes infinite disk space, so there isn't really a way to
 limit how much space it uses. Unfortunately I'm not aware of a simpler
 workaround than to simply provision your cluster with more disk space. By
 the way, are you sure that it's disk space that exceeded the limit, but not
 the number of inodes? If it's the latter, maybe you could control the
 ulimit of the container.

  To answer your other question: if it can't persist to disk then yes it
 will fail. It will only recompute from the data source if for some reason
 someone evicted our blocks from memory, but that shouldn't happen in your
 case since you're using MEMORY_AND_DISK_SER.

  -Andrew


 2015-07-10 3:51 GMT-07:00 Peter Rudenko petro.rude...@gmail.com:

  Hi, I have a spark ML workflow. It uses some persist calls. When I
 launch it with a 1 TB dataset it takes down the whole cluster, because it fills
 all disk space at /yarn/nm/usercache/root/appcache:
 http://i.imgur.com/qvRUrOp.png

 I found a YARN setting:
 yarn.nodemanager.localizer.cache.target-size-mb - Target size of the
 localizer cache in MB, per NodeManager. It is a target retention size that
 only includes resources with PUBLIC and PRIVATE visibility and excludes
 resources with APPLICATION visibility.

 But it excludes resources with APPLICATION visibility, and the Spark cache, as
 I understand it, is of APPLICATION type.

 Is it possible to restrict disk space for a Spark application? Will Spark
 fail if it isn't able to persist to disk
 (StorageLevel.MEMORY_AND_DISK_SER), or will it recompute from the data source?

 Thanks,
 Peter Rudenko









Re: Pyspark not working on yarn-cluster mode

2015-07-10 Thread Sandy Ryza
To add to this, conceptually, it makes no sense to launch something in
yarn-cluster mode by creating a SparkContext on the client - the whole
point of yarn-cluster mode is that the SparkContext runs on the cluster,
not on the client.

On Thu, Jul 9, 2015 at 2:35 PM, Marcelo Vanzin van...@cloudera.com wrote:

 You cannot run Spark in cluster mode by instantiating a SparkContext like
 that.

 You have to launch it with the spark-submit command line script.

 On Thu, Jul 9, 2015 at 2:23 PM, jegordon jgordo...@gmail.com wrote:

 Hi to all,

 Is there any way to run pyspark scripts with yarn-cluster mode without
 using
 the spark-submit script? I need it in this way because i will integrate
 this
 code into a django web app.

 When I try to run any script in yarn-cluster mode I get the following
 error:

 org.apache.spark.SparkException: Detected yarn-cluster mode, but isn't
 running on a cluster. Deployment to YARN is not supported directly by
 SparkContext. Please use spark-submit.


 I'm creating the sparkContext in the following way :

 conf = (SparkConf()
         .setMaster("yarn-cluster")
         .setAppName("DataFrameTest"))

 sc = SparkContext(conf = conf)

 #Dataframe code 

 Thanks



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Pyspark-not-working-on-yarn-cluster-mode-tp23755.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




 --
 Marcelo



Re: Remote spark-submit not working with YARN

2015-07-08 Thread Sandy Ryza
Strange.  Does the application show up at all in the YARN web UI?
Does application_1436314873375_0030
show up at all in the YARN ResourceManager logs?

-Sandy

On Wed, Jul 8, 2015 at 3:32 PM, Juan Gordon jgordo...@gmail.com wrote:

 Hello Sandy,

 Yes, I'm sure that YARN has enough resources; I checked it in the web
 UI page of my cluster.

 Also, I'm able to submit the same script from any of the nodes of the
 cluster.

 That's why I don't understand what's happening.

 Thanks

 JG

 On Wed, Jul 8, 2015 at 5:26 PM, Sandy Ryza sandy.r...@cloudera.com
 wrote:

 Hi JG,

 One way this can occur is that YARN doesn't have enough resources to run
 your job.  Have you verified that it does?  Are you able to submit using
 the same command from a node on the cluster?

 -Sandy

 On Wed, Jul 8, 2015 at 3:19 PM, jegordon jgordo...@gmail.com wrote:

 I'm trying to submit a spark job from a different server outside of my
 Spark
 Cluster (running spark 1.4.0, hadoop 2.4.0 and YARN) using the
 spark-submit
 script :

 spark/bin/spark-submit --master yarn-client --executor-memory 4G
 myjobScript.py

 The thing is that my application never moves past the ACCEPTED state; it
 stays stuck there:

 15/07/08 16:49:40 INFO Client: Application report for
 application_1436314873375_0030 (state: ACCEPTED)
 15/07/08 16:49:41 INFO Client: Application report for
 application_1436314873375_0030 (state: ACCEPTED)
 15/07/08 16:49:42 INFO Client: Application report for
 application_1436314873375_0030 (state: ACCEPTED)
 15/07/08 16:49:43 INFO Client: Application report for
 application_1436314873375_0030 (state: ACCEPTED)
 15/07/08 16:49:44 INFO Client: Application report for
 application_1436314873375_0030 (state: ACCEPTED)
 15/07/08 16:49:45 INFO Client: Application report for
 application_1436314873375_0030 (state: ACCEPTED)
 15/07/08 16:49:46 INFO Client: Application report for
 application_1436314873375_0030 (state: ACCEPTED)
 15/07/08 16:49:47 INFO Client: Application report for
 application_1436314873375_0030 (state: ACCEPTED)
 15/07/08 16:49:48 INFO Client: Application report for
 application_1436314873375_0030 (state: ACCEPTED)
 15/07/08 16:49:49 INFO Client: Application report for
 application_1436314873375_0030 (state: ACCEPTED)

 But if I execute the same script with spark-submit on the master server of
 my cluster, it runs correctly.

 I already set the YARN configuration on the remote server in
 $YARN_CONF_DIR/yarn-site.xml like this:

  <property>
    <name>yarn.resourcemanager.hostname</name>
    <value>54.54.54.54</value>
  </property>

  <property>
    <name>yarn.resourcemanager.address</name>
    <value>54.54.54.54:8032</value>
    <description>Enter your ResourceManager hostname.</description>
  </property>

  <property>
    <name>yarn.resourcemanager.scheduler.address</name>
    <value>54.54.54.54:8030</value>
    <description>Enter your ResourceManager hostname.</description>
  </property>

  <property>
    <name>yarn.resourcemanager.resourcetracker.address</name>
    <value>54.54.54.54:8031</value>
    <description>Enter your ResourceManager hostname.</description>
  </property>
 Where 54.54.54.54 is the IP of my resourcemanager node.

 Why is this happening? Do I have to configure something else in YARN to
 accept remote submits? Or what am I missing?

 Thanks a lot

 JG




 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Remote-spark-submit-not-working-with-YARN-tp23728.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org





 --
 Regards,
 Juan Gordon



Re: Executors requested are way less than what i actually got

2015-06-26 Thread Sandy Ryza
The scheduler configurations are helpful as well, but not useful without
the information outlined above.

-Sandy

On Fri, Jun 26, 2015 at 10:34 AM, ÐΞ€ρ@Ҝ (๏̯͡๏) deepuj...@gmail.com wrote:

 These are my YARN queue configurations

 Queue State: RUNNING
 Used Capacity: 206.7%
 Absolute Used Capacity: 3.1%
 Absolute Capacity: 1.5%
 Absolute Max Capacity: 10.0%
 Used Resources: memory:5578496, vCores:390
 Num Schedulable Applications: 7
 Num Non-Schedulable Applications: 0
 Num Containers: 390
 Max Applications: 45
 Max Applications Per User: 27
 Max Schedulable Applications: 1278
 Max Schedulable Applications Per User: 116
 Configured Capacity: 1.5%
 Configured Max Capacity: 10.0%
 Configured Minimum User Limit Percent: 30%
 Configured User Limit Factor: 2.0
 Executors:
 ./bin/spark-submit -v --master yarn-cluster --driver-class-path
 /apache/hadoop/share/hadoop/common/hadoop-common-2.4.1-EBAY-2.jar:/apache/hadoop/lib/hadoop-lzo-0.6.0.jar:/apache/hadoop-2.4.1-2.1.3.0-2-EBAY/share/hadoop/yarn/lib/guava-11.0.2.jar:/apache/hadoop-2.4.1-2.1.3.0-2-EBAY/share/hadoop/hdfs/hadoop-hdfs-2.4.1-EBAY-2.jar
 --jars
 /apache/hadoop-2.4.1-2.1.3.0-2-EBAY/share/hadoop/hdfs/hadoop-hdfs-2.4.1-EBAY-2.jar,/home/dvasthimal/spark1.3/1.3.1.lib/spark_reporting_dep_only-1.0-SNAPSHOT-jar-with-dependencies.jar
 * --num-executors 9973 --driver-memory 14g --driver-java-options
 -XX:MaxPermSize=512M -Xmx4096M -Xms4096M -verbose:gc -XX:+PrintGCDetails
 -XX:+PrintGCTimeStamps --executor-memory 14g --executor-cores 1 *--queue
 hdmi-others --class com.ebay.ep.poc.spark.reporting.SparkApp
 /home/dvasthimal/spark1.3/1.3.1.lib/spark_reporting-1.0-SNAPSHOT.jar
 startDate=2015-06-20 endDate=2015-06-21
 input=/apps/hdmi-prod/b_um/epdatasets/exptsession subcommand=viewItem
 output=/user/dvasthimal/epdatasets/viewItem buffersize=128
 maxbuffersize=1068 maxResultSize=200G




 On Thu, Jun 25, 2015 at 4:52 PM, Sandy Ryza sandy.r...@cloudera.com
 wrote:

 How many nodes do you have, how much space is allocated to each node for
 YARN, how big are the executors you're requesting, and what else is running
 on the cluster?

 On Thu, Jun 25, 2015 at 3:57 PM, ÐΞ€ρ@Ҝ (๏̯͡๏) deepuj...@gmail.com
 wrote:

 I run Spark App on Spark 1.3.1 over YARN.

 When i request --num-executors 9973 and when i see Executors from
 Environment tab from SPARK UI its between 200 to 300.

 What is incorrect here ?

 --
 Deepak





 --
 Deepak




Re: Executors requested are way less than what i actually got

2015-06-25 Thread Sandy Ryza
How many nodes do you have, how much space is allocated to each node for
YARN, how big are the executors you're requesting, and what else is running
on the cluster?

On Thu, Jun 25, 2015 at 3:57 PM, ÐΞ€ρ@Ҝ (๏̯͡๏) deepuj...@gmail.com wrote:

 I run Spark App on Spark 1.3.1 over YARN.

 When I request --num-executors 9973 and look at the executors from the
 Environment tab of the Spark UI, it's between 200 and 300.

 What is incorrect here ?

 --
 Deepak




Re: Spark launching without all of the requested YARN resources

2015-06-24 Thread Sandy Ryza
Hi Arun,

You can achieve this by
setting spark.scheduler.maxRegisteredResourcesWaitingTime to some really
high number and spark.scheduler.minRegisteredResourcesRatio to 1.0.
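
A minimal illustration of those two settings (the values are placeholders; they
can equally be passed with --conf on spark-submit, and depending on the Spark
version the wait time is milliseconds or a duration string):

  val conf = new org.apache.spark.SparkConf()
    .set("spark.scheduler.minRegisteredResourcesRatio", "1.0")
    .set("spark.scheduler.maxRegisteredResourcesWaitingTime", "3600s")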

-Sandy

On Wed, Jun 24, 2015 at 2:21 AM, Steve Loughran ste...@hortonworks.com
wrote:


  On 24 Jun 2015, at 05:55, canan chen ccn...@gmail.com wrote:

  Why do you want it to wait to start until all the resources are ready? Making it
 start as early as possible should make it complete earlier and increase the
 utilization of resources.

 On Tue, Jun 23, 2015 at 10:34 PM, Arun Luthra arun.lut...@gmail.com
 wrote:

 Sometimes if my Hortonworks yarn-enabled cluster is fairly busy, Spark
 (via spark-submit) will begin its processing even though it apparently did
 not get all of the requested resources; it is running very slowly.

  Is there a way to force Spark/YARN to only begin when it has the full
 set of resources that I request?

  Thanks,
 Arun




  The wait until there's space launch policy is known as Gang
 Scheduling, https://issues.apache.org/jira/browse/YARN-624 covers what
 would be needed there.

  1. It's not in YARN

  2. For analytics workloads, it's not clear you benefit. You would wait a
 very long time(*) for the requirements to be satisfied. The current YARN
 scheduling and placement algorithms assume that you'd prefer timely
 container launch to extended wait for containers in the right place, and
 expects algorithms to work in a degraded form with a reduced no. of workers

  3. Where it really matters is long-lived applications where you need
 some quorum of container-hosted processes, or if performance collapses
 utterly below a threshold. Things like HBase on YARN are an example —but
 Spark streaming could be another.

  In the absence of YARN support, it can be implemented in the application
 by having the YARN-hosted application (here: Spark) get the containers,
 start up a process on each one, but not actually start accepting/performing
 work until a threshold of containers is reached/some timeout has occurred.

  If you wanted to do that in spark, you could raise the idea on the spark
 dev lists and see what people think.

  -Steve

  (*) i.e. forever



Re: When to use underlying data management layer versus standalone Spark?

2015-06-24 Thread Sandy Ryza
Hi Michael,

Spark itself is an execution engine, not a storage system.  While it has
facilities for caching data in memory, think about these the way you would
think about a process on a single machine leveraging memory - the source
data needs to be stored somewhere, and you need to be able to access it
quickly in case there's a failure.

To echo what Sonal said, it depends on the needs of your application.  If
you expect to mostly write jobs that read and write data in batch, storing
data on HDFS in a binary format like Avro or Parquet will give you the best
performance.  If other systems need random access to your data, you'd want
to consider a system like HBase or Cassandra, though these are likely to
suffer a little bit on performance and incur higher operational overhead.
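
For illustration, a minimal spark-shell-style sketch of the batch pattern with
Parquet on HDFS (Spark 1.4 DataFrame API; the paths and column name are
placeholders):

  val events = sqlContext.read.parquet("hdfs:///warehouse/events")
  events.filter(events("eventType") === "purchase")
    .write.parquet("hdfs:///warehouse/purchases")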

-Sandy

On Tue, Jun 23, 2015 at 11:21 PM, Sonal Goyal sonalgoy...@gmail.com wrote:

 When you deploy spark over hadoop, you typically want to leverage the
 replication of hdfs or your data is already in hadoop. Again, if your data
 is already in Cassandra or if you want to do updateable atomic row
 operations and access to your data as well as run analytic jobs, that may
 be another case.
 On Jun 24, 2015 1:17 AM, commtech michael.leon...@opco.com wrote:

 Hi,

 I work at a large financial institution in New York. We're looking into
 Spark and trying to learn more about the deployment/use cases for
 real-time
 analytics with Spark. When would it be better to deploy standalone Spark
 versus Spark on top of a more comprehensive data management layer (Hadoop,
 Cassandra, MongoDB, etc.)? If you do deploy on top of one of these, are
 there different use cases where one of these database management layers
 are
 better or worse?

 Any color would be very helpful. Thank you in advance.

 Sincerely,
 Michael





 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/When-to-use-underlying-data-management-layer-versus-standalone-Spark-tp23455.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




Re: Velox Model Server

2015-06-20 Thread Sandy Ryza
Hi Debasish,

The Oryx project (https://github.com/cloudera/oryx), which is Apache 2
licensed, contains a model server that can serve models built with MLlib.

-Sandy

On Sat, Jun 20, 2015 at 8:00 AM, Charles Earl charles.ce...@gmail.com
wrote:

 Is velox NOT open source?


 On Saturday, June 20, 2015, Debasish Das debasish.da...@gmail.com wrote:

 Hi,

 The demo of end-to-end ML pipeline including the model server component
 at Spark Summit was really cool.

 I was wondering if the Model Server component is based upon Velox or it
 uses a completely different architecture.

 https://github.com/amplab/velox-modelserver

 We are looking for an open source version of model server to build upon.

 Thanks.
 Deb



 --
 - Charles



Re: Velox Model Server

2015-06-20 Thread Sandy Ryza
Oops, that link was for Oryx 1. Here's the repo for Oryx 2:
https://github.com/OryxProject/oryx

On Sat, Jun 20, 2015 at 10:20 AM, Sandy Ryza sandy.r...@cloudera.com
wrote:

 Hi Debasish,

 The Oryx project (https://github.com/cloudera/oryx), which is Apache 2
 licensed, contains a model server that can serve models built with MLlib.

 -Sandy

 On Sat, Jun 20, 2015 at 8:00 AM, Charles Earl charles.ce...@gmail.com
 wrote:

 Is velox NOT open source?


 On Saturday, June 20, 2015, Debasish Das debasish.da...@gmail.com
 wrote:

 Hi,

 The demo of end-to-end ML pipeline including the model server component
 at Spark Summit was really cool.

 I was wondering if the Model Server component is based upon Velox or it
 uses a completely different architecture.

 https://github.com/amplab/velox-modelserver

 We are looking for an open source version of model server to build upon.

 Thanks.
 Deb



 --
 - Charles





Re: deployment options for Spark and YARN w/ many app jar library dependencies

2015-06-17 Thread Sandy Ryza
Hi Matt,

If you place your jars on HDFS in a public location, YARN will cache them
on each node after the first download.  You can also use the
spark.executor.extraClassPath config to point to them.
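
As an illustration only (paths are placeholders): either keep the jars at a
world-readable HDFS location and pass them with --jars so YARN localizes them
once per node, or pre-deploy them to the same local path on every node and put
that path on the executor classpath, e.g.:

  // option (a): spark-submit ... --jars hdfs:///libs/dep1.jar,hdfs:///libs/dep2.jar
  // option (b): jars already present on every node's local disk
  val conf = new org.apache.spark.SparkConf()
    .set("spark.executor.extraClassPath",
         "/opt/myapp/lib/dep1.jar:/opt/myapp/lib/dep2.jar")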

-Sandy

On Wed, Jun 17, 2015 at 4:47 PM, Sweeney, Matt mswee...@fourv.com wrote:

  Hi folks,

  I’m looking to deploy spark on YARN and I have read through the docs (
 https://spark.apache.org/docs/latest/running-on-yarn.html). One question
 that I still have is if there is an alternate means of including your own
 app jars as opposed to the process in the “Adding Other Jars” section of
 the docs. The app jars and dependencies that I need to include are
 significant in size (100s MBs) and I’d rather deploy them in advance onto
 the cluster nodes disk so that I don’t have that overhead cost on the
 network for each spark-submit that is executed.

  Thanks in advance for your help!

  Matt



Re: [SparkScore] Performance portal for Apache Spark

2015-06-17 Thread Sandy Ryza
This looks really awesome.

On Tue, Jun 16, 2015 at 10:27 AM, Huang, Jie jie.hu...@intel.com wrote:

  Hi All

 We are happy to announce Performance portal for Apache Spark
 http://01org.github.io/sparkscore/ !

 The Performance Portal for Apache Spark provides performance data on
 Spark upstream to the community to help identify issues, better understand
 performance differentials between versions, and help Spark customers get
 across the finish line faster. The Performance Portal generates two
 reports: a regular (weekly) report and a release-based regression test report.
 We are currently using two benchmark suites which include HiBench (
 http://github.com/intel-bigdata/HiBench) and Spark-perf (
 https://github.com/databricks/spark-perf ). We welcome and look forward
 to your suggestions and feedback. More information and details are provided
 below.
 About the Performance Portal for Apache Spark

 Our goal is to work with the Apache Spark community to further enhance the
 scalability and reliability of Apache Spark. The data available on this
 site allows community members and potential Spark customers to closely
 track the performance trend of Apache Spark. Ultimately, we hope that this
 project will help the community fix performance issues quickly, thus
 providing better Apache Spark code to end customers. The current workloads
 used in the benchmarking include HiBench (a benchmark suite from Intel to
 evaluate big data frameworks like Hadoop MR and Spark) and Spark-perf (a
 performance testing framework for Apache Spark from Databricks). Additional
 benchmarks will be added as they become available.
 Description
 --

 Each data point represents a workload's runtime as a percentage compared with
 the previous week. Different lines represent different workloads running in
 Spark yarn-client mode.
 Hardware
 --

 CPU type: Intel® Xeon® CPU E5-2697 v2 @ 2.70GHz
 Memory: 128GB
 NIC: 10GbE
 Disk(s): 8 x 1TB SATA HDD
 Software
 --

 Java version: 1.8.0_25
 Hadoop version: 2.5.0-CDH5.3.2
 HiBench version: 4.0
 Spark on yarn-client mode
 Cluster
 --

 1 node for Master
 10 nodes for Slave
 Summary

 The lower the percentage, the better the performance.

 Group         ww19     ww20     ww22     ww23     ww24     ww25
 HiBench       9.1%     6.6%     6.0%     7.9%     -6.5%    -3.1%
 spark-perf    4.1%     4.4%     -1.8%    4.1%     -4.7%    -4.6%


 *Y-Axis: normalized completion time; X-Axis: Work Week. *

 * The commit number can be found in the result table. The performance
 score for each workload is normalized based on the elapsed time for the 1.2
 release. The lower the better.*
 HiBench
 --

 JOB           ww19      ww20      ww22      ww23      ww24      ww25
 commit        489700c8  8e3822a0  530efe3e  90c60692  db81b9d8  4eb48ed1
 sleep         %         %         -2.1%     -2.9%     -4.1%     12.8%
 wordcount     17.6%     11.4%     8.0%      8.3%      -18.6%    -10.9%
 kmeans        92.1%     61.5%     72.1%     92.9%     86.9%     95.8%
 scan          -4.9%     -7.2%     %         -1.1%     -25.5%    -21.0%
 bayes         -24.3%    -20.1%    -18.3%    -11.1%    -29.7%    -31.3%
 aggregation   5.6%      10.5%     %         9.2%      -15.3%    -15.0%
 join          4.5%      1.2%      %         1.0%      -12.7%    -13.9%
 sort          -3.3%     -0.5%     -11.9%    -12.5%    -17.5%    -17.3%
 pagerank      2.2%      3.2%      4.0%      2.9%      -11.4%    -13.0%
 terasort      -7.1%     -0.2%     -9.5%     -7.3%     -16.7%    -17.0%

 Comments: null means no such workload running or workload failed in this
 time.


 *Y-Axis: normalized completion time; X-Axis: Work Week. *

 * The commit number can be found in the result table. The performance
 score for each workload is normalized based on the elapsed time for the 1.2
 release. The lower the better.*
 spark-perf
 --

 JOB           ww19      ww20      ww22      ww23      ww24      ww25
 commit        489700c8  8e3822a0  530efe3e  90c60692  db81b9d8  4eb48ed1
 agg           13.2%     7.0%      %         18.3%     5.2%      2.5%
 agg-int       16.4%     21.2%     %         9.6%      4.0%      8.2%
 agg-naive     4.3%      -2.4%     %         -0.8%     -6.7%     -6.8%
 scheduling    -6.1%     -8.9%     -14.5%    -2.1%     -6.4%     -6.5%
 count-filter  4.1%      1.0%      6.6%      6.8%      -10.2%    -10.4%
 count         4.8%      4.6%      6.7%      8.0%      -7.3%     -7.0%
 sort          -8.1%     -2.5%     -6.2%     -7.0%     -14.6%    -14.4%
 sort-int      4.5%      15.3%     -1.6%     -0.1%     -1.5%     -2.2%

 Comments: null means no such workload running or workload failed in this
 time.


 *Y-Axis: normalized completion time; X-Axis: Work Week. *

 * The commit number can be found in the result table. The performance
 score for each workload is normalized based on the elapsed time for the 1.2
 release. The lower the better.*
  Release
 Summary

 The lower the percentage, the better the performance.

 Group         1.2.1    1.3.0    1.3.1    1.4.0
 HiBench       -1.0%    10.5%    8.4%     8.6%
 spark-perf    3.2%

Re: Dynamic allocator requests -1 executors

2015-06-13 Thread Sandy Ryza
Hi Patrick,

I'm noticing that you're using Spark 1.3.1.  We fixed a bug in dynamic
allocation in 1.4 that permitted requesting negative numbers of executors.
Any chance you'd be able to try with the newer version and see if the
problem persists?

-Sandy

On Fri, Jun 12, 2015 at 7:42 PM, Patrick Woody patrick.woo...@gmail.com
wrote:

 Hey all,

 I've recently run into an issue where spark dynamicAllocation has asked
 for -1 executors from YARN. Unfortunately, this raises an exception that
 kills the executor-allocation thread and the application can't request more
 resources.

 Has anyone seen this before? It is spurious and the application usually
 works, but when this gets hit the application becomes unusable, stuck at
 minimum YARN resources.

 Stacktrace below.

 Thanks!
 -Pat

 470 ERROR [2015-06-12 16:44:39,724] org.apache.spark.util.Utils: Uncaught
 exception in thread spark-dynamic-executor-allocation-0
 471 ! java.lang.IllegalArgumentException: Attempted to request a negative
 number of executor(s) -1 from the cluster manager. Please specify a
 positive number!
 472 ! at
 org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend.requestTotalExecutors(CoarseGrainedSchedulerBackend.scala:338)
 ~[spark-core_2.10-1.3.1.jar:1.
 473 ! at
 org.apache.spark.SparkContext.requestTotalExecutors(SparkContext.scala:1137)
 ~[spark-core_2.10-1.3.1.jar:1.3.1]
 474 ! at
 org.apache.spark.ExecutorAllocationManager.addExecutors(ExecutorAllocationManager.scala:294)
 ~[spark-core_2.10-1.3.1.jar:1.3.1]
 475 ! at
 org.apache.spark.ExecutorAllocationManager.addOrCancelExecutorRequests(ExecutorAllocationManager.scala:263)
 ~[spark-core_2.10-1.3.1.jar:1.3.1]
 476 ! at 
 org.apache.spark.ExecutorAllocationManager.org$apache$spark$ExecutorAllocationManager$$schedule(ExecutorAllocationManager.scala:230)
 ~[spark-core_2.10-1.3.1.j
 477 ! at
 org.apache.spark.ExecutorAllocationManager$$anon$1$$anonfun$run$1.apply$mcV$sp(ExecutorAllocationManager.scala:189)
 ~[spark-core_2.10-1.3.1.jar:1.3.1]
 478 ! at
 org.apache.spark.ExecutorAllocationManager$$anon$1$$anonfun$run$1.apply(ExecutorAllocationManager.scala:189)
 ~[spark-core_2.10-1.3.1.jar:1.3.1]
 479 ! at
 org.apache.spark.ExecutorAllocationManager$$anon$1$$anonfun$run$1.apply(ExecutorAllocationManager.scala:189)
 ~[spark-core_2.10-1.3.1.jar:1.3.1]
 480 ! at
 org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1618)
 ~[spark-core_2.10-1.3.1.jar:1.3.1]
 481 ! at
 org.apache.spark.ExecutorAllocationManager$$anon$1.run(ExecutorAllocationManager.scala:189)
 [spark-core_2.10-1.3.1.jar:1.3.1]
 482 ! at
 java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
 [na:1.7.0_71]
 483 ! at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:304)
 [na:1.7.0_71]
 484 ! at
 java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:178)
 [na:1.7.0_71]
 485 ! at
 java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
 [na:1.7.0_71]
 486 ! at
 java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
 [na:1.7.0_71]
 487 ! at
 java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
 [na:1.7.0_71]



Re: Determining number of executors within RDD

2015-06-10 Thread Sandy Ryza
On YARN, there is no concept of a Spark Worker.  Multiple executors will be
run per node without any effort required by the user, as long as all the
executors fit within each node's resource limits.

-Sandy

On Wed, Jun 10, 2015 at 3:24 PM, Evo Eftimov evo.efti...@isecc.com wrote:

  Yes, I think it is ONE worker, ONE executor, as an executor is nothing but a JVM
 instance spawned by the worker.

 To run more executors, i.e. JVM instances, on the same physical cluster node
 you need to run more than one worker on that node and then allocate only
 part of the system resources to each worker/executor.


 Sent from Samsung Mobile


  Original message 
 From: maxdml
 Date:2015/06/10 19:56 (GMT+00:00)
 To: user@spark.apache.org
 Subject: Re: Determining number of executors within RDD

 Actually this is somewhat confusing for two reasons:

 - First, the option 'spark.executor.instances', which seems to be only
 dealt
 with in the case of YARN in the source code of SparkSubmit.scala, is also
 present in the conf/spark-env.sh file under the standalone section, which
 would indicate that it is also available for this mode

 - Second, a post from Andrew Or states that this property defines the
 number of workers in the cluster, not the number of executors on a given
 worker.
 (
 http://apache-spark-user-list.1001560.n3.nabble.com/clarification-for-some-spark-on-yarn-configuration-options-td13692.html
 )

 Could anyone clarify this? :-)

 Thanks.



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Determining-number-of-executors-within-RDD-tp15554p23262.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




Re: How to run spark streaming application on YARN?

2015-06-04 Thread Sandy Ryza
That might work, but there might also be other steps that are required.

-Sandy

On Thu, Jun 4, 2015 at 11:13 AM, Saiph Kappa saiph.ka...@gmail.com wrote:

 Thanks! It is working fine now with spark-submit. Just out of curiosity,
 how would you use org.apache.spark.deploy.yarn.Client? Adding that
 spark_yarn jar to the configuration inside the application?

 On Thu, Jun 4, 2015 at 6:37 PM, Vova Shelgunov vvs...@gmail.com wrote:

 You should run it with spark-submit or using
 org.apache.spark.deploy.yarn.Client.

 2015-06-04 20:30 GMT+03:00 Saiph Kappa saiph.ka...@gmail.com:

 No, I am not. I run it with sbt «sbt run-main Branchmark». I thought
 it was the same thing since I am passing all the configurations through the
 application code. Is that the problem?

 On Thu, Jun 4, 2015 at 6:26 PM, Sandy Ryza sandy.r...@cloudera.com
 wrote:

 Hi Saiph,

 Are you launching using spark-submit?

 -Sandy

 On Thu, Jun 4, 2015 at 10:20 AM, Saiph Kappa saiph.ka...@gmail.com
 wrote:

 Hi,

 I've been running my spark streaming application in standalone mode
 without any worries. Now, I've been trying to run it on YARN (hadoop 
 2.7.0)
 but I am having some problems.

 Here are the config parameters of my application:
 «
 val sparkConf = new SparkConf()

 sparkConf.setMaster("yarn-client")
 sparkConf.set("spark.yarn.am.memory", "2g")
 sparkConf.set("spark.executor.instances", "2")

 sparkConf.setAppName("Benchmark")
 sparkConf.setJars(Array("target/scala-2.10/benchmark-app_2.10-0.1-SNAPSHOT.jar"))
 sparkConf.set("spark.executor.memory", "4g")
 sparkConf.set("spark.serializer",
   "org.apache.spark.serializer.KryoSerializer")
 sparkConf.set("spark.executor.extraJavaOptions",
   "-XX:+UseCompressedOops -XX:+UseConcMarkSweepGC " +
   " -XX:+AggressiveOpts -XX:FreqInlineSize=300 -XX:MaxInlineSize=300 ")
 if (sparkConf.getOption("spark.master") == None) {
   sparkConf.setMaster("local[*]")
 }
 »

 The jar I'm including there only contains the application classes.


 Here is the log of the application: http://pastebin.com/7RSktezA

 Here is the userlog on hadoop/YARN:
 «
 Exception in thread main java.lang.NoClassDefFoundError:
 org/apache/spark/Logging
 at java.lang.ClassLoader.defineClass1(Native Method)
 at java.lang.ClassLoader.defineClass(ClassLoader.java:800)
 at
 java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142)
 at java.net.URLClassLoader.defineClass(URLClassLoader.java:449)
 at java.net.URLClassLoader.access$100(URLClassLoader.java:71)
 at java.net.URLClassLoader$1.run(URLClassLoader.java:361)
 at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
 at java.security.AccessController.doPrivileged(Native Method)
 at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
 at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
 at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308)
 at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
 at
 org.apache.spark.deploy.yarn.ExecutorLauncher$.main(ApplicationMaster.scala:596)
 at
 org.apache.spark.deploy.yarn.ExecutorLauncher.main(ApplicationMaster.scala)
 Caused by: java.lang.ClassNotFoundException: org.apache.spark.Logging
 at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
 at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
 at java.security.AccessController.doPrivileged(Native Method)
 at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
 at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
 at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308)
 at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
 ... 14 more
 »

 I tried to add the spark core jar to ${HADOOP_HOME}/lib but the error
 persists. Am I doing something wrong?

 Thanks.








Re: data localisation in spark

2015-06-03 Thread Sandy Ryza
Tasks are scheduled on executors based on data locality.  Things work as
you would expect in the example you brought up.

Through dynamic allocation, the number of executors can change throughout
the life time of an application.  10 executors (or 5 executors with 2 cores
each) are not needed for a reduceByKey with parallelism = 10.  If there are
fewer slots to run tasks than tasks, the tasks will just be run serially.
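
As a small spark-shell-style illustration of that last point (the input path is
a placeholder):

  val counts = sc.textFile("hdfs:///tmp/words")
    .flatMap(_.split(" "))
    .map(w => (w, 1))
    .reduceByKey(_ + _, 10)  // 10 result partitions = 10 reduce tasks, not 10 executors;
                             // with fewer task slots the tasks simply run in waves
  counts.count()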

-Sandy

On Tue, Jun 2, 2015 at 11:24 AM, Shushant Arora shushantaror...@gmail.com
wrote:

  So in Spark, after acquiring executors from the cluster manager, are tasks
 scheduled on executors based on data locality? I mean, if in an
 application there are 2 jobs and the output of job 1 is used as the input of
 another job,
 and in job 1 I did persist on some RDD, then while running job 2 will it use
 the same executor where job 1's output was persisted, or will it acquire executors
 again and will data movement happen?

 And is it true that the number of executors in an application is fixed and acquired
 at the start of the application and remains the same throughout the application? If
 yes, how does it take care of an explicit number of reducers in some of the APIs, say
 rdd.reduceByKey(func, 10);

 When converting the DAG to stages, does it calculate the executors required and then
 acquire executors/worker nodes?


 On Tue, Jun 2, 2015 at 11:06 PM, Sandy Ryza sandy.r...@cloudera.com
 wrote:

 It is not possible with JavaSparkContext either.  The API mentioned below
 currently does not have any effect (we should document this).

 The primary difference between MR and Spark here is that MR runs each
 task in its own YARN container, while Spark runs multiple tasks within an
 executor, which needs to be requested before Spark knows what tasks it will
 run.  Although dynamic allocation improves that last part.

 -Sandy

 On Tue, Jun 2, 2015 at 9:55 AM, Shushant Arora shushantaror...@gmail.com
  wrote:

 Is it possible in JavaSparkContext ?

 JavaSparkContext jsc = new JavaSparkContext(conf);
 JavaRDD<String> lines = jsc.textFile(args[0]);

 If yes, is it the programmer's responsibility to first calculate split
 locations and then instantiate the Spark context with preferred locations?

 How is this achieved in MR2 with YARN, where the ApplicationMaster
 specifies split locations to the ResourceManager before acquiring the node
 managers?



 On Mon, Jun 1, 2015 at 7:24 AM, bit1...@163.com bit1...@163.com wrote:

 Take a look at the following SparkContext constructor variant that
 tries to honor the data locality in YARN mode.

   /**
 * :: DeveloperApi ::
 * Alternative constructor for setting preferred locations where Spark
 will create executors.
 *
 * @param preferredNodeLocationData used in YARN mode to select nodes to
 launch containers on.
 * Can be generated using
 [[org.apache.spark.scheduler.InputFormatInfo.computePreferredLocations]]
 * from a list of input files or InputFormats for the application.
 */
 @DeveloperApi
 def this(config: SparkConf, preferredNodeLocationData: Map[String,
 Set[SplitInfo]]) = {
 this(config)
 this.preferredNodeLocationData = preferredNodeLocationData
 }

 --
 bit1...@163.com


 *From:* Shushant Arora shushantaror...@gmail.com
 *Date:* 2015-05-31 22:54
 *To:* user user@spark.apache.org
 *Subject:* data localisation in spark

 I want to understand how  spark takes care of data localisation in
 cluster mode when run on YARN.

 1.Driver program asks ResourceManager for executors. Does it tell
 yarn's RM to check HDFS blocks of input data and then allocate executors to
 it.
 And executors remain fixed throughout application or driver program
 asks for new executors when it submits another job in same application ,
 since in spark new job is created for each action . If executors are fixed
 then for second job achieving data localisation is impossible?



 2.When executors are done with their processing, does they are marked
 as free in ResourceManager's resoruce queue and  executors directly tell
 this to Rm  instead of via driver's ?

 Thanks
 Shushant







Re: data localisation in spark

2015-06-02 Thread Sandy Ryza
It is not possible with JavaSparkContext either.  The API mentioned below
currently does not have any effect (we should document this).

The primary difference between MR and Spark here is that MR runs each task
in its own YARN container, while Spark runs multiple tasks within an
executor, which needs to be requested before Spark knows what tasks it will
run.  Although dynamic allocation improves that last part.

-Sandy

On Tue, Jun 2, 2015 at 9:55 AM, Shushant Arora shushantaror...@gmail.com
wrote:

 Is it possible in JavaSparkContext ?

 JavaSparkContext jsc = new JavaSparkContext(conf);
 JavaRDD<String> lines = jsc.textFile(args[0]);

 If yes, is it the programmer's responsibility to first calculate split
 locations and then instantiate the Spark context with preferred locations?

 How is this achieved in MR2 with YARN, where the ApplicationMaster
 specifies split locations to the ResourceManager before acquiring the node
 managers?



 On Mon, Jun 1, 2015 at 7:24 AM, bit1...@163.com bit1...@163.com wrote:

 Take a look at the following SparkContext constructor variant that tries
 to honor the data locality in YARN mode.

   /**
 * :: DeveloperApi ::
 * Alternative constructor for setting preferred locations where Spark
 will create executors.
 *
 * @param preferredNodeLocationData used in YARN mode to select nodes to
 launch containers on.
 * Can be generated using
 [[org.apache.spark.scheduler.InputFormatInfo.computePreferredLocations]]
 * from a list of input files or InputFormats for the application.
 */
 @DeveloperApi
 def this(config: SparkConf, preferredNodeLocationData: Map[String,
 Set[SplitInfo]]) = {
 this(config)
 this.preferredNodeLocationData = preferredNodeLocationData
 }

 --
 bit1...@163.com


 *From:* Shushant Arora shushantaror...@gmail.com
 *Date:* 2015-05-31 22:54
 *To:* user user@spark.apache.org
 *Subject:* data localisation in spark

 I want to understand how  spark takes care of data localisation in
 cluster mode when run on YARN.

 1.Driver program asks ResourceManager for executors. Does it tell yarn's
 RM to check HDFS blocks of input data and then allocate executors to it.
 And executors remain fixed throughout application or driver program asks
 for new executors when it submits another job in same application , since
 in spark new job is created for each action . If executors are fixed then
 for second job achieving data localisation is impossible?



 2.When executors are done with their processing, does they are marked as
 free in ResourceManager's resoruce queue and  executors directly tell this
 to Rm  instead of via driver's ?

 Thanks
 Shushant





Re: data localisation in spark

2015-05-31 Thread Sandy Ryza
Hi Shushant,

Spark currently makes no effort to request executors based on data locality
(although it does try to schedule tasks within executors based on data
locality).  We're working on adding this capability at SPARK-4352
https://issues.apache.org/jira/browse/SPARK-4352.

-Sandy

On Sun, May 31, 2015 at 7:24 AM, Shushant Arora shushantaror...@gmail.com
wrote:


 I want to understand how  spark takes care of data localisation in cluster
 mode when run on YARN.

 1.Driver program asks ResourceManager for executors. Does it tell yarn's
 RM to check HDFS blocks of input data and then allocate executors to it.
 And executors remain fixed throughout application or driver program asks
 for new executors when it submits another job in same application , since
 in spark new job is created for each action . If executors are fixed then
 for second job achieving data localisation is impossible?



 2.When executors are done with their processing, does they are marked as
 free in ResourceManager's resoruce queue and  executors directly tell this
 to Rm  instead of via driver's ?

 Thanks
 Shushant



Re: yarn-cluster spark-submit process not dying

2015-05-28 Thread Sandy Ryza
Hi Corey,

As of this PR https://github.com/apache/spark/pull/5297/files, this can be
controlled with spark.yarn.submit.waitAppCompletion.

-Sandy

On Thu, May 28, 2015 at 11:48 AM, Corey Nolet cjno...@gmail.com wrote:

 I am submitting jobs to my yarn cluster via the yarn-cluster mode and I'm
 noticing the jvm that fires up to allocate the resources, etc... is not
 going away after the application master and executors have been allocated.
 Instead, it just sits there printing 1 second status updates to the
 console. If I kill it, my job still runs (as expected).

 Is there an intended way to stop this from happening and just have the
 local JVM die when it's done allocating the resources and deploying the
 application master?



Re: number of executors

2015-05-18 Thread Sandy Ryza
*All

On Mon, May 18, 2015 at 9:07 AM, Sandy Ryza sandy.r...@cloudera.com wrote:

 Hi Xiaohe,

 The all Spark options must go before the jar or they won't take effect.

 -Sandy

 On Sun, May 17, 2015 at 8:59 AM, xiaohe lan zombiexco...@gmail.com
 wrote:

 Sorry, them both are assigned task actually.

 Aggregated Metrics by Executor
 Executor IDAddressTask TimeTotal TasksFailed TasksSucceeded TasksInput
 Size / RecordsShuffle Write Size / RecordsShuffle Spill (Memory)Shuffle
 Spill (Disk)1host1:61841.7 min505640.0 MB / 12318400382.3 MB / 121007701630.4
 MB295.4 MB2host2:620721.7 min505640.0 MB / 12014510386.0 MB / 109269121646.6
 MB304.8 MB

 On Sun, May 17, 2015 at 11:50 PM, xiaohe lan zombiexco...@gmail.com
 wrote:

 bash-4.1$ ps aux | grep SparkSubmit
 xilan 1704 13.2  1.2 5275520 380244 pts/0  Sl+  08:39   0:13
 /scratch/xilan/jdk1.8.0_45/bin/java -cp
 /scratch/xilan/spark/conf:/scratch/xilan/spark/lib/spark-assembly-1.3.1-hadoop2.4.0.jar:/scratch/xilan/spark/lib/datanucleus-core-3.2.10.jar:/scratch/xilan/spark/lib/datanucleus-api-jdo-3.2.6.jar:/scratch/xilan/spark/lib/datanucleus-rdbms-3.2.9.jar:/scratch/xilan/hadoop/etc/hadoop
 -Xms512m -Xmx512m org.apache.spark.deploy.SparkSubmit --master yarn
 target/scala-2.10/simple-project_2.10-1.0.jar --class scala.SimpleApp
 --num-executors 5 --executor-cores 4
 xilan 1949  0.0  0.0 103292   800 pts/1S+   08:40   0:00 grep
 --color SparkSubmit


 When look at the sparkui, I see the following:
 Aggregated Metrics by ExecutorExecutor IDAddressTask TimeTotal TasksFailed
 TasksSucceeded TasksShuffle Read Size / Records1host1:304836 s101127.1
 MB / 28089782host2:49970 ms00063.4 MB / 1810945

 So executor 2 is not even assigned a task ? Maybe I have some problems
 in my setting, but I don't know what could be the possible settings I set
 wrong or have not set.


 Thanks,
 Xiaohe

 On Sun, May 17, 2015 at 11:16 PM, Akhil Das ak...@sigmoidanalytics.com
 wrote:

 Did you try --executor-cores param? While you submit the job, do a ps
 aux | grep spark-submit and see the exact command parameters.

 Thanks
 Best Regards

 On Sat, May 16, 2015 at 12:31 PM, xiaohe lan zombiexco...@gmail.com
 wrote:

 Hi,

 I have a 5 nodes yarn cluster, I used spark-submit to submit a simple
 app.

  spark-submit --master yarn
 target/scala-2.10/simple-project_2.10-1.0.jar --class scala.SimpleApp
 --num-executors 5

 I have set the number of executor to 5, but from sparkui I could see
 only two executors and it ran very slow. What did I miss ?

 Thanks,
 Xiaohe








Re: number of executors

2015-05-18 Thread Sandy Ryza
Hi Xiaohe,

The all Spark options must go before the jar or they won't take effect.

-Sandy

On Sun, May 17, 2015 at 8:59 AM, xiaohe lan zombiexco...@gmail.com wrote:

 Sorry, them both are assigned task actually.

 Aggregated Metrics by Executor
 Executor IDAddressTask TimeTotal TasksFailed TasksSucceeded TasksInput
 Size / RecordsShuffle Write Size / RecordsShuffle Spill (Memory)Shuffle
 Spill (Disk)1host1:61841.7 min505640.0 MB / 12318400382.3 MB / 121007701630.4
 MB295.4 MB2host2:620721.7 min505640.0 MB / 12014510386.0 MB / 109269121646.6
 MB304.8 MB

 On Sun, May 17, 2015 at 11:50 PM, xiaohe lan zombiexco...@gmail.com
 wrote:

 bash-4.1$ ps aux | grep SparkSubmit
 xilan 1704 13.2  1.2 5275520 380244 pts/0  Sl+  08:39   0:13
 /scratch/xilan/jdk1.8.0_45/bin/java -cp
 /scratch/xilan/spark/conf:/scratch/xilan/spark/lib/spark-assembly-1.3.1-hadoop2.4.0.jar:/scratch/xilan/spark/lib/datanucleus-core-3.2.10.jar:/scratch/xilan/spark/lib/datanucleus-api-jdo-3.2.6.jar:/scratch/xilan/spark/lib/datanucleus-rdbms-3.2.9.jar:/scratch/xilan/hadoop/etc/hadoop
 -Xms512m -Xmx512m org.apache.spark.deploy.SparkSubmit --master yarn
 target/scala-2.10/simple-project_2.10-1.0.jar --class scala.SimpleApp
 --num-executors 5 --executor-cores 4
 xilan 1949  0.0  0.0 103292   800 pts/1S+   08:40   0:00 grep
 --color SparkSubmit


 When look at the sparkui, I see the following:
 Aggregated Metrics by ExecutorExecutor IDAddressTask TimeTotal TasksFailed
 TasksSucceeded TasksShuffle Read Size / Records1host1:304836 s101127.1
 MB / 28089782host2:49970 ms00063.4 MB / 1810945

 So executor 2 is not even assigned a task ? Maybe I have some problems in
 my setting, but I don't know what could be the possible settings I set
 wrong or have not set.


 Thanks,
 Xiaohe

 On Sun, May 17, 2015 at 11:16 PM, Akhil Das ak...@sigmoidanalytics.com
 wrote:

 Did you try --executor-cores param? While you submit the job, do a ps
 aux | grep spark-submit and see the exact command parameters.

 Thanks
 Best Regards

 On Sat, May 16, 2015 at 12:31 PM, xiaohe lan zombiexco...@gmail.com
 wrote:

 Hi,

 I have a 5 nodes yarn cluster, I used spark-submit to submit a simple
 app.

  spark-submit --master yarn
 target/scala-2.10/simple-project_2.10-1.0.jar --class scala.SimpleApp
 --num-executors 5

 I have set the number of executor to 5, but from sparkui I could see
 only two executors and it ran very slow. What did I miss ?

 Thanks,
 Xiaohe







Re: number of executors

2015-05-18 Thread Sandy Ryza
Awesome!

It's documented here:
https://spark.apache.org/docs/latest/submitting-applications.html

-Sandy

On Mon, May 18, 2015 at 8:03 PM, xiaohe lan zombiexco...@gmail.com wrote:

 Hi Sandy,

 Thanks for your information. Yes, spark-submit --master yarn
 --num-executors 5 --executor-cores 4
 target/scala-2.10/simple-project_2.10-1.0.jar --class scala.SimpleApp is
 working awesomely. Is there any documentations pointing to this ?

 Thanks,
 Xiaohe

 On Tue, May 19, 2015 at 12:07 AM, Sandy Ryza sandy.r...@cloudera.com
 wrote:

 Hi Xiaohe,

 The all Spark options must go before the jar or they won't take effect.

 -Sandy

 On Sun, May 17, 2015 at 8:59 AM, xiaohe lan zombiexco...@gmail.com
 wrote:

 Sorry, them both are assigned task actually.

 Aggregated Metrics by Executor
 Executor IDAddressTask TimeTotal TasksFailed TasksSucceeded TasksInput
 Size / RecordsShuffle Write Size / RecordsShuffle Spill (Memory)Shuffle
 Spill (Disk)1host1:61841.7 min505640.0 MB / 12318400382.3 MB / 
 121007701630.4
 MB295.4 MB2host2:620721.7 min505640.0 MB / 12014510386.0 MB / 109269121646.6
 MB304.8 MB

 On Sun, May 17, 2015 at 11:50 PM, xiaohe lan zombiexco...@gmail.com
 wrote:

 bash-4.1$ ps aux | grep SparkSubmit
 xilan 1704 13.2  1.2 5275520 380244 pts/0  Sl+  08:39   0:13
 /scratch/xilan/jdk1.8.0_45/bin/java -cp
 /scratch/xilan/spark/conf:/scratch/xilan/spark/lib/spark-assembly-1.3.1-hadoop2.4.0.jar:/scratch/xilan/spark/lib/datanucleus-core-3.2.10.jar:/scratch/xilan/spark/lib/datanucleus-api-jdo-3.2.6.jar:/scratch/xilan/spark/lib/datanucleus-rdbms-3.2.9.jar:/scratch/xilan/hadoop/etc/hadoop
 -Xms512m -Xmx512m org.apache.spark.deploy.SparkSubmit --master yarn
 target/scala-2.10/simple-project_2.10-1.0.jar --class scala.SimpleApp
 --num-executors 5 --executor-cores 4
 xilan 1949  0.0  0.0 103292   800 pts/1S+   08:40   0:00 grep
 --color SparkSubmit


 When look at the sparkui, I see the following:
 Aggregated Metrics by ExecutorExecutor IDAddressTask TimeTotal TasksFailed
 TasksSucceeded TasksShuffle Read Size / Records1host1:304836 s101127.1
 MB / 28089782host2:49970 ms00063.4 MB / 1810945

 So executor 2 is not even assigned a task ? Maybe I have some problems
 in my setting, but I don't know what could be the possible settings I set
 wrong or have not set.


 Thanks,
 Xiaohe

 On Sun, May 17, 2015 at 11:16 PM, Akhil Das ak...@sigmoidanalytics.com
  wrote:

 Did you try --executor-cores param? While you submit the job, do a ps
 aux | grep spark-submit and see the exact command parameters.

 Thanks
 Best Regards

 On Sat, May 16, 2015 at 12:31 PM, xiaohe lan zombiexco...@gmail.com
 wrote:

 Hi,

 I have a 5 nodes yarn cluster, I used spark-submit to submit a simple
 app.

  spark-submit --master yarn
 target/scala-2.10/simple-project_2.10-1.0.jar --class scala.SimpleApp
 --num-executors 5

 I have set the number of executor to 5, but from sparkui I could see
 only two executors and it ran very slow. What did I miss ?

 Thanks,
 Xiaohe









Re: Expert advise needed. (POC is at crossroads)

2015-04-30 Thread Sandy Ryza
Hi Deepak,

I wrote a couple posts with a bunch of different information about how to
tune Spark jobs.  The second one might be helpful with how to think about
tuning the number of partitions and resources?  What kind of OOMEs are you
hitting?

http://blog.cloudera.com/blog/2015/03/how-to-tune-your-apache-spark-jobs-part-1/
http://blog.cloudera.com/blog/2015/03/how-to-tune-your-apache-spark-jobs-part-2/

-Sandy


On Thu, Apr 30, 2015 at 5:03 PM, java8964 java8...@hotmail.com wrote:

 Really not expert here, but try the following ideas:

 1) I assume you are using yarn, then this blog is very good about the
 resource tuning:
 http://blog.cloudera.com/blog/2015/03/how-to-tune-your-apache-spark-jobs-part-2/

  2) If 12G is a hard limit in this case, then you have no option but to lower
 your concurrency. Try setting --executor-cores=1 as a first step; this
 will force each executor to run one task at a time. This is the least
 efficient setting for your job, but try it to see if your application can finish
 without an OOM.

 3) Add more partitions to your RDD. For a given RDD, a larger number of partitions
 means each partition will contain less data, which requires less memory to
 process, and if each one is processed by 1 core in each executor, that
 means you almost lower your memory requirement per executor to the lowest
 level.

 4) Do you cache data? Don't cache it for now, and lower
 spark.storage.memoryFraction, so less memory is reserved for the cache.

 Since your top priority is to avoid OOM, all the above steps will make the
 job run slower, or less efficient. In any case, first you should check
 your code logic to see if there could be any improvement, but we
 assume your code is already optimized, as in your email. If the above steps
 still cannot help your OOM, then maybe your data for one partition just
 cannot fit in a 12G heap, based on the logic you try to do in your code.
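
 A small spark-shell-style sketch of the knobs mentioned in 2)-4) above (the
 numbers and the RDD name are placeholders, only meant to show the direction of
 the change):

   val tunedConf = new org.apache.spark.SparkConf()
     .set("spark.executor.cores", "1")            // 2): one concurrent task per executor
     .set("spark.storage.memoryFraction", "0.3")  // 4): reserve less of the heap for caching
   val repartitioned = bigRdd.repartition(2000)   // 3): more, smaller partitions (bigRdd is a placeholder RDD)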

 Yong

 --
 From: deepuj...@gmail.com
 Date: Thu, 30 Apr 2015 18:48:12 +0530
 Subject: Expert advise needed. (POC is at crossroads)
 To: user@spark.apache.org


 I am at crossroads now and expert advise help me decide what the next
 course of the project going to be.

 Background: At our company we process tons of data to help build an
 experimentation platform. We fire 300+ M/R jobs over petabytes of
 data; it takes 24 hours and does lots of joins. It's simply stupendously
 complex.

 POC: Migrate a small portion of processing to Spark and aim to achieve 10x
 gains. Today this processing on M/R world takes 2.5 to 3 Hours.

 Data Sources: 3 (All on HDFS).
 Format: Two in Sequence File and one in Avro
 Data Size:
 1)  64 files  169,380,175,136 bytes- Sequence
 2) 101 files84,957,259,664 bytes- Avro
 3) 744 files   1,972,781,123,924 bytes- Sequence

 Process
 A) Map Side Join of #1 and #2
 B) Left Outer Join of A) and #3
 C) Reduce By Key of B)
 D) Map Only processing of C.

 Optimizations
 1) Converted the equi-join in #A to a map-side join (broadcast variables).
 2) Converted groupBy + map in #C to reduceByKey.
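
 For illustration, generic spark-shell-style sketches of these two techniques
 (not the actual job code; names, paths and record layouts are placeholders):

   // 1) map-side join: broadcast the smaller data set and join inside a map
   val small = sc.textFile("hdfs:///data/small")
     .map(_.split("\t")).map(a => (a(0), a(1))).collectAsMap()
   val smallBc = sc.broadcast(small)
   val joined = sc.textFile("hdfs:///data/big")
     .map(_.split("\t")).map(a => (a(0), a(1)))
     .flatMap { case (k, v) => smallBc.value.get(k).map(sv => (k, (v, sv))) }
   // 2) groupByKey + map replaced by reduceByKey, which combines on the map side
   val reduced = joined.mapValues(_ => 1L).reduceByKey(_ + _)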

 I have a huge YARN (Hadoop 2.4.x) cluster at my disposal but I am limited
 to use only 12G on each node.

 1) My poc (after a month of crazy research, lots of QA on this amazing
 forum) runs fine with 1 file each from above data sets and finishes in 10
 mins taking 4 executors. I started with 60 mins and got it down to 10 mins.
 2) For 5 files each data set it takes 45 mins and 16 executors.
 3) When i run against 10 files, it fails repeatedly with OOM and several
 timeout errors.
 Configs:  --num-executors 96 --driver-memory 12g --driver-java-options
 -XX:MaxPermSize=10G --executor-memory 12g --executor-cores 4, Spark 1.3.1


 Expert Advice
 My goal is simple to be able to complete the processing at 10x to 100x
 speed than M/R or show its not possible with Spark.

 *A) 10x to 100x*
 1) What will it take in terms of # of executors, # of executor-cores,
 amount of memory on each executor, and whatever unknown magic settings I am
 supposed to apply, to reach this goal?
 2) I am attaching the code for review; can the processing be sped up
 further, if that is at all possible?
 3) Do I need to do something else?

 *B) Give up and wait for next amazing tech to come up*
 Given the steps that I have performed so far, should I conclude that it's
 not possible to achieve 10x to 100x gains and that I am stuck with the M/R
 world for now?

 I am in need of help here. I am available for discussion at any time
 (day/night).

 Hope i provided all the details.
 Regards,
 Deepak


 - To
 unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional
 commands, e-mail: user-h...@spark.apache.org



Re: Question about Memory Used and VCores Used

2015-04-29 Thread Sandy Ryza
Hi,

Good question.  The extra memory comes from
spark.yarn.executor.memoryOverhead, the space used for the application
master, and the way YARN rounds requests up.  This explains it in a
little more detail:
http://blog.cloudera.com/blog/2015/03/how-to-tune-your-apache-spark-jobs-part-2/
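
A rough, hedged back-of-the-envelope for the numbers in the question (the
overhead fraction and the YARN minimum allocation are assumptions that vary by
version and cluster configuration):

  val executorMemoryMb = 3 * 1024                                  // --executor-memory 3g
  val overheadMb = math.max(384, (0.10 * executorMemoryMb).toInt)  // the 384 MB floor dominates here
  val minAllocMb = 1024                                            // assumed yarn.scheduler.minimum-allocation-mb
  def roundUp(mb: Int) = ((mb + minAllocMb - 1) / minAllocMb) * minAllocMb
  val containerMb = roundUp(executorMemoryMb + overheadMb)         // 4096 MB per executor, not 3072
  // three such containers plus the application master container (and its own
  // rounding) already push the total well past executor-memory * 3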

-Sandy

On Tue, Apr 28, 2015 at 7:12 PM, bit1...@163.com bit1...@163.com wrote:

 Hi,guys,
 I have the following computation with 3 workers:
 spark-sql --master yarn --executor-memory 3g --executor-cores 2
 --driver-memory 1g -e 'select count(*) from table'

 The resources used are shown as below on the UI:
 I don't understand why the memory used is 15GB and vcores used is 5. I
 think the memory used should be executor-memory*numOfWorkers=3G*3=9G, and
 the VCores used should be executor-cores*numOfWorkers=6.

 Can you please explain the result?Thanks.



 --
 bit1...@163.com



Re: Running beyond physical memory limits

2015-04-15 Thread Sandy Ryza
The setting to increase is spark.yarn.executor.memoryOverhead

On Wed, Apr 15, 2015 at 6:35 AM, Brahma Reddy Battula 
brahmareddy.batt...@huawei.com wrote:

 Hello Sean Owen,

 Thanks for your reply. I'll increase the overhead memory and check it.


 By the way, does any difference between 1.1 and 1.2 cause this issue? It
 was passing on Spark 1.1 and throwing the following error in 1.2 (this makes me
 doubtful).



 Thanks  Regards
 Brahma Reddy Battula




 
 From: Sean Owen [so...@cloudera.com]
 Sent: Wednesday, April 15, 2015 2:49 PM
 To: Brahma Reddy Battula
 Cc: Akhil Das; user@spark.apache.org
 Subject: Re: Running beyond physical memory limits

 This is not related to executor memory, but the extra overhead
 subtracted from the executor's size in order to avoid using more than
 the physical memory that YARN allows. That is, if you declare a 32G
 executor YARN lets you use 32G physical memory but your JVM heap must
 be significantly less than 32G max. This is the overhead factor that
 is subtracted for you, and it seems to need to be bigger in your case.

 On Wed, Apr 15, 2015 at 10:16 AM, Brahma Reddy Battula
 brahmareddy.batt...@huawei.com wrote:
  Thanks lot for your reply..
 
There is no issue with spark1.1..Following issue came when I upgrade to
  spark2.0...Hence I did not decrease spark.executor.memory...
  I mean to say, used same config for spark1.1 and spark1.2..
 
  Is there any issue with spark1.2..?
  Or Yarn will lead this..?
  And why executor will not release memory, if there are tasks running..?
 
 
  Thanks  Regards
 
  Brahma Reddy Battula
 
 
  
  From: Akhil Das [ak...@sigmoidanalytics.com]
  Sent: Wednesday, April 15, 2015 2:35 PM
  To: Brahma Reddy Battula
  Cc: user@spark.apache.org
  Subject: Re: Running beyond physical memory limits
 
  Did you try reducing your spark.executor.memory?
 
  Thanks
  Best Regards
 
  On Wed, Apr 15, 2015 at 2:29 PM, Brahma Reddy Battula
  brahmareddy.batt...@huawei.com wrote:
 
  Hello Sparkers
 
 
  I am newbie to spark and  need help.. We are using spark 1.2, we are
  getting the following error and executor is getting killed..I seen
  SPARK-1930 and it should be in 1.2..
 
  Any pointer to following error, like what might lead this error..
 
 
  2015-04-15 11:55:39,697 | WARN  | Container Monitor | Container
  [pid=126843,containerID=container_1429065217137_0012_01_-411041790] is
  running beyond physical memory limits. Current usage: 26.0 GB of 26 GB
  physical memory used; 26.7 GB of 260 GB virtual memory used. Killing
  container.
  Dump of the process-tree for container_1429065217137_0012_01_-411041790
 :
  |- PID PPID PGRPID SESSID CMD_NAME USER_MODE_TIME(MILLIS)
  SYSTEM_TIME(MILLIS) VMEM_USAGE(BYTES) RSSMEM_USAGE(PAGES) FULL_CMD_LINE
  |- 126872 126843 126843 126843 (java) 2049457 22816 28673892352
  6824864 /opt/huawei/Bigdata/jdk1.7.0_76//bin/java -server
  -XX:OnOutOfMemoryError=kill %p -Xms24576m -Xmx24576m
 
 -Dlog4j.configuration=file:/opt/huawei/Bigdata/DataSight_FM_BasePlatform_V100R001C00_Spark/spark/conf/log4j-executor.properties
 
 -Djava.library.path=/opt/huawei/Bigdata/DataSight_FM_BasePlatform_V100R001C00_Hadoop//hadoop/lib/native
 
 -Djava.io.tmpdir=/srv/BigData/hadoop/data4/nm/localdir/usercache/ossuser/appcache/application_1429065217137_0012/container_1429065217137_0012_01_-411041790/tmp
  -Dspark.driver.port=23204 -Dspark.random.port.max=23999
  -Dspark.akka.threads=32 -Dspark.akka.frameSize=10
 -Dspark.akka.timeout=100
  -Dspark.ui.port=23000 -Dspark.random.port.min=23000
 
 -Dspark.yarn.app.container.log.dir=/srv/BigData/hadoop/data5/nm/containerlogs/application_1429065217137_0012/container_1429065217137_0012_01_-411041790
  org.apache.spark.executor.CoarseGrainedExecutorBackend
  akka.tcp://sparkDriver@172.57.1.61:23204/user/CoarseGrainedScheduler 3
  hadoopc1h11 10 application_1429065217137_0012 |- 126843 76960
 126843
  126843 (bash) 0 0 11603968 331 /bin/bash -c
  /opt/huawei/Bigdata/jdk1.7.0_76//bin/java -server
  -XX:OnOutOfMemoryError='kill %p' -Xms24576m -Xmx24576m
 
 -Dlog4j.configuration=file:/opt/huawei/Bigdata/DataSight_FM_BasePlatform_V100R001C00_Spark/spark/conf/log4j-executor.properties
 
 -Djava.library.path=/opt/huawei/Bigdata/DataSight_FM_BasePlatform_V100R001C00_Hadoop//hadoop/lib/native
 
 -Djava.io.tmpdir=/srv/BigData/hadoop/data4/nm/localdir/usercache/ossuser/appcache/application_1429065217137_0012/container_1429065217137_0012_01_-411041790/tmp
  '-Dspark.driver.port=23204' '-Dspark.random.port.max=23999'
  '-Dspark.akka.threads=32' '-Dspark.akka.frameSize=10'
  '-Dspark.akka.timeout=100' '-Dspark.ui.port=23000'
  '-Dspark.random.port.min=23000'
 
 -Dspark.yarn.app.container.log.dir=/srv/BigData/hadoop/data5/nm/containerlogs/application_1429065217137_0012/container_1429065217137_0012_01_-411041790
  org.apache.spark.executor.CoarseGrainedExecutorBackend
  

Re: Spark: Using node-local files within functions?

2015-04-14 Thread Sandy Ryza
Hi Tobias,

It should be possible to get an InputStream from an HDFS file.  However, if
your libraries only work directly on files, then maybe that wouldn't work?
If that's the case and different tasks need different files, your way is
probably the best way.  If all tasks need the same file, a better option
would be to pass the file in with the --files option when you spark-submit,
which will cache the file between executors on the same node.
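
For the same-file-for-all-tasks case, a minimal sketch in Java (reusing textFolder
from your snippet; the file name dict.txt and the client-side path are made up for
illustration):

    import java.io.File;
    import org.apache.spark.api.java.function.Function;

    // Submitted with:  spark-submit --files /local/path/on/client/dict.txt ...
    textFolder.map(new Function<String, String>() {
        public String call(String inputFile) throws Exception {
            // On YARN, --files places a copy of dict.txt in each executor's
            // working directory, so the HDFS-unaware library can be handed
            // a plain local path.
            File localDict = new File("dict.txt");
            // ... call the 3rd party library with localDict.getAbsolutePath() ...
            return inputFile;
        }
    });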

-Sandy

On Tue, Apr 14, 2015 at 1:39 AM, Horsmann, Tobias 
tobias.horsm...@uni-due.de wrote:

  Hi,

  I am trying to use Spark in combination with Yarn with 3rd party code
 which is unaware of distributed file systems. Providing hdfs file
 references thus does not work.

  My idea to resolve this issue was the following:

  Within a function I take the HDFS file reference I get as a parameter,
 copy it into the local file system, and provide the 3rd party components
 with what they expect:

 textFolder.map(new Function<String, List<...>>()
 {
     public List<...> call(String inputFile)
         throws Exception
     {
         // resolve the HDFS file and copy it to the local file system

         // get a local file pointer
         // (this function should be executed on a node, right? So there
         //  is probably a local file system)

         // call the 3rd party library with the 'local file' reference

         // do other stuff
     }
 });

 This seems to work, but I am not really sure whether it might cause other
 problems at production file sizes. E.g. the files I copy to the
 local file system might be large. Would this affect YARN somehow? Are there
 more advisable ways to befriend HDFS-unaware libraries with HDFS file
 pointers?

  Regards,




Re: Rack locality

2015-04-13 Thread Sandy Ryza
Hi Riya,

As far as I know, that is correct, unless Mesos fine-grained mode handles
this in some mysterious way.

-Sandy

On Mon, Apr 13, 2015 at 2:09 PM, rcharaya riya.char...@gmail.com wrote:

 I want to use the rack locality feature of Apache Spark in my application.

 Is YARN the only resource manager which supports it as of now?

 After going through the source code, I found that the default implementation of
 getRackForHost() returns None in TaskSchedulerImpl, which (I suppose)
 would be used by standalone mode.

 On the other hand, it is overridden in YarnScheduler.scala to fetch the rack
 information by invoking the RackResolver API of Hadoop, which is used when
 Spark runs on YARN.



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Rack-locality-tp22483.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




Re: Spark Job Run Resource Estimation ?

2015-04-09 Thread Sandy Ryza
Hi Deepak,

I'm going to shamelessly plug my blog post on tuning Spark:
http://blog.cloudera.com/blog/2015/03/how-to-tune-your-apache-spark-jobs-part-2/

It talks about tuning executor size as well as how the number of tasks for
a stage is calculated.

-Sandy

On Thu, Apr 9, 2015 at 9:21 AM, ÐΞ€ρ@Ҝ (๏̯͡๏) deepuj...@gmail.com wrote:

 I have a Spark job that has multiple stages. For now I start it with 100
 executors, each with 12G mem (max is 16G). I am using Spark 1.3 over YARN
 2.4.x.

 For now I start the Spark job with a very limited input (1 file of size
 2G); overall there are 200 files. My first run is yet to complete as it is
 taking too much time / throwing OOM exceptions / buffer exceptions (keep
 that aside).

 How will I know how many resources are required to run this job (# of
 cores, executors, mem, serialization buffers, and I do not yet know what else)?

 In the M/R world, all I do is set the split size and the rest is taken care of
 automatically (yes, I need to worry about mem in case of OOM).


 1) Can someone explain how they do resource estimation before running the
 job, or is there no way and one needs to just try it out?
 2) Even if I give 100 executors, the first stage uses only 5; how did
 Spark decide this?

 Please point me to any resources that talk about similar things, or
 please explain here.

 --
 Deepak




Re: Strategy regarding maximum number of executor's failure for log running jobs/ spark streaming jobs

2015-04-06 Thread Sandy Ryza
What's the advantage of killing an application for lack of resources?

I think the rationale behind killing an app based on executor failures is
that, if we see a lot of them in a short span of time, it means there's
probably something going wrong in the app or on the cluster.

On Wed, Apr 1, 2015 at 7:08 PM, twinkle sachdeva twinkle.sachd...@gmail.com
 wrote:

 Hi,

 Thanks Sandy.


 Another way to look at this: would we like our long-running
 application to die?

 So let's say we create a window of around 10 batches, and we are using
 incremental operations inside our application, where a restart is
 relatively costly. Should the criterion for failing the application be the
 maximum number of executor failures, or should we have some
 parameter around a minimum number of available executors for some time x?

 That is, if the application is not able to get a minimum of n executors
 within a period of time x, then we should fail the application.

 Adding a time factor here will allow some window for Spark to get more
 executors allocated if some of them fail.

 Thoughts please.

 Thanks,
 Twinkle


 On Wed, Apr 1, 2015 at 10:19 PM, Sandy Ryza sandy.r...@cloudera.com
 wrote:

 That's a good question, Twinkle.

 One solution could be to allow a maximum number of failures within any
 given time span.  E.g. a max failures per hour property.
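
A very rough sketch of what such a policy could look like (illustrative only,
not existing Spark code):

    import java.util.ArrayDeque;
    import java.util.Deque;

    // Fails the app only if too many executor failures land inside a rolling window.
    class ExecutorFailureTracker {
        private final int maxFailures;       // e.g. 10
        private final long windowMillis;     // e.g. one hour
        private final Deque<Long> failureTimes = new ArrayDeque<Long>();

        ExecutorFailureTracker(int maxFailures, long windowMillis) {
            this.maxFailures = maxFailures;
            this.windowMillis = windowMillis;
        }

        // Returns true if the application should give up.
        synchronized boolean recordFailureAndCheck(long nowMillis) {
            failureTimes.addLast(nowMillis);
            // Drop failures that have fallen out of the rolling window.
            while (!failureTimes.isEmpty()
                    && nowMillis - failureTimes.peekFirst() > windowMillis) {
                failureTimes.removeFirst();
            }
            return failureTimes.size() > maxFailures;
        }
    }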

 -Sandy

 On Tue, Mar 31, 2015 at 11:52 PM, twinkle sachdeva 
 twinkle.sachd...@gmail.com wrote:

 Hi,

 In Spark over YARN, there is a property, spark.yarn.max.executor.failures,
 which controls the maximum number of executor failures an application will
 survive.

 If the number of executor failures (due to any reason, like OOM or machine
 failure, etc.) exceeds this value, then the application quits.

 For a short-duration Spark job this looks fine, but for long-running
 jobs, since this does not take the duration into account, it can lead to the same
 treatment for two different scenarios (mentioned below):
 1. executors failing within 5 mins.
 2. executors failing sparsely, but at some point even a single executor
 failure (which the application could have survived) can make the application
 quit.

 Sending this to the community to hear what kind of behaviour / strategy
 they think will be suitable for long-running Spark jobs or Spark Streaming
 jobs.

 Thanks and Regards,
 Twinkle






Re: Data locality across jobs

2015-04-02 Thread Sandy Ryza
This isn't currently a capability that Spark has, though it has definitely
been discussed: https://issues.apache.org/jira/browse/SPARK-1061.  The
primary obstacle at this point is that Hadoop's FileInputFormat doesn't
guarantee that each file corresponds to a single split, so the records
corresponding to a particular partition at the end of the first job can end
up split across multiple partitions in the second job.
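
To make the obstacle concrete, a sketch of what one might try today (paths and
partition counts are made up, and sc is assumed to be a JavaSparkContext), and why
it is not guaranteed to line up:

    import java.util.Arrays;
    import org.apache.spark.HashPartitioner;
    import org.apache.spark.api.java.JavaPairRDD;
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.JavaSparkContext;
    import scala.Tuple2;

    // Hourly job: co-locate all records for a key in one partition before writing.
    JavaPairRDD<String, Long> hourly = sc.parallelizePairs(
            Arrays.asList(new Tuple2<String, Long>("key1", 1L),
                          new Tuple2<String, Long>("key2", 2L)))
        .partitionBy(new HashPartitioner(24));
    hourly.saveAsObjectFile("hdfs:///tmp/hourly-output");   // hypothetical path

    // Daily job: the read goes through Hadoop's FileInputFormat, so a saved
    // part-file may be re-split across several partitions, and the per-key
    // co-location from the hourly job is not guaranteed to survive.
    JavaRDD<Tuple2<String, Long>> daily = sc.objectFile("hdfs:///tmp/hourly-output");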

-Sandy

On Wed, Apr 1, 2015 at 9:09 PM, kjsingh kanwaljit.si...@guavus.com wrote:

 Hi,

 We are running an hourly job using Spark 1.2 on YARN. It saves an RDD of
 Tuple2. At the end of the day, a daily job is launched, which works on the
 outputs of the hourly jobs.

 For data locality and speed, we wish that when the daily job launches, it
 finds all instances of a given key at a single executor rather than
 fetching
 them from others during the shuffle.

 Is it possible to maintain key partitioning across jobs? We can control
 partitioning within one job. But how do we send keys to the executors of the same
 node manager across jobs? And while saving data to HDFS, are the blocks
 allocated to the same data node machine as the executor for a partition?



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Data-locality-across-jobs-tp22351.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




Re: Strategy regarding maximum number of executor's failure for log running jobs/ spark streaming jobs

2015-04-01 Thread Sandy Ryza
That's a good question, Twinkle.

One solution could be to allow a maximum number of failures within any
given time span.  E.g. a max failures per hour property.

-Sandy

On Tue, Mar 31, 2015 at 11:52 PM, twinkle sachdeva 
twinkle.sachd...@gmail.com wrote:

 Hi,

 In Spark over YARN, there is a property, spark.yarn.max.executor.failures,
 which controls the maximum number of executor failures an application will
 survive.

 If the number of executor failures (due to any reason, like OOM or machine
 failure, etc.) exceeds this value, then the application quits.

 For a short-duration Spark job this looks fine, but for long-running
 jobs, since this does not take the duration into account, it can lead to the same
 treatment for two different scenarios (mentioned below):
 1. executors failing within 5 mins.
 2. executors failing sparsely, but at some point even a single executor
 failure (which the application could have survived) can make the application
 quit.

 Sending this to the community to hear what kind of behaviour / strategy
 they think will be suitable for long-running Spark jobs or Spark Streaming
 jobs.

 Thanks and Regards,
 Twinkle



Re: Cross-compatibility of YARN shuffle service

2015-03-26 Thread Sandy Ryza
Hi Matt,

I'm not sure whether we have documented compatibility guidelines here.
However, a strong goal is to keep the external shuffle service compatible
so that many versions of Spark can run against the same shuffle service.

-Sandy

On Wed, Mar 25, 2015 at 6:44 PM, Matt Cheah mch...@palantir.com wrote:

 Hi everyone,

 I am considering moving from Spark-Standalone to YARN. The context is that
 there are multiple Spark applications that are using different versions of
 Spark that all want to use the same YARN cluster.

 My question is: if I use a single Spark YARN shuffle service jar on the
 Node Manager, will the service work properly with all of the Spark
 applications, regardless of the specific versions of the applications? Or,
 is it it the case that, if I want to use the external shuffle service, I
 need to have all of my applications using the same version of Spark?

 Thanks,

 -Matt Cheah



Re: What is best way to run spark job in yarn-cluster mode from java program(servlet container) and NOT using spark-submit command.

2015-03-26 Thread Sandy Ryza
Creating a SparkContext and setting master as yarn-cluster unfortunately
will not work.

SPARK-4924 added APIs for doing this in Spark, but won't be included until
1.4.
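
For reference, once 1.4 is out the launcher library from SPARK-4924 should allow
something along these lines from a servlet container (a sketch; the jar path and
class name are placeholders, and details may differ in the final release):

    import org.apache.spark.launcher.SparkLauncher;

    Process spark = new SparkLauncher()
        .setAppResource("/path/to/your-app.jar")     // placeholder
        .setMainClass("com.example.YourApp")         // placeholder
        .setMaster("yarn-cluster")
        .setConf("spark.executor.memory", "2g")
        .launch();
    spark.waitFor();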

-Sandy

On Tue, Mar 17, 2015 at 3:19 AM, Akhil Das ak...@sigmoidanalytics.com
wrote:

 Create SparkContext set master as yarn-cluster then run it as a standalone
 program?

 Thanks
 Best Regards

 On Tue, Mar 17, 2015 at 1:27 AM, rrussell25 rrussel...@gmail.com wrote:

 Hi, were you ever able to determine a satisfactory approach for this
 problem?
 I have a similar situation and would prefer to execute the job directly
 from
 Java code within my JMS listener and/or servlet container.



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/What-is-best-way-to-run-spark-job-in-yarn-cluster-mode-from-java-program-servlet-container-and-NOT-u-tp21817p22086.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org





Re: issue while submitting Spark Job as --master yarn-cluster

2015-03-25 Thread Sandy Ryza
Hi Sachin,

It appears that the application master is failing.  To figure out what's
wrong you need to get the logs for the application master.

-Sandy

On Wed, Mar 25, 2015 at 7:05 AM, Sachin Singh sachin.sha...@gmail.com
wrote:

 The OS I am using is Linux.
 When I run it simply with master yarn, it runs fine.

 Regards
 Sachin

 On Wed, Mar 25, 2015 at 4:25 PM, Xi Shen davidshe...@gmail.com wrote:

 What is your environment? I remember I had similar error when running
 spark-shell --master yarn-client in Windows environment.


 On Wed, Mar 25, 2015 at 9:07 PM sachin Singh sachin.sha...@gmail.com
 wrote:

 Hi,
 when I am submitting a Spark job in cluster mode I am getting the error below in the
 hadoop-yarn log.
 If anyone has any idea, please suggest.

 2015-03-25 23:35:22,467 INFO
 org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl:
 application_1427124496008_0028 State change from FINAL_SAVING to FAILED
 2015-03-25 23:35:22,467 WARN
 org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger: USER=hdfs
 OPERATION=Application Finished - Failed TARGET=RMAppManager
  RESULT=FAILURE
 DESCRIPTION=App failed with state: FAILED   PERMISSIONS=Application
 application_1427124496008_0028 failed 2 times due to AM Container for
 appattempt_1427124496008_0028_02 exited with  exitCode: 13 due to:
 Exception from container-launch.
 Container id: container_1427124496008_0028_02_01
 Exit code: 13
 Stack trace: ExitCodeException exitCode=13:
 at org.apache.hadoop.util.Shell.runCommand(Shell.java:538)
 at org.apache.hadoop.util.Shell.run(Shell.java:455)
 at
 org.apache.hadoop.util.Shell$ShellCommandExecutor.execute(
 Shell.java:702)
 at
 org.apache.hadoop.yarn.server.nodemanager.DefaultContainerExecutor.
 launchContainer(DefaultContainerExecutor.java:197)
 at
 org.apache.hadoop.yarn.server.nodemanager.containermanager.
 launcher.ContainerLaunch.call(ContainerLaunch.java:299)
 at
 org.apache.hadoop.yarn.server.nodemanager.containermanager.
 launcher.ContainerLaunch.call(ContainerLaunch.java:81)
 at java.util.concurrent.FutureTask.run(FutureTask.java:262)
 at
 java.util.concurrent.ThreadPoolExecutor.runWorker(
 ThreadPoolExecutor.java:1145)
 at
 java.util.concurrent.ThreadPoolExecutor$Worker.run(
 ThreadPoolExecutor.java:615)
 at java.lang.Thread.run(Thread.java:745)


 Container exited with a non-zero exit code 13
 .Failing this attempt.. Failing the application.
 APPID=application_1427124496008_0028



 --
 View this message in context: http://apache-spark-user-list.
 1001560.n3.nabble.com/issue-while-submitting-Spark-Job-as-
 master-yarn-cluster-tp0.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org





Re: How to avoid being killed by YARN node manager ?

2015-03-24 Thread Sandy Ryza
Hi Yuichiro,

The way to avoid this is to boost spark.yarn.executor.memoryOverhead until
the executors have enough off-heap memory to avoid going over their limits.

-Sandy

On Tue, Mar 24, 2015 at 11:49 AM, Yuichiro Sakamoto ks...@muc.biglobe.ne.jp
 wrote:

 Hello.

 We use ALS(Collaborative filtering) of Spark MLlib on YARN.
 Spark version is 1.2.0 included CDH 5.3.1.

 1,000,000,000 records (5,000,000 users' data and 5,000,000 items' data) are
 used for machine learning with ALS.
 This large quantity of data increases virtual memory usage, and the
 node manager of YARN kills the Spark worker process.
 Even though Spark runs again after the process is killed, the Spark worker process is
 killed again.
 As a result, the whole set of Spark processes is terminated.

 # When the Spark worker process is killed, it seems that virtual memory usage,
 # increased by 'Shuffle' or 'Disk writing', gets over the threshold of YARN.

 To avoid such a case from occurring, we set
 'yarn.nodemanager.vmem-check-enabled' to false, and then the job exits successfully.
 But that does not seem to be an appropriate way.
 If you know of one, please let me know about tuning methods for Spark.

 The conditions of machines and Spark settings are as follows.
 1)six machines, physical memory is 32GB of each machine.
 2)Spark settings
 - spark.executor.memory=16g
 - spark.closure.serializer=org.apache.spark.serializer.KryoSerializer
 - spark.rdd.compress=true
 - spark.shuffle.memoryFraction=0.4

 Thanks,
 Yuichiro Sakamoto



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/How-to-avoid-being-killed-by-YARN-node-manager-tp22199.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




Re: Why doesn't the --conf parameter work in yarn-cluster mode (but works in yarn-client and local)?

2015-03-24 Thread Sandy Ryza
Ah, yes, I believe this is because only properties prefixed with "spark"
get passed on.  The purpose of the --conf option is to allow passing
Spark properties to the SparkConf, not to add general key-value pairs to
the JVM system properties.
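
A sketch of the workaround, if renaming the key is an option (the key name below
is just an example):

    import org.apache.spark.SparkConf;

    // spark-submit ... --conf spark.myapp.key=someValue ...
    SparkConf sparkConf = new SparkConf();
    String value = sparkConf.get("spark.myapp.key");   // also visible in yarn-cluster mode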

-Sandy

On Tue, Mar 24, 2015 at 4:25 AM, Emre Sevinc emre.sev...@gmail.com wrote:

 Hello Sandy,

 Your suggestion does not work when I try it locally:

 When I pass

   --conf key=someValue

 and then try to retrieve it like:

 SparkConf sparkConf = new SparkConf();
 logger.info("* * * key ~~~ {}", sparkConf.get("key"));

 I get

   Exception in thread "main" java.util.NoSuchElementException: key

 And I think that's expected because the key is an arbitrary one, not
 necessarily a Spark configuration element. This is why I was passing it via
 --conf and retrieving System.getProperty("key") (which worked locally and
 in yarn-client mode but not in yarn-cluster mode). I'm surprised why I
 can't use it on the cluster while I can use it while local development and
 testing.

 Kind regards,

 Emre Sevinç
 http://www.bigindustries.be/



 On Mon, Mar 23, 2015 at 6:15 PM, Sandy Ryza sandy.r...@cloudera.com
 wrote:

 Hi Emre,

 The --conf property is meant to work with yarn-cluster mode.
 System.getProperty(key) isn't guaranteed, but new SparkConf().get(key)
 should.  Does it not?

 -Sandy

 On Mon, Mar 23, 2015 at 8:39 AM, Emre Sevinc emre.sev...@gmail.com
 wrote:

 Hello,

 According to Spark Documentation at
 https://spark.apache.org/docs/1.2.1/submitting-applications.html :

   --conf: Arbitrary Spark configuration property in key=value format.
 For values that contain spaces wrap “key=value” in quotes (as shown).

 And indeed, when I use that parameter, in my Spark program I can
 retrieve the value of the key by using:

 System.getProperty(key);

 This works when I test my program locally, and also in yarn-client mode,
 I can log the value of the key and see that it matches what I wrote in the
 command line, but it returns *null* when I submit the very same program in
 *yarn-cluster* mode.

 Why can't I retrieve the value of key given as --conf key=value when I
 submit my Spark application in *yarn-cluster* mode?

 Any ideas and/or workarounds?


 --
 Emre Sevinç
 http://www.bigindustries.be/





 --
 Emre Sevinc



Re: Invalid ContainerId ... Caused by: java.lang.NumberFormatException: For input string: e04

2015-03-24 Thread Sandy Ryza
Steve, that's correct, but the problem only shows up when different
versions of the YARN jars are included on the classpath.

-Sandy

On Tue, Mar 24, 2015 at 6:29 AM, Steve Loughran ste...@hortonworks.com
wrote:


  On 24 Mar 2015, at 02:10, Marcelo Vanzin van...@cloudera.com wrote:
 
  This happens most probably because the Spark 1.3 you have downloaded
  is built against an older version of the Hadoop libraries than those
  used by CDH, and those libraries cannot parse the container IDs
  generated by CDH.


 This sounds suspiciously like the change in YARN for HA (the epoch
 number) isn't being parsed by older versions of the YARN client libs. This
 is effectively a regression in the YARN code: it's creating container IDs
 that can't be easily parsed by old apps. It may be possible to fix that
 Spark-side by having its own parser for the YARN container/app environment
 variable.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




Re: Is yarn-standalone mode deprecated?

2015-03-24 Thread Sandy Ryza
I checked and apparently it hasn't been released yet.  It will be available
in the upcoming CDH 5.4 release.

-Sandy

On Mon, Mar 23, 2015 at 1:32 PM, Nitin kak nitinkak...@gmail.com wrote:

 I know there was an effort for this, do you know which version of Cloudera
 distribution we could find that?

 On Mon, Mar 23, 2015 at 1:13 PM, Sandy Ryza sandy.r...@cloudera.com
 wrote:

 The former is deprecated.  However, the latter is functionally equivalent
 to it.  Both launch an app in what is now called yarn-cluster mode.

 Oozie now also has a native Spark action, though I'm not familiar on the
 specifics.

 -Sandy

 On Mon, Mar 23, 2015 at 1:01 PM, Nitin kak nitinkak...@gmail.com wrote:

 To be more clear, I am talking about

 SPARK_JAR=SPARK_ASSEMBLY_JAR_FILE ./bin/spark-class 
 org.apache.spark.deploy.yarn.Client \
   --jar YOUR_APP_JAR_FILE \
   --class APP_MAIN_CLASS \
   --args APP_MAIN_ARGUMENTS \
   --num-workers NUMBER_OF_WORKER_MACHINES \
   --master-class ApplicationMaster_CLASS
   --master-memory MEMORY_FOR_MASTER \
   --worker-memory MEMORY_PER_WORKER \
   --worker-cores CORES_PER_WORKER \
   --name application_name \
   --queue queue_name \
   --addJars any_local_files_used_in_SparkContext.addJar \
   --files files_for_distributed_cache \
   --archives archives_for_distributed_cache

 which I thought was the yarn-standalone mode

 vs

 spark-submit

 ./bin/spark-submit --class org.apache.spark.examples.SparkPi \
 --master yarn-cluster \
 --num-executors 3 \
 --driver-memory 4g \
 --executor-memory 2g \
 --executor-cores 1 \
 --queue thequeue \
 lib/spark-examples*.jar


 I didn't see an example of ./bin/spark-class in the 1.2.0 documentation, so I am
 wondering if that is deprecated.





 On Mon, Mar 23, 2015 at 12:11 PM, Sandy Ryza sandy.r...@cloudera.com
 wrote:

 The mode is not deprecated, but the name yarn-standalone is now
 deprecated.  It's now referred to as yarn-cluster.

 -Sandy

 On Mon, Mar 23, 2015 at 11:49 AM, nitinkak001 nitinkak...@gmail.com
 wrote:

 Is yarn-standalone mode deprecated in Spark now? The reason I am
 asking is
 that while I can find it in the 0.9.0
 documentation (https://spark.apache.org/docs/0.9.0/running-on-yarn.html),
 I
 am not able to find it in 1.2.0.

 I am using this mode to run Spark jobs from Oozie as a Java action.
 Removing this mode will prevent me from doing that. Are there any
 other ways
 of running a Spark job from Oozie other than the Shell action?



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Is-yarn-standalone-mode-deprecated-tp22188.html
 Sent from the Apache Spark User List mailing list archive at
 Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org








Re: Is yarn-standalone mode deprecated?

2015-03-23 Thread Sandy Ryza
The mode is not deprecated, but the name yarn-standalone is now
deprecated.  It's now referred to as yarn-cluster.

-Sandy

On Mon, Mar 23, 2015 at 11:49 AM, nitinkak001 nitinkak...@gmail.com wrote:

 Is yarn-standalone mode deprecated in Spark now? The reason I am asking is
 that while I can find it in the 0.9.0
 documentation (https://spark.apache.org/docs/0.9.0/running-on-yarn.html), I
 am not able to find it in 1.2.0.

 I am using this mode to run Spark jobs from Oozie as a Java action.
 Removing this mode will prevent me from doing that. Are there any other
 ways
 of running a Spark job from Oozie other than the Shell action?



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Is-yarn-standalone-mode-deprecated-tp22188.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




Re: Is yarn-standalone mode deprecated?

2015-03-23 Thread Sandy Ryza
The former is deprecated.  However, the latter is functionally equivalent
to it.  Both launch an app in what is now called yarn-cluster mode.

Oozie now also has a native Spark action, though I'm not familiar on the
specifics.

-Sandy

On Mon, Mar 23, 2015 at 1:01 PM, Nitin kak nitinkak...@gmail.com wrote:

 To be more clear, I am talking about

 SPARK_JAR=SPARK_ASSEMBLY_JAR_FILE ./bin/spark-class 
 org.apache.spark.deploy.yarn.Client \
   --jar YOUR_APP_JAR_FILE \
   --class APP_MAIN_CLASS \
   --args APP_MAIN_ARGUMENTS \
   --num-workers NUMBER_OF_WORKER_MACHINES \
   --master-class ApplicationMaster_CLASS
   --master-memory MEMORY_FOR_MASTER \
   --worker-memory MEMORY_PER_WORKER \
   --worker-cores CORES_PER_WORKER \
   --name application_name \
   --queue queue_name \
   --addJars any_local_files_used_in_SparkContext.addJar \
   --files files_for_distributed_cache \
   --archives archives_for_distributed_cache

 which I thought was the yarn-standalone mode

 vs

 spark-submit

 ./bin/spark-submit --class org.apache.spark.examples.SparkPi \
 --master yarn-cluster \
 --num-executors 3 \
 --driver-memory 4g \
 --executor-memory 2g \
 --executor-cores 1 \
 --queue thequeue \
 lib/spark-examples*.jar


 I didn't see an example of ./bin/spark-class in the 1.2.0 documentation, so I am
 wondering if that is deprecated.





 On Mon, Mar 23, 2015 at 12:11 PM, Sandy Ryza sandy.r...@cloudera.com
 wrote:

 The mode is not deprecated, but the name yarn-standalone is now
 deprecated.  It's now referred to as yarn-cluster.

 -Sandy

 On Mon, Mar 23, 2015 at 11:49 AM, nitinkak001 nitinkak...@gmail.com
 wrote:

 Is yarn-standalone mode deprecated in Spark now? The reason I am asking
 is
 that while I can find it in the 0.9.0
 documentation (https://spark.apache.org/docs/0.9.0/running-on-yarn.html),
 I
 am not able to find it in 1.2.0.

 I am using this mode to run Spark jobs from Oozie as a Java action.
 Removing this mode will prevent me from doing that. Are there any other
 ways
 of running a Spark job from Oozie other than the Shell action?



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Is-yarn-standalone-mode-deprecated-tp22188.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org






Re: Why doesn't the --conf parameter work in yarn-cluster mode (but works in yarn-client and local)?

2015-03-23 Thread Sandy Ryza
Hi Emre,

The --conf property is meant to work with yarn-cluster mode.
System.getProperty(key) isn't guaranteed, but new SparkConf().get(key)
should.  Does it not?

-Sandy

On Mon, Mar 23, 2015 at 8:39 AM, Emre Sevinc emre.sev...@gmail.com wrote:

 Hello,

 According to Spark Documentation at
 https://spark.apache.org/docs/1.2.1/submitting-applications.html :

   --conf: Arbitrary Spark configuration property in key=value format. For
 values that contain spaces wrap “key=value” in quotes (as shown).

 And indeed, when I use that parameter, in my Spark program I can retrieve
 the value of the key by using:

 System.getProperty("key");

 This works when I test my program locally, and also in yarn-client mode, I
 can log the value of the key and see that it matches what I wrote in the
 command line, but it returns *null* when I submit the very same program in
 *yarn-cluster* mode.

 Why can't I retrieve the value of key given as --conf key=value when I
 submit my Spark application in *yarn-cluster* mode?

 Any ideas and/or workarounds?


 --
 Emre Sevinç
 http://www.bigindustries.be/




Re: Shuffle Spill Memory and Shuffle Spill Disk

2015-03-23 Thread Sandy Ryza
Hi Bijay,

The Shuffle Spill (Disk) is the total number of bytes written to disk by
records spilled during the shuffle.  The Shuffle Spill (Memory) is the
amount of space the spilled records occupied in memory before they were
spilled.  These differ because the serialized format is more compact, and
the on-disk version can be compressed as well.
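
As a rough sanity check on the ratio with the numbers above: 122.5 GB of in-memory
records spilling down to 3.4 GB on disk is roughly a 36x reduction, which is
plausible for compact serialization plus compression.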

-Sandy

On Mon, Mar 23, 2015 at 5:29 PM, Bijay Pathak bijay.pat...@cloudwick.com
wrote:

 Hello,

 I am running  TeraSort https://github.com/ehiggs/spark-terasort on
 100GB of data. The final metrics I am getting on Shuffle Spill are:

 Shuffle Spill(Memory): 122.5 GB
 Shuffle Spill(Disk): 3.4 GB

 What's the difference and relation between these two metrics? Does these
 mean 122.5 GB was spill from memory during the shuffle?

 thank you,
 bijay



Re: No executors allocated on yarn with latest master branch

2015-03-09 Thread Sandy Ryza
You would have needed to configure it by
setting yarn.scheduler.capacity.resource-calculator to something ending in
DominantResourceCalculator.  If you haven't configured it, there's a high
probability that the recently committed
https://issues.apache.org/jira/browse/SPARK-6050 will fix your problem.
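
For reference, the check is whether capacity-scheduler.xml contains something like
the following (the class name is the stock Hadoop one):

    <property>
      <name>yarn.scheduler.capacity.resource-calculator</name>
      <value>org.apache.hadoop.yarn.util.resource.DominantResourceCalculator</value>
    </property>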

On Wed, Feb 25, 2015 at 1:36 AM, Anders Arpteg arp...@spotify.com wrote:

 We're using the capacity scheduler, to the best of my knowledge. Unsure if
 multi resource scheduling is used, but if you know of an easy way to figure
 that out, then let me know.

 Thanks,
 Anders

 On Sat, Feb 21, 2015 at 12:05 AM, Sandy Ryza sandy.r...@cloudera.com
 wrote:

 Are you using the capacity scheduler or fifo scheduler without multi
 resource scheduling by any chance?

 On Thu, Feb 12, 2015 at 1:51 PM, Anders Arpteg arp...@spotify.com
 wrote:

  The NM logs only seem to contain entries similar to the following. Nothing else
  in the same time range. Any help?

 2015-02-12 20:47:31,245 WARN
 org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl:
 Event EventType: KILL_CONTAINER sent to absent container
 container_1422406067005_0053_01_02
 2015-02-12 20:47:31,246 WARN
 org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl:
 Event EventType: KILL_CONTAINER sent to absent container
 container_1422406067005_0053_01_12
 2015-02-12 20:47:31,246 WARN
 org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl:
 Event EventType: KILL_CONTAINER sent to absent container
 container_1422406067005_0053_01_22
 2015-02-12 20:47:31,246 WARN
 org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl:
 Event EventType: KILL_CONTAINER sent to absent container
 container_1422406067005_0053_01_32
 2015-02-12 20:47:31,246 WARN
 org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl:
 Event EventType: KILL_CONTAINER sent to absent container
 container_1422406067005_0053_01_42
 2015-02-12 21:24:30,515 WARN
 org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl:
 Event EventType: FINISH_APPLICATION sent to absent application
 application_1422406067005_0053

 On Thu, Feb 12, 2015 at 10:38 PM, Sandy Ryza sandy.r...@cloudera.com
 wrote:

 It seems unlikely to me that it would be a 2.2 issue, though not
 entirely impossible.  Are you able to find any of the container logs?  Is
 the NodeManager launching containers and reporting some exit code?

 -Sandy

 On Thu, Feb 12, 2015 at 1:21 PM, Anders Arpteg arp...@spotify.com
 wrote:

 No, not submitting from windows, from a debian distribution. Had a
 quick look at the rm logs, and it seems some containers are allocated but
 then released again for some reason. Not easy to make sense of the logs,
 but here is a snippet from the logs (from a test in our small test 
 cluster)
 if you'd like to have a closer look: http://pastebin.com/8WU9ivqC

 Sandy, sounds like it could possible be a 2.2 issue then, or what do
 you think?

 Thanks,
 Anders

 On Thu, Feb 12, 2015 at 3:11 PM, Aniket Bhatnagar 
 aniket.bhatna...@gmail.com wrote:

 This is tricky to debug. Check logs of node and resource manager of
 YARN to see if you can trace the error. In the past I have to closely 
 look
 at arguments getting passed to YARN container (they get logged before
 attempting to launch containers). If I still don't get a clue, I had to
 check the script generated by YARN to execute the container and even run
 manually to trace at what line the error has occurred.

 BTW are you submitting the job from windows?

 On Thu, Feb 12, 2015, 3:34 PM Anders Arpteg arp...@spotify.com
 wrote:

 Interesting to hear that it works for you. Are you using Yarn 2.2 as
 well? No strange log message during startup, and can't see any other log
 messages since no executer gets launched. Does not seems to work in
 yarn-client mode either, failing with the exception below.

 Exception in thread main org.apache.spark.SparkException: Yarn
 application has already ended! It might have been killed or unable to
 launch application master.
 at
 org.apache.spark.scheduler.cluster.YarnClientSchedulerBackend.waitForApplication(YarnClientSchedulerBackend.scala:119)
 at
 org.apache.spark.scheduler.cluster.YarnClientSchedulerBackend.start(YarnClientSchedulerBackend.scala:59)
 at
 org.apache.spark.scheduler.TaskSchedulerImpl.start(TaskSchedulerImpl.scala:141)
 at
 org.apache.spark.SparkContext.init(SparkContext.scala:370)
 at
 com.spotify.analytics.AnalyticsSparkContext.init(AnalyticsSparkContext.scala:8)
 at
 com.spotify.analytics.DataSampler$.main(DataSampler.scala:42)
 at com.spotify.analytics.DataSampler.main(DataSampler.scala)
 at sun.reflect.NativeMethodAccessorImpl.invoke0(Native
 Method)
 at
 sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
 at
 sun.reflect.DelegatingMethodAccessorImpl.invoke

Re: Spark Performance on Yarn

2015-02-20 Thread Sandy Ryza
That's all correct.

-Sandy

On Fri, Feb 20, 2015 at 1:23 PM, Kelvin Chu 2dot7kel...@gmail.com wrote:

 Hi Sandy,

 I appreciate your clear explanation. Let me try again. It's the best way
 to confirm I understand.

  spark.executor.memory + spark.yarn.executor.memoryOverhead = the amount of memory
  for which YARN will create a container (and inside it, the executor JVM)

 spark.executor.memory = the memory I can actually use in my jvm
 application = part of it (spark.storage.memoryFraction) is reserved for
 caching + part of it (spark.shuffle.memoryFraction) is reserved for
  shuffling + the remaining is for bookkeeping & UDFs

 If I am correct above, then one implication from them is:

 (spark.executor.memory + spark.yarn.executor.memoryOverhead) * number of
  executors per machine should be configured to be smaller than a single machine's
  physical memory

 Right? Again, thanks!

 Kelvin

 On Fri, Feb 20, 2015 at 11:50 AM, Sandy Ryza sandy.r...@cloudera.com
 wrote:

 Hi Kelvin,

 spark.executor.memory controls the size of the executor heaps.

 spark.yarn.executor.memoryOverhead is the amount of memory to request
 from YARN beyond the heap size.  This accounts for the fact that JVMs use
 some non-heap memory.

 The Spark heap is divided into spark.storage.memoryFraction (default 0.6)
 and spark.shuffle.memoryFraction (default 0.2), and the rest is for basic
 Spark bookkeeping and anything the user does inside UDFs.

 -Sandy



 On Fri, Feb 20, 2015 at 11:44 AM, Kelvin Chu 2dot7kel...@gmail.com
 wrote:

 Hi Sandy,

 I am also doing memory tuning on YARN. Just want to confirm, is it
 correct to say:

 spark.executor.memory - spark.yarn.executor.memoryOverhead = the memory
 I can actually use in my jvm application

 If it is not, what is the correct relationship? Any other variables or
 config parameters in play? Thanks.

 Kelvin

 On Fri, Feb 20, 2015 at 9:45 AM, Sandy Ryza sandy.r...@cloudera.com
 wrote:

 If that's the error you're hitting, the fix is to boost
 spark.yarn.executor.memoryOverhead, which will put some extra room in
 between the executor heap sizes and the amount of memory requested for them
 from YARN.

 -Sandy

 On Fri, Feb 20, 2015 at 9:40 AM, lbierman leebier...@gmail.com wrote:

 A bit more context on this issue. From the container logs on the
 executor

 Given my cluster specs above what would be appropriate parameters to
 pass
 into :
 --num-executors --num-cores --executor-memory

 I had tried it with --executor-memory 2500MB

 015-02-20 06:50:09,056 WARN

 org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorImpl:
 Container
 [pid=23320,containerID=container_1423083596644_0238_01_004160] is
 running beyond physical memory limits. Current usage: 2.8 GB of 2.7 GB
 physical memory used; 4.4 GB of 5.8 GB virtual memory used. Killing
 container.
 Dump of the process-tree for container_1423083596644_0238_01_004160 :
 |- PID PPID PGRPID SESSID CMD_NAME USER_MODE_TIME(MILLIS)
 SYSTEM_TIME(MILLIS) VMEM_USAGE(BYTES) RSSMEM_USAGE(PAGES) FULL_CMD_LINE
 |- 23320 23318 23320 23320 (bash) 0 0 108650496 305 /bin/bash
 -c
 /usr/java/latest/bin/java -server -XX:OnOutOfMemoryError='kill %p'
 -Xms2400m
 -Xmx2400m

 -Djava.io.tmpdir=/dfs/yarn/nm/usercache/root/appcache/application_1423083596644_0238/container_1423083596644_0238_01_004160/tmp

 -Dspark.yarn.app.container.log.dir=/var/log/hadoop-yarn/container/application_1423083596644_0238/container_1423083596644_0238_01_004160
 org.apache.spark.executor.CoarseGrainedExecutorBackend
 akka.tcp://sparkDriver@ip-10-168-86-13.ec2.internal
 :42535/user/CoarseGrainedScheduler
 8 ip-10-99-162-56.ec2.internal 1 application_1423083596644_0238 1

 /var/log/hadoop-yarn/container/application_1423083596644_0238/container_1423083596644_0238_01_004160/stdout
 2

 /var/log/hadoop-yarn/container/application_1423083596644_0238/container_1423083596644_0238_01_004160/stderr
 |- 23323 23320 23320 23320 (java) 922271 12263 461976
 724218
 /usr/java/latest/bin/java -server -XX:OnOutOfMemoryError=kill %p
 -Xms2400m
 -Xmx2400m

 -Djava.io.tmpdir=/dfs/yarn/nm/usercache/root/appcache/application_1423083596644_0238/container_1423083596644_0238_01_004160/tmp

 -Dspark.yarn.app.container.log.dir=/var/log/hadoop-yarn/container/application_1423083596644_0238/container_1423083596644_0238_01_004160
 org.apache.spark.executor.CoarseGrainedExecutorBackend
 akka.tcp://sparkDriver@ip-10-168-86-13.ec2.internal:42535/user/Coarse




 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Performance-on-Yarn-tp21729p21739.html
 Sent from the Apache Spark User List mailing list archive at
 Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org








Re: No executors allocated on yarn with latest master branch

2015-02-20 Thread Sandy Ryza
Are you using the capacity scheduler or fifo scheduler without multi
resource scheduling by any chance?

On Thu, Feb 12, 2015 at 1:51 PM, Anders Arpteg arp...@spotify.com wrote:

 The NM logs only seem to contain entries similar to the following. Nothing else
 in the same time range. Any help?

 2015-02-12 20:47:31,245 WARN
 org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl:
 Event EventType: KILL_CONTAINER sent to absent container
 container_1422406067005_0053_01_02
 2015-02-12 20:47:31,246 WARN
 org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl:
 Event EventType: KILL_CONTAINER sent to absent container
 container_1422406067005_0053_01_12
 2015-02-12 20:47:31,246 WARN
 org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl:
 Event EventType: KILL_CONTAINER sent to absent container
 container_1422406067005_0053_01_22
 2015-02-12 20:47:31,246 WARN
 org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl:
 Event EventType: KILL_CONTAINER sent to absent container
 container_1422406067005_0053_01_32
 2015-02-12 20:47:31,246 WARN
 org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl:
 Event EventType: KILL_CONTAINER sent to absent container
 container_1422406067005_0053_01_42
 2015-02-12 21:24:30,515 WARN
 org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl:
 Event EventType: FINISH_APPLICATION sent to absent application
 application_1422406067005_0053

 On Thu, Feb 12, 2015 at 10:38 PM, Sandy Ryza sandy.r...@cloudera.com
 wrote:

 It seems unlikely to me that it would be a 2.2 issue, though not entirely
 impossible.  Are you able to find any of the container logs?  Is the
 NodeManager launching containers and reporting some exit code?

 -Sandy

 On Thu, Feb 12, 2015 at 1:21 PM, Anders Arpteg arp...@spotify.com
 wrote:

 No, not submitting from windows, from a debian distribution. Had a quick
 look at the rm logs, and it seems some containers are allocated but then
 released again for some reason. Not easy to make sense of the logs, but
 here is a snippet from the logs (from a test in our small test cluster) if
 you'd like to have a closer look: http://pastebin.com/8WU9ivqC

 Sandy, sounds like it could possible be a 2.2 issue then, or what do you
 think?

 Thanks,
 Anders

 On Thu, Feb 12, 2015 at 3:11 PM, Aniket Bhatnagar 
 aniket.bhatna...@gmail.com wrote:

 This is tricky to debug. Check logs of node and resource manager of
 YARN to see if you can trace the error. In the past I have to closely look
 at arguments getting passed to YARN container (they get logged before
 attempting to launch containers). If I still don't get a clue, I had to
 check the script generated by YARN to execute the container and even run
 manually to trace at what line the error has occurred.

 BTW are you submitting the job from windows?

 On Thu, Feb 12, 2015, 3:34 PM Anders Arpteg arp...@spotify.com wrote:

 Interesting to hear that it works for you. Are you using Yarn 2.2 as
 well? No strange log message during startup, and can't see any other log
 messages since no executer gets launched. Does not seems to work in
 yarn-client mode either, failing with the exception below.

 Exception in thread main org.apache.spark.SparkException: Yarn
 application has already ended! It might have been killed or unable to
 launch application master.
 at
 org.apache.spark.scheduler.cluster.YarnClientSchedulerBackend.waitForApplication(YarnClientSchedulerBackend.scala:119)
 at
 org.apache.spark.scheduler.cluster.YarnClientSchedulerBackend.start(YarnClientSchedulerBackend.scala:59)
 at
 org.apache.spark.scheduler.TaskSchedulerImpl.start(TaskSchedulerImpl.scala:141)
 at org.apache.spark.SparkContext.init(SparkContext.scala:370)
 at
 com.spotify.analytics.AnalyticsSparkContext.init(AnalyticsSparkContext.scala:8)
 at
 com.spotify.analytics.DataSampler$.main(DataSampler.scala:42)
 at com.spotify.analytics.DataSampler.main(DataSampler.scala)
 at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
 at
 sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
 at
 sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
 at java.lang.reflect.Method.invoke(Method.java:597)
 at
 org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:551)
 at
 org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:155)
 at
 org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:178)
 at
 org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:99)
 at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)

 /Anders


 On Thu, Feb 12, 2015 at 1:33 AM, Sandy Ryza sandy.r...@cloudera.com
 wrote:

 Hi Anders,

 I just tried this out and was able

Re: Spark Performance on Yarn

2015-02-20 Thread Sandy Ryza
Are you specifying the executor memory, cores, or number of executors
anywhere?  If not, you won't be taking advantage of the full resources on
the cluster.
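
For example, on the 5-machine, 2-core, 8 GB cluster described below, something in
this ballpark would at least make the sizing explicit (illustrative values only,
leaving headroom for the OS and YARN daemons):

    spark-submit --class com.myco.SparkJob \
      --master yarn \
      --num-executors 5 \
      --executor-cores 1 \
      --executor-memory 2g \
      /tmp/sparkjob.jar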

-Sandy

On Fri, Feb 20, 2015 at 2:41 AM, Sean Owen so...@cloudera.com wrote:

 None of this really points to the problem. These indicate that workers
 died but not why. I'd first go locate executor logs that reveal more
 about what's happening. It sounds like a hard-er type of failure, like
 JVM crash or running out of file handles, or GC thrashing.

 On Fri, Feb 20, 2015 at 4:51 AM, lbierman leebier...@gmail.com wrote:
  I'm a bit new to Spark, but had a question on performance. I suspect a
 lot of
  my issue is due to tuning and parameters. I have a Hive external table on
  this data, and queries against it run in minutes.
 
  The Job:
  + 40gb of avro events on HDFS (100 million+ avro events)
  + Read in the files from HDFS and dedupe events by key (mapToPair then a
  reduceByKey)
  + RDD returned and persisted (disk and memory)
  + Then passed to a job that take the RDD and mapToPair of new object data
  and then reduceByKey and foreachpartion do work
 
  The issue:
  When I run this on my environment on Yarn this takes 20+ hours. Running
 on
  yarn we see the first stage run to build the deduped RDD, but then
 when
  the next stage starts, things fail and data is lost. This results in
 stage 0
  starting over and over and just dragging it out.
 
  Errors I see in the driver logs:
  ERROR cluster.YarnClientClusterScheduler: Lost executor 1 on X:
 remote
  Akka client disassociated
 
  15/02/20 00:27:36 WARN scheduler.TaskSetManager: Lost task 0.0 in stage
 3.1
  (TID 1335,): FetchFailed(BlockManagerId(3, i, 33958),
 shuffleId=1,
  mapId=162, reduceId=0, message=
  org.apache.spark.shuffle.FetchFailedException: Failed to connect
  toX/X:33958
 
  Also we see this, but I'm suspecting this is because the previous stage
  fails and the next one starts:
  org.apache.spark.shuffle.MetadataFetchFailedException: Missing an output
  location for shuffle 1
 
  Cluster:
  5 machines, each 2 core , 8gb machines
 
  Spark-submit command:
   spark-submit --class com.myco.SparkJob \
  --master yarn \
  /tmp/sparkjob.jar \
 
  Any thoughts or where to look or how to start approaching this problem or
  more data points to present.
 
  Thanks..
 
  Code for the job:
   JavaRDD<AnalyticsEvent> events = ((JavaRDD<AvroKey<AnalyticsEvent>>)
       context.newAPIHadoopRDD(
           context.hadoopConfiguration(),
           AvroKeyInputFormat.class,
           AvroKey.class,
           NullWritable.class
       ).keys())
       .map(event -> AnalyticsEvent.newBuilder(event.datum()).build())
       .filter(key -> { return
           Optional.ofNullable(key.getStepEventKey()).isPresent(); })
       .mapToPair(event -> new Tuple2<AnalyticsEvent, Integer>(event, 1))
       .reduceByKey((analyticsEvent1, analyticsEvent2) -> analyticsEvent1)
       .map(tuple -> tuple._1());

   events.persist(StorageLevel.MEMORY_AND_DISK_2());
   events.mapToPair(event -> {
       return new Tuple2<T, RunningAggregates>(
           keySelector.select(event),
           new RunningAggregates(
               Optional.ofNullable(event.getVisitors()).orElse(0L),
               Optional.ofNullable(event.getImpressions()).orElse(0L),
               Optional.ofNullable(event.getAmount()).orElse(0.0D),
               Optional.ofNullable(event.getAmountSumOfSquares()).orElse(0.0D)));
       })
       .reduceByKey((left, right) -> { return left.add(right); })
       .foreachpartition(dostuff)
 
 
 
 
 
 
  --
  View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Performance-on-Yarn-tp21729.html
  Sent from the Apache Spark User List mailing list archive at Nabble.com.
 
  -
  To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
  For additional commands, e-mail: user-h...@spark.apache.org
 

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




Re: Spark Performance on Yarn

2015-02-20 Thread Sandy Ryza
If that's the error you're hitting, the fix is to boost
spark.yarn.executor.memoryOverhead, which will put some extra room in
between the executor heap sizes and the amount of memory requested for them
from YARN.

-Sandy

On Fri, Feb 20, 2015 at 9:40 AM, lbierman leebier...@gmail.com wrote:

 A bit more context on this issue. From the container logs on the executor

 Given my cluster specs above what would be appropriate parameters to pass
 into :
 --num-executors --num-cores --executor-memory

 I had tried it with --executor-memory 2500MB

 015-02-20 06:50:09,056 WARN

 org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorImpl:
 Container [pid=23320,containerID=container_1423083596644_0238_01_004160] is
 running beyond physical memory limits. Current usage: 2.8 GB of 2.7 GB
 physical memory used; 4.4 GB of 5.8 GB virtual memory used. Killing
 container.
 Dump of the process-tree for container_1423083596644_0238_01_004160 :
 |- PID PPID PGRPID SESSID CMD_NAME USER_MODE_TIME(MILLIS)
 SYSTEM_TIME(MILLIS) VMEM_USAGE(BYTES) RSSMEM_USAGE(PAGES) FULL_CMD_LINE
 |- 23320 23318 23320 23320 (bash) 0 0 108650496 305 /bin/bash -c
 /usr/java/latest/bin/java -server -XX:OnOutOfMemoryError='kill %p'
 -Xms2400m
 -Xmx2400m

 -Djava.io.tmpdir=/dfs/yarn/nm/usercache/root/appcache/application_1423083596644_0238/container_1423083596644_0238_01_004160/tmp

 -Dspark.yarn.app.container.log.dir=/var/log/hadoop-yarn/container/application_1423083596644_0238/container_1423083596644_0238_01_004160
 org.apache.spark.executor.CoarseGrainedExecutorBackend
 akka.tcp://sparkDriver@ip-10-168-86-13.ec2.internal
 :42535/user/CoarseGrainedScheduler
 8 ip-10-99-162-56.ec2.internal 1 application_1423083596644_0238 1

 /var/log/hadoop-yarn/container/application_1423083596644_0238/container_1423083596644_0238_01_004160/stdout
 2

 /var/log/hadoop-yarn/container/application_1423083596644_0238/container_1423083596644_0238_01_004160/stderr
 |- 23323 23320 23320 23320 (java) 922271 12263 461976 724218
 /usr/java/latest/bin/java -server -XX:OnOutOfMemoryError=kill %p -Xms2400m
 -Xmx2400m

 -Djava.io.tmpdir=/dfs/yarn/nm/usercache/root/appcache/application_1423083596644_0238/container_1423083596644_0238_01_004160/tmp

 -Dspark.yarn.app.container.log.dir=/var/log/hadoop-yarn/container/application_1423083596644_0238/container_1423083596644_0238_01_004160
 org.apache.spark.executor.CoarseGrainedExecutorBackend
 akka.tcp://sparkDriver@ip-10-168-86-13.ec2.internal:42535/user/Coarse




 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Performance-on-Yarn-tp21729p21739.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




Re: Spark Performance on Yarn

2015-02-20 Thread Sandy Ryza
Hi Kelvin,

spark.executor.memory controls the size of the executor heaps.

spark.yarn.executor.memoryOverhead is the amount of memory to request from
YARN beyond the heap size.  This accounts for the fact that JVMs use some
non-heap memory.

The Spark heap is divided into spark.storage.memoryFraction (default 0.6)
and spark.shuffle.memoryFraction (default 0.2), and the rest is for basic
Spark bookkeeping and anything the user does inside UDFs.
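
As a worked example with the 2,400 MB executor heap from the logs in this thread
(approximate, since the fractions also carry internal safety factors): storage gets
about 0.6 * 2400 = 1440 MB, shuffle about 0.2 * 2400 = 480 MB, and roughly 480 MB
is left for bookkeeping and user code.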

-Sandy



On Fri, Feb 20, 2015 at 11:44 AM, Kelvin Chu 2dot7kel...@gmail.com wrote:

 Hi Sandy,

 I am also doing memory tuning on YARN. Just want to confirm, is it correct
 to say:

 spark.executor.memory - spark.yarn.executor.memoryOverhead = the memory I
 can actually use in my jvm application

 If it is not, what is the correct relationship? Any other variables or
 config parameters in play? Thanks.

 Kelvin

 On Fri, Feb 20, 2015 at 9:45 AM, Sandy Ryza sandy.r...@cloudera.com
 wrote:

 If that's the error you're hitting, the fix is to boost
 spark.yarn.executor.memoryOverhead, which will put some extra room in
 between the executor heap sizes and the amount of memory requested for them
 from YARN.

 -Sandy

 On Fri, Feb 20, 2015 at 9:40 AM, lbierman leebier...@gmail.com wrote:

 A bit more context on this issue. From the container logs on the executor

 Given my cluster specs above what would be appropriate parameters to pass
 into :
 --num-executors --num-cores --executor-memory

 I had tried it with --executor-memory 2500MB

 015-02-20 06:50:09,056 WARN

 org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorImpl:
 Container [pid=23320,containerID=container_1423083596644_0238_01_004160]
 is
 running beyond physical memory limits. Current usage: 2.8 GB of 2.7 GB
 physical memory used; 4.4 GB of 5.8 GB virtual memory used. Killing
 container.
 Dump of the process-tree for container_1423083596644_0238_01_004160 :
 |- PID PPID PGRPID SESSID CMD_NAME USER_MODE_TIME(MILLIS)
 SYSTEM_TIME(MILLIS) VMEM_USAGE(BYTES) RSSMEM_USAGE(PAGES) FULL_CMD_LINE
 |- 23320 23318 23320 23320 (bash) 0 0 108650496 305 /bin/bash -c
 /usr/java/latest/bin/java -server -XX:OnOutOfMemoryError='kill %p'
 -Xms2400m
 -Xmx2400m

 -Djava.io.tmpdir=/dfs/yarn/nm/usercache/root/appcache/application_1423083596644_0238/container_1423083596644_0238_01_004160/tmp

 -Dspark.yarn.app.container.log.dir=/var/log/hadoop-yarn/container/application_1423083596644_0238/container_1423083596644_0238_01_004160
 org.apache.spark.executor.CoarseGrainedExecutorBackend
 akka.tcp://sparkDriver@ip-10-168-86-13.ec2.internal
 :42535/user/CoarseGrainedScheduler
 8 ip-10-99-162-56.ec2.internal 1 application_1423083596644_0238 1

 /var/log/hadoop-yarn/container/application_1423083596644_0238/container_1423083596644_0238_01_004160/stdout
 2

 /var/log/hadoop-yarn/container/application_1423083596644_0238/container_1423083596644_0238_01_004160/stderr
 |- 23323 23320 23320 23320 (java) 922271 12263 461976 724218
 /usr/java/latest/bin/java -server -XX:OnOutOfMemoryError=kill %p
 -Xms2400m
 -Xmx2400m

 -Djava.io.tmpdir=/dfs/yarn/nm/usercache/root/appcache/application_1423083596644_0238/container_1423083596644_0238_01_004160/tmp

 -Dspark.yarn.app.container.log.dir=/var/log/hadoop-yarn/container/application_1423083596644_0238/container_1423083596644_0238_01_004160
 org.apache.spark.executor.CoarseGrainedExecutorBackend
 akka.tcp://sparkDriver@ip-10-168-86-13.ec2.internal:42535/user/Coarse




 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Performance-on-Yarn-tp21729p21739.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org






Re: build spark for cdh5

2015-02-18 Thread Sandy Ryza
Hi Koert,

You should be using -Phadoop-2.3 instead of -Phadoop2.3.
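
i.e. your first invocation with just the profile name corrected:

    mvn -Phadoop-2.3 -Dhadoop.version=2.5.0-cdh5.2.3 -Pyarn -DskipTests clean package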

-Sandy

On Wed, Feb 18, 2015 at 10:51 AM, Koert Kuipers ko...@tresata.com wrote:

 does anyone have the right maven invocation for cdh5 with yarn?
 i tried:
 $ mvn -Phadoop2.3 -Dhadoop.version=2.5.0-cdh5.2.3 -Pyarn -DskipTests clean
 package
 $ mvn -Phadoop2.3 -Dhadoop.version=2.5.0-cdh5.2.3 -Pyarn test

 it builds and passes tests just fine, but when i deploy on cluster and i
 try to run SparkPi i get:
 Caused by: java.lang.VerifyError: class
 org.apache.hadoop.yarn.proto.YarnServiceProtos$GetApplicationReportRequestProto
 overrides final method getUnknownFields.()Lcom/google/p\
 rotobuf/UnknownFieldSet;

 so clearly i am doing something wrong. something with protobuf 2.4 versus
 2.5

 i do not want to use the cloudera version of spark for cdh 5 (it includes
 the wrong akka version for me)

 thanks



Re: Why can't Spark find the classes in this Jar?

2015-02-12 Thread Sandy Ryza
What version of Java are you using?  Core NLP dropped support for Java 7 in
its 3.5.0 release.

Also, the correct command line option is --jars, not --addJars.
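
In other words, something like the following, keeping the option before the
application jar (paths copied from your message):

    /home/me/Downloads/spark-1.2.0-bin-hadoop2.4/bin/spark-submit \
      --class SimpleApp \
      --master local[4] \
      --jars /home/abe/Desktop/stanford-corenlp-3.5.0.jar \
      target/simple-project-1.0.jar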

On Thu, Feb 12, 2015 at 12:03 PM, Deborah Siegel deborah.sie...@gmail.com
wrote:

 Hi Abe,
 I'm new to Spark as well, so someone else could answer better. A few
 thoughts which may or may not be the right line of thinking..

 1) Spark properties can be set on the SparkConf, and with flags in
 spark-submit, but settings on SparkConf take precedence. I think your jars
 flag for spark-submit may be redundant.

 1) Is there a chance that stanford-corenlp-3.5.0.jar relies on other
 dependencies? I could be wrong, but perhaps if there is no other reason not
 to, try building your application as an uber-jar with a build tool like
 Maven, which will package the whole transitive jar. You can find
 stanford-corenlp on maven central .. I think you would add the below
 dependencies to your pom.xml. After building simple-project-1.0.jar with
 these dependencies, you would not set jars on the sc or jar flags on
 spark-submit.

 <dependencies>
   <dependency>
     <groupId>edu.stanford.nlp</groupId>
     <artifactId>stanford-corenlp</artifactId>
     <version>3.5.0</version>
   </dependency>
   <dependency>
     <groupId>edu.stanford.nlp</groupId>
     <artifactId>stanford-corenlp</artifactId>
     <version>3.5.0</version>
     <classifier>models</classifier>
   </dependency>
 </dependencies>

 HTH.
 Deb

 On Tue, Feb 10, 2015 at 1:12 PM, Abe Handler akh2...@gmail.com wrote:

 I am new to spark. I am trying to compile and run a spark application that
 requires classes from an (external) jar file on my local machine. If I
 open
 the jar (on ~/Desktop) I can see the missing class in the local jar but
 when
 I run spark I get

 NoClassDefFoundError: edu/stanford/nlp/ie/AbstractSequenceClassifier

 I add the jar to the spark context like this

 String[] jars = {"/home/pathto/Desktop/stanford-corenlp-3.5.0.jar"};
 SparkConf conf = new SparkConf().setAppName("Simple Application").setJars(jars);
 Then I try to run a submit script like this

 /home/me/Downloads/spark-1.2.0-bin-hadoop2.4/bin/spark-submit \
   --class SimpleApp \
   --master local[4] \
   target/simple-project-1.0.jar \
   --jars local[4] /home/abe/Desktop/stanford-corenlp-3.5.0.jar
 and hit the NoClassDefFoundError.

 I get that this means that the worker threads can't find the class from
 the
 jar. But I am not sure what I am doing wrong. I have tried different
 syntaxes for the last line (below) but none works.

   --addJars local[4] /home/abe/Desktop/stanford-corenlp-3.5.0.jar
   --addJars local:/home/abe/Desktop/stanford-corenlp-3.5.0.jar
   --addJars local:/home/abe/Desktop/stanford-corenlp-3.5.0.jar

 How can I fix this error?



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Why-can-t-Spark-find-the-classes-in-this-Jar-tp21584.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org





Re: No executors allocated on yarn with latest master branch

2015-02-12 Thread Sandy Ryza
It seems unlikely to me that it would be a 2.2 issue, though not entirely
impossible.  Are you able to find any of the container logs?  Is the
NodeManager launching containers and reporting some exit code?

-Sandy

On Thu, Feb 12, 2015 at 1:21 PM, Anders Arpteg arp...@spotify.com wrote:

 No, not submitting from windows, from a debian distribution. Had a quick
 look at the rm logs, and it seems some containers are allocated but then
 released again for some reason. Not easy to make sense of the logs, but
 here is a snippet from the logs (from a test in our small test cluster) if
 you'd like to have a closer look: http://pastebin.com/8WU9ivqC

 Sandy, sounds like it could possible be a 2.2 issue then, or what do you
 think?

 Thanks,
 Anders

 On Thu, Feb 12, 2015 at 3:11 PM, Aniket Bhatnagar 
 aniket.bhatna...@gmail.com wrote:

 This is tricky to debug. Check logs of node and resource manager of YARN
 to see if you can trace the error. In the past I have to closely look at
 arguments getting passed to YARN container (they get logged before
 attempting to launch containers). If I still don't get a clue, I had to
 check the script generated by YARN to execute the container and even run
 manually to trace at what line the error has occurred.

 BTW are you submitting the job from windows?

 On Thu, Feb 12, 2015, 3:34 PM Anders Arpteg arp...@spotify.com wrote:

 Interesting to hear that it works for you. Are you using Yarn 2.2 as
 well? No strange log message during startup, and can't see any other log
 messages since no executer gets launched. Does not seems to work in
 yarn-client mode either, failing with the exception below.

 Exception in thread main org.apache.spark.SparkException: Yarn
 application has already ended! It might have been killed or unable to
 launch application master.
 at
 org.apache.spark.scheduler.cluster.YarnClientSchedulerBackend.waitForApplication(YarnClientSchedulerBackend.scala:119)
 at
 org.apache.spark.scheduler.cluster.YarnClientSchedulerBackend.start(YarnClientSchedulerBackend.scala:59)
 at
 org.apache.spark.scheduler.TaskSchedulerImpl.start(TaskSchedulerImpl.scala:141)
 at org.apache.spark.SparkContext.init(SparkContext.scala:370)
 at
 com.spotify.analytics.AnalyticsSparkContext.init(AnalyticsSparkContext.scala:8)
 at com.spotify.analytics.DataSampler$.main(DataSampler.scala:42)
 at com.spotify.analytics.DataSampler.main(DataSampler.scala)
 at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
 at
 sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
 at
 sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
 at java.lang.reflect.Method.invoke(Method.java:597)
 at
 org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:551)
 at
 org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:155)
 at
 org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:178)
 at
 org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:99)
 at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)

 /Anders


 On Thu, Feb 12, 2015 at 1:33 AM, Sandy Ryza sandy.r...@cloudera.com
 wrote:

 Hi Anders,

 I just tried this out and was able to successfully acquire executors.
 Any strange log messages or additional color you can provide on your
 setup?  Does yarn-client mode work?

 -Sandy

 On Wed, Feb 11, 2015 at 1:28 PM, Anders Arpteg arp...@spotify.com
 wrote:

 Hi,

 Compiled the latest master of Spark yesterday (2015-02-10) for Hadoop
 2.2 and failed executing jobs in yarn-cluster mode for that build. Works
 successfully with spark 1.2 (and also master from 2015-01-16), so 
 something
 has changed since then that prevents the job from receiving any executors
 on the cluster.

  Basic symptoms are that the job fires up the AM, but after examining
  the executors page in the web ui, only the driver is listed, no
  executors are ever received, and the driver keeps waiting forever. Has
  anyone seen similar problems?

 Thanks for any insights,
 Anders







Re: No executors allocated on yarn with latest master branch

2015-02-11 Thread Sandy Ryza
Hi Anders,

I just tried this out and was able to successfully acquire executors.  Any
strange log messages or additional color you can provide on your setup?
Does yarn-client mode work?

-Sandy

On Wed, Feb 11, 2015 at 1:28 PM, Anders Arpteg arp...@spotify.com wrote:

 Hi,

 Compiled the latest master of Spark yesterday (2015-02-10) for Hadoop 2.2
 and failed executing jobs in yarn-cluster mode for that build. Works
 successfully with spark 1.2 (and also master from 2015-01-16), so something
 has changed since then that prevents the job from receiving any executors
 on the cluster.

  Basic symptoms are that the job fires up the AM, but after examining the
  executors page in the web ui, only the driver is listed, no executors
  are ever received, and the driver keeps waiting forever. Has anyone seen
  similar problems?

 Thanks for any insights,
 Anders



feeding DataFrames into predictive algorithms

2015-02-11 Thread Sandy Ryza
Hey All,

I've been playing around with the new DataFrame and ML pipelines APIs and
am having trouble accomplishing what seems like should be a fairly basic
task.

I have a DataFrame where each column is a Double.  I'd like to turn this
into a DataFrame with a features column and a label column that I can feed
into a regression.

So far all the paths I've gone down have led me to internal APIs or
convoluted casting in and out of RDD[Row] and DataFrame.  Is there a simple
way of accomplishing this?

any assistance (lookin' at you Xiangrui) much appreciated,
Sandy
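
For concreteness, the kind of RDD[Row] round-trip described above looks roughly like this (a sketch only; it assumes the label sits in column 0 and treats every remaining Double column as a feature):

  import org.apache.spark.mllib.linalg.Vectors
  import org.apache.spark.mllib.regression.LabeledPoint

  // df is the all-Double DataFrame mentioned above
  val labeled = df.rdd.map { row =>
    val values = (0 until row.length).map(row.getDouble)
    LabeledPoint(values.head, Vectors.dense(values.tail.toArray))
  }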


Re: Resource allocation in yarn-cluster mode

2015-02-10 Thread Sandy Ryza
Hi Zsolt,

spark.executor.memory, spark.executor.cores, and spark.executor.instances
are only honored when launching through spark-submit.  Marcelo is working
on a Spark launcher (SPARK-4924) that will enable using these
programmatically.

That's correct that the error comes up when
yarn.scheduler.maximum-allocation-mb is exceeded.  The reason it doesn't
just use a smaller amount of memory is because it could be surprising to
the user to find out they're silently getting less memory than they
requested.  Also, I don't think YARN exposes this up front so Spark has no
way to check.

-Sandy
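
In the meantime the same settings can be passed as spark-submit flags, something along these lines (class and jar names are placeholders; 3g plus the default 384m overhead stays under the 5GB nodes mentioned below):

  spark-submit --master yarn-cluster \
    --num-executors 4 --executor-cores 2 --executor-memory 3g \
    --class com.example.MyApp myapp.jar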

On Tue, Feb 10, 2015 at 8:38 AM, Zsolt Tóth toth.zsolt@gmail.com
wrote:

 One more question: Is there reason why Spark throws an error when
 requesting too much memory instead of capping it to the maximum value (as
 YARN would do by default)?

 Thanks!

 2015-02-10 17:32 GMT+01:00 Zsolt Tóth toth.zsolt@gmail.com:

 Hi,

 I'm using Spark in yarn-cluster mode and submit the jobs programmatically
 from the client in Java. I ran into a few issues when tried to set the
 resource allocation properties.

 1. It looks like setting spark.executor.memory, spark.executor.cores and
 spark.executor.instances have no effect because ClientArguments checks only
 for the command line arguments (--num-executors, --executors cores, etc.).
 Is it possible to use the properties in yarn-cluster mode instead of the
 command line arguments?

 2. My nodes have 5GB memory but when I set --executor-memory to 4g
 (overhead 384m), I get the exception that the required executor memory is
 above the max threshold of this cluster. It looks like this threshold is
 the value of the yarn.scheduler.maximum-allocation-mb property. Is that
 correct?

 Thanks,
 Zsolt





Re: How to broadcast a variable read from a file in yarn-cluster mode?

2015-02-10 Thread Sandy Ryza
 , -Djava.io.tmpdir=$PWD/tmp,
  -Dlog4j.configuration=log4j-spark-container.properties,
 org.apache.spark.executor.CoarseGrainedExecutorBackend, akka.tcp://
 sp...@phd40010008.na.com:58240/user/CoarseGrainedScheduler, 2,
 phd40010022.na.com, 1, 1, LOG_DIR/stdout, 2, LOG_DIR/stderr)
 15/02/10 12:06:21 INFO ContainerManagementProtocolProxy: Opening proxy :
 phd40010022.na.com:8041
 15/02/10 12:06:21 INFO ContainerManagementProtocolProxy: Opening proxy :
 phd40010024.na.com:8041
 15/02/10 12:06:21 INFO ContainerManagementProtocolProxy: Opening proxy :
 phd40010002.na.com:8041
 15/02/10 12:06:26 INFO CoarseGrainedSchedulerBackend: Registered executor:
 Actor[akka.tcp://
 sparkexecu...@phd40010022.na.com:29369/user/Executor#43651774] with ID 2
 15/02/10 12:06:26 INFO CoarseGrainedSchedulerBackend: Registered executor:
 Actor[akka.tcp://
 sparkexecu...@phd40010024.na.com:12969/user/Executor#1711844295] with ID 3
 15/02/10 12:06:26 INFO BlockManagerInfo: Registering block manager
 phd40010022.na.com:14119 with 1178.1 MB RAM
 15/02/10 12:06:26 INFO BlockManagerInfo: Registering block manager
 phd40010024.na.com:53284 with 1178.1 MB RAM
 15/02/10 12:06:29 INFO CoarseGrainedSchedulerBackend: Registered executor:
 Actor[akka.tcp://
 sparkexecu...@phd40010002.na.com:35547/user/Executor#-1690254909] with ID
 1
 15/02/10 12:06:29 INFO BlockManagerInfo: Registering block manager
 phd40010002.na.com:62754 with 1178.1 MB RAM
 15/02/10 12:06:36 WARN YarnClusterScheduler: Initial job has not accepted
 any resources; check your cluster UI to ensure that workers are registered
 and have sufficient memory
 15/02/10 12:06:51 WARN YarnClusterScheduler: Initial job has not accepted
 any resources; check your cluster UI to ensure that workers are registered
 and have sufficient memory
 15/02/10 12:07:06 WARN YarnClusterScheduler: Initial job has not accepted
 any resources; check your cluster UI to ensure that workers are registered
 and have sufficient memory
 15/02/10 12:07:21 WARN YarnClusterScheduler: Initial job has not accepted
 any resources; check your cluster UI to ensure that workers are registered
 and have sufficient memory
 15/02/10 12:07:36 WARN YarnClusterScheduler: Initial job has not accepted
 any resources; check your cluster UI to ensure that workers are registered
 and have sufficient memory
 15/02/10 12:07:51 WARN YarnClusterScheduler: Initial job has not accepted
 any resources; check your cluster UI to ensure that workers are registered
 and have sufficient memory
 15/02/10 12:08:06 WARN YarnClusterScheduler: Initial job has not accepted
 any resources; check your cluster UI to ensure that workers are registered
 and have sufficient memory
 15/02/10 12:08:21 WARN YarnClusterScheduler: Initial job has not accepted
 any resources; check your cluster UI to ensure that workers are registered
 and have sufficient memory
 15/02/10 12:08:36 WARN YarnClusterScheduler: Initial job has not accepted
 any resources; check your cluster UI to ensure that workers are registered
 and have sufficient memory
 15/02/10 12:08:51 WARN YarnClusterScheduler: Initial job has not accepted
 any resources; check your cluster UI to ensure that workers are registered
 and have sufficient memory
 15/02/10 12:09:06 WARN YarnClusterScheduler: Initial job has not accepted
 any resources; check your cluster UI to ensure that workers are registered
 and have sufficient memory
 15/02/10 12:09:21 WARN YarnClusterScheduler: Initial job has not accepted
 any resources; check your cluster UI to ensure that workers are registered
 and have sufficient memory
 15/02/10 12:09:36 WARN YarnClusterScheduler: Initial job has not accepted
 any resources; check your cluster UI to ensure that workers are registered
 and have sufficient memory
 15/02/10 12:09:51 WARN YarnClusterScheduler: Initial job has not accepted
 any resources; check your cluster UI to ensure that workers are registered
 and have sufficient memory
 15/02/10 12:10:06 WARN YarnClusterScheduler: Initial job has not accepted
 any resources; check your cluster UI to ensure that workers are registered
 and have sufficient memory
 15/02/10 12:10:21 WARN YarnClusterScheduler: Initial job has not accepted
 any resources; check your cluster UI to ensure that workers are registered
 and have sufficient memory
 15/02/10 12:10:36 WARN YarnClusterScheduler: Initial job has not accepted
 any resources; check your cluster UI to ensure that workers are registered
 and have sufficient memory
 15/02/10 12:10:51 WARN YarnClusterScheduler: Initial job has not accepted
 any resources; check your cluster UI to ensure that workers are registered
 and have sufficient memory

 On Fri, Feb 6, 2015 at 3:24 PM, Sandy Ryza sandy.r...@cloudera.com
 wrote:

 You can call collect() to pull in the contents of an RDD into the driver:

   val badIPsLines = badIPs.collect()

 On Fri, Feb 6, 2015 at 12:19 PM, Jon Gregg jonrgr...@gmail.com wrote:

 OK I tried that, but how do I convert an RDD to a Set that I can then
 broadcast

Re: How to broadcast a variable read from a file in yarn-cluster mode?

2015-02-10 Thread Sandy Ryza
You should be able to replace that second line with

val sc = ssc.sparkContext
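
Pieced together from the snippets in this thread, the intended pattern would look roughly like this (bucketSecs and the HDFS path are taken from the messages below):

  import org.apache.spark.SparkConf
  import org.apache.spark.streaming.{Seconds, StreamingContext}

  val sparkConf = new SparkConf()
  val ssc = new StreamingContext(sparkConf, Seconds(bucketSecs))
  val sc = ssc.sparkContext  // reuse the StreamingContext's SparkContext

  val badIPs = sc.textFile("/user/sb/badfullIPs.csv").collect
  val badIpSet = badIPs.toSet
  val badIPsBC = sc.broadcast(badIpSet)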

On Tue, Feb 10, 2015 at 10:04 AM, Jon Gregg jonrgr...@gmail.com wrote:

 They're separate in my code, how can I combine them?  Here's what I have:

   val sparkConf = new SparkConf()
   val ssc =  new StreamingContext(sparkConf, Seconds(bucketSecs))

   val sc = new SparkContext()

 On Tue, Feb 10, 2015 at 1:02 PM, Sandy Ryza sandy.r...@cloudera.com
 wrote:

 Is the SparkContext you're using the same one that the StreamingContext
 wraps?  If not, I don't think using two is supported.

 -Sandy

 On Tue, Feb 10, 2015 at 9:58 AM, Jon Gregg jonrgr...@gmail.com wrote:

 I'm still getting an error.  Here's my code, which works successfully
 when tested using spark-shell:

    val badIPs = sc.textFile("/user/sb/badfullIPs.csv").collect
   val badIpSet = badIPs.toSet
   val badIPsBC = sc.broadcast(badIpSet)


 The job looks OK from my end:

 15/02/07 18:59:58 INFO Client: Application report from ASM:

  application identifier: application_1423081782629_3861

  appId: 3861

 * clientToAMToken: Token { kind: YARN_CLIENT_TOKEN, service:  }*

  appDiagnostics:

  appMasterHost: phd40010008.na.com

  appQueue: root.default

  appMasterRpcPort: 0

  appStartTime: 1423353581140

 * yarnAppState: RUNNING*

  distributedFinalState: UNDEFINED


 But the streaming process never actually begins.  The full log is below,
 scroll to the end for the repeated warning WARN YarnClusterScheduler:
 Initial job has not accepted any resources; check your cluster UI to ensure
 that workers are registered and have sufficient memory.

 I'll note that I have a different Spark Streaming app called dqd
 working successfully for a different job that uses only a StreamingContext
 and not an additional SparkContext.  But this app (called sbStreamingTv)
 uses both a SparkContext and a StreamingContext for grabbing a lookup file
  in HDFS for IP filtering. The references to line #198 from the log
  below refer to the val badIPs =
  sc.textFile("/user/sb/badfullIPs.csv").collect line shown above, and it
  looks like Spark doesn't get beyond that point in the code.

 Also, this job (sbStreamingTv) does work successfully using
 yarn-client, even with both a SparkContext and StreamingContext.  It looks
 to me that in yarn-cluster mode it's grabbing resources for the
 StreamingContext but not for the SparkContext.

 Any ideas?

 Jon


 15/02/10 12:06:16 INFO MemoryStore: MemoryStore started with capacity
 1177.8 MB.
 15/02/10 12:06:16 INFO ConnectionManager: Bound socket to port 30129
 with id = ConnectionManagerId(phd40010008.na.com,30129)
 15/02/10 12:06:16 INFO BlockManagerMaster: Trying to register
 BlockManager
 15/02/10 12:06:16 INFO BlockManagerInfo: Registering block manager
 phd40010008.na.com:30129 with 1177.8 MB RAM
 15/02/10 12:06:16 INFO BlockManagerMaster: Registered BlockManager
 15/02/10 12:06:16 INFO HttpServer: Starting HTTP Server
 15/02/10 12:06:16 INFO HttpBroadcast: Broadcast server started at
 http://10.229.16.108:35183
 15/02/10 12:06:16 INFO HttpFileServer: HTTP File server directory is
 /hdata/12/yarn/nm/usercache/jg/appcache/application_1423081782629_7370/container_1423081782629_7370_01_01/tmp/spark-b73a964b-4d91-4af3-8246-48da420c1cec
 15/02/10 12:06:16 INFO HttpServer: Starting HTTP Server
 15/02/10 12:06:16 INFO JettyUtils: Adding filter:
 org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter
 15/02/10 12:06:16 INFO SparkUI: Started SparkUI at
 http://phd40010008.na.com:25869
 15/02/10 12:06:17 INFO EventLoggingListener: Logging events to
 /user/spark/applicationHistory/com.na.scalaspark.sbstreamingtv-1423587976801
 15/02/10 12:06:17 INFO YarnClusterScheduler: Created YarnClusterScheduler
 15/02/10 12:06:17 INFO ApplicationMaster$$anon$1: Adding shutdown hook
 for context org.apache.spark.SparkContext@7f38095d
 15/02/10 12:06:17 INFO ApplicationMaster: Registering the
 ApplicationMaster
 15/02/10 12:06:17 INFO ApplicationMaster: Allocating 3 executors.
 15/02/10 12:06:17 INFO YarnAllocationHandler: Will Allocate 3 executor
 containers, each with 2432 memory
 15/02/10 12:06:17 INFO YarnAllocationHandler: Container request (host:
 Any, priority: 1, capability: memory:2432, vCores:1
 15/02/10 12:06:17 INFO YarnAllocationHandler: Container request (host:
 Any, priority: 1, capability: memory:2432, vCores:1
 15/02/10 12:06:17 INFO YarnAllocationHandler: Container request (host:
 Any, priority: 1, capability: memory:2432, vCores:1
 15/02/10 12:06:20 INFO YarnClusterScheduler:
 YarnClusterScheduler.postStartHook done
 15/02/10 12:06:20 WARN SparkConf: In Spark 1.0 and later spark.local.dir
 will be overridden by the value set by the cluster manager (via
 SPARK_LOCAL_DIRS in mesos/standalone and LOCAL_DIRS in YARN).
 15/02/10 12:06:20 INFO SecurityManager: Changing view acls to: jg
 15/02/10 12:06:20 INFO SecurityManager: SecurityManager

Re: Open file limit settings for Spark on Yarn job

2015-02-10 Thread Sandy Ryza
Hi Arun,

The limit for the YARN user on the cluster nodes should be all that
matters.  What version of Spark are you using?  If you can turn on
sort-based shuffle it should solve this problem.

-Sandy
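
If the job is on 1.1.x, where hash-based shuffle is still the default, sort-based shuffle can be turned on per job, for example:

  spark-submit ... --conf spark.shuffle.manager=sort ...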

On Tue, Feb 10, 2015 at 1:16 PM, Arun Luthra arun.lut...@gmail.com wrote:

 Hi,

  I'm running Spark on Yarn from an edge node, and the tasks run on the Data
  Nodes. My job fails with the Too many open files error once it gets to
 groupByKey(). Alternatively I can make it fail immediately if I repartition
 the data when I create the RDD.

 Where do I need to make sure that ulimit -n is high enough?

 On the edge node it is small, 1024, but on the data nodes, the yarn user
 has a high limit, 32k. But is the yarn user the relevant user? And, is the
 1024 limit for myself on the edge node a problem or is that limit not
 relevant?

 Arun



Re: getting error when submit spark with master as yarn

2015-02-07 Thread Sandy Ryza
Hi Sachin,

In your YARN configuration, either yarn.nodemanager.resource.memory-mb is
1024 on your nodes or yarn.scheduler.maximum-allocation-mb is set to 1024.
If you have more than 1024 MB on each node, you should bump these
properties.  Otherwise, you should request fewer resources by setting
--executor-memory and --driver-memory when you launch your Spark job.

-Sandy
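
For example, to fit under a 1024 MB maximum allocation (512m requested plus the default 384 MB overhead comes to 896 MB):

  spark-submit --master yarn --executor-memory 512m --driver-memory 512m \
    --class com.mytestpack.analysis.SparkTest sparktest-1.jar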

On Sat, Feb 7, 2015 at 10:04 AM, sachin Singh sachin.sha...@gmail.com
wrote:

 Hi,
 when I am trying to execute my program as
 spark-submit --master yarn --class com.mytestpack.analysis.SparkTest
 sparktest-1.jar

 I am getting error bellow error-
 java.lang.IllegalArgumentException: Required executor memory (1024+384 MB)
 is above the max threshold (1024 MB) of this cluster!
 at

 org.apache.spark.deploy.yarn.ClientBase$class.verifyClusterResources(ClientBase.scala:71)
 at
 org.apache.spark.deploy.yarn.Client.verifyClusterResources(Client.scala:35)
 at
 org.apache.spark.deploy.yarn.Client.submitApplication(Client.scala:77)
 at

 org.apache.spark.scheduler.cluster.YarnClientSchedulerBackend.start(YarnClientSchedulerBackend.scala:57)
 at

 org.apache.spark.scheduler.TaskSchedulerImpl.start(TaskSchedulerImpl.scala:140)
 at org.apache.spark.SparkContext.init(SparkContext.scala:335)
 at

 org.apache.spark.api.java.JavaSparkContext.init(JavaSparkContext.scala:61)

 I am new in Hadoop environment,
 Please help how/where need to set memory or any configuration ,thanks in
 advance,




 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/getting-error-when-submit-spark-with-master-as-yarn-tp21542.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




Re: Spark impersonation

2015-02-07 Thread Sandy Ryza
https://issues.apache.org/jira/browse/SPARK-5493 currently tracks this.

-Sandy

On Mon, Feb 2, 2015 at 9:37 PM, Zhan Zhang zzh...@hortonworks.com wrote:

  I think you can configure hadoop/hive to do impersonation.  There is no
 difference between secure or insecure hadoop cluster by using kinit.

  Thanks.

  Zhan Zhang

  On Feb 2, 2015, at 9:32 PM, Koert Kuipers ko...@tresata.com wrote:

  yes jobs run as the user that launched them.
 if you want to run jobs on a secure cluster then use yarn. hadoop
 standalone does not support secure hadoop.

 On Mon, Feb 2, 2015 at 5:37 PM, Jim Green openkbi...@gmail.com wrote:

 Hi Team,

  Does spark support impersonation?
 For example, when spark on yarn/hive/hbase/etc..., which user is used by
 default?
 The user which starts the spark job?
 Any suggestions related to impersonation?

  --
  Thanks,
 www.openkb.info
 (Open KnowledgeBase for Hadoop/Database/OS/Network/Tool)






Re: How to broadcast a variable read from a file in yarn-cluster mode?

2015-02-06 Thread Sandy Ryza
You can call collect() to pull in the contents of an RDD into the driver:

  val badIPsLines = badIPs.collect()

On Fri, Feb 6, 2015 at 12:19 PM, Jon Gregg jonrgr...@gmail.com wrote:

 OK I tried that, but how do I convert an RDD to a Set that I can then
 broadcast and cache?

    val badIPs = sc.textFile("hdfs:///user/jon/" + "badfullIPs.csv")
   val badIPsLines = badIPs.getLines
   val badIpSet = badIPsLines.toSet
   val badIPsBC = sc.broadcast(badIpSet)

 produces the error value getLines is not a member of
 org.apache.spark.rdd.RDD[String].

 Leaving it as an RDD and then constantly joining I think will be too slow
 for a streaming job.

 On Thu, Feb 5, 2015 at 8:06 PM, Sandy Ryza sandy.r...@cloudera.com
 wrote:

 Hi Jon,

 You'll need to put the file on HDFS (or whatever distributed filesystem
 you're running on) and load it from there.

 -Sandy

 On Thu, Feb 5, 2015 at 3:18 PM, YaoPau jonrgr...@gmail.com wrote:

 I have a file badFullIPs.csv of bad IP addresses used for filtering.
 In
 yarn-client mode, I simply read it off the edge node, transform it, and
 then
 broadcast it:

    val badIPs = fromFile(edgeDir + "badfullIPs.csv")
   val badIPsLines = badIPs.getLines
   val badIpSet = badIPsLines.toSet
   val badIPsBC = sc.broadcast(badIpSet)
   badIPs.close

 How can I accomplish this in yarn-cluster mode?

 Jon



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/How-to-broadcast-a-variable-read-from-a-file-in-yarn-cluster-mode-tp21524.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org






Re: Problems with GC and time to execute with different number of executors.

2015-02-06 Thread Sandy Ryza
That's definitely surprising to me that you would be hitting a lot of GC
for this scenario.  Are you setting --executor-cores and
--executor-memory?  What are you setting them to?

-Sandy

On Thu, Feb 5, 2015 at 10:17 AM, Guillermo Ortiz konstt2...@gmail.com
wrote:

 Any idea why if I use more containers I get a lot of stopped because GC?

 2015-02-05 8:59 GMT+01:00 Guillermo Ortiz konstt2...@gmail.com:
  I'm not caching the data. with each iteration I mean,, each 128mb
  that a executor has to process.
 
  The code is pretty simple.
 
   final Conversor c = new Conversor(null, null, null, longFields, typeFields);
   SparkConf conf = new SparkConf().setAppName("Simple Application");
   JavaSparkContext sc = new JavaSparkContext(conf);
   JavaRDD<byte[]> rdd = sc.binaryRecords(path, c.calculaLongBlock());
  
   JavaRDD<String> rddString = rdd.map(new Function<byte[], String>() {
     @Override
     public String call(byte[] arg0) throws Exception {
       String result = c.parse(arg0).toString();
       return result;
     }
   });
   rddString.saveAsTextFile(url + "/output/" + System.currentTimeMillis() + "/");
 
  The parse function just takes an array of bytes and applies some
  transformations like,,,
  [0..3] an integer, [4...20] an String, [21..27] another String and so on.
 
  It's just a test code, I'd like to understand what it's happeing.
 
  2015-02-04 18:57 GMT+01:00 Sandy Ryza sandy.r...@cloudera.com:
  Hi Guillermo,
 
  What exactly do you mean by each iteration?  Are you caching data in
  memory?
 
  -Sandy
 
  On Wed, Feb 4, 2015 at 5:02 AM, Guillermo Ortiz konstt2...@gmail.com
  wrote:
 
  I execute a job in Spark where I'm processing a file of 80Gb in HDFS.
  I have 5 slaves:
  (32cores /256Gb / 7physical disks) x 5
 
  I have been trying many different configurations with YARN.
  yarn.nodemanager.resource.memory-mb 196Gb
  yarn.nodemanager.resource.cpu-vcores 24
 
  I have tried to execute the job with different number of executors a
  memory (1-4g)
  With 20 executors takes 25s each iteration (128mb) and it never has a
  really long time waiting because GC.
 
  When I execute around 60 executors the process time it's about 45s and
  some tasks take until one minute because GC.
 
  I have no idea why it's calling GC when I execute more executors
  simultaneously.
  The another question it's why it takes more time to execute each
  block. My theory about the this it's because there're only 7 physical
  disks and it's not the same 5 processes writing than 20.
 
  The code is pretty simple, it's just a map function which parse a line
  and write the output in HDFS. There're a lot of substrings inside of
  the function what it could cause GC.
 
  Any theory about?
 
  -
  To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
  For additional commands, e-mail: user-h...@spark.apache.org
 
 



Re: Problems with GC and time to execute with different number of executors.

2015-02-06 Thread Sandy Ryza
Yes, having many more cores than disks and all writing at the same time can
definitely cause performance issues.  Though that wouldn't explain the high
GC.  What percent of task time does the web UI report that tasks are
spending in GC?

On Fri, Feb 6, 2015 at 12:56 AM, Guillermo Ortiz konstt2...@gmail.com
wrote:

 Yes, It's surpressing to me as well

 I tried to execute it with different configurations,

 sudo -u hdfs spark-submit  --master yarn-client --class
 com.mycompany.app.App --num-executors 40 --executor-memory 4g
 Example-1.0-SNAPSHOT.jar hdfs://ip:8020/tmp/sparkTest/ file22.bin
 parameters

 This is what I executed with different values in num-executors and
 executor-memory.
 What do you think there are too many executors for those HDDs? Could
 it be the reason because of each executor takes more time?

 2015-02-06 9:36 GMT+01:00 Sandy Ryza sandy.r...@cloudera.com:
  That's definitely surprising to me that you would be hitting a lot of GC
 for
  this scenario.  Are you setting --executor-cores and --executor-memory?
  What are you setting them to?
 
  -Sandy
 
  On Thu, Feb 5, 2015 at 10:17 AM, Guillermo Ortiz konstt2...@gmail.com
  wrote:
 
  Any idea why if I use more containers I get a lot of stopped because GC?
 
  2015-02-05 8:59 GMT+01:00 Guillermo Ortiz konstt2...@gmail.com:
   I'm not caching the data. with each iteration I mean,, each 128mb
   that a executor has to process.
  
   The code is pretty simple.
  
    final Conversor c = new Conversor(null, null, null, longFields, typeFields);
    SparkConf conf = new SparkConf().setAppName("Simple Application");
    JavaSparkContext sc = new JavaSparkContext(conf);
    JavaRDD<byte[]> rdd = sc.binaryRecords(path, c.calculaLongBlock());
   
    JavaRDD<String> rddString = rdd.map(new Function<byte[], String>() {
      @Override
      public String call(byte[] arg0) throws Exception {
        String result = c.parse(arg0).toString();
        return result;
      }
    });
    rddString.saveAsTextFile(url + "/output/" + System.currentTimeMillis() + "/");
  
   The parse function just takes an array of bytes and applies some
   transformations like,,,
   [0..3] an integer, [4...20] an String, [21..27] another String and so
   on.
  
   It's just a test code, I'd like to understand what it's happeing.
  
   2015-02-04 18:57 GMT+01:00 Sandy Ryza sandy.r...@cloudera.com:
   Hi Guillermo,
  
   What exactly do you mean by each iteration?  Are you caching data
 in
   memory?
  
   -Sandy
  
   On Wed, Feb 4, 2015 at 5:02 AM, Guillermo Ortiz 
 konstt2...@gmail.com
   wrote:
  
   I execute a job in Spark where I'm processing a file of 80Gb in
 HDFS.
   I have 5 slaves:
   (32cores /256Gb / 7physical disks) x 5
  
   I have been trying many different configurations with YARN.
   yarn.nodemanager.resource.memory-mb 196Gb
   yarn.nodemanager.resource.cpu-vcores 24
  
   I have tried to execute the job with different number of executors a
   memory (1-4g)
   With 20 executors takes 25s each iteration (128mb) and it never has
 a
   really long time waiting because GC.
  
   When I execute around 60 executors the process time it's about 45s
 and
   some tasks take until one minute because GC.
  
   I have no idea why it's calling GC when I execute more executors
   simultaneously.
   The another question it's why it takes more time to execute each
   block. My theory about the this it's because there're only 7
 physical
   disks and it's not the same 5 processes writing than 20.
  
   The code is pretty simple, it's just a map function which parse a
 line
   and write the output in HDFS. There're a lot of substrings inside of
   the function what it could cause GC.
  
   Any theory about?
  
  
 -
   To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
   For additional commands, e-mail: user-h...@spark.apache.org
  
  
 
 



Re: How to broadcast a variable read from a file in yarn-cluster mode?

2015-02-05 Thread Sandy Ryza
Hi Jon,

You'll need to put the file on HDFS (or whatever distributed filesystem
you're running on) and load it from there.

-Sandy

On Thu, Feb 5, 2015 at 3:18 PM, YaoPau jonrgr...@gmail.com wrote:

 I have a file badFullIPs.csv of bad IP addresses used for filtering.  In
 yarn-client mode, I simply read it off the edge node, transform it, and
 then
 broadcast it:

    val badIPs = fromFile(edgeDir + "badfullIPs.csv")
   val badIPsLines = badIPs.getLines
   val badIpSet = badIPsLines.toSet
   val badIPsBC = sc.broadcast(badIpSet)
   badIPs.close

 How can I accomplish this in yarn-cluster mode?

 Jon



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/How-to-broadcast-a-variable-read-from-a-file-in-yarn-cluster-mode-tp21524.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




Re: advice on diagnosing Spark stall for 1.5hr out of 3.5hr job?

2015-02-04 Thread Sandy Ryza
Also, do you see any lines in the YARN NodeManager logs where it says that
it's killing a container?

-Sandy

On Wed, Feb 4, 2015 at 8:56 AM, Imran Rashid iras...@cloudera.com wrote:

 Hi Michael,

 judging from the logs, it seems that those tasks are just working a really
 long time.  If you have long running tasks, then you wouldn't expect the
 driver to output anything while those tasks are working.

 What is unusual is that there is no activity during all that time the
 tasks are executing.  Are you sure you are looking at the activity of the
 executors (the nodes that are actually running the tasks), and not the
 activity of the driver node (the node where your main program lives, but
 that doesn't do any of the distributed computation)?  It would be perfectly
 normal for the driver node to be idle while all the executors were busy
 with long running tasks.

 I would look at:
 (a) the cpu usage etc. of the executor nodes during those long running
 tasks
 (b) the thread dumps of the executors during those long running tasks
 (available via the UI under the Executors tab, or just log into the boxes
 and run jstack).  Ideally this will point out a hotspot in your code that
 is making these tasks take so long.  (Or perhaps it'll point out what is
 going on in spark internals that is so slow)
 (c) the summary metrics for the long running stage, when it finally
 finishes (also available in the UI, under the Stages tab).  You will get
 a breakdown of how much time is spent in various phases of the tasks, how
 much data is read, etc., which can help you figure out why tasks are slow


 Hopefully this will help you find out what is taking so long.  If you find
  out the executors really aren't doing anything during these really long
 tasks, it would be great to find that out, and maybe get some more info for
 a bug report.

 Imran


 On Tue, Feb 3, 2015 at 6:18 PM, Michael Albert 
 m_albert...@yahoo.com.invalid wrote:

 Greetings!

 First, my sincere thanks to all who have given me advice.
 Following previous discussion, I've rearranged my code to try to keep the
 partitions to more manageable sizes.
 Thanks to all who commented.

 At the moment, the input set I'm trying to work with is about 90GB (avro
 parquet format).

 When I run on a reasonable chunk of the data (say half) things work
 reasonably.

 On the full data, the spark process stalls.
 That is, for about 1.5 hours out of a 3.5 hour run, I see no activity.
 No cpu usage, no error message, no network activity.
 It just seems to sits there.
 The messages bracketing the stall are shown below.

 Any advice on how to diagnose this?
 I don't get any error messages.
 The spark UI says that it is running a stage, but it makes no discernible
 progress.
 Ganglia shows no CPU usage or network activity.
 When I shell into the worker nodes there are no filled disks or other
 obvious problems.

 How can I discern what Spark is waiting for?

 The only weird thing seen, other than the stall, is that the yarn logs on
 the workers have lines with messages like this:
 2015-02-03 22:59:58,890 INFO
 org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorImpl
 (Container Monitor): Memory usage of ProcessTree 13158 for container-id
 container_1422834185427_0083_01_21: 7.1 GB of 8.5 GB physical memory
 used; 7.6 GB of 42.5 GB virtual memory used

 It's rather strange that it mentions 42.5 GB of virtual memory.  The
 machines are EMR machines with 32 GB of physical memory and, as far as I
 can determine, no swap space.

 The messages bracketing the stall are shown below.


 Any advice is welcome.

 Thanks!

 Sincerely,
  Mike Albert

 Before the stall.
 15/02/03 21:45:28 INFO cluster.YarnClientClusterScheduler: Removed
 TaskSet 5.0, whose tasks have all completed, from pool
 15/02/03 21:45:28 INFO scheduler.DAGScheduler: Stage 5
 (mapPartitionsWithIndex at Transposer.scala:147) finished in 4880.317 s
 15/02/03 21:45:28 INFO scheduler.DAGScheduler: looking for newly runnable
 stages
 15/02/03 21:45:28 INFO scheduler.DAGScheduler: running: Set(Stage 3)
 15/02/03 21:45:28 INFO scheduler.DAGScheduler: waiting: Set(Stage 6,
 Stage 7, Stage 8)
 15/02/03 21:45:28 INFO scheduler.DAGScheduler: failed: Set()
 15/02/03 21:45:28 INFO scheduler.DAGScheduler: Missing parents for Stage
 6: List(Stage 3)
 15/02/03 21:45:28 INFO scheduler.DAGScheduler: Missing parents for Stage
 7: List(Stage 6)
 15/02/03 21:45:28 INFO scheduler.DAGScheduler: Missing parents for Stage
 8: List(Stage 7)
 At this point, I see no activity for 1.5 hours except for this (XXX for
 I.P. address)
 15/02/03 22:13:24 INFO util.AkkaUtils: Connecting to ExecutorActor:
 akka.tcp://sparkExecutor@ip-XXX.ec2.internal:36301/user/ExecutorActor

 Then finally it started again:
 15/02/03 23:31:34 INFO scheduler.TaskSetManager: Finished task 1.0 in
 stage 3.0 (TID 7301) in 7208259 ms on ip-10-171-0-124.ec2.internal (3/4)
 15/02/03 23:31:34 INFO scheduler.TaskSetManager: Finished task 0.0 in
 

Re: running 2 spark applications in parallel on yarn

2015-02-01 Thread Sandy Ryza
Hi Tomer,

Are you able to look in your NodeManager logs to see if the NodeManagers
are killing any executors for exceeding memory limits?  If you observe
this, you can solve the problem by bumping up
spark.yarn.executor.memoryOverhead.

-Sandy
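
If that turns out to be the case, the overhead can be raised at submit time, for example (the value, in MB, is illustrative):

  spark-submit --master yarn ... --conf spark.yarn.executor.memoryOverhead=1024 ...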

On Sun, Feb 1, 2015 at 5:28 AM, Tomer Benyamini tomer@gmail.com wrote:

 Hi all,

 I'm running spark 1.2.0 on a 20-node Yarn emr cluster. I've noticed that
 whenever I'm running a heavy computation job in parallel to other jobs
 running, I'm getting these kind of exceptions:

 * [task-result-getter-2] INFO  org.apache.spark.scheduler.TaskSetManager-
 Lost task 820.0 in stage 175.0 (TID 11327) on executor xxx:
 java.io.IOException (Failed to connect to xx:35194) [duplicate 12]

 * org.apache.spark.shuffle.MetadataFetchFailedException: Missing an output
 location for shuffle 12

 * org.apache.spark.shuffle.FetchFailedException: Failed to connect to
 x:35194
 Caused by: java.io.IOException: Failed to connect
 to x:35194

 when running the heavy job alone on the cluster, I'm not getting any
 errors. My guess is that spark contexts from different apps do not share
 information about taken ports, and therefore collide on specific ports,
 causing the job/stage to fail. Is there a way to assign a specific set of
 executors to a specific spark job via spark-submit, or is there a way to
 define a range of ports to be used by the application?

 Thanks!
 Tomer





Re: HW imbalance

2015-01-30 Thread Sandy Ryza
Yup, if you turn off YARN's CPU scheduling then you can run executors to
take advantage of the extra memory on the larger boxes. But then some of
the nodes will end up severely oversubscribed from a CPU perspective, so I
would definitely recommend against that.



On Fri, Jan 30, 2015 at 3:31 AM, Michael Segel msegel_had...@hotmail.com
wrote:

 Sorry, but I think there’s a disconnect.

 When you launch a job under YARN on any of the hadoop clusters, the number
 of mappers/reducers is not set and is dependent on the amount of available
 resources.
 So under Ambari, CM, or MapR’s Admin, you should be able to specify the
 amount of resources available on any node which is to be allocated to
 YARN’s RM.
 So if your node has 32GB allocated, you can run N jobs concurrently based
 on the amount of resources you request when you submit your application.

 If you have 64GB allocated, you can run up to 2N jobs concurrently based
 on the same memory constraints.

 In terms of job scheduling, where and when a job can run is going to be
 based on available resources.  So if you want to run a job that needs 16GB
 of resources, and all of your nodes are busy and only have 4GB per node
 available to YARN, your 16GB job will wait until there is at least that
 much resources available.

 To your point, if you say you need 4GB per task, then it must be the same
 per task for that job. The larger the cluster node, in this case memory,
 the more jobs you can run.

 This is of course assuming you could over subscribe a node in terms of cpu
 cores if you have memory available.

 YMMV

 HTH
 -Mike

 On Jan 30, 2015, at 7:10 AM, Sandy Ryza sandy.r...@cloudera.com wrote:

 My answer was based off the specs that Antony mentioned: different amounts
 of memory, but 10 cores on all the boxes.  In that case, a single Spark
 application's homogeneously sized executors won't be able to take advantage
 of the extra memory on the bigger boxes.

 Cloudera Manager can certainly configure YARN with different resource
 profiles for different nodes if that's what you're wondering.

 -Sandy

 On Thu, Jan 29, 2015 at 11:03 PM, Michael Segel msegel_had...@hotmail.com
  wrote:

 @Sandy,

 There are two issues.
 The spark context (executor) and then the cluster under YARN.

 If you have a box where each yarn job needs 3GB,  and your machine has
 36GB dedicated as a YARN resource, you can run 12 executors on the single
 node.
 If you have a box that has 72GB dedicated to YARN, you can run up to 24
 contexts (executors) in parallel.

 Assuming that you’re not running any other jobs.

 The larger issue is if your version of Hadoop will easily let you run
 with multiple profiles or not. Ambari (1.6 and early does not.) Its
 supposed to be fixed in 1.7 but I haven’t evaluated it yet.
 Cloudera? YMMV

 If I understood the question raised by the OP, its more about a
 heterogeneous cluster than spark.

 -Mike

 On Jan 26, 2015, at 5:02 PM, Sandy Ryza sandy.r...@cloudera.com wrote:

 Hi Antony,

 Unfortunately, all executors for any single Spark application must have
  the same amount of memory.  It's possible to configure YARN with different
 amounts of memory for each host (using
 yarn.nodemanager.resource.memory-mb), so other apps might be able to take
 advantage of the extra memory.

 -Sandy

 On Mon, Jan 26, 2015 at 8:34 AM, Michael Segel msegel_had...@hotmail.com
  wrote:

 If you’re running YARN, then you should be able to mix and max where
 YARN is managing the resources available on the node.

 Having said that… it depends on which version of Hadoop/YARN.

 If you’re running Hortonworks and Ambari, then setting up multiple
 profiles may not be straight forward. (I haven’t seen the latest version of
 Ambari)

 So in theory, one profile would be for your smaller 36GB of ram, then
 one profile for your 128GB sized machines.
 Then as your request resources for your spark job, it should schedule
 the jobs based on the cluster’s available resources.
 (At least in theory.  I haven’t tried this so YMMV)

 HTH

 -Mike

 On Jan 26, 2015, at 4:25 PM, Antony Mayi antonym...@yahoo.com.INVALID
 wrote:

 should have said I am running as yarn-client. all I can see is
 specifying the generic executor memory that is then to be used in all
 containers.


   On Monday, 26 January 2015, 16:48, Charles Feduke 
 charles.fed...@gmail.com wrote:



 You should look at using Mesos. This should abstract away the individual
 hosts into a pool of resources and make the different physical
 specifications manageable.

 I haven't tried configuring Spark Standalone mode to have different
 specs on different machines but based on spark-env.sh.template:

 # - SPARK_WORKER_CORES, to set the number of cores to use on this machine
 # - SPARK_WORKER_MEMORY, to set how much total memory workers have to
 give executors (e.g. 1000m, 2g)
 # - SPARK_WORKER_OPTS, to set config properties only for the worker
 (e.g. -Dx=y)
  it looks like you should be able to mix. (Its not clear to me whether
  SPARK_WORKER_MEMORY is uniform across the cluster or for the machine where
  the config file resides.)

Re: Duplicate key when sorting BytesWritable with Kryo?

2015-01-30 Thread Sandy Ryza
Hi Andrew,

Here's a note from the doc for sequenceFile:

* '''Note:''' Because Hadoop's RecordReader class re-uses the same
Writable object for each
* record, directly caching the returned RDD will create many references
to the same object.
* If you plan to directly cache Hadoop writable objects, you should
first copy them using
* a `map` function.

This should probably say directly caching *or directly shuffling*.  To
sort directly from a sequence file, the records need to be cloned first.

-Sandy
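
A sketch of that cloning, reusing the copy constructor from the report below (in that report cloning only the key was enough because only keys are written out; clone the value as well if values are used after the shuffle):

  spark.sequenceFile(inputPath, classOf[CustomKey], classOf[BytesWritable])
    .map(t => (new CustomKey(t._1), t._2))  // copy each key before it is buffered for the sort
    .sortByKey()
    .map(_._1)
    .saveAsTextFile(outputPath)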


On Fri, Jan 30, 2015 at 11:20 AM, andrew.rowson 
andrew.row...@thomsonreuters.com wrote:

 I've found a strange issue when trying to sort a lot of data in HDFS using
 spark 1.2.0 (CDH5.3.0). My data is in sequencefiles and the key is a class
 that derives from BytesWritable (the value is also a BytesWritable). I'm
 using a custom KryoSerializer to serialize the underlying byte array
 (basically write the length and the byte array).

 My spark job looks like this:

 spark.sequenceFile(inputPath, classOf[CustomKey], classOf[BytesWritable])
   .sortByKey().map(t => t._1).saveAsTextFile(outputPath)

 CustomKey extends BytesWritable, adds a toString method and some other
 helper methods that extract and convert parts of the underlying byte[].

 This should simply output a series of textfiles which contain the sorted
 list of keys. The problem is that under certain circumstances I get many
 duplicate keys. The number of records output is correct, but it appears
 that
 large chunks of the output are simply copies of the last record in that
 chunk. E.g instead of [1,2,3,4,5,6,7,8,9] I'll see [9,9,9,9,9,9,9,9,9].

 This appears to happen only above certain input data volumes, and it
 appears
 to be when shuffle spills. For a job where shuffle spill for memory and
 disk
 = 0B, the data is correct. If there is any spill, I see the duplicate
 behaviour. Oddly, the shuffle write is much smaller when there's a spill.
 E.g. the non spill job has 18.8 GB of input and 14.9GB of shuffle write,
 whereas the spill job has 24.2 GB of input, and only 4.9GB of shuffle
 write.
 I'm guessing some sort of compression is happening on duplicate identical
 values?

 Oddly, I can fix this issue if I adjust my scala code to insert a map step
 before the call to sortByKey():

 .map(t => (new CustomKey(t._1), t._2))

 This constructor is just:

 public CustomKey(CustomKey left) { this.set(left); }

 Why does this work? I've no idea.

 The spark job is running in yarn-client mode with all the default
 configuration values set. Using the external shuffle service and disabling
 spill compression makes no difference.

 Is this a bug?



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Duplicate-key-when-sorting-BytesWritable-with-Kryo-tp21447.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




Re: Duplicate key when sorting BytesWritable with Kryo?

2015-01-30 Thread Sandy Ryza
Filed https://issues.apache.org/jira/browse/SPARK-5500 for this.

-Sandy

On Fri, Jan 30, 2015 at 11:59 AM, Aaron Davidson ilike...@gmail.com wrote:

 Ah, this is in particular an issue due to sort-based shuffle (it was not
 the case for hash-based shuffle, which would immediately serialize each
 record rather than holding many in memory at once). The documentation
 should be updated.

 On Fri, Jan 30, 2015 at 11:27 AM, Sandy Ryza sandy.r...@cloudera.com
 wrote:

 Hi Andrew,

 Here's a note from the doc for sequenceFile:

 * '''Note:''' Because Hadoop's RecordReader class re-uses the same
 Writable object for each
 * record, directly caching the returned RDD will create many
 references to the same object.
 * If you plan to directly cache Hadoop writable objects, you should
 first copy them using
 * a `map` function.

  This should probably say directly caching *or directly shuffling*.  To
 sort directly from a sequence file, the records need to be cloned first.

 -Sandy


 On Fri, Jan 30, 2015 at 11:20 AM, andrew.rowson 
 andrew.row...@thomsonreuters.com wrote:

 I've found a strange issue when trying to sort a lot of data in HDFS
 using
 spark 1.2.0 (CDH5.3.0). My data is in sequencefiles and the key is a
 class
 that derives from BytesWritable (the value is also a BytesWritable). I'm
 using a custom KryoSerializer to serialize the underlying byte array
 (basically write the length and the byte array).

 My spark job looks like this:

  spark.sequenceFile(inputPath, classOf[CustomKey], classOf[BytesWritable])
    .sortByKey().map(t => t._1).saveAsTextFile(outputPath)

 CustomKey extends BytesWritable, adds a toString method and some other
 helper methods that extract and convert parts of the underlying byte[].

 This should simply output a series of textfiles which contain the sorted
 list of keys. The problem is that under certain circumstances I get many
 duplicate keys. The number of records output is correct, but it appears
 that
 large chunks of the output are simply copies of the last record in that
 chunk. E.g instead of [1,2,3,4,5,6,7,8,9] I'll see [9,9,9,9,9,9,9,9,9].

 This appears to happen only above certain input data volumes, and it
 appears
 to be when shuffle spills. For a job where shuffle spill for memory and
 disk
 = 0B, the data is correct. If there is any spill, I see the duplicate
 behaviour. Oddly, the shuffle write is much smaller when there's a spill.
 E.g. the non spill job has 18.8 GB of input and 14.9GB of shuffle write,
 whereas the spill job has 24.2 GB of input, and only 4.9GB of shuffle
 write.
 I'm guessing some sort of compression is happening on duplicate identical
 values?

 Oddly, I can fix this issue if I adjust my scala code to insert a map
 step
 before the call to sortByKey():

  .map(t => (new CustomKey(t._1), t._2))

 This constructor is just:

 public CustomKey(CustomKey left) { this.set(left); }

 Why does this work? I've no idea.

 The spark job is running in yarn-client mode with all the default
 configuration values set. Using the external shuffle service and
 disabling
 spill compression makes no difference.

 Is this a bug?



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Duplicate-key-when-sorting-BytesWritable-with-Kryo-tp21447.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org






Re: HW imbalance

2015-01-29 Thread Sandy Ryza
My answer was based off the specs that Antony mentioned: different amounts
of memory, but 10 cores on all the boxes.  In that case, a single Spark
application's homogeneously sized executors won't be able to take advantage
of the extra memory on the bigger boxes.

Cloudera Manager can certainly configure YARN with different resource
profiles for different nodes if that's what you're wondering.

-Sandy
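
For the per-host memory setting mentioned further down in this thread, that would be something like the following in yarn-site.xml on the larger nodes (the value is illustrative):

  <property>
    <name>yarn.nodemanager.resource.memory-mb</name>
    <value>110000</value>
  </property>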

On Thu, Jan 29, 2015 at 11:03 PM, Michael Segel msegel_had...@hotmail.com
wrote:

 @Sandy,

 There are two issues.
 The spark context (executor) and then the cluster under YARN.

 If you have a box where each yarn job needs 3GB,  and your machine has
 36GB dedicated as a YARN resource, you can run 12 executors on the single
 node.
 If you have a box that has 72GB dedicated to YARN, you can run up to 24
 contexts (executors) in parallel.

 Assuming that you’re not running any other jobs.

 The larger issue is if your version of Hadoop will easily let you run with
 multiple profiles or not. Ambari (1.6 and early does not.) Its supposed to
 be fixed in 1.7 but I haven’t evaluated it yet.
 Cloudera? YMMV

 If I understood the question raised by the OP, its more about a
 heterogeneous cluster than spark.

 -Mike

 On Jan 26, 2015, at 5:02 PM, Sandy Ryza sandy.r...@cloudera.com wrote:

 Hi Antony,

 Unfortunately, all executors for any single Spark application must have
  the same amount of memory.  It's possible to configure YARN with different
 amounts of memory for each host (using
 yarn.nodemanager.resource.memory-mb), so other apps might be able to take
 advantage of the extra memory.

 -Sandy

 On Mon, Jan 26, 2015 at 8:34 AM, Michael Segel msegel_had...@hotmail.com
 wrote:

 If you’re running YARN, then you should be able to mix and max where YARN
 is managing the resources available on the node.

 Having said that… it depends on which version of Hadoop/YARN.

 If you’re running Hortonworks and Ambari, then setting up multiple
 profiles may not be straight forward. (I haven’t seen the latest version of
 Ambari)

 So in theory, one profile would be for your smaller 36GB of ram, then one
 profile for your 128GB sized machines.
 Then as your request resources for your spark job, it should schedule the
 jobs based on the cluster’s available resources.
 (At least in theory.  I haven’t tried this so YMMV)

 HTH

 -Mike

 On Jan 26, 2015, at 4:25 PM, Antony Mayi antonym...@yahoo.com.INVALID
 wrote:

 should have said I am running as yarn-client. all I can see is specifying
 the generic executor memory that is then to be used in all containers.


   On Monday, 26 January 2015, 16:48, Charles Feduke 
 charles.fed...@gmail.com wrote:



 You should look at using Mesos. This should abstract away the individual
 hosts into a pool of resources and make the different physical
 specifications manageable.

 I haven't tried configuring Spark Standalone mode to have different specs
 on different machines but based on spark-env.sh.template:

 # - SPARK_WORKER_CORES, to set the number of cores to use on this machine
 # - SPARK_WORKER_MEMORY, to set how much total memory workers have to
 give executors (e.g. 1000m, 2g)
 # - SPARK_WORKER_OPTS, to set config properties only for the worker (e.g.
 -Dx=y)
 it looks like you should be able to mix. (Its not clear to me whether
 SPARK_WORKER_MEMORY is uniform across the cluster or for the machine where
 the config file resides.)

 On Mon Jan 26 2015 at 8:07:51 AM Antony Mayi 
 antonym...@yahoo.com.invalid wrote:

 Hi,

 is it possible to mix hosts with (significantly) different specs within a
 cluster (without wasting the extra resources)? for example having 10 nodes
 with 36GB RAM/10CPUs now trying to add 3 hosts with 128GB/10CPUs - is there
 a way to utilize the extra memory by spark executors (as my understanding
 is all spark executors must have same memory).

 thanks,
 Antony.









Re: RDD caching, memory network input

2015-01-28 Thread Sandy Ryza
Hi Fanilo,

How many cores are you using per executor?  Are you aware that you can
combat the container is running beyond physical memory limits error by
bumping the spark.yarn.executor.memoryOverhead property?

Also, are you caching the parsed version or the text?

-Sandy

On Wed, Jan 28, 2015 at 4:25 AM, Andrianasolo Fanilo 
fanilo.andrianas...@worldline.com wrote:

  Hello Spark fellows J,



 I think I need some help to understand how .cache and task input works
 within a job.



 I have an 7 GB input matrix in HDFS that I load using .textFile(). I also
 have a config file which contains an array of 12 Logistic Regression Model
 parameters, loaded as an Array[String], let’s call it models.



 Then I basically apply each model to each line (as a LabeledPoint) of my
 matrix as following :



 val matrix = sc.textFile( ... ) // HDFS path to matrix, parsed to make an RDD[(String, LabeledPoint)]

 models.map( model =>

   val weights = ... // parse model, which is an Array[String], to a Vector to give to LogisticRegressionModel

   val rl = new LogisticRegressionModel(weights, intercept)
   rl.setThreshold(0.5)

   matrix.flatMap(
     point => rl.predict(point._2.features) match {
       case 1.0 => Seq("cool")
       case 0.0 => Seq()
     }
   )

 ).reduce(_ ++ _)



 It seems normal to cache the matrix, since otherwise I’m going to read it
 12 times, each per model.
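
 For reference, caching the parsed form rather than the raw text (the distinction the reply above asks about) would look roughly like this, with matrixPath and parseLine standing in for the path and parsing step described here:

   val matrix = sc.textFile(matrixPath)
     .map(parseLine)   // String => (String, LabeledPoint)
     .cache()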



 So…I launch my job on a 3 machines YARN cluster, using 18 executors
 with 4GB memory each and 1 executor core.



 When I don't cache the matrix, the job executes in 12 minutes, and in the
 Spark UI I can see that each task has a 128 MB Hadoop input, which is
 normal.



 When I cache the matrix before going through the models.map part, the
 first tasks process data from Hadoop input, and the matrix is completely
 stored in-memory (verified in the Storage tab of Spark UI). Unfortunately,
 the job takes 48 minutes instead of 12 minutes, because very few tasks
 actually read directly from memory afterwards, most tasks have network
 input and NODE_LOCAL locality level, and those tasks take about three times
 as long as tasks with Hadoop or memory input.



 Can you confirm my initial thoughts that :

 · There are 18 executors on 3 machines, so 6 executors per machine

 · One partition from matrix rdd is stored into one executor

 · When a task needs to compute a partition in memory, it tries to
 get itself allocated on the executor that stores the partition

 · If that executor is already busy with a task, the new task goes to
 another executor on the same machine, which then "downloads" the partition,
 hence the network input

 ?



 If that is the case, how would you deal with the problem:

 · Answer 1: a higher number of cores per executor? (that got me a
 "Container [pid=55355,containerID=container_1422284274724_0066_01_10] is
 running beyond physical memory limits" error from YARN, sadly)

 · Answer 2: a higher spark.locality.wait? Each task takes about 8 seconds
 and the default wait is 3 seconds (see the sketch after this list)

 · Answer 3: replicate the cached partitions? (also sketched below)

 · Answer 4: something only you guys know that I am not aware of?

 · Bonus answer: don't cache, it is not needed here
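
A minimal sketch of answers 2 and 3, assuming the Spark 1.2 Scala API; the app
name, path and values are placeholders rather than a verified fix for the job
above:

    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.storage.StorageLevel

    val conf = new SparkConf()
      .setAppName("model-scoring")            // hypothetical app name
      .set("spark.locality.wait", "10000")    // milliseconds; the default 3000 is the "3" mentioned above
    val sc = new SparkContext(conf)

    val matrix = sc.textFile("hdfs:///path/to/matrix")  // placeholder path
    // The *_2 storage levels keep two serialized copies of each cached partition,
    // so a busy executor is less likely to force a network fetch (answer 3).
    matrix.persist(StorageLevel.MEMORY_AND_DISK_SER_2)

Replication trades extra memory for the second copy, so it only pays off if the
network fetches really dominate the runtime.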



 Regards,



 Fanilo




Re: java.lang.OutOfMemoryError: GC overhead limit exceeded

2015-01-27 Thread Sandy Ryza
Hi Antony,

If you look in the YARN NodeManager logs, do you see that it's killing the
executors?  Or are they crashing for a different reason?

-Sandy

On Tue, Jan 27, 2015 at 12:43 PM, Antony Mayi antonym...@yahoo.com.invalid
wrote:

 Hi,

 I am using spark.yarn.executor.memoryOverhead=8192 yet executors are still
 crashing with this error.

 Does that mean I genuinely do not have enough RAM, or is this a matter of config
 tuning?

 other config options used:
 spark.storage.memoryFraction=0.3
 SPARK_EXECUTOR_MEMORY=14G

 running spark 1.2.0 as yarn-client on cluster of 10 nodes (the workload is
 ALS trainImplicit on ~15GB dataset)

 thanks for any ideas,
 Antony.



Re: HW imbalance

2015-01-26 Thread Sandy Ryza
Hi Antony,

Unfortunately, all executors for any single Spark application must have the
same amount of memory.  It's possible to configure YARN with different
amounts of memory for each host (using
yarn.nodemanager.resource.memory-mb), so other apps might be able to take
advantage of the extra memory.
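
If it helps, here is a small sketch (assuming the YARN client jars and the
host's yarn-site.xml are on the classpath) for checking what value a given host
actually advertises for that property:

    import org.apache.hadoop.yarn.conf.YarnConfiguration

    // Prints the NodeManager memory budget (in MB) picked up from yarn-site.xml,
    // or -1 if the property is not set on this host.
    val yarnConf = new YarnConfiguration()
    println(yarnConf.getInt("yarn.nodemanager.resource.memory-mb", -1))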

-Sandy

On Mon, Jan 26, 2015 at 8:34 AM, Michael Segel msegel_had...@hotmail.com
wrote:

 If you're running YARN, then you should be able to mix and match, since YARN
 manages the resources available on each node.

 Having said that… it depends on which version of Hadoop/YARN.

 If you’re running Hortonworks and Ambari, then setting up multiple
 profiles may not be straightforward. (I haven’t seen the latest version of
 Ambari)

 So in theory, one profile would be for your smaller 36GB RAM machines, and one
 profile for your 128GB machines.
 Then as you request resources for your Spark job, it should schedule the
 jobs based on the cluster’s available resources.
 (At least in theory.  I haven’t tried this so YMMV)

 HTH

 -Mike

 On Jan 26, 2015, at 4:25 PM, Antony Mayi antonym...@yahoo.com.INVALID
 wrote:

 I should have said I am running as yarn-client. All I can see is specifying
 the generic executor memory that is then used in all containers.


   On Monday, 26 January 2015, 16:48, Charles Feduke 
 charles.fed...@gmail.com wrote:



 You should look at using Mesos. This should abstract away the individual
 hosts into a pool of resources and make the different physical
 specifications manageable.

 I haven't tried configuring Spark Standalone mode to have different specs
 on different machines but based on spark-env.sh.template:

 # - SPARK_WORKER_CORES, to set the number of cores to use on this machine
 # - SPARK_WORKER_MEMORY, to set how much total memory workers have to give
 executors (e.g. 1000m, 2g)
 # - SPARK_WORKER_OPTS, to set config properties only for the worker (e.g.
 -Dx=y)
 it looks like you should be able to mix. (It's not clear to me whether
 SPARK_WORKER_MEMORY is uniform across the cluster or specific to the machine
 where the config file resides.)

 On Mon Jan 26 2015 at 8:07:51 AM Antony Mayi antonym...@yahoo.com.invalid
 wrote:

 Hi,

 Is it possible to mix hosts with (significantly) different specs within a
 cluster (without wasting the extra resources)? For example, having 10 nodes
 with 36GB RAM/10 CPUs and now adding 3 hosts with 128GB/10 CPUs - is there
 a way for Spark executors to utilize the extra memory (my understanding
 is that all Spark executors must have the same memory)?

 thanks,
 Antony.







Re: Large number of pyspark.daemon processes

2015-01-23 Thread Sandy Ryza
Hi Sven,

What version of Spark are you running?  Recent versions have a change that
allows PySpark to share a pool of processes instead of starting a new one
for each task.

-Sandy

On Fri, Jan 23, 2015 at 9:36 AM, Sven Krasser kras...@gmail.com wrote:

 Hey all,

 I am running into a problem where YARN kills containers for being over
 their memory allocation (which is about 8G for executors plus 6G for
 overhead), and I noticed that in those containers there are tons of
 pyspark.daemon processes hogging memory. Here's a snippet from a container
 with 97 pyspark.daemon processes. The total sum of RSS usage across all of
 these is 1,764,956 pages (i.e. 6.7GB on the system).

 Any ideas what's happening here and how I can get the number of
 pyspark.daemon processes back to a more reasonable count?

 2015-01-23 15:36:53,654 INFO  [Reporter] yarn.YarnAllocationHandler 
 (Logging.scala:logInfo(59)) - Container marked as failed: 
 container_1421692415636_0052_01_30. Exit status: 143. Diagnostics: 
 Container [pid=35211,containerID=container_1421692415636_0052_01_30] is 
 running beyond physical memory limits. Current usage: 14.9 GB of 14.5 GB 
 physical memory used; 41.3 GB of 72.5 GB virtual memory used. Killing 
 container.
 Dump of the process-tree for container_1421692415636_0052_01_30 :
   |- PID PPID PGRPID SESSID CMD_NAME USER_MODE_TIME(MILLIS) 
 SYSTEM_TIME(MILLIS) VMEM_USAGE(BYTES) RSSMEM_USAGE(PAGES) FULL_CMD_LINE
   |- 54101 36625 36625 35211 (python) 78 1 332730368 16834 python -m 
 pyspark.daemon
   |- 52140 36625 36625 35211 (python) 58 1 332730368 16837 python -m 
 pyspark.daemon
   |- 36625 35228 36625 35211 (python) 65 604 331685888 17694 python -m 
 pyspark.daemon

   [...]


 Full output here: https://gist.github.com/skrasser/e3e2ee8dede5ef6b082c

 Thank you!
 -Sven

 --
 http://sites.google.com/site/krasser/?utm_source=sig



Re: RangePartitioner

2015-01-21 Thread Sandy Ryza
Hi Rishi,

If you look in the Spark UI, have any executors registered?

Are you able to collect a jstack of the driver process?
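
For what it's worth, a quick sanity check that can be run from the spark-shell
driver; this is only a sketch and assumes an existing SparkContext named sc:

    // getExecutorMemoryStatus maps each block manager (driver included) to its
    // (max storage memory, remaining storage memory); more than one entry means
    // executors have registered.
    sc.getExecutorMemoryStatus.foreach { case (host, (maxMem, remaining)) =>
      println(s"$host -> max: $maxMem bytes, remaining: $remaining bytes")
    }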

-Sandy

On Tue, Jan 20, 2015 at 9:07 PM, Rishi Yadav ri...@infoobjects.com wrote:

  I am joining two tables as below; the program stalls at the log line below
 and never proceeds.
 What might be the issue and possible solution?

  INFO SparkContext: Starting job: RangePartitioner at Exchange.scala:79

 Table 1 has  450 columns
 Table2 has  100 columns

 Both tables have few million rows


 val table1 = myTable1.as('table1)
 val table2 = myTable2.as('table2)
 val results =
   table1.join(table2, LeftOuter, Some(table1.Id.attr === table2.id.attr))

 println(results.count())

 Thanks and Regards,
 Rishi
 @meditativesoul



Re: Trouble with large Yarn job

2015-01-11 Thread Sandy Ryza
Hi Anders,

Have you checked your NodeManager logs to make sure YARN isn't killing
executors for exceeding memory limits?

-Sandy

On Tue, Jan 6, 2015 at 8:20 AM, Anders Arpteg arp...@spotify.com wrote:

 Hey,

 I have a job that keeps failing if too much data is processed, and I can't
 see how to get it working. I've tried repartitioning with more partitions
 and increasing the amount of memory for the executors (now about 12G and 400
 executors). Here is a snippet of the first part of the code, which succeeds
 without any problems:

 val all_days = sc.union(
   ds.dateInterval(startDate, date).map(date =>
     sc.avroFile[LrDailyEndSong](daily_end_song_path + date)
       .map(s => (
         (s.getUsername, s.getTrackUri),
         UserItemData(s.getUsername, s.getTrackUri,
           build_vector1(date, s),
           build_vector2(s))
       ))
   ))
   .reduceByKey(sum_vectors)

 I want to process 30 days of data or more, but am only able to process
 about 10 days. With more days of data (a lower startDate in the code
 above), the union above succeeds but the code below fails with "Error
 communicating with MapOutputTracker" (see http://pastebin.com/fGDCXPkL
 for more detailed error messages). Here is a snippet of the code that fails:

 val top_tracks = all_days.map(t => (t._1._2.toString, 1))
   .reduceByKey(_ + _)
   .filter(trackFilter)
   .repartition(4)
   .persist(StorageLevel.MEMORY_AND_DISK_SER)

 val observation_data = all_days
   .mapPartitions(_.map(o => (o._1._2.toString, o._2)))
   .join(top_tracks)

 The calculation of top_tracks works, but the last mapPartitions task
 fails with the error message above when given more than 10 days of data. I also
 tried increasing the spark.akka.askTimeout setting, but it still fails
 even after increasing the timeout tenfold to 300 seconds. I'm using Spark 1.2
 and Kryo serialization.

 I realize that this is a rather long message, but I'm stuck and would
 appreciate any help or clues for resolving this issue. It seems to be an
 out-of-memory issue, but increasing the number of partitions does not seem
 to help.

 Thanks,
 Anders



Re: SPARKonYARN failing on CDH 5.3.0 : container cannot be fetched because of NumberFormatException

2015-01-08 Thread Sandy Ryza
Hi Mukesh,

Those line numbers in ConverterUtils in the stack trace don't appear to
line up with CDH 5.3:
https://github.com/cloudera/hadoop-common/blob/cdh5-2.5.0_5.3.0/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/ConverterUtils.java

Is it possible you're still including the old jars on the classpath in some
way?
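
One way to check, from a spark-shell or a small test job on the cluster, is to
print which jar the class was actually loaded from; a sketch:

    import org.apache.hadoop.yarn.util.ConverterUtils

    // getCodeSource can be null for bootstrap classes, hence the Option wrapper.
    // An old pre-CDH-5.3 jar showing up here would explain the mismatched line numbers.
    val src = Option(classOf[ConverterUtils].getProtectionDomain.getCodeSource)
    println(src.map(_.getLocation.toString).getOrElse("(no code source)"))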

-Sandy

On Thu, Jan 8, 2015 at 3:38 AM, Mukesh Jha me.mukesh@gmail.com wrote:

 Hi Experts,

 I am running Spark inside a YARN job.

 The spark-streaming job runs fine on CDH-5.0.0, but after the upgrade
 to 5.3.0 it cannot fetch containers and fails with the errors below. It looks
 like the container ID is malformed: a string is present in a place where a
 number is expected.



 java.lang.IllegalArgumentException: Invalid ContainerId:
 container_e01_1420481081140_0006_01_01

 Caused by: java.lang.NumberFormatException: For input string: e01



 Is this a bug? Have you faced something similar, and do you have any ideas
 how to fix this?



 15/01/08 09:50:28 INFO yarn.ApplicationMaster: Registered signal handlers
 for [TERM, HUP, INT]

 15/01/08 09:50:29 ERROR yarn.ApplicationMaster: Uncaught exception:

 java.lang.IllegalArgumentException: Invalid ContainerId:
 container_e01_1420481081140_0006_01_01

 at
 org.apache.hadoop.yarn.util.ConverterUtils.toContainerId(ConverterUtils.java:182)

 at
 org.apache.spark.deploy.yarn.YarnRMClientImpl.getAttemptId(YarnRMClientImpl.scala:79)

 at
 org.apache.spark.deploy.yarn.ApplicationMaster.run(ApplicationMaster.scala:79)

 at
 org.apache.spark.deploy.yarn.ApplicationMaster$$anonfun$main$1.apply$mcV$sp(ApplicationMaster.scala:515)

 at
 org.apache.spark.deploy.SparkHadoopUtil$$anon$1.run(SparkHadoopUtil.scala:60)

 at
 org.apache.spark.deploy.SparkHadoopUtil$$anon$1.run(SparkHadoopUtil.scala:59)

 at java.security.AccessController.doPrivileged(Native Method)

 at javax.security.auth.Subject.doAs(Subject.java:415)

 at
 org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1548)

 at
 org.apache.spark.deploy.SparkHadoopUtil.runAsSparkUser(SparkHadoopUtil.scala:59)

 at
 org.apache.spark.deploy.yarn.ApplicationMaster$.main(ApplicationMaster.scala:513)

 at
 org.apache.spark.deploy.yarn.ApplicationMaster.main(ApplicationMaster.scala)

 Caused by: java.lang.NumberFormatException: For input string: e01

 at
 java.lang.NumberFormatException.forInputString(NumberFormatException.java:65)

 at java.lang.Long.parseLong(Long.java:441)

 at java.lang.Long.parseLong(Long.java:483)

 at
 org.apache.hadoop.yarn.util.ConverterUtils.toApplicationAttemptId(ConverterUtils.java:137)

 at
 org.apache.hadoop.yarn.util.ConverterUtils.toContainerId(ConverterUtils.java:177)

 ... 11 more

 15/01/08 09:50:29 INFO yarn.ApplicationMaster: Final app status: FAILED,
 exitCode: 10, (reason: Uncaught exception: Invalid ContainerId:
 container_e01_1420481081140_0006_01_01)

 --
 Thanks & Regards,

 *Mukesh Jha me.mukesh@gmail.com*



Re: Can spark supports task level resource management?

2015-01-07 Thread Sandy Ryza
Hi Xuelin,

Spark 1.2 includes a dynamic allocation feature that allows Spark on YARN
to modulate its YARN resource consumption as the demands of the application
grow and shrink.  This is somewhat coarser than what you call task-level
resource management.  Elasticity comes through allocating and releasing
executors, not through requesting resources from YARN for individual
tasks.  It would be good to add finer-grained task-level elasticity as
well, but this will rely on some YARN work (YARN-1197) for changing the
resource allocation of a running container.
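
As a rough sketch (the values are placeholders), enabling it on Spark 1.2 looks
roughly like this; it also requires the external shuffle service to be running
on the NodeManagers:

    import org.apache.spark.{SparkConf, SparkContext}

    val conf = new SparkConf()
      .setAppName("elastic-app")                        // hypothetical app name
      .set("spark.dynamicAllocation.enabled", "true")
      .set("spark.shuffle.service.enabled", "true")     // keeps shuffle files usable after executors are released
      .set("spark.dynamicAllocation.minExecutors", "2")
      .set("spark.dynamicAllocation.maxExecutors", "50")
    val sc = new SparkContext(conf)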

Mesos has a fine-grained mode similar to what you're wondering about.
It's documented here:
https://spark.apache.org/docs/latest/running-on-mesos.html#mesos-run-modes.

-Sandy

On Wed, Jan 7, 2015 at 10:55 PM, Xuelin Cao xuelincao2...@gmail.com wrote:


 Hi,

  Currently, we are building up a mid-scale Spark cluster (100
 nodes) in our company. One thing bothering us is how Spark manages
 resources (CPU, memory).

  I know there are 3 resource management modes: standalone, Mesos, YARN

  In standalone mode, the cluster master simply allocates the
 resources when the application is launched. In this mode, suppose an
 engineer launches a spark-shell, claiming 100 CPU cores and 100G memory,
 but does nothing with it. The cluster master still allocates those resources
 to the app even though the spark-shell is idle. This is definitely not what
 we want.

  What we want is for resources to be allocated when the actual task is
 about to run. For example, in the map stage the app may need 100 cores
 because the RDD has 100 partitions, while in the reduce stage only 20
 cores are needed because the RDD is shuffled into 20 partitions.

  I'm not very clear about the granularity of the spark resource
 management. In standalone mode, the resources are allocated when the app
 is launched. What about Mesos and YARN? Can they support task-level
 resource management?

  And, what is the recommended mode for resource management? (Mesos?
 Yarn?)

  Thanks




