Re: Questions about the files that Spark will produce during its running

2013-10-29 Thread Matei Zaharia
The error is from a worker node -- did you check that /data2 is set up properly 
on the worker nodes too? In general that should be the only directory used.

Matei

On Oct 28, 2013, at 6:52 PM, Shangyu Luo lsy...@gmail.com wrote:

 Hello,
 I have some questions about the files that Spark creates and uses while it
 runs.
 (1) I am running a Python program on Spark with an EC2 cluster. The data
 comes from HDFS. I have encountered the following error in the console
 of the master node:
 java.io.FileNotFoundException: 
 /data2/tmp/spark-local-20131029003412-c340/1b/shuffle_1_527_79 (No space left 
 on device)
 at java.io.FileOutputStream.openAppend(Native Method)
 at java.io.FileOutputStream.<init>(FileOutputStream.java:207)
 at 
 org.apache.spark.storage.DiskStore$DiskBlockObjectWriter.open(DiskStore.scala:58)
 at 
 org.apache.spark.storage.DiskStore$DiskBlockObjectWriter.write(DiskStore.scala:107)
 at 
 org.apache.spark.scheduler.ShuffleMapTask$$anonfun$run$1.apply(ShuffleMapTask.scala:152)
 at 
 org.apache.spark.scheduler.ShuffleMapTask$$anonfun$run$1.apply(ShuffleMapTask.scala:149)
 at scala.collection.Iterator$class.foreach(Iterator.scala:772)
 at scala.collection.Iterator$$anon$19.foreach(Iterator.scala:399)
 at 
 org.apache.spark.scheduler.ShuffleMapTask.run(ShuffleMapTask.scala:149)
 at 
 org.apache.spark.scheduler.ShuffleMapTask.run(ShuffleMapTask.scala:88)
 at 
 org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:158)
 at 
 java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1146)
 at 
 java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
 at java.lang.Thread.run(Thread.java:679)
 I set spark.local.dir=/data2/tmp in spark-env.sh and there is about 800G of
 space in the /data2 directory. I have checked the usage of /data2 and only
 about 3G is used.
 So why does Spark think that there is no space left on the device?
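 (As a side note: spark.local.dir also accepts a comma-separated list of
 directories, which helps spread shuffle files across disks, and in Spark 0.8
 it can be set as a Java system property before the SparkContext is created.
 A minimal Scala sketch, with hypothetical paths and master URL:)

     import org.apache.spark.SparkContext

     object LocalDirSketch {
       def main(args: Array[String]) {
         // Must be set before the SparkContext is created; paths are hypothetical.
         System.setProperty("spark.local.dir", "/data2/tmp,/data3/tmp")
         val sc = new SparkContext("local[2]", "LocalDirSketch")
         // ... job body ...
         sc.stop()
       }
     }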
 
 (2) Moreover, I am wondering whether Spark will create files under
 directories other than spark.local.dir. Presently I use
 a=b.map.persist(storage.disk_only) in part of my program; where will the
 persisted data be stored?
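 (For reference, the equivalent call in the Scala API is
 persist(StorageLevel.DISK_ONLY); blocks persisted with DISK_ONLY are written
 by the BlockManager's DiskStore under the directories configured by
 spark.local.dir. A minimal sketch:)

     import org.apache.spark.SparkContext
     import org.apache.spark.storage.StorageLevel

     object DiskOnlySketch {
       def main(args: Array[String]) {
         val sc = new SparkContext("local[2]", "DiskOnlySketch")
         val b = sc.parallelize(1 to 1000)
         // DISK_ONLY blocks end up under spark.local.dir, alongside shuffle files.
         val a = b.map(_ * 2).persist(StorageLevel.DISK_ONLY)
         println(a.count())
         sc.stop()
       }
     }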
 
 (3) Lastly, I also sometimes see the error "Removing BlockManager xxx with no
 recent heart beats: xms exceeds 45000ms". I have set the corresponding
 parameters in spark-env.sh:
 SPARK_JAVA_OPTS+=-Dspark.akka.timeout=30 
 SPARK_JAVA_OPTS+=-Dspark.worker.timeout=30 
 SPARK_JAVA_OPTS+=-Dspark.akka.askTimeout=3000 
 SPARK_JAVA_OPTS+=-Dspark.storage.blockManagerHeartBeatMs=30 
 SPARK_JAVA_OPTS+=-Dspark.akka.retry.wait=30 
 But it does not help. Can someone give me some suggestions for solving this
 problem?
 
 Any help will be appreciated!
 Thanks!
 
 Best,
 Shangyu
 



Re: Questions about the files that Spark will produce during its running

2013-10-29 Thread Shangyu Luo
Yes,
I copy the spark-env.sh file to all worker nodes before I run my
program and then execute bin/stop-all.sh and bin/start-all.sh.
I have also checked the size of the /data2 directory on each worker node and it
is also about 800G.
Thanks!


2013/10/29 Matei Zaharia matei.zaha...@gmail.com

 The error is from a worker node -- did you check that /data2 is set up
 properly on the worker nodes too? In general that should be the only
 directory used.

 Matei

 On Oct 28, 2013, at 6:52 PM, Shangyu Luo lsy...@gmail.com wrote:

 Hello,
 I have some questions about the files that Spark creates and uses
 while it runs.
 (1) I am running a Python program on Spark with an EC2 cluster. The data
 comes from HDFS. I have encountered the following error in the console
 of the master node:
 java.io.FileNotFoundException:
 /data2/tmp/spark-local-20131029003412-c340/1b/shuffle_1_527_79 (No space
 left on device)
 at java.io.FileOutputStream.openAppend(Native Method)
 at java.io.FileOutputStream.<init>(FileOutputStream.java:207)
 at
 org.apache.spark.storage.DiskStore$DiskBlockObjectWriter.open(DiskStore.scala:58)
 at
 org.apache.spark.storage.DiskStore$DiskBlockObjectWriter.write(DiskStore.scala:107)
 at
 org.apache.spark.scheduler.ShuffleMapTask$$anonfun$run$1.apply(ShuffleMapTask.scala:152)
 at
 org.apache.spark.scheduler.ShuffleMapTask$$anonfun$run$1.apply(ShuffleMapTask.scala:149)
 at scala.collection.Iterator$class.foreach(Iterator.scala:772)
 at scala.collection.Iterator$$anon$19.foreach(Iterator.scala:399)
 at
 org.apache.spark.scheduler.ShuffleMapTask.run(ShuffleMapTask.scala:149)
 at
 org.apache.spark.scheduler.ShuffleMapTask.run(ShuffleMapTask.scala:88)
 at
 org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:158)
 at
 java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1146)
 at
 java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
 at java.lang.Thread.run(Thread.java:679)
 I set spark.local.dir=/data2/tmp in spark-env.sh and there is about 800G of
 space in the /data2 directory. I have checked the usage of /data2 and only
 about 3G is used.
 So why does Spark think that there is no space left on the device?

 (2) Moreover, I am wondering whether Spark will create files under
 directories other than spark.local.dir. Presently I use
 a=b.map.persist(storage.disk_only) in part of my program; where will the
 persisted data be stored?

 (3) Lastly, I also sometimes see the error "Removing BlockManager xxx with no
 recent heart beats: xms exceeds 45000ms". I have set the
 corresponding parameters in spark-env.sh:
 SPARK_JAVA_OPTS+=-Dspark.akka.timeout=30 
 SPARK_JAVA_OPTS+=-Dspark.worker.timeout=30 
 SPARK_JAVA_OPTS+=-Dspark.akka.askTimeout=3000 
 SPARK_JAVA_OPTS+=-Dspark.storage.blockManagerHeartBeatMs=30 
 SPARK_JAVA_OPTS+=-Dspark.akka.retry.wait=30 
 But it does not help. Can someone give me some suggestions for solving
 this problem?

 Any help will be appreciated!
 Thanks!

 Best,
 Shangyu





-- 
--

Shangyu, Luo
Department of Computer Science
Rice University

--
Not Just Think About It, But Do It!
--
Success is never final.
--
Losers always whine about their best


Re: Task output before a shuffle

2013-10-29 Thread Ufuk Celebi
On 29 Oct 2013, at 02:47, Matei Zaharia matei.zaha...@gmail.com wrote:
 Yes, we still write out data after these tasks in Spark 0.8, and it needs to 
 be written out before any stage that reads it can start. The main reason is 
 simplicity when there are faults, as well as more flexible scheduling (you 
 don't have to decide where each reduce task is in advance, you can have more 
 reduce tasks than you have CPU cores, etc).

Thank you for the answer! I have a follow-up:

In which fraction (RDD or non-RDD) of the heap will the output be stored before 
spilling to disk?

I have a job where I read over a large data set once and don't persist 
anything. Would it make sense to set spark.storage.memoryFraction to a 
smaller value in order to avoid spilling to disk?
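For reference, spark.storage.memoryFraction is a plain Java system property in
Spark 0.8 and has to be set before the SparkContext is created. A minimal
sketch for a standalone application (not the shell); the 0.3 value is only
illustrative:

    import org.apache.spark.SparkContext

    object MemoryFractionSketch {
      def main(args: Array[String]) {
        // Illustrative value for a job that persists little or nothing;
        // must be set before the SparkContext is created.
        System.setProperty("spark.storage.memoryFraction", "0.3")
        val sc = new SparkContext("local[2]", "MemoryFractionSketch")
        // ... job body ...
        sc.stop()
      }
    }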

- Ufuk

Re: compare/contrast Spark with Cascading

2013-10-29 Thread Koert Kuipers
Hey Prashant,
I assume you mean steps to reproduce the OOM. I do not currently. I just
ran into them when porting some jobs from map-red. I never turned it into a
reproducible test, and i do not exclude that it was my poor programming
that caused it. However it happened with a bunch of jobs, and then i asked
on the message boards about the OOM, and people pointed me to the
assumption about reducer input having to fit in memory. At that point i
felt like that was too much of a limitation for the jobs i was trying to
port and i gave up.


On Tue, Oct 29, 2013 at 1:12 AM, Prashant Sharma scrapco...@gmail.com wrote:

 Hey Koert,

 Can you give me steps to reproduce this ?


 On Tue, Oct 29, 2013 at 10:06 AM, Koert Kuipers ko...@tresata.com wrote:

 Matei,
 We have some jobs where even the input for a single key in a groupBy
 would not fit in the task's memory. We rely on mapred to stream from
 disk to disk as it reduces.
 I think spark should be able to handle that situation to truly be able to
 claim it can replace map-red (or not?).
 Best, Koert


 On Mon, Oct 28, 2013 at 8:51 PM, Matei Zaharia 
 matei.zaha...@gmail.com wrote:

 FWIW, the only thing that Spark expects to fit in memory if you use
 DISK_ONLY caching is the input to each reduce task. Those currently don't
 spill to disk. The solution if datasets are large is to add more reduce
 tasks, whereas Hadoop would run along with a small number of tasks that do
 lots of disk IO. But this is something we will likely change soon. Other
 than that, everything runs in a streaming fashion and there's no need for
 the data to fit in memory. Our goal is certainly to work on any size
 datasets, and some of our current users are explicitly using Spark to
 replace things like Hadoop Streaming in just batch jobs (see e.g. Yahoo!'s
 presentation from http://ampcamp.berkeley.edu/3/). If you run into
 trouble with these, let us know, since it is an explicit goal of the
 project to support it.
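 (A minimal sketch of what "add more reduce tasks" looks like in code; the
 key/value shapes and the value 200 are only illustrative:)

     import org.apache.spark.SparkContext
     import org.apache.spark.SparkContext._  // implicit conversions for pair-RDD operations

     object ReduceParallelismSketch {
       def main(args: Array[String]) {
         val sc = new SparkContext("local[4]", "ReduceParallelismSketch")
         val pairs = sc.parallelize(1 to 1000000).map(i => (i % 1000, i))
         // A larger explicit number of reduce tasks keeps each task's input small.
         val grouped = pairs.groupByKey(200)
         // Where the aggregation is associative, reduceByKey never materializes
         // all values for a key at once.
         val summed = pairs.reduceByKey(_ + _, 200)
         println(grouped.count() + " " + summed.count())
         sc.stop()
       }
     }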

 Matei

 On Oct 28, 2013, at 5:32 PM, Koert Kuipers ko...@tresata.com wrote:

 no problem :) i am actually not familiar with what oscar has said on
 this. can you share or point me to the conversation thread?

 it is my opinion based on the little experimenting i have done. but i am
 willing to be convinced otherwise.
 one of the very first things i did when we started using spark is run jobs
 with DISK_ONLY, and see if it could handle some of the jobs that map-reduce does
 for us. however i ran into OOMs, presumably because spark makes assumptions
 that some things should fit in memory. i have to admit i didn't try too
 hard after the first OOMs.

 if spark were able to scale from the quick in-memory query to the
 overnight disk-only giant batch query, i would love it! spark has a much
 nicer api than map-red, and one could use a single set of algos for
 everything from quick/realtime queries to giant batch jobs. as far as i am
 concerned map-red would be done. our clusters of the future would be hdfs +
 spark.


 On Mon, Oct 28, 2013 at 8:16 PM, Mark Hamstra 
 m...@clearstorydata.com wrote:

 And I didn't mean to skip over you, Koert.  I'm just more familiar with
 what Oscar said on the subject than with your opinion.



 On Mon, Oct 28, 2013 at 5:13 PM, Mark Hamstra 
 m...@clearstorydata.com wrote:

 Hmmm... I was unaware of this concept that Spark is for medium to
 large datasets but not for very large datasets.


 It is the opinion of some at Twitter.  That doesn't make it true or
 a universally held opinion.



 On Mon, Oct 28, 2013 at 5:08 PM, Ashish Rangole arang...@gmail.com wrote:

 Hmmm... I was unaware of this concept that Spark is for medium to
 large datasets but not for very large datasets. What size is very large?

 Can someone please elaborate on why this would be the case and what
 stops Spark, as it is today, from being successfully run on very large 
 datasets?
 I'll appreciate it.

 I would think that Spark should be able to pull off Hadoop level
 throughput in worst case with DISK_ONLY caching.

 Thanks
 On Oct 28, 2013 1:37 PM, Koert Kuipers ko...@tresata.com wrote:

 i would say scalding (cascading + a DSL for scala) offers similar
 functionality to spark, and a similar syntax.
 the main difference between spark and scalding is target jobs:
 scalding is for long running jobs on very large data. the data is
 read from and written to disk between steps. jobs run from minutes to 
 days.
 spark is for faster jobs on medium to large data. the data is
 primarily held in memory. jobs run from a few seconds to a few hours.
 although spark can work with data on disks it still makes assumptions 
 that
 data needs to fit in memory for certain steps (although less and less 
 with
 every release). spark also makes iterative designs much easier.

 i have found them both great to program in and complementary. we use
 scalding for overnight batch processes and spark for more realtime
 processes. at this point i would trust scalding a lot more due to the
 robustness of the stack, but 

Re: met a problem while running a streaming example program

2013-10-29 Thread Patrick Wendell
If you just add the "extends Serializable" changes from here, it should work.
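For anyone on an older checkout, this is roughly the shape of the change (the
field list here is illustrative; see the linked PR for the actual class):

    // Spark Streaming ships PageView objects between tasks, so the example class
    // must be serializable. Field names below are illustrative, not the exact ones.
    class PageView(val url: String, val status: Int, val zipCode: Int, val userID: Int)
      extends Serializable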

On Tue, Oct 29, 2013 at 9:36 AM, Patrick Wendell pwend...@gmail.com wrote:
 This was fixed on 0.8 branch and master:
 https://github.com/apache/incubator-spark/pull/63/files

 - Patrick

 On Tue, Oct 29, 2013 at 9:17 AM, Thunder Stumpges
 thunder.stump...@gmail.com wrote:
 I vaguely remember running into this same error. It says there's a
 java.io.NotSerializableException:
 org.apache.spark.streaming.examples.clickstream.PageView... can you
 check the PageView class in the examples and make sure it has the
 @serializable directive? I seem to remember having to add it.

 good luck,
 Thunder


 On Tue, Oct 29, 2013 at 6:54 AM, dachuan hdc1...@gmail.com wrote:
 Hi,

 I have tried the clickstream example and it runs into an exception. Has anybody
 met this before?

 Since the program mentions local[2], I ran it on my local machine.

 thanks in advance,
 dachuan.

 Log Snippet 1:

 13/10/29 08:50:25 INFO scheduler.DAGScheduler: Submitting 46 missing tasks
 from Stage 12 (MapPartitionsRDD[63] at combineByKey at
 ShuffledDStream.scala:41)
 13/10/29 08:50:25 INFO local.LocalTaskSetManager: Size of task 75 is 4230
 bytes
 13/10/29 08:50:25 INFO local.LocalScheduler: Running 75
 13/10/29 08:50:25 INFO spark.CacheManager: Cache key is rdd_9_0
 13/10/29 08:50:25 INFO spark.CacheManager: Computing partition
 org.apache.spark.rdd.BlockRDDPartition@0
 13/10/29 08:50:25 WARN storage.BlockManager: Putting block rdd_9_0 failed
 13/10/29 08:50:25 INFO local.LocalTaskSetManager: Loss was due to
 java.io.NotSerializableException
 java.io.NotSerializableException:
 org.apache.spark.streaming.examples.clickstream.PageView

 Log Snippet 2:
 org.apache.spark.SparkException: Job failed: Task 12.0:0 failed more than 4
 times; aborting job java.io.NotSerializableException:
 org.apache.spark.streaming.examples.clickstream.PageView
 at
 org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:760)
 at
 org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:758)
 at
 scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:60)
 at
 scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
 at
 org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:758)
 at
 org.apache.spark.scheduler.DAGScheduler.processEvent(DAGScheduler.scala:379)
 at
 org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$run(DAGScheduler.scala:441)
 at
 org.apache.spark.scheduler.DAGScheduler$$anon$1.run(DAGScheduler.scala:149)

 Two commands that run this app:
 ./run-example
 org.apache.spark.streaming.examples.clickstream.PageViewGenerator 4 10
 ./run-example org.apache.spark.streaming.examples.clickstream.PageViewStream
 errorRatePerZipCode localhost 4



Re: spark-0.8.0 and hadoop-2.1.0-beta

2013-10-29 Thread Matei Zaharia
I’m curious, Viren, do you have a patch you could post to build this against 
YARN 2.1 / 2.2? It would be nice to see how big the changes are.

Matei

On Sep 30, 2013, at 10:14 AM, viren kumar vire...@gmail.com wrote:

 I was able to get Spark 0.8.0 to compile with Hadoop/Yarn 2.1.0-beta, by 
 following some of the changes described here: 
 http://hortonworks.com/blog/stabilizing-yarn-apis-for-apache-hadoop-2-beta-and-beyond/
 
 That should help you build most of it. One change not covered there is the 
 change from ProtoUtils.convertFromProtoFormat(containerToken, cmAddress) to 
 ConverterUtils.convertFromYarn(containerToken, cmAddress).
 
 Not 100% sure that my changes are correct. 
 
 Hope that helps,
 Viren
 
 
 On Sun, Sep 29, 2013 at 8:59 AM, Matei Zaharia matei.zaha...@gmail.com 
 wrote:
 Hi Terence,
 
 YARN's API changed in an incompatible way in Hadoop 2.1.0, so I'd suggest 
 sticking with 2.0.x for now. We may create a different branch for this 
 version. Unfortunately due to the API change it may not be possible to 
 support this version while also supporting other widely-used versions like 
 0.23.x.
 
 Matei
 
 On Sep 29, 2013, at 11:00 AM, Terance Dias terance.d...@gmail.com wrote:
 
 
  Hi, I'm trying to build spark-0.8.0 with hadoop-2.1.0-beta.
  I have changed the following properties in SparkBuild.scala file.
 
  val DEFAULT_HADOOP_VERSION = "2.1.0-beta"
  val DEFAULT_YARN = true
 
  when I do sbt clean compile, I get an error saying
 
  [error] 
  /usr/local/spark-0.8.0-incubating/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala:42:
   not found: type AMRMProtocol
  [error]   private var resourceManager: AMRMProtocol = null
 
  Thanks,
  Terance.
 
 
 



executor failures w/ scala 2.10

2013-10-29 Thread Imran Rashid
We've been testing out the 2.10 branch of spark, and we're running into
some issues where akka disconnects from the executors after a while.  We ran
some simple tests first, and all was well, so we started upgrading our
whole codebase to 2.10.  Everything seemed to be working, but then we
noticed that when we run long jobs, things start failing.


The first suspicious thing is that we get akka warnings about undeliverable
messages sent to deadLetters:

2013-10-29 11:03:54,577 [spark-akka.actor.default-dispatcher-17] INFO
akka.actor.LocalActorRef - Message
[akka.remote.transport.ActorTransportAdapter$DisassociateUnderlying] from
Actor[akka://spark/deadLetters] to
Actor[akka://spark/system/transports/akkaprotocolmanager.tcp0/akkaProtocol-tcp%3A%2F%2Fspark%4010.10.5.81%3A46572-3#656094700]
was not delivered. [4] dead letters encountered. This logging can be turned
off or adjusted with configuration settings 'akka.log-dead-letters' and
'akka.log-dead-letters-during-shutdown'.

2013-10-29 11:03:54,579 [spark-akka.actor.default-dispatcher-19] INFO
akka.actor.LocalActorRef - Message
[akka.remote.transport.AssociationHandle$Disassociated] from
Actor[akka://spark/deadLetters] to
Actor[akka://spark/system/transports/akkaprotocolmanager.tcp0/akkaProtocol-tcp%3A%2F%2Fspark%4010.10.5.81%3A46572-3#656094700]
was not delivered. [5] dead letters encountered. This logging can be turned
off or adjusted with configuration settings 'akka.log-dead-letters' and
'akka.log-dead-letters-during-shutdown'.



Generally within a few seconds after the first such message, there are a
bunch more, and then the executor is marked as failed, and a new one is
started:

2013-10-29 11:03:58,775 [spark-akka.actor.default-dispatcher-3] INFO
akka.actor.LocalActorRef - Message
[akka.remote.transport.ActorTransportAdapter$DisassociateUnderlying] from
Actor[akka://spark/deadLetters] to
Actor[akka://spark/system/transports/akkaprotocolmanager.tcp0/akkaProtocol-tcp%3A%2F%2FsparkExecutor%
40dhd2.quantifind.com%3A45794-6#-890135716] was not delivered. [10] dead
letters encountered, no more dead letters will be logged. This logging can
be turned off or adjusted with configuration settings
'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.

2013-10-29 11:03:58,778 [spark-akka.actor.default-dispatcher-17] INFO
org.apache.spark.deploy.client.Client$ClientActor - Executor updated:
app-2013102911-/1 is now FAILED (Command exited with code 1)

2013-10-29 11:03:58,784 [spark-akka.actor.default-dispatcher-17] INFO
org.apache.spark.deploy.client.Client$ClientActor - Executor added:
app-2013102911-/2 on
worker-20131029105824-dhd2.quantifind.com-51544 (dhd2.quantifind.com:51544)
with 24 cores

2013-10-29 11:03:58,784 [spark-akka.actor.default-dispatcher-18] ERROR
akka.remote.EndpointWriter - AssociationError [akka.tcp://
sp...@ddd0.quantifind.com:43068] - [akka.tcp://
sparkexecu...@dhd2.quantifind.com:45794]: Error [Association failed with
[akka.tcp://sparkexecu...@dhd2.quantifind.com:45794]] [
akka.remote.EndpointAssociationException: Association failed with
[akka.tcp://sparkexecu...@dhd2.quantifind.com:45794]
Caused by:
akka.remote.transport.netty.NettyTransport$$anonfun$associate$1$$anon$2:
Connection refused: dhd2.quantifind.com/10.10.5.64:45794]



Looking in the logs of the failed executor, there are some similar messages
about undeliverable messages, but I don't see any reason:

13/10/29 11:03:52 INFO executor.Executor: Finished task ID 943

13/10/29 11:03:53 INFO actor.LocalActorRef: Message [akka.actor.FSM$Timer]
from Actor[akka://sparkExecutor/deadLetters] to
Actor[akka://sparkExecutor/system/transports/akkaprotocolmanager.tcp0/akkaProtocol-tcp%3A%2F%2Fspark%
40ddd0.quantifind.com%3A43068-1#772172548] was not delivered. [1] dead
letters encountered. This logging can be turned off or adjusted with
configuration settings 'akka.log-dead-letters' and
'akka.log-dead-letters-during-shutdown'.

13/10/29 11:03:53 INFO actor.LocalActorRef: Message
[akka.remote.transport.AssociationHandle$Disassociated] from
Actor[akka://sparkExecutor/deadLetters] to
Actor[akka://sparkExecutor/system/transports/akkaprotocolmanager.tcp0/akkaProtocol-tcp%3A%2F%2Fspark%
40ddd0.quantifind.com%3A43068-1#772172548] was not delivered. [2] dead
letters encountered. This logging can be turned off or adjusted with
configuration settings 'akka.log-dead-letters' and
'akka.log-dead-letters-during-shutdown'.

13/10/29 11:03:53 INFO actor.LocalActorRef: Message
[akka.remote.transport.AssociationHandle$Disassociated] from
Actor[akka://sparkExecutor/deadLetters] to
Actor[akka://sparkExecutor/system/transports/akkaprotocolmanager.tcp0/akkaProtocol-tcp%3A%2F%2Fspark%
40ddd0.quantifind.com%3A43068-1#772172548] was not delivered. [3] dead
letters encountered. This logging can be turned off or adjusted with
configuration settings 'akka.log-dead-letters' and
'akka.log-dead-letters-during-shutdown'.

13/10/29 11:03:53 ERROR executor.StandaloneExecutorBackend: 

Re: met a problem while running a streaming example program

2013-10-29 Thread dachuan
yes, it works after checking out branch-0.8.

thanks.


On Tue, Oct 29, 2013 at 12:51 PM, Patrick Wendell pwend...@gmail.com wrote:

 If you just add the extends Serializable changes from here it should
 work.

 On Tue, Oct 29, 2013 at 9:36 AM, Patrick Wendell pwend...@gmail.com
 wrote:
  This was fixed on 0.8 branch and master:
  https://github.com/apache/incubator-spark/pull/63/files
 
  - Patrick
 
  On Tue, Oct 29, 2013 at 9:17 AM, Thunder Stumpges
  thunder.stump...@gmail.com wrote:
  I vaguely remember running into this same error. It says there's a
  java.io.NotSerializableException:
  org.apache.spark.streaming.examples.clickstream.PageView... can you
  check the PageView class in the examples and make sure it has the
  @serializable directive? I seem to remember having to add it.
 
  good luck,
  Thunder
 
 
  On Tue, Oct 29, 2013 at 6:54 AM, dachuan hdc1...@gmail.com wrote:
  Hi,
 
  I have tried the clickstream example and it runs into an exception.
  Has anybody met this before?
 
  Since the program mentions local[2], I ran it on my local machine.
 
  thanks in advance,
  dachuan.
 
  Log Snippet 1:
 
  13/10/29 08:50:25 INFO scheduler.DAGScheduler: Submitting 46 missing
 tasks
  from Stage 12 (MapPartitionsRDD[63] at combineByKey at
  ShuffledDStream.scala:41)
  13/10/29 08:50:25 INFO local.LocalTaskSetManager: Size of task 75 is
 4230
  bytes
  13/10/29 08:50:25 INFO local.LocalScheduler: Running 75
  13/10/29 08:50:25 INFO spark.CacheManager: Cache key is rdd_9_0
  13/10/29 08:50:25 INFO spark.CacheManager: Computing partition
  org.apache.spark.rdd.BlockRDDPartition@0
  13/10/29 08:50:25 WARN storage.BlockManager: Putting block rdd_9_0
 failed
  13/10/29 08:50:25 INFO local.LocalTaskSetManager: Loss was due to
  java.io.NotSerializableException
  java.io.NotSerializableException:
  org.apache.spark.streaming.examples.clickstream.PageView
 
  Log Snippet 2:
  org.apache.spark.SparkException: Job failed: Task 12.0:0 failed more
 than 4
  times; aborting job java.io.NotSerializableException:
  org.apache.spark.streaming.examples.clickstream.PageView
  at
 
 org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:760)
  at
 
 org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:758)
  at
 
 scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:60)
  at
  scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
  at
 
 org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:758)
  at
 
 org.apache.spark.scheduler.DAGScheduler.processEvent(DAGScheduler.scala:379)
  at
  org.apache.spark.scheduler.DAGScheduler.org
 $apache$spark$scheduler$DAGScheduler$$run(DAGScheduler.scala:441)
  at
 
 org.apache.spark.scheduler.DAGScheduler$$anon$1.run(DAGScheduler.scala:149)
 
  Two commands that run this app:
  ./run-example
  org.apache.spark.streaming.examples.clickstream.PageViewGenerator 4
 10
  ./run-example
 org.apache.spark.streaming.examples.clickstream.PageViewStream
  errorRatePerZipCode localhost 4
 




-- 
Dachuan Huang
Cellphone: 614-390-7234
2015 Neil Avenue
Ohio State University
Columbus, Ohio
U.S.A.
43210


Re: Getting exception org.apache.spark.SparkException: Job aborted: Task 1.0:37 failed more than 4 times

2013-10-29 Thread Sergey Soldatov
You may check the 'out' files in the logs directory for the failure details.


On Wed, Oct 30, 2013 at 12:17 AM, Soumya Simanta
soumya.sima...@gmail.com wrote:

 I'm using a pretty recent version of Spark ( 0.8) from Github and it's
 failing with the following exception for a very simple task on the
 spark-shell.


 scala> val file = sc.textFile("hdfs://...")
 file: org.apache.spark.rdd.RDD[String] = MappedRDD[1] at textFile at
 <console>:12

 scala> val errors = file.filter(line => line.contains("sometext"))
 errors: org.apache.spark.rdd.RDD[String] = FilteredRDD[2] at filter at
 <console>:14

 scala> errors.count()
 org.apache.spark.SparkException: Job aborted: Task 0.0:32 failed more than
 4 times
  at
 org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:819)
 at
 org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:817)
  at
 scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:60)
 at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
  at
 org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:817)
 at
 org.apache.spark.scheduler.DAGScheduler.processEvent(DAGScheduler.scala:432)
  at org.apache.spark.scheduler.DAGScheduler.org
 $apache$spark$scheduler$DAGScheduler$$run(DAGScheduler.scala:494)
  at
 org.apache.spark.scheduler.DAGScheduler$$anon$1.run(DAGScheduler.scala:158)



Re: Spark cluster memory configuration for spark-shell

2013-10-29 Thread Aaron Davidson
You are correct. If you are just using spark-shell in local mode (i.e.,
without cluster), you can set the SPARK_MEM environment variable to give
the driver more memory. E.g.:
SPARK_MEM=24g ./spark-shell

Otherwise, if you're using a real cluster, the driver shouldn't require a
significant amount of memory, so SPARK_MEM should not have to be used.
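As a side note, for a standalone application (rather than the shell) the
per-executor memory can be set as a system property before the SparkContext is
created. A minimal sketch; the master URL and the 24g value are only
placeholders and must stay within SPARK_WORKER_MEMORY on each worker:

    import org.apache.spark.SparkContext

    object ExecutorMemorySketch {
      def main(args: Array[String]) {
        // Placeholder value; must be set before the SparkContext is created.
        System.setProperty("spark.executor.memory", "24g")
        val sc = new SparkContext("spark://master:7077", "ExecutorMemorySketch")
        // ... job body ...
        sc.stop()
      }
    }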


On Tue, Oct 29, 2013 at 12:40 PM, Soumya Simanta
soumya.sima...@gmail.com wrote:

 I'm new to Spark. I want to try out a few simple examples from the Spark
 shell. However, I'm not sure how to configure it so that I can make
 maximum use of the memory on my workers.

 On average I've around 48 GB of RAM on each node on my cluster. I've
 around 10 nodes.

 Based on the documentation I could find memory based configuration in two
 places.

 1. $SPARK_INSTALL_DIR/dist/conf/spark-env.sh

 SPARK_WORKER_MEMORY: Total amount of memory to allow Spark applications
 to use on the machine, e.g. 1000m, 2g (default: total memory minus 1 GB);
 note that each application's individual memory is configured using its
 spark.executor.memory property.

 2. The spark.executor.memory JVM flag.

 spark.executor.memory (default: 512m): Amount of memory to use per executor
 process, in the same format as JVM memory strings (e.g. 512m, 2g).

 http://spark.incubator.apache.org/docs/latest/configuration.html#system-properties

 In my case I want to use the maximum memory possible on each node. My
 understanding is that I don't have to change SPARK_WORKER_MEMORY and I
 will have to increase spark.executor.memory to something big (e.g., 24g or
 32g). Is this correct? If yes, what is the correct way of setting this
 property if I just want to use the spark-shell?


 Thanks.
 -Soumya




RE: spark-0.8.0 and hadoop-2.1.0-beta

2013-10-29 Thread Liu, Raymond
I am also working on porting the trunk code onto 2.2.0. There seem to be quite a few API 
changes, but many of them are just renames.
YARN 2.1.0-beta also adds some client APIs for easier interaction with the YARN 
framework, but there are not many examples of how to use them (the API and wiki 
docs are both old and do not reflect the new API), so some parts of the Spark YARN code 
will need to be rewritten with the new client API.
And I am not quite familiar with the user certification part of the code; it might 
take time, since it seems that this part of the code has also changed a little bit: some 
methods are gone, and I cannot find their replacements, or they are no longer needed.


Best Regards,
Raymond Liu

From: Matei Zaharia [mailto:matei.zaha...@gmail.com] 
Sent: Wednesday, October 30, 2013 2:35 AM
To: user@spark.incubator.apache.org
Subject: Re: spark-0.8.0 and hadoop-2.1.0-beta

I'm curious, Viren, do you have a patch you could post to build this against 
YARN 2.1 / 2.2? It would be nice to see how big the changes are.

Matei

On Sep 30, 2013, at 10:14 AM, viren kumar vire...@gmail.com wrote:


I was able to get Spark 0.8.0 to compile with Hadoop/Yarn 2.1.0-beta, by 
following some of the changes described here: 
http://hortonworks.com/blog/stabilizing-yarn-apis-for-apache-hadoop-2-beta-and-beyond/
That should help you build most of it. One change not covered there is the 
change from ProtoUtils.convertFromProtoFormat(containerToken, cmAddress) to 
ConverterUtils.convertFromYarn(containerToken, cmAddress).
Not 100% sure that my changes are correct. 
Hope that helps,
Viren

On Sun, Sep 29, 2013 at 8:59 AM, Matei Zaharia matei.zaha...@gmail.com wrote:
Hi Terence,

YARN's API changed in an incompatible way in Hadoop 2.1.0, so I'd suggest 
sticking with 2.0.x for now. We may create a different branch for this version. 
Unfortunately due to the API change it may not be possible to support this 
version while also supporting other widely-used versions like 0.23.x.

Matei

On Sep 29, 2013, at 11:00 AM, Terance Dias terance.d...@gmail.com wrote:


 Hi, I'm trying to build spark-0.8.0 with hadoop-2.1.0-beta.
 I have changed the following properties in SparkBuild.scala file.

 val DEFAULT_HADOOP_VERSION = "2.1.0-beta"
 val DEFAULT_YARN = true

 when I do sbt clean compile, I get an error saying

 [error] 
 /usr/local/spark-0.8.0-incubating/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala:42:
  not found: type AMRMProtocol
 [error]   private var resourceManager: AMRMProtocol = null

 Thanks,
 Terance.