Re: Executor memory requirement for reduceByKey
Ok, so that worked flawlessly after I upped the number of partitions to 400 from 40. Thanks!

On Fri, May 13, 2016 at 7:28 PM, Sung Hwan Chung <coded...@cs.stanford.edu> wrote:

> I'll try that; as of now I have a small number of partitions, on the order
> of 20~40.
>
> It would be great if there were some documentation on the memory
> requirement with respect to the number of keys and the number of partitions
> per executor (i.e., Spark's internal memory requirement outside of the user
> space). Otherwise, it's like shooting in the dark.
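A rough way to see why raising the partition count fixed this: with hash partitioning, each reduce task only has to build its aggregation map over roughly total_keys / num_partitions keys at once. A back-of-the-envelope sketch in plain Python (not Spark code; numbers scaled down from the 75M-key case):

```python
# Plain-Python sketch (no Spark): how many distinct keys the busiest reduce
# task must hold in memory under hash partitioning, for 40 vs. 400 partitions.

def max_keys_per_partition(num_keys, num_partitions):
    counts = [0] * num_partitions
    for key in range(num_keys):
        counts[hash(key) % num_partitions] += 1
    return max(counts)

# Scaled down 100x from 75 million keys so it runs quickly.
num_keys = 750_000
for partitions in (40, 400):
    print(partitions, max_keys_per_partition(num_keys, partitions))
```

With 10x more partitions, each task's per-key hash map (the BytesToBytesMap in the stack trace) is roughly 10x smaller, which is why going from 40 to 400 partitions avoided the allocation failure.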
Re: Executor memory requirement for reduceByKey
I'll try that; as of now I have a small number of partitions, on the order of 20~40.

It would be great if there were some documentation on the memory requirement with respect to the number of keys and the number of partitions per executor (i.e., Spark's internal memory requirement outside of the user space). Otherwise, it's like shooting in the dark.

On Fri, May 13, 2016 at 7:20 PM, Ted Yu <yuzhih...@gmail.com> wrote:

> Have you taken a look at SPARK-11293?
>
> Consider using repartition to increase the number of partitions.
>
> FYI
Executor memory requirement for reduceByKey
Hello,

I'm using Spark version 1.6.0 and have trouble with memory when trying to do reduceByKey on a dataset with as many as 75 million keys, i.e., I get the following exception when I run the task.

There are 20 workers in the cluster. It is running in standalone mode with 12 GB assigned per executor and 4 cores per worker. spark.memory.fraction is set to 0.5 and I'm not using any caching.

What might be the problem here? Since I'm using version 1.6.0, this doesn't seem to be related to SPARK-12155. This problem always happens during the shuffle read phase.

Is there a minimum amount of memory required for an executor (spark.memory.fraction) for shuffle read?

java.lang.OutOfMemoryError: Unable to acquire 262144 bytes of memory, got 0
    at org.apache.spark.memory.MemoryConsumer.allocateArray(MemoryConsumer.java:91)
    at org.apache.spark.unsafe.map.BytesToBytesMap.allocate(BytesToBytesMap.java:735)
    at org.apache.spark.unsafe.map.BytesToBytesMap.<init>(BytesToBytesMap.java:197)
    at org.apache.spark.unsafe.map.BytesToBytesMap.<init>(BytesToBytesMap.java:212)
    at org.apache.spark.sql.execution.UnsafeFixedWidthAggregationMap.<init>(UnsafeFixedWidthAggregationMap.java:103)
    at org.apache.spark.sql.execution.aggregate.TungstenAggregationIterator.<init>(TungstenAggregationIterator.scala:483)
    at org.apache.spark.sql.execution.aggregate.TungstenAggregate$$anonfun$doExecute$1$$anonfun$2.apply(TungstenAggregate.scala:95)
    at org.apache.spark.sql.execution.aggregate.TungstenAggregate$$anonfun$doExecute$1$$anonfun$2.apply(TungstenAggregate.scala:86)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$20.apply(RDD.scala:710)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$20.apply(RDD.scala:710)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
    at org.apache.spark.scheduler.Task.run(Task.scala:89)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:745)
How Spark handles dead machines during a job.
Hello,

Say that I'm doing a simple rdd.map followed by collect. Say, also, that one of the executors finishes all of its tasks, but there are still other executors running.

If the machine that hosted the finished executor gets terminated, does the master still have the results from the finished tasks (and thus doesn't restart those finished tasks)? Or does the master require that all the executors be alive during the entire map-collect cycle?

Thanks!
Re: Executor shutdown hooks?
What I meant is 'application', i.e., when we manually terminate an application that was submitted via spark-submit.

When we manually kill an application, it seems that individual tasks do not receive the InterruptedException. That exception seems to work iff we cancel the job through sc.cancelJob or sc.cancelAllJobs while the application is still alive.

My option so far seems to be using the JVM's shutdown hook, but I was wondering if Spark itself had an API for tasks.

On Wed, Apr 6, 2016 at 7:36 PM, Mark Hamstra <m...@clearstorydata.com> wrote:

> Why would the Executors shut down when the Job is terminated? Executors
> are bound to Applications, not Jobs. Furthermore, unless
> spark.job.interruptOnCancel is set to true, canceling the Job at the
> Application and DAGScheduler level won't actually interrupt the Tasks
> running on the Executors. If you do have interruptOnCancel set, then you
> can catch the interrupt exception within the Task.
Executor shutdown hooks?
Hi,

I'm looking for ways to add shutdown hooks to executors, i.e., for when a job is forcefully terminated before it finishes.

The scenario goes like this: executors are running a long-running job within a 'map' function. The user decides to terminate the job; the mappers should then perform some cleanup before going offline.

What would be the best way to do this?
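As the reply above notes, there is no task-level shutdown-hook API for this; the closest options are spark.job.interruptOnCancel=true (catch the interrupt inside the task) or a JVM shutdown hook. The shape of the catch-and-clean-up pattern, sketched as a plain-Python analogue (illustrative only, not a Spark API):

```python
# Plain-Python analogue of "catch the interruption inside the long-running
# task and clean up". In Spark, with spark.job.interruptOnCancel=true, the
# cancellation arrives as an InterruptedException inside the task; the
# structural equivalent is a try/finally around the per-row loop so that
# cleanup runs on normal completion and on interruption alike.

def long_running_map(rows, cleanup):
    out = []
    try:
        for row in rows:
            out.append(row * 2)  # stand-in for the real per-row work
    finally:
        cleanup()  # always runs: normal finish, error, or interruption
    return out

released = []
print(long_running_map([1, 2, 3], lambda: released.append("resources freed")))
print(released)
```

The same try/finally structure inside the 'map' closure guarantees the cleanup fires whether the task completes, fails, or is interrupted.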
Re: Dynamically adding/removing slaves through start-slave.sh and stop-slave.sh
You mean that once a job is in a waiting queue, it won't take advantage of additional workers that happened to be added after the job was put into the waiting queue? That would be less than optimal, but it would be OK for us for now, as long as the additional workers will be taken advantage of by future-submitted jobs.

On Mon, Mar 28, 2016 at 10:40 PM, Mich Talebzadeh <mich.talebza...@gmail.com> wrote:

> The ACID test will come when you start two or more Spark processes
> simultaneously. If you see queuing (i.e., the second job waiting for the
> first job to finish in the Spark GUI) then you may not have enough
> resources for Yarn to accommodate two jobs despite the additional worker
> process.
>
> Dr Mich Talebzadeh
>
> LinkedIn: https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>
> http://talebzadehmich.wordpress.com
Re: Dynamically adding/removing slaves through start-slave.sh and stop-slave.sh
Yea, that seems to be the case. It seems that dynamically resizing a standalone Spark cluster is very simple.

Thanks!

On Mon, Mar 28, 2016 at 10:22 PM, Mich Talebzadeh <mich.talebza...@gmail.com> wrote:

> start-all.sh starts the master and anything else in the slaves file;
> start-master.sh starts the master only.
>
> I use start-slaves.sh for my purposes, with added nodes in the slaves file.
>
> When you run start-slave.sh you are creating another worker process on the
> master host. You can check the status in the Spark GUI on :8080. Depending
> on the ratio of memory/cores for the worker process, the additional worker
> may or may not be used.
Re: Dynamically adding/removing slaves through start-slave.sh and stop-slave.sh
It seems that the conf/slaves file is only for consumption by the following scripts:

sbin/start-slaves.sh
sbin/stop-slaves.sh
sbin/start-all.sh
sbin/stop-all.sh

I.e., the conf/slaves file doesn't affect a running cluster.

Is this true?
Re: Dynamically adding/removing slaves through start-slave.sh and stop-slave.sh
No, I didn't add it to the conf/slaves file.

What I want to do is leverage auto-scaling from AWS without needing to stop all the slaves (e.g., if a lot of slaves are idle, terminate those).

Also, the book-keeping is easier if I don't have to deal with a centralized slave list that needs to be modified every time a node is added/removed.

On Mon, Mar 28, 2016 at 9:20 PM, Mich Talebzadeh <mich.talebza...@gmail.com> wrote:

> Have you added the slave host name to $SPARK_HOME/conf?
>
> Then you can use start-slaves.sh or stop-slaves.sh for all instances.
>
> The assumption is that the slave boxes have $SPARK_HOME installed in the
> same directory as $SPARK_HOME is installed on the master.
>
> HTH
Dynamically adding/removing slaves through start-slave.sh and stop-slave.sh
Hello,

I found that I could dynamically add/remove workers in a running standalone Spark cluster by simply triggering:

start-slave.sh (SPARK_MASTER_ADDR)

and

stop-slave.sh

E.g., I could instantiate a new AWS instance and just add it to a running cluster without needing to add it to the slaves file and restart the whole cluster. It seems that there's no need for me to stop a running cluster.

Is this a valid way of dynamically resizing a Spark cluster (as of now, I'm not concerned about HDFS)? Or will there be unforeseen problems if nodes are added/removed this way?
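If workers come and go like this, one way to keep track of cluster membership is the standalone master's status page: the web UI on :8080 also serves a JSON view at /json in the versions discussed here (worth confirming against your release). A sketch of consuming it, run against a hard-coded sample payload since the exact schema can vary by version:

```python
import json

# Hypothetical sample of the kind of document the master's :8080/json
# endpoint returns (fields trimmed; verify the real schema on your version).
sample_payload = json.dumps({
    "workers": [
        {"host": "10.0.0.1", "state": "ALIVE", "cores": 4},
        {"host": "10.0.0.2", "state": "ALIVE", "cores": 4},
        {"host": "10.0.0.3", "state": "DEAD",  "cores": 4},
    ]
})

def alive_workers(payload):
    """Return hosts of workers the master currently considers ALIVE."""
    return [w["host"] for w in json.loads(payload)["workers"]
            if w["state"] == "ALIVE"]

print(alive_workers(sample_payload))
```

Polling this after start-slave.sh / stop-slave.sh confirms whether the master has registered (or timed out) a worker, which is the main thing to watch when resizing without touching conf/slaves.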
Parquet StringType column readable as plain-text despite being Gzipped
Hello,

We are using the default compression codec for Parquet when we store our dataframes. The dataframe has a StringType column whose values can be up to several MBs large.

The funny thing is that once it's stored, we can browse the file content with a plain-text editor and see large portions of the string contents in the clear. If we use parquet-tools to browse the metadata, it says the column is GZIP-compressed with a compression ratio of 2.6x, but that just doesn't seem right.

Anybody know what's going on?
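One sanity check worth making: genuinely gzip/deflate-compressed bytes don't preserve readable runs of the source text, so seeing plaintext in the file suggests some pages were written uncompressed rather than gzip somehow being readable. A quick stdlib demonstration in plain Python (independent of Parquet internals):

```python
import zlib

# Deflate (the algorithm behind gzip) Huffman-codes even the first literal
# occurrence of a phrase, so compressed output does not contain readable
# runs of the input. If a Parquet column chunk really were gzip-compressed,
# a text editor would show gibberish, not string contents.
text = ("the quick brown fox jumps over the lazy dog " * 200).encode()
compressed = zlib.compress(text)

print(len(text), len(compressed))           # repetitive text shrinks a lot
assert zlib.decompress(compressed) == text  # and round-trips losslessly
```

So if parquet-tools reports GZIP at the column level but the strings are readable, it's worth dumping per-page details (e.g., with parquet-tools' dump command) to check whether individual data pages actually got the codec applied.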
Is spark-ec2 going away?
I noticed that in the main branch, the ec2 directory along with the spark-ec2 script is no longer present.

Is spark-ec2 going away in the next release? If so, what would be the best alternative at that time?

A few additional questions:
1. Is there any way to add/remove workers while the cluster is running, without stopping/starting the EC2 cluster?
2. For 1, if no such capability is provided by the current script, do we have to write it ourselves? Or is there any plan to add such functions in the future?
3. In PySpark, is it possible to dynamically change driver/executor memory and the number of cores per executor without having to restart it (e.g., by changing the sc configuration or recreating sc)?

Our ideal scenario is to keep PySpark running (in our case, as a notebook) and connect/disconnect to any Spark clusters on demand.
Re: Is spark-ec2 going away?
Hm, thanks. I think what you are suggesting sounds like a recommendation for AWS EMR. However, my questions were with respect to spark-ec2. For our uses involving spot instances, EMR could potentially double or triple prices due to the additional premiums.

Thanks anyway!

On Wed, Jan 27, 2016 at 2:12 PM, Alexander Pivovarov <apivova...@gmail.com> wrote:

> You can use EMR-4.3.0 run on spot instances to control the price.
>
> Yes, you can add/remove instances to the cluster on the fly (CORE
> instances support add only; TASK instances, add and remove).
Re: Is spark-ec2 going away?
Thanks! That's very helpful.

On Wed, Jan 27, 2016 at 3:33 PM, Nicholas Chammas <nicholas.cham...@gmail.com> wrote:

>> I noticed that in the main branch, the ec2 directory along with the
>> spark-ec2 script is no longer present.
>
> It’s been moved out of the main repo to its own location:
> https://github.com/amplab/spark-ec2/pull/21
>
>> Is spark-ec2 going away in the next release? If so, what would be the
>> best alternative at that time?
>
> It’s not going away. It’s just being removed from the main Spark repo and
> maintained separately.
>
> There are many alternatives like EMR, which was already mentioned, as well
> as more full-service solutions like Databricks. It depends on what you’re
> looking for.
>
> If you want something as close to spark-ec2 as possible but more actively
> developed, you might be interested in checking out Flintrock
> <https://github.com/nchammas/flintrock>, which I built.
>
>> Is there any way to add/remove additional workers while the cluster is
>> running without stopping/starting the EC2 cluster?
>
> Not currently possible with spark-ec2 and a bit difficult to add. See:
> https://issues.apache.org/jira/browse/SPARK-2008
>
>> For 1, if no such capability is provided with the current script, do we
>> have to write it ourselves? Or is there any plan in the future to add
>> such functions?
>
> No "official" plans to add this to spark-ec2. It’s up to a contributor to
> step up and implement this feature, basically. Otherwise it won’t happen.
>
> Nick
Re: java.io.IOException Error in task deserialization
I haven't seen this at all since switching to HttpBroadcast. It seems TorrentBroadcast might have some issues?

On Thu, Oct 9, 2014 at 4:28 PM, Sung Hwan Chung <coded...@cs.stanford.edu> wrote:

> I don't think that I saw any other error message; this is all I saw. I'm
> currently experimenting to see if this can be alleviated by using
> HttpBroadcastFactory instead of TorrentBroadcast. So far, with
> HttpBroadcast, I haven't seen this recurring as of yet. I'll keep you
> posted.
>
> On Thu, Oct 9, 2014 at 4:21 PM, Davies Liu <dav...@databricks.com> wrote:
>
>> This exception should be caused by another one; could you paste all of
>> them here? Also, it would be great if you could provide a script to
>> reproduce this problem. Thanks!
>>
>> On Fri, Sep 26, 2014 at 6:11 AM, Arun Ahuja <aahuj...@gmail.com> wrote:
>>
>>> Has anyone else seen this error in task deserialization? The task is
>>> processing a small amount of data and doesn't seem to have much data
>>> hanging off the closure. I've only seen this with Spark 1.1.
>>>
>>> Job aborted due to stage failure: Task 975 in stage 8.0 failed 4 times,
>>> most recent failure: Lost task 975.3 in stage 8.0 (TID 24777, host.com):
>>> java.io.IOException: unexpected exception type
>>> java.io.ObjectStreamClass.throwMiscException(ObjectStreamClass.java:1538)
>>> java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1025)
>>> java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893)
>>> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
>>> java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>>> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
>>> java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
>>> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
>>> java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>>> java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
>>> org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:62)
>>> org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:87)
>>> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:159)
>>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>>> java.lang.Thread.run(Thread.java:744)
Spark job (not Spark streaming) doesn't delete un-needed checkpoints.
Un-needed checkpoints are not getting automatically deleted in my application, i.e., the lineage looks something like the following, and checkpoints simply accumulate in a temporary directory (every lineage point, however, does zip with a globally permanent RDD):

PermanentRDD: Global, zipped with all the intermediate ones

Intermediate RDDs: A---B---C---D---E---F---G
                       |       |       |
                  checkpoint checkpoint checkpoint

Older intermediate RDDs never get used.
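Not from the thread, but one workaround is to delete the superseded checkpoint directory by hand once the next checkpoint has materialized. A sketch, assuming a Spark 1.x Scala job with a checkpoint directory already set; `checkpointAndClean` and its call pattern are hypothetical:

```scala
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.rdd.RDD

// Checkpoint `rdd`, force it to materialize, then delete the previous
// checkpoint directory, which is no longer needed for recovery.
def checkpointAndClean[T](rdd: RDD[T], prevCheckpoint: Option[String]): Option[String] = {
  rdd.checkpoint()
  rdd.count()                        // materialize the checkpoint
  prevCheckpoint.foreach { dir =>
    val fs = FileSystem.get(rdd.context.hadoopConfiguration)
    fs.delete(new Path(dir), true)   // drop the superseded checkpoint files
  }
  rdd.getCheckpointFile              // remember for the next round
}
```

Later Spark versions also added a `spark.cleaner.referenceTracking.cleanCheckpoints` setting to clean up checkpoints of RDDs that go out of scope.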
Re: coalesce with shuffle or repartition is not necessarily fault-tolerant
Is there a large number of non-deterministic lineage operators? This seems like a pretty big caveat, particularly for casual programmers who expect consistent semantics between Spark and Scala. E.g., making sure that there's no randomness whatsoever in RDD transformations seems critical. Additionally, shuffling operators will usually change row orders, etc. These are very easy mistakes to make, and if you tend to cache things, some errors won't be detected until fault tolerance is triggered. It would be very helpful for programmers to have a big warning list of not-to-dos within RDD transformations. On Wed, Oct 8, 2014 at 11:57 PM, Sean Owen so...@cloudera.com wrote: Yes, I think this is another operation that is not deterministic even for the same RDD. If a partition is lost and recalculated, the ordering can be different in the partition. Sorting the RDD makes the ordering deterministic. On Thu, Oct 9, 2014 at 7:51 AM, Sung Hwan Chung coded...@cs.stanford.edu wrote: Let's say you have some rows in a dataset (say X partitions initially): A, B, C, D, E, ... You repartition from X to Y partitions; then it seems that any of the following could be valid:

partition 1: A B C    partition 2: E D ...
partition 1: C E      partition 2: A B D ...
partition 1: D B      partition 2: C E A ...

etc., etc. I.e., although the partitions together always hold the same unordered set of rows, the rows' assignment and order can change from call to call. Now, because row ordering can change from call to call, if you do any operation that depends on the order of items you saw, then lineage is no longer deterministic. For example, it seems that the repartition call itself is a row-order-dependent call, because it creates a random number generator with the partition index as the seed, and then calls nextInt as you go through the rows. On Wed, Oct 8, 2014 at 10:14 PM, Patrick Wendell pwend...@gmail.com wrote: IIRC the random is seeded with the index, so it will always produce the same result for the same index. Maybe I don't totally follow though.
Could you give a small example of how this might change the RDD ordering in a way that you don't expect? In general repartition() will not preserve the ordering of an RDD. On Wed, Oct 8, 2014 at 3:42 PM, Sung Hwan Chung coded...@cs.stanford.edu wrote: I noticed that repartition will result in non-deterministic lineage because it changes the order of rows. So for instance, if you do things like:

val data = read(...)
val k = data.repartition(5)
val h = k.repartition(5)

It seems that this results in a different ordering of rows for 'k' each time you compute it. And because of this different ordering, 'h' can even end up with different partition contents, because 'repartition' distributes rows through a random number generator seeded with the partition index.
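The order dependence described above can be seen without a cluster. A minimal sketch (illustrative, not Spark's actual internal code): a repartition-style assignment seeds a `Random` with the partition index, so the seed is reproducible, but the assignment is positional; if a recompute presents the same rows in a different order, individual rows can land in different target partitions:

```scala
import scala.util.Random

// `rows` stands for the contents of one input partition; after a recompute,
// the same partition may present the same rows in a different order.
def assignTargets(index: Int, rows: Seq[String], numPartitions: Int): Seq[Int] = {
  val rng = new Random(index)               // same seed every time
  rows.map(_ => rng.nextInt(numPartitions)) // but assignment is positional
}

val runA = assignTargets(1, Seq("A", "B", "C"), 5)
val runB = assignTargets(1, Seq("C", "A", "B"), 5) // same rows, new order
// The two runs assign the same targets to the same *positions*, so row "A"
// can land in a different target partition even though the seed is fixed.
```

Sorting the RDD first (e.g. `sortBy` on a unique key), as Sean suggests, makes the row order, and hence the assignment, deterministic.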
Intermittent checkpointing failure.
I'm getting a DFS ClosedChannelException every now and then when I run checkpoint. I do checkpointing every 15 minutes or so. This happens usually after running the job for 1-2 hours. Anyone seen this before?

Job aborted due to stage failure: Task 6 in stage 70.0 failed 4 times, most recent failure: Lost task 6.3 in stage 70.0 (TID 1264, alpinenode7.alpinenow.local): java.nio.channels.ClosedChannelException:
    at org.apache.hadoop.hdfs.DFSOutputStream.checkClosed(DFSOutputStream.java:1526)
    at org.apache.hadoop.fs.FSOutputSummer.write(FSOutputSummer.java:98)
    at org.apache.hadoop.fs.FSDataOutputStream$PositionCache.write(FSDataOutputStream.java:58)
    at java.io.DataOutputStream.write(DataOutputStream.java:107)
    at java.io.ObjectOutputStream$BlockDataOutputStream.drain(ObjectOutputStream.java:1876)
    at java.io.ObjectOutputStream$BlockDataOutputStream.setBlockDataMode(ObjectOutputStream.java:1785)
    at java.io.ObjectOutputStream.writeNonProxyDesc(ObjectOutputStream.java:1285)
    at java.io.ObjectOutputStream.writeClassDesc(ObjectOutputStream.java:1230)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1426)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
    at java.io.ObjectOutputStream.writeFatalException(ObjectOutputStream.java:1576)
    at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:350)
    at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:42)
    at org.apache.spark.serializer.SerializationStream.writeAll(Serializer.scala:110)
    at org.apache.spark.rdd.CheckpointRDD$.writeToFile(CheckpointRDD.scala:114)
    at org.apache.spark.rdd.RDDCheckpointData$$anonfun$doCheckpoint$1.apply(RDDCheckpointData.scala:95)
    at org.apache.spark.rdd.RDDCheckpointData$$anonfun$doCheckpoint$1.apply(RDDCheckpointData.scala:95)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:62)
    at org.apache.spark.scheduler.Task.run(Task.scala:54)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:177)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:744)
Re: Is there a way to look at RDD's lineage? Or debug a fault-tolerance error?
One thing I didn't mention is that we actually do data.repartition beforehand, with shuffle. I found that this can actually introduce randomness to lineage steps, because data gets shuffled to different partitions, and this leads to inconsistent behavior if your algorithm depends on the order in which the data rows appear, because now data rows will appear in a different order. If you want to guarantee fault tolerance, you can't have any randomness whatsoever in lineage steps, and repartition violates that (depending on what you do with the data). On Wed, Oct 8, 2014 at 12:24 PM, Sung Hwan Chung coded...@gmail.com wrote: There is no circular dependency. It's simply dropping references to previous RDDs because there is no need for them. I wonder if that messes things up internally for Spark, though, due to losing references to intermediate RDDs. On Oct 8, 2014, at 12:13 PM, Akshat Aranya aara...@gmail.com wrote: Using a var for RDDs in this way is not going to work. In this example, tx1.zip(tx2) would create an RDD that depends on tx2, but then soon after that, you change what tx2 means, so you would end up having a circular dependency. On Wed, Oct 8, 2014 at 12:01 PM, Sung Hwan Chung coded...@cs.stanford.edu wrote: My job is not being fault-tolerant (e.g., when there's a fetch failure or something). The lineage of RDDs is constantly updated every iteration. However, I think that when there's a failure, the lineage information is not being correctly reapplied. It goes something like this:

val rawRDD = read(...)
val repartRDD = rawRDD.repartition(X)
val tx1 = repartRDD.map(...)
var tx2 = tx1.map(...)
while (...) {
  tx2 = tx1.zip(tx2).map(...)
}

Is there any way to monitor an RDD's lineage? I want to make sure that there are no unexpected things happening.
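Not a full answer to the lineage-monitoring question, but for loops like the one above, a common defensive pattern is to persist the loop variable and checkpoint it every few iterations, so a failure replays only a short lineage instead of the whole order-sensitive chain. A sketch, assuming a Spark 1.x Scala job with `sc.setCheckpointDir` already called; the `RDD[Double]` element type and the zip function are illustrative:

```scala
import org.apache.spark.rdd.RDD
import org.apache.spark.storage.StorageLevel

// Iterate tx2 = f(tx1, tx2), truncating lineage every `checkpointEvery` steps
// so a retried task never replays more than a few iterations.
def iterate(tx1: RDD[Double], steps: Int, checkpointEvery: Int): RDD[Double] = {
  var tx2 = tx1.map(identity).persist(StorageLevel.MEMORY_AND_DISK)
  for (i <- 1 to steps) {
    val next = tx1.zip(tx2).map { case (a, b) => a + b }
      .persist(StorageLevel.MEMORY_AND_DISK)
    if (i % checkpointEvery == 0) next.checkpoint() // cut the lineage here
    next.count()       // materialize before dropping the parent
    tx2.unpersist()
    tx2 = next
  }
  tx2
}
```

For inspecting lineage itself, `rdd.toDebugString` prints the RDD's dependency chain.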
Re: java.io.IOException Error in task deserialization
This is also happening to me on a regular basis, when the job is large with relatively large serialized objects used in each RDD lineage. A bad thing about this is that this exception always stops the whole job. On Fri, Sep 26, 2014 at 11:17 AM, Brad Miller bmill...@eecs.berkeley.edu wrote: FWIW I suspect that each count operation is an opportunity for you to trigger the bug, and each filter operation increases the likelihood of setting up the bug. I normally don't come across this error until my job has been running for an hour or two and has had a chance to build up longer lineages for some RDDs. It sounds like your data is a bit smaller and it's more feasible for you to build up longer lineages more quickly. If you can reduce your number of filter operations (for example by combining some into a single function), that may help. It may also help to introduce persistence or checkpointing at intermediate stages so that the lineages that have to get replayed aren't as long. On Fri, Sep 26, 2014 at 11:10 AM, Arun Ahuja aahuj...@gmail.com wrote: No, for me as well it is non-deterministic. It happens in a piece of code that does many filters and counts on a small set of records (~1k-10k). The original set is persisted in memory and we have a Kryo serializer set for it. The task itself takes in just a few filtering parameters. With the same settings, this has sometimes completed successfully and sometimes failed during this step. Arun On Fri, Sep 26, 2014 at 1:32 PM, Brad Miller bmill...@eecs.berkeley.edu wrote: I've had multiple jobs crash due to "java.io.IOException: unexpected exception type"; I've been running the 1.1 branch for some time and am now running the 1.1 release binaries. Note that I only use PySpark. I haven't kept detailed notes or the tracebacks around, since there are other problems that have caused me greater grief (namely "key not found" errors).
For me the exception seems to occur non-deterministically, which is a bit interesting since the error message shows that the same stage has failed multiple times. Are you able to consistently reproduce the bug across multiple invocations at the same place? On Fri, Sep 26, 2014 at 6:11 AM, Arun Ahuja aahuj...@gmail.com wrote: Has anyone else seen this error in task deserialization? The task is processing a small amount of data and doesn't seem to have much data hanging off the closure. I've only seen this with Spark 1.1.

Job aborted due to stage failure: Task 975 in stage 8.0 failed 4 times, most recent failure: Lost task 975.3 in stage 8.0 (TID 24777, host.com): java.io.IOException: unexpected exception type
    at java.io.ObjectStreamClass.throwMiscException(ObjectStreamClass.java:1538)
    at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1025)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
    at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:62)
    at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:87)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:159)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:744)
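Brad's suggestion of combining filter operations can be sketched concretely (illustrative code, not from the thread; `rdd` and the predicates are hypothetical). Folding many filter-then-count passes into one `aggregate` both slows lineage growth and cuts the number of jobs:

```scala
import org.apache.spark.rdd.RDD

// One pass that computes what several rdd.filter(p).count() calls would:
// an array of counts, one per predicate.
def multiCount(rdd: RDD[String], preds: Seq[String => Boolean]): Array[Long] =
  rdd.aggregate(Array.fill(preds.size)(0L))(
    (acc, row) => {
      preds.zipWithIndex.foreach { case (p, i) => if (p(row)) acc(i) += 1 }
      acc
    },
    (a, b) => a.zip(b).map { case (x, y) => x + y }
  )
```

Persisting or checkpointing the shared intermediate RDD, as Brad also suggests, then bounds how much lineage a retry has to replay.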
Is RDD partition index consistent?
Is the RDD partition index that you get when you call mapPartitionsWithIndex consistent under fault-tolerance conditions? I.e.: 1. Say the index is 1 for one of the partitions when you call data.mapPartitionsWithIndex((index, rows) => ...). 2. The partition fails (maybe along with a bunch of other partitions). 3. When the partitions get restarted somewhere else, will they retain the same index value, as well as all the lineage arguments?
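For reference, the call in question looks like this (a sketch; `data` is hypothetical). The index handed to the closure is the partition's position within the RDD, which is fixed by the partitioning rather than by task placement, so a retried task for partition 1 should run with index 1 again; as the repartition threads discuss, it is the row order inside the iterator, not the index, that can change after a shuffle:

```scala
// Sketch: tag each row with its partition index. The (index, rows) pair is
// exactly what mapPartitionsWithIndex hands to the closure.
val tagged = data.mapPartitionsWithIndex { (index, rows) =>
  rows.map(row => (index, row))
}
```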
Spark fault tolerance after an executor failure.
I sometimes see that after fully caching the data, if one of the executors fails for some reason, that portion of the cache gets lost and does not get re-cached, even though there are plenty of resources. Is this a bug or normal behavior (v1.0.1)?
Re: Getting the number of slaves
Do getExecutorStorageStatus and getExecutorMemoryStatus both return the number of executors plus the driver? E.g., if I submit a job with 10 executors, I get 11 for getExecutorStorageStatus.length and getExecutorMemoryStatus.size. On Thu, Jul 24, 2014 at 4:53 PM, Nicolas Mai nicolas@gmail.com wrote: Thanks, this is what I needed :) I should have searched more... Something I noticed though: after the SparkContext is initialized, I had to wait for a few seconds until sc.getExecutorStorageStatus.length returned the correct number of workers in my cluster (otherwise it returns 1, for the driver)...
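A sketch of guarding against the startup race Nicolas describes (assumes the Spark 1.x `SparkContext` developer API; the helper name and polling scheme are mine):

```scala
import org.apache.spark.SparkContext

// Both status methods count the driver's BlockManager too, hence the "- 1".
// Poll until the expected number of executors has registered, or time out.
def waitForExecutors(sc: SparkContext, expected: Int, timeoutMs: Long = 30000L): Boolean = {
  val deadline = System.currentTimeMillis + timeoutMs
  while (sc.getExecutorStorageStatus.length - 1 < expected &&
         System.currentTimeMillis < deadline) {
    Thread.sleep(200)
  }
  sc.getExecutorStorageStatus.length - 1 >= expected
}
```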
collect on partitions gets very slow near the last few partitions.
I'm doing something like this: rdd.groupBy(...).map(...).collect(). The workload of the final map is pretty evenly distributed. When the collect happens, say on 60 partitions, the first 55 or so partitions finish very quickly, say within 10 seconds. However, the last 5, and particularly the very last one, typically get very slow, with the overall collect time reaching 30 seconds to sometimes even 1 minute. E.g., it would get stuck in a state like 54/55 for a much longer time. Another interesting thing is that the first iteration typically doesn't have this problem, but it gets progressively worse despite having about the same workload/partition sizes in subsequent iterations. This problem worsens with a smaller Akka frame size and/or maxMbInFlight. Anyone know why this is so?
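Not from the thread, but for persistent stragglers in the last wave of tasks, speculative execution is the usual first knob to try. A hedged sketch of the relevant settings (names as in Spark 1.x; values illustrative, and early versions took the interval as plain milliseconds):

```
spark.speculation            true    # re-launch slow tasks speculatively
spark.speculation.interval   100     # ms between checks for stragglers
spark.speculation.quantile   0.90    # fraction of tasks that must finish first
spark.speculation.multiplier 1.5     # how much slower than the median counts as slow
```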
Re: collect on partitions gets very slow near the last few partitions.
I'm finding the following messages in the driver log. Can these potentially have anything to do with the drastic slowdowns?

14/06/28 00:00:17 INFO ShuffleBlockManager: Could not find files for shuffle 8 for deleting
14/06/28 00:00:17 INFO ContextCleaner: Cleaned shuffle 8
14/06/28 00:00:17 INFO ShuffleBlockManager: Could not find files for shuffle 7 for deleting
14/06/28 00:00:17 INFO ContextCleaner: Cleaned shuffle 7
14/06/28 00:00:17 INFO ShuffleBlockManager: Could not find files for shuffle 6 for deleting
14/06/28 00:00:17 INFO ContextCleaner: Cleaned shuffle 6

On Fri, Jun 27, 2014 at 11:35 PM, Sung Hwan Chung coded...@cs.stanford.edu wrote: I'm doing something like this: rdd.groupBy(...).map(...).collect(). The workload of the final map is pretty evenly distributed. When the collect happens, say on 60 partitions, the first 55 or so partitions finish very quickly, say within 10 seconds. However, the last 5, and particularly the very last one, typically get very slow, with the overall collect time reaching 30 seconds to sometimes even 1 minute. E.g., it would get stuck in a state like 54/55 for a much longer time. Another interesting thing is that the first iteration typically doesn't have this problem, but it gets progressively worse despite having about the same workload/partition sizes in subsequent iterations. This problem worsens with a smaller Akka frame size and/or maxMbInFlight. Anyone know why this is so?
Number of executors smaller than requested in YARN.
Hi, when I try requesting a large number of executors, e.g. 242, it doesn't seem to actually reach that number. E.g., under the executors tab, I only see executor IDs of up to 234. This is despite the fact that there's plenty more memory available, as well as CPU cores, etc., in the system. In fact, the YARN page shows that 243 containers are running (242 executors + driver). Anyone know what's going on?
Does Spark restart cached workers even without failures?
I'm doing coalesce with shuffle, caching, and then thousands of iterations. I noticed that sometimes Spark would, for no particular reason, perform a partial coalesce again after running for a long time, even though there was no exception or failure on the workers' part. Why is this happening?
Spark executor error
I'm seeing the following message in the log of an executor. Anyone seen this error? After this, the executor seems to lose its cache, and besides that, the whole thing slows down drastically, i.e., it gets stuck in a reduce phase for 40+ minutes, whereas before it was finishing reduces in 2-3 seconds.

14/06/25 19:22:31 WARN SendingConnection: Error writing in connection to ConnectionManagerId(alpinenode7.alpinenow.local,46251)
java.lang.NullPointerException
    at org.apache.spark.network.MessageChunkHeader.buffer$lzycompute(MessageChunkHeader.scala:35)
    at org.apache.spark.network.MessageChunkHeader.buffer(MessageChunkHeader.scala:32)
    at org.apache.spark.network.MessageChunk.buffers$lzycompute(MessageChunk.scala:31)
    at org.apache.spark.network.MessageChunk.buffers(MessageChunk.scala:29)
    at org.apache.spark.network.SendingConnection.write(Connection.scala:349)
    at org.apache.spark.network.ConnectionManager$$anon$5.run(ConnectionManager.scala:142)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:724)
Optimizing reduce for 'huge' aggregated outputs.
Hello, I noticed that the final reduce happens in the driver node, with code that looks like the following:

val outputMap = data.mapPartitions(doSomething).reduce((a, b) => a.merge(b))

Although the individual outputs from the mappers are small, over time the aggregated result outputMap could be huge (say, hundreds of millions of keys and values, reaching gigabytes). I noticed that, even if we have a lot of memory in the driver node, this process eventually becomes really slow (say we have 100+ partitions: the first reduce is fast, but it becomes progressively very slow as more and more partition outputs get aggregated). Is this because the intermediate reduce output gets serialized and then deserialized every time? What I'd like, ideally, is this: since the reduce is taking place on the same machine anyway, there's no need for any serialization and deserialization; just aggregate the incoming results into the final aggregation. Is this possible?
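One restructuring (a sketch, not from the thread; `doSomething` emitting key/value pairs is hypothetical) is to avoid shipping whole maps to the driver at all: keep the aggregation distributed with reduceByKey and materialize on the driver once, at the end:

```scala
import org.apache.spark.rdd.RDD

// Instead of reducing Map-valued partition outputs one by one on the driver,
// emit (key, value) pairs and let the shuffle merge them in parallel.
def aggregateDistributed(data: RDD[String]): scala.collection.Map[String, Long] = {
  val pairs: RDD[(String, Long)] = data.flatMap(doSomething) // hypothetical
  pairs.reduceByKey(_ + _).collectAsMap()  // one driver-side materialization
}
```

Later Spark versions also offer treeReduce/treeAggregate, which perform the merge in log-depth levels on the executors rather than linearly on the driver.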
Re: Optimizing reduce for 'huge' aggregated outputs.
I suppose what I want is the memory efficiency of toLocalIterator and the speed of collect. Is there any such thing? On Mon, Jun 9, 2014 at 3:19 PM, Sung Hwan Chung coded...@cs.stanford.edu wrote: Hello, I noticed that the final reduce happens in the driver node, with code that looks like the following: val outputMap = data.mapPartitions(doSomething).reduce((a, b) => a.merge(b)) Although the individual outputs from the mappers are small, over time the aggregated result outputMap could be huge (say, hundreds of millions of keys and values, reaching gigabytes). I noticed that, even if we have a lot of memory in the driver node, this process eventually becomes really slow (say we have 100+ partitions: the first reduce is fast, but it becomes progressively very slow as more and more partition outputs get aggregated). Is this because the intermediate reduce output gets serialized and then deserialized every time? What I'd like, ideally, is this: since the reduce is taking place on the same machine anyway, there's no need for any serialization and deserialization; just aggregate the incoming results into the final aggregation. Is this possible?
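A sketch of the toLocalIterator side of that trade-off (assumes a Spark version with `RDD.toLocalIterator`, and that the aggregation already produced an RDD of pairs): it streams one partition at a time to the driver, bounding driver memory at roughly one partition plus the growing result map, at the cost of running one job per partition:

```scala
import scala.collection.mutable
import org.apache.spark.rdd.RDD

// Merge partition outputs into one driver-side map, fetching one partition
// at a time instead of all partitions at once as collect() would.
def mergeLocally(pairs: RDD[(String, Long)]): mutable.Map[String, Long] = {
  val merged = mutable.Map.empty[String, Long]
  pairs.toLocalIterator.foreach { case (k, v) =>
    merged(k) = merged.getOrElse(k, 0L) + v
  }
  merged
}
```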
When does Spark switch from PROCESS_LOCAL to NODE_LOCAL or RACK_LOCAL?
I noticed that sometimes tasks would switch from PROCESS_LOCAL (I'd assume this means fully cached) to NODE_LOCAL or even RACK_LOCAL. When this happens, things get extremely slow. Does this mean that the executor got terminated and restarted? Is there a way to prevent this from happening (barring the machine actually going down, I'd rather stick with the same process)?
Re: When does Spark switch from PROCESS_LOCAL to NODE_LOCAL or RACK_LOCAL?
On a related note, I'd also like to minimize any kind of executor movement. I.e., once an executor is spawned and data is cached in it, I want that executor to live all the way until the job is finished, or until the machine fails in a fatal manner. What would be the best way to ensure that this is the case? On Thu, Jun 5, 2014 at 3:13 PM, Sung Hwan Chung coded...@cs.stanford.edu wrote: I noticed that sometimes tasks would switch from PROCESS_LOCAL (I'd assume this means fully cached) to NODE_LOCAL or even RACK_LOCAL. When this happens, things get extremely slow. Does this mean that the executor got terminated and restarted? Is there a way to prevent this from happening (barring the machine actually going down, I'd rather stick with the same process)?
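Not from the thread, but the scheduler's willingness to fall back from PROCESS_LOCAL is governed by the locality-wait settings; raising them makes the scheduler wait longer for a slot on the caching executor before downgrading locality. A hedged sketch (names as in Spark 1.x; values illustrative, in milliseconds):

```
spark.locality.wait          3000   # default wait before relaxing locality
spark.locality.wait.process  10000  # extra patience before giving up PROCESS_LOCAL
spark.locality.wait.node     10000  # likewise before falling past NODE_LOCAL
```

Note this trades locality for scheduling latency: very large values can leave cores idle while the scheduler holds out for the preferred executor.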
Spark assembly error.
When I run sbt/sbt assembly, I get the following exception. Is anyone else experiencing a similar problem? .. [info] Resolving org.eclipse.jetty.orbit#javax.servlet;3.0.0.v201112011016 ... [info] Updating {file:/Users/Sung/Projects/spark_06_04_14/}assembly... [info] Resolving org.fusesource.jansi#jansi;1.4 ... [info] Done updating. [info] Resolving org.eclipse.jetty#jetty-server;8.1.14.v20131031 ... [info] Updating {file:/Users/Sung/Projects/spark_06_04_14/}examples... [info] Resolving com.typesafe.genjavadoc#genjavadoc-plugin_2.10.4;0.5 ... *[error] impossible to get artifacts when data has not been loaded. IvyNode = org.slf4j#slf4j-api;1.6.1* [info] Resolving org.fusesource.jansi#jansi;1.4 ... [info] Done updating. [warn] /Users/Sung/Projects/spark_06_04_14/core/src/main/scala/org/apache/hadoop/mapred/SparkHadoopMapRedUtil.scala:43: constructor TaskAttemptID in class TaskAttemptID is deprecated: see corresponding Javadoc for more information. [warn] new TaskAttemptID(jtIdentifier, jobId, isMap, taskId, attemptId) [warn] ^ [warn] /Users/Sung/Projects/spark_06_04_14/core/src/main/scala/org/apache/spark/SparkContext.scala:490: constructor Job in class Job is deprecated: see corresponding Javadoc for more information. [warn] val job = new NewHadoopJob(hadoopConfiguration) [warn] ^ [warn] /Users/Sung/Projects/spark_06_04_14/core/src/main/scala/org/apache/spark/SparkContext.scala:623: constructor Job in class Job is deprecated: see corresponding Javadoc for more information. [warn] val job = new NewHadoopJob(conf)
Re: Spark assembly error.
Nevermind, it turns out that this is a problem for the Pivotal Hadoop that we are trying to compile against. On Wed, Jun 4, 2014 at 4:16 PM, Sung Hwan Chung coded...@cs.stanford.edu wrote: When I run sbt/sbt assembly, I get the following exception. Is anyone else experiencing a similar problem? .. [info] Resolving org.eclipse.jetty.orbit#javax.servlet;3.0.0.v201112011016 ... [info] Updating {file:/Users/Sung/Projects/spark_06_04_14/}assembly... [info] Resolving org.fusesource.jansi#jansi;1.4 ... [info] Done updating. [info] Resolving org.eclipse.jetty#jetty-server;8.1.14.v20131031 ... [info] Updating {file:/Users/Sung/Projects/spark_06_04_14/}examples... [info] Resolving com.typesafe.genjavadoc#genjavadoc-plugin_2.10.4;0.5 ... *[error] impossible to get artifacts when data has not been loaded. IvyNode = org.slf4j#slf4j-api;1.6.1* [info] Resolving org.fusesource.jansi#jansi;1.4 ... [info] Done updating. [warn] /Users/Sung/Projects/spark_06_04_14/core/src/main/scala/org/apache/hadoop/mapred/SparkHadoopMapRedUtil.scala:43: constructor TaskAttemptID in class TaskAttemptID is deprecated: see corresponding Javadoc for more information. [warn] new TaskAttemptID(jtIdentifier, jobId, isMap, taskId, attemptId) [warn] ^ [warn] /Users/Sung/Projects/spark_06_04_14/core/src/main/scala/org/apache/spark/SparkContext.scala:490: constructor Job in class Job is deprecated: see corresponding Javadoc for more information. [warn] val job = new NewHadoopJob(hadoopConfiguration) [warn] ^ [warn] /Users/Sung/Projects/spark_06_04_14/core/src/main/scala/org/apache/spark/SparkContext.scala:623: constructor Job in class Job is deprecated: see corresponding Javadoc for more information. [warn] val job = new NewHadoopJob(conf)
Re: is it okay to reuse objects across RDD's?
Actually, I do not know how to do something like this, or whether it is possible - thus my suggestive statement. Can you already declare persistent memory objects per worker? I tried something like constructing a singleton object within map functions, but that didn't work, as it seemed to actually serialize the singletons and pass them back and forth in a weird manner. On Mon, Apr 28, 2014 at 1:23 AM, Sean Owen so...@cloudera.com wrote: On Mon, Apr 28, 2014 at 8:22 AM, Sung Hwan Chung coded...@cs.stanford.edu wrote: e.g. something like:

rdd.mapPartitions { (rows: Iterator[String]) =>
  var idx = 0
  rows.map { (row: String) =>
    val valueMap = SparkWorker.getMemoryContent(valMap)
    val prevVal = valueMap(idx)
    idx += 1
    ...
  }
  ...
}

The developer can implement their own fault-recovery mechanism if the worker has crashed and lost the memory content. Yea, you can always just declare your own per-partition data structures in a function block like that, right? valueMap can be initialized to an empty map, loaded from somewhere, or even a value that is broadcast from the driver. That's certainly better than tacking data onto RDDs. It's not restored if the computation is lost, of course, but in this and many other cases it's fine, as it is just for some cached intermediate results. This already works then, or did I misunderstand the original use case?
Re: is it okay to reuse objects across RDD's?
That might be a good alternative to what we are looking for. But I wonder if it would be as efficient as we want. For instance, will RDDs of the same size usually get partitioned to the same machines, thus not triggering any cross-machine aligning, etc.? We'll explore it, but I would still very much like to see more direct worker memory management besides RDDs. On Mon, Apr 28, 2014 at 10:26 AM, Tom Vacek minnesota...@gmail.com wrote: Right - they are zipped at each iteration. On Mon, Apr 28, 2014 at 11:56 AM, Chester Chen chesterxgc...@yahoo.com wrote: Tom, are you suggesting two RDDs, one with the losses and another for the rest of the info, using zip to tie them together, but doing the update on a copy of the loss RDD? Chester Sent from my iPhone On Apr 28, 2014, at 9:45 AM, Tom Vacek minnesota...@gmail.com wrote: I'm not sure what I said came through. RDD zip is not hacky at all, as it only depends on a user not changing the partitioning. Basically, you would keep your losses as an RDD[Double] and zip those with the RDD of examples, and update the losses. You're doing a copy (and GC) on the RDD of losses each time, but this is negligible. On Mon, Apr 28, 2014 at 11:33 AM, Sung Hwan Chung coded...@cs.stanford.edu wrote: Yes, this is what we've done as of now (if you read the earlier threads). And we were saying that we'd prefer it if Spark supported persistent worker memory management in a little less hacky way ;) On Mon, Apr 28, 2014 at 8:44 AM, Ian O'Connell i...@ianoconnell.com wrote: A mutable map in an object should do what you're looking for, then, I believe. You just reference the object as an object in your closure, so it won't be swept up when your closure is serialized, and you can then reference variables of the object on the remote host. E.g.:

object MyObject {
  val mmap = scala.collection.mutable.Map[Long, Long]()
}

rdd.map { ele =>
  MyObject.mmap.getOrElseUpdate(ele, 1L)
  ...
}.map { ele =>
  require(MyObject.mmap(ele) == 1L)
}.count

Along with the data loss, just be careful with thread safety and multiple threads/partitions on one host, so the map should be viewed as shared amongst a larger space. Also, from your exact description it sounds like your data should be encoded into the RDD if it's per-record/per-row: RDD[(MyBaseData, LastIterationSideValues)]. On Mon, Apr 28, 2014 at 1:51 AM, Sung Hwan Chung coded...@cs.stanford.edu wrote: In our case, we'd like to keep memory content from one iteration to the next, and not just during a single mapPartitions call, because then we can do more efficient computations using the values from the previous iteration. So essentially, we need to declare objects outside the scope of the map/reduce calls (but residing in individual workers); then those can be accessed from the map/reduce calls. We'd be making some assumptions, as you said, such as: an RDD partition is statically located and can't move from one worker to another unless the worker crashes. On Mon, Apr 28, 2014 at 1:35 AM, Sean Owen so...@cloudera.com wrote: On Mon, Apr 28, 2014 at 9:30 AM, Sung Hwan Chung coded...@cs.stanford.edu wrote: Actually, I do not know how to do something like this or whether this is possible - thus my suggestive statement. Can you already declare persistent memory objects per worker? I tried something like constructing a singleton object within map functions, but that didn't work, as it seemed to actually serialize the singletons and pass them back and forth in a weird manner. Does it need to be persistent across operations, or just persist for the lifetime of processing of one partition in one mapPartitions? The latter is quite easy and might give most of the speedup. Maybe that's 'enough', even if it means you re-cache values several times in a repeated iterative computation. It would certainly avoid managing a lot of complexity in trying to keep that state alive remotely across operations.
I'd also be interested if there is any reliable way to do that, though it seems hard since it means you embed assumptions about where particular data is going to be processed.
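The zip-based pattern Tom describes in this thread can be sketched as follows. This is an illustrative reconstruction, not code from the thread: `computeLoss` and its update rule are made-up stand-ins, and the closure is shown on local collections for clarity; on Spark you would call `examples.zip(losses)` on two RDDs with identical partitioning and ordering.

```scala
// A minimal sketch of the zip-based pattern from this thread. On Spark,
// `examples` and `losses` would be RDDs with identical partitioning;
// zip is only safe when neither side has been repartitioned.
def computeLoss(example: Array[Double], prevLoss: Double): Double =
  prevLoss * 0.5 + example.sum * 0.5 // stand-in update rule

def step(examples: Seq[Array[Double]], losses: Seq[Double]): Seq[Double] =
  // each iteration produces a fresh losses collection (the "copy and GC"
  // cost Tom mentions); the inputs are never mutated
  examples.zip(losses).map { case (ex, prev) => computeLoss(ex, prev) }
```

Each call to `step` replaces the losses wholesale, so a recomputed partition after a failure yields the same values as the original run.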
Re: Do developers have to be aware of Spark's fault tolerance mechanism?
I would probably agree that it's typically not a good idea to add state to distributed systems. Additionally, from a purist's perspective, this would be a bit of a hack on the paradigm. However, from a practical point of view, I think that it's a reasonable trade-off between efficiency and complexity. It's not too difficult to have a small set of mutable states kept in between iterations, and I think a large number of iterative algorithms could benefit from this. For the time being, we're thinking of something like this: RDD[Array[Double]] appended with an extra column that initializes to some default value. If the extra column in an iteration has the default value, it means either something failed or it's the very first iteration, so we compute things inefficiently. Otherwise, it has the intermediate computational value, so we can do the efficient computation. On Mon, Apr 21, 2014 at 11:15 AM, Marcelo Vanzin van...@cloudera.com wrote: Hi Sung, On Mon, Apr 21, 2014 at 10:52 AM, Sung Hwan Chung coded...@cs.stanford.edu wrote: The goal is to keep an intermediate value per row in memory, which would allow faster subsequent computations. I.e., computeSomething would depend on the previous value from the previous computation. I think the fundamental problem here is that there is no in-memory state of the sort you mention when you're talking about map/reduce-style workloads. There are three kinds of data that you can use to communicate between sub-tasks: - RDD input / output, i.e. the arguments and return values of the closures you pass to transformations - Broadcast variables - Accumulators In general, distributed algorithms should strive to be stateless, exactly because of issues like reliability and having to re-run computations (and communication/coordination in general being expensive). The last two in the list above are not generally targeted at the kind of state-keeping that you seem to be talking about.
So if you make the result of computeSomething() the output of your map task, then you'll have access to it in the operations downstream from that map task. But you can't store it in a variable in memory and access it later, because that's not how the system works. In any case, I'm really not familiar with ML algorithms, but maybe you should take a look at MLLib. -- Marcelo
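The extra-column scheme Sung proposes in this thread can be sketched as below. The sentinel choice and both compute functions are made-up stand-ins; the point is only the control flow: a default value in the trailing slot means "recompute from scratch", anything else is reusable state from the previous iteration.

```scala
// Illustrative sketch of the default-value extra-column scheme. Each row
// carries one trailing slot for last iteration's intermediate value.
val Sentinel = Double.NaN // "no cached state": first iteration or post-failure

def expensiveFromScratch(features: Array[Double]): Double =
  features.sum // stand-in for the slow full computation
def cheapIncremental(features: Array[Double], prev: Double): Double =
  prev + 1.0 // stand-in for the fast incremental update

def iterate(row: Array[Double]): Array[Double] = {
  val features = row.init // all but the state column
  val state    = row.last
  val next =
    if (state.isNaN) expensiveFromScratch(features) // slow path after data loss
    else cheapIncremental(features, state)          // fast path: reuse state
  features :+ next // emit a fresh row; the input is never mutated
}
```

Because the closure only reads its input and emits a new row, a re-executed partition simply takes the slow path once and then resynchronizes, which is exactly the recovery behavior described above.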
Re: Random Forest on Spark
Debasish, Unfortunately, we are bound to YARN, at least for the time being, because that's what most of our customers would be using (unless all the Hadoop vendors start supporting standalone Spark - I think Cloudera might do that?). On Fri, Apr 18, 2014 at 11:12 AM, Debasish Das debasish.da...@gmail.com wrote: Spark on YARN is a big pain due to the strict memory requirement per container... If you are stress testing it, could you use a standalone cluster and see at which feature upper bound the per-worker RAM requirement reaches 16 GB or more...it is possible to get 16 GB instances on EC2 these days without much trouble... Another way is to run a feature selection algorithm to decrease the feature space before running decision tree or algorithm variants...There is a PR on entropy-based feature selection algorithms...you don't want to use them to decrease features, right? A feature extraction algorithm like matrix factorization and its variants could be used to decrease the feature space as well... On Fri, Apr 18, 2014 at 10:53 AM, Sung Hwan Chung coded...@cs.stanford.edu wrote: Thanks for the info on the mem requirement. I think that a lot of businesses would probably prefer to use Spark on top of YARN, since that's what they invest in - a large Hadoop cluster. And the default setting for YARN seems to cap memory per container to 8 GB - so ideally, we would like to use a lot less than that (rather than telling them, nooo change your YARN settings). A convenient feature would be to automatically figure things out, and try to adapt the algorithm to memory limits (e.g., process X # of nodes at a time, instead of all the nodes at the same level). Additionally, we noticed that the default 'Double' usage for LabeledPoint is very wasteful for a majority of data sets. Float would do most of the time, and in fact, a lot of datasets could get away with using Short or even Byte.
Or in your case, since you're transforming data to bins anyway, you could probably cache bin IDs (for which you could use Short or even Byte)? On Fri, Apr 18, 2014 at 8:43 AM, Evan R. Sparks evan.spa...@gmail.com wrote: Interesting, and thanks for the thoughts. I think we're on the same page with 100s of millions of records. We've tested the tree implementation in mllib on 1b rows and up to 100 features - though this isn't hitting the 1000s of features you mention. Obviously multi-class support isn't there yet, but I can see your point about deeper trees for many-class problems. Will try them out on some image processing stuff with 1k classes we're doing in the lab once they are more developed, to get a sense for where the issues are. If you're only allocating 2GB/worker you're going to have a hard time getting the real advantages of Spark. For your 1k features causing heap exceptions at depth 5 - are these categorical or continuous? The categorical vars create much smaller histograms. If you're fitting all continuous features, the memory requirements are O(b*d*2^l), where b = number of histogram bins, d = number of features, and l = level of the tree. Even accounting for object overhead, with the default number of bins, the histograms at this depth should be on the order of 10s of MB, not 2GB - so I'm guessing your cached data is occupying a significant chunk of that 2GB? In the tree PR, Hirakendu Das tested down to depth 10 on 500m data points with 20 continuous features and was able to run without running into memory issues (and scaling properties got better as the depth grew). His worker mem was 7.5GB and 30% of that was reserved for caching. If you wanted to go 1000 features at depth 10 I'd estimate a couple of gigs necessary for heap space for the worker to compute/store the histograms, and I guess 2x that on the master to do the reduce. Again, 2GB per worker is pretty tight, because there are overheads of just starting the JVM, launching a worker, loading libraries, etc.
- Evan On Apr 17, 2014, at 6:10 PM, Sung Hwan Chung coded...@cs.stanford.edu wrote: Yes, it should be data specific and perhaps we're biased toward the data sets that we are playing with. To put things in perspective, we're highly interested in (and I believe, our customers are): 1. large (hundreds of millions of rows) 2. multi-class classification - nowadays, dozens of target categories are common and even thousands in some cases - you could imagine that this is a big reason for us requiring more 'complex' models 3. high dimensional with thousands of descriptive and sort-of-independent features From the theoretical perspective, I would argue that it's usually in the best interest to prune as little as possible. I believe that pruning inherently increases bias of an individual tree, which RF can't do anything about while decreasing variance - which is what RF is for. The default pruning criteria for R's reference implementation is min-node of 1 (meaning fully-grown tree
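Evan's O(b*d*2^l) estimate above can be sanity-checked with a back-of-envelope calculation. The 32-bin count and 8 bytes per Double entry below are illustrative assumptions, not numbers from the thread.

```scala
// Back-of-envelope check of the O(b * d * 2^l) histogram-memory estimate:
// b bins per feature, d features, 2^l nodes at tree level l, and an assumed
// 8 bytes per Double-valued histogram entry.
def histogramBytes(bins: Int, features: Int, level: Int): Long =
  bins.toLong * features.toLong * (1L << level) * 8L

val atDepth5 = histogramBytes(32, 1000, 5) // 8,192,000 bytes, roughly 8 MB
// Tens of MB at most at depth 5 with 1000 continuous features - consistent
// with Evan's point that the heap pressure likely comes from cached data,
// not from the histograms themselves.
```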
Re: Random Forest on Spark
I would argue that memory in clusters is still a limited resource and it's still beneficial to use memory as economically as possible. Let's say that you are training a gradient boosted model in Spark, which could conceivably take several hours to build hundreds to thousands of trees. You do not want to be occupying a significant portion of the cluster memory such that nobody else can run anything of significance. We have a dataset that's only ~10GB CSV in the file system, now once we cached the whole thing in Spark, it ballooned to 64 GB or so in memory and so we had to use a lot more workers with memory just so that we could cache the whole thing - this was due to the fact that although all the features were byte-sized, MLLib defaults to Double. On Fri, Apr 18, 2014 at 1:39 PM, Sandy Ryza sandy.r...@cloudera.com wrote: I don't think the YARN default of max 8GB container size is a good justification for limiting memory per worker. This is a sort of arbitrary number that came from an era where MapReduce was the main YARN application and machines generally had less memory. I expect to see this to get configured as much higher in practice on most clusters running Spark. YARN integration is actually complete in CDH5.0. We support it as well as standalone mode. On Fri, Apr 18, 2014 at 11:49 AM, Sean Owen so...@cloudera.com wrote: On Fri, Apr 18, 2014 at 7:31 PM, Sung Hwan Chung coded...@cs.stanford.edu wrote: Debasish, Unfortunately, we are bound to YARN, at least for the time being, because that's what most of our customers would be using (unless, all the Hadoop vendors start supporting standalone Spark - I think Cloudera might do that?). Yes the CDH5.0.0 distro just runs Spark in stand-alone mode. Using the YARN integration is still being worked on.
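The ~10 GB-to-64 GB blow-up described above is roughly what one would predict from storing byte-sized features as 8-byte Doubles. The row and column counts below are made-up illustrative figures, not the actual dataset's shape.

```scala
// Storing a byte-sized feature as a Double takes 8x the space, before any
// JVM object overhead; the dataset shape here is purely illustrative.
def cachedBytes(rows: Long, cols: Long, bytesPerValue: Long): Long =
  rows * cols * bytesPerValue

val asBytes   = cachedBytes(100000000L, 100L, 1L) // ~10 GB of byte features
val asDoubles = cachedBytes(100000000L, 100L, 8L) // ~80 GB held as Double
// The 8x multiplier alone explains most of a ~10 GB file ballooning toward
// 64 GB once cached, even before per-object overhead is counted.
```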
Re: Random Forest on Spark
Sorry, that was incomplete information, I think Spark's compression helped (not sure how much though) that the actual memory requirement may have been smaller. On Fri, Apr 18, 2014 at 3:16 PM, Sung Hwan Chung coded...@cs.stanford.eduwrote: I would argue that memory in clusters is still a limited resource and it's still beneficial to use memory as economically as possible. Let's say that you are training a gradient boosted model in Spark, which could conceivably take several hours to build hundreds to thousands of trees. You do not want to be occupying a significant portion of the cluster memory such that nobody else can run anything of significance. We have a dataset that's only ~10GB CSV in the file system, now once we cached the whole thing in Spark, it ballooned to 64 GB or so in memory and so we had to use a lot more workers with memory just so that we could cache the whole thing - this was due to the fact that although all the features were byte-sized, MLLib defaults to Double. On Fri, Apr 18, 2014 at 1:39 PM, Sandy Ryza sandy.r...@cloudera.comwrote: I don't think the YARN default of max 8GB container size is a good justification for limiting memory per worker. This is a sort of arbitrary number that came from an era where MapReduce was the main YARN application and machines generally had less memory. I expect to see this to get configured as much higher in practice on most clusters running Spark. YARN integration is actually complete in CDH5.0. We support it as well as standalone mode. On Fri, Apr 18, 2014 at 11:49 AM, Sean Owen so...@cloudera.com wrote: On Fri, Apr 18, 2014 at 7:31 PM, Sung Hwan Chung coded...@cs.stanford.edu wrote: Debasish, Unfortunately, we are bound to YARN, at least for the time being, because that's what most of our customers would be using (unless, all the Hadoop vendors start supporting standalone Spark - I think Cloudera might do that?). Yes the CDH5.0.0 distro just runs Spark in stand-alone mode. 
Using the YARN integration is still being worked on.
Do developers have to be aware of Spark's fault tolerance mechanism?
Are there scenarios where developers have to be aware of how Spark's fault tolerance works to implement correct programs? It seems that if we want to maintain any sort of mutable state in each worker through iterations, it can have some unintended effect once a machine goes down. E.g., while (true) { rdd.map((row: Array[Double]) => { row(numCols - 1) = computeSomething(row) }).reduce(...) } If it fails at some point, I'd imagine that the intermediate info being stored in row(numCols - 1) will be lost. And unless Spark runs this whole thing from the very first iteration, things will get out of sync. I'd imagine that as long as we don't use mutable tricks inside of worker tasks, we should be OK, but once we start doing that, things could get ugly, unless we account for how Spark handles fault tolerance?
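A stateless variant of the loop above sidesteps the recovery problem: emit the computed value as part of the map output instead of mutating rows in place, so that re-running a lost partition reproduces exactly the same rows. This is an illustrative sketch shown on a local Seq (the same closure works in an RDD map), and `computeSomething` is a stand-in.

```scala
// Stateless version of the loop above: derive the extra column purely from
// the immutable inputs, so recomputing a lost partition gives the same rows.
def computeSomething(features: Array[Double]): Double =
  features.sum // stand-in for the real per-row computation

def iteration(rows: Seq[Array[Double]]): Seq[Array[Double]] =
  rows.map { row =>
    val features = row.init // everything but the state column
    features :+ computeSomething(features) // rebuild state; never mutate `row`
  }
```

Because nothing is mutated, lineage-based recomputation and the original run cannot get out of sync, at the cost of recomputing the column whenever a partition is rebuilt.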
Re: Random Forest on Spark
Debasish, we've tested the MLLib decision tree a bit and it eats up too much memory for RF purposes. Once the tree got to depth 8~9, it was easy to get a heap exception, even with 2~4 GB of memory per worker. With RF, it's very easy to get 100+ depth with even only 100,000+ rows (because trees usually are not balanced). Additionally, the lack of multi-class classification limits its applicability. Also, RF requires random features per tree node to be effective (not just bootstrap samples), and the MLLib decision tree doesn't support that. On Thu, Apr 17, 2014 at 10:27 AM, Debasish Das debasish.da...@gmail.com wrote: Mllib has a decision tree...there is a rf pr which is not active now...take that and swap the tree builder with the fast tree builder that's in mllib...search for the spark jira...the code is based on the google planet paper... I am sure people in the dev list are already working on it...send an email to know the status over there... There is also a rf in cloudera oryx but we could not run it on our data yet. Weka 3.7.10 has a multi-thread rf that is good for some ad-hoc runs but it does not scale... On Apr 17, 2014 2:45 AM, Laeeq Ahmed laeeqsp...@yahoo.com wrote: Hi, For one of my applications, I want to use Random Forests (RF) on top of Spark. I see that currently MLLib does not have an implementation for RF. What other open-source RF implementations would be good to use with Spark in terms of speed? Regards, Laeeq Ahmed, KTH, Sweden.
Re: Random Forest on Spark
Evan, I actually haven't heard of 'shallow' random forests. I think that the only scenarios where shallow trees are useful are boosting scenarios. AFAIK, Random Forest is a variance-reducing technique and doesn't do much about bias (although some people claim that it does have some bias-reducing effect). Because shallow trees typically have higher bias than fully-grown trees, people don't often use shallow trees with RF. You can confirm this through some experiments with R's random forest implementation as well. It allows you to set limits on depth and/or pruning. In contrast, boosting is a bias reduction technique (and increases variance), so people typically use shallow trees. Our empirical experiments also confirmed that shallow trees resulted in drastically lower accuracy for random forests. There are some papers that mix a boosting-like technique with bootstrap averaging (e.g. http://arxiv.org/pdf/1103.2068.pdf) where you could potentially use shallow trees to build boosted learners, but then average the results of many boosted learners. On Thu, Apr 17, 2014 at 12:07 PM, Evan R. Sparks evan.spa...@gmail.com wrote: Multiclass classification, Gradient Boosting, and Random Forest support based on the recent Decision Tree implementation in MLlib. Sung - I'd be curious to hear about your use of decision trees (and forests) where you want to go to 100+ depth. My experience with random forests has been that people typically build hundreds of shallow trees (maybe depth 7 or 8), rather than a few (or many) really deep trees. Generally speaking, we save passes over the data by computing histograms per variable per split at each *level* of a decision tree. This can blow up as the level of the decision tree gets deep, but I'd recommend a lot more memory than 2-4GB per worker for most big data workloads.
On Thu, Apr 17, 2014 at 11:50 AM, Sung Hwan Chung coded...@cs.stanford.edu wrote: Debasish, we've tested the MLLib decision tree a bit and it eats up too much memory for RF purposes. Once the tree got to depth 8~9, it was easy to get heap exception, even with 2~4 GB of memory per worker. With RF, it's very easy to get 100+ depth in RF with even only 100,000+ rows (because trees usually are not balanced). Additionally, the lack of multi-class classification limits its applicability. Also, RF requires random features per tree node to be effective (not just bootstrap samples), and MLLib decision tree doesn't support that. On Thu, Apr 17, 2014 at 10:27 AM, Debasish Das debasish.da...@gmail.comwrote: Mllib has decision treethere is a rf pr which is not active nowtake that and swap the tree builder with the fast tree builder that's in mllib...search for the spark jira...the code is based on google planet paper. .. I am sure people in devlist are already working on it...send an email to know the status over there... There is also a rf in cloudera oryx but we could not run it on our data yet Weka 3.7.10 has a multi thread rf that is good to do some adhoc runs but it does not scale... On Apr 17, 2014 2:45 AM, Laeeq Ahmed laeeqsp...@yahoo.com wrote: Hi, For one of my application, I want to use Random forests(RF) on top of spark. I see that currenlty MLLib does not have implementation for RF. What other opensource RF implementations will be great to use with spark in terms of speed? Regards, Laeeq Ahmed, KTH, Sweden.
Re: Random Forest on Spark
Well, if you read the original paper, http://oz.berkeley.edu/~breiman/randomforest2001.pdf Grow the tree using CART methodology to maximum size and do not prune. Now, the elements of statistical learning book on page 598 says that you could potentially overfit fully-grown regression random forest. However, this effect is very slight, and likely negligible for classifications. http://www.stanford.edu/~hastie/local.ftp/Springer/OLD/ESLII_print4.pdf In our experiments however, if the pruning is drastic, then the performance actually becomes much worse. This makes intuitive sense IMO because a decision tree is a non-parametric model, and the expressibility of a tree depends on the number of nodes. With a huge amount of data (millions or even billions of rows), we found that the depth of 10 is simply not adequate to build high-accuracy models. On Thu, Apr 17, 2014 at 12:30 PM, Evan R. Sparks evan.spa...@gmail.comwrote: Hmm... can you provide some pointers to examples where deep trees are helpful? Typically with Decision Trees you limit depth (either directly or indirectly with minimum node size and minimum improvement criteria) to avoid overfitting. I agree with the assessment that forests are a variance reduction technique, but I'd be a little surprised if a bunch of hugely deep trees don't overfit to training data. I guess I view limiting tree depth as an analogue to regularization in linear models. On Thu, Apr 17, 2014 at 12:19 PM, Sung Hwan Chung coded...@cs.stanford.edu wrote: Evan, I actually haven't heard of 'shallow' random forest. I think that the only scenarios where shallow trees are useful are boosting scenarios. AFAIK, Random Forest is a variance reducing technique and doesn't do much about bias (although some people claim that it does have some bias reducing effect). Because shallow trees typically have higher bias than fully-grown trees, people don't often use shallow trees with RF. 
You can confirm this through some experiments with R's random forest implementation as well. They allow you to set some limits of depth and/or pruning. In contrast, boosting is a bias reduction technique (and increases variance), so people typically use shallow trees. Our empirical experiments also confirmed that shallow trees resulted in drastically lower accuracy for random forests. There are some papers that mix boosting-like technique with bootstrap averaging (e.g. http://arxiv.org/pdf/1103.2068.pdf) where you could potentially use shallow trees to build boosted learners, but then average the results of many boosted learners. On Thu, Apr 17, 2014 at 12:07 PM, Evan R. Sparks evan.spa...@gmail.comwrote: Multiclass classification, Gradient Boosting, and Random Forest support for based on the recent Decision Tree implementation in MLlib. Sung - I'd be curious to hear about your use of decision trees (and forests) where you want to go to 100+ depth. My experience with random forests has been that people typically build hundreds of shallow trees (maybe depth 7 or 8), rather than a few (or many) really deep trees. Generally speaking, we save passes over the data by computing histograms per variable per split at each *level* of a decision tree. This can blow up as the level of the decision tree gets deep, but I'd recommend a lot more memory than 2-4GB per worker for most big data workloads. On Thu, Apr 17, 2014 at 11:50 AM, Sung Hwan Chung coded...@cs.stanford.edu wrote: Debasish, we've tested the MLLib decision tree a bit and it eats up too much memory for RF purposes. Once the tree got to depth 8~9, it was easy to get heap exception, even with 2~4 GB of memory per worker. With RF, it's very easy to get 100+ depth in RF with even only 100,000+ rows (because trees usually are not balanced). Additionally, the lack of multi-class classification limits its applicability. 
Also, RF requires random features per tree node to be effective (not just bootstrap samples), and MLLib decision tree doesn't support that. On Thu, Apr 17, 2014 at 10:27 AM, Debasish Das debasish.da...@gmail.com wrote: Mllib has decision treethere is a rf pr which is not active nowtake that and swap the tree builder with the fast tree builder that's in mllib...search for the spark jira...the code is based on google planet paper. .. I am sure people in devlist are already working on it...send an email to know the status over there... There is also a rf in cloudera oryx but we could not run it on our data yet Weka 3.7.10 has a multi thread rf that is good to do some adhoc runs but it does not scale... On Apr 17, 2014 2:45 AM, Laeeq Ahmed laeeqsp...@yahoo.com wrote: Hi, For one of my application, I want to use Random forests(RF) on top of spark. I see that currenlty MLLib does not have implementation for RF. What other opensource RF implementations will be great to use with spark in terms of speed? Regards, Laeeq Ahmed, KTH, Sweden.
Re: Random Forest on Spark
Additionally, the 'random features per node' (or mtry in R) is a very important feature for Random Forest. The variance reduction comes if the trees are decorrelated from each other and often the random features per node does more than bootstrap samples. And this is something that would have to be supported at the tree level. On Thu, Apr 17, 2014 at 1:43 PM, Sung Hwan Chung coded...@cs.stanford.eduwrote: Well, if you read the original paper, http://oz.berkeley.edu/~breiman/randomforest2001.pdf Grow the tree using CART methodology to maximum size and do not prune. Now, the elements of statistical learning book on page 598 says that you could potentially overfit fully-grown regression random forest. However, this effect is very slight, and likely negligible for classifications. http://www.stanford.edu/~hastie/local.ftp/Springer/OLD/ESLII_print4.pdf In our experiments however, if the pruning is drastic, then the performance actually becomes much worse. This makes intuitive sense IMO because a decision tree is a non-parametric model, and the expressibility of a tree depends on the number of nodes. With a huge amount of data (millions or even billions of rows), we found that the depth of 10 is simply not adequate to build high-accuracy models. On Thu, Apr 17, 2014 at 12:30 PM, Evan R. Sparks evan.spa...@gmail.comwrote: Hmm... can you provide some pointers to examples where deep trees are helpful? Typically with Decision Trees you limit depth (either directly or indirectly with minimum node size and minimum improvement criteria) to avoid overfitting. I agree with the assessment that forests are a variance reduction technique, but I'd be a little surprised if a bunch of hugely deep trees don't overfit to training data. I guess I view limiting tree depth as an analogue to regularization in linear models. On Thu, Apr 17, 2014 at 12:19 PM, Sung Hwan Chung coded...@cs.stanford.edu wrote: Evan, I actually haven't heard of 'shallow' random forest. 
I think that the only scenarios where shallow trees are useful are boosting scenarios. AFAIK, Random Forest is a variance reducing technique and doesn't do much about bias (although some people claim that it does have some bias reducing effect). Because shallow trees typically have higher bias than fully-grown trees, people don't often use shallow trees with RF. You can confirm this through some experiments with R's random forest implementation as well. They allow you to set some limits of depth and/or pruning. In contrast, boosting is a bias reduction technique (and increases variance), so people typically use shallow trees. Our empirical experiments also confirmed that shallow trees resulted in drastically lower accuracy for random forests. There are some papers that mix boosting-like technique with bootstrap averaging (e.g. http://arxiv.org/pdf/1103.2068.pdf) where you could potentially use shallow trees to build boosted learners, but then average the results of many boosted learners. On Thu, Apr 17, 2014 at 12:07 PM, Evan R. Sparks evan.spa...@gmail.comwrote: Multiclass classification, Gradient Boosting, and Random Forest support for based on the recent Decision Tree implementation in MLlib. Sung - I'd be curious to hear about your use of decision trees (and forests) where you want to go to 100+ depth. My experience with random forests has been that people typically build hundreds of shallow trees (maybe depth 7 or 8), rather than a few (or many) really deep trees. Generally speaking, we save passes over the data by computing histograms per variable per split at each *level* of a decision tree. This can blow up as the level of the decision tree gets deep, but I'd recommend a lot more memory than 2-4GB per worker for most big data workloads. On Thu, Apr 17, 2014 at 11:50 AM, Sung Hwan Chung coded...@cs.stanford.edu wrote: Debasish, we've tested the MLLib decision tree a bit and it eats up too much memory for RF purposes. 
Once the tree got to depth 8~9, it was easy to get heap exception, even with 2~4 GB of memory per worker. With RF, it's very easy to get 100+ depth in RF with even only 100,000+ rows (because trees usually are not balanced). Additionally, the lack of multi-class classification limits its applicability. Also, RF requires random features per tree node to be effective (not just bootstrap samples), and MLLib decision tree doesn't support that. On Thu, Apr 17, 2014 at 10:27 AM, Debasish Das debasish.da...@gmail.com wrote: Mllib has decision treethere is a rf pr which is not active nowtake that and swap the tree builder with the fast tree builder that's in mllib...search for the spark jira...the code is based on google planet paper. .. I am sure people in devlist are already working on it...send an email to know the status over there... There is also a rf in cloudera oryx but we could not run it on our data yet Weka 3.7.10 has
Re: Random Forest on Spark
I believe that they show one example comparing depth 1 ensemble vs depth 3 ensemble but it is based on boosting, not bagging. On Thu, Apr 17, 2014 at 2:21 PM, Debasish Das debasish.da...@gmail.comwrote: Evan, Was not mllib decision tree implemented using ideas from Google's PLANET paper...do the paper also propose to grow a shallow tree ? Thanks. Deb On Thu, Apr 17, 2014 at 1:52 PM, Sung Hwan Chung coded...@cs.stanford.edu wrote: Additionally, the 'random features per node' (or mtry in R) is a very important feature for Random Forest. The variance reduction comes if the trees are decorrelated from each other and often the random features per node does more than bootstrap samples. And this is something that would have to be supported at the tree level. On Thu, Apr 17, 2014 at 1:43 PM, Sung Hwan Chung coded...@cs.stanford.edu wrote: Well, if you read the original paper, http://oz.berkeley.edu/~breiman/randomforest2001.pdf Grow the tree using CART methodology to maximum size and do not prune. Now, the elements of statistical learning book on page 598 says that you could potentially overfit fully-grown regression random forest. However, this effect is very slight, and likely negligible for classifications. http://www.stanford.edu/~hastie/local.ftp/Springer/OLD/ESLII_print4.pdf In our experiments however, if the pruning is drastic, then the performance actually becomes much worse. This makes intuitive sense IMO because a decision tree is a non-parametric model, and the expressibility of a tree depends on the number of nodes. With a huge amount of data (millions or even billions of rows), we found that the depth of 10 is simply not adequate to build high-accuracy models. On Thu, Apr 17, 2014 at 12:30 PM, Evan R. Sparks evan.spa...@gmail.comwrote: Hmm... can you provide some pointers to examples where deep trees are helpful? 
Typically with Decision Trees you limit depth (either directly or indirectly with minimum node size and minimum improvement criteria) to avoid overfitting. I agree with the assessment that forests are a variance reduction technique, but I'd be a little surprised if a bunch of hugely deep trees don't overfit to training data. I guess I view limiting tree depth as an analogue to regularization in linear models. On Thu, Apr 17, 2014 at 12:19 PM, Sung Hwan Chung coded...@cs.stanford.edu wrote: Evan, I actually haven't heard of 'shallow' random forest. I think that the only scenarios where shallow trees are useful are boosting scenarios. AFAIK, Random Forest is a variance reducing technique and doesn't do much about bias (although some people claim that it does have some bias reducing effect). Because shallow trees typically have higher bias than fully-grown trees, people don't often use shallow trees with RF. You can confirm this through some experiments with R's random forest implementation as well. They allow you to set some limits of depth and/or pruning. In contrast, boosting is a bias reduction technique (and increases variance), so people typically use shallow trees. Our empirical experiments also confirmed that shallow trees resulted in drastically lower accuracy for random forests. There are some papers that mix boosting-like technique with bootstrap averaging (e.g. http://arxiv.org/pdf/1103.2068.pdf) where you could potentially use shallow trees to build boosted learners, but then average the results of many boosted learners. On Thu, Apr 17, 2014 at 12:07 PM, Evan R. Sparks evan.spa...@gmail.com wrote: Multiclass classification, Gradient Boosting, and Random Forest support for based on the recent Decision Tree implementation in MLlib. Sung - I'd be curious to hear about your use of decision trees (and forests) where you want to go to 100+ depth. 
My experience with random forests has been that people typically build hundreds of shallow trees (maybe depth 7 or 8), rather than a few (or many) really deep trees. Generally speaking, we save passes over the data by computing histograms per variable per split at each *level* of a decision tree. This can blow up as the level of the decision tree gets deep, but I'd recommend a lot more memory than 2-4GB per worker for most big data workloads. On Thu, Apr 17, 2014 at 11:50 AM, Sung Hwan Chung coded...@cs.stanford.edu wrote: Debasish, we've tested the MLLib decision tree a bit and it eats up too much memory for RF purposes. Once the tree got to depth 8~9, it was easy to get heap exception, even with 2~4 GB of memory per worker. With RF, it's very easy to get 100+ depth in RF with even only 100,000+ rows (because trees usually are not balanced). Additionally, the lack of multi-class classification limits its applicability. Also, RF requires random features per tree node to be effective (not just bootstrap samples), and MLLib decision tree doesn't support that. On Thu, Apr 17, 2014 at 10:27 AM, Debasish Das
Re: Random Forest on Spark
Yes, it should be data specific and perhaps we're biased toward the data sets that we are playing with. To put things in perspective, we're highly interested in (and I believe, our customers are): 1. large (hundreds of millions of rows) 2. multi-class classification - nowadays, dozens of target categories are common and even thousands in some cases - you could imagine that this is a big reason for us requiring more 'complex' models 3. high dimensional with thousands of descriptive and sort-of-independent features From the theoretical perspective, I would argue that it's usually in the best interest to prune as little as possible. I believe that pruning inherently increases bias of an individual tree, which RF can't do anything about while decreasing variance - which is what RF is for. The default pruning criteria for R's reference implementation is min-node of 1 (meaning fully-grown tree) for classification, and 5 for regression. I'd imagine they did at least some empirical testing to justify these values at the time - although at a time of small datasets :). FYI, we are also considering the MLLib decision tree for our Gradient Boosting implementation, however, the memory requirement is still a bit too steep (we were getting heap exceptions at depth limit of 5 with 2GB per worker with approximately 1000 features). Now 2GB per worker is about what we expect our typical customers would tolerate and I don't think that it's unreasonable for shallow trees. On Thu, Apr 17, 2014 at 3:54 PM, Evan R. Sparks evan.spa...@gmail.comwrote: What kind of data are you training on? These effects are *highly* data dependent, and while saying the depth of 10 is simply not adequate to build high-accuracy models may be accurate for the particular problem you're modeling, it is not true in general. 
From a statistical perspective, I consider each node in each tree an additional degree of freedom for the model, and all else equal I'd expect a model with fewer degrees of freedom to generalize better. Regardless, if there are lots of use cases for really deep trees, we'd like to hear about them so that we can decide how important they are to support!

In the context of CART, pruning refers very specifically to a step *after* a tree has been constructed to some depth, using cross-validation. This was a variance-reduction technique in the original tree work that is unnecessary, and computationally expensive, in the context of forests. In the original Random Forests paper there are still stopping criteria - usually a minimum leaf size or a minimum split improvement (or both) - so training to maximum depth doesn't mean training until you've completely divided your dataset and there's one point per leaf. My point is that if you set the minimum leaf size to something like 0.2% of the dataset, then you're not going to get deeper than 10 or 12 levels with a reasonably balanced tree.

With respect to PLANET - our implementation is very much in the spirit of PLANET, but has some key differences; good documentation on exactly what the differences are is forthcoming, so I won't belabor them here. The differences are designed to 1) avoid data shuffling, and 2) minimize the number of passes over the training data.
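The "0.2% minimum leaf size caps you around 10-12 levels" claim follows from simple arithmetic: in a perfectly balanced binary tree a node at depth d holds roughly n / 2^d rows, so splitting stops once that drops below the minimum leaf size. A sketch (idealized; real trees are imbalanced, which is why the practical bound sits a couple of levels deeper than the balanced one):

```python
import math

def max_depth_balanced(min_leaf_fraction):
    """Idealized depth bound for a perfectly balanced binary tree:
    a node at depth d holds ~ n / 2**d rows, so splits stop once
    d exceeds log2(1 / min_leaf_fraction)."""
    return math.floor(math.log2(1.0 / min_leaf_fraction))

print(max_depth_balanced(0.002))  # 8 -- a 0.2% minimum leaf caps a balanced tree near depth 8-9
```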
Of course, there are tradeoffs involved, and there is at least one really good trick in the PLANET work that we should leverage but aren't yet - namely, once a node's data gets small enough to fit easily on a single machine, the data can be shuffled and the remainder of the tree trained in parallel, one lower node per machine. This would actually help with the memory overheads in model training when trees get deep - if someone wants to modify the current implementation of trees in MLlib and contribute this optimization as a pull request, it would be welcome!

At any rate, we'll take this feedback into account with respect to improving the tree implementation, but if anyone can send over use cases or (even better) datasets where really deep trees are necessary, that would be great!

On Thu, Apr 17, 2014 at 1:43 PM, Sung Hwan Chung <coded...@cs.stanford.edu> wrote:

> Well, if you read the original paper, http://oz.berkeley.edu/~breiman/randomforest2001.pdf
>
> "Grow the tree using CART methodology to maximum size and do not prune."
>
> Now, the Elements of Statistical Learning book, on page 598, says that you could potentially overfit a fully-grown regression random forest. However, this effect is very slight, and likely negligible for classification. http://www.stanford.edu/~hastie/local.ftp/Springer/OLD/ESLII_print4.pdf
>
> In our experiments, however, if the pruning is drastic, then the performance actually becomes much worse. This makes intuitive sense IMO, because a decision tree is a non-parametric model, and the expressive power of a tree depends on the number of nodes. With a huge amount of data (millions or even billions of rows), we found that the depth of 10
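The PLANET local-training handoff described in the thread can be sketched as follows. This is purely illustrative Python; the function names and the row cutoff are hypothetical, not the MLlib or PLANET API:

```python
# Toy sketch (hypothetical, not the MLlib or PLANET API) of the
# handoff: split nodes with cluster-wide passes until a node's data
# fits on one machine, then finish that subtree locally.
LOCAL_THRESHOLD = 1000  # illustrative row cutoff for local training

def grow(rows, depth, distributed_split, local_train):
    if len(rows) <= LOCAL_THRESHOLD:
        # Small enough: ship these rows to a single worker and build
        # the rest of this subtree there, in parallel with its siblings.
        return local_train(rows, depth)
    left, right = distributed_split(rows, depth)  # one cluster-wide pass
    return ("split",
            grow(left, depth + 1, distributed_split, local_train),
            grow(right, depth + 1, distributed_split, local_train))

# Dummy stand-ins so the sketch runs: split in half, summarize leaves.
halve = lambda rows, d: (rows[:len(rows) // 2], rows[len(rows) // 2:])
leaf = lambda rows, d: ("local-subtree", len(rows), d)
print(grow(list(range(4000)), 0, halve, leaf))
```

The point of the trick is that only the first few (wide, data-heavy) levels pay the distributed per-level histogram cost; the deep, narrow subtrees finish in memory on single workers.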
Mutable tagging of RDD rows?
Hey guys, I need to tag individual RDD rows with some values, and the tag values would change at every iteration. Is this possible with an RDD (I suppose this would be sort of like a mutable RDD, but a bit more than that)? If not, what would be the best way to do something like this? Basically, we need to keep some mutable information per data row (much smaller than the actual data row, however). Thanks
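Since RDDs are immutable, the usual answer is to keep the tags in a companion pair collection keyed by a row id and produce a *new* tag collection each iteration (in Spark this would be a pair RDD joined against `rows.keyBy(...)`). A minimal sketch of that pattern, using plain Python dicts as hedged stand-ins for pair RDDs so it runs anywhere:

```python
# Sketch of the usual immutable-update pattern for per-row tags: keep
# a companion keyed collection of (rowId, tag) and rebuild it every
# iteration. Plain Python dicts stand in for Spark pair RDDs here.
rows = {0: "row-a", 1: "row-bb", 2: "row-c"}      # (rowId, data), never changes
tags = {rid: 0 for rid in rows}                   # (rowId, tag), version 0

def next_tags(tags, rows):
    # One iteration: derive a new tag collection from the old one
    # (analogous to tags.join(rows).mapValues(...) on pair RDDs).
    return {rid: old + len(rows[rid]) for rid, old in tags.items()}

for _ in range(3):
    tags = next_tags(tags, rows)   # rebind; nothing mutates in place

print(tags)  # {0: 15, 1: 18, 2: 15}
```

Because the tag is tiny relative to the row, rebuilding only the tag side each iteration keeps the expensive data RDD untouched (and cacheable).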
Re: YARN problem using an external jar in worker nodes
Yea, it's in yarn-standalone mode, and I did use the SparkContext.addJar method and tried setting setExecutorEnv, SPARK_CLASSPATH, etc., but none of it worked. I finally made it work by modifying the ClientBase.scala code, where I set 'appMasterOnly' to false before the addJars contents were added to distCacheMgr. But this is not what I should be doing, right? Is there a problem with the addJar method in 0.9.0?

On Wed, Mar 26, 2014 at 1:47 PM, Sandy Ryza <sandy.r...@cloudera.com> wrote:

> Hi Sung,
>
> Are you using yarn-standalone mode? Have you specified the --addJars option with your external jars?
>
> -Sandy
>
> On Wed, Mar 26, 2014 at 1:17 PM, Sung Hwan Chung <coded...@cs.stanford.edu> wrote:
>
>> [quoted original message, stack trace, and container logs trimmed - see "YARN problem using an external jar in worker nodes" below]
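For reference, here is the shape of a 0.9.x yarn-standalone launch with an extra jar shipped via --addJars, the flag Sandy refers to (every path, class name, and size below is a placeholder, not taken from this thread; --addJars distributes the jar via the YARN distributed cache so executors can load it):

```
# Illustrative Spark 0.9.x yarn-standalone launch; all paths and
# names here are placeholders.
SPARK_JAR=assembly/target/spark-assembly-0.9.0-incubating-hadoop2.0.2-alpha.jar \
  ./bin/spark-class org.apache.spark.deploy.yarn.Client \
    --jar my-app.jar \
    --class MyApp \
    --addJars hdfs:///libs/opencv.jar \
    --num-workers 4 \
    --worker-memory 2g
```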
Re: YARN problem using an external jar in worker nodes
Well, it says that the jar was successfully added, but it can't reference classes from it. Does this have anything to do with this bug? http://stackoverflow.com/questions/22457645/when-to-use-spark-classpath-or-sparkcontext-addjar

On Thu, Mar 27, 2014 at 2:57 PM, Sandy Ryza <sandy.r...@cloudera.com> wrote:

> I just tried this in CDH (only a few patches ahead of 0.9.0) and was able to include a dependency with --addJars successfully. Can you share how you're invoking SparkContext.addJar? Anything interesting in the application master logs?
>
> -Sandy
>
> On Thu, Mar 27, 2014 at 11:35 AM, Sung Hwan Chung <coded...@cs.stanford.edu> wrote:
>
>> [earlier quoted messages in this thread trimmed]
YARN problem using an external jar in worker nodes
Hello (this is YARN-related),

I'm able to load an external jar and use its classes within the ApplicationMaster. I wish to use this jar within the worker nodes, so I added sc.addJar(pathToJar) and ran. I get the following exception:

org.apache.spark.SparkException: Job aborted: Task 0.0:1 failed 4 times (most recent failure: Exception failure: java.lang.NoClassDefFoundError: org/opencv/objdetect/HOGDescriptor)
    org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1028)
    org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1026)
    scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
    org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$abortStage(DAGScheduler.scala:1026)
    org.apache.spark.scheduler.DAGScheduler$$anonfun$processEvent$10.apply(DAGScheduler.scala:619)
    org.apache.spark.scheduler.DAGScheduler$$anonfun$processEvent$10.apply(DAGScheduler.scala:619)
    scala.Option.foreach(Option.scala:236)
    org.apache.spark.scheduler.DAGScheduler.processEvent(DAGScheduler.scala:619)
    org.apache.spark.scheduler.DAGScheduler$$anonfun$start$1$$anon$2$$anonfun$receive$1.applyOrElse(DAGScheduler.scala:207)
    akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
    akka.actor.ActorCell.invoke(ActorCell.scala:456)
    akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
    akka.dispatch.Mailbox.run(Mailbox.scala:219)
    akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
    scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

And in the worker node containers' stderr log (nothing in the stdout log), I don't see any reference to loading jars:

SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/home/gpphddata/1/yarn/nm-local-dir/usercache/yarn/filecache/7394400996676014282/spark-assembly-0.9.0-incubating-hadoop2.0.2-alpha-gphd-2.0.1.0.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/usr/lib/gphd/hadoop-2.0.2_alpha_gphd_2_0_1_0/lib/slf4j-log4j12-1.6.1.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
14/03/26 13:12:18 INFO slf4j.Slf4jLogger: Slf4jLogger started
14/03/26 13:12:18 INFO Remoting: Starting remoting
14/03/26 13:12:18 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://sparkExecutor@alpinenode6.alpinenow.local:44006]
14/03/26 13:12:18 INFO Remoting: Remoting now listens on addresses: [akka.tcp://sparkExecutor@alpinenode6.alpinenow.local:44006]
14/03/26 13:12:18 INFO executor.CoarseGrainedExecutorBackend: Connecting to driver: akka.tcp://spark@alpinenode5.alpinenow.local:10314/user/CoarseGrainedScheduler
14/03/26 13:12:18 ERROR executor.CoarseGrainedExecutorBackend: Driver Disassociated [akka.tcp://sparkExecutor@alpinenode6.alpinenow.local:44006] -> [akka.tcp://spark@alpinenode5.alpinenow.local:10314] disassociated! Shutting down.

Any idea what's going on?