Re: PCA OutOfMemoryError

2016-01-17 Thread Bharath Ravi Kumar
Hello Alex,

Thanks for the response. There isn't much other data on the driver, so the
issue is probably inherent to this particular PCA implementation.  I'll try
the alternative approach that you suggested instead. Thanks again.

-Bharath

On Wed, Jan 13, 2016 at 11:24 PM, Alex Gittens <swift...@gmail.com> wrote:

> The PCA.fit function calls the RowMatrix PCA routine, which attempts to
> construct the covariance matrix locally on the driver, and then computes
> the SVD of that to get the PCs. I'm not sure what's causing the memory
> error: RowMatrix.scala:124 is only using 3.5 GB of memory (n*(n+1)/2 with
> n=29604 and double precision), so unless you're filling up the memory with
> other RDDs, you should have plenty of space on the driver.
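A quick back-of-the-envelope check (illustrative arithmetic, not output from the thread) of where the two figures in this exchange come from, assuming double precision:

// Driver-side memory needed for the covariance/Gramian of an n-column matrix.
object CovarianceMemory {
  def main(args: Array[String]): Unit = {
    val n = 29604L
    val packedBytes = n * (n + 1) / 2 * 8 // packed upper triangle: the ~3.5 GB figure above
    val denseBytes  = n * n * 8           // full dense n x n matrix of doubles: the 7011 MB in the warning quoted below
    println(f"packed: ${packedBytes / 1e9}%.1f GB, dense: ${denseBytes / 1e6}%.0f MB")
    // prints roughly: packed: 3.5 GB, dense: 7011 MB
  }
}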
>
> One alternative is to manually center your RDD (so make one pass over it
> to compute the mean, then another to subtract it out and form a new RDD),
> then directly call the computeSVD routine in RowMatrix to compute the SVD
> of the gramian matrix of this RDD (e.g., the covariance matrix of the
> original RDD) in a distributed manner, so the covariance matrix doesn't
> need to be formed explicitly. You can look at the getLowRankFactorization
> and convertLowRankFactorizationToEOFs routines at
>
> https://github.com/rustandruin/large-scale-climate/blob/master/src/main/scala/eofs.scala
> for an example of this approach (call the second on the results of the first
> to get the SVD of the input matrix to the first; EOF is another name for
> PCA).
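For anyone who wants to try this without pulling in the linked repository, a minimal sketch of the center-then-computeSVD approach could look like the following. It assumes the input is already available as an RDD[Vector] named rows; the function name and the k = 100 default are illustrative, not taken from the thread:

import org.apache.spark.mllib.linalg.{Vector, Vectors}
import org.apache.spark.mllib.linalg.distributed.RowMatrix
import org.apache.spark.mllib.stat.Statistics
import org.apache.spark.rdd.RDD

def topPrincipalComponents(rows: RDD[Vector], k: Int = 100) = {
  // First pass: compute the column means.
  val mean = Statistics.colStats(rows).mean.toArray
  // Second pass: subtract the mean from every row to form a centered RDD.
  // (The mean vector is only ~29K doubles here, so shipping it in the closure
  // is fine; it could also be broadcast.)
  val centered = rows.map { v =>
    val a = v.toArray.clone()
    var i = 0
    while (i < a.length) { a(i) -= mean(i); i += 1 }
    Vectors.dense(a)
  }
  // Distributed SVD of the centered matrix: with many columns and a small k this
  // runs as distributed matrix-vector multiplies, so the covariance matrix is
  // never materialized on the driver. The columns of V are the top k PCs.
  new RowMatrix(centered).computeSVD(k, computeU = false).V
}

One caveat: centering turns sparse rows dense, so the centered RDD will be noticeably larger than the original sparse input.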
>
> This takes about 30 minutes to compute the top 20 PCs of a 46.7K-by-6.3M
> dense matrix of doubles (~2 TB), with most of the time spent on the
> distributed matrix-vector multiplies.
>
> Best,
> Alex
>
>
> On Tue, Jan 12, 2016 at 6:39 PM, Bharath Ravi Kumar <reachb...@gmail.com>
> wrote:
>
>> Any suggestion/opinion?
>> On 12-Jan-2016 2:06 pm, "Bharath Ravi Kumar" <reachb...@gmail.com> wrote:
>>
>>> We're running PCA (selecting 100 principal components) on a dataset that
>>> has ~29K columns and is 70G in size stored in ~600 parts on HDFS. The
>>> matrix in question is mostly sparse with tens of columns populated in most
>>> rows, but a few rows with thousands of columns populated. We're running
>>> spark on mesos with driver memory set to 40G and executor memory set to
>>> 80G. However, we're encountering an out-of-memory error (included at the end
>>> of the message) regardless of the number of rdd partitions or the degree of
>>> task parallelism being set. I noticed a warning at the beginning of the PCA
>>> computation stage: " WARN
>>> org.apache.spark.mllib.linalg.distributed.RowMatrix: 29604 columns will
>>> require at least 7011 megabyte  of memory!"
>>> I don't understand which memory this refers to. Is this the executor
>>> memory? The driver memory? Something else?
>>> The stacktrace appears to indicate that a large array is probably being
>>> passed along with the task. Could this array have been passed as a
>>> broadcast variable instead? Any suggestions or workarounds other than
>>> re-implementing the algorithm?
>>>
>>> Thanks,
>>> Bharath
>>>
>>> 
>>>
>>> Exception in thread "main" java.lang.OutOfMemoryError: Requested array
>>> size exceeds VM limit
>>> at java.util.Arrays.copyOf(Arrays.java:2271)
>>> at
>>> java.io.ByteArrayOutputStream.grow(ByteArrayOutputStream.java:113)
>>> at
>>> java.io.ByteArrayOutputStream.ensureCapacity(ByteArrayOutputStream.java:93)
>>> at
>>> java.io.ByteArrayOutputStream.write(ByteArrayOutputStream.java:140)
>>> at
>>> java.io.ObjectOutputStream$BlockDataOutputStream.drain(ObjectOutputStream.java:1876)
>>> at
>>> java.io.ObjectOutputStream$BlockDataOutputStream.setBlockDataMode(ObjectOutputStream.java:1785)
>>> at
>>> java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1188)
>>> at
>>> java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:347)
>>> at
>>> org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:44)
>>> at
>>> org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:84)
>>> at
>>> org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:301)
>>> at
>>> org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:294)
>>> 

Re: PCA OutOfMemoryError

2016-01-12 Thread Bharath Ravi Kumar
Any suggestion/opinion?
On 12-Jan-2016 2:06 pm, "Bharath Ravi Kumar" <reachb...@gmail.com> wrote:

> We're running PCA (selecting 100 principal components) on a dataset that
> has ~29K columns and is 70G in size stored in ~600 parts on HDFS. The
> matrix in question is mostly sparse with tens of columns populated in most
> rows, but a few rows with thousands of columns populated. We're running
> spark on mesos with driver memory set to 40G and executor memory set to
> 80G. However, we're encountering an out-of-memory error (included at the end
> of the message) regardless of the number of rdd partitions or the degree of
> task parallelism being set. I noticed a warning at the beginning of the PCA
> computation stage: " WARN
> org.apache.spark.mllib.linalg.distributed.RowMatrix: 29604 columns will
> require at least 7011 megabyte  of memory!"
> I don't understand which memory this refers to. Is this the executor
> memory? The driver memory? Something else?
> The stacktrace appears to indicate that a large array is probably being
> passed along with the task. Could this array have been passed as a
> broadcast variable instead? Any suggestions or workarounds other than
> re-implementing the algorithm?
>
> Thanks,
> Bharath
>
> 
>
> Exception in thread "main" java.lang.OutOfMemoryError: Requested array
> size exceeds VM limit
> at java.util.Arrays.copyOf(Arrays.java:2271)
> at
> java.io.ByteArrayOutputStream.grow(ByteArrayOutputStream.java:113)
> at
> java.io.ByteArrayOutputStream.ensureCapacity(ByteArrayOutputStream.java:93)
> at
> java.io.ByteArrayOutputStream.write(ByteArrayOutputStream.java:140)
> at
> java.io.ObjectOutputStream$BlockDataOutputStream.drain(ObjectOutputStream.java:1876)
> at
> java.io.ObjectOutputStream$BlockDataOutputStream.setBlockDataMode(ObjectOutputStream.java:1785)
> at
> java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1188)
> at
> java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:347)
> at
> org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:44)
> at
> org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:84)
> at
> org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:301)
> at
> org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:294)
> at
> org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:122)
> at org.apache.spark.SparkContext.clean(SparkContext.scala:2030)
> at
> org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1.apply(RDD.scala:703)
> at
> org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1.apply(RDD.scala:702)
> at
> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:147)
> at
> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:108)
> at org.apache.spark.rdd.RDD.withScope(RDD.scala:306)
> at org.apache.spark.rdd.RDD.mapPartitions(RDD.scala:702)
> at
> org.apache.spark.rdd.RDD$$anonfun$treeAggregate$1.apply(RDD.scala:1100)
> at
> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:147)
> at
> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:108)
> at org.apache.spark.rdd.RDD.withScope(RDD.scala:306)
> at org.apache.spark.rdd.RDD.treeAggregate(RDD.scala:1091)
> at
> org.apache.spark.mllib.linalg.distributed.RowMatrix.computeGramianMatrix(RowMatrix.scala:124)
> at
> org.apache.spark.mllib.linalg.distributed.RowMatrix.computeCovariance(RowMatrix.scala:350)
> at
> org.apache.spark.mllib.linalg.distributed.RowMatrix.computePrincipalComponents(RowMatrix.scala:386)
> at org.apache.spark.mllib.feature.PCA.fit(PCA.scala:46)
>
>


PCA OutOfMemoryError

2016-01-12 Thread Bharath Ravi Kumar
We're running PCA (selecting 100 principal components) on a dataset that
has ~29K columns and is 70G in size stored in ~600 parts on HDFS. The
matrix in question is mostly sparse with tens of columns populated in most
rows, but a few rows with thousands of columns populated. We're running
spark on mesos with driver memory set to 40G and executor memory set to
80G. However, we're encountering an out-of-memory error (included at the end
of the message) regardless of the number of rdd partitions or the degree of
task parallelism being set. I noticed a warning at the beginning of the PCA
computation stage: " WARN
org.apache.spark.mllib.linalg.distributed.RowMatrix: 29604 columns will
require at least 7011 megabyte  of memory!"
I don't understand which memory this refers to. Is this the executor
memory? The driver memory? Something else?
The stacktrace appears to indicate that a large array is probably being
passed along with the task. Could this array have been passed as a
broadcast variable instead? Any suggestions or workarounds other than
re-implementing the algorithm?

Thanks,
Bharath



Exception in thread "main" java.lang.OutOfMemoryError: Requested array size
exceeds VM limit
at java.util.Arrays.copyOf(Arrays.java:2271)
at
java.io.ByteArrayOutputStream.grow(ByteArrayOutputStream.java:113)
at
java.io.ByteArrayOutputStream.ensureCapacity(ByteArrayOutputStream.java:93)
at
java.io.ByteArrayOutputStream.write(ByteArrayOutputStream.java:140)
at
java.io.ObjectOutputStream$BlockDataOutputStream.drain(ObjectOutputStream.java:1876)
at
java.io.ObjectOutputStream$BlockDataOutputStream.setBlockDataMode(ObjectOutputStream.java:1785)
at
java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1188)
at
java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:347)
at
org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:44)
at
org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:84)
at
org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:301)
at
org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:294)
at
org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:122)
at org.apache.spark.SparkContext.clean(SparkContext.scala:2030)
at
org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1.apply(RDD.scala:703)
at
org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1.apply(RDD.scala:702)
at
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:147)
at
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:108)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:306)
at org.apache.spark.rdd.RDD.mapPartitions(RDD.scala:702)
at
org.apache.spark.rdd.RDD$$anonfun$treeAggregate$1.apply(RDD.scala:1100)
at
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:147)
at
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:108)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:306)
at org.apache.spark.rdd.RDD.treeAggregate(RDD.scala:1091)
at
org.apache.spark.mllib.linalg.distributed.RowMatrix.computeGramianMatrix(RowMatrix.scala:124)
at
org.apache.spark.mllib.linalg.distributed.RowMatrix.computeCovariance(RowMatrix.scala:350)
at
org.apache.spark.mllib.linalg.distributed.RowMatrix.computePrincipalComponents(RowMatrix.scala:386)
at org.apache.spark.mllib.feature.PCA.fit(PCA.scala:46)


Re: Spark on Mesos / Executor Memory

2015-10-17 Thread Bharath Ravi Kumar
David, Tom,

Thanks for the explanation. This confirms my suspicion that the executor
was holding on to memory regardless of tasks in execution, once it expands
to occupy memory in keeping with spark.executor.memory. There certainly is
scope for improvement here, though I realize there will be substantial
overheads in implementing memory release without compromising RDD caching
and similar aspects. I'll explore alternatives / workarounds meanwhile.

Thanks,
Bharath


On Sat, Oct 17, 2015 at 3:33 PM, Tom Arnfeld <t...@duedil.com> wrote:

> Hi Bharath,
>
> When running jobs in fine grained mode, each Spark task is sent to mesos
> as a task which allows the offers system to maintain fairness between
> different spark applications (as you've described). Having said that, unless
> your per-node memory is hugely undersubscribed, you'll see contention when
> running these jobs in parallel. This behaviour matches exactly what you've described.
>
> What you're seeing happens because even though there's a new mesos task
> for each Spark task (allowing CPU to be shared) the Spark executors don't
> get killed even when they aren't doing any work, which means the memory
> isn't released. The JVM doesn't allow for flexible memory re-allocation (as
> far as I'm aware), which makes it impossible for spark to dynamically scale
> up the memory of the executor over time as tasks start and finish.
>
> As Dave pointed out, the simplest way to solve this is to use a higher
> level tool that can run your spark jobs through one mesos framework and
> then you can let spark distribute the resources more effectively.
>
> I hope that helps!
>
> Tom.
>
> On 17 Oct 2015, at 06:47, Bharath Ravi Kumar <reachb...@gmail.com> wrote:
>
> Can someone respond if you're aware of the reason for such a memory
> footprint? It seems unintuitive and hard to reason about.
>
> Thanks,
> Bharath
>
> On Thu, Oct 15, 2015 at 12:29 PM, Bharath Ravi Kumar <reachb...@gmail.com>
> wrote:
>
>> Resending since user@mesos bounced earlier. My apologies.
>>
>> On Thu, Oct 15, 2015 at 12:19 PM, Bharath Ravi Kumar <reachb...@gmail.com
>> > wrote:
>>
>>> (Reviving this thread since I ran into similar issues...)
>>>
>>> I'm running two spark jobs (in mesos fine grained mode), each belonging
>>> to a different mesos role, say low and high. The low:high mesos weights are
>>> 1:10. On expected lines, I see that the low priority job occupies cluster
>>> resources to the maximum extent when running alone. However, when the high
>>> priority job is submitted, it does not start and continues to await cluster
>>> resources (as seen in the logs). Since the jobs run in fine grained mode
>>> and the low priority tasks begin to finish, the high priority job should
>>> ideally be able to start and gradually take over cluster resources as per
>>> the weights. However, I noticed that while the "low" job gives up CPU cores
>>> with each completing task (e.g. reduction from 72 -> 12 with default
>>> parallelism set to 72), the memory resources are held on to (~500G out of
>>> 768G). The spark.executor.memory setting appears to directly impact the
>>> amount of memory that the job holds on to. In this case, it was set to 200G
>>> in the low priority task and 100G in the high priority task. The nature of
>>> these jobs is such that setting the numbers to smaller values (say 32g)
>>> resulted in job failures with OutOfMemoryError. It appears that the spark
>>> framework is retaining memory (across tasks) proportional to
>>> spark.executor.memory for the duration of the job and not releasing memory
>>> as tasks complete. This defeats the purpose of fine grained mode execution
>>> as the memory occupancy is preventing the high priority job from accepting
>>> the prioritized cpu offers and beginning execution. Can this be explained /
>>> documented better please?
>>>
>>> Thanks,
>>> Bharath
>>>
>>> On Sat, Apr 11, 2015 at 10:59 PM, Tim Chen <t...@mesosphere.io> wrote:
>>>
>>>> (Adding spark user list)
>>>>
>>>> Hi Tom,
>>>>
>>>> If I understand correctly you're saying that you're running into memory
>>>> problems because the scheduler is allocating too many CPUs and not enough
>>>> memory to accommodate them, right?
>>>>
>>>> In the case of fine grain mode I don't think that's a problem since we
>>>> have a fixed amount of CPU and memory per task.
>>>> However, in coarse grain you can run into that problem if you're within
>>>> the spark.cores.max limit, and memory is a fixed number.

Re: Spark on Mesos / Executor Memory

2015-10-17 Thread Bharath Ravi Kumar
To be precise, the MesosExecutorBackend's Xms & Xmx equal
spark.executor.memory. So there's no question of expanding or contracting
the memory held by the executor.
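For reference, a minimal sketch of the configuration being discussed (the app name, master URL and memory value are placeholders, not the actual settings from this thread):

import org.apache.spark.{SparkConf, SparkContext}

// Fine-grained Mesos mode (the default in these Spark versions): each Spark task
// becomes a Mesos task, so CPU is returned as tasks finish, but the executor JVM
// stays resident for the whole job with a heap fixed by spark.executor.memory,
// which is used as both -Xms and -Xmx, as noted above.
val conf = new SparkConf()
  .setAppName("low-priority-job")            // placeholder name
  .setMaster("mesos://zk://zk1:2181/mesos")  // placeholder master URL
  .set("spark.mesos.coarse", "false")        // fine-grained mode
  .set("spark.executor.memory", "200g")      // fixed executor heap, never shrunk per task
val sc = new SparkContext(conf)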

On Sat, Oct 17, 2015 at 5:38 PM, Bharath Ravi Kumar <reachb...@gmail.com>
wrote:

> David, Tom,
>
> Thanks for the explanation. This confirms my suspicion that the executor
> was holding on to memory regardless of tasks in execution, once it expands
> to occupy memory in keeping with spark.executor.memory. There certainly is
> scope for improvement here, though I realize there will be substantial
> overheads in implementing memory release without compromising RDD caching
> and similar aspects. I'll explore alternatives / workarounds meanwhile.
>
> Thanks,
> Bharath
>
>
>
> On Sat, Oct 17, 2015 at 3:33 PM, Tom Arnfeld <t...@duedil.com> wrote:
>
>> Hi Bharath,
>>
>> When running jobs in fine grained mode, each Spark task is sent to mesos
>> as a task which allows the offers system to maintain fairness between
>> different spark applications (as you've described). Having said that, unless
>> your per-node memory is hugely undersubscribed, you'll see contention when
>> running these jobs in parallel. This behaviour matches exactly what you've described.
>>
>> What you're seeing happens because even though there's a new mesos task
>> for each Spark task (allowing CPU to be shared) the Spark executors don't
>> get killed even when they aren't doing any work, which means the memory
>> isn't released. The JVM doesn't allow for flexible memory re-allocation (as
>> far as I'm aware), which makes it impossible for spark to dynamically scale
>> up the memory of the executor over time as tasks start and finish.
>>
>> As Dave pointed out, the simplest way to solve this is to use a higher
>> level tool that can run your spark jobs through one mesos framework and
>> then you can let spark distribute the resources more effectively.
>>
>> I hope that helps!
>>
>> Tom.
>>
>> On 17 Oct 2015, at 06:47, Bharath Ravi Kumar <reachb...@gmail.com> wrote:
>>
>> Can someone respond if you're aware of the reason for such a memory
>> footprint? It seems unintuitive and hard to reason about.
>>
>> Thanks,
>> Bharath
>>
>> On Thu, Oct 15, 2015 at 12:29 PM, Bharath Ravi Kumar <reachb...@gmail.com
>> > wrote:
>>
>>> Resending since user@mesos bounced earlier. My apologies.
>>>
>>> On Thu, Oct 15, 2015 at 12:19 PM, Bharath Ravi Kumar <
>>> reachb...@gmail.com> wrote:
>>>
>>>> (Reviving this thread since I ran into similar issues...)
>>>>
>>>> I'm running two spark jobs (in mesos fine grained mode), each belonging
>>>> to a different mesos role, say low and high. The low:high mesos weights are
>>>> 1:10. On expected lines, I see that the low priority job occupies cluster
>>>> resources to the maximum extent when running alone. However, when the high
>>>> priority job is submitted, it does not start and continues to await cluster
>>>> resources (as seen in the logs). Since the jobs run in fine grained mode
>>>> and the low priority tasks begin to finish, the high priority job should
>>>> ideally be able to start and gradually take over cluster resources as per
>>>> the weights. However, I noticed that while the "low" job gives up CPU cores
>>>> with each completing task (e.g. reduction from 72 -> 12 with default
>>>> parallelism set to 72), the memory resources are held on to (~500G out of
>>>> 768G). The spark.executor.memory setting appears to directly impact the
>>>> amount of memory that the job holds on to. In this case, it was set to 200G
>>>> in the low priority task and 100G in the high priority task. The nature of
>>>> these jobs is such that setting the numbers to smaller values (say 32g)
>>>> resulted in job failures with OutOfMemoryError. It appears that the spark
>>>> framework is retaining memory (across tasks) proportional to
>>>> spark.executor.memory for the duration of the job and not releasing memory
>>>> as tasks complete. This defeats the purpose of fine grained mode execution
>>>> as the memory occupancy is preventing the high priority job from accepting
>>>> the prioritized cpu offers and beginning execution. Can this be explained /
>>>> documented better please?
>>>>
>>>> Thanks,
>>>> Bharath
>>>>
>>>> On Sat, Apr 11, 2015 at 10:59 PM, Tim Chen <t...@mesosphere.io> wrote:
>>>>
>>>

Re: Spark on Mesos / Executor Memory

2015-10-16 Thread Bharath Ravi Kumar
Can someone respond if you're aware of the reason for such a memory
footprint? It seems unintuitive and hard to reason about.

Thanks,
Bharath

On Thu, Oct 15, 2015 at 12:29 PM, Bharath Ravi Kumar <reachb...@gmail.com>
wrote:

> Resending since user@mesos bounced earlier. My apologies.
>
> On Thu, Oct 15, 2015 at 12:19 PM, Bharath Ravi Kumar <reachb...@gmail.com>
> wrote:
>
>> (Reviving this thread since I ran into similar issues...)
>>
>> I'm running two spark jobs (in mesos fine grained mode), each belonging
>> to a different mesos role, say low and high. The low:high mesos weights are
>> 1:10. On expected lines, I see that the low priority job occupies cluster
>> resources to the maximum extent when running alone. However, when the high
>> priority job is submitted, it does not start and continues to await cluster
>> resources (as seen in the logs). Since the jobs run in fine grained mode
>> and the low priority tasks begin to finish, the high priority job should
>> ideally be able to start and gradually take over cluster resources as per
>> the weights. However, I noticed that while the "low" job gives up CPU cores
>> with each completing task (e.g. reduction from 72 -> 12 with default
>> parallelism set to 72), the memory resources are held on to (~500G out of
>> 768G). The spark.executor.memory setting appears to directly impact the
>> amount of memory that the job holds on to. In this case, it was set to 200G
>> in the low priority task and 100G in the high priority task. The nature of
>> these jobs is such that setting the numbers to smaller values (say 32g)
>> resulted in job failures with OutOfMemoryError. It appears that the spark
>> framework is retaining memory (across tasks) proportional to
>> spark.executor.memory for the duration of the job and not releasing memory
>> as tasks complete. This defeats the purpose of fine grained mode execution
>> as the memory occupancy is preventing the high priority job from accepting
>> the prioritized cpu offers and beginning execution. Can this be explained /
>> documented better please?
>>
>> Thanks,
>> Bharath
>>
>> On Sat, Apr 11, 2015 at 10:59 PM, Tim Chen <t...@mesosphere.io> wrote:
>>
>>> (Adding spark user list)
>>>
>>> Hi Tom,
>>>
>>> If I understand correctly you're saying that you're running into memory
>>> problems because the scheduler is allocating too many CPUs and not enough
>>> memory to accommodate them, right?
>>>
>>> In the case of fine grain mode I don't think that's a problem since we
>>> have a fixed amount of CPU and memory per task.
>>> However, in coarse grain you can run into that problem if you're within
>>> the spark.cores.max limit, and memory is a fixed number.
>>>
>>> I have a patch out to configure the max number of cpus a coarse grain
>>> executor should use, and it also allows multiple executors in coarse grain mode.
>>> So you could, say, try to launch multiples of max 4 cores with
>>> spark.executor.memory (+ overhead etc.) in a slave. (
>>> https://github.com/apache/spark/pull/4027)
>>>
>>> It also might be interesting to include a cores-to-memory multiplier so
>>> that with a larger number of cores we try to scale the memory by some
>>> factor, but I'm not entirely sure that's intuitive to use or that people
>>> would know what to set it to, as that can likely change with different workloads.
>>>
>>> Tim
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>> On Sat, Apr 11, 2015 at 9:51 AM, Tom Arnfeld <t...@duedil.com> wrote:
>>>
>>>> We're running Spark 1.3.0 (with a couple of patches over the top for
>>>> docker related bits).
>>>>
>>>> I don't think SPARK-4158 is related to what we're seeing; things do run
>>>> fine on the cluster, given a ridiculously large executor memory
>>>> configuration. As for SPARK-3535, although that looks useful, I think we're
>>>> seeing something else.
>>>>
>>>> Put a different way, the amount of memory required at any given time by
>>>> the spark JVM process is directly proportional to the amount of CPU it has,
>>>> because more CPU means more tasks and more tasks means more memory. Even if
>>>> we're using coarse mode, the amount of executor memory should be
>>>> proportionate to the number of CPUs in the offer.
>>>>
>>>> On 11 April 2015 at 17:39, Brenden Matthews <bren...@diddyinc.com>
>>>>

Re: Spark on Mesos / Executor Memory

2015-10-15 Thread Bharath Ravi Kumar
Resending since user@mesos bounced earlier. My apologies.

On Thu, Oct 15, 2015 at 12:19 PM, Bharath Ravi Kumar <reachb...@gmail.com>
wrote:

> (Reviving this thread since I ran into similar issues...)
>
> I'm running two spark jobs (in mesos fine grained mode), each belonging to
> a different mesos role, say low and high. The low:high mesos weights are
> 1:10. On expected lines, I see that the low priority job occupies cluster
> resources to the maximum extent when running alone. However, when the high
> priority job is submitted, it does not start and continues to await cluster
> resources (as seen in the logs). Since the jobs run in fine grained mode
> and the low priority tasks begin to finish, the high priority job should
> ideally be able to start and gradually take over cluster resources as per
> the weights. However, I noticed that while the "low" job gives up CPU cores
> with each completing task (e.g. reduction from 72 -> 12 with default
> parallelism set to 72), the memory resources are held on to (~500G out of
> 768G). The spark.executor.memory setting appears to directly impact the
> amount of memory that the job holds on to. In this case, it was set to 200G
> in the low priority task and 100G in the high priority task. The nature of
> these jobs is such that setting the numbers to smaller values (say 32g)
> resulted in job failures with OutOfMemoryError. It appears that the spark
> framework is retaining memory (across tasks) proportional to
> spark.executor.memory for the duration of the job and not releasing memory
> as tasks complete. This defeats the purpose of fine grained mode execution
> as the memory occupancy is preventing the high priority job from accepting
> the prioritized cpu offers and beginning execution. Can this be explained /
> documented better please?
>
> Thanks,
> Bharath
>
> On Sat, Apr 11, 2015 at 10:59 PM, Tim Chen <t...@mesosphere.io> wrote:
>
>> (Adding spark user list)
>>
>> Hi Tom,
>>
>> If I understand correctly you're saying that you're running into memory
>> problems because the scheduler is allocating too many CPUs and not enough
>> memory to accommodate them, right?
>>
>> In the case of fine grain mode I don't think that's a problem since we
>> have a fixed amount of CPU and memory per task.
>> However, in coarse grain you can run into that problem if you're within
>> the spark.cores.max limit, and memory is a fixed number.
>>
>> I have a patch out to configure the max number of cpus a coarse grain
>> executor should use, and it also allows multiple executors in coarse grain mode.
>> So you could, say, try to launch multiples of max 4 cores with
>> spark.executor.memory (+ overhead etc.) in a slave. (
>> https://github.com/apache/spark/pull/4027)
>>
>> It also might be interesting to include a cores-to-memory multiplier so
>> that with a larger number of cores we try to scale the memory by some
>> factor, but I'm not entirely sure that's intuitive to use or that people
>> would know what to set it to, as that can likely change with different workloads.
>>
>> Tim
>>
>>
>>
>>
>>
>>
>>
>> On Sat, Apr 11, 2015 at 9:51 AM, Tom Arnfeld <t...@duedil.com> wrote:
>>
>>> We're running Spark 1.3.0 (with a couple of patches over the top for
>>> docker related bits).
>>>
>>> I don't think SPARK-4158 is related to what we're seeing; things do run
>>> fine on the cluster, given a ridiculously large executor memory
>>> configuration. As for SPARK-3535, although that looks useful, I think we're
>>> seeing something else.
>>>
>>> Put a different way, the amount of memory required at any given time by
>>> the spark JVM process is directly proportional to the amount of CPU it has,
>>> because more CPU means more tasks and more tasks means more memory. Even if
>>> we're using coarse mode, the amount of executor memory should be
>>> proportionate to the number of CPUs in the offer.
>>>
>>> On 11 April 2015 at 17:39, Brenden Matthews <bren...@diddyinc.com>
>>> wrote:
>>>
>>>> I ran into some issues with it a while ago, and submitted a couple PRs
>>>> to fix it:
>>>>
>>>> https://github.com/apache/spark/pull/2401
>>>> https://github.com/apache/spark/pull/3024
>>>>
>>>> Do these look relevant? What version of Spark are you running?
>>>>
>>>> On Sat, Apr 11, 2015 at 9:33 AM, Tom Arnfeld <t...@duedil.com> wrote:
>>>>
>>>>> Hey,
>>>>>
>>>>> Not s

Re: Spark on Mesos / Executor Memory

2015-10-15 Thread Bharath Ravi Kumar
(Reviving this thread since I ran into similar issues...)

I'm running two spark jobs (in mesos fine grained mode), each belonging to
a different mesos role, say low and high. The low:high mesos weights are
1:10. On expected lines, I see that the low priority job occupies cluster
resources to the maximum extent when running alone. However, when the high
priority job is submitted, it does not start and continues to await cluster
resources (as seen in the logs). Since the jobs run in fine grained mode
and the low priority tasks begin to finish, the high priority job should
ideally be able to start and gradually take over cluster resources as per
the weights. However, I noticed that while the "low" job gives up CPU cores
with each completing task (e.g. reduction from 72 -> 12 with default
parallelism set to 72), the memory resources are held on to (~500G out of
768G). The spark.executor.memory setting appears to directly impact the
amount of memory that the job holds on to. In this case, it was set to 200G
in the low priority task and 100G in the high priority task. The nature of
these jobs is such that setting the numbers to smaller values (say 32g)
resulted in job failures with OutOfMemoryError. It appears that the spark
framework is retaining memory (across tasks) proportional to
spark.executor.memory for the duration of the job and not releasing memory
as tasks complete. This defeats the purpose of fine grained mode execution
as the memory occupancy is preventing the high priority job from accepting
the prioritized cpu offers and beginning execution. Can this be explained /
documented better please?

Thanks,
Bharath

On Sat, Apr 11, 2015 at 10:59 PM, Tim Chen  wrote:

> (Adding spark user list)
>
> Hi Tom,
>
> If I understand correctly you're saying that you're running into memory
> problems because the scheduler is allocating too many CPUs and not enough
> memory to accommodate them, right?
>
> In the case of fine grain mode I don't think that's a problem since we
> have a fixed amount of CPU and memory per task.
> However, in coarse grain you can run into that problem if you're within
> the spark.cores.max limit, and memory is a fixed number.
>
> I have a patch out to configure the max number of cpus a coarse grain
> executor should use, and it also allows multiple executors in coarse grain mode.
> So you could, say, try to launch multiples of max 4 cores with
> spark.executor.memory (+ overhead etc.) in a slave. (
> https://github.com/apache/spark/pull/4027)
>
> It also might be interesting to include a cores-to-memory multiplier so
> that with a larger number of cores we try to scale the memory by some
> factor, but I'm not entirely sure that's intuitive to use or that people
> would know what to set it to, as that can likely change with different workloads.
>
> Tim
>
>
>
>
>
>
>
> On Sat, Apr 11, 2015 at 9:51 AM, Tom Arnfeld  wrote:
>
>> We're running Spark 1.3.0 (with a couple of patches over the top for
>> docker related bits).
>>
>> I don't think SPARK-4158 is related to what we're seeing; things do run
>> fine on the cluster, given a ridiculously large executor memory
>> configuration. As for SPARK-3535, although that looks useful, I think we're
>> seeing something else.
>>
>> Put a different way, the amount of memory required at any given time by
>> the spark JVM process is directly proportional to the amount of CPU it has,
>> because more CPU means more tasks and more tasks means more memory. Even if
>> we're using coarse mode, the amount of executor memory should be
>> proportionate to the number of CPUs in the offer.
>>
>> On 11 April 2015 at 17:39, Brenden Matthews  wrote:
>>
>>> I ran into some issues with it a while ago, and submitted a couple PRs
>>> to fix it:
>>>
>>> https://github.com/apache/spark/pull/2401
>>> https://github.com/apache/spark/pull/3024
>>>
>>> Do these look relevant? What version of Spark are you running?
>>>
>>> On Sat, Apr 11, 2015 at 9:33 AM, Tom Arnfeld  wrote:
>>>
 Hey,

 Not sure whether it's best to ask this on the spark mailing list or the
 mesos one, so I'll try here first :-)

 I'm having a bit of trouble with out of memory errors in my spark
 jobs... it seems fairly odd to me that memory resources can only be set at
 the executor level, and not also at the task level. For example, as far as
 I can tell there's only a *spark.executor.memory* config option.

 Surely the memory requirements of a single executor are quite
 dramatically influenced by the number of concurrent tasks running? Given a
 shared cluster, I have no idea what % of an individual slave my executor is
 going to get, so I basically have to set the executor memory to a value
 that's correct when the whole machine is in use...

 Has anyone else running Spark on Mesos come across this, or maybe
 someone could correct my understanding of the config options?


Re: HDP 2.2 AM abort : Unable to find ExecutorLauncher class

2015-03-19 Thread Bharath Ravi Kumar
Hi Doug,

I did try setting that config parameter to a larger number (several
minutes), but still wasn't able to retrieve additional context logs. Let us
know if you have any success with it.

Thanks,
Bharath

On Fri, Mar 20, 2015 at 3:21 AM, Doug Balog doug.sparku...@dugos.com
wrote:

 I’m seeing the same problem.
 I’ve set logging to DEBUG, and I think some hints are in the “Yarn AM
 launch context” that is printed out
 before Yarn  runs java.

 My next step is to talk to the admins and get them to set
 yarn.nodemanager.delete.debug-delay-sec
 in the config, as recommended in
 http://spark.apache.org/docs/latest/running-on-yarn.html
 Then I can see exactly what's in the directory.

 Doug

 PS: Sorry for the dup message Bharath and Todd, I used the wrong email address.


  On Mar 19, 2015, at 1:19 AM, Bharath Ravi Kumar reachb...@gmail.com
 wrote:
 
  Thanks for clarifying Todd. This may then be an issue specific to the
 HDP version we're using. Will continue to debug and post back if there's
 any resolution.
 
  On Thu, Mar 19, 2015 at 3:40 AM, Todd Nist tsind...@gmail.com wrote:
  Yes I believe you are correct.
 
  For the build you may need to specify the specific HDP version of hadoop
 to use with the -Dhadoop.version=.  I went with the default 2.6.0, but
 Horton may have a vendor specific version that needs to go here.  I know I
 saw a similar post today where the solution was to use
 -Dhadoop.version=2.5.0-cdh5.3.2 but that was for a cloudera installation.
 I am not sure what the HDP version would be to put here.
 
  -Todd
 
  On Wed, Mar 18, 2015 at 12:49 AM, Bharath Ravi Kumar 
 reachb...@gmail.com wrote:
  Hi Todd,
 
  Yes, those entries were present in the conf under the same SPARK_HOME
 that was used to run spark-submit. On a related note, I'm assuming that the
 additional spark yarn options (like spark.yarn.jar) need to be set in the
 same properties file that is passed to spark-submit. That apart, I assume
 that no other host on the cluster should require a deployment of the
 spark distribution or any other config change to support a spark job.
 Isn't that correct?
 
  On Tue, Mar 17, 2015 at 6:19 PM, Todd Nist tsind...@gmail.com wrote:
  Hi Bharath,
 
  Do you have these entries in your $SPARK_HOME/conf/spark-defaults.conf
 file?
 
  spark.driver.extraJavaOptions -Dhdp.version=2.2.0.0-2041
  spark.yarn.am.extraJavaOptions -Dhdp.version=2.2.0.0-2041
 
 
 
 
  On Tue, Mar 17, 2015 at 1:04 AM, Bharath Ravi Kumar reachb...@gmail.com
 wrote:
  Still no luck running purpose-built 1.3 against HDP 2.2 after following
 all the instructions. Anyone else faced this issue?
 
  On Mon, Mar 16, 2015 at 8:53 PM, Bharath Ravi Kumar reachb...@gmail.com
 wrote:
  Hi Todd,
 
  Thanks for the help. I'll try again after building a distribution with
 the 1.3 sources. However, I wanted to confirm what I mentioned earlier: is
 it sufficient to copy the distribution only to the client host from where
 spark-submit is invoked (with spark.yarn.jar set), or is there a need to
 ensure that the entire distribution is made available (pre-deployed)
 on every host in the yarn cluster? I'd assume that the latter shouldn't be
 necessary.
 
  On Mon, Mar 16, 2015 at 8:38 PM, Todd Nist tsind...@gmail.com wrote:
  Hi Bharath,
 
  I ran into the same issue a few days ago; here is a link to a post on
 Horton's forum.
 http://hortonworks.com/community/forums/search/spark+1.2.1/
  In case anyone else needs to perform this, these are the steps I took to
 get it to work with Spark 1.2.1 as well as Spark 1.3.0-RC3:
 
  1. Pull 1.2.1 Source
  2. Apply the following patches
  a. Address jackson version, https://github.com/apache/spark/pull/3938
  b. Address the propagation of the hdp.version set in the
 spark-default.conf, https://github.com/apache/spark/pull/3409
  3. build with $SPARK_HOME/make-distribution.sh --name hadoop2.6 --tgz
 -Pyarn -Phadoop-2.4 -Dhadoop.version=2.6.0 -Phive -Phive-thriftserver
 -DskipTests package
 
  Then deploy the resulting artifact = spark-1.2.1-bin-hadoop2.6.tgz
 following instructions in the HDP Spark preview
 http://hortonworks.com/hadoop-tutorial/using-apache-spark-hdp/
 
  FWIW spark-1.3.0 appears to be working fine with HDP as well and steps
 2a and 2b are not required.
 
  HTH
 
  -Todd
 
 
  On Mon, Mar 16, 2015 at 10:13 AM, Bharath Ravi Kumar 
 reachb...@gmail.com wrote:
  Hi,
 
  Trying to run spark (1.2.1 built for hdp 2.2) against a yarn cluster
 results in the AM failing to start with the following error on stderr:
  Error: Could not find or load main class
 org.apache.spark.deploy.yarn.ExecutorLauncher
  An application id was assigned to the job, but there were no logs. Note
 that the spark distribution has not been installed on every host in the
 cluster and the aforementioned spark build was copied  to one of the hadoop
 client hosts in the cluster to launch the
  job. Spark-submit was run with --master yarn-client and spark.yarn.jar
 was set to the assembly jar from the above distribution

Re: HDP 2.2 AM abort : Unable to find ExecutorLauncher class

2015-03-18 Thread Bharath Ravi Kumar
Thanks for clarifying Todd. This may then be an issue specific to the HDP
version we're using. Will continue to debug and post back if there's any
resolution.

On Thu, Mar 19, 2015 at 3:40 AM, Todd Nist tsind...@gmail.com wrote:

 Yes I believe you are correct.

 For the build you may need to specify the specific HDP version of hadoop
 to use with the -Dhadoop.version=.  I went with the default 2.6.0,
 but Horton may have a vendor specific version that needs to go here.  I
 know I saw a similar post today where the solution was to use
 -Dhadoop.version=2.5.0-cdh5.3.2 but that was for a cloudera
 installation.  I am not sure what the HDP version would be to put here.

 -Todd

 On Wed, Mar 18, 2015 at 12:49 AM, Bharath Ravi Kumar reachb...@gmail.com
 wrote:

 Hi Todd,

 Yes, those entries were present in the conf under the same SPARK_HOME
 that was used to run spark-submit. On a related note, I'm assuming that the
 additional spark yarn options (like spark.yarn.jar) need to be set in the
 same properties file that is passed to spark-submit. That apart, I assume
 that no other host on the cluster should require a deployment of the
 spark distribution or any other config change to support a spark job.
 Isn't that correct?

 On Tue, Mar 17, 2015 at 6:19 PM, Todd Nist tsind...@gmail.com wrote:

 Hi Bharath,

 Do you have these entries in your $SPARK_HOME/conf/spark-defaults.conf
 file?

 spark.driver.extraJavaOptions -Dhdp.version=2.2.0.0-2041
 spark.yarn.am.extraJavaOptions -Dhdp.version=2.2.0.0-2041




 On Tue, Mar 17, 2015 at 1:04 AM, Bharath Ravi Kumar reachb...@gmail.com
  wrote:

 Still no luck running purpose-built 1.3 against HDP 2.2 after following
 all the instructions. Anyone else faced this issue?

 On Mon, Mar 16, 2015 at 8:53 PM, Bharath Ravi Kumar 
 reachb...@gmail.com wrote:

 Hi Todd,

 Thanks for the help. I'll try again after building a distribution with
 the 1.3 sources. However, I wanted to confirm what I mentioned earlier: is
 it sufficient to copy the distribution only to the client host from where
 spark-submit is invoked (with spark.yarn.jar set), or is there a need to
 ensure that the entire distribution is made available (pre-deployed)
 on every host in the yarn cluster? I'd assume that the latter shouldn't be
 necessary.

 On Mon, Mar 16, 2015 at 8:38 PM, Todd Nist tsind...@gmail.com wrote:

 Hi Bharath,

 I ran into the same issue a few days ago; here is a link to a post on
 Horton's forum.
 http://hortonworks.com/community/forums/search/spark+1.2.1/

 In case anyone else needs to perform this, these are the steps I took
 to get it to work with Spark 1.2.1 as well as Spark 1.3.0-RC3:

 1. Pull 1.2.1 Source
 2. Apply the following patches
 a. Address jackson version, https://github.com/apache/spark/pull/3938
 b. Address the propagation of the hdp.version set in the
 spark-default.conf, https://github.com/apache/spark/pull/3409
 3. build with $SPARK_HOME/make-distribution.sh --name hadoop2.6 --tgz
 -Pyarn -Phadoop-2.4 -Dhadoop.version=2.6.0 -Phive -Phive-thriftserver
 -DskipTests package

 Then deploy the resulting artifact = spark-1.2.1-bin-hadoop2.6.tgz
 following instructions in the HDP Spark preview
 http://hortonworks.com/hadoop-tutorial/using-apache-spark-hdp/

 FWIW spark-1.3.0 appears to be working fine with HDP as well and
 steps 2a and 2b are not required.

 HTH

 -Todd

 On Mon, Mar 16, 2015 at 10:13 AM, Bharath Ravi Kumar 
 reachb...@gmail.com wrote:

 Hi,

 Trying to run spark (1.2.1 built for hdp 2.2) against a yarn cluster
 results in the AM failing to start with the following error on stderr:
 Error: Could not find or load main class 
 org.apache.spark.deploy.yarn.ExecutorLauncher
 An application id was assigned to the job, but there were no logs. Note 
 that the spark distribution has not been installed on every host in 
 the cluster and the aforementioned spark build was copied  to one of 
 the hadoop client hosts in the cluster to launch the
 job. Spark-submit was run with --master yarn-client and spark.yarn.jar 
 was set to the assembly jar from the above distribution. Switching the 
 spark distribution to the HDP recommended  version
 and following the instructions on this page 
 http://hortonworks.com/hadoop-tutorial/using-apache-spark-hdp/ did 
 not fix the problem either. Any idea what may have caused this error ?

 Thanks,
 Bharath










Re: HDP 2.2 AM abort : Unable to find ExecutorLauncher class

2015-03-17 Thread Bharath Ravi Kumar
Hi Todd,

Yes, those entries were present in the conf under the same SPARK_HOME that
was used to run spark-submit. On a related note, I'm assuming that the
additional spark yarn options (like spark.yarn.jar) need to be set in the
same properties file that is passed to spark-submit. That apart, I assume
that no other host on the cluster should require a deployment of the
spark distribution or any other config change to support a spark job.
Isn't that correct?

On Tue, Mar 17, 2015 at 6:19 PM, Todd Nist tsind...@gmail.com wrote:

 Hi Bharath,

 Do you have these entries in your $SPARK_HOME/conf/spark-defaults.conf
 file?

 spark.driver.extraJavaOptions -Dhdp.version=2.2.0.0-2041
 spark.yarn.am.extraJavaOptions -Dhdp.version=2.2.0.0-2041




 On Tue, Mar 17, 2015 at 1:04 AM, Bharath Ravi Kumar reachb...@gmail.com
 wrote:

 Still no luck running purpose-built 1.3 against HDP 2.2 after following
 all the instructions. Anyone else faced this issue?

 On Mon, Mar 16, 2015 at 8:53 PM, Bharath Ravi Kumar reachb...@gmail.com
 wrote:

 Hi Todd,

 Thanks for the help. I'll try again after building a distribution with
 the 1.3 sources. However, I wanted to confirm what I mentioned earlier: is
 it sufficient to copy the distribution only to the client host from where
 spark-submit is invoked (with spark.yarn.jar set), or is there a need to
 ensure that the entire distribution is made available (pre-deployed)
 on every host in the yarn cluster? I'd assume that the latter shouldn't be
 necessary.

 On Mon, Mar 16, 2015 at 8:38 PM, Todd Nist tsind...@gmail.com wrote:

 Hi Bharath,

 I ran into the same issue a few days ago; here is a link to a post on
 Horton's forum.
 http://hortonworks.com/community/forums/search/spark+1.2.1/

 In case anyone else needs to perform this, these are the steps I took to
 get it to work with Spark 1.2.1 as well as Spark 1.3.0-RC3:

 1. Pull 1.2.1 Source
 2. Apply the following patches
 a. Address jackson version, https://github.com/apache/spark/pull/3938
 b. Address the propagation of the hdp.version set in the
 spark-default.conf, https://github.com/apache/spark/pull/3409
 3. build with $SPARK_HOME/make-distribution.sh --name hadoop2.6 --tgz
 -Pyarn -Phadoop-2.4 -Dhadoop.version=2.6.0 -Phive -Phive-thriftserver
 -DskipTests package

 Then deploy the resulting artifact = spark-1.2.1-bin-hadoop2.6.tgz
 following instructions in the HDP Spark preview
 http://hortonworks.com/hadoop-tutorial/using-apache-spark-hdp/

 FWIW spark-1.3.0 appears to be working fine with HDP as well and steps
 2a and 2b are not required.

 HTH

 -Todd

 On Mon, Mar 16, 2015 at 10:13 AM, Bharath Ravi Kumar 
 reachb...@gmail.com wrote:

 Hi,

 Trying to run spark (1.2.1 built for hdp 2.2) against a yarn cluster
 results in the AM failing to start with the following error on stderr:
 Error: Could not find or load main class 
 org.apache.spark.deploy.yarn.ExecutorLauncher
 An application id was assigned to the job, but there were no logs. Note 
 that the spark distribution has not been installed on every host in the 
 cluster and the aforementioned spark build was copied  to one of the 
 hadoop client hosts in the cluster to launch the
 job. Spark-submit was run with --master yarn-client and spark.yarn.jar 
 was set to the assembly jar from the above distribution. Switching the 
 spark distribution to the HDP recommended  version
 and following the instructions on this page 
 http://hortonworks.com/hadoop-tutorial/using-apache-spark-hdp/ did not 
 fix the problem either. Any idea what may have caused this error ?

 Thanks,
 Bharath








HDP 2.2 AM abort : Unable to find ExecutorLauncher class

2015-03-16 Thread Bharath Ravi Kumar
Hi,

Trying to run spark (1.2.1 built for hdp 2.2) against a yarn cluster
results in the AM failing to start with the following error on stderr:
Error: Could not find or load main class
org.apache.spark.deploy.yarn.ExecutorLauncher
An application id was assigned to the job, but there were no logs.
Note that the spark distribution has not been installed on every
host in the cluster and the aforementioned spark build was copied  to
one of the hadoop client hosts in the cluster to launch the
job. Spark-submit was run with --master yarn-client and spark.yarn.jar
was set to the assembly jar from the above distribution. Switching the
spark distribution to the HDP recommended  version
and following the instructions on this page
http://hortonworks.com/hadoop-tutorial/using-apache-spark-hdp/ did
not fix the problem either. Any idea what may have caused this error ?

Thanks,
Bharath


Re: HDP 2.2 AM abort : Unable to find ExecutorLauncher class

2015-03-16 Thread Bharath Ravi Kumar
Hi Todd,

Thanks for the help. I'll try again after building a distribution with the
1.3 sources. However, I wanted to confirm what I mentioned earlier: is it
sufficient to copy the distribution only to the client host from where
spark-submit is invoked (with spark.yarn.jar set), or is there a need to
ensure that the entire distribution is made available (pre-deployed)
on every host in the yarn cluster? I'd assume that the latter shouldn't be
necessary.

On Mon, Mar 16, 2015 at 8:38 PM, Todd Nist tsind...@gmail.com wrote:

 Hi Bharath,

 I ran into the same issue a few days ago; here is a link to a post on
 Horton's forum.
 http://hortonworks.com/community/forums/search/spark+1.2.1/

 In case anyone else needs to perform this, these are the steps I took to get
 it to work with Spark 1.2.1 as well as Spark 1.3.0-RC3:

 1. Pull 1.2.1 Source
 2. Apply the following patches
 a. Address jackson version, https://github.com/apache/spark/pull/3938
 b. Address the propagation of the hdp.version set in the
 spark-default.conf, https://github.com/apache/spark/pull/3409
 3. build with $SPARK_HOME/make-distribution.sh --name hadoop2.6 --tgz
 -Pyarn -Phadoop-2.4 -Dhadoop.version=2.6.0 -Phive -Phive-thriftserver
 -DskipTests package

 Then deploy the resulting artifact = spark-1.2.1-bin-hadoop2.6.tgz
 following instructions in the HDP Spark preview
 http://hortonworks.com/hadoop-tutorial/using-apache-spark-hdp/

 FWIW spark-1.3.0 appears to be working fine with HDP as well and steps 2a
 and 2b are not required.

 HTH

 -Todd

 On Mon, Mar 16, 2015 at 10:13 AM, Bharath Ravi Kumar reachb...@gmail.com
 wrote:

 Hi,

 Trying to run spark (1.2.1 built for hdp 2.2) against a yarn cluster
 results in the AM failing to start with the following error on stderr:
 Error: Could not find or load main class 
 org.apache.spark.deploy.yarn.ExecutorLauncher
 An application id was assigned to the job, but there were no logs. Note that 
 the spark distribution has not been installed on every host in the cluster 
 and the aforementioned spark build was copied  to one of the hadoop client 
 hosts in the cluster to launch the
 job. Spark-submit was run with --master yarn-client and spark.yarn.jar was 
 set to the assembly jar from the above distribution. Switching the spark 
 distribution to the HDP recommended  version
 and following the instructions on this page 
 http://hortonworks.com/hadoop-tutorial/using-apache-spark-hdp/ did not fix 
 the problem either. Any idea what may have caused this error ?

 Thanks,
 Bharath





Re: HDP 2.2 AM abort : Unable to find ExecutorLauncher class

2015-03-16 Thread Bharath Ravi Kumar
Still no luck running purpose-built 1.3 against HDP 2.2 after following all
the instructions. Anyone else faced this issue?

On Mon, Mar 16, 2015 at 8:53 PM, Bharath Ravi Kumar reachb...@gmail.com
wrote:

 Hi Todd,

 Thanks for the help. I'll try again after building a distribution with the
 1.3 sources. However, I wanted to confirm what I mentioned earlier: is it
 sufficient to copy the distribution only to the client host from where
 spark-submit is invoked (with spark.yarn.jar set), or is there a need to
 ensure that the entire distribution is made available (pre-deployed)
 on every host in the yarn cluster? I'd assume that the latter shouldn't be
 necessary.

 On Mon, Mar 16, 2015 at 8:38 PM, Todd Nist tsind...@gmail.com wrote:

 Hi Bharath,

 I ran into the same issue a few days ago; here is a link to a post on
 Horton's forum.
 http://hortonworks.com/community/forums/search/spark+1.2.1/

 In case anyone else needs to perform this, these are the steps I took to
 get it to work with Spark 1.2.1 as well as Spark 1.3.0-RC3:

 1. Pull 1.2.1 Source
 2. Apply the following patches
 a. Address jackson version, https://github.com/apache/spark/pull/3938
 b. Address the propagation of the hdp.version set in the
 spark-default.conf, https://github.com/apache/spark/pull/3409
 3. build with $SPARK_HOME/make-distribution.sh --name hadoop2.6 --tgz
 -Pyarn -Phadoop-2.4 -Dhadoop.version=2.6.0 -Phive -Phive-thriftserver
 -DskipTests package

 Then deploy the resulting artifact = spark-1.2.1-bin-hadoop2.6.tgz
 following instructions in the HDP Spark preview
 http://hortonworks.com/hadoop-tutorial/using-apache-spark-hdp/

 FWIW spark-1.3.0 appears to be working fine with HDP as well and steps 2a
 and 2b are not required.

 HTH

 -Todd

 On Mon, Mar 16, 2015 at 10:13 AM, Bharath Ravi Kumar reachb...@gmail.com
  wrote:

 Hi,

 Trying to run spark (1.2.1 built for hdp 2.2) against a yarn cluster
 results in the AM failing to start with the following error on stderr:
 Error: Could not find or load main class 
 org.apache.spark.deploy.yarn.ExecutorLauncher
 An application id was assigned to the job, but there were no logs. Note 
 that the spark distribution has not been installed on every host in the 
 cluster and the aforementioned spark build was copied  to one of the hadoop 
 client hosts in the cluster to launch the
 job. Spark-submit was run with --master yarn-client and spark.yarn.jar was 
 set to the assembly jar from the above distribution. Switching the spark 
 distribution to the HDP recommended  version
 and following the instructions on this page 
 http://hortonworks.com/hadoop-tutorial/using-apache-spark-hdp/ did not 
 fix the problem either. Any idea what may have caused this error ?

 Thanks,
 Bharath






Re: ALS failure with size > Integer.MAX_VALUE

2014-12-15 Thread Bharath Ravi Kumar
Ok. We'll try using it in a test cluster running 1.2.
On 16-Dec-2014 1:36 am, Xiangrui Meng men...@gmail.com wrote:

Unfortunately, it will depend on the Sorter API in 1.2. -Xiangrui

On Mon, Dec 15, 2014 at 11:48 AM, Bharath Ravi Kumar
reachb...@gmail.com wrote:
 Hi Xiangrui,

 The block size limit was encountered even with a reduced number of item blocks,
 as you had expected. I'm wondering if I could try the new implementation as
 a standalone library against a 1.1 deployment. Does it have dependencies on
 any core APIs in the current master?

 Thanks,
 Bharath

 On Wed, Dec 3, 2014 at 10:10 PM, Bharath Ravi Kumar reachb...@gmail.com
 wrote:

 Thanks Xiangrui. I'll try out setting a smaller number of item blocks. And
 yes, I've been following the JIRA for the new ALS implementation. I'll try
 it out when it's ready for testing.

 On Wed, Dec 3, 2014 at 4:24 AM, Xiangrui Meng men...@gmail.com wrote:

 Hi Bharath,

 You can try setting a small number of item blocks in this case. 1200 is
 definitely too large for ALS. Please try 30 or even smaller. I'm not
 sure whether this could solve the problem because you have 100 items
 connected with 10^8 users. There is a JIRA for this issue:

 https://issues.apache.org/jira/browse/SPARK-3735

 which I will try to implement in 1.3. I'll ping you when it is ready.

 Best,
 Xiangrui
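Concretely, the block-count suggestion maps onto the blocks argument of ALS.train. A minimal sketch (rank is taken from the original post; the iteration count and lambda are placeholders, not values from this thread):

import org.apache.spark.mllib.recommendation.{ALS, Rating}
import org.apache.spark.rdd.RDD

// ratings: RDD[Rating] of (user, product, rating) triples, assumed to exist.
def trainWithFewBlocks(ratings: RDD[Rating]) = {
  val rank       = 10    // as in the original post
  val iterations = 10    // placeholder: not stated in the thread
  val lambda     = 0.01  // placeholder: not stated in the thread
  val blocks     = 30    // small user/product block count, per the suggestion above,
                         // keeping each shuffled block well under the 2 GB limit
  ALS.train(ratings, rank, iterations, lambda, blocks)
}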

 On Tue, Dec 2, 2014 at 10:40 AM, Bharath Ravi Kumar reachb...@gmail.com

 wrote:
  Yes, the issue appears to be due to the 2GB block size limitation. I
am
  hence looking for (user, product) block sizing suggestions to work
  around
  the block size limitation.
 
  On Sun, Nov 30, 2014 at 3:01 PM, Sean Owen so...@cloudera.com wrote:
 
  (It won't be that, since you see that the error occurs when reading a
  block from disk. I think this is an instance of the 2GB block size
  limitation.)
 
  On Sun, Nov 30, 2014 at 4:36 AM, Ganelin, Ilya
  ilya.gane...@capitalone.com wrote:
    Hi Bharath – I'm unsure if this is your problem, but the
    MatrixFactorizationModel in MLlib, which is the underlying component for ALS,
    expects your User/Product fields to be integers. Specifically, the input to
    ALS is an RDD[Rating] and Rating is an (Int, Int, Double). I am wondering if
    perhaps one of your identifiers exceeds MAX_INT; could you write a quick
    check for that?

    I have been running a very similar use case to yours (with more constrained
    hardware resources) and I haven't seen this exact problem, but I'm sure we've
    seen similar issues. Please let me know if you have other questions.
  
   From: Bharath Ravi Kumar reachb...@gmail.com
   Date: Thursday, November 27, 2014 at 1:30 PM
   To: user@spark.apache.org user@spark.apache.org
   Subject: ALS failure with size > Integer.MAX_VALUE
  
   We're training a recommender with ALS in mllib 1.1 against a dataset of
   150M users and 4.5K items, with the total number of training records being
   1.2 billion (~30GB data). The input data is spread across 1200 partitions on
   HDFS. For the training, rank=10, and we've configured {number of user data
   blocks = number of item data blocks}. The number of user/item blocks was
   varied between 50 and 1200. Irrespective of the block size (e.g. at 1200
   blocks each), there are at least a couple of tasks that end up shuffle
   reading 9.7G each in the aggregate stage (ALS.scala:337) and failing with
   the following exception:
  
    java.lang.IllegalArgumentException: Size exceeds Integer.MAX_VALUE
        at sun.nio.ch.FileChannelImpl.map(FileChannelImpl.java:745)
        at org.apache.spark.storage.DiskStore.getBytes(DiskStore.scala:108)
        at org.apache.spark.storage.DiskStore.getValues(DiskStore.scala:124)
        at org.apache.spark.storage.BlockManager.getLocalFromDisk(BlockManager.scala:332)
        at org.apache.spark.storage.BlockFetcherIterator$BasicBlockFetcherIterator$$anonfun$getLocalBlocks$1.apply(BlockFetcherIterator.scala:204)
  
 
 





Re: ALS failure with size Integer.MAX_VALUE

2014-12-14 Thread Bharath Ravi Kumar
Hi Xiangrui,

The block size limit was encountered even with a reduced number of item
blocks, as you had expected. I'm wondering if I could try the new
implementation as a standalone library against a 1.1 deployment. Does it
have dependencies on any core APIs in the current master?

Thanks,
Bharath

On Wed, Dec 3, 2014 at 10:10 PM, Bharath Ravi Kumar reachb...@gmail.com
wrote:

 Thanks Xiangrui. I'll try out setting a smaller number of item blocks. And
 yes, I've been following the JIRA for the new ALS implementation. I'll try
 it out when it's ready for testing.

 On Wed, Dec 3, 2014 at 4:24 AM, Xiangrui Meng men...@gmail.com wrote:

 Hi Bharath,

  You can try setting a small number of item blocks in this case. 1200 is
 definitely too large for ALS. Please try 30 or even smaller. I'm not
 sure whether this could solve the problem because you have 100 items
 connected with 10^8 users. There is a JIRA for this issue:

 https://issues.apache.org/jira/browse/SPARK-3735

 which I will try to implement in 1.3. I'll ping you when it is ready.

 Best,
 Xiangrui

 On Tue, Dec 2, 2014 at 10:40 AM, Bharath Ravi Kumar reachb...@gmail.com
 wrote:
   Yes, the issue appears to be due to the 2GB block size limitation. I am
   hence looking for (user, product) block sizing suggestions to work around
   the block size limitation.
 
  On Sun, Nov 30, 2014 at 3:01 PM, Sean Owen so...@cloudera.com wrote:
 
  (It won't be that, since you see that the error occur when reading a
  block from disk. I think this is an instance of the 2GB block size
  limitation.)
 
  On Sun, Nov 30, 2014 at 4:36 AM, Ganelin, Ilya
  ilya.gane...@capitalone.com wrote:
    Hi Bharath – I’m unsure if this is your problem but the
    MatrixFactorizationModel in MLLIB which is the underlying component for
    ALS expects your User/Product fields to be integers. Specifically, the
    input to ALS is an RDD[Rating] and Rating is an (Int, Int, Double). I am
    wondering if perhaps one of your identifiers exceeds MAX_INT, could you
    write a quick check for that?

    I have been running a very similar use case to yours (with more
    constrained hardware resources) and I haven’t seen this exact problem but
    I’m sure we’ve seen similar issues. Please let me know if you have other
    questions.
  
   From: Bharath Ravi Kumar reachb...@gmail.com
   Date: Thursday, November 27, 2014 at 1:30 PM
   To: user@spark.apache.org user@spark.apache.org
    Subject: ALS failure with size > Integer.MAX_VALUE
  
    We're training a recommender with ALS in mllib 1.1 against a dataset of
    150M users and 4.5K items, with the total number of training records
    being 1.2 Billion (~30GB data). The input data is spread across 1200
    partitions on HDFS. For the training, rank=10, and we've configured
    {number of user data blocks = number of item data blocks}. The number of
    user/item blocks was varied between 50 and 1200. Irrespective of the
    block size (e.g. at 1200 blocks each), there are at least a couple of
    tasks that end up shuffle reading > 9.7G each in the aggregate stage
    (ALS.scala:337) and failing with the following exception:
  
    java.lang.IllegalArgumentException: Size exceeds Integer.MAX_VALUE
        at sun.nio.ch.FileChannelImpl.map(FileChannelImpl.java:745)
        at org.apache.spark.storage.DiskStore.getBytes(DiskStore.scala:108)
        at org.apache.spark.storage.DiskStore.getValues(DiskStore.scala:124)
        at org.apache.spark.storage.BlockManager.getLocalFromDisk(BlockManager.scala:332)
        at org.apache.spark.storage.BlockFetcherIterator$BasicBlockFetcherIterator$$anonfun$getLocalBlocks$1.apply(BlockFetcherIterator.scala:204)
  
 
 





Re: ALS failure with size Integer.MAX_VALUE

2014-12-03 Thread Bharath Ravi Kumar
Thanks Xiangrui. I'll try out setting a smaller number of item blocks. And
yes, I've been following the JIRA for the new ALS implementation. I'll try
it out when it's ready for testing.

On Wed, Dec 3, 2014 at 4:24 AM, Xiangrui Meng men...@gmail.com wrote:

 Hi Bharath,

 You can try setting a small number of item blocks in this case. 1200 is
 definitely too large for ALS. Please try 30 or even smaller. I'm not
 sure whether this could solve the problem because you have 100 items
 connected with 10^8 users. There is a JIRA for this issue:

 https://issues.apache.org/jira/browse/SPARK-3735

 which I will try to implement in 1.3. I'll ping you when it is ready.

 Best,
 Xiangrui

 On Tue, Dec 2, 2014 at 10:40 AM, Bharath Ravi Kumar reachb...@gmail.com
 wrote:
  Yes, the issue appears to be due to the 2GB block size limitation. I am
  hence looking for (user, product) block sizing suggestions to work around
  the block size limitation.
 
  On Sun, Nov 30, 2014 at 3:01 PM, Sean Owen so...@cloudera.com wrote:
 
  (It won't be that, since you see that the error occur when reading a
  block from disk. I think this is an instance of the 2GB block size
  limitation.)
 
  On Sun, Nov 30, 2014 at 4:36 AM, Ganelin, Ilya
  ilya.gane...@capitalone.com wrote:
   Hi Bharath – I’m unsure if this is your problem but the
   MatrixFactorizationModel in MLLIB which is the underlying component for
   ALS expects your User/Product fields to be integers. Specifically, the
   input to ALS is an RDD[Rating] and Rating is an (Int, Int, Double). I am
   wondering if perhaps one of your identifiers exceeds MAX_INT, could you
   write a quick check for that?

   I have been running a very similar use case to yours (with more
   constrained hardware resources) and I haven’t seen this exact problem but
   I’m sure we’ve seen similar issues. Please let me know if you have other
   questions.
  
   From: Bharath Ravi Kumar reachb...@gmail.com
   Date: Thursday, November 27, 2014 at 1:30 PM
   To: user@spark.apache.org user@spark.apache.org
   Subject: ALS failure with size > Integer.MAX_VALUE
  
   We're training a recommender with ALS in mllib 1.1 against a dataset of
   150M users and 4.5K items, with the total number of training records
   being 1.2 Billion (~30GB data). The input data is spread across 1200
   partitions on HDFS. For the training, rank=10, and we've configured
   {number of user data blocks = number of item data blocks}. The number of
   user/item blocks was varied between 50 and 1200. Irrespective of the
   block size (e.g. at 1200 blocks each), there are at least a couple of
   tasks that end up shuffle reading > 9.7G each in the aggregate stage
   (ALS.scala:337) and failing with the following exception:
  
   java.lang.IllegalArgumentException: Size exceeds Integer.MAX_VALUE
       at sun.nio.ch.FileChannelImpl.map(FileChannelImpl.java:745)
       at org.apache.spark.storage.DiskStore.getBytes(DiskStore.scala:108)
       at org.apache.spark.storage.DiskStore.getValues(DiskStore.scala:124)
       at org.apache.spark.storage.BlockManager.getLocalFromDisk(BlockManager.scala:332)
       at org.apache.spark.storage.BlockFetcherIterator$BasicBlockFetcherIterator$$anonfun$getLocalBlocks$1.apply(BlockFetcherIterator.scala:204)
  
 
 



Re: ALS failure with size Integer.MAX_VALUE

2014-12-01 Thread Bharath Ravi Kumar
Yes, the issue appears to be due to the 2GB block size limitation. I am
hence looking for (user, product) block sizing suggestions to work around
the block size limitation.

On Sun, Nov 30, 2014 at 3:01 PM, Sean Owen so...@cloudera.com wrote:

 (It won't be that, since you see that the error occur when reading a
 block from disk. I think this is an instance of the 2GB block size
 limitation.)

 On Sun, Nov 30, 2014 at 4:36 AM, Ganelin, Ilya
 ilya.gane...@capitalone.com wrote:
  Hi Bharath – I’m unsure if this is your problem but the
  MatrixFactorizationModel in MLLIB which is the underlying component for
  ALS expects your User/Product fields to be integers. Specifically, the
  input to ALS is an RDD[Rating] and Rating is an (Int, Int, Double). I am
  wondering if perhaps one of your identifiers exceeds MAX_INT, could you
  write a quick check for that?

  I have been running a very similar use case to yours (with more
  constrained hardware resources) and I haven’t seen this exact problem but
  I’m sure we’ve seen similar issues. Please let me know if you have other
  questions.
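
A minimal sketch of the quick check suggested above (assumption: rawRatings is
an RDD of (userId, productId, rating) triples with Long identifiers, before they
are narrowed to Int for mllib's Rating):

  import org.apache.spark.rdd.RDD

  // Verify that the raw identifiers actually fit into Int before building Rating objects.
  def idsFitInInt(rawRatings: RDD[(Long, Long, Double)]): Boolean = {
    val maxUser = rawRatings.map(_._1).reduce(_ max _)
    val maxProduct = rawRatings.map(_._2).reduce(_ max _)
    maxUser <= Int.MaxValue && maxProduct <= Int.MaxValue
  }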
 
  From: Bharath Ravi Kumar reachb...@gmail.com
  Date: Thursday, November 27, 2014 at 1:30 PM
  To: user@spark.apache.org user@spark.apache.org
   Subject: ALS failure with size > Integer.MAX_VALUE
 
   We're training a recommender with ALS in mllib 1.1 against a dataset of
   150M users and 4.5K items, with the total number of training records
   being 1.2 Billion (~30GB data). The input data is spread across 1200
   partitions on HDFS. For the training, rank=10, and we've configured
   {number of user data blocks = number of item data blocks}. The number of
   user/item blocks was varied between 50 and 1200. Irrespective of the
   block size (e.g. at 1200 blocks each), there are at least a couple of
   tasks that end up shuffle reading > 9.7G each in the aggregate stage
   (ALS.scala:337) and failing with the following exception:
 
   java.lang.IllegalArgumentException: Size exceeds Integer.MAX_VALUE
       at sun.nio.ch.FileChannelImpl.map(FileChannelImpl.java:745)
       at org.apache.spark.storage.DiskStore.getBytes(DiskStore.scala:108)
       at org.apache.spark.storage.DiskStore.getValues(DiskStore.scala:124)
       at org.apache.spark.storage.BlockManager.getLocalFromDisk(BlockManager.scala:332)
       at org.apache.spark.storage.BlockFetcherIterator$BasicBlockFetcherIterator$$anonfun$getLocalBlocks$1.apply(BlockFetcherIterator.scala:204)
 



Re: ALS failure with size Integer.MAX_VALUE

2014-11-28 Thread Bharath Ravi Kumar
Any suggestions to address the described problem? In particular, considering
the skewed degree of some of the item nodes in the graph, I believe it should
be possible to define block sizes that reflect that skew, but I'm unsure how
to arrive at those sizes.

Thanks,
Bharath
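
One way to quantify that skew before choosing block counts is to compute the
per-item degree directly; a minimal sketch (assuming ratings is the RDD[Rating]
used for training, and n is how many of the heaviest items to inspect):

  import org.apache.spark.SparkContext._
  import org.apache.spark.mllib.recommendation.Rating
  import org.apache.spark.rdd.RDD

  // Count how many users each item is connected to, then return the n heaviest items.
  def heaviestItems(ratings: RDD[Rating], n: Int): Array[(Int, Long)] = {
    val itemDegrees = ratings.map(r => (r.product, 1L)).reduceByKey(_ + _)
    itemDegrees.top(n)(Ordering.by[(Int, Long), Long](_._2))
  }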

On Fri, Nov 28, 2014 at 12:00 AM, Bharath Ravi Kumar reachb...@gmail.com
wrote:

 We're training a recommender with ALS in mllib 1.1 against a dataset of
 150M users and 4.5K items, with the total number of training records being
 1.2 Billion (~30GB data). The input data is spread across 1200 partitions
 on HDFS. For the training, rank=10, and we've configured {number of user
 data blocks = number of item data blocks}. The number of user/item blocks
 was varied between 50 and 1200. Irrespective of the block size (e.g. at
 1200 blocks each), there are at least a couple of tasks that end up shuffle
 reading > 9.7G each in the aggregate stage (ALS.scala:337) and failing with
 the following exception:

 java.lang.IllegalArgumentException: Size exceeds Integer.MAX_VALUE
 at sun.nio.ch.FileChannelImpl.map(FileChannelImpl.java:745)
 at org.apache.spark.storage.DiskStore.getBytes(DiskStore.scala:108)
 at
 org.apache.spark.storage.DiskStore.getValues(DiskStore.scala:124)
 at
 org.apache.spark.storage.BlockManager.getLocalFromDisk(BlockManager.scala:332)
 at
 org.apache.spark.storage.BlockFetcherIterator$BasicBlockFetcherIterator$$anonfun$getLocalBlocks$1.apply(BlockFetcherIterator.scala:204)




 As for the data, on the user side, the degree of a node in the
 connectivity graph is relatively small. However, on the item side, 3.8K out
 of the 4.5K items are connected to 10^5 users each on an average, with 100
 items being connected to nearly 10^8 users. The rest of the items are
 connected to less than 10^5 users. With such a skew in the connectivity
 graph, I'm unsure if additional memory or variation in the block sizes
 would help (considering my limited understanding of the implementation in
 mllib). Any suggestion to address the problem?


 The test is being run on a standalone cluster of 3 hosts, each with 100G
 RAM and 24 cores dedicated to the application. The additional configs I made
 specific to the shuffle and task failure reduction are as follows:

 spark.core.connection.ack.wait.timeout=600
 spark.shuffle.consolidateFiles=true
 spark.shuffle.manager=SORT


 The job execution summary is as follows:

 Active Stages:

 Stage id 2,  aggregate at ALS.scala:337, duration 55 min, Tasks 1197/1200
 (3 failed), Shuffle Read :  141.6 GB

 Completed Stages (5)
 Stage Id | Description                                      | Duration | Tasks: Succeeded/Total | Input   | Shuffle Read | Shuffle Write
 6        | org.apache.spark.rdd.RDD.flatMap(RDD.scala:277)  | 12 min   | 1200/1200              | 29.9 GB | 1668.4 MB    | 186.8 GB
 5        | mapPartitionsWithIndex at ALS.scala:250 +details
 3        | map at ALS.scala:231
 0        | aggregate at ALS.scala:337 +details
 1        | map at ALS.scala:228 +details


 Thanks,
 Bharath



ALS failure with size Integer.MAX_VALUE

2014-11-27 Thread Bharath Ravi Kumar
We're training a recommender with ALS in mllib 1.1 against a dataset of
150M users and 4.5K items, with the total number of training records being
1.2 Billion (~30GB data). The input data is spread across 1200 partitions
on HDFS. For the training, rank=10, and we've configured {number of user
data blocks = number of item data blocks}. The number of user/item blocks
was varied between 50 and 1200. Irrespective of the block size (e.g. at
1200 blocks each), there are at least a couple of tasks that end up shuffle
reading > 9.7G each in the aggregate stage (ALS.scala:337) and failing with
the following exception:

java.lang.IllegalArgumentException: Size exceeds Integer.MAX_VALUE
at sun.nio.ch.FileChannelImpl.map(FileChannelImpl.java:745)
at org.apache.spark.storage.DiskStore.getBytes(DiskStore.scala:108)
at org.apache.spark.storage.DiskStore.getValues(DiskStore.scala:124)
at
org.apache.spark.storage.BlockManager.getLocalFromDisk(BlockManager.scala:332)
at
org.apache.spark.storage.BlockFetcherIterator$BasicBlockFetcherIterator$$anonfun$getLocalBlocks$1.apply(BlockFetcherIterator.scala:204)




As for the data, on the user side, the degree of a node in the connectivity
graph is relatively small. However, on the item side, 3.8K out of the 4.5K
items are connected to 10^5 users each on an average, with 100 items being
connected to nearly 10^8 users. The rest of the items are connected to less
than 10^5 users. With such a skew in the connectivity graph, I'm unsure if
additional memory or variation in the block sizes would help (considering
my limited understanding of the implementation in mllib). Any suggestion to
address the problem?


The test is being run on a standalone cluster of 3 hosts, each with 100G
RAM and 24 cores dedicated to the application. The additional configs I made
specific to the shuffle and task failure reduction are as follows:

spark.core.connection.ack.wait.timeout=600
spark.shuffle.consolidateFiles=true
spark.shuffle.manager=SORT
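
For what it's worth, the same properties can also be set programmatically
through SparkConf; a minimal sketch (property names and values exactly as
listed above, the application name is only illustrative):

  import org.apache.spark.SparkConf

  val conf = new SparkConf()
    .setAppName("als-training")   // illustrative
    .set("spark.core.connection.ack.wait.timeout", "600")
    .set("spark.shuffle.consolidateFiles", "true")
    .set("spark.shuffle.manager", "SORT")
  // conf is then passed to new SparkContext(conf) at startup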


The job execution summary is as follows:

Active Stages:

Stage id 2,  aggregate at ALS.scala:337, duration 55 min, Tasks 1197/1200
(3 failed), Shuffle Read :  141.6 GB

Completed Stages (5)
Stage Id | Description                                      | Duration | Tasks: Succeeded/Total | Input   | Shuffle Read | Shuffle Write
6        | org.apache.spark.rdd.RDD.flatMap(RDD.scala:277)  | 12 min   | 1200/1200              | 29.9 GB | 1668.4 MB    | 186.8 GB
5        | mapPartitionsWithIndex at ALS.scala:250 +details
3        | map at ALS.scala:231
0        | aggregate at ALS.scala:337 +details
1        | map at ALS.scala:228 +details


Thanks,
Bharath


Re: OOM with groupBy + saveAsTextFile

2014-11-03 Thread Bharath Ravi Kumar
The result was no different with saveAsHadoopFile. In both cases, I can see
that I've misinterpreted the API docs. I'll explore the APIs a bit further
for ways to save the iterable as chunks rather than one large text/binary.
It might also help to clarify this aspect in the API docs. For those (like
me) whose first practical experience with data processing is through spark,
having skipped the Hadoop MR ecosystem, it might help to clarify
interactions with HDFS and the like. Thanks for all the help.
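
A minimal sketch of writing the grouped values out as one line per element
rather than one string per key (assumption: grouped is the RDD of
(key, Iterable[(String, Double, String, String)]) pairs produced by the
groupBy, and the output path is only illustrative):

  // Emit one text line per element of each group, so no single output record
  // has to hold an entire group's worth of data as one string.
  val lines = grouped.flatMap { case (key, values) =>
    values.iterator.map { case (a, b, c, d) => s"$key\t$a\t$b\t$c\t$d" }
  }
  lines.saveAsTextFile("hdfs:///tmp/grouped-output")   // illustrative path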

On Sun, Nov 2, 2014 at 10:22 PM, Sean Owen so...@cloudera.com wrote:

 saveAsText means save every element of the RDD as one line of text.
 It works like TextOutputFormat in Hadoop MapReduce since that's what
 it uses. So you are causing it to create one big string out of each
 Iterable this way.

 On Sun, Nov 2, 2014 at 4:48 PM, Bharath Ravi Kumar reachb...@gmail.com
 wrote:
  Thanks for responding. This is what I initially suspected, and hence asked
  why the library needed to construct the entire value buffer on a single
  host before writing it out. The stacktrace appeared to suggest that user
  code is not constructing the large buffer. I'm simply calling groupBy and
  saveAsText on the resulting grouped rdd. The value after grouping is an
  Iterable<Tuple4<String, Double, String, String>>. None of the strings are
  large. I also do not need a single large string created out of the Iterable
  for writing to disk. Instead, I expect the iterable to get written out in
  chunks in response to saveAsText. This shouldn't be the default behaviour
  of saveAsText perhaps? Hence my original question of the behavior of
  saveAsText. The tuning / partitioning attempts were aimed at reducing
  memory pressure so that multiple such buffers aren't constructed at the
  same time on a host. I'll take a second look at the data and code before
  updating this thread. Thanks.
 



Re: OOM with groupBy + saveAsTextFile

2014-11-03 Thread Bharath Ravi Kumar
I also realized from your description of saveAsText that the API is indeed
behaving as expected i.e. it is appropriate (though not optimal) for the
API to construct a single string out of the value. If the value turns out
to be large, the user of the API needs to reconsider the implementation
approach. My bad.

On Mon, Nov 3, 2014 at 3:38 PM, Bharath Ravi Kumar reachb...@gmail.com
wrote:

 The result was no different with saveAsHadoopFile. In both cases, I can
 see that I've misinterpreted the API docs. I'll explore the APIs a bit
 further for ways to save the iterable as chunks rather than one large
 text/binary. It might also help to clarify this aspect in the API docs. For
 those (like me) whose first practical experience with data processing is
 through spark, having skipped the Hadoop MR ecosystem, it might help to
 clarify interactions with HDFS and the like. Thanks for all the help.

 On Sun, Nov 2, 2014 at 10:22 PM, Sean Owen so...@cloudera.com wrote:

 saveAsText means save every element of the RDD as one line of text.
 It works like TextOutputFormat in Hadoop MapReduce since that's what
 it uses. So you are causing it to create one big string out of each
 Iterable this way.

 On Sun, Nov 2, 2014 at 4:48 PM, Bharath Ravi Kumar reachb...@gmail.com
 wrote:
   Thanks for responding. This is what I initially suspected, and hence asked
   why the library needed to construct the entire value buffer on a single
   host before writing it out. The stacktrace appeared to suggest that user
   code is not constructing the large buffer. I'm simply calling groupBy and
   saveAsText on the resulting grouped rdd. The value after grouping is an
   Iterable<Tuple4<String, Double, String, String>>. None of the strings are
   large. I also do not need a single large string created out of the Iterable
   for writing to disk. Instead, I expect the iterable to get written out in
   chunks in response to saveAsText. This shouldn't be the default behaviour
   of saveAsText perhaps? Hence my original question of the behavior of
   saveAsText. The tuning / partitioning attempts were aimed at reducing
   memory pressure so that multiple such buffers aren't constructed at the
   same time on a host. I'll take a second look at the data and code before
   updating this thread. Thanks.
 





Re: OOM with groupBy + saveAsTextFile

2014-11-02 Thread Bharath Ravi Kumar
Thanks for responding. This is what I initially suspected, and hence asked
why the library needed to construct the entire value buffer on a single
host before writing it out. The stacktrace appeared to suggest that user
code is not constructing the large buffer. I'm simply calling groupBy and
saveAsText on the resulting grouped rdd. The value after grouping is an
Iterable<Tuple4<String, Double, String, String>>. None of the strings are
large. I also do not need a single large string created out of the Iterable
for writing to disk. Instead, I expect the iterable to get written out in
chunks in response to saveAsText. This shouldn't be the default behaviour
of saveAsText perhaps? Hence my original question of the behavior of
saveAsText. The tuning / partitioning attempts were aimed at reducing
memory pressure so that multiple such buffers aren't constructed at the
same time on a host. I'll take a second look at the data and code before
updating this thread. Thanks.
None of your tuning will help here because the problem is actually the way
you are saving the output. If you take a look at the stacktrace, it is
trying to build a single string that is too large for the VM to allocate
memory. The VM is actually not running out of memory, but rather, the JVM
cannot support a single String that large.

I suspect this is due to the fact that the value in your (key, value) pair
after groupBy is too long (maybe it concatenates every single record). Do
you really want to save the (key, value) output this way using a text file?
Maybe you can write them out as multiple strings rather than a single super
giant string.




On Sat, Nov 1, 2014 at 9:52 PM, arthur.hk.c...@gmail.com 
arthur.hk.c...@gmail.com wrote:


 Hi,

 FYI as follows.  Could you post your heap size settings as well your Spark
 app code?

 Regards
 Arthur

 3.1.3 Detail Message: Requested array size exceeds VM limit
 “The detail message Requested array size exceeds VM limit indicates that the
 application (or APIs used by that application) attempted to allocate an
 array that is larger than the heap size. For example, if an application
 attempts to allocate an array of 512MB but the maximum heap size is 256MB
 then OutOfMemoryError will be thrown with the reason Requested array size
 exceeds VM limit. In most cases the problem is either a configuration
 issue (heap size too small), or a bug that results in an application
 attempting to create a huge array, for example, when the number of elements
 in the array are computed using an algorithm that computes an incorrect
 size.”




 On 2 Nov, 2014, at 12:25 pm, Bharath Ravi Kumar reachb...@gmail.com
 wrote:

 Resurfacing the thread. OOM shouldn't be the norm for a common groupBy /
 sort use case in a framework that is leading in sorting benchmarks? Or is
 there something fundamentally wrong in the usage?
 On 02-Nov-2014 1:06 am, Bharath Ravi Kumar reachb...@gmail.com wrote:

 Hi,

 I'm trying to run groupBy(function) followed by saveAsTextFile on an RDD
 of count ~ 100 million. The data size is 20GB and groupBy results in an RDD
  of 1061 keys with values being Iterable<Tuple4<String, Integer, Double,
  String>>. The job runs on 3 hosts in a standalone setup with each host's
 executor having 100G RAM and 24 cores dedicated to it. While the groupBy
 stage completes successfully with ~24GB of shuffle write, the
 saveAsTextFile fails after repeated retries with each attempt failing due
 to an out of memory error *[1]*. I understand that a few partitions may
 be overloaded as a result of the groupBy and I've tried the following
 config combinations unsuccessfully:

 1) Repartition the initial rdd (44 input partitions but 1061 keys) across
  1061 partitions and have max cores = 3 so that each key is a logical
 partition (though many partitions will end up on very few hosts), and each
 host likely runs saveAsTextFile on a single key at a time due to max cores
 = 3 with 3 hosts in the cluster. The level of parallelism is unspecified.

 2) Leave max cores unspecified, set the level of parallelism to 72, and
 leave number of partitions unspecified (in which case the # input
 partitions was used, which is 44)
 Since I do not intend to cache RDD's, I have set
 spark.storage.memoryFraction=0.2 in both cases.

 My understanding is that if each host is processing a single logical
 partition to saveAsTextFile and is reading from other hosts to write out
 the RDD, it is unlikely that it would run out of memory. My interpretation
 of the spark tuning guide is that the degree of parallelism has little
 impact in case (1) above since max cores = number of hosts. Can someone
 explain why there are still OOM's with 100G being available? On a related
 note, intuitively (though I haven't read the source), it appears that an
 entire key-value pair needn't fit into memory of a single host for
 saveAsTextFile since a single shuffle read from a remote can be written to
 HDFS before the next remote read is carried out. This way, not all data
 needs

OOM with groupBy + saveAsTextFile

2014-11-01 Thread Bharath Ravi Kumar
Hi,

I'm trying to run groupBy(function) followed by saveAsTextFile on an RDD of
count ~ 100 million. The data size is 20GB and groupBy results in an RDD of
1061 keys with values being Iterable<Tuple4<String, Integer, Double,
String>>. The job runs on 3 hosts in a standalone setup with each host's
executor having 100G RAM and 24 cores dedicated to it. While the groupBy
stage completes successfully with ~24GB of shuffle write, the
saveAsTextFile fails after repeated retries with each attempt failing due
to an out of memory error *[1]*. I understand that a few partitions may be
overloaded as a result of the groupBy and I've tried the following config
combinations unsuccessfully:

1) Repartition the initial rdd (44 input partitions but 1061 keys) across
1061 partitions and have max cores = 3 so that each key is a logical
partition (though many partitions will end up on very few hosts), and each
host likely runs saveAsTextFile on a single key at a time due to max cores
= 3 with 3 hosts in the cluster. The level of parallelism is unspecified.

2) Leave max cores unspecified, set the level of parallelism to 72, and
leave number of partitions unspecified (in which case the # input
partitions was used, which is 44)
Since I do not intend to cache RDD's, I have set
spark.storage.memoryFraction=0.2 in both cases.
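
For concreteness, a minimal sketch of the two configurations just described
(Spark 1.1-era property names; the application name and the groupBy key
function are only illustrative):

  import org.apache.spark.SparkConf

  // Option (1): at most 3 concurrent cores in the cluster, no RDDs cached.
  val conf1 = new SparkConf()
    .setAppName("groupBy-saveAsTextFile")   // illustrative
    .set("spark.cores.max", "3")
    .set("spark.storage.memoryFraction", "0.2")
  // The groupBy itself then requests 1061 output partitions:
  // val grouped = rdd.groupBy(keyFn, 1061)

  // Option (2): leave max cores unset and specify the parallelism instead.
  val conf2 = new SparkConf()
    .setAppName("groupBy-saveAsTextFile")   // illustrative
    .set("spark.default.parallelism", "72")
    .set("spark.storage.memoryFraction", "0.2")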

My understanding is that if each host is processing a single logical
partition to saveAsTextFile and is reading from other hosts to write out
the RDD, it is unlikely that it would run out of memory. My interpretation
of the spark tuning guide is that the degree of parallelism has little
impact in case (1) above since max cores = number of hosts. Can someone
explain why there are still OOM's with 100G being available? On a related
note, intuitively (though I haven't read the source), it appears that an
entire key-value pair needn't fit into memory of a single host for
saveAsTextFile since a single shuffle read from a remote can be written to
HDFS before the next remote read is carried out. This way, not all data
needs to be collected at the same time.

Lastly, if an OOM is (but shouldn't be) a common occurrence (as per the
tuning guide and even as per Datastax's spark introduction), there may need
to be more documentation around the internals of spark to help users take
better informed tuning decisions with parallelism, max cores, number
partitions and other tunables. Is there any ongoing effort on that front?

Thanks,
Bharath


*[1]* OOM stack trace and logs
14/11/01 12:26:37 WARN TaskSetManager: Lost task 61.0 in stage 36.0 (TID
1264, proc1.foo.bar.com): java.lang.OutOfMemoryError: Requested array size
exceeds VM limit
java.util.Arrays.copyOf(Arrays.java:3326)

java.lang.AbstractStringBuilder.expandCapacity(AbstractStringBuilder.java:137)

java.lang.AbstractStringBuilder.ensureCapacityInternal(AbstractStringBuilder.java:121)

java.lang.AbstractStringBuilder.append(AbstractStringBuilder.java:421)
java.lang.StringBuilder.append(StringBuilder.java:136)

scala.collection.mutable.StringBuilder.append(StringBuilder.scala:197)
scala.Tuple2.toString(Tuple2.scala:22)

org.apache.spark.rdd.RDD$$anonfun$saveAsTextFile$1.apply(RDD.scala:1158)

org.apache.spark.rdd.RDD$$anonfun$saveAsTextFile$1.apply(RDD.scala:1158)
scala.collection.Iterator$$anon$11.next(Iterator.scala:328)

org.apache.spark.rdd.PairRDDFunctions$$anonfun$13.apply(PairRDDFunctions.scala:984)

org.apache.spark.rdd.PairRDDFunctions$$anonfun$13.apply(PairRDDFunctions.scala:974)
org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:62)
org.apache.spark.scheduler.Task.run(Task.scala:54)

org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:177)

java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)

java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
java.lang.Thread.run(Thread.java:745)
14/11/01 12:26:40 WARN TaskSetManager: Lost task 92.0 in stage 36.0 (TID
1295, proc1.foo.bar.com): FetchFailed(BlockManagerId(1, proc1.foo.bar.com,
43704, 0), shuffleId=0, mapId=13, reduceId=92)
14/11/01 12:26:40 INFO DAGScheduler: Marking Stage 36 (saveAsTextFile at
ModelTrainer.java:141) as failed due to a fetch failure from Stage 37
(groupBy at ModelTrainer.java:133)
14/11/01 12:26:40 INFO DAGScheduler: Stage 36 (saveAsTextFile at
ModelTrainer.java:141) failed in 55.259 s
14/11/01 12:26:40 INFO DAGScheduler: Resubmitting Stage 37 (groupBy at
ModelTrainer.java:133) and Stage 36 (saveAsTextFile at
ModelTrainer.java:141) due to fetch failure


Re: OOM with groupBy + saveAsTextFile

2014-11-01 Thread Bharath Ravi Kumar
Minor clarification: I'm running spark 1.1.0 on JDK 1.8, Linux 64 bit.

On Sun, Nov 2, 2014 at 1:06 AM, Bharath Ravi Kumar reachb...@gmail.com
wrote:

 Hi,

 I'm trying to run groupBy(function) followed by saveAsTextFile on an RDD
 of count ~ 100 million. The data size is 20GB and groupBy results in an RDD
 of 1061 keys with values being Iterable<Tuple4<String, Integer, Double,
 String>>. The job runs on 3 hosts in a standalone setup with each host's
 executor having 100G RAM and 24 cores dedicated to it. While the groupBy
 stage completes successfully with ~24GB of shuffle write, the
 saveAsTextFile fails after repeated retries with each attempt failing due
 to an out of memory error *[1]*. I understand that a few partitions may
 be overloaded as a result of the groupBy and I've tried the following
 config combinations unsuccessfully:

 1) Repartition the initial rdd (44 input partitions but 1061 keys) across
 1061 partitions and have max cores = 3 so that each key is a logical
 partition (though many partitions will end up on very few hosts), and each
 host likely runs saveAsTextFile on a single key at a time due to max cores
 = 3 with 3 hosts in the cluster. The level of parallelism is unspecified.

 2) Leave max cores unspecified, set the level of parallelism to 72, and
 leave number of partitions unspecified (in which case the # input
 partitions was used, which is 44)
 Since I do not intend to cache RDD's, I have set
 spark.storage.memoryFraction=0.2 in both cases.

 My understanding is that if each host is processing a single logical
 partition to saveAsTextFile and is reading from other hosts to write out
 the RDD, it is unlikely that it would run out of memory. My interpretation
 of the spark tuning guide is that the degree of parallelism has little
 impact in case (1) above since max cores = number of hosts. Can someone
 explain why there are still OOM's with 100G being available? On a related
 note, intuitively (though I haven't read the source), it appears that an
 entire key-value pair needn't fit into memory of a single host for
 saveAsTextFile since a single shuffle read from a remote can be written to
 HDFS before the next remote read is carried out. This way, not all data
 needs to be collected at the same time.

 Lastly, if an OOM is (but shouldn't be) a common occurrence (as per the
 tuning guide and even as per Datastax's spark introduction), there may need
 to be more documentation around the internals of spark to help users take
 better informed tuning decisions with parallelism, max cores, number
 partitions and other tunables. Is there any ongoing effort on that front?

 Thanks,
 Bharath


 *[1]* OOM stack trace and logs
 14/11/01 12:26:37 WARN TaskSetManager: Lost task 61.0 in stage 36.0 (TID
 1264, proc1.foo.bar.com): java.lang.OutOfMemoryError: Requested array
 size exceeds VM limit
 java.util.Arrays.copyOf(Arrays.java:3326)

 java.lang.AbstractStringBuilder.expandCapacity(AbstractStringBuilder.java:137)

 java.lang.AbstractStringBuilder.ensureCapacityInternal(AbstractStringBuilder.java:121)

 java.lang.AbstractStringBuilder.append(AbstractStringBuilder.java:421)
 java.lang.StringBuilder.append(StringBuilder.java:136)

 scala.collection.mutable.StringBuilder.append(StringBuilder.scala:197)
 scala.Tuple2.toString(Tuple2.scala:22)

 org.apache.spark.rdd.RDD$$anonfun$saveAsTextFile$1.apply(RDD.scala:1158)

 org.apache.spark.rdd.RDD$$anonfun$saveAsTextFile$1.apply(RDD.scala:1158)
 scala.collection.Iterator$$anon$11.next(Iterator.scala:328)

 org.apache.spark.rdd.PairRDDFunctions$$anonfun$13.apply(PairRDDFunctions.scala:984)

 org.apache.spark.rdd.PairRDDFunctions$$anonfun$13.apply(PairRDDFunctions.scala:974)
 org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:62)
 org.apache.spark.scheduler.Task.run(Task.scala:54)

 org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:177)

 java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)

 java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
 java.lang.Thread.run(Thread.java:745)
 14/11/01 12:26:40 WARN TaskSetManager: Lost task 92.0 in stage 36.0 (TID
 1295, proc1.foo.bar.com): FetchFailed(BlockManagerId(1, proc1.foo.bar.com,
 43704, 0), shuffleId=0, mapId=13, reduceId=92)
 14/11/01 12:26:40 INFO DAGScheduler: Marking Stage 36 (saveAsTextFile at
 ModelTrainer.java:141) as failed due to a fetch failure from Stage 37
 (groupBy at ModelTrainer.java:133)
 14/11/01 12:26:40 INFO DAGScheduler: Stage 36 (saveAsTextFile at
 ModelTrainer.java:141) failed in 55.259 s
 14/11/01 12:26:40 INFO DAGScheduler: Resubmitting Stage 37 (groupBy at
 ModelTrainer.java:133) and Stage 36 (saveAsTextFile at
 ModelTrainer.java:141) due to fetch failure







Re: OOM with groupBy + saveAsTextFile

2014-11-01 Thread Bharath Ravi Kumar
Resurfacing the thread. OOM shouldn't be the norm for a common groupBy /
sort use case in a framework that is leading in sorting benchmarks? Or is
there something fundamentally wrong in the usage?
On 02-Nov-2014 1:06 am, Bharath Ravi Kumar reachb...@gmail.com wrote:

 Hi,

 I'm trying to run groupBy(function) followed by saveAsTextFile on an RDD
 of count ~ 100 million. The data size is 20GB and groupBy results in an RDD
 of 1061 keys with values being Iterable<Tuple4<String, Integer, Double,
 String>>. The job runs on 3 hosts in a standalone setup with each host's
 executor having 100G RAM and 24 cores dedicated to it. While the groupBy
 stage completes successfully with ~24GB of shuffle write, the
 saveAsTextFile fails after repeated retries with each attempt failing due
 to an out of memory error *[1]*. I understand that a few partitions may
 be overloaded as a result of the groupBy and I've tried the following
 config combinations unsuccessfully:

 1) Repartition the initial rdd (44 input partitions but 1061 keys) across
 1061 partitions and have max cores = 3 so that each key is a logical
 partition (though many partitions will end up on very few hosts), and each
 host likely runs saveAsTextFile on a single key at a time due to max cores
 = 3 with 3 hosts in the cluster. The level of parallelism is unspecified.

 2) Leave max cores unspecified, set the level of parallelism to 72, and
 leave number of partitions unspecified (in which case the # input
 partitions was used, which is 44)
 Since I do not intend to cache RDD's, I have set
 spark.storage.memoryFraction=0.2 in both cases.

 My understanding is that if each host is processing a single logical
 partition to saveAsTextFile and is reading from other hosts to write out
 the RDD, it is unlikely that it would run out of memory. My interpretation
 of the spark tuning guide is that the degree of parallelism has little
 impact in case (1) above since max cores = number of hosts. Can someone
 explain why there are still OOM's with 100G being available? On a related
 note, intuitively (though I haven't read the source), it appears that an
 entire key-value pair needn't fit into memory of a single host for
 saveAsTextFile since a single shuffle read from a remote can be written to
 HDFS before the next remote read is carried out. This way, not all data
 needs to be collected at the same time.

 Lastly, if an OOM is (but shouldn't be) a common occurrence (as per the
 tuning guide and even as per Datastax's spark introduction), there may need
 to be more documentation around the internals of spark to help users take
 better informed tuning decisions with parallelism, max cores, number
 partitions and other tunables. Is there any ongoing effort on that front?

 Thanks,
 Bharath


 *[1]* OOM stack trace and logs
 14/11/01 12:26:37 WARN TaskSetManager: Lost task 61.0 in stage 36.0 (TID
 1264, proc1.foo.bar.com): java.lang.OutOfMemoryError: Requested array
 size exceeds VM limit
 java.util.Arrays.copyOf(Arrays.java:3326)

 java.lang.AbstractStringBuilder.expandCapacity(AbstractStringBuilder.java:137)

 java.lang.AbstractStringBuilder.ensureCapacityInternal(AbstractStringBuilder.java:121)

 java.lang.AbstractStringBuilder.append(AbstractStringBuilder.java:421)
 java.lang.StringBuilder.append(StringBuilder.java:136)

 scala.collection.mutable.StringBuilder.append(StringBuilder.scala:197)
 scala.Tuple2.toString(Tuple2.scala:22)

 org.apache.spark.rdd.RDD$$anonfun$saveAsTextFile$1.apply(RDD.scala:1158)

 org.apache.spark.rdd.RDD$$anonfun$saveAsTextFile$1.apply(RDD.scala:1158)
 scala.collection.Iterator$$anon$11.next(Iterator.scala:328)

 org.apache.spark.rdd.PairRDDFunctions$$anonfun$13.apply(PairRDDFunctions.scala:984)

 org.apache.spark.rdd.PairRDDFunctions$$anonfun$13.apply(PairRDDFunctions.scala:974)
 org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:62)
 org.apache.spark.scheduler.Task.run(Task.scala:54)

 org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:177)

 java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)

 java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
 java.lang.Thread.run(Thread.java:745)
 14/11/01 12:26:40 WARN TaskSetManager: Lost task 92.0 in stage 36.0 (TID
 1295, proc1.foo.bar.com): FetchFailed(BlockManagerId(1, proc1.foo.bar.com,
 43704, 0), shuffleId=0, mapId=13, reduceId=92)
 14/11/01 12:26:40 INFO DAGScheduler: Marking Stage 36 (saveAsTextFile at
 ModelTrainer.java:141) as failed due to a fetch failure from Stage 37
 (groupBy at ModelTrainer.java:133)
 14/11/01 12:26:40 INFO DAGScheduler: Stage 36 (saveAsTextFile at
 ModelTrainer.java:141) failed in 55.259 s
 14/11/01 12:26:40 INFO DAGScheduler: Resubmitting Stage 37 (groupBy at
 ModelTrainer.java:133) and Stage 36 (saveAsTextFile at
 ModelTrainer.java:141) due to fetch failure







Re: OOM writing out sorted RDD

2014-08-10 Thread Bharath Ravi Kumar
Update: as expected, switching to kryo merely delays the inevitable. Does
anyone have experience controlling memory consumption while processing
(e.g. writing out) imbalanced partitions?
On 09-Aug-2014 10:41 am, Bharath Ravi Kumar reachb...@gmail.com wrote:

 Our prototype application reads a 20GB dataset from HDFS (nearly 180
 partitions), groups it by key, sorts by rank and writes out to HDFS in that
 order. The job runs against two nodes (16G, 24 cores per node available to
 the job). I noticed that the execution plan results in two sortByKey
 stages, followed by groupByKey and a saveAsTextFile. The shuffle write for
 groupByKey is ~20G in size. During the saveAsTextFile stage, however, after
 writing out 50% of the partitions, the memory on one of the executors
 shoots up to 16G and the executor spends all its time in GC's. Eventually,
 the logs show an OOM [1] included at the end of the mail followed by
 another TID loss to FileSystem closed errors indicated in stacktrace [2].
 I noticed that the partitions may be skewed as a result of the sort, with
 one or two paritions having upto 10% of all rows. I also noticed that the
 data written out until the 50% stage (when memory shoots up) had a large
 number of empty part- files followed by a few files of 200M in size. It
 could hence be that the attempt to write out one large partition is causing
 the OOM. The tuning documentation states that a larger level of parallelism
 might help mitigate the problem, but setting default parallelism to 2x the
 number of cores didn't help either. While I could attempt to partition
 manually (choosing custom ranges for a range partitioner), it appears that
 limiting read sizes (from the earlier shuffle) during the write to HDFS
 should help successfully write out even overloaded partitions as well. Are
 there parameters that might help achieve that?
 (On a related note, any idea if using Kryo serialization would help in
 this case?)

 Thanks,
 Bharath

 [1]
 14/08/09 04:59:26 WARN TaskSetManager: Lost TID 711 (task 33.0:137)
 14/08/09 04:59:26 WARN TaskSetManager: Loss was due to
 java.lang.OutOfMemoryError
 java.lang.OutOfMemoryError: Java heap space
 at
 java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1989)
 at
 java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1918)
 at
 java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
 at
 java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
 at
 java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1993)
 at
 java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1918)
 at
 java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
 at
 java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
 at
 java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1993)
 at
 java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1918)
 at
 java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
 at
 java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
 at java.io.ObjectInputStream.readObject(ObjectInputStream.java:371)
 at
 org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:63)
 at
 org.apache.spark.serializer.DeserializationStream$$anon$1.getNext(Serializer.scala:125)
 at
 org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:71)
 at
 org.apache.spark.storage.BlockManager$LazyProxyIterator$1.hasNext(BlockManager.scala:1031)
 at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
 at
 org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:30)
 at
 org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
 at scala.collection.Iterator$class.foreach(Iterator.scala:727)
 at
 org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28)
 at
 scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
 at
 scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
 at
 scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
 at scala.collection.TraversableOnce$class.to
 (TraversableOnce.scala:273)
 at org.apache.spark.InterruptibleIterator.to
 (InterruptibleIterator.scala:28)
 at
 scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
 at
 org.apache.spark.InterruptibleIterator.toBuffer(InterruptibleIterator.scala:28)
 at
 scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
 at
 org.apache.spark.InterruptibleIterator.toArray(InterruptibleIterator.scala:28)
 at
 org.apache.spark.rdd.OrderedRDDFunctions$$anonfun$sortByKey$1.apply(OrderedRDDFunctions.scala:62)



 [2]
 14/08/09 04:59:26

OOM writing out sorted RDD

2014-08-08 Thread Bharath Ravi Kumar
Our prototype application reads a 20GB dataset from HDFS (nearly 180
partitions), groups it by key, sorts by rank and writes out to HDFS in that
order. The job runs against two nodes (16G, 24 cores per node available to
the job). I noticed that the execution plan results in two sortByKey
stages, followed by groupByKey and a saveAsTextFile. The shuffle write for
groupByKey is ~20G in size. During the saveAsTextFile stage, however, after
writing out 50% of the partitions, the memory on one of the executors
shoots up to 16G and the executor spends all its time in GC's. Eventually,
the logs show an OOM [1] included at the end of the mail followed by
another TID loss to "FileSystem closed" errors indicated in stacktrace [2].
I noticed that the partitions may be skewed as a result of the sort, with
one or two partitions having up to 10% of all rows. I also noticed that the
data written out until the 50% stage (when memory shoots up) had a large
number of empty part- files followed by a few files of 200M in size. It
could hence be that the attempt to write out one large partition is causing
the OOM. The tuning documentation states that a larger level of parallelism
might help mitigate the problem, but setting default parallelism to 2x the
number of cores didn't help either. While I could attempt to partition
manually (choosing custom ranges for a range partitioner), it appears that
limiting read sizes (from the earlier shuffle) during the write to HDFS
should help successfully write out even overloaded partitions as well. Are
there parameters that might help achieve that?
(On a related note, any idea if using Kryo serialization would help in this
case?)

Thanks,
Bharath
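
A minimal sketch of the two ideas mentioned above, i.e. enabling Kryo
serialization and spreading the sorted output across more partitions (the
partition count and output path are only illustrative; pairs is assumed to be
an RDD of (rank, record) pairs):

  import org.apache.spark.SparkConf
  import org.apache.spark.SparkContext._
  import org.apache.spark.rdd.RDD

  // Kryo serialization (Spark 1.0-era property name); passed to new SparkContext(conf).
  val conf = new SparkConf()
    .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")

  // Sort with an explicit, larger partition count so that a skewed key range is
  // split across more (smaller) partitions before being written out.
  def sortAndSave(pairs: RDD[(Double, String)], out: String): Unit =
    pairs.sortByKey(ascending = false, numPartitions = 512).saveAsTextFile(out)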

[1]
14/08/09 04:59:26 WARN TaskSetManager: Lost TID 711 (task 33.0:137)
14/08/09 04:59:26 WARN TaskSetManager: Loss was due to
java.lang.OutOfMemoryError
java.lang.OutOfMemoryError: Java heap space
at
java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1989)
at
java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1918)
at
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
at
java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
at
java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1993)
at
java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1918)
at
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
at
java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
at
java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1993)
at
java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1918)
at
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
at
java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:371)
at
org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:63)
at
org.apache.spark.serializer.DeserializationStream$$anon$1.getNext(Serializer.scala:125)
at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:71)
at
org.apache.spark.storage.BlockManager$LazyProxyIterator$1.hasNext(BlockManager.scala:1031)
at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
at
org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:30)
at
org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
at scala.collection.Iterator$class.foreach(Iterator.scala:727)
at
org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28)
at
scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
at
scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
at
scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
at scala.collection.TraversableOnce$class.to
(TraversableOnce.scala:273)
at org.apache.spark.InterruptibleIterator.to
(InterruptibleIterator.scala:28)
at
scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
at
org.apache.spark.InterruptibleIterator.toBuffer(InterruptibleIterator.scala:28)
at
scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
at
org.apache.spark.InterruptibleIterator.toArray(InterruptibleIterator.scala:28)
at
org.apache.spark.rdd.OrderedRDDFunctions$$anonfun$sortByKey$1.apply(OrderedRDDFunctions.scala:62)



[2]
14/08/09 04:59:26 WARN TaskSetManager: Lost TID 774 (task 33.0:198)
14/08/09 04:59:26 WARN TaskSetManager: Loss was due to
java.io.FileNotFoundException
java.io.FileNotFoundException:
/tmp/spark-local-20140809033328-e374/31/shuffle_0_5_198 (No such file or
directory)
at java.io.RandomAccessFile.open(Native Method)
at 

Implementing percentile through top Vs take

2014-07-30 Thread Bharath Ravi Kumar
I'm looking to select the top n records (by rank) from a data set of a few
hundred GB's. My understanding is that JavaRDD.top(n, comparator) is
entirely a driver-side operation in that all records are sorted in the
driver's memory. I prefer an approach where the records are sorted on the
cluster and only the top ones sent to the driver. I'm hence leaning towards
creating a JavaPairRDD on a key, then sorting the RDD by key and invoking
take(N). I'd like to know if rdd.top achieves the same result (while being
executed on the cluster) as take or if my assumption that it's a driver
side operation is correct.

Thanks,
Bharath
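
A minimal sketch of the cluster-side approach described above, in Scala for
brevity (assumptions: records is an RDD of (rank, record) pairs keyed by a
double rank, and n is the number of records wanted on the driver):

  import org.apache.spark.SparkContext._
  import org.apache.spark.rdd.RDD

  // Sort by rank on the cluster, then bring only the first n sorted records
  // back to the driver.
  def topByRank(records: RDD[(Double, String)], n: Int): Array[(Double, String)] =
    records.sortByKey(ascending = false).take(n)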


Re: Hadoop client protocol mismatch with spark 1.0.1, cdh3u5

2014-07-25 Thread Bharath Ravi Kumar
Any suggestions to work around this issue? The pre-built spark binaries
don't appear to work against cdh as documented, unless there's a build
issue, which seems unlikely.
On 25-Jul-2014 3:42 pm, Bharath Ravi Kumar reachb...@gmail.com wrote:


  I'm encountering a hadoop client protocol mismatch trying to read from
  HDFS (cdh3u5) using the pre-built spark from the downloads page (linked
  under "For Hadoop 1 (HDP1, CDH3)"). I've also followed the instructions at
 http://spark.apache.org/docs/latest/hadoop-third-party-distributions.html
 (i.e. building the app against hadoop-client 0.20.2-cdh3u5), but continue
 to see the following error regardless of whether I link the app with the
 cdh client:

 14/07/25 09:53:43 INFO client.AppClient$ClientActor: Executor updated:
 app-20140725095343-0016/1 is now RUNNING
 14/07/25 09:53:43 WARN util.NativeCodeLoader: Unable to load native-hadoop
 library for your platform... using builtin-java classes where applicable
 14/07/25 09:53:43 WARN snappy.LoadSnappy: Snappy native library not loaded
  Exception in thread "main" org.apache.hadoop.ipc.RPC$VersionMismatch:
 Protocol org.apache.hadoop.hdfs.protocol.ClientProtocol version mismatch.
 (client = 61, server = 63)
 at org.apache.hadoop.ipc.RPC.getProxy(RPC.java:401)
 at org.apache.hadoop.ipc.RPC.getProxy(RPC.java:379)


 While I can build spark against the exact hadoop distro version, I'd
 rather work with the standard prebuilt binaries, making additional changes
 while building the app if necessary. Any workarounds/recommendations?

 Thanks,
 Bharath



Re: Hadoop client protocol mismatch with spark 1.0.1, cdh3u5

2014-07-25 Thread Bharath Ravi Kumar
Thanks for responding. I used the pre-built spark binaries meant for
hadoop1, cdh3u5. I do not intend to build spark against a specific
distribution. Irrespective of whether I build my app with the explicit cdh
hadoop client dependency,  I get the same error message. I also verified
that my  app's uber jar had pulled in the cdh hadoop client dependencies.
On 25-Jul-2014 9:26 pm, Sean Owen so...@cloudera.com wrote:

 This indicates your app is not actually using the version of the HDFS
 client you think. You built Spark from source with the right deps it
 seems, but are you sure you linked to your build in your app?

 On Fri, Jul 25, 2014 at 4:32 PM, Bharath Ravi Kumar reachb...@gmail.com
 wrote:
  Any suggestions to  work around this issue ? The pre built spark binaries
  don't appear to work against cdh as documented, unless there's a build
  issue, which seems unlikely.
 
  On 25-Jul-2014 3:42 pm, Bharath Ravi Kumar reachb...@gmail.com
 wrote:
 
 
  I'm encountering a hadoop client protocol mismatch trying to read from
  HDFS (cdh3u5) using the pre-build spark from the downloads page (linked
  under For Hadoop 1 (HDP1, CDH3)). I've also  followed the
 instructions at
 
 http://spark.apache.org/docs/latest/hadoop-third-party-distributions.html
  (i.e. building the app against hadoop-client 0.20.2-cdh3u5), but
 continue to
  see the following error regardless of whether I link the app with the
 cdh
  client:
 
  14/07/25 09:53:43 INFO client.AppClient$ClientActor: Executor updated:
  app-20140725095343-0016/1 is now RUNNING
  14/07/25 09:53:43 WARN util.NativeCodeLoader: Unable to load
 native-hadoop
  library for your platform... using builtin-java classes where applicable
  14/07/25 09:53:43 WARN snappy.LoadSnappy: Snappy native library not
 loaded
  Exception in thread "main" org.apache.hadoop.ipc.RPC$VersionMismatch:
  Protocol org.apache.hadoop.hdfs.protocol.ClientProtocol version
 mismatch.
  (client = 61, server = 63)
  at org.apache.hadoop.ipc.RPC.getProxy(RPC.java:401)
  at org.apache.hadoop.ipc.RPC.getProxy(RPC.java:379)
 
 
  While I can build spark against the exact hadoop distro version, I'd
  rather work with the standard prebuilt binaries, making additional
 changes
  while building the app if necessary. Any workarounds/recommendations?
 
  Thanks,
  Bharath



Re: Hadoop client protocol mismatch with spark 1.0.1, cdh3u5

2014-07-25 Thread Bharath Ravi Kumar
That's right, I'm looking to depend on spark in general and change only the
hadoop client deps. The spark master and slaves use the
spark-1.0.1-bin-hadoop1 binaries from the downloads page.  The relevant
snippet from the app's maven pom is as follows:

<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_2.10</artifactId>
    <version>1.0.1</version>
    <scope>provided</scope>
</dependency>
<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-client</artifactId>
    <version>0.20.2-cdh3u5</version>
    <type>jar</type>
</dependency>
</dependencies>

<repositories>
    <repository>
        <id>Cloudera repository</id>
        <url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>
    </repository>
    <repository>
        <id>Akka repository</id>
        <url>http://repo.akka.io/releases</url>
    </repository>
</repositories>


Thanks,
Bharath


On Fri, Jul 25, 2014 at 10:29 PM, Sean Owen so...@cloudera.com wrote:

 If you link against the pre-built binary, that's for Hadoop 1.0.4. Can
 you show your deps to clarify what you are depending on? Building
 custom Spark and depending on it is a different thing from depending
 on plain Spark and changing its deps. I think you want the latter.

 On Fri, Jul 25, 2014 at 5:46 PM, Bharath Ravi Kumar reachb...@gmail.com
 wrote:
  Thanks for responding. I used the pre built spark binaries meant for
  hadoop1,cdh3u5. I do not intend to build spark against a specific
  distribution. Irrespective of whether I build my app with the explicit
 cdh
  hadoop client dependency,  I get the same error message. I also verified
  that my  app's uber jar had pulled in the cdh hadoop client dependencies.
 
  On 25-Jul-2014 9:26 pm, Sean Owen so...@cloudera.com wrote:
 
  This indicates your app is not actually using the version of the HDFS
  client you think. You built Spark from source with the right deps it
  seems, but are you sure you linked to your build in your app?
 
  On Fri, Jul 25, 2014 at 4:32 PM, Bharath Ravi Kumar 
 reachb...@gmail.com
  wrote:
   Any suggestions to  work around this issue ? The pre built spark
   binaries
   don't appear to work against cdh as documented, unless there's a build
   issue, which seems unlikely.
  
   On 25-Jul-2014 3:42 pm, Bharath Ravi Kumar reachb...@gmail.com
   wrote:
  
  
   I'm encountering a hadoop client protocol mismatch trying to read
 from
   HDFS (cdh3u5) using the pre-build spark from the downloads page
 (linked
   under For Hadoop 1 (HDP1, CDH3)). I've also  followed the
   instructions at
  
  
 http://spark.apache.org/docs/latest/hadoop-third-party-distributions.html
   (i.e. building the app against hadoop-client 0.20.2-cdh3u5), but
   continue to
   see the following error regardless of whether I link the app with the
   cdh
   client:
  
   14/07/25 09:53:43 INFO client.AppClient$ClientActor: Executor
 updated:
   app-20140725095343-0016/1 is now RUNNING
   14/07/25 09:53:43 WARN util.NativeCodeLoader: Unable to load
   native-hadoop
   library for your platform... using builtin-java classes where
   applicable
   14/07/25 09:53:43 WARN snappy.LoadSnappy: Snappy native library not
   loaded
   Exception in thread main org.apache.hadoop.ipc.RPC$VersionMismatch:
   Protocol org.apache.hadoop.hdfs.protocol.ClientProtocol version
   mismatch.
   (client = 61, server = 63)
   at org.apache.hadoop.ipc.RPC.getProxy(RPC.java:401)
   at org.apache.hadoop.ipc.RPC.getProxy(RPC.java:379)
  
  
   While I can build spark against the exact hadoop distro version, I'd
   rather work with the standard prebuilt binaries, making additional
   changes
   while building the app if necessary. Any workarounds/recommendations?
  
   Thanks,
   Bharath



Re: Execution stalls in LogisticRegressionWithSGD

2014-07-02 Thread Bharath Ravi Kumar
Hi Xiangrui,

The issue with aggregating/counting over large feature vectors (as part of
LogisticRegressionWithSGD) continues to exist, but now in another form: while
the execution no longer freezes (thanks to the fix for SPARK-1112), it now
fails consistently at the second or third gradient descent iteration with an
error-level log message but no stacktrace. I'm running against 1.0.1-rc1 and
have tried setting spark.akka.frameSize as high as 500. When the execution
fails, each of the two executors logs the following message (corresponding
to aggregate at GradientDescent.scala:178):

14/07/02 14:09:09 INFO BlockFetcherIterator$BasicBlockFetcherIterator:
maxBytesInFlight: 50331648, targetRequestSize: 10066329
14/07/02 14:09:09 INFO BlockFetcherIterator$BasicBlockFetcherIterator:
Getting 2 non-empty blocks out of 2 blocks
14/07/02 14:09:09 INFO BlockFetcherIterator$BasicBlockFetcherIterator:
Started 1 remote fetches in 0 ms
14/07/02 14:09:11 INFO Executor: Serialized size of result for 737 is
5959086
14/07/02 14:09:11 INFO Executor: Sending result for 737 directly to driver
14/07/02 14:09:11 INFO Executor: Finished task ID 737
14/07/02 14:09:18 ERROR CoarseGrainedExecutorBackend: Driver Disassociated
[akka.tcp://sparkExecutor@(slave1,slave2):51941] -
[akka.tcp://spark@master:59487]
disassociated! Shutting down.


There is no separate stacktrace on the driver side.

Each input record is of the form p1, p2, (p1,p2), X, where p1, p2 and (p1,p2)
are categorical features with large cardinality and X is the label, a
continuous double value. The categorical variables are converted to binary
variables, which results in a feature vector of size 741092 (composed of all
unique categories across p1, p2 and (p1,p2)). Thus, the labeled point for an
input record is a sparse vector of size 741092 with only 3 variables set in
the record. The total number of records is 683233 after aggregating the
input data on (p1, p2). When attempting to train on the unaggregated records
(1337907 in number, spread across 455 files), the execution fails at count
at GradientDescent.scala:161 with the following log:


(Snipped lines corresponding to other input files)
14/07/02 16:02:03 INFO HadoopRDD: Input split:
file:~/part-r-00012:2834590+2834590
14/07/02 16:02:03 INFO HadoopRDD: Input split: file:~/part-r-5:0+2845559
14/07/02 16:02:03 INFO HadoopRDD: Input split:
file:~/part-r-5:2845559+2845560
14/07/02 16:02:03 INFO Executor: Serialized size of result for 726 is 615
14/07/02 16:02:03 INFO Executor: Sending result for 726 directly to driver
14/07/02 16:02:03 INFO Executor: Finished task ID 726
14/07/02 16:02:12 ERROR CoarseGrainedExecutorBackend: Driver Disassociated
[akka.tcp://sparkExecutor@slave1:48423] - [akka.tcp://spark@master:55792]
disassociated! Shutting down.

A count() attempted on the input RDD before beginning training has the
following metrics:

Metric                      Min     25th    Median  75th    Max
Result serialization time   0 ms    0 ms    0 ms    0 ms    0 ms
Duration                    33 s    33 s    35 s    35 s    35 s
Time spent fetching
task results                0 ms    0 ms    0 ms    0 ms    0 ms
Scheduler delay             0.1 s   0.1 s   0.3 s   0.3 s   0.3 s

Aggregated Metrics by Executor

ID  Address              Task Time  Total  Failed  Succeeded  Shuffle Read  Shuffle Write  Shuf Spill (Mem)  Shuf Spill (Disk)
0   CANNOT FIND ADDRESS  34 s       1      0       1          0.0 B         0.0 B          0.0 B             0.0 B
1   CANNOT FIND ADDRESS  36 s       1      0       1          0.0 B         0.0 B          0.0 B             0.0 B

Tasks

Task Index  Task ID  Status   Locality Level  Executor  Launch Time          Duration  GC Time  Result Ser Time  Errors
0           726      SUCCESS  PROCESS_LOCAL   slave1    2014/07/02 16:01:28  35 s      0.1 s
1           727      SUCCESS  PROCESS_LOCAL   slave2    2014/07/02 16:01:28  33 s      99 ms

Any pointers / diagnosis please?



On Thu, Jun 19, 2014 at 10:03 AM, Bharath Ravi Kumar reachb...@gmail.com
wrote:

 Thanks. I'll await the fix to re-run my test.


 On Thu, Jun 19, 2014 at 8:28 AM, Xiangrui Meng men...@gmail.com wrote:

 Hi Bharath,

 This is related to SPARK-1112, which we already found the root cause.
 I will let you know when this is fixed.

 Best,
 Xiangrui

 On Tue, Jun 17, 2014 at 7:37 PM, Bharath Ravi Kumar reachb...@gmail.com
 wrote:
  Couple more points:
  1)The inexplicable stalling of execution with large feature sets appears
  similar to that reported with the news-20 dataset:
 
 http://mail-archives.apache.org/mod_mbox/spark-user/201406.mbox/%3c53a03542.1010...@gmail.com%3E
 
  2) The NPE trying to call mapToPair convert an RDDLong, Long, Integer,
  Integer into a JavaPairRDDTuple2Long,Long, Tuple2Integer,Integer
 is
  unrelated to mllib.
 
  Thanks,
  Bharath
 
 
 
  On Wed, Jun 18, 2014 at 7:14 AM, Bharath Ravi Kumar 
 reachb...@gmail.com
  wrote:
 
  Hi  Xiangrui ,
 
  I'm using

Re: Execution stalls in LogisticRegressionWithSGD

2014-06-18 Thread Bharath Ravi Kumar
Thanks. I'll await the fix to re-run my test.


On Thu, Jun 19, 2014 at 8:28 AM, Xiangrui Meng men...@gmail.com wrote:

 Hi Bharath,

 This is related to SPARK-1112, which we already found the root cause.
 I will let you know when this is fixed.

 Best,
 Xiangrui

 On Tue, Jun 17, 2014 at 7:37 PM, Bharath Ravi Kumar reachb...@gmail.com
 wrote:
  Couple more points:
  1)The inexplicable stalling of execution with large feature sets appears
  similar to that reported with the news-20 dataset:
 
 http://mail-archives.apache.org/mod_mbox/spark-user/201406.mbox/%3c53a03542.1010...@gmail.com%3E
 
  2) The NPE trying to call mapToPair convert an RDDLong, Long, Integer,
  Integer into a JavaPairRDDTuple2Long,Long, Tuple2Integer,Integer
 is
  unrelated to mllib.
 
  Thanks,
  Bharath
 
 
 
  On Wed, Jun 18, 2014 at 7:14 AM, Bharath Ravi Kumar reachb...@gmail.com
 
  wrote:
 
  Hi  Xiangrui ,
 
  I'm using 1.0.0.
 
  Thanks,
  Bharath
 
  On 18-Jun-2014 1:43 am, Xiangrui Meng men...@gmail.com wrote:
 
  Hi Bharath,
 
  Thanks for posting the details! Which Spark version are you using?
 
  Best,
  Xiangrui
 
  On Tue, Jun 17, 2014 at 6:48 AM, Bharath Ravi Kumar 
 reachb...@gmail.com
  wrote:
   Hi,
  
   (Apologies for the long mail, but it's necessary to provide
 sufficient
   details considering the number of issues faced.)
  
   I'm running into issues testing LogisticRegressionWithSGD a two node
   cluster
   (each node with 24 cores and 16G available to slaves out of 24G on
 the
   system). Here's a description of the application:
  
   The model is being trained based on categorical features x, y, and
   (x,y).
   The categorical features are mapped to binary features by converting
   each
   distinct value in the category enum into a binary feature by itself
   (i.e
   presence of that value in a record implies corresponding feature = 1,
   else
   feature = 0. So, there'd be as many distinct features as enum
 values) .
   The
   training vector is laid out as
   [x1,x2...xn,y1,y2yn,(x1,y1),(x2,y2)...(xn,yn)]. Each record in
 the
   training data has only one combination (Xk,Yk) and a label appearing
 in
   the
   record. Thus, the corresponding labeledpoint sparse vector would only
   have 3
   values Xk, Yk, (Xk,Yk) set for a record. The total length of the
 vector
   (though parse) would be nearly 614000.  The number of records is
 about
   1.33
   million. The records have been coalesced into 20 partitions across
 two
   nodes. The input data has not been cached.
   (NOTE: I do realize the records  features may seem large for a two
   node
   setup, but given the memory  cpu, and the fact that I'm willing to
   give up
   some turnaround time, I don't see why tasks should inexplicably fail)
  
   Additional parameters include:
  
   spark.executor.memory = 14G
   spark.default.parallelism = 1
   spark.cores.max=20
   spark.storage.memoryFraction=0.8 //No cache space required
   (Trying to set spark.akka.frameSize to a larger number, say, 20
 didn't
   help
   either)
  
   The model training was initialized as : new
   LogisticRegressionWithSGD(1,
   maxIterations, 0.0, 0.05)
  
   However, after 4 iterations of gradient descent, the entire execution
   appeared to stall inexplicably. The corresponding executor details
 and
   details of the stalled stage (number 14) are as follows:
  
   MetricMin25th Median75th
   Max
   Result serialization time12 ms13 ms14 ms16 ms18
 ms
   Duration4 s4 s5 s5 s
   5 s
   Time spent fetching task 0 ms0 ms0 ms0 ms0 ms
   results
   Scheduler delay6 s6 s6 s6 s
   12 s
  
  
   Stage Id
   14 aggregate at GradientDescent.scala:178
  
   Task IndexTask IDStatusLocality Level Executor
   Launch TimeDurationGC Result Ser Time
  Errors
  
   Time
  
   0 600 RUNNING PROCESS_LOCAL
 serious.dataone.foo.bar.com
   2014/06/17 10:32:27 1.1 h
   1 601 RUNNING PROCESS_LOCAL
 casual.dataone.foo.bar.com
   2014/06/17 10:32:27 1.1 h
   2 602 RUNNING PROCESS_LOCAL
 serious.dataone.foo.bar.com
   2014/06/17 10:32:27 1.1 h
   3 603 RUNNING PROCESS_LOCAL
 casual.dataone.foo.bar.com
   2014/06/17 10:32:27 1.1 h
   4 604 RUNNING PROCESS_LOCAL
 serious.dataone.foo.bar.com
   2014/06/17 10:32:27 1.1 h
   5 605 SUCCESS PROCESS_LOCAL
 casual.dataone.foo.bar.com
   2014/06/17 10:32:27 4 s 2 s 12 ms
   6 606 SUCCESS PROCESS_LOCAL
 serious.dataone.foo.bar.com
   2014/06/17 10:32:27 4 s 1 s 14 ms
   7 607 SUCCESS PROCESS_LOCAL
 casual.dataone.foo.bar.com
   2014/06/17 10:32:27 4 s 2 s 12 ms
   8 608 SUCCESS PROCESS_LOCAL
 serious.dataone.foo.bar.com
   2014/06/17 10:32:27 5 s 1 s 15 ms
   9 609 SUCCESS

Execution stalls in LogisticRegressionWithSGD

2014-06-17 Thread Bharath Ravi Kumar
Hi,

(Apologies for the long mail, but it's necessary to provide sufficient
details considering the number of issues faced.)

I'm running into issues testing LogisticRegressionWithSGD on a two node
cluster (each node with 24 cores and 16G available to slaves out of 24G on
the system). Here's a description of the application:

The model is being trained on categorical features x, y, and (x,y). The
categorical features are mapped to binary features by converting each
distinct value in the category enum into a binary feature of its own (i.e.
presence of that value in a record implies the corresponding feature = 1,
else feature = 0, so there are as many distinct features as enum values). The
training vector is laid out as
[x1,x2...xn,y1,y2...yn,(x1,y1),(x2,y2)...(xn,yn)]. Each record in the
training data has only one combination (Xk,Yk) and a label appearing in the
record. Thus, the corresponding LabeledPoint sparse vector has only 3 values
Xk, Yk, (Xk,Yk) set for a record. The total length of the vector (though
sparse) is nearly 614000. The number of records is about 1.33 million. The
records have been coalesced into 20 partitions across two nodes. The input
data has not been cached.
(NOTE: I do realize the records & features may seem large for a two node
setup, but given the memory & cpu, and the fact that I'm willing to give up
some turnaround time, I don't see why tasks should inexplicably fail.)
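
As a concrete illustration of the encoding above, each record boils down to a
sparse LabeledPoint with exactly three non-zero entries; a minimal sketch in
Java (the indices below are made up purely for illustration):

import org.apache.spark.mllib.linalg.Vectors;
import org.apache.spark.mllib.regression.LabeledPoint;

// Hypothetical positions of Xk, Yk and (Xk,Yk) in the ~614000-dimensional
// binary feature space described above; indices must be sorted ascending.
int numFeatures = 614000;
int[] indices = {1042, 310577, 598210};   // illustrative only
double[] values = {1.0, 1.0, 1.0};        // one-hot encoding: presence = 1.0

// The record's label plus the sparse vector: exactly 3 non-zeros per example.
LabeledPoint point = new LabeledPoint(1.0, Vectors.sparse(numFeatures, indices, values));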

Additional parameters include:

spark.executor.memory = 14G
spark.default.parallelism = 1
spark.cores.max=20
spark.storage.memoryFraction=0.8 //No cache space required
(Trying to set spark.akka.frameSize to a larger number, say, 20 didn't help
either)
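
For reference, a sketch of how the settings listed above could be applied
programmatically through SparkConf in Spark 1.0 (the app name and master URL
below are placeholders, not from this thread):

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;

// Mirrors the configuration listed above; the keys are standard Spark 1.0 properties.
SparkConf conf = new SparkConf()
        .setAppName("lr-with-sgd")                    // hypothetical app name
        .setMaster("spark://master:7077")             // illustrative master URL
        .set("spark.executor.memory", "14g")
        .set("spark.default.parallelism", "1")
        .set("spark.cores.max", "20")
        .set("spark.storage.memoryFraction", "0.8")
        .set("spark.akka.frameSize", "20");           // in MB; raising this did not help here
JavaSparkContext sc = new JavaSparkContext(conf);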

The model training was initialized as: new LogisticRegressionWithSGD(1,
maxIterations, 0.0, 0.05).
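
For context, the equivalent call through the static helper looks like the
sketch below (sc is assumed to be the JavaSparkContext from the previous
sketch, the two LabeledPoints are a tiny stand-in for the real dataset, and
the maxIterations value is illustrative):

import java.util.Arrays;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.mllib.classification.LogisticRegressionModel;
import org.apache.spark.mllib.classification.LogisticRegressionWithSGD;
import org.apache.spark.mllib.linalg.Vectors;
import org.apache.spark.mllib.regression.LabeledPoint;

// Stand-in dataset; in the real job this is the RDD of sparse LabeledPoints
// described earlier in this message.
JavaRDD<LabeledPoint> points = sc.parallelize(Arrays.asList(
        new LabeledPoint(1.0, Vectors.sparse(614000, new int[]{10, 2000, 300000}, new double[]{1.0, 1.0, 1.0})),
        new LabeledPoint(0.0, Vectors.sparse(614000, new int[]{11, 2001, 300001}, new double[]{1.0, 1.0, 1.0}))));

int maxIterations = 100;  // illustrative; the actual value is not stated here
// Same hyperparameters as the constructor call above: step size 1.0, miniBatchFraction 0.05.
LogisticRegressionModel model =
        LogisticRegressionWithSGD.train(points.rdd(), maxIterations, 1.0, 0.05);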

However, after 4 iterations of gradient descent, the entire execution
appeared to stall inexplicably. The corresponding executor details and
details of the stalled stage (number 14) are as follows:

Metric                      Min     25th    Median  75th    Max
Result serialization time   12 ms   13 ms   14 ms   16 ms   18 ms
Duration                    4 s     4 s     5 s     5 s     5 s
Time spent fetching
task results                0 ms    0 ms    0 ms    0 ms    0 ms
Scheduler delay             6 s     6 s     6 s     6 s     12 s


Stage Id: 14 (aggregate at GradientDescent.scala:178)

Task Index  Task ID  Status   Locality Level  Executor                     Launch Time          Duration  GC Time  Result Ser Time  Errors
0           600      RUNNING  PROCESS_LOCAL   serious.dataone.foo.bar.com  2014/06/17 10:32:27  1.1 h
1           601      RUNNING  PROCESS_LOCAL   casual.dataone.foo.bar.com   2014/06/17 10:32:27  1.1 h
2           602      RUNNING  PROCESS_LOCAL   serious.dataone.foo.bar.com  2014/06/17 10:32:27  1.1 h
3           603      RUNNING  PROCESS_LOCAL   casual.dataone.foo.bar.com   2014/06/17 10:32:27  1.1 h
4           604      RUNNING  PROCESS_LOCAL   serious.dataone.foo.bar.com  2014/06/17 10:32:27  1.1 h
5           605      SUCCESS  PROCESS_LOCAL   casual.dataone.foo.bar.com   2014/06/17 10:32:27  4 s       2 s      12 ms
6           606      SUCCESS  PROCESS_LOCAL   serious.dataone.foo.bar.com  2014/06/17 10:32:27  4 s       1 s      14 ms
7           607      SUCCESS  PROCESS_LOCAL   casual.dataone.foo.bar.com   2014/06/17 10:32:27  4 s       2 s      12 ms
8           608      SUCCESS  PROCESS_LOCAL   serious.dataone.foo.bar.com  2014/06/17 10:32:27  5 s       1 s      15 ms
9           609      SUCCESS  PROCESS_LOCAL   casual.dataone.foo.bar.com   2014/06/17 10:32:27  5 s       1 s      14 ms
10          610      SUCCESS  PROCESS_LOCAL   serious.dataone.foo.bar.com  2014/06/17 10:32:27  5 s       1 s      15 ms
11          611      SUCCESS  PROCESS_LOCAL   casual.dataone.foo.bar.com   2014/06/17 10:32:27  4 s       1 s      13 ms
12          612      SUCCESS  PROCESS_LOCAL   serious.dataone.foo.bar.com  2014/06/17 10:32:27  5 s       1 s      18 ms
13          613      SUCCESS  PROCESS_LOCAL   casual.dataone.foo.bar.com   2014/06/17 10:32:27  5 s       1 s      13 ms
14          614      SUCCESS  PROCESS_LOCAL   serious.dataone.foo.bar.com  2014/06/17 10:32:27  4 s       1 s      14 ms
15          615      SUCCESS  PROCESS_LOCAL   casual.dataone.foo.bar.com   2014/06/17 10:32:27  4 s       1 s      12 ms
16          616      SUCCESS  PROCESS_LOCAL   serious.dataone.foo.bar.com  2014/06/17 10:32:27  5 s       1 s      15 ms
17          617      SUCCESS  PROCESS_LOCAL   casual.dataone.foo.bar.com   2014/06/17 10:32:27  5 s       1 s      18 ms
18          618      SUCCESS  PROCESS_LOCAL   serious.dataone.foo.bar.com  2014/06/17 10:32:27  5 s       1 s      16 ms
19          619      SUCCESS  PROCESS_LOCAL   casual.dataone.foo.bar.com   2014/06/17 10:32:27  4 s       1 s      18 ms

Executor stats:

RDD Blocks  Memory Used  Disk Used  Active Tasks  Failed Tasks  Complete Tasks  Total Tasks  Task Time

Re: Execution stalls in LogisticRegressionWithSGD

2014-06-17 Thread Bharath Ravi Kumar
Hi  Xiangrui ,

I'm using 1.0.0.

Thanks,
Bharath
On 18-Jun-2014 1:43 am, Xiangrui Meng men...@gmail.com wrote:

 Hi Bharath,

 Thanks for posting the details! Which Spark version are you using?

 Best,
 Xiangrui

 On Tue, Jun 17, 2014 at 6:48 AM, Bharath Ravi Kumar reachb...@gmail.com
 wrote:
  Hi,
 
  (Apologies for the long mail, but it's necessary to provide sufficient
  details considering the number of issues faced.)
 
  I'm running into issues testing LogisticRegressionWithSGD a two node
 cluster
  (each node with 24 cores and 16G available to slaves out of 24G on the
  system). Here's a description of the application:
 
  The model is being trained based on categorical features x, y, and (x,y).
  The categorical features are mapped to binary features by converting each
  distinct value in the category enum into a binary feature by itself (i.e
  presence of that value in a record implies corresponding feature = 1,
 else
  feature = 0. So, there'd be as many distinct features as enum values) .
 The
  training vector is laid out as
  [x1,x2...xn,y1,y2yn,(x1,y1),(x2,y2)...(xn,yn)]. Each record in the
  training data has only one combination (Xk,Yk) and a label appearing in
 the
  record. Thus, the corresponding labeledpoint sparse vector would only
 have 3
  values Xk, Yk, (Xk,Yk) set for a record. The total length of the vector
  (though parse) would be nearly 614000.  The number of records is about
 1.33
  million. The records have been coalesced into 20 partitions across two
  nodes. The input data has not been cached.
  (NOTE: I do realize the records  features may seem large for a two node
  setup, but given the memory  cpu, and the fact that I'm willing to give
 up
  some turnaround time, I don't see why tasks should inexplicably fail)
 
  Additional parameters include:
 
  spark.executor.memory = 14G
  spark.default.parallelism = 1
  spark.cores.max=20
  spark.storage.memoryFraction=0.8 //No cache space required
  (Trying to set spark.akka.frameSize to a larger number, say, 20 didn't
 help
  either)
 
  The model training was initialized as : new LogisticRegressionWithSGD(1,
  maxIterations, 0.0, 0.05)
 
  However, after 4 iterations of gradient descent, the entire execution
  appeared to stall inexplicably. The corresponding executor details and
  details of the stalled stage (number 14) are as follows:
 
  MetricMin25th Median75th Max
  Result serialization time12 ms13 ms14 ms16 ms18 ms
  Duration4 s4 s5 s5 s
  5 s
  Time spent fetching task 0 ms0 ms0 ms0 ms0 ms
  results
  Scheduler delay6 s6 s6 s6 s
  12 s
 
 
  Stage Id
  14 aggregate at GradientDescent.scala:178
 
  Task IndexTask IDStatusLocality Level Executor
  Launch TimeDurationGC Result Ser TimeErrors
 
  Time
 
  0 600 RUNNING PROCESS_LOCAL serious.dataone.foo.bar.com
  2014/06/17 10:32:27 1.1 h
  1 601 RUNNING PROCESS_LOCAL casual.dataone.foo.bar.com
  2014/06/17 10:32:27 1.1 h
  2 602 RUNNING PROCESS_LOCAL serious.dataone.foo.bar.com
  2014/06/17 10:32:27 1.1 h
  3 603 RUNNING PROCESS_LOCAL casual.dataone.foo.bar.com
  2014/06/17 10:32:27 1.1 h
  4 604 RUNNING PROCESS_LOCAL serious.dataone.foo.bar.com
  2014/06/17 10:32:27 1.1 h
  5 605 SUCCESS PROCESS_LOCAL casual.dataone.foo.bar.com
  2014/06/17 10:32:27 4 s 2 s 12 ms
  6 606 SUCCESS PROCESS_LOCAL serious.dataone.foo.bar.com
  2014/06/17 10:32:27 4 s 1 s 14 ms
  7 607 SUCCESS PROCESS_LOCAL casual.dataone.foo.bar.com
  2014/06/17 10:32:27 4 s 2 s 12 ms
  8 608 SUCCESS PROCESS_LOCAL serious.dataone.foo.bar.com
  2014/06/17 10:32:27 5 s 1 s 15 ms
  9 609 SUCCESS PROCESS_LOCAL casual.dataone.foo.bar.com
  2014/06/17 10:32:27 5 s 1 s 14 ms
  10 610 SUCCESS PROCESS_LOCAL serious.dataone.foo.bar.com
  2014/06/17 10:32:27 5 s 1 s 15 ms
  11 611 SUCCESS PROCESS_LOCAL casual.dataone.foo.bar.com
  2014/06/17 10:32:27 4 s 1 s 13 ms
  12 612 SUCCESS PROCESS_LOCAL serious.dataone.foo.bar.com
  2014/06/17 10:32:27 5 s 1 s 18 ms
  13 613 SUCCESS PROCESS_LOCAL casual.dataone.foo.bar.com
  2014/06/17 10:32:27 5 s 1 s 13 ms
  14 614 SUCCESS PROCESS_LOCAL serious.dataone.foo.bar.com
  2014/06/17 10:32:27 4 s 1 s 14 ms
  15 615 SUCCESS PROCESS_LOCAL casual.dataone.foo.bar.com
  2014/06/17 10:32:27 4 s 1 s 12 ms
  16 616 SUCCESS PROCESS_LOCAL serious.dataone.foo.bar.com
  2014/06/17 10:32:27 5 s 1 s 15 ms
  17 617 SUCCESS PROCESS_LOCAL

Re: Execution stalls in LogisticRegressionWithSGD

2014-06-17 Thread Bharath Ravi Kumar
A couple more points:
1) The inexplicable stalling of execution with large feature sets appears
similar to that reported with the news-20 dataset:
http://mail-archives.apache.org/mod_mbox/spark-user/201406.mbox/%3c53a03542.1010...@gmail.com%3E

2) The NPE hit when calling mapToPair to convert an RDD<Long, Long, Integer,
Integer> into a JavaPairRDD<Tuple2<Long,Long>, Tuple2<Integer,Integer>> is
unrelated to mllib.

Thanks,
Bharath



On Wed, Jun 18, 2014 at 7:14 AM, Bharath Ravi Kumar reachb...@gmail.com
wrote:

 Hi  Xiangrui ,

 I'm using 1.0.0.

 Thanks,
 Bharath
 On 18-Jun-2014 1:43 am, Xiangrui Meng men...@gmail.com wrote:

 Hi Bharath,

 Thanks for posting the details! Which Spark version are you using?

 Best,
 Xiangrui

 On Tue, Jun 17, 2014 at 6:48 AM, Bharath Ravi Kumar reachb...@gmail.com
 wrote:
  Hi,
 
  (Apologies for the long mail, but it's necessary to provide sufficient
  details considering the number of issues faced.)
 
  I'm running into issues testing LogisticRegressionWithSGD a two node
 cluster
  (each node with 24 cores and 16G available to slaves out of 24G on the
  system). Here's a description of the application:
 
  The model is being trained based on categorical features x, y, and
 (x,y).
  The categorical features are mapped to binary features by converting
 each
  distinct value in the category enum into a binary feature by itself (i.e
  presence of that value in a record implies corresponding feature = 1,
 else
  feature = 0. So, there'd be as many distinct features as enum values) .
 The
  training vector is laid out as
  [x1,x2...xn,y1,y2yn,(x1,y1),(x2,y2)...(xn,yn)]. Each record in the
  training data has only one combination (Xk,Yk) and a label appearing in
 the
  record. Thus, the corresponding labeledpoint sparse vector would only
 have 3
  values Xk, Yk, (Xk,Yk) set for a record. The total length of the vector
  (though parse) would be nearly 614000.  The number of records is about
 1.33
  million. The records have been coalesced into 20 partitions across two
  nodes. The input data has not been cached.
  (NOTE: I do realize the records  features may seem large for a two node
  setup, but given the memory  cpu, and the fact that I'm willing to
 give up
  some turnaround time, I don't see why tasks should inexplicably fail)
 
  Additional parameters include:
 
  spark.executor.memory = 14G
  spark.default.parallelism = 1
  spark.cores.max=20
  spark.storage.memoryFraction=0.8 //No cache space required
  (Trying to set spark.akka.frameSize to a larger number, say, 20 didn't
 help
  either)
 
  The model training was initialized as : new LogisticRegressionWithSGD(1,
  maxIterations, 0.0, 0.05)
 
  However, after 4 iterations of gradient descent, the entire execution
  appeared to stall inexplicably. The corresponding executor details and
  details of the stalled stage (number 14) are as follows:
 
  MetricMin25th Median75th Max
  Result serialization time12 ms13 ms14 ms16 ms18 ms
  Duration4 s4 s5 s5 s
  5 s
  Time spent fetching task 0 ms0 ms0 ms0 ms0 ms
  results
  Scheduler delay6 s6 s6 s6 s
  12 s
 
 
  Stage Id
  14 aggregate at GradientDescent.scala:178
 
  Task IndexTask IDStatusLocality Level Executor
  Launch TimeDurationGC Result Ser TimeErrors
 
  Time
 
  0 600 RUNNING PROCESS_LOCAL serious.dataone.foo.bar.com
  2014/06/17 10:32:27 1.1 h
  1 601 RUNNING PROCESS_LOCAL casual.dataone.foo.bar.com
  2014/06/17 10:32:27 1.1 h
  2 602 RUNNING PROCESS_LOCAL serious.dataone.foo.bar.com
  2014/06/17 10:32:27 1.1 h
  3 603 RUNNING PROCESS_LOCAL casual.dataone.foo.bar.com
  2014/06/17 10:32:27 1.1 h
  4 604 RUNNING PROCESS_LOCAL serious.dataone.foo.bar.com
  2014/06/17 10:32:27 1.1 h
  5 605 SUCCESS PROCESS_LOCAL casual.dataone.foo.bar.com
  2014/06/17 10:32:27 4 s 2 s 12 ms
  6 606 SUCCESS PROCESS_LOCAL serious.dataone.foo.bar.com
  2014/06/17 10:32:27 4 s 1 s 14 ms
  7 607 SUCCESS PROCESS_LOCAL casual.dataone.foo.bar.com
  2014/06/17 10:32:27 4 s 2 s 12 ms
  8 608 SUCCESS PROCESS_LOCAL serious.dataone.foo.bar.com
  2014/06/17 10:32:27 5 s 1 s 15 ms
  9 609 SUCCESS PROCESS_LOCAL casual.dataone.foo.bar.com
  2014/06/17 10:32:27 5 s 1 s 14 ms
  10 610 SUCCESS PROCESS_LOCAL
 serious.dataone.foo.bar.com
  2014/06/17 10:32:27 5 s 1 s 15 ms
  11 611 SUCCESS PROCESS_LOCAL casual.dataone.foo.bar.com
  2014/06/17 10:32:27 4 s 1 s 13 ms
  12 612 SUCCESS PROCESS_LOCAL
 serious.dataone.foo.bar.com
  2014/06/17 10:32:27 5 s 1 s 18 ms
  13 613 SUCCESS PROCESS_LOCAL

Standalone client failing with docker deployed cluster

2014-05-16 Thread Bharath Ravi Kumar
Hi,

I'm running the spark server with a single worker on a laptop using the
docker images. The spark shell examples run fine with this setup. However,
when a standalone java client tries to run wordcount on a local file (1 MB
in size), the execution fails with the following error on the stdout of the
worker:

14/05/15 10:31:21 INFO Slf4jLogger: Slf4jLogger started
14/05/15 10:31:21 INFO Remoting: Starting remoting
14/05/15 10:31:22 INFO Remoting: Remoting started; listening on addresses
:[akka.tcp://sparkExecutor@worker1:55924]
14/05/15 10:31:22 INFO Remoting: Remoting now listens on addresses:
[akka.tcp://sparkExecutor@worker1:55924]
14/05/15 10:31:22 INFO CoarseGrainedExecutorBackend: Connecting to driver:
akka.tcp://spark@R9FX97h.local:56720/user/CoarseGrainedScheduler
14/05/15 10:31:22 INFO WorkerWatcher: Connecting to worker
akka.tcp://sparkWorker@worker1:50040/user/Worker
14/05/15 10:31:22 WARN Remoting: Tried to associate with unreachable remote
address [akka.tcp://spark@R9FX97h.local:56720]. Address is now gated for
6 ms, all messages to this address will be delivered to dead letters.
14/05/15 10:31:22 ERROR CoarseGrainedExecutorBackend: Driver Disassociated
[akka.tcp://sparkExecutor@worker1:55924] -
[akka.tcp://spark@R9FX97h.local:56720]
disassociated! Shutting down.

I noticed the following messages on the worker console when I attached
through docker:

14/05/15 11:24:33 INFO Worker: Asked to launch executor
app-20140515112408-0005/7 for billingLogProcessor
14/05/15 11:24:33 ERROR EndpointWriter: AssociationError
[akka.tcp://sparkWorker@worker1:50040] -
[akka.tcp://sparkExecutor@worker1:42437]:
Error [Association failed with [akka.tcp://sparkExecutor@worker1:42437]] [
akka.remote.EndpointAssociationException: Association failed with
[akka.tcp://sparkExecutor@worker1:42437]
Caused by:
akka.remote.transport.netty.NettyTransport$$anonfun$associate$1$$anon$2:
Connection refused: worker1/172.17.0.4:42437
]
14/05/15 11:24:33 ERROR EndpointWriter: AssociationError
[akka.tcp://sparkWorker@worker1:50040] -
[akka.tcp://sparkExecutor@worker1:42437]:
Error [Association failed with [akka.tcp://sparkExecutor@worker1:42437]] [
akka.remote.EndpointAssociationException: Association failed with
[akka.tcp://sparkExecutor@worker1:42437]
Caused by:
akka.remote.transport.netty.NettyTransport$$anonfun$associate$1$$anon$2:
Connection refused: worker1/172.17.0.4:42437
]
14/05/15 11:24:33 INFO ExecutorRunner: Launch command:
/usr/lib/jvm/java-7-openjdk-amd64/bin/java -cp
:/opt/spark-0.9.0/conf:/opt/spark-0.9.0/assembly/target/scala-2.10/spark-assembly_2.10-0.9.0-incubating-hadoop1.0.4.jar
-Xms512M -Xmx512M
org.apache.spark.executor.CoarseGrainedExecutorBackend
akka.tcp://spark@R9FX97h.local:46986/user/CoarseGrainedScheduler 7
worker1 1 akka.tcp://sparkWorker@worker1:50040/user/Worker
app-20140515112408-0005
14/05/15 11:24:35 INFO Worker: Executor app-20140515112408-0005/7 finished
with state FAILED message Command exited with code 1 exitStatus 1
14/05/15 11:24:35 INFO LocalActorRef: Message
[akka.remote.transport.ActorTransportAdapter$DisassociateUnderlying] from
Actor[akka://sparkWorker/deadLetters] to
Actor[akka://sparkWorker/system/transports/akkaprotocolmanager.tcp0/akkaProtocol-tcp%3A%2F%2FsparkWorker%40172.17.0.4%3A33648-135#310170905]
was not delivered. [34] dead letters encountered. This logging can be
turned off or adjusted with configuration settings 'akka.log-dead-letters'
and 'akka.log-dead-letters-during-shutdown'.
14/05/15 11:24:35 ERROR EndpointWriter: AssociationError
[akka.tcp://sparkWorker@worker1:50040] -
[akka.tcp://sparkExecutor@worker1:56594]:
Error [Association failed with [akka.tcp://sparkExecutor@worker1:56594]] [
akka.remote.EndpointAssociationException: Association failed with
[akka.tcp://sparkExecutor@worker1:56594]
Caused by:
akka.remote.transport.netty.NettyTransport$$anonfun$associate$1$$anon$2:
Connection refused: worker1/172.17.0.4:56594
]
14/05/15 11:24:35 ERROR EndpointWriter: AssociationError
[akka.tcp://sparkWorker@worker1:50040] -
[akka.tcp://sparkExecutor@worker1:56594]:
Error [Association failed with [akka.tcp://sparkExecutor@worker1:56594]] [
akka.remote.EndpointAssociationException: Association failed with
[akka.tcp://sparkExecutor@worker1:56594]
Caused by:
akka.remote.transport.netty.NettyTransport$$anonfun$associate$1$$anon$2:
Connection refused: worker1/172.17.0.4:56594
]

The significant code snippets from the standalone java client are as
follows:

JavaSparkContext ctx = new JavaSparkContext(masterAddr, log_processor,
    sparkHome, jarFileLoc);
JavaRDD<String> rawLog = ctx.textFile("/tmp/some.log");
List<Tuple2<String, Long>> topRecords =
    rawLog.map(fieldSplitter).map(fieldExtractor).top(5, tupleComparator);
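
Given the "unreachable remote address" warning above, one possible culprit is
that the executors inside the containers cannot resolve or route back to the
driver's advertised hostname (R9FX97h.local). A hedged sketch of pinning the
driver's advertised address and port to something the docker network can
reach, before creating the context (the address and port values are purely
illustrative, and the variables are the same ones used in the snippet above):

// Assumption: 172.17.42.1 is an address on the docker bridge that the worker
// containers can reach; substitute whatever address and port apply locally.
System.setProperty("spark.driver.host", "172.17.42.1");
System.setProperty("spark.driver.port", "51000");

// Then create the context exactly as in the snippet above.
JavaSparkContext ctx = new JavaSparkContext(masterAddr, log_processor,
    sparkHome, jarFileLoc);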


However, running the sample code provided on github (amplab docker page)
over the spark shell went through fine with the following stdout message:

14/05/15 10:39:41 INFO Slf4jLogger: Slf4jLogger started
14/05/15 10:39:42 INFO Remoting: Starting remoting
14/05/15 10:39:42