Re: Executor memory requirement for reduceByKey

2016-05-13 Thread Sung Hwan Chung
Ok, so that worked flawlessly after I upped the number of partitions to 400
from 40.

Thanks!
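
For reference, a rough sketch of what the change amounts to (assuming an existing SparkContext `sc`; the pair RDD and its contents here are only illustrative):

import org.apache.spark.rdd.RDD

// illustrative data standing in for the real 75M-key dataset
val pairs: RDD[(Long, Int)] = sc.parallelize(1L to 1000000L).map(k => (k % 75000000L, 1))

// Before: reduceByKey inherits the parent's partition count (~40 in our case),
// so each reduce task has to build a hash map over a huge slice of the key space.
val coarse = pairs.reduceByKey(_ + _)

// After: pass an explicit partition count so each reduce task handles far fewer keys.
val fine = pairs.reduceByKey(_ + _, 400)

// Alternative (Ted's suggestion): repartition before the aggregation.
val fine2 = pairs.repartition(400).reduceByKey(_ + _)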

On Fri, May 13, 2016 at 7:28 PM, Sung Hwan Chung <coded...@cs.stanford.edu>
wrote:

> I'll try that; as of now I have a small number of partitions, on the order
> of 20~40.
>
> It would be great if there were some documentation on the memory requirement
> wrt the number of keys and the number of partitions per executor (i.e.,
> Spark's internal memory requirement outside of the user space).
>
> Otherwise, it's like shooting in the dark.
>
> On Fri, May 13, 2016 at 7:20 PM, Ted Yu <yuzhih...@gmail.com> wrote:
>
>> Have you taken a look at SPARK-11293 ?
>>
>> Consider using repartition to increase the number of partitions.
>>
>> FYI
>>
>> On Fri, May 13, 2016 at 12:14 PM, Sung Hwan Chung <
>> coded...@cs.stanford.edu> wrote:
>>
>>> Hello,
>>>
>>> I'm using Spark version 1.6.0 and have trouble with memory when trying
>>> to do reduceByKey on a dataset with as many as 75 million keys; I get
>>> the following exception when I run the task.
>>>
>>> There are 20 workers in the cluster. It is running under the standalone
>>> mode with 12 GB assigned per executor and 4 cores per worker. The
>>> spark.memory.fraction is set to 0.5 and I'm not using any caching.
>>>
>>> What might be the problem here? Since I'm using the version 1.6.0, this
>>> doesn't seem to be related to  SPARK-12155. This problem always happens
>>> during the shuffle read phase.
>>>
>>> Is there a minimum  amount of memory required for executor
>>> (spark.memory.fraction) for shuffle read?
>>>
>>> java.lang.OutOfMemoryError: Unable to acquire 262144 bytes of memory, got 0
>>> at 
>>> org.apache.spark.memory.MemoryConsumer.allocateArray(MemoryConsumer.java:91)
>>> at 
>>> org.apache.spark.unsafe.map.BytesToBytesMap.allocate(BytesToBytesMap.java:735)
>>> at 
>>> org.apache.spark.unsafe.map.BytesToBytesMap.<init>(BytesToBytesMap.java:197)
>>> at 
>>> org.apache.spark.unsafe.map.BytesToBytesMap.<init>(BytesToBytesMap.java:212)
>>> at 
>>> org.apache.spark.sql.execution.UnsafeFixedWidthAggregationMap.<init>(UnsafeFixedWidthAggregationMap.java:103)
>>> at 
>>> org.apache.spark.sql.execution.aggregate.TungstenAggregationIterator.<init>(TungstenAggregationIterator.scala:483)
>>> at 
>>> org.apache.spark.sql.execution.aggregate.TungstenAggregate$$anonfun$doExecute$1$$anonfun$2.apply(TungstenAggregate.scala:95)
>>> at 
>>> org.apache.spark.sql.execution.aggregate.TungstenAggregate$$anonfun$doExecute$1$$anonfun$2.apply(TungstenAggregate.scala:86)
>>> at 
>>> org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$20.apply(RDD.scala:710)
>>> at 
>>> org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$20.apply(RDD.scala:710)
>>> at 
>>> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>>> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
>>> at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
>>> at 
>>> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>>> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
>>> at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
>>> at 
>>> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)
>>> at 
>>> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
>>> at org.apache.spark.scheduler.Task.run(Task.scala:89)
>>> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
>>> at 
>>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>>> at 
>>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>>> at java.lang.Thread.run(Thread.java:745)
>>>
>>>
>>
>


Re: Executor memory requirement for reduceByKey

2016-05-13 Thread Sung Hwan Chung
I'll try that; as of now I have a small number of partitions, on the order
of 20~40.

It would be great if there were some documentation on the memory requirement
wrt the number of keys and the number of partitions per executor (i.e.,
Spark's internal memory requirement outside of the user space).

Otherwise, it's like shooting in the dark.

On Fri, May 13, 2016 at 7:20 PM, Ted Yu <yuzhih...@gmail.com> wrote:

> Have you taken a look at SPARK-11293 ?
>
> Consider using repartition to increase the number of partitions.
>
> FYI
>
> On Fri, May 13, 2016 at 12:14 PM, Sung Hwan Chung <
> coded...@cs.stanford.edu> wrote:
>
>> Hello,
>>
>> I'm using Spark version 1.6.0 and have trouble with memory when trying to
>> do reduceByKey on a dataset with as many as 75 million keys; I get the
>> following exception when I run the task.
>>
>> There are 20 workers in the cluster. It is running under the standalone
>> mode with 12 GB assigned per executor and 4 cores per worker. The
>> spark.memory.fraction is set to 0.5 and I'm not using any caching.
>>
>> What might be the problem here? Since I'm using the version 1.6.0, this
>> doesn't seem to be related to  SPARK-12155. This problem always happens
>> during the shuffle read phase.
>>
>> Is there a minimum  amount of memory required for executor
>> (spark.memory.fraction) for shuffle read?
>>
>> java.lang.OutOfMemoryError: Unable to acquire 262144 bytes of memory, got 0
>>  at 
>> org.apache.spark.memory.MemoryConsumer.allocateArray(MemoryConsumer.java:91)
>>  at 
>> org.apache.spark.unsafe.map.BytesToBytesMap.allocate(BytesToBytesMap.java:735)
>>  at 
>> org.apache.spark.unsafe.map.BytesToBytesMap.<init>(BytesToBytesMap.java:197)
>>  at 
>> org.apache.spark.unsafe.map.BytesToBytesMap.<init>(BytesToBytesMap.java:212)
>>  at 
>> org.apache.spark.sql.execution.UnsafeFixedWidthAggregationMap.<init>(UnsafeFixedWidthAggregationMap.java:103)
>>  at 
>> org.apache.spark.sql.execution.aggregate.TungstenAggregationIterator.<init>(TungstenAggregationIterator.scala:483)
>>  at 
>> org.apache.spark.sql.execution.aggregate.TungstenAggregate$$anonfun$doExecute$1$$anonfun$2.apply(TungstenAggregate.scala:95)
>>  at 
>> org.apache.spark.sql.execution.aggregate.TungstenAggregate$$anonfun$doExecute$1$$anonfun$2.apply(TungstenAggregate.scala:86)
>>  at 
>> org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$20.apply(RDD.scala:710)
>>  at 
>> org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$20.apply(RDD.scala:710)
>>  at 
>> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>>  at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
>>  at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
>>  at 
>> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>>  at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
>>  at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
>>  at 
>> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)
>>  at 
>> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
>>  at org.apache.spark.scheduler.Task.run(Task.scala:89)
>>  at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
>>  at 
>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>>  at 
>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>>  at java.lang.Thread.run(Thread.java:745)
>>
>>
>


Executor memory requirement for reduceByKey

2016-05-13 Thread Sung Hwan Chung
Hello,

I'm using Spark version 1.6.0 and have trouble with memory when trying to
do reduceByKey on a dataset with as many as 75 million keys; I get the
following exception when I run the task.

There are 20 workers in the cluster. It is running under the standalone
mode with 12 GB assigned per executor and 4 cores per worker. The
spark.memory.fraction is set to 0.5 and I'm not using any caching.

What might be the problem here? Since I'm using the version 1.6.0, this
doesn't seem to be related to  SPARK-12155. This problem always happens
during the shuffle read phase.

Is there a minimum amount of memory required per executor
(spark.memory.fraction) for shuffle read?

java.lang.OutOfMemoryError: Unable to acquire 262144 bytes of memory, got 0
at 
org.apache.spark.memory.MemoryConsumer.allocateArray(MemoryConsumer.java:91)
at 
org.apache.spark.unsafe.map.BytesToBytesMap.allocate(BytesToBytesMap.java:735)
at 
org.apache.spark.unsafe.map.BytesToBytesMap.<init>(BytesToBytesMap.java:197)
at 
org.apache.spark.unsafe.map.BytesToBytesMap.<init>(BytesToBytesMap.java:212)
at 
org.apache.spark.sql.execution.UnsafeFixedWidthAggregationMap.<init>(UnsafeFixedWidthAggregationMap.java:103)
at 
org.apache.spark.sql.execution.aggregate.TungstenAggregationIterator.<init>(TungstenAggregationIterator.scala:483)
at 
org.apache.spark.sql.execution.aggregate.TungstenAggregate$$anonfun$doExecute$1$$anonfun$2.apply(TungstenAggregate.scala:95)
at 
org.apache.spark.sql.execution.aggregate.TungstenAggregate$$anonfun$doExecute$1$$anonfun$2.apply(TungstenAggregate.scala:86)
at 
org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$20.apply(RDD.scala:710)
at 
org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$20.apply(RDD.scala:710)
at 
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
at 
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
at 
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)
at 
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
at org.apache.spark.scheduler.Task.run(Task.scala:89)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)


How Spark handles dead machines during a job.

2016-04-08 Thread Sung Hwan Chung
Hello,

Say that I'm doing a simple rdd.map followed by collect. Say, also, that
one of the executors finishes all of its tasks, but there are still other
executors running.

If the machine that hosted the finished executor gets terminated, does the
master still have the results from the finished tasks (and thus doesn't
restart those finished tasks)?

Or does the master require that all the executors be alive during the
entire map-collect cycle?

Thanks!


Re: Executor shutdown hooks?

2016-04-06 Thread Sung Hwan Chung
What I meant is 'application', i.e., when we manually terminate an
application that was submitted via spark-submit.
When we manually kill an application, it seems that individual tasks do not
receive the InterruptedException.

That InterruptedException seems to be delivered only if we cancel the job through
sc.cancelJob or sc.cancelAllJobs while the application is still alive.

My option so far seems to be using the JVM's shutdown hook, but I was wondering
if Spark itself has an API for this at the task level.
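
For what it's worth, here is a rough sketch of the JVM shutdown-hook workaround (everything here is illustrative, not a Spark API for this; cleanup() stands for whatever teardown the tasks need):

import org.apache.spark.rdd.RDD

object ExecutorCleanup {
  def cleanup(): Unit = { /* flush buffers, close connections, etc. */ }
  // Touching this lazy val from a task registers the hook once per executor JVM.
  lazy val hookRegistered: Boolean = { sys.addShutdownHook(cleanup()); true }
}

// assuming an existing RDD named `input`
def withCleanup(input: RDD[Int]): RDD[Int] =
  input.mapPartitions { rows =>
    ExecutorCleanup.hookRegistered   // force registration on this executor
    rows.map(_ * 2)                  // the long-running work goes here
  }

// Caveat: shutdown hooks run on a normal JVM exit or SIGTERM, but not on SIGKILL.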

On Wed, Apr 6, 2016 at 7:36 PM, Mark Hamstra <m...@clearstorydata.com>
wrote:

> Why would the Executors shutdown when the Job is terminated?  Executors
> are bound to Applications, not Jobs.  Furthermore,
> unless spark.job.interruptOnCancel is set to true, canceling the Job at the
> Application and DAGScheduler level won't actually interrupt the Tasks
> running on the Executors.  If you do have interruptOnCancel set, then you
> can catch the interrupt exception within the Task.
>
> On Wed, Apr 6, 2016 at 12:24 PM, Sung Hwan Chung <coded...@gmail.com>
> wrote:
>
>> Hi,
>>
>> I'm looking for ways to add shutdown hooks to executors, i.e., when a
>> job is forcefully terminated before it finishes.
>>
>> The scenario goes like this: executors are running a long-running job
>> within a 'map' function. The user decides to terminate the job, and the
>> mappers should then perform some cleanup before going offline.
>>
>> What would be the best way to do this?
>>
>
>


Executor shutdown hooks?

2016-04-06 Thread Sung Hwan Chung
Hi,

I'm looking for ways to add shutdown hooks to executors, i.e., when a job
is forcefully terminated before it finishes.

The scenario goes like this: executors are running a long-running job
within a 'map' function. The user decides to terminate the job, and the
mappers should then perform some cleanup before going offline.

What would be the best way to do this?


Re: Dynamically adding/removing slaves through start-slave.sh and stop-slave.sh

2016-03-28 Thread Sung Hwan Chung
You mean that once a job is in a waiting queue, it won't take advantage of
additional workers that happened to be added after the job was put into the
waiting queue?

That would be less than optimal. But it would be OK with us for now as long
as the additional workers will be taken advantage of by future-submitted
jobs.

On Mon, Mar 28, 2016 at 10:40 PM, Mich Talebzadeh <mich.talebza...@gmail.com
> wrote:

> The ACID test will come when you start two or more Spark processes
> simultaneously. If you see queuing (i.e. second job waiting for the first
> job to finish in Spark GUI) then you may not have enough resources for Yarn
> to accommodate two jobs despite the additional worker process.
>
>
>
> Dr Mich Talebzadeh
>
>
>
> LinkedIn * 
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
> <https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw>*
>
>
>
> http://talebzadehmich.wordpress.com
>
>
>
> On 28 March 2016 at 23:30, Sung Hwan Chung <coded...@cs.stanford.edu>
> wrote:
>
>> Yea, that seems to be the case. It seems that dynamically resizing a
>> standalone Spark cluster is very simple.
>>
>> Thanks!
>>
>> On Mon, Mar 28, 2016 at 10:22 PM, Mich Talebzadeh <
>> mich.talebza...@gmail.com> wrote:
>>
>>> start-all.sh starts the master and anything else in the slaves file;
>>> start-master.sh starts the master only.
>>>
>>> I use start-slaves.sh for my purpose with added nodes to slaves file.
>>>
>>> When you run start-slave.sh  you are creating another
>>> worker  process on the master host. You can check the status on Spark GUI
>>> on :8080. Depending on the ratio of memory/cores for the worker process, the
>>> additional worker may or may not be used.
>>>
>>>
>>>
>>> Dr Mich Talebzadeh
>>>
>>>
>>>
>>> LinkedIn * 
>>> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>>> <https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw>*
>>>
>>>
>>>
>>> http://talebzadehmich.wordpress.com
>>>
>>>
>>>
>>> On 28 March 2016 at 22:58, Sung Hwan Chung <coded...@cs.stanford.edu>
>>> wrote:
>>>
>>>> It seems that the conf/slaves file is only for consumption by the
>>>> following scripts:
>>>>
>>>> sbin/start-slaves.sh
>>>> sbin/stop-slaves.sh
>>>> sbin/start-all.sh
>>>> sbin/stop-all.sh
>>>>
>>>> I.e., conf/slaves file doesn't affect a running cluster.
>>>>
>>>> Is this true?
>>>>
>>>>
>>>> On Mon, Mar 28, 2016 at 9:31 PM, Sung Hwan Chung <
>>>> coded...@cs.stanford.edu> wrote:
>>>>
>>>>> No I didn't add it to the conf/slaves file.
>>>>>
>>>>> What I want to do is leverage auto-scale from AWS, without needing to
>>>>> stop all the slaves (e.g. if a lot of slaves are idle, terminate those).
>>>>>
>>>>> Also, the bookkeeping is easier if I don't have to deal with some
>>>>> centralized slave list that needs to be modified every time a node
>>>>> is added/removed.
>>>>>
>>>>>
>>>>> On Mon, Mar 28, 2016 at 9:20 PM, Mich Talebzadeh <
>>>>> mich.talebza...@gmail.com> wrote:
>>>>>
>>>>>> Have you added the slave host name to $SPARK_HOME/conf?
>>>>>>
>>>>>> Then you can use start-slaves.sh or stop-slaves.sh for all instances
>>>>>>
>>>>>> The assumption is that slave boxes have $SPARK_HOME installed in the
>>>>>> same directory as $SPARK_HOME is installed in the master.
>>>>>>
>>>>>> HTH
>>>>>>
>>>>>>
>>>>>> Dr Mich Talebzadeh
>>>>>>
>>>>>>
>>>>>>
>>>>>> LinkedIn * 
>>>>>> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>>>>>> <https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw>*
>>>>>>
>>>>>>
>>>>>>
>>>>>> http://talebzadehmich.wordpress.com
>>>>>>
>>>>>>
>>>>>>
>>>>>> On 28 March 2016 at 22:06, Sung Hwan Chung <coded...@cs.stanford.edu>
>>>>>> wrote:
>>>>>>
>>>>>>> Hello,
>>>>>>>
>>>>>>> I found that I could dynamically add/remove new workers to a running
>>>>>>> standalone Spark cluster by simply triggering:
>>>>>>>
>>>>>>> start-slave.sh (SPARK_MASTER_ADDR)
>>>>>>>
>>>>>>> and
>>>>>>>
>>>>>>> stop-slave.sh
>>>>>>>
>>>>>>> E.g., I could instantiate a new AWS instance and just add it to a
>>>>>>> running cluster without needing to add it to slaves file and restarting 
>>>>>>> the
>>>>>>> whole cluster.
>>>>>>> It seems that there's no need for me to stop a running cluster.
>>>>>>>
>>>>>>> Is this a valid way of dynamically resizing a spark cluster (as of
>>>>>>> now, I'm not concerned about HDFS)? Or will there be certain unforeseen
>>>>>>> problems if nodes are added/removed this way?
>>>>>>>
>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>


Re: Dynamically adding/removing slaves through start-slave.sh and stop-slave.sh

2016-03-28 Thread Sung Hwan Chung
Yea, that seems to be the case. It seems that dynamically resizing a
standalone Spark cluster is very simple.

Thanks!

On Mon, Mar 28, 2016 at 10:22 PM, Mich Talebzadeh <mich.talebza...@gmail.com
> wrote:

> start-all.sh starts the master and anything else in the slaves file;
> start-master.sh starts the master only.
>
> I use start-slaves.sh for my purpose with added nodes to slaves file.
>
> When you run start-slave.sh  you are creating another
> worker  process on the master host. You can check the status on Spark GUI
> on :8080. Depending on the ratio of memory/cores for the worker process, the
> additional worker may or may not be used.
>
>
>
> Dr Mich Talebzadeh
>
>
>
> LinkedIn * 
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
> <https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw>*
>
>
>
> http://talebzadehmich.wordpress.com
>
>
>
> On 28 March 2016 at 22:58, Sung Hwan Chung <coded...@cs.stanford.edu>
> wrote:
>
>> It seems that the conf/slaves file is only for consumption by the
>> following scripts:
>>
>> sbin/start-slaves.sh
>> sbin/stop-slaves.sh
>> sbin/start-all.sh
>> sbin/stop-all.sh
>>
>> I.e., conf/slaves file doesn't affect a running cluster.
>>
>> Is this true?
>>
>>
>> On Mon, Mar 28, 2016 at 9:31 PM, Sung Hwan Chung <
>> coded...@cs.stanford.edu> wrote:
>>
>>> No I didn't add it to the conf/slaves file.
>>>
>>> What I want to do is leverage auto-scale from AWS, without needing to
>>> stop all the slaves (e.g. if a lot of slaves are idle, terminate those).
>>>
>>> Also, the bookkeeping is easier if I don't have to deal with some
>>> centralized slave list that needs to be modified every time a node
>>> is added/removed.
>>>
>>>
>>> On Mon, Mar 28, 2016 at 9:20 PM, Mich Talebzadeh <
>>> mich.talebza...@gmail.com> wrote:
>>>
>>>> Have you added the slave host name to $SPARK_HOME/conf?
>>>>
>>>> Then you can use start-slaves.sh or stop-slaves.sh for all instances
>>>>
>>>> The assumption is that slave boxes have $SPARK_HOME installed in the
>>>> same directory as $SPARK_HOME is installed in the master.
>>>>
>>>> HTH
>>>>
>>>>
>>>> Dr Mich Talebzadeh
>>>>
>>>>
>>>>
>>>> LinkedIn * 
>>>> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>>>> <https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw>*
>>>>
>>>>
>>>>
>>>> http://talebzadehmich.wordpress.com
>>>>
>>>>
>>>>
>>>> On 28 March 2016 at 22:06, Sung Hwan Chung <coded...@cs.stanford.edu>
>>>> wrote:
>>>>
>>>>> Hello,
>>>>>
>>>>> I found that I could dynamically add/remove new workers to a running
>>>>> standalone Spark cluster by simply triggering:
>>>>>
>>>>> start-slave.sh (SPARK_MASTER_ADDR)
>>>>>
>>>>> and
>>>>>
>>>>> stop-slave.sh
>>>>>
>>>>> E.g., I could instantiate a new AWS instance and just add it to a
>>>>> running cluster without needing to add it to slaves file and restarting 
>>>>> the
>>>>> whole cluster.
>>>>> It seems that there's no need for me to stop a running cluster.
>>>>>
>>>>> Is this a valid way of dynamically resizing a spark cluster (as of
>>>>> now, I'm not concerned about HDFS)? Or will there be certain unforeseen
>>>>> problems if nodes are added/removed this way?
>>>>>
>>>>
>>>>
>>>
>>
>


Re: Dynamically adding/removing slaves through start-slave.sh and stop-slave.sh

2016-03-28 Thread Sung Hwan Chung
It seems that the conf/slaves file is only for consumption by the following
scripts:

sbin/start-slaves.sh
sbin/stop-slaves.sh
sbin/start-all.sh
sbin/stop-all.sh

I.e., conf/slaves file doesn't affect a running cluster.

Is this true?


On Mon, Mar 28, 2016 at 9:31 PM, Sung Hwan Chung <coded...@cs.stanford.edu>
wrote:

> No I didn't add it to the conf/slaves file.
>
> What I want to do is leverage auto-scale from AWS, without needing to stop
> all the slaves (e.g. if a lot of slaves are idle, terminate those).
>
> Also, the bookkeeping is easier if I don't have to deal with some
> centralized slave list that needs to be modified every time a node
> is added/removed.
>
>
> On Mon, Mar 28, 2016 at 9:20 PM, Mich Talebzadeh <
> mich.talebza...@gmail.com> wrote:
>
>> Have you added the slave host name to $SPARK_HOME/conf?
>>
>> Then you can use start-slaves.sh or stop-slaves.sh for all instances
>>
>> The assumption is that slave boxes have $SPARK_HOME installed in the same
>> directory as $SPARK_HOME is installed in the master.
>>
>> HTH
>>
>>
>> Dr Mich Talebzadeh
>>
>>
>>
>> LinkedIn * 
>> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>> <https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw>*
>>
>>
>>
>> http://talebzadehmich.wordpress.com
>>
>>
>>
>> On 28 March 2016 at 22:06, Sung Hwan Chung <coded...@cs.stanford.edu>
>> wrote:
>>
>>> Hello,
>>>
>>> I found that I could dynamically add/remove new workers to a running
>>> standalone Spark cluster by simply triggering:
>>>
>>> start-slave.sh (SPARK_MASTER_ADDR)
>>>
>>> and
>>>
>>> stop-slave.sh
>>>
>>> E.g., I could instantiate a new AWS instance and just add it to a
>>> running cluster without needing to add it to slaves file and restarting the
>>> whole cluster.
>>> It seems that there's no need for me to stop a running cluster.
>>>
>>> Is this a valid way of dynamically resizing a spark cluster (as of now,
>>> I'm not concerned about HDFS)? Or will there be certain unforeseen problems
>>> if nodes are added/removed this way?
>>>
>>
>>
>


Re: Dynamically adding/removing slaves through start-slave.sh and stop-slave.sh

2016-03-28 Thread Sung Hwan Chung
No I didn't add it to the conf/slaves file.

What I want to do is leverage auto-scale from AWS, without needing to stop
all the slaves (e.g. if a lot of slaves are idle, terminate those).

Also, the bookkeeping is easier if I don't have to deal with some
centralized slave list that needs to be modified every time a node
is added/removed.


On Mon, Mar 28, 2016 at 9:20 PM, Mich Talebzadeh <mich.talebza...@gmail.com>
wrote:

> Have you added the slave host name to $SPARK_HOME/conf?
>
> Then you can use start-slaves.sh or stop-slaves.sh for all instances
>
> The assumption is that slave boxes have $SPARK_HOME installed in the same
> directory as $SPARK_HOME is installed in the master.
>
> HTH
>
>
> Dr Mich Talebzadeh
>
>
>
> LinkedIn * 
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
> <https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw>*
>
>
>
> http://talebzadehmich.wordpress.com
>
>
>
> On 28 March 2016 at 22:06, Sung Hwan Chung <coded...@cs.stanford.edu>
> wrote:
>
>> Hello,
>>
>> I found that I could dynamically add/remove new workers to a running
>> standalone Spark cluster by simply triggering:
>>
>> start-slave.sh (SPARK_MASTER_ADDR)
>>
>> and
>>
>> stop-slave.sh
>>
>> E.g., I could instantiate a new AWS instance and just add it to a running
>> cluster without needing to add it to slaves file and restarting the whole
>> cluster.
>> It seems that there's no need for me to stop a running cluster.
>>
>> Is this a valid way of dynamically resizing a spark cluster (as of now,
>> I'm not concerned about HDFS)? Or will there be certain unforeseen problems
>> if nodes are added/removed this way?
>>
>
>


Dynamically adding/removing slaves through start-slave.sh and stop-slave.sh

2016-03-28 Thread Sung Hwan Chung
Hello,

I found that I could dynamically add/remove new workers to a running
standalone Spark cluster by simply triggering:

start-slave.sh (SPARK_MASTER_ADDR)

and

stop-slave.sh

E.g., I could instantiate a new AWS instance and just add it to a running
cluster without needing to add it to the slaves file and restart the whole
cluster.
It seems that there's no need for me to stop a running cluster.

Is this a valid way of dynamically resizing a spark cluster (as of now, I'm
not concerned about HDFS)? Or will there be certain unforeseen problems if
nodes are added/removed this way?


Parquet StringType column readable as plain-text despite being Gzipped

2016-02-03 Thread Sung Hwan Chung
Hello,

We are using the default compression codec for Parquet when we store our
dataframes. The dataframe has a StringType column whose values can be up to
several MB large.

The funny thing is that once it's stored, we can browse the file content
with a plain-text editor and see large portions of the string contents
as readable, uncompressed text.

If we use parquet-tools to browse the metadata, it says the column is
GZIP and the compression ratio is 2.6x, but that just doesn't seem right.

Anybody know what's going on?
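
In case it helps anyone reproduce this, here is roughly how we write it and how we check the footer (a sketch; the path and names are illustrative):

// assuming an existing SQLContext `sqlContext` and DataFrame `df` with the large StringType column
sqlContext.setConf("spark.sql.parquet.compression.codec", "gzip")   // pin the codec explicitly
df.write.parquet("/tmp/gzip_check")                                 // illustrative output path

// Then inspect the column-chunk metadata, e.g.:
//   parquet-tools meta /tmp/gzip_check/part-r-00000-*.parquet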


Is spark-ec2 going away?

2016-01-27 Thread Sung Hwan Chung
I noticed that in the main branch, the ec2 directory along with the
spark-ec2 script is no longer present.

Is spark-ec2 going away in the next release? If so, what would be the best
alternative at that time?

A couple of additional questions:
1. Is there any way to add/remove additional workers while the cluster is
running, without stopping/starting the EC2 cluster?
2. For 1, if no such capability is provided with the current script, do we
have to write it ourselves? Or is there any plan in the future to add such
functions?
3. In PySpark, is it possible to dynamically change driver/executor memory
and the number of cores per executor without having to restart it (e.g., via
changing the sc configuration or recreating sc)?

Our ideal scenario is to keep running PySpark (in our case, as a notebook)
and connect/disconnect to any spark clusters on demand.


Re: Is spark-ec2 going away?

2016-01-27 Thread Sung Hwan Chung
Hm thanks,

I think what you are suggesting sounds like a recommendation for AWS EMR.
However, my questions were wrt spark-ec2. For our uses involving
spot-instances, EMR could potentially double/triple prices due to the
additional premiums.

Thanks anyway!

On Wed, Jan 27, 2016 at 2:12 PM, Alexander Pivovarov <apivova...@gmail.com>
wrote:

> you can use EMR-4.3.0 running on spot instances to control the price
>
> yes, you can add/remove instances to the cluster on the fly (CORE instances
> support add only, TASK instances - add and remove)
>
>
>
> On Wed, Jan 27, 2016 at 2:07 PM, Sung Hwan Chung <coded...@cs.stanford.edu
> > wrote:
>
>> I noticed that in the main branch, the ec2 directory along with the
>> spark-ec2 script is no longer present.
>>
>> Is spark-ec2 going away in the next release? If so, what would be the
>> best alternative at that time?
>>
>> A couple more additional questions:
>> 1. Is there any way to add/remove additional workers while the cluster is
>> running without stopping/starting the EC2 cluster?
>> 2. For 1, if no such capability is provided with the current script., do
>> we have to write it ourselves? Or is there any plan in the future to add
>> such functions?
>> 2. In PySpark, is it possible to dynamically change driver/executor
>> memory, number of cores per executor without having to restart it? (e.g.
>> via changing sc configuration or recreating sc?)
>>
>> Our ideal scenario is to keep running PySpark (in our case, as a
>> notebook) and connect/disconnect to any spark clusters on demand.
>>
>
>


Re: Is spark-ec2 going away?

2016-01-27 Thread Sung Hwan Chung
Thanks! That's very helpful.

On Wed, Jan 27, 2016 at 3:33 PM, Nicholas Chammas <
nicholas.cham...@gmail.com> wrote:

> I noticed that in the main branch, the ec2 directory along with the
> spark-ec2 script is no longer present.
>
> It’s been moved out of the main repo to its own location:
> https://github.com/amplab/spark-ec2/pull/21
>
> Is spark-ec2 going away in the next release? If so, what would be the best
> alternative at that time?
>
> It’s not going away. It’s just being removed from the main Spark repo and
> maintained separately.
>
> There are many alternatives like EMR, which was already mentioned, as well
> as more full-service solutions like Databricks. It depends on what you’re
> looking for.
>
> If you want something as close to spark-ec2 as possible but more actively
> developed, you might be interested in checking out Flintrock
> <https://github.com/nchammas/flintrock>, which I built.
>
> Is there any way to add/remove additional workers while the cluster is
> running without stopping/starting the EC2 cluster?
>
> Not currently possible with spark-ec2 and a bit difficult to add. See:
> https://issues.apache.org/jira/browse/SPARK-2008
>
> For 1, if no such capability is provided with the current script., do we
> have to write it ourselves? Or is there any plan in the future to add such
> functions?
>
> No "official" plans to add this to spark-ec2. It’s up to a contributor to
> step up and implement this feature, basically. Otherwise it won’t happen.
>
> Nick
>
> On Wed, Jan 27, 2016 at 5:13 PM Alexander Pivovarov <apivova...@gmail.com>
> wrote:
>
> you can use EMR-4.3.0 running on spot instances to control the price
>>
>> yes, you can add/remove instances to the cluster on the fly (CORE instances
>> support add only, TASK instances - add and remove)
>>
>>
>>
>> On Wed, Jan 27, 2016 at 2:07 PM, Sung Hwan Chung <
>> coded...@cs.stanford.edu> wrote:
>>
>>> I noticed that in the main branch, the ec2 directory along with the
>>> spark-ec2 script is no longer present.
>>>
>>> Is spark-ec2 going away in the next release? If so, what would be the
>>> best alternative at that time?
>>>
>>> A couple more additional questions:
>>> 1. Is there any way to add/remove additional workers while the cluster
>>> is running without stopping/starting the EC2 cluster?
>>> 2. For 1, if no such capability is provided with the current script., do
>>> we have to write it ourselves? Or is there any plan in the future to add
>>> such functions?
>>> 2. In PySpark, is it possible to dynamically change driver/executor
>>> memory, number of cores per executor without having to restart it? (e.g.
>>> via changing sc configuration or recreating sc?)
>>>
>>> Our ideal scenario is to keep running PySpark (in our case, as a
>>> notebook) and connect/disconnect to any spark clusters on demand.
>>>
>>
>> ​
>


Re: java.io.IOException Error in task deserialization

2014-10-10 Thread Sung Hwan Chung
I haven't seen this at all since switching to HttpBroadcast. It seems
TorrentBroadcast might have some issues?
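
For reference, this is all the switch amounts to (a sketch against the Spark 1.x config; the app name is illustrative):

import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .setAppName("http-broadcast-test")
  // Spark 1.x defaults to TorrentBroadcastFactory; this forces the HTTP implementation.
  .set("spark.broadcast.factory", "org.apache.spark.broadcast.HttpBroadcastFactory")

val sc = new SparkContext(conf)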

On Thu, Oct 9, 2014 at 4:28 PM, Sung Hwan Chung coded...@cs.stanford.edu
wrote:

 I don't think that I saw any other error message. This is all I saw.

 I'm currently experimenting to see if this can be alleviated by using
 HttpBroadcastFactory instead of TorrentBroadcast. So far, with
 HttpBroadcast, I haven't seen this recurring as of yet. I'll keep you
 posted.

 On Thu, Oct 9, 2014 at 4:21 PM, Davies Liu dav...@databricks.com wrote:

 This exception should be caused by another one, could you paste all of
 them here?

 Also, that will be great if you can provide a script to reproduce this
 problem.

 Thanks!

 On Fri, Sep 26, 2014 at 6:11 AM, Arun Ahuja aahuj...@gmail.com wrote:
  Has anyone else seen this error in task deserialization?  The task is
  processing a small amount of data and doesn't seem to have much data
 hanging
  to the closure?  I've only seen this with Spark 1.1
 
  Job aborted due to stage failure: Task 975 in stage 8.0 failed 4 times,
 most
  recent failure: Lost task 975.3 in stage 8.0 (TID 24777, host.com):
  java.io.IOException: unexpected exception type
 
 
 java.io.ObjectStreamClass.throwMiscException(ObjectStreamClass.java:1538)
 
  java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1025)
 
  java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893)
 
 
 java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
 
  java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
 
  java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
 
  java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
 
 
 java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
 
  java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
  java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
 
 
 org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:62)
 
 
 org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:87)
 
  org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:159)
 
 
 java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
 
 
 java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
  java.lang.Thread.run(Thread.java:744)

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org





Spark job (not Spark streaming) doesn't delete un-needed checkpoints.

2014-10-10 Thread Sung Hwan Chung
Un-needed checkpoints are not getting automatically deleted in my
application.

I.e., the lineage looks something like this, and checkpoints simply
accumulate in a temporary directory (every intermediate lineage point, however,
does zip with a globally permanent RDD):

PermanentRDD (global): zips with all of the intermediate ones

Intermediate RDDs: A---B---C---D---E---F---G
                       |           |       |
                   checkpoint  checkpoint  checkpoint

Older intermediate RDDs never get used.
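
The workaround I'm considering is deleting the stale checkpoint directory by hand once the newer checkpoint has materialized - a rough sketch, assuming the checkpoints live on HDFS and nothing still references the old RDD:

import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.rdd.RDD

def checkpointAndPrune[T](current: RDD[T], previous: Option[RDD[_]]): Unit = {
  current.checkpoint()
  current.count()    // force the new checkpoint to be written before touching the old one
  previous.flatMap(_.getCheckpointFile).foreach { dir =>
    // Only safe because the older intermediate RDDs are never used again.
    val fs = FileSystem.get(current.sparkContext.hadoopConfiguration)
    fs.delete(new Path(dir), true)
  }
}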


Re: coalesce with shuffle or repartition is not necessarily fault-tolerant

2014-10-09 Thread Sung Hwan Chung
Are there a large number of non-deterministic lineage operators?

This seems like a pretty big caveat, particularly for casual programmers
who expect consistent semantics between Spark and Scala.

E.g., making sure that there's no randomness whatsoever in RDD
transformations seems critical. Additionally, shuffling operators would
usually result in changed row orders, etc.

These are very easy errors to make, and if you tend to cache things, some
errors won't be detected until fault-tolerance is triggered. It would be
very helpful for programmers to have a big warning list of not-to-dos
within RDD transformations.
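
For what it's worth, the only way I've found to keep the lineage deterministic is to sort by a stable key instead of relying on repartition's arrival order - a sketch (the data and keys here are illustrative):

import org.apache.spark.SparkContext._   // pair-RDD functions in Spark 1.x

// assuming an existing SparkContext `sc`
val data = sc.parallelize(Seq((3L, "c"), (1L, "a"), (2L, "b")))

// Instead of data.repartition(2), whose row order can differ on recomputation,
// sort by a stable, unique key so every recomputation yields the same layout.
val deterministic = data.sortByKey(ascending = true, numPartitions = 2)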

On Wed, Oct 8, 2014 at 11:57 PM, Sean Owen so...@cloudera.com wrote:

 Yes, I think this another operation that is not deterministic even for
 the same RDD. If a partition is lost and recalculated the ordering can
 be different in the partition. Sorting the RDD makes the ordering
 deterministic.

 On Thu, Oct 9, 2014 at 7:51 AM, Sung Hwan Chung
 coded...@cs.stanford.edu wrote:
  Let's say you have some rows in a dataset (say X partitions initially).
 
  A
  B
  C
  D
  E
  .
  .
  .
  .
 
 
  You repartition to Y < X partitions; then it seems that any of the
  following could be valid:
 
  partition 1    partition 2
 
  A              B
  C              E
  D              .
  --
  C              E
  A              B
  D              .
  --
  D              B
  C              E
  A              .
 
  etc. etc.
 
  I.e., although each partition will have the same unordered set, the rows'
  orders will change from call to call.
 
  Now, because row ordering can change from call to call, if you do any
  operation that depends on the order of items you saw, then lineage is no
  longer deterministic. For example, it seems that the repartition call itself
  is a row-order-dependent call, because it creates a random number generator
  with the partition index as the seed, and then calls nextInt as you go
  through the rows.
 
 
  On Wed, Oct 8, 2014 at 10:14 PM, Patrick Wendell pwend...@gmail.com
 wrote:
 
  IIRC - the random is seeded with the index, so it will always produce
  the same result for the same index. Maybe I don't totally follow
  though. Could you give a small example of how this might change the
  RDD ordering in a way that you don't expect? In general repartition()
  will not preserve the ordering of an RDD.
 
  On Wed, Oct 8, 2014 at 3:42 PM, Sung Hwan Chung
  coded...@cs.stanford.edu wrote:
   I noticed that repartition will result in non-deterministic lineage
   because
   it'll result in changed orders for rows.
  
   So for instance, if you do things like:
  
   val data = read(...)
   val k = data.repartition(5)
   val h = k.repartition(5)
  
   It seems that this results in different ordering of rows for 'k' each
   time
   you call it.
   And because of this different ordering, 'h' will result in different
   partitions even, because 'repartition' distributes through a random
   number
   generator with the 'index' as the key.
 
 



Re: java.io.IOException Error in task deserialization

2014-10-09 Thread Sung Hwan Chung
I don't think that I saw any other error message. This is all I saw.

I'm currently experimenting to see if this can be alleviated by using
HttpBroadcastFactory instead of TorrentBroadcast. So far, with
HttpBroadcast, I haven't seen this recurring as of yet. I'll keep you
posted.

On Thu, Oct 9, 2014 at 4:21 PM, Davies Liu dav...@databricks.com wrote:

 This exception should be caused by another one, could you paste all of
 them here?

 Also, that will be great if you can provide a script to reproduce this
 problem.

 Thanks!

 On Fri, Sep 26, 2014 at 6:11 AM, Arun Ahuja aahuj...@gmail.com wrote:
  Has anyone else seen this error in task deserialization?  The task is
  processing a small amount of data and doesn't seem to have much data
 hanging
  to the closure?  I've only seen this with Spark 1.1
 
  Job aborted due to stage failure: Task 975 in stage 8.0 failed 4 times,
 most
  recent failure: Lost task 975.3 in stage 8.0 (TID 24777, host.com):
  java.io.IOException: unexpected exception type
 
  java.io.ObjectStreamClass.throwMiscException(ObjectStreamClass.java:1538)
 
  java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1025)
 
  java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893)
 
  java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
 
  java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
 
  java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
 
  java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
 
  java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
 
  java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
  java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
 
 
 org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:62)
 
 
 org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:87)
 
  org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:159)
 
 
 java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
 
 
 java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
  java.lang.Thread.run(Thread.java:744)

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




Intermittent checkpointing failure.

2014-10-09 Thread Sung Hwan Chung
I'm getting DFS closed channel exception every now and then when I run
checkpoint. I do checkpointing every 15 minutes or so. This happens usually
after running the job for 1~2 hours. Anyone seen this before?

Job aborted due to stage failure: Task 6 in stage 70.0 failed 4 times,
most recent failure: Lost task 6.3 in stage 70.0 (TID 1264,
alpinenode7.alpinenow.local):
java.nio.channels.ClosedChannelException:

org.apache.hadoop.hdfs.DFSOutputStream.checkClosed(DFSOutputStream.java:1526)
org.apache.hadoop.fs.FSOutputSummer.write(FSOutputSummer.java:98)

org.apache.hadoop.fs.FSDataOutputStream$PositionCache.write(FSDataOutputStream.java:58)
java.io.DataOutputStream.write(DataOutputStream.java:107)

java.io.ObjectOutputStream$BlockDataOutputStream.drain(ObjectOutputStream.java:1876)

java.io.ObjectOutputStream$BlockDataOutputStream.setBlockDataMode(ObjectOutputStream.java:1785)

java.io.ObjectOutputStream.writeNonProxyDesc(ObjectOutputStream.java:1285)
java.io.ObjectOutputStream.writeClassDesc(ObjectOutputStream.java:1230)

java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1426)
java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)

java.io.ObjectOutputStream.writeFatalException(ObjectOutputStream.java:1576)
java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:350)

org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:42)

org.apache.spark.serializer.SerializationStream.writeAll(Serializer.scala:110)
org.apache.spark.rdd.CheckpointRDD$.writeToFile(CheckpointRDD.scala:114)

org.apache.spark.rdd.RDDCheckpointData$$anonfun$doCheckpoint$1.apply(RDDCheckpointData.scala:95)

org.apache.spark.rdd.RDDCheckpointData$$anonfun$doCheckpoint$1.apply(RDDCheckpointData.scala:95)
org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:62)
org.apache.spark.scheduler.Task.run(Task.scala:54)
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:177)

java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)

java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
java.lang.Thread.run(Thread.java:744)


Re: Is there a way to look at RDD's lineage? Or debug a fault-tolerance error?

2014-10-08 Thread Sung Hwan Chung
There is no circular dependency. It's simply dropping references to previous RDDs
because there is no need for them.

I wonder if that messes things up internally for Spark, though, due to losing
references to intermediate RDDs.
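
As for inspecting the lineage itself, the closest thing I've found is RDD.toDebugString - a small sketch:

// assuming `tx2` is the RDD whose ancestry I want to verify
println(tx2.toDebugString)                            // full chain of parent RDDs
println(tx2.dependencies.map(_.rdd).mkString("\n"))   // immediate parents only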

 On Oct 8, 2014, at 12:13 PM, Akshat Aranya aara...@gmail.com wrote:
 
 Using a var for RDDs in this way is not going to work.  In this example, 
  tx1.zip(tx2) would create an RDD that depends on tx2, but then soon after 
 that, you change what tx2 means, so you would end up having a circular 
 dependency.
 
 On Wed, Oct 8, 2014 at 12:01 PM, Sung Hwan Chung coded...@cs.stanford.edu 
 wrote:
 My job is not being fault-tolerant (e.g., when there's a fetch failure or 
 something).
 
 The lineage of RDDs are constantly updated every iteration. However, I think 
 that when there's a failure, the lineage information is not being correctly 
 reapplied.
 
 It goes something like this:
 
 val rawRDD = read(...)
 val repartRDD = rawRDD.repartition(X)
 
 val tx1 = repartRDD.map(...)
 var tx2 = tx1.map(...)
 
 while (...) {
   tx2 = tx1.zip(tx2).map(...)
 }
 
 
 Is there any way to monitor RDD's lineage, maybe even including? I want to 
 make sure that there's no unexpected things happening.
 


Re: Is there a way to look at RDD's lineage? Or debug a fault-tolerance error?

2014-10-08 Thread Sung Hwan Chung
One thing I didn't mention is that we actually do data.repartition beforehand
with shuffle.

I found that this can actually introduce randomness to lineage steps,
because data gets shuffled to different partitions, which leads to inconsistent
behavior if your algorithm is dependent on the order in which the data rows
appear, because now the data rows will appear in a different order.

If you want to guarantee fault-tolerance, you can't have any randomness
whatsoever in lineage steps, and repartition violates that (depending on
what you do with the data).

On Wed, Oct 8, 2014 at 12:24 PM, Sung Hwan Chung coded...@gmail.com wrote:

 There is no circular dependency. It's simply dropping references to previous
 RDDs because there is no need for them.

 I wonder if that messes things up internally for Spark, though, due to
 losing references to intermediate RDDs.

 On Oct 8, 2014, at 12:13 PM, Akshat Aranya aara...@gmail.com wrote:

 Using a var for RDDs in this way is not going to work.  In this example,
  tx1.zip(tx2) would create an RDD that depends on tx2, but then soon after
 that, you change what tx2 means, so you would end up having a circular
 dependency.

 On Wed, Oct 8, 2014 at 12:01 PM, Sung Hwan Chung coded...@cs.stanford.edu
  wrote:

 My job is not being fault-tolerant (e.g., when there's a fetch failure or
 something).

 The lineage of RDDs are constantly updated every iteration. However, I
 think that when there's a failure, the lineage information is not being
 correctly reapplied.

 It goes something like this:

 val rawRDD = read(...)
 val repartRDD = rawRDD.repartition(X)

 val tx1 = repartRDD.map(...)
 var tx2 = tx1.map(...)

 while (...) {
   tx2 = tx1.zip(tx2).map(...)
 }


 Is there any way to monitor RDD's lineage, maybe even including? I want
 to make sure that there's no unexpected things happening.





coalesce with shuffle or repartition is not necessarily fault-tolerant

2014-10-08 Thread Sung Hwan Chung
I noticed that repartition will result in non-deterministic lineage because
it'll result in changed orders for rows.

So for instance, if you do things like:

val data = read(...)
val k = data.repartition(5)
val h = k.repartition(5)

It seems that this results in different ordering of rows for 'k' each time
you call it.
And because of this different ordering, 'h' will result in different
partitions even, because 'repartition' distributes through a random number
generator with the 'index' as the key.


Re: java.io.IOException Error in task deserialization

2014-10-08 Thread Sung Hwan Chung
This is also happening to me on a regular basis, when the job is large with
relatively large serialized objects used in each RDD lineage. A bad thing
about this is that this exception always stops the whole job.


On Fri, Sep 26, 2014 at 11:17 AM, Brad Miller bmill...@eecs.berkeley.edu
wrote:

 FWIW I suspect that each count operation is an opportunity for you to
 trigger the bug, and each filter operation increases the likelihood of
 setting up the bug.  I normally don't come across this error until my job
 has been running for an hour or two and had a chance to build up longer
 lineages for some RDDs.  It sounds like your data is a bit smaller and it's
 more feasible for you to build up longer lineages more quickly.

 If you can reduce your number of filter operations (for example by
 combining some into a single function) that may help.  It may also help to
 introduce persistence or checkpointing at intermediate stages so that the
 length of the lineages that have to get replayed isn't as long.

 On Fri, Sep 26, 2014 at 11:10 AM, Arun Ahuja aahuj...@gmail.com wrote:

 No for me as well it is non-deterministic.  It happens in a piece of code
 that does many filter and counts on a small set of records (~1k-10k).  The
  original set is persisted in memory and we have a Kryo serializer set for
 it.  The task itself takes in just a few filtering parameters.  This with
 the same setting has sometimes completed to sucess and sometimes failed
 during this step.

 Arun

 On Fri, Sep 26, 2014 at 1:32 PM, Brad Miller bmill...@eecs.berkeley.edu
 wrote:

 I've had multiple jobs crash due to java.io.IOException: unexpected
 exception type; I've been running the 1.1 branch for some time and am now
 running the 1.1 release binaries. Note that I only use PySpark. I haven't
 kept detailed notes or the tracebacks around since there are other problems
  that have caused me greater grief (namely key not found errors).

 For me the exception seems to occur non-deterministically, which is a
 bit interesting since the error message shows that the same stage has
 failed multiple times.  Are you able to consistently re-produce the bug
 across multiple invocations at the same place?

 On Fri, Sep 26, 2014 at 6:11 AM, Arun Ahuja aahuj...@gmail.com wrote:

 Has anyone else seen this error in task deserialization?  The task is
 processing a small amount of data and doesn't seem to have much data
 hanging to the closure?  I've only seen this with Spark 1.1

 Job aborted due to stage failure: Task 975 in stage 8.0 failed 4 times, 
 most recent failure: Lost task 975.3 in stage 8.0 (TID 24777, host.com): 
 java.io.IOException: unexpected exception type
 
 java.io.ObjectStreamClass.throwMiscException(ObjectStreamClass.java:1538)
 
 java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1025)
 
 java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893)
 
 java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
 java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
 
 java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
 
 java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
 
 java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
 java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
 java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
 
 org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:62)
 
 org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:87)
 
 org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:159)
 
 java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
 
 java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
 java.lang.Thread.run(Thread.java:744)







Is RDD partition index consistent?

2014-10-06 Thread Sung Hwan Chung
Is the RDD partition index you get when you call mapPartitionsWithIndex
consistent under fault-tolerance conditions?

I.e.

1. Say the index is 1 for one of the partitions when you call
data.mapPartitionsWithIndex((index, rows) => ...) // Say index is 1
2. The partition fails (maybe along with a bunch of other partitions).
3. When the partitions get restarted somewhere else, will they retain the
same index value, as well as all the lineage arguments?
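
For context, the call in question looks roughly like this (the data and partition count are illustrative):

// assuming an existing SparkContext `sc`
val data = sc.parallelize(1 to 100, numSlices = 4)

val tagged = data.mapPartitionsWithIndex { (index, rows) =>
  // `index` is the partition's position within this RDD (0 until data.partitions.length);
  // the question is whether a recomputed partition sees this same index again.
  rows.map(r => (index, r))
}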


Spark fault tolerance after a executor failure.

2014-07-30 Thread Sung Hwan Chung
I sometimes see that after fully caching the data, if one of the executors
fails for some reason, that portion of cache gets lost and does not get
re-cached, even though there are plenty of resources. Is this a bug or a
normal behavior (V1.0.1)?


Re: Getting the number of slaves

2014-07-28 Thread Sung Hwan Chung
Do getExecutorStorageStatus and getExecutorMemoryStatus both return the
number of executors + the driver?
E.g., if I submit a job with 10 executors, I get 11 for
getExecutorStorageStatus.length and getExecutorMemoryStatus.size.
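
I.e., what I'm measuring is (assuming `sc` is the SparkContext):

val storageCount = sc.getExecutorStorageStatus.length   // appears to include the driver's BlockManager
val memoryCount  = sc.getExecutorMemoryStatus.size      // likewise executors + driver
val executorsOnly = storageCount - 1                    // subtract one if the driver is indeed counted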


On Thu, Jul 24, 2014 at 4:53 PM, Nicolas Mai nicolas@gmail.com wrote:

 Thanks, this is what I needed :) I should have searched more...

 Something I noticed though: after the SparkContext is initialized, I had to
 wait for a few seconds until sc.getExecutorStorageStatus.length returns the
 correct number of workers in my cluster (otherwise it returns 1, for the
 driver)...



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Getting-the-number-of-slaves-tp10604p10619.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.



collect on partitions get very slow near the last few partitions.

2014-06-28 Thread Sung Hwan Chung
I'm doing something like this:

rdd.groupBy.map().collect()

The work load on final map is pretty much evenly distributed.

When collect happens, say on 60 partitions, the first 55 or so partitions
finish very quickly say within 10 seconds. However, the last 5,
particularly the very last one, typically get very slow, the overall
collect time reaching 30 seconds to sometimes even 1 minute.

E.g., it would get stuck in a state like 54/55 for a much longer time.

Another interesting thing is the first iteration typically doesn't have
this problem, but it gets progressively worse despite having about the same
workload/partition sizes in subsequent iterations.

This problem worsens with smaller akka framesize and/or maxMbInFlight

Anyone know why this is so?


Re: collect on partitions get very slow near the last few partitions.

2014-06-28 Thread Sung Hwan Chung
I'm finding the following messages in the driver. Can this potentially have
anything to do with these drastic slowdowns?


14/06/28 00:00:17 INFO ShuffleBlockManager: Could not find files for
shuffle 8 for deleting
14/06/28 00:00:17 INFO ContextCleaner: Cleaned shuffle 8
14/06/28 00:00:17 INFO ShuffleBlockManager: Could not find files for
shuffle 7 for deleting
14/06/28 00:00:17 INFO ContextCleaner: Cleaned shuffle 7
14/06/28 00:00:17 INFO ShuffleBlockManager: Could not find files for
shuffle 6 for deleting
14/06/28 00:00:17 INFO ContextCleaner: Cleaned shuffle 6




On Fri, Jun 27, 2014 at 11:35 PM, Sung Hwan Chung coded...@cs.stanford.edu
wrote:

 I'm doing something like this:

 rdd.groupBy.map().collect()

 The work load on final map is pretty much evenly distributed.

 When collect happens, say on 60 partitions, the first 55 or so partitions
 finish very quickly say within 10 seconds. However, the last 5,
 particularly the very last one, typically get very slow, the overall
 collect time reaching 30 seconds to sometimes even 1 minute.

 E.g., it would get stuck in a state like 54/55 for a much longer time.

 Another interesting thing is the first iteration typically doesn't have
 this problem, but it gets progressively worse despite having about the same
 workload/partition sizes in subsequent iterations.

 This problem worsens with smaller akka framesize and/or maxMbInFlight

 Anyone know why this is so?



Number of executors smaller than requested in YARN.

2014-06-25 Thread Sung Hwan Chung
Hi,

When I try requesting a large number of executors - e.g., 242 - it doesn't
seem to actually reach that number. E.g., under the executors tab, I only
see executor IDs up to 234.

This is despite the fact that there's plenty more memory available, as well as
CPU cores, etc., in the system. In fact, the YARN page shows that 243
containers are running (242 executors + the driver).

Anyone know what's going on?


Does Spark restart cached workers even without failures?

2014-06-25 Thread Sung Hwan Chung
I'm doing coalesce with shuffle, then cache, and then run thousands of iterations.

I noticed that sometimes Spark would, for no particular reason, perform a
partial coalesce again after running for a long time - and there was no
exception or failure on the worker's part.

Why is this happening?


Spark executor error

2014-06-25 Thread Sung Hwan Chung
I'm seeing the following message in the log of an executor. Has anyone
seen this error? After this, the executor seems to lose the cache, and
besides that the whole thing slows down drastically - i.e., it gets
stuck in a reduce phase for 40+ minutes, whereas before it was
finishing reduces in 2~3 seconds.



14/06/25 19:22:31 WARN SendingConnection: Error writing in connection
to ConnectionManagerId(alpinenode7.alpinenow.local,46251)
java.lang.NullPointerException
at 
org.apache.spark.network.MessageChunkHeader.buffer$lzycompute(MessageChunkHeader.scala:35)
at 
org.apache.spark.network.MessageChunkHeader.buffer(MessageChunkHeader.scala:32)
at 
org.apache.spark.network.MessageChunk.buffers$lzycompute(MessageChunk.scala:31)
at org.apache.spark.network.MessageChunk.buffers(MessageChunk.scala:29)
at 
org.apache.spark.network.SendingConnection.write(Connection.scala:349)
at 
org.apache.spark.network.ConnectionManager$$anon$5.run(ConnectionManager.scala:142)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:724)


Optimizing reduce for 'huge' aggregated outputs.

2014-06-09 Thread Sung Hwan Chung
Hello,

I noticed that the final reduce function happens in the driver node, with
code that looks like the following:

val outputMap = rdd.mapPartitions(doSomething).reduce { (a, b) =>
  a.merge(b)
}

although individual outputs from mappers are small. Over time the
aggregated result outputMap could be huuuge (say with hundreds of millions
of keys and values, reaching giga bytes).

I noticed that, even if we have a lot of memory in the driver node, this
process eventually becomes really slow (say we have 100+ partitions: the
first reduce is fast, but progressively, it becomes veeery slow as more and
more partition outputs get aggregated). Is this because the intermediate
reduce output gets serialized and then deserialized every time?

What I'd like ideally is, since reduce is taking place in the same machine
any way, there's no need for any serialization and deserialization, and
just aggregate the incoming results into the final aggregation. Is this
possible?
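One way to take most of that merge work off the driver (a sketch only,
assuming a Spark version that provides RDD.treeReduce; doSomething and the
merge method are placeholders for the real partition function and combiner)
is to combine partition outputs in a tree pattern on the executors, so the
driver only deserializes and merges a handful of partially combined maps:

// Sketch: merge the partition outputs on the executors over a few rounds,
// instead of merging all of them one by one on the driver.
val outputMap = rdd
  .mapPartitions(doSomething)
  .treeReduce((a, b) => a.merge(b), depth = 3)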


Re: Optimizing reduce for 'huge' aggregated outputs.

2014-06-09 Thread Sung Hwan Chung
I suppose what I want is the memory efficiency of toLocalIterator and the
speed of collect. Is there any such thing?


On Mon, Jun 9, 2014 at 3:19 PM, Sung Hwan Chung coded...@cs.stanford.edu
wrote:

 Hello,

 I noticed that the final reduce function happens in the driver node with a
 code that looks like the following.

 val outputMap = rdd.mapPartitions(doSomething).reduce { (a, b) => a.merge(b) }

 although individual outputs from mappers are small. Over time the
 aggregated result outputMap could be huuuge (say with hundreds of millions
 of keys and values, reaching giga bytes).

 I noticed that, even if we have a lot of memory in the driver node, this
 process becomes realy slow eventually (say we have 100+ partitions. the
 first reduce is fast, but progressively, it becomes veeery slow as more and
 more partition outputs get aggregated). Is this because the intermediate
 reduce output gets serialized and then deserialized every time?

 What I'd like ideally is, since reduce is taking place in the same machine
 any way, there's no need for any serialization and deserialization, and
 just aggregate the incoming results into the final aggregation. Is this
 possible?



When does Spark switch from PROCESS_LOCAL to NODE_LOCAL or RACK_LOCAL?

2014-06-05 Thread Sung Hwan Chung
I noticed that sometimes tasks would switch from PROCESS_LOCAL (I'd assume
that this means fully cached) to NODE_LOCAL or even RACK_LOCAL.

When this happens, things get extremely slow.

Does this mean that the executor got terminated and restarted?

Is there a way to prevent this from happening (barring the machine actually
going down, I'd rather stick with the same process)?


Re: When does Spark switch from PROCESS_LOCAL to NODE_LOCAL or RACK_LOCAL?

2014-06-05 Thread Sung Hwan Chung
On a related note, I'd also like to minimize any kind of executor movement.
I.e., once an executor is spawned and data is cached in it, I want that
executor to live all the way until the job is finished, or until the machine
fails in a fatal manner.

What would be the best way to ensure that this is the case?


On Thu, Jun 5, 2014 at 3:13 PM, Sung Hwan Chung coded...@cs.stanford.edu
wrote:

 I noticed that sometimes tasks would switch from PROCESS_LOCAL (I'd assume
 that this means fully cached) to NODE_LOCAL or even RACK_LOCAL.

 When these happen things get extremely slow.

 Does this mean that the executor got terminated and restarted?

 Is there a way to prevent this from happening (barring the machine
 actually going down, I'd rather stick with the same process)?



Spark assembly error.

2014-06-04 Thread Sung Hwan Chung
When I run sbt/sbt assembly, I get the following exception. Is anyone else
experiencing a similar problem?


..

[info] Resolving org.eclipse.jetty.orbit#javax.servlet;3.0.0.v201112011016
...

[info] Updating {file:/Users/Sung/Projects/spark_06_04_14/}assembly...

[info] Resolving org.fusesource.jansi#jansi;1.4 ...

[info] Done updating.

[info] Resolving org.eclipse.jetty#jetty-server;8.1.14.v20131031 ...

[info] Updating {file:/Users/Sung/Projects/spark_06_04_14/}examples...

[info] Resolving com.typesafe.genjavadoc#genjavadoc-plugin_2.10.4;0.5 ...

*[error] impossible to get artifacts when data has not been loaded. IvyNode
= org.slf4j#slf4j-api;1.6.1*

[info] Resolving org.fusesource.jansi#jansi;1.4 ...

[info] Done updating.

[warn]
/Users/Sung/Projects/spark_06_04_14/core/src/main/scala/org/apache/hadoop/mapred/SparkHadoopMapRedUtil.scala:43:
constructor TaskAttemptID in class TaskAttemptID is deprecated: see
corresponding Javadoc for more information.

[warn] new TaskAttemptID(jtIdentifier, jobId, isMap, taskId, attemptId)

[warn] ^

[warn]
/Users/Sung/Projects/spark_06_04_14/core/src/main/scala/org/apache/spark/SparkContext.scala:490:
constructor Job in class Job is deprecated: see corresponding Javadoc for
more information.

[warn] val job = new NewHadoopJob(hadoopConfiguration)

[warn]   ^

[warn]
/Users/Sung/Projects/spark_06_04_14/core/src/main/scala/org/apache/spark/SparkContext.scala:623:
constructor Job in class Job is deprecated: see corresponding Javadoc for
more information.

[warn] val job = new NewHadoopJob(conf)


Re: Spark assembly error.

2014-06-04 Thread Sung Hwan Chung
Never mind - it turns out that this is a problem with the Pivotal Hadoop
distribution that we are trying to compile against.


On Wed, Jun 4, 2014 at 4:16 PM, Sung Hwan Chung coded...@cs.stanford.edu
wrote:

 When I run sbt/sbt assembly, I get the following exception. Is anyone else
 experiencing a similar problem?


 ..

 [info] Resolving org.eclipse.jetty.orbit#javax.servlet;3.0.0.v201112011016
 ...

 [info] Updating {file:/Users/Sung/Projects/spark_06_04_14/}assembly...

 [info] Resolving org.fusesource.jansi#jansi;1.4 ...

 [info] Done updating.

 [info] Resolving org.eclipse.jetty#jetty-server;8.1.14.v20131031 ...

 [info] Updating {file:/Users/Sung/Projects/spark_06_04_14/}examples...

 [info] Resolving com.typesafe.genjavadoc#genjavadoc-plugin_2.10.4;0.5 ...

 *[error] impossible to get artifacts when data has not been loaded.
 IvyNode = org.slf4j#slf4j-api;1.6.1*

 [info] Resolving org.fusesource.jansi#jansi;1.4 ...

 [info] Done updating.

 [warn]
 /Users/Sung/Projects/spark_06_04_14/core/src/main/scala/org/apache/hadoop/mapred/SparkHadoopMapRedUtil.scala:43:
 constructor TaskAttemptID in class TaskAttemptID is deprecated: see
 corresponding Javadoc for more information.

 [warn] new TaskAttemptID(jtIdentifier, jobId, isMap, taskId,
 attemptId)

 [warn] ^

 [warn]
 /Users/Sung/Projects/spark_06_04_14/core/src/main/scala/org/apache/spark/SparkContext.scala:490:
 constructor Job in class Job is deprecated: see corresponding Javadoc for
 more information.

 [warn] val job = new NewHadoopJob(hadoopConfiguration)

 [warn]   ^

 [warn]
 /Users/Sung/Projects/spark_06_04_14/core/src/main/scala/org/apache/spark/SparkContext.scala:623:
 constructor Job in class Job is deprecated: see corresponding Javadoc for
 more information.

 [warn] val job = new NewHadoopJob(conf)



Re: is it okay to reuse objects across RDD's?

2014-04-28 Thread Sung Hwan Chung
Actually, I do not know how to do something like this or whether it is even
possible - hence my suggestion.

Can you already declare persistent in-memory objects per worker? I tried
something like constructing a singleton object within map functions, but that
didn't work, as it seemed to actually serialize the singletons and pass them
back and forth in a weird manner.


On Mon, Apr 28, 2014 at 1:23 AM, Sean Owen so...@cloudera.com wrote:

 On Mon, Apr 28, 2014 at 8:22 AM, Sung Hwan Chung coded...@cs.stanford.edu
  wrote:

 e.g. something like

 rdd.mapPartitions((rows: Iterator[String]) => {
   var idx = 0
   rows.map((row: String) => {
     val valueMap = SparkWorker.getMemoryContent(valMap)
     val prevVal = valueMap(idx)
     idx += 1
     ...
   })
   ...
 })

 The developer can implement their own fault recovery mechanism if the
 worker has crashed and lost the memory content.


 Yea you can always just declare your own per-partition data structures in
 a function block like that, right? valueMap can be initialized to an empty
 map, loaded from somewhere, or even a value that is broadcast from the
 driver.

 That's certainly better than tacking data onto RDDs.

 It's not restored if the computation is lost of course, but in this and
 many other cases, it's fine, as it is just for some cached intermediate
 results.

 This already works then or did I misunderstand the original use case?



Re: is it okay to reuse objects across RDD's?

2014-04-28 Thread Sung Hwan Chung
That might be a good alternative to what we are looking for, but I wonder
whether it would be as efficient as we'd like. For instance, will RDDs of the
same size usually get partitioned to the same machines, thus not triggering
any cross-machine alignment? We'll explore it, but I would still very much
like to see more direct worker memory management beyond RDDs.
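For concreteness, a minimal sketch of the zip-based pattern discussed in the
quoted messages below (the initial value, updateLoss, and numIterations are
illustrative placeholders). Because the losses RDD is derived from the
examples RDD via map, the two stay co-partitioned, which is what zip
requires:

// Sketch: keep per-example state in its own RDD and rebuild it each iteration.
var losses = examples.map(_ => 0.0).cache()
for (iter <- 1 to numIterations) {
  val updated = examples.zip(losses).map { case (ex, prev) => updateLoss(ex, prev) }
  updated.cache()
  losses.unpersist()
  losses = updated
}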


On Mon, Apr 28, 2014 at 10:26 AM, Tom Vacek minnesota...@gmail.com wrote:

 Right---They are zipped at each iteration.


 On Mon, Apr 28, 2014 at 11:56 AM, Chester Chen chesterxgc...@yahoo.comwrote:

 Tom,
 Are you suggesting two RDDs, one with loss and another for the rest
 info, using zip to tie them together, but do update on loss RDD (copy) ?

 Chester

 Sent from my iPhone

 On Apr 28, 2014, at 9:45 AM, Tom Vacek minnesota...@gmail.com wrote:

 I'm not sure what I said came through.  RDD zip is not hacky at all, as
 it only depends on a user not changing the partitioning.  Basically, you
 would keep your losses as an RDD[Double] and zip those with the RDD of
 examples, and update the losses.  You're doing a copy (and GC) on the RDD
 of losses each time, but this is negligible.


 On Mon, Apr 28, 2014 at 11:33 AM, Sung Hwan Chung 
 coded...@cs.stanford.edu wrote:

 Yes, this is what we've done as of now (if you read earlier threads).
 And we were saying that we'd prefer if Spark supported persistent worker
 memory management in a little bit less hacky way ;)


 On Mon, Apr 28, 2014 at 8:44 AM, Ian O'Connell i...@ianoconnell.comwrote:

 A mutable map in an object should do what you're looking for then, I
 believe. You just reference the object as an object in your closure, so it
 won't be swept up when your closure is serialized, and you can then
 reference variables of the object on the remote host. e.g.:

 object MyObject {
   val mmap = scala.collection.mutable.Map[Long, Long]()
 }

 rdd.map { ele =>
   MyObject.mmap.getOrElseUpdate(ele, 1L)
   ...
 }.map { ele =>
   require(MyObject.mmap(ele) == 1L)
 }.count

 Along with the possibility of data loss, just be careful with thread safety
 when multiple threads/partitions run on one host - the map should be viewed
 as shared across a larger scope.



 Also, from your exact description it sounds like your data should be
 encoded into the RDD if it's per-record/per-row:  RDD[(MyBaseData,
 LastIterationSideValues)]



 On Mon, Apr 28, 2014 at 1:51 AM, Sung Hwan Chung 
 coded...@cs.stanford.edu wrote:

 In our case, we'd like to keep memory content from one iteration to
 the next, and not just during a single mapPartitions call, because then we
 can do more efficient computations using the values from the previous
 iteration.

 So essentially, we need to declare objects outside the scope of the
 map/reduce calls (but residing in individual workers), so that they can be
 accessed from within the map/reduce calls.

 We'd be making some assumptions, as you said - such as that an RDD partition
 is statically located and can't move from one worker to another unless
 the worker crashes.



 On Mon, Apr 28, 2014 at 1:35 AM, Sean Owen so...@cloudera.com wrote:

 On Mon, Apr 28, 2014 at 9:30 AM, Sung Hwan Chung 
 coded...@cs.stanford.edu wrote:

 Actually, I do not know how to do something like this or whether
 this is possible - thus my suggestive statement.

 Can you already declare persistent memory objects per worker? I
 tried something like constructing a singleton object within map 
 functions,
 but that didn't work as it seemed to actually serialize singletons and 
 pass
 it back and forth in a weird manner.


 Does it need to be persistent across operations, or just persist for
 the lifetime of processing of one partition in one mapPartition? The 
 latter
 is quite easy and might give most of the speedup.

 Maybe that's 'enough', even if it means you re-cache values several
 times in a repeated iterative computation. It would certainly avoid
 managing a lot of complexity in trying to keep that state alive remotely
 across operations. I'd also be interested if there is any reliable way to
 do that, though it seems hard since it means you embed assumptions about
 where particular data is going to be processed.









Re: Do developers have to be aware of Spark's fault tolerance mechanism?

2014-04-21 Thread Sung Hwan Chung
I would probably agree that it's typically not a good idea to add state to
distributed systems. Additionally, from a purist's perspective, this would be
a bit of a hack on the paradigm.

However, from a practical point of view, I think that it's a reasonable
trade-off between efficiency and complexity. It's not too difficult to keep a
small set of mutable state between iterations, and I think a large number of
iterative algorithms could benefit from this.

For the time being, we're thinking of something like this:

An RDD[Array[Double]] with an extra column appended that is initialized to
some default value.

If, in an iteration, the extra column still has the default value, it means
either something failed or it's the very first iteration, so we compute
things the inefficient way. Otherwise, it holds the intermediate value from
the previous computation, so we can do the efficient computation.
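A rough sketch of that idea (the sentinel value and the two compute functions
are illustrative placeholders):

// Sketch: each row carries one extra slot, starting at a sentinel default.
val default = Double.NaN
var data = rows.map(row => row :+ default)      // append the extra column

data = data.map { row =>
  val prev = row(row.length - 1)
  val next =
    if (prev.isNaN) computeFromScratch(row)     // first iteration, or recomputed after a failure
    else computeWithHint(row, prev)             // reuse the previous intermediate value
  row.updated(row.length - 1, next)
}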


On Mon, Apr 21, 2014 at 11:15 AM, Marcelo Vanzin van...@cloudera.comwrote:

 Hi Sung,

 On Mon, Apr 21, 2014 at 10:52 AM, Sung Hwan Chung
 coded...@cs.stanford.edu wrote:
  The goal is to keep an intermediate value per row in memory, which would
  allow faster subsequent computations. I.e., computeSomething would
 depend on
  the previous value from the previous computation.

 I think the fundamental problem here is that there is no in memory
 state of the sort you mention when you're talking about
 map/reduce-style workloads. There are three kinds of data that you can
 use to communicate between sub-tasks:

 - RDD input / output, i.e. the arguments and return values of the
 closures you pass to transformations
 - Broadcast variables
 - Accumulators

 In general, distributed algorithms should strive to be stateless,
 exactly because of issues like reliability and having to re-run
 computations (and communication/coordination in general being
 expensive). The last two in the list above are not generally targeted
 at the kind of state-keeping that you seem to be talking about.

 So if you make the result of computeSomething() the output of your
 map task, then you'll have access to it in the operations downstream
 from that map task. But you can't store it in a variable in memory
 and access it later, because that's not how the system works.

 In any case, I'm really not familiar with ML algorithms, but maybe you
 should take a look at MLLib.


 --
 Marcelo



Re: Random Forest on Spark

2014-04-18 Thread Sung Hwan Chung
Debasish,

Unfortunately, we are bound to YARN, at least for the time being, because
that's what most of our customers would be using (unless all the Hadoop
vendors start supporting standalone Spark - I think Cloudera might do
that?).




On Fri, Apr 18, 2014 at 11:12 AM, Debasish Das debasish.da...@gmail.comwrote:

 Spark on YARN is a big pain due to the strict memory requirement per
 container...

 If you are stress testing it, could you use a standalone cluster and see
 at which feature upper bound the per-worker RAM requirement reaches 16 GB
 or more... it is possible to get 16 GB instances on EC2 these days without
 much trouble...

 Another way is to run a feature selection algorithm to decrease features
 space before running decision tree or algorithm variants...There is a PR on
 entropy based feature selection algorithms...you don't want to use them to
 decrease features right ?

 A feature extraction algorithm like matrix factorization and it's variants
 could be used to decrease feature space as well...



 On Fri, Apr 18, 2014 at 10:53 AM, Sung Hwan Chung 
 coded...@cs.stanford.edu wrote:

 Thanks for the info on the memory requirement.

 I think that a lot of businesses would probably prefer to use Spark on
 top of YARN, since that's what they invest in - a large Hadoop cluster. And
 the default setting for YARN seems to cap memory per container at 8 GB - so
 ideally, we would like to use a lot less than that (rather than telling
 them, no, change your YARN settings).

 A convenient feature would be to automatically figure things out and try
 to adapt the algorithm to memory limits (e.g., process X nodes at a
 time instead of all the nodes at the same level). Additionally, we noticed
 that the default use of Double in LabeledPoint is very wasteful for a
 majority of data sets. Float would do most of the time, and in fact a lot of
 datasets could get away with using Short or even Byte. Or in your case,
 since you're transforming the data into bins anyway, you could probably cache
 the bin IDs (for which you could even use Short or Byte)?



 On Fri, Apr 18, 2014 at 8:43 AM, Evan R. Sparks evan.spa...@gmail.comwrote:

 Interesting, and thanks for the thoughts.

 I think we're on the same page with 100s of millions of records. We've
 tested the tree implementation in mllib on 1b rows and up to 100 features -
 though this isn't hitting the 1000s of features you mention.

 Obviously multi class support isn't there yet, but I can see your point
 about deeper trees for many class problems. Will try them out on some image
 processing stuff with 1k classes we're doing in the lab once they are more
 developed to get a sense for where the issues are.

 If you're only allocating 2GB/worker you're going to have a hard time
 getting the real advantages of Spark.

 For your 1k features causing heap exceptions at depth 5  - are these
 categorical or continuous? The categorical vars create much smaller
 histograms.

 If you're fitting all continuous features, the memory requirements are
 O(b*d*2^l) where b=number of histogram bins, d=number of features, and l =
 level of the tree. Even accounting for object overhead, with the default
 number of bins, the histograms at this depth should be order of 10s of MB,
 not 2GB - so I'm guessing your cached data is occupying a significant chunk
 of that 2GB? In the tree PR - Hirakendu Das tested down to depth 10 on 500m
 data points with 20 continuous features and was able to run without running
 into memory issues (and scaling properties got better as the depth grew).
 His worker mem was 7.5GB and 30% of that was reserved for caching. If you
 wanted to go 1000 features at depth 10 I'd estimate a couple of gigs
 necessary for heap space for the worker to compute/store the histograms,
 and I guess 2x that on the master to do the reduce.

 Again 2GB per worker is pretty tight, because there are overheads of
 just starting the jvm, launching a worker, loading libraries, etc.

 - Evan

 On Apr 17, 2014, at 6:10 PM, Sung Hwan Chung coded...@cs.stanford.edu
 wrote:

 Yes, it should be data specific and perhaps we're biased toward the data
 sets that we are playing with. To put things in perspective, we're highly
 interested in (and I believe, our customers are):

 1. large (hundreds of millions of rows)
 2. multi-class classification - nowadays, dozens of target categories
 are common and even thousands in some cases - you could imagine that this
 is a big reason for us requiring more 'complex' models
 3. high dimensional with thousands of descriptive and
 sort-of-independent features

 From the theoretical perspective, I would argue that it's usually in the
 best interest to prune as little as possible. I believe that pruning
 inherently increases bias of an individual tree, which RF can't do anything
 about while decreasing variance - which is what RF is for.

 The default pruning criteria for R's reference implementation is
 min-node of 1 (meaning fully-grown tree

Re: Random Forest on Spark

2014-04-18 Thread Sung Hwan Chung
I would argue that memory in clusters is still a limited resource and it's
still beneficial to use memory as economically as possible. Let's say that
you are training a gradient boosted model in Spark, which could conceivably
take several hours to build hundreds to thousands of trees. You do not want
to be occupying a significant portion of the cluster memory such that nobody
else can run anything of significance.

We have a dataset that's only ~10 GB as CSV on the file system, but once we
cached the whole thing in Spark it ballooned to 64 GB or so in memory, so we
had to use a lot more workers with more memory just to cache it all - this
was because, although all the features were byte-sized, MLlib defaults to
Double.
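As a rough back-of-the-envelope check (a sketch with assumed numbers: it
treats every value as a single byte in the source data and an 8-byte double
in memory, and ignores JVM object overhead and Spark's own bookkeeping):

// Rough estimate only: storing byte-sized values as 8-byte doubles
// multiplies the raw footprint by up to ~8x, which is in the same ballpark
// as the ~10 GB -> ~64 GB blow-up described above.
val csvSizeGB   = 10.0
val bytesPerVal = 1.0   // assumed: each feature fits in a single byte
val doubleSize  = 8.0
val estimatedGB = csvSizeGB * (doubleSize / bytesPerVal)
println(f"upper-bound in-memory size: $estimatedGB%.0f GB")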


On Fri, Apr 18, 2014 at 1:39 PM, Sandy Ryza sandy.r...@cloudera.com wrote:

 I don't think the YARN default of max 8GB container size is a good
 justification for limiting memory per worker.  This is a sort of arbitrary
 number that came from an era where MapReduce was the main YARN application
 and machines generally had less memory.  I expect to see this to get
 configured as much higher in practice on most clusters running Spark.

 YARN integration is actually complete in CDH5.0.  We support it as well as
 standalone mode.




 On Fri, Apr 18, 2014 at 11:49 AM, Sean Owen so...@cloudera.com wrote:

 On Fri, Apr 18, 2014 at 7:31 PM, Sung Hwan Chung
 coded...@cs.stanford.edu wrote:
  Debasish,
 
  Unfortunately, we are bound to YARN, at least for the time being,
 because
  that's what most of our customers would be using (unless, all the Hadoop
  vendors start supporting standalone Spark - I think Cloudera might do
  that?).

 Yes the CDH5.0.0 distro just runs Spark in stand-alone mode. Using the
 YARN integration is still being worked on.





Re: Random Forest on Spark

2014-04-18 Thread Sung Hwan Chung
Sorry, that was incomplete information - I think Spark's compression helped
(not sure how much, though), so the actual memory requirement may have been
smaller.


On Fri, Apr 18, 2014 at 3:16 PM, Sung Hwan Chung
coded...@cs.stanford.eduwrote:

 I would argue that memory in clusters is still a limited resource and it's
 still beneficial to use memory as economically as possible. Let's say that
 you are training a gradient boosted model in Spark, which could conceivably
 take several hours to build hundreds to thousands of trees. You do not want
 to be occupying a significant portion of the cluster memory such that
 nobody else can run anything of significance.

 We have a dataset that's only ~10GB CSV in the file system, now once we
 cached the whole thing in Spark, it ballooned to 64 GB or so in memory and
 so we had to use a lot more workers with memory just so that we could cache
 the whole thing - this was due to the fact that although all the features
 were byte-sized, MLLib defaults to Double.


 On Fri, Apr 18, 2014 at 1:39 PM, Sandy Ryza sandy.r...@cloudera.comwrote:

 I don't think the YARN default of max 8GB container size is a good
 justification for limiting memory per worker.  This is a sort of arbitrary
 number that came from an era where MapReduce was the main YARN application
 and machines generally had less memory.  I expect to see this to get
 configured as much higher in practice on most clusters running Spark.

 YARN integration is actually complete in CDH5.0.  We support it as well
 as standalone mode.




 On Fri, Apr 18, 2014 at 11:49 AM, Sean Owen so...@cloudera.com wrote:

 On Fri, Apr 18, 2014 at 7:31 PM, Sung Hwan Chung
 coded...@cs.stanford.edu wrote:
  Debasish,
 
  Unfortunately, we are bound to YARN, at least for the time being,
 because
  that's what most of our customers would be using (unless, all the
 Hadoop
  vendors start supporting standalone Spark - I think Cloudera might do
  that?).

 Yes the CDH5.0.0 distro just runs Spark in stand-alone mode. Using the
 YARN integration is still being worked on.






Do developers have to be aware of Spark's fault tolerance mechanism?

2014-04-18 Thread Sung Hwan Chung
Are there scenarios where developers have to be aware of how Spark's fault
tolerance works in order to implement correct programs?

It seems that if we want to maintain any sort of mutable state in each worker
across iterations, it can have unintended effects once a machine goes down.

E.g.,

while (true) {
  rdd.map((row: Array[Double]) => {
    row(numCols - 1) = computeSomething(row)
    row
  }).reduce(...)
}

If it fails at some point, I'd imagine that the intermediate info being
stored in row(numCols - 1) will be lost. And unless Spark re-runs the whole
thing from the very first iteration, things will get out of sync.

I'd imagine that as long as we don't use mutable tricks inside of worker
tasks, we should be OK; but once we start doing that, things could get ugly
unless we account for how Spark handles fault tolerance?


Re: Random Forest on Spark

2014-04-17 Thread Sung Hwan Chung
Debasish, we've tested the MLlib decision tree a bit, and it eats up too
much memory for RF purposes.
Once the tree got to depth 8~9, it was easy to get a heap exception, even
with 2~4 GB of memory per worker.

With RF, it's very easy to reach depths of 100+ with only 100,000+ rows
(because trees usually are not balanced). Additionally, the lack of
multi-class classification limits its applicability.

Also, RF requires random features per tree node to be effective (not just
bootstrap samples), and the MLlib decision tree doesn't support that.


On Thu, Apr 17, 2014 at 10:27 AM, Debasish Das debasish.da...@gmail.comwrote:

 MLlib has a decision tree... there is an RF PR which is not active
 now... take that and swap the tree builder with the fast tree builder
 that's in MLlib... search for the Spark JIRA... the code is based on the
 Google PLANET paper...

 I am sure people in devlist are already working on it...send an email to
 know the status over there...

 There is also a rf in cloudera oryx but we could not run it on our data
 yet

 Weka 3.7.10 has a multi thread rf that is good to do some adhoc runs but
 it does not scale...
  On Apr 17, 2014 2:45 AM, Laeeq Ahmed laeeqsp...@yahoo.com wrote:

 Hi,

 For one of my applications, I want to use Random Forests (RF) on top of
 Spark. I see that currently MLlib does not have an implementation of RF.
 What other open-source RF implementations would be good to use with Spark
 in terms of speed?

 Regards,
 Laeeq Ahmed,
 KTH, Sweden.




Re: Random Forest on Spark

2014-04-17 Thread Sung Hwan Chung
Evan,

I actually haven't heard of 'shallow' random forests. I think that the only
scenarios where shallow trees are useful are boosting scenarios.

AFAIK, Random Forest is a variance-reducing technique and doesn't do much
about bias (although some people claim that it does have some bias-reducing
effect). Because shallow trees typically have higher bias than fully-grown
trees, people don't often use shallow trees with RF.

You can confirm this through some experiments with R's random forest
implementation as well. It allows you to set limits on depth and/or
pruning.

In contrast, boosting is a bias reduction technique (and increases
variance), so people typically use shallow trees.

Our empirical experiments also confirmed that shallow trees resulted in
drastically lower accuracy for random forests.

There are some papers that mix boosting-like technique with bootstrap
averaging (e.g. http://arxiv.org/pdf/1103.2068.pdf) where you could
potentially use shallow trees to build boosted learners, but then average
the results of many boosted learners.


On Thu, Apr 17, 2014 at 12:07 PM, Evan R. Sparks evan.spa...@gmail.comwrote:

 Multiclass classification, Gradient Boosting, and Random Forest support
 based on the recent Decision Tree implementation in MLlib.

 Sung - I'd be curious to hear about your use of decision trees (and
 forests) where you want to go to 100+ depth. My experience with random
 forests has been that people typically build hundreds of shallow trees
 (maybe depth 7 or 8), rather than a few (or many) really deep trees.

 Generally speaking, we save passes over the data by computing histograms
 per variable per split at each *level* of a decision tree. This can blow up
 as the level of the decision tree gets deep, but I'd recommend a lot more
 memory than 2-4GB per worker for most big data workloads.





 On Thu, Apr 17, 2014 at 11:50 AM, Sung Hwan Chung 
 coded...@cs.stanford.edu wrote:

 Debasish, we've tested the MLLib decision tree a bit and it eats up too
 much memory for RF purposes.
 Once the tree got to depth 8~9, it was easy to get heap exception, even
 with 2~4 GB of memory per worker.

 With RF, it's very easy to get 100+ depth in RF with even only 100,000+
 rows (because trees usually are not balanced). Additionally, the lack of
 multi-class classification limits its applicability.

 Also, RF requires random features per tree node to be effective (not just
 bootstrap samples), and MLLib decision tree doesn't support that.


 On Thu, Apr 17, 2014 at 10:27 AM, Debasish Das 
 debasish.da...@gmail.comwrote:

 Mllib has decision treethere is a rf pr which is not active
 nowtake that and swap the tree builder with the fast tree builder
 that's in mllib...search for the spark jira...the code is based on google
 planet paper. ..

 I am sure people in devlist are already working on it...send an email to
 know the status over there...

 There is also a rf in cloudera oryx but we could not run it on our data
 yet

 Weka 3.7.10 has a multi thread rf that is good to do some adhoc runs but
 it does not scale...
  On Apr 17, 2014 2:45 AM, Laeeq Ahmed laeeqsp...@yahoo.com wrote:

 Hi,

 For one of my application, I want to use Random forests(RF) on top of
 spark. I see that currenlty MLLib does not have implementation for RF. What
 other opensource RF implementations will be great to use with spark in
 terms of speed?

 Regards,
 Laeeq Ahmed,
 KTH, Sweden.






Re: Random Forest on Spark

2014-04-17 Thread Sung Hwan Chung
Well, if you read the original paper
(http://oz.berkeley.edu/~breiman/randomforest2001.pdf): "Grow the tree using
CART methodology to maximum size and do not prune."

Now, the Elements of Statistical Learning book, on page 598, says that you
could potentially overfit a fully-grown regression random forest. However,
this effect is very slight, and likely negligible for classification.
http://www.stanford.edu/~hastie/local.ftp/Springer/OLD/ESLII_print4.pdf

In our experiments however, if the pruning is drastic, then the
performance actually becomes much worse. This makes intuitive sense IMO
because a decision tree is a non-parametric model, and the expressibility
of a tree depends on the number of nodes.

With a huge amount of data (millions or even billions of rows), we found
that the depth of 10 is simply not adequate to build high-accuracy models.


On Thu, Apr 17, 2014 at 12:30 PM, Evan R. Sparks evan.spa...@gmail.comwrote:

 Hmm... can you provide some pointers to examples where deep trees are
 helpful?

 Typically with Decision Trees you limit depth (either directly or
 indirectly with minimum node size and minimum improvement criteria) to
 avoid overfitting. I agree with the assessment that forests are a variance
 reduction technique, but I'd be a little surprised if a bunch of hugely
 deep trees don't overfit to training data. I guess I view limiting tree
 depth as an analogue to regularization in linear models.


 On Thu, Apr 17, 2014 at 12:19 PM, Sung Hwan Chung 
 coded...@cs.stanford.edu wrote:

 Evan,

 I actually haven't heard of 'shallow' random forest. I think that the
 only scenarios where shallow trees are useful are boosting scenarios.

 AFAIK, Random Forest is a variance reducing technique and doesn't do much
 about bias (although some people claim that it does have some bias reducing
 effect). Because shallow trees typically have higher bias than fully-grown
 trees, people don't often use shallow trees with RF.

 You can confirm this through some experiments with R's random forest
 implementation as well. They allow you to set some limits of depth and/or
 pruning.

 In contrast, boosting is a bias reduction technique (and increases
 variance), so people typically use shallow trees.

 Our empirical experiments also confirmed that shallow trees resulted in
 drastically lower accuracy for random forests.

 There are some papers that mix boosting-like technique with bootstrap
 averaging (e.g. http://arxiv.org/pdf/1103.2068.pdf) where you could
 potentially use shallow trees to build boosted learners, but then average
 the results of many boosted learners.


 On Thu, Apr 17, 2014 at 12:07 PM, Evan R. Sparks 
 evan.spa...@gmail.comwrote:

 Multiclass classification, Gradient Boosting, and Random Forest support
 for based on the recent Decision Tree implementation in MLlib.

 Sung - I'd be curious to hear about your use of decision trees (and
 forests) where you want to go to 100+ depth. My experience with random
 forests has been that people typically build hundreds of shallow trees
 (maybe depth 7 or 8), rather than a few (or many) really deep trees.

 Generally speaking, we save passes over the data by computing histograms
 per variable per split at each *level* of a decision tree. This can blow up
 as the level of the decision tree gets deep, but I'd recommend a lot more
 memory than 2-4GB per worker for most big data workloads.





 On Thu, Apr 17, 2014 at 11:50 AM, Sung Hwan Chung 
 coded...@cs.stanford.edu wrote:

 Debasish, we've tested the MLLib decision tree a bit and it eats up too
 much memory for RF purposes.
 Once the tree got to depth 8~9, it was easy to get heap exception, even
 with 2~4 GB of memory per worker.

 With RF, it's very easy to get 100+ depth in RF with even only 100,000+
 rows (because trees usually are not balanced). Additionally, the lack of
 multi-class classification limits its applicability.

 Also, RF requires random features per tree node to be effective (not
 just bootstrap samples), and MLLib decision tree doesn't support that.


 On Thu, Apr 17, 2014 at 10:27 AM, Debasish Das 
 debasish.da...@gmail.com wrote:

 Mllib has decision treethere is a rf pr which is not active
 nowtake that and swap the tree builder with the fast tree builder
 that's in mllib...search for the spark jira...the code is based on google
 planet paper. ..

 I am sure people in devlist are already working on it...send an email
 to know the status over there...

 There is also a rf in cloudera oryx but we could not run it on our
 data yet

 Weka 3.7.10 has a multi thread rf that is good to do some adhoc runs
 but it does not scale...
  On Apr 17, 2014 2:45 AM, Laeeq Ahmed laeeqsp...@yahoo.com wrote:

 Hi,

 For one of my application, I want to use Random forests(RF) on top of
 spark. I see that currenlty MLLib does not have implementation for RF. 
 What
 other opensource RF implementations will be great to use with spark in
 terms of speed?

 Regards,
 Laeeq Ahmed,
 KTH, Sweden.








Re: Random Forest on Spark

2014-04-17 Thread Sung Hwan Chung
Additionally, 'random features per node' (or mtry in R) is a very important
feature for Random Forest. The variance reduction comes from the trees being
decorrelated from each other, and the random features per node often do more
for that than the bootstrap samples. And this is something that would have to
be supported at the tree level.


On Thu, Apr 17, 2014 at 1:43 PM, Sung Hwan Chung
coded...@cs.stanford.eduwrote:

 Well, if you read the original paper,
 http://oz.berkeley.edu/~breiman/randomforest2001.pdf
 Grow the tree using CART methodology to maximum size and do not prune.

 Now, the elements of statistical learning book on page 598 says that you
 could potentially overfit fully-grown regression random forest. However,
 this effect is very slight, and likely negligible for classifications.
 http://www.stanford.edu/~hastie/local.ftp/Springer/OLD/ESLII_print4.pdf

 In our experiments however, if the pruning is drastic, then the
 performance actually becomes much worse. This makes intuitive sense IMO
 because a decision tree is a non-parametric model, and the expressibility
 of a tree depends on the number of nodes.

 With a huge amount of data (millions or even billions of rows), we found
 that the depth of 10 is simply not adequate to build high-accuracy models.


 On Thu, Apr 17, 2014 at 12:30 PM, Evan R. Sparks evan.spa...@gmail.comwrote:

 Hmm... can you provide some pointers to examples where deep trees are
 helpful?

 Typically with Decision Trees you limit depth (either directly or
 indirectly with minimum node size and minimum improvement criteria) to
 avoid overfitting. I agree with the assessment that forests are a variance
 reduction technique, but I'd be a little surprised if a bunch of hugely
 deep trees don't overfit to training data. I guess I view limiting tree
 depth as an analogue to regularization in linear models.


 On Thu, Apr 17, 2014 at 12:19 PM, Sung Hwan Chung 
 coded...@cs.stanford.edu wrote:

 Evan,

 I actually haven't heard of 'shallow' random forest. I think that the
 only scenarios where shallow trees are useful are boosting scenarios.

 AFAIK, Random Forest is a variance reducing technique and doesn't do
 much about bias (although some people claim that it does have some bias
 reducing effect). Because shallow trees typically have higher bias than
 fully-grown trees, people don't often use shallow trees with RF.

 You can confirm this through some experiments with R's random forest
 implementation as well. They allow you to set some limits of depth and/or
 pruning.

 In contrast, boosting is a bias reduction technique (and increases
 variance), so people typically use shallow trees.

 Our empirical experiments also confirmed that shallow trees resulted in
 drastically lower accuracy for random forests.

 There are some papers that mix boosting-like technique with bootstrap
 averaging (e.g. http://arxiv.org/pdf/1103.2068.pdf) where you could
 potentially use shallow trees to build boosted learners, but then average
 the results of many boosted learners.


 On Thu, Apr 17, 2014 at 12:07 PM, Evan R. Sparks 
 evan.spa...@gmail.comwrote:

 Multiclass classification, Gradient Boosting, and Random Forest support
 for based on the recent Decision Tree implementation in MLlib.

 Sung - I'd be curious to hear about your use of decision trees (and
 forests) where you want to go to 100+ depth. My experience with random
 forests has been that people typically build hundreds of shallow trees
 (maybe depth 7 or 8), rather than a few (or many) really deep trees.

 Generally speaking, we save passes over the data by computing
 histograms per variable per split at each *level* of a decision tree. This
 can blow up as the level of the decision tree gets deep, but I'd recommend
 a lot more memory than 2-4GB per worker for most big data workloads.





 On Thu, Apr 17, 2014 at 11:50 AM, Sung Hwan Chung 
 coded...@cs.stanford.edu wrote:

 Debasish, we've tested the MLLib decision tree a bit and it eats up
 too much memory for RF purposes.
 Once the tree got to depth 8~9, it was easy to get heap exception,
 even with 2~4 GB of memory per worker.

 With RF, it's very easy to get 100+ depth in RF with even only
 100,000+ rows (because trees usually are not balanced). Additionally, the
 lack of multi-class classification limits its applicability.

 Also, RF requires random features per tree node to be effective (not
 just bootstrap samples), and MLLib decision tree doesn't support that.


 On Thu, Apr 17, 2014 at 10:27 AM, Debasish Das 
 debasish.da...@gmail.com wrote:

 Mllib has decision treethere is a rf pr which is not active
 nowtake that and swap the tree builder with the fast tree builder
 that's in mllib...search for the spark jira...the code is based on google
 planet paper. ..

 I am sure people in devlist are already working on it...send an email
 to know the status over there...

 There is also a rf in cloudera oryx but we could not run it on our
 data yet

 Weka 3.7.10 has

Re: Random Forest on Spark

2014-04-17 Thread Sung Hwan Chung
I believe they show one example comparing a depth-1 ensemble vs. a depth-3
ensemble, but it is based on boosting, not bagging.


On Thu, Apr 17, 2014 at 2:21 PM, Debasish Das debasish.da...@gmail.comwrote:

 Evan,

 Wasn't the MLlib decision tree implemented using ideas from Google's PLANET
 paper... does the paper also propose growing shallow trees?

 Thanks.
 Deb


 On Thu, Apr 17, 2014 at 1:52 PM, Sung Hwan Chung coded...@cs.stanford.edu
  wrote:

 Additionally, the 'random features per node' (or mtry in R) is a very
 important feature for Random Forest. The variance reduction comes if the
 trees are decorrelated from each other and often the random features per
 node does more than bootstrap samples. And this is something that would
 have to be supported at the tree level.


 On Thu, Apr 17, 2014 at 1:43 PM, Sung Hwan Chung 
 coded...@cs.stanford.edu wrote:

 Well, if you read the original paper,
 http://oz.berkeley.edu/~breiman/randomforest2001.pdf
 Grow the tree using CART methodology to maximum size and do not prune.

 Now, the elements of statistical learning book on page 598 says that you
 could potentially overfit fully-grown regression random forest. However,
 this effect is very slight, and likely negligible for classifications.
 http://www.stanford.edu/~hastie/local.ftp/Springer/OLD/ESLII_print4.pdf

 In our experiments however, if the pruning is drastic, then the
 performance actually becomes much worse. This makes intuitive sense IMO
 because a decision tree is a non-parametric model, and the expressibility
 of a tree depends on the number of nodes.

 With a huge amount of data (millions or even billions of rows), we found
 that the depth of 10 is simply not adequate to build high-accuracy models.


 On Thu, Apr 17, 2014 at 12:30 PM, Evan R. Sparks 
 evan.spa...@gmail.comwrote:

 Hmm... can you provide some pointers to examples where deep trees are
 helpful?

 Typically with Decision Trees you limit depth (either directly or
 indirectly with minimum node size and minimum improvement criteria) to
 avoid overfitting. I agree with the assessment that forests are a variance
 reduction technique, but I'd be a little surprised if a bunch of hugely
 deep trees don't overfit to training data. I guess I view limiting tree
 depth as an analogue to regularization in linear models.


 On Thu, Apr 17, 2014 at 12:19 PM, Sung Hwan Chung 
 coded...@cs.stanford.edu wrote:

 Evan,

 I actually haven't heard of 'shallow' random forest. I think that the
 only scenarios where shallow trees are useful are boosting scenarios.

 AFAIK, Random Forest is a variance reducing technique and doesn't do
 much about bias (although some people claim that it does have some bias
 reducing effect). Because shallow trees typically have higher bias than
 fully-grown trees, people don't often use shallow trees with RF.

 You can confirm this through some experiments with R's random forest
 implementation as well. They allow you to set some limits of depth and/or
 pruning.

 In contrast, boosting is a bias reduction technique (and increases
 variance), so people typically use shallow trees.

 Our empirical experiments also confirmed that shallow trees resulted
 in drastically lower accuracy for random forests.

 There are some papers that mix boosting-like technique with bootstrap
 averaging (e.g. http://arxiv.org/pdf/1103.2068.pdf) where you could
 potentially use shallow trees to build boosted learners, but then average
 the results of many boosted learners.


 On Thu, Apr 17, 2014 at 12:07 PM, Evan R. Sparks 
 evan.spa...@gmail.com wrote:

 Multiclass classification, Gradient Boosting, and Random Forest
 support for based on the recent Decision Tree implementation in MLlib.

 Sung - I'd be curious to hear about your use of decision trees (and
 forests) where you want to go to 100+ depth. My experience with random
 forests has been that people typically build hundreds of shallow trees
 (maybe depth 7 or 8), rather than a few (or many) really deep trees.

 Generally speaking, we save passes over the data by computing
 histograms per variable per split at each *level* of a decision tree. 
 This
 can blow up as the level of the decision tree gets deep, but I'd 
 recommend
 a lot more memory than 2-4GB per worker for most big data workloads.





 On Thu, Apr 17, 2014 at 11:50 AM, Sung Hwan Chung 
 coded...@cs.stanford.edu wrote:

 Debasish, we've tested the MLLib decision tree a bit and it eats up
 too much memory for RF purposes.
 Once the tree got to depth 8~9, it was easy to get heap exception,
 even with 2~4 GB of memory per worker.

 With RF, it's very easy to get 100+ depth in RF with even only
 100,000+ rows (because trees usually are not balanced). Additionally, 
 the
 lack of multi-class classification limits its applicability.

 Also, RF requires random features per tree node to be effective (not
 just bootstrap samples), and MLLib decision tree doesn't support that.


 On Thu, Apr 17, 2014 at 10:27 AM, Debasish Das

Re: Random Forest on Spark

2014-04-17 Thread Sung Hwan Chung
Yes, it should be data specific and perhaps we're biased toward the data
sets that we are playing with. To put things in perspective, we're highly
interested in (and I believe, our customers are):

1. large (hundreds of millions of rows)
2. multi-class classification - nowadays, dozens of target categories are
common and even thousands in some cases - you could imagine that this is a
big reason for us requiring more 'complex' models
3. high dimensional with thousands of descriptive and sort-of-independent
features

From the theoretical perspective, I would argue that it's usually in the
best interest to prune as little as possible. I believe that pruning
inherently increases bias of an individual tree, which RF can't do anything
about while decreasing variance - which is what RF is for.

The default pruning criterion for R's reference implementation is a minimum
node size of 1 (meaning fully-grown trees) for classification, and 5 for
regression. I'd imagine they did at least some empirical testing to justify
these values at the time - albeit at a time of small datasets :).

FYI, we are also considering the MLlib decision tree for our Gradient
Boosting implementation; however, the memory requirement is still a bit too
steep (we were getting heap exceptions at a depth limit of 5 with 2 GB per
worker and approximately 1000 features). Now, 2 GB per worker is about what
we expect our typical customers would tolerate, and I don't think that's
unreasonable for shallow trees.



On Thu, Apr 17, 2014 at 3:54 PM, Evan R. Sparks evan.spa...@gmail.comwrote:

 What kind of data are you training on? These effects are *highly* data
 dependent, and while saying the depth of 10 is simply not adequate to
 build high-accuracy models may be accurate for the particular problem
 you're modeling, it is not true in general. From a statistical perspective,
 I consider each node in each tree an additional degree of freedom for the
 model, and all else equal I'd expect a model with fewer degrees of freedom
 to generalize better. Regardless, if there are lots of use cases for really
 deep trees, we'd like to hear about them so that we can decide how
 important they are to support!

 In the context of CART - pruning very specifically refers to a step
 *after* a tree has been constructed to some depth using cross-validation.
 This was a variance reduction technique in the original tree work that is
 unnecessary and computationally expensive in the context of forests. In the
 original Random Forests paper, there are still stopping criteria - usually
 either minimum leaf size or minimum split improvement (or both), so
 training to maximum depth doesn't mean train until you've completely
 divided your dataset and there's one point per leaf. My point is that if
 you set minimum leaf size to something like 0.2% of the dataset, then
 you're not going to get deeper than 10 or 12 levels with a reasonably
 balanced tree.

 With respect to PLANET - our implementation is very much in the spirit of
 planet, but has some key differences - there's good documentation on
 exactly what the differences are forthcoming, so I won't belabor these
 here. The differences are designed to 1) avoid data shuffling, and 2)
 minimize number of passes over the training data. Of course, there are
 tradeoffs involved, and there is at least one really good trick in the
 PLANET work that we should leverage that we aren't yet - namely once the
 nodes get small enough for data to fit easily on a single machine, data can
 be shuffled and then the remainder of the tree can be trained in parallel
 from each lower node on a single machine. This would actually help with the
 memory overheads in model training when trees get deep - if someone wants
 to modify the current implementation of trees in MLlib and contribute this
 optimization as a pull request, it would be welcome!

 At any rate, we'll take this feedback into account with respect to
 improving the tree implementation, but if anyone can send over use cases or
 (even better) datasets where really deep trees are necessary, that would be
 great!




 On Thu, Apr 17, 2014 at 1:43 PM, Sung Hwan Chung coded...@cs.stanford.edu
  wrote:

 Well, if you read the original paper,
 http://oz.berkeley.edu/~breiman/randomforest2001.pdf
 Grow the tree using CART methodology to maximum size and do not prune.

 Now, the elements of statistical learning book on page 598 says that you
 could potentially overfit fully-grown regression random forest. However,
 this effect is very slight, and likely negligible for classifications.
 http://www.stanford.edu/~hastie/local.ftp/Springer/OLD/ESLII_print4.pdf

 In our experiments however, if the pruning is drastic, then the
 performance actually becomes much worse. This makes intuitive sense IMO
 because a decision tree is a non-parametric model, and the expressibility
 of a tree depends on the number of nodes.

 With a huge amount of data (millions or even billions of rows), we found
 that the depth of 10

Mutable tagging RDD rows ?

2014-03-28 Thread Sung Hwan Chung
Hey guys,

I need to tag individual RDD rows with some values, and this tag value would
change at every iteration. Is this possible with an RDD (I suppose this is
sort of like a mutable RDD, but it's a bit more than that)?

If not, what would be the best way to do something like this? Basically, we
need to keep mutable information per data row (something much smaller than
the actual data row, however).

Thanks
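Since RDDs are immutable, the tag can't be updated in place; the usual
workaround (a sketch only - Tag, initTag, updateTag, and numIterations are
illustrative placeholders) is to carry the tag alongside each row and produce
a new (row, tag) RDD every iteration:

// Sketch: keep the small per-row tag as part of each element and rebuild
// the tagged RDD each iteration instead of mutating anything in place.
var tagged = rows.map(row => (row, initTag(row))).cache()
for (iter <- 1 to numIterations) {
  val next = tagged.map { case (row, tag) => (row, updateTag(row, tag)) }
  next.cache()
  tagged.unpersist()
  tagged = next
}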


Re: YARN problem using an external jar in worker nodes

2014-03-27 Thread Sung Hwan Chung
Yes, it's in yarn-standalone mode, and I did use the SparkContext.addJar
method and also tried setting SPARK_CLASSPATH via setExecutorEnv, etc., but
none of it worked.

I finally made it work by modifying the ClientBase.scala code, where I set
'appMasterOnly' to false before the addJars contents were added to
distCacheMgr. But this is not what I should be doing, right?

Is there a problem with the addJar method in 0.9.0?
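For reference, a minimal sketch of the programmatic route being discussed
(the path is illustrative, and sc stands for the job's already-created
SparkContext; this is not the exact invocation from the failing job):

// Sketch only: register an external jar from the driver so that Spark
// distributes it to the executors as well; the path below is a placeholder.
sc.addJar("/path/to/opencv.jar")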


On Wed, Mar 26, 2014 at 1:47 PM, Sandy Ryza sandy.r...@cloudera.com wrote:

 Hi Sung,

 Are you using yarn-standalone mode?  Have you specified the --addJars
 option with your external jars?

 -Sandy


 On Wed, Mar 26, 2014 at 1:17 PM, Sung Hwan Chung coded...@cs.stanford.edu
  wrote:

 Hello, (this is Yarn related)

 I'm able to load an external jar and use its classes within
 ApplicationMaster. I wish to use this jar within worker nodes, so I added
 sc.addJar(pathToJar) and ran.

 I get the following exception:

 org.apache.spark.SparkException: Job aborted: Task 0.0:1 failed 4 times 
 (most recent failure: Exception failure: java.lang.NoClassDefFoundError: 
 org/opencv/objdetect/HOGDescriptor)
 Job aborted: Task 0.0:1 failed 4 times (most recent failure: Exception 
 failure: java.lang.NoClassDefFoundError: org/opencv/objdetect/HOGDescriptor)
 org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1028)
 org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1026)
 scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
 scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$abortStage(DAGScheduler.scala:1026)
 org.apache.spark.scheduler.DAGScheduler$$anonfun$processEvent$10.apply(DAGScheduler.scala:619)
 org.apache.spark.scheduler.DAGScheduler$$anonfun$processEvent$10.apply(DAGScheduler.scala:619)
 scala.Option.foreach(Option.scala:236)
 org.apache.spark.scheduler.DAGScheduler.processEvent(DAGScheduler.scala:619)
 org.apache.spark.scheduler.DAGScheduler$$anonfun$start$1$$anon$2$$anonfun$receive$1.applyOrElse(DAGScheduler.scala:207)
 akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
 akka.actor.ActorCell.invoke(ActorCell.scala:456)
 akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
 akka.dispatch.Mailbox.run(Mailbox.scala:219)
 akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
 scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
 scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
 scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
 scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)



 And in worker node containers' stderr log (nothing in stdout log), I
 don't see any reference to loading jars:

 SLF4J: Class path contains multiple SLF4J bindings.
 SLF4J: Found binding in 
 [jar:file:/home/gpphddata/1/yarn/nm-local-dir/usercache/yarn/filecache/7394400996676014282/spark-assembly-0.9.0-incubating-hadoop2.0.2-alpha-gphd-2.0.1.0.jar!/org/slf4j/impl/StaticLoggerBinder.class]
 SLF4J: Found binding in 
 [jar:file:/usr/lib/gphd/hadoop-2.0.2_alpha_gphd_2_0_1_0/lib/slf4j-log4j12-1.6.1.jar!/org/slf4j/impl/StaticLoggerBinder.class]
 SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an 
 explanation.
 SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
 14/03/26 13:12:18 INFO slf4j.Slf4jLogger: Slf4jLogger started
 14/03/26 13:12:18 INFO Remoting: Starting remoting
 14/03/26 13:12:18 INFO Remoting: Remoting started; listening on addresses 
 :[akka.tcp://sparkExecutor@alpinenode6.alpinenow.local:44006]
 14/03/26 13:12:18 INFO Remoting: Remoting now listens on addresses: 
 [akka.tcp://sparkExecutor@alpinenode6.alpinenow.local:44006]
 14/03/26 13:12:18 INFO executor.CoarseGrainedExecutorBackend: Connecting to 
 driver: 
 akka.tcp://spark@alpinenode5.alpinenow.local:10314/user/CoarseGrainedScheduler
 14/03/26 13:12:18 ERROR executor.CoarseGrainedExecutorBackend: Driver 
 Disassociated [akka.tcp://sparkExecutor@alpinenode6.alpinenow.local:44006] 
 - [akka.tcp://spark@alpinenode5.alpinenow.local:10314] disassociated! 
 Shutting down.






 Any idea what's going on?





Re: YARN problem using an external jar in worker nodes

2014-03-27 Thread Sung Hwan Chung
Well, it says that the jar was successfully added, but the workers can't
reference classes from it. Does this have anything to do with this bug?

http://stackoverflow.com/questions/22457645/when-to-use-spark-classpath-or-sparkcontext-addjar


On Thu, Mar 27, 2014 at 2:57 PM, Sandy Ryza sandy.r...@cloudera.com wrote:

 I just tried this in CDH (only a few patches ahead of 0.9.0) and was able
 to include a dependency with --addJars successfully.

 Can you share how you're invoking SparkContext.addJar?  Anything
 interesting in the application master logs?

 -Sandy




 On Thu, Mar 27, 2014 at 11:35 AM, Sung Hwan Chung 
 coded...@cs.stanford.edu wrote:

 Yea it's in a standalone mode and I did use SparkContext.addJar method
 and tried setting setExecutorEnv SPARK_CLASSPATH, etc. but none of it
 worked.

 I finally made it work by modifying the ClientBase.scala code where I set
 'appMasterOnly' to false before the addJars contents were added to
 distCacheMgr. But this is not what I should be doing, right?

 Is there a problem with addJar method in 0.9.0?


 On Wed, Mar 26, 2014 at 1:47 PM, Sandy Ryza sandy.r...@cloudera.comwrote:

 Hi Sung,

 Are you using yarn-standalone mode?  Have you specified the --addJars
 option with your external jars?

 -Sandy


 On Wed, Mar 26, 2014 at 1:17 PM, Sung Hwan Chung 
 coded...@cs.stanford.edu wrote:

 Hello, (this is Yarn related)

 I'm able to load an external jar and use its classes within
 ApplicationMaster. I wish to use this jar within worker nodes, so I added
 sc.addJar(pathToJar) and ran.

 I get the following exception:

 org.apache.spark.SparkException: Job aborted: Task 0.0:1 failed 4 times 
 (most recent failure: Exception failure: java.lang.NoClassDefFoundError: 
 org/opencv/objdetect/HOGDescriptor)
 Job aborted: Task 0.0:1 failed 4 times (most recent failure: Exception 
 failure: java.lang.NoClassDefFoundError: 
 org/opencv/objdetect/HOGDescriptor)
 org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1028)
 org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1026)
 scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
 scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
 org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$abortStage(DAGScheduler.scala:1026)
 org.apache.spark.scheduler.DAGScheduler$$anonfun$processEvent$10.apply(DAGScheduler.scala:619)
 org.apache.spark.scheduler.DAGScheduler$$anonfun$processEvent$10.apply(DAGScheduler.scala:619)
 scala.Option.foreach(Option.scala:236)
 org.apache.spark.scheduler.DAGScheduler.processEvent(DAGScheduler.scala:619)
 org.apache.spark.scheduler.DAGScheduler$$anonfun$start$1$$anon$2$$anonfun$receive$1.applyOrElse(DAGScheduler.scala:207)
 akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
 akka.actor.ActorCell.invoke(ActorCell.scala:456)
 akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
 akka.dispatch.Mailbox.run(Mailbox.scala:219)
 akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
 scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
 scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
 scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
 scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)



 And in worker node containers' stderr log (nothing in stdout log), I
 don't see any reference to loading jars:

 SLF4J: Class path contains multiple SLF4J bindings.
 SLF4J: Found binding in 
 [jar:file:/home/gpphddata/1/yarn/nm-local-dir/usercache/yarn/filecache/7394400996676014282/spark-assembly-0.9.0-incubating-hadoop2.0.2-alpha-gphd-2.0.1.0.jar!/org/slf4j/impl/StaticLoggerBinder.class]
 SLF4J: Found binding in 
 [jar:file:/usr/lib/gphd/hadoop-2.0.2_alpha_gphd_2_0_1_0/lib/slf4j-log4j12-1.6.1.jar!/org/slf4j/impl/StaticLoggerBinder.class]
 SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an 
 explanation.
 SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
 14/03/26 13:12:18 INFO slf4j.Slf4jLogger: Slf4jLogger started
 14/03/26 13:12:18 INFO Remoting: Starting remoting
 14/03/26 13:12:18 INFO Remoting: Remoting started; listening on addresses 
 :[akka.tcp://sparkExecutor@alpinenode6.alpinenow.local:44006]
 14/03/26 13:12:18 INFO Remoting: Remoting now listens on addresses: 
 [akka.tcp://sparkExecutor@alpinenode6.alpinenow.local:44006]
 14/03/26 13:12:18 INFO executor.CoarseGrainedExecutorBackend: Connecting 
 to driver: 
 akka.tcp://spark@alpinenode5.alpinenow.local:10314/user/CoarseGrainedScheduler
 14/03/26 13:12:18 ERROR executor.CoarseGrainedExecutorBackend: Driver 
 Disassociated [akka.tcp://sparkExecutor@alpinenode6.alpinenow.local:44006] 
 - [akka.tcp://spark@alpinenode5.alpinenow.local:10314] disassociated! 
 Shutting down.

YARN problem using an external jar in worker nodes

2014-03-26 Thread Sung Hwan Chung
Hello, (this is Yarn related)

I'm able to load an external jar and use its classes within
ApplicationMaster. I wish to use this jar within worker nodes, so I added
sc.addJar(pathToJar) and ran.

I get the following exception:

org.apache.spark.SparkException: Job aborted: Task 0.0:1 failed 4
times (most recent failure: Exception failure:
java.lang.NoClassDefFoundError: org/opencv/objdetect/HOGDescriptor)
Job aborted: Task 0.0:1 failed 4 times (most recent failure: Exception
failure: java.lang.NoClassDefFoundError:
org/opencv/objdetect/HOGDescriptor)
org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1028)
org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1026)
scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$abortStage(DAGScheduler.scala:1026)
org.apache.spark.scheduler.DAGScheduler$$anonfun$processEvent$10.apply(DAGScheduler.scala:619)
org.apache.spark.scheduler.DAGScheduler$$anonfun$processEvent$10.apply(DAGScheduler.scala:619)
scala.Option.foreach(Option.scala:236)
org.apache.spark.scheduler.DAGScheduler.processEvent(DAGScheduler.scala:619)
org.apache.spark.scheduler.DAGScheduler$$anonfun$start$1$$anon$2$$anonfun$receive$1.applyOrElse(DAGScheduler.scala:207)
akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
akka.actor.ActorCell.invoke(ActorCell.scala:456)
akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
akka.dispatch.Mailbox.run(Mailbox.scala:219)
akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)



And in worker node containers' stderr log (nothing in stdout log), I don't
see any reference to loading jars:

SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in
[jar:file:/home/gpphddata/1/yarn/nm-local-dir/usercache/yarn/filecache/7394400996676014282/spark-assembly-0.9.0-incubating-hadoop2.0.2-alpha-gphd-2.0.1.0.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in
[jar:file:/usr/lib/gphd/hadoop-2.0.2_alpha_gphd_2_0_1_0/lib/slf4j-log4j12-1.6.1.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
14/03/26 13:12:18 INFO slf4j.Slf4jLogger: Slf4jLogger started
14/03/26 13:12:18 INFO Remoting: Starting remoting
14/03/26 13:12:18 INFO Remoting: Remoting started; listening on
addresses :[akka.tcp://sparkExecutor@alpinenode6.alpinenow.local:44006]
14/03/26 13:12:18 INFO Remoting: Remoting now listens on addresses:
[akka.tcp://sparkExecutor@alpinenode6.alpinenow.local:44006]
14/03/26 13:12:18 INFO executor.CoarseGrainedExecutorBackend:
Connecting to driver:
akka.tcp://spark@alpinenode5.alpinenow.local:10314/user/CoarseGrainedScheduler
14/03/26 13:12:18 ERROR executor.CoarseGrainedExecutorBackend: Driver
Disassociated [akka.tcp://sparkExecutor@alpinenode6.alpinenow.local:44006]
- [akka.tcp://spark@alpinenode5.alpinenow.local:10314] disassociated!
Shutting down.



Any idea what's going on?