Re: PCA OutOfMemoryError
Hello Alex, Thanks for the response. There isn't much other data on the driver, so the issue is probably inherent to this particular PCA implementation. I'll try the alternative approach that you suggested instead. Thanks again. -Bharath On Wed, Jan 13, 2016 at 11:24 PM, Alex Gittens <swift...@gmail.com> wrote: > The PCA.fit function calls the RowMatrix PCA routine, which attempts to > construct the covariance matrix locally on the driver, and then computes > the SVD of that to get the PCs. I'm not sure what's causing the memory > error: RowMatrix.scala:124 is only using 3.5 GB of memory (n*(n+1)/2 with > n=29604 and double precision), so unless you're filling up the memory with > other RDDs, you should have plenty of space on the driver. > > One alternative is to manually center your RDD (so make one pass over it > to compute the mean, then another to subtract it out and form a new RDD), > then directly call the computeSVD routine in RowMatrix to compute the SVD > of the Gramian matrix of this RDD (i.e., the covariance matrix of the > original RDD) in a distributed manner, so the covariance matrix doesn't > need to be formed explicitly. You can look at the getLowRankFactorization > and convertLowRankFactorizationToEOFs routines at > > https://github.com/rustandruin/large-scale-climate/blob/master/src/main/scala/eofs.scala > for an example of this approach (call the second on the results of the first > to get the SVD of the input matrix to the first; EOF is another name for > PCA). > > This takes about 30 minutes to compute the top 20 PCs of a 46.7K-by-6.3M > dense matrix of doubles (~2 TB), with most of the time spent on the > distributed matrix-vector multiplies. > > Best, > Alex > > > On Tue, Jan 12, 2016 at 6:39 PM, Bharath Ravi Kumar <reachb...@gmail.com> > wrote: > >> Any suggestion/opinion? >> On 12-Jan-2016 2:06 pm, "Bharath Ravi Kumar" <reachb...@gmail.com> wrote: >> >>> We're running PCA (selecting 100 principal components) on a dataset that >>> has ~29K columns and is 70G in size, stored in ~600 parts on HDFS. The >>> matrix in question is mostly sparse, with tens of columns populated in most >>> rows, but a few rows with thousands of columns populated. We're running >>> Spark on Mesos with driver memory set to 40G and executor memory set to >>> 80G. However, we're encountering an out-of-memory error (included at the end >>> of the message) regardless of the number of RDD partitions or the degree of >>> task parallelism being set. I noticed a warning at the beginning of the PCA >>> computation stage: " WARN >>> org.apache.spark.mllib.linalg.distributed.RowMatrix: 29604 columns will >>> require at least 7011 megabyte of memory!" >>> I don't understand which memory this refers to. Is this the executor >>> memory? The driver memory? Any other? >>> The stack trace appears to indicate that a large array is probably being >>> passed along with the task. Could this array have been passed as a >>> broadcast variable instead? Any suggestions / workarounds other than >>> re-implementing the algorithm?
>>> >>> Thanks, >>> Bharath >>> >>> >>> >>> Exception in thread "main" java.lang.OutOfMemoryError: Requested array >>> size exceeds VM limit >>> at java.util.Arrays.copyOf(Arrays.java:2271) >>> at >>> java.io.ByteArrayOutputStream.grow(ByteArrayOutputStream.java:113) >>> at >>> java.io.ByteArrayOutputStream.ensureCapacity(ByteArrayOutputStream.java:93) >>> at >>> java.io.ByteArrayOutputStream.write(ByteArrayOutputStream.java:140) >>> at >>> java.io.ObjectOutputStream$BlockDataOutputStream.drain(ObjectOutputStream.java:1876) >>> at >>> java.io.ObjectOutputStream$BlockDataOutputStream.setBlockDataMode(ObjectOutputStream.java:1785) >>> at >>> java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1188) >>> at >>> java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:347) >>> at >>> org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:44) >>> at >>> org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:84) >>> at >>> org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:301) >>> at >>> org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:294) >>>
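For reference, a minimal Scala sketch of the manual-centering approach Alex describes above (this is not code from the linked repository; the helper name pcaViaSvd and the choice of k are illustrative):

  import org.apache.spark.mllib.linalg.{Vector, Vectors}
  import org.apache.spark.mllib.linalg.distributed.RowMatrix
  import org.apache.spark.mllib.stat.Statistics
  import org.apache.spark.rdd.RDD

  // Two passes over the data: one to compute column means, one to subtract them.
  // The covariance matrix is never materialized; computeSVD works against the
  // centered RowMatrix using distributed matrix-vector multiplies.
  def pcaViaSvd(rows: RDD[Vector], k: Int) = {
    val mean = Statistics.colStats(rows).mean.toArray
    val centered = rows.map { v =>
      val a = v.toArray.clone()
      var i = 0
      while (i < a.length) { a(i) -= mean(i); i += 1 }
      Vectors.dense(a)
    }.cache()
    val svd = new RowMatrix(centered).computeSVD(k, computeU = false)
    svd.V // n-by-k matrix whose columns are the top-k principal components
  }

Note that densifying each row (Vectors.dense) gives up the sparsity of the input, which matters for a mostly sparse ~29K-column matrix; it keeps the sketch simple, but a fuller implementation would keep the rows sparse for as long as possible.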
Re: PCA OutOfMemoryError
Any suggestion/opinion? On 12-Jan-2016 2:06 pm, "Bharath Ravi Kumar" <reachb...@gmail.com> wrote: > We're running PCA (selecting 100 principal components) on a dataset that > has ~29K columns and is 70G in size stored in ~600 parts on HDFS. The > matrix in question is mostly sparse with tens of columns populate in most > rows, but a few rows with thousands of columns populated. We're running > spark on mesos with driver memory set to 40G and executor memory set to > 80G. We're however encountering an out of memory error (included at the end > of the message) regardless of the number of rdd partitions or the degree of > task parallelism being set. I noticed a warning at the beginning of the PCA > computation stage: " WARN > org.apache.spark.mllib.linalg.distributed.RowMatrix: 29604 columns will > require at least 7011 megabyte of memory!" > I don't understand which memory this refers to. Is this the executor > memory? The driver memory? Any other? > The stacktrace appears to indicate that a large array is probably being > passed along with the task. Could this array have been passed as a > broadcast variable instead ? Any suggestions / workarounds other than > re-implementing the algorithm? > > Thanks, > Bharath > > > > Exception in thread "main" java.lang.OutOfMemoryError: Requested array > size exceeds VM limit > at java.util.Arrays.copyOf(Arrays.java:2271) > at > java.io.ByteArrayOutputStream.grow(ByteArrayOutputStream.java:113) > at > java.io.ByteArrayOutputStream.ensureCapacity(ByteArrayOutputStream.java:93) > at > java.io.ByteArrayOutputStream.write(ByteArrayOutputStream.java:140) > at > java.io.ObjectOutputStream$BlockDataOutputStream.drain(ObjectOutputStream.java:1876) > at > java.io.ObjectOutputStream$BlockDataOutputStream.setBlockDataMode(ObjectOutputStream.java:1785) > at > java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1188) > at > java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:347) > at > org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:44) > at > org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:84) > at > org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:301) > at > org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:294) > at > org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:122) > at org.apache.spark.SparkContext.clean(SparkContext.scala:2030) > at > org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1.apply(RDD.scala:703) > at > org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1.apply(RDD.scala:702) > at > org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:147) > at > org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:108) > at org.apache.spark.rdd.RDD.withScope(RDD.scala:306) > at org.apache.spark.rdd.RDD.mapPartitions(RDD.scala:702) > at > org.apache.spark.rdd.RDD$$anonfun$treeAggregate$1.apply(RDD.scala:1100) > at > org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:147) > at > org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:108) > at org.apache.spark.rdd.RDD.withScope(RDD.scala:306) > at org.apache.spark.rdd.RDD.treeAggregate(RDD.scala:1091) > at > org.apache.spark.mllib.linalg.distributed.RowMatrix.computeGramianMatrix(RowMatrix.scala:124) > at > org.apache.spark.mllib.linalg.distributed.RowMatrix.computeCovariance(RowMatrix.scala:350) > at > 
org.apache.spark.mllib.linalg.distributed.RowMatrix.computePrincipalComponents(RowMatrix.scala:386) > at org.apache.spark.mllib.feature.PCA.fit(PCA.scala:46) > >
PCA OutOfMemoryError
We're running PCA (selecting 100 principal components) on a dataset that has ~29K columns and is 70G in size, stored in ~600 parts on HDFS. The matrix in question is mostly sparse, with tens of columns populated in most rows, but a few rows with thousands of columns populated. We're running Spark on Mesos with driver memory set to 40G and executor memory set to 80G. However, we're encountering an out-of-memory error (included at the end of the message) regardless of the number of RDD partitions or the degree of task parallelism being set. I noticed a warning at the beginning of the PCA computation stage: " WARN org.apache.spark.mllib.linalg.distributed.RowMatrix: 29604 columns will require at least 7011 megabyte of memory!" I don't understand which memory this refers to. Is this the executor memory? The driver memory? Any other? The stack trace appears to indicate that a large array is probably being passed along with the task. Could this array have been passed as a broadcast variable instead? Any suggestions / workarounds other than re-implementing the algorithm? Thanks, Bharath

Exception in thread "main" java.lang.OutOfMemoryError: Requested array size exceeds VM limit
        at java.util.Arrays.copyOf(Arrays.java:2271)
        at java.io.ByteArrayOutputStream.grow(ByteArrayOutputStream.java:113)
        at java.io.ByteArrayOutputStream.ensureCapacity(ByteArrayOutputStream.java:93)
        at java.io.ByteArrayOutputStream.write(ByteArrayOutputStream.java:140)
        at java.io.ObjectOutputStream$BlockDataOutputStream.drain(ObjectOutputStream.java:1876)
        at java.io.ObjectOutputStream$BlockDataOutputStream.setBlockDataMode(ObjectOutputStream.java:1785)
        at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1188)
        at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:347)
        at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:44)
        at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:84)
        at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:301)
        at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:294)
        at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:122)
        at org.apache.spark.SparkContext.clean(SparkContext.scala:2030)
        at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1.apply(RDD.scala:703)
        at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1.apply(RDD.scala:702)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:147)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:108)
        at org.apache.spark.rdd.RDD.withScope(RDD.scala:306)
        at org.apache.spark.rdd.RDD.mapPartitions(RDD.scala:702)
        at org.apache.spark.rdd.RDD$$anonfun$treeAggregate$1.apply(RDD.scala:1100)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:147)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:108)
        at org.apache.spark.rdd.RDD.withScope(RDD.scala:306)
        at org.apache.spark.rdd.RDD.treeAggregate(RDD.scala:1091)
        at org.apache.spark.mllib.linalg.distributed.RowMatrix.computeGramianMatrix(RowMatrix.scala:124)
        at org.apache.spark.mllib.linalg.distributed.RowMatrix.computeCovariance(RowMatrix.scala:350)
        at org.apache.spark.mllib.linalg.distributed.RowMatrix.computePrincipalComponents(RowMatrix.scala:386)
        at org.apache.spark.mllib.feature.PCA.fit(PCA.scala:46)
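As a rough reading of the warning's figure (an interpretation, not something stated in the thread): the estimate corresponds to the full dense n-by-n Gramian/covariance matrix of doubles that computeGramianMatrix aggregates back to the driver, so the driver heap is the memory being warned about. A back-of-the-envelope check in Scala:

  val n = 29604L
  val fullGramianMB = n * n * 8 / (1000L * 1000L)  // ~7011 MB: full dense n x n matrix of doubles
  val packedUpperGB = n * (n + 1) / 2 * 8.0 / 1e9  // ~3.5 GB: packed upper triangle only (Alex's figure)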
Re: Spark on Mesos / Executor Memory
David, Tom, Thanks for the explanation. This confirms my suspicion that the executor was holding on to memory regardless of tasks in execution once it expands to occupy memory in keeping with spark.executor.memory. There certainly is scope for improvement here, though I realize there will be substantial overheads in implementing memory release without compromising RDD caching and similar aspects. I'll explore alternatives / workarounds meanwhile. Thanks, Bharath On Sat, Oct 17, 2015 at 3:33 PM, Tom Arnfeld <t...@duedil.com> wrote: > Hi Bharath, > > When running jobs in fine-grained mode, each Spark task is sent to Mesos > as a task, which allows the offers system to maintain fairness between > different Spark applications (as you've described). Having said that, unless > your per-node memory is hugely undersubscribed, running these jobs in > parallel will behave exactly as you've described. > > What you're seeing happens because even though there's a new Mesos task > for each Spark task (allowing CPU to be shared), the Spark executors don't > get killed even when they aren't doing any work, which means the memory > isn't released. The JVM doesn't allow for flexible memory re-allocation (as > far as I'm aware), which makes it impossible for Spark to dynamically scale > up the memory of the executor over time as tasks start and finish. > > As Dave pointed out, the simplest way to solve this is to use a higher > level tool that can run your Spark jobs through one Mesos framework, and > then you can let Spark distribute the resources more effectively. > > I hope that helps! > > Tom. > > On 17 Oct 2015, at 06:47, Bharath Ravi Kumar <reachb...@gmail.com> wrote: > > Can someone respond if you're aware of the reason for such a memory > footprint? It seems unintuitive and hard to reason about. > > Thanks, > Bharath > > On Thu, Oct 15, 2015 at 12:29 PM, Bharath Ravi Kumar <reachb...@gmail.com> > wrote: > >> Resending since user@mesos bounced earlier. My apologies. >> >> On Thu, Oct 15, 2015 at 12:19 PM, Bharath Ravi Kumar <reachb...@gmail.com >> > wrote: >> >>> (Reviving this thread since I ran into similar issues...) >>> >>> I'm running two Spark jobs (in Mesos fine-grained mode), each belonging >>> to a different Mesos role, say low and high. The low:high Mesos weights are >>> 1:10. On expected lines, I see that the low priority job occupies cluster >>> resources to the maximum extent when running alone. However, when the high >>> priority job is submitted, it does not start and continues to await cluster >>> resources (as seen in the logs). Since the jobs run in fine-grained mode >>> and the low priority tasks begin to finish, the high priority job should >>> ideally be able to start and gradually take over cluster resources as per >>> the weights. However, I noticed that while the "low" job gives up CPU cores >>> with each completing task (e.g. reduction from 72 -> 12 with default >>> parallelism set to 72), the memory resources are held on to (~500G out of >>> 768G). The spark.executor.memory setting appears to directly impact the >>> amount of memory that the job holds on to. In this case, it was set to 200G >>> in the low priority task and 100G in the high priority task. The nature of >>> these jobs is such that setting the numbers to smaller values (say 32g) >>> resulted in job failures with OutOfMemoryError.
It appears that the spark >>> framework is retaining memory (across tasks) proportional to >>> spark.executor.memory for the duration of the job and not releasing memory >>> as tasks complete. This defeats the purpose of fine grained mode execution >>> as the memory occupancy is preventing the high priority job from accepting >>> the prioritized cpu offers and beginning execution. Can this be explained / >>> documented better please? >>> >>> Thanks, >>> Bharath >>> >>> On Sat, Apr 11, 2015 at 10:59 PM, Tim Chen <t...@mesosphere.io> wrote: >>> >>>> (Adding spark user list) >>>> >>>> Hi Tom, >>>> >>>> If I understand correctly you're saying that you're running into memory >>>> problems because the scheduler is allocating too much CPUs and not enough >>>> memory to acoomodate them right? >>>> >>>> In the case of fine grain mode I don't think that's a problem since we >>>> have a fixed amount of CPU and memory per task. >>>> However, in coarse grain you can run into that problem if you're with >>>> in the spa
Re: Spark on Mesos / Executor Memory
To be precise, the MesosExecutorBackend's Xms & Xmx equal spark.executor.memory. So there's no question of expanding or contracting the memory held by the executor. On Sat, Oct 17, 2015 at 5:38 PM, Bharath Ravi Kumar <reachb...@gmail.com> wrote: > David, Tom, > > Thanks for the explanation. This confirms my suspicion that the executor > was holding on to memory regardless of tasks in execution once it expands > to occupy memory in keeping with spark.executor.memory. There certainly is > scope for improvement here, though I realize there will substantial > overheads in implementing memory release without compromising RDD caching > and similar aspects. I'll explore alternatives / workarounds meanwhile. > > Thanks, > Bharath > > > > On Sat, Oct 17, 2015 at 3:33 PM, Tom Arnfeld <t...@duedil.com> wrote: > >> Hi Bharath, >> >> When running jobs in fine grained mode, each Spark task is sent to mesos >> as a task which allows the offers system to maintain fairness between >> different spark application (as you've described). Having said that, unless >> your memory per-node is hugely undersubscribed when running these jobs in >> parallel. This behaviour matches exactly what you've described. >> >> What you're seeing happens because even though there's a new mesos task >> for each Spark task (allowing CPU to be shared) the Spark executors don't >> get killed even when they aren't doing any work, which means the memory >> isn't released. The JVM doesn't allow for flexible memory re-allocation (as >> far as i'm aware) which make it impossible for spark to dynamically scale >> up the memory of the executor over time as tasks start and finish. >> >> As Dave pointed out, the simplest way to solve this is to use a higher >> level tool that can run your spark jobs through one mesos framework and >> then you can let spark distribute the resources more effectively. >> >> I hope that helps! >> >> Tom. >> >> On 17 Oct 2015, at 06:47, Bharath Ravi Kumar <reachb...@gmail.com> wrote: >> >> Can someone respond if you're aware of the reason for such a memory >> footprint? It seems unintuitive and hard to reason about. >> >> Thanks, >> Bharath >> >> On Thu, Oct 15, 2015 at 12:29 PM, Bharath Ravi Kumar <reachb...@gmail.com >> > wrote: >> >>> Resending since user@mesos bounced earlier. My apologies. >>> >>> On Thu, Oct 15, 2015 at 12:19 PM, Bharath Ravi Kumar < >>> reachb...@gmail.com> wrote: >>> >>>> (Reviving this thread since I ran into similar issues...) >>>> >>>> I'm running two spark jobs (in mesos fine grained mode), each belonging >>>> to a different mesos role, say low and high. The low:high mesos weights are >>>> 1:10. On expected lines, I see that the low priority job occupies cluster >>>> resources to the maximum extent when running alone. However, when the high >>>> priority job is submitted, it does not start and continues to await cluster >>>> resources (as seen in the logs). Since the jobs run in fine grained mode >>>> and the low priority tasks begin to finish, the high priority job should >>>> ideally be able to start and gradually take over cluster resources as per >>>> the weights. However, I noticed that while the "low" job gives up CPU cores >>>> with each completing task (e.g. reduction from 72 -> 12 with default >>>> parallelism set to 72), the memory resources are held on (~500G out of >>>> 768G). The spark.executor.memory setting appears to directly impact the >>>> amount of memory that the job holds on to. 
In this case, it was set to 200G >>>> in the low priority task and 100G in the high priority task. The nature of >>>> these jobs is such that setting the numbers to smaller values (say 32g) >>>> resulted in job failures with outofmemoryerror. It appears that the spark >>>> framework is retaining memory (across tasks) proportional to >>>> spark.executor.memory for the duration of the job and not releasing memory >>>> as tasks complete. This defeats the purpose of fine grained mode execution >>>> as the memory occupancy is preventing the high priority job from accepting >>>> the prioritized cpu offers and beginning execution. Can this be explained / >>>> documented better please? >>>> >>>> Thanks, >>>> Bharath >>>> >>>> On Sat, Apr 11, 2015 at 10:59 PM, Tim Chen <t...@mesosphere.io> wrote: >>>> >>>
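To illustrate the fixed-heap point (the exact command line varies by deployment; this is only a sketch, not output from the thread): with spark.executor.memory=200g the fine-grained executor JVM is launched roughly as

  java -Xms200g -Xmx200g ... org.apache.spark.executor.MesosExecutorBackend ...

so the full heap is reserved for the executor's entire lifetime and cannot be handed back to Mesos as individual tasks finish.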
Re: Spark on Mesos / Executor Memory
Can someone respond if you're aware of the reason for such a memory footprint? It seems unintuitive and hard to reason about. Thanks, Bharath On Thu, Oct 15, 2015 at 12:29 PM, Bharath Ravi Kumar <reachb...@gmail.com> wrote: > Resending since user@mesos bounced earlier. My apologies. > > On Thu, Oct 15, 2015 at 12:19 PM, Bharath Ravi Kumar <reachb...@gmail.com> > wrote: > >> (Reviving this thread since I ran into similar issues...) >> >> I'm running two spark jobs (in mesos fine grained mode), each belonging >> to a different mesos role, say low and high. The low:high mesos weights are >> 1:10. On expected lines, I see that the low priority job occupies cluster >> resources to the maximum extent when running alone. However, when the high >> priority job is submitted, it does not start and continues to await cluster >> resources (as seen in the logs). Since the jobs run in fine grained mode >> and the low priority tasks begin to finish, the high priority job should >> ideally be able to start and gradually take over cluster resources as per >> the weights. However, I noticed that while the "low" job gives up CPU cores >> with each completing task (e.g. reduction from 72 -> 12 with default >> parallelism set to 72), the memory resources are held on (~500G out of >> 768G). The spark.executor.memory setting appears to directly impact the >> amount of memory that the job holds on to. In this case, it was set to 200G >> in the low priority task and 100G in the high priority task. The nature of >> these jobs is such that setting the numbers to smaller values (say 32g) >> resulted in job failures with outofmemoryerror. It appears that the spark >> framework is retaining memory (across tasks) proportional to >> spark.executor.memory for the duration of the job and not releasing memory >> as tasks complete. This defeats the purpose of fine grained mode execution >> as the memory occupancy is preventing the high priority job from accepting >> the prioritized cpu offers and beginning execution. Can this be explained / >> documented better please? >> >> Thanks, >> Bharath >> >> On Sat, Apr 11, 2015 at 10:59 PM, Tim Chen <t...@mesosphere.io> wrote: >> >>> (Adding spark user list) >>> >>> Hi Tom, >>> >>> If I understand correctly you're saying that you're running into memory >>> problems because the scheduler is allocating too much CPUs and not enough >>> memory to acoomodate them right? >>> >>> In the case of fine grain mode I don't think that's a problem since we >>> have a fixed amount of CPU and memory per task. >>> However, in coarse grain you can run into that problem if you're with in >>> the spark.cores.max limit, and memory is a fixed number. >>> >>> I have a patch out to configure how much max cpus should coarse grain >>> executor use, and it also allows multiple executors in coarse grain mode. >>> So you could say try to launch multiples of max 4 cores with >>> spark.executor.memory (+ overhead and etc) in a slave. ( >>> https://github.com/apache/spark/pull/4027) >>> >>> It also might be interesting to include a cores to memory multiplier so >>> that with a larger amount of cores we try to scale the memory with some >>> factor, but I'm not entirely sure that's intuitive to use and what people >>> know what to set it to, as that can likely change with different workload. >>> >>> Tim >>> >>> >>> >>> >>> >>> >>> >>> On Sat, Apr 11, 2015 at 9:51 AM, Tom Arnfeld <t...@duedil.com> wrote: >>> >>>> We're running Spark 1.3.0 (with a couple of patches over the top for >>>> docker related bits). 
>>>> >>>> I don't think SPARK-4158 is related to what we're seeing, things do run >>>> fine on the cluster, given a ridiculously large executor memory >>>> configuration. As for SPARK-3535 although that looks useful I think we'e >>>> seeing something else. >>>> >>>> Put a different way, the amount of memory required at any given time by >>>> the spark JVM process is directly proportional to the amount of CPU it has, >>>> because more CPU means more tasks and more tasks means more memory. Even if >>>> we're using coarse mode, the amount of executor memory should be >>>> proportionate to the amount of CPUs in the offer. >>>> >>>> On 11 April 2015 at 17:39, Brenden Matthews <bren...@diddyinc.com> >>>>
Re: Spark on Mesos / Executor Memory
Resending since user@mesos bounced earlier. My apologies. On Thu, Oct 15, 2015 at 12:19 PM, Bharath Ravi Kumar <reachb...@gmail.com> wrote: > (Reviving this thread since I ran into similar issues...) > > I'm running two spark jobs (in mesos fine grained mode), each belonging to > a different mesos role, say low and high. The low:high mesos weights are > 1:10. On expected lines, I see that the low priority job occupies cluster > resources to the maximum extent when running alone. However, when the high > priority job is submitted, it does not start and continues to await cluster > resources (as seen in the logs). Since the jobs run in fine grained mode > and the low priority tasks begin to finish, the high priority job should > ideally be able to start and gradually take over cluster resources as per > the weights. However, I noticed that while the "low" job gives up CPU cores > with each completing task (e.g. reduction from 72 -> 12 with default > parallelism set to 72), the memory resources are held on (~500G out of > 768G). The spark.executor.memory setting appears to directly impact the > amount of memory that the job holds on to. In this case, it was set to 200G > in the low priority task and 100G in the high priority task. The nature of > these jobs is such that setting the numbers to smaller values (say 32g) > resulted in job failures with outofmemoryerror. It appears that the spark > framework is retaining memory (across tasks) proportional to > spark.executor.memory for the duration of the job and not releasing memory > as tasks complete. This defeats the purpose of fine grained mode execution > as the memory occupancy is preventing the high priority job from accepting > the prioritized cpu offers and beginning execution. Can this be explained / > documented better please? > > Thanks, > Bharath > > On Sat, Apr 11, 2015 at 10:59 PM, Tim Chen <t...@mesosphere.io> wrote: > >> (Adding spark user list) >> >> Hi Tom, >> >> If I understand correctly you're saying that you're running into memory >> problems because the scheduler is allocating too much CPUs and not enough >> memory to acoomodate them right? >> >> In the case of fine grain mode I don't think that's a problem since we >> have a fixed amount of CPU and memory per task. >> However, in coarse grain you can run into that problem if you're with in >> the spark.cores.max limit, and memory is a fixed number. >> >> I have a patch out to configure how much max cpus should coarse grain >> executor use, and it also allows multiple executors in coarse grain mode. >> So you could say try to launch multiples of max 4 cores with >> spark.executor.memory (+ overhead and etc) in a slave. ( >> https://github.com/apache/spark/pull/4027) >> >> It also might be interesting to include a cores to memory multiplier so >> that with a larger amount of cores we try to scale the memory with some >> factor, but I'm not entirely sure that's intuitive to use and what people >> know what to set it to, as that can likely change with different workload. >> >> Tim >> >> >> >> >> >> >> >> On Sat, Apr 11, 2015 at 9:51 AM, Tom Arnfeld <t...@duedil.com> wrote: >> >>> We're running Spark 1.3.0 (with a couple of patches over the top for >>> docker related bits). >>> >>> I don't think SPARK-4158 is related to what we're seeing, things do run >>> fine on the cluster, given a ridiculously large executor memory >>> configuration. As for SPARK-3535 although that looks useful I think we'e >>> seeing something else. 
>>> >>> Put a different way, the amount of memory required at any given time by >>> the spark JVM process is directly proportional to the amount of CPU it has, >>> because more CPU means more tasks and more tasks means more memory. Even if >>> we're using coarse mode, the amount of executor memory should be >>> proportionate to the amount of CPUs in the offer. >>> >>> On 11 April 2015 at 17:39, Brenden Matthews <bren...@diddyinc.com> >>> wrote: >>> >>>> I ran into some issues with it a while ago, and submitted a couple PRs >>>> to fix it: >>>> >>>> https://github.com/apache/spark/pull/2401 >>>> https://github.com/apache/spark/pull/3024 >>>> >>>> Do these look relevant? What version of Spark are you running? >>>> >>>> On Sat, Apr 11, 2015 at 9:33 AM, Tom Arnfeld <t...@duedil.com> wrote: >>>> >>>>> Hey, >>>>> >>>>> Not s
Re: Spark on Mesos / Executor Memory
(Reviving this thread since I ran into similar issues...) I'm running two Spark jobs (in Mesos fine-grained mode), each belonging to a different Mesos role, say low and high. The low:high Mesos weights are 1:10. On expected lines, I see that the low priority job occupies cluster resources to the maximum extent when running alone. However, when the high priority job is submitted, it does not start and continues to await cluster resources (as seen in the logs). Since the jobs run in fine-grained mode and the low priority tasks begin to finish, the high priority job should ideally be able to start and gradually take over cluster resources as per the weights. However, I noticed that while the "low" job gives up CPU cores with each completing task (e.g. reduction from 72 -> 12 with default parallelism set to 72), the memory resources are held on to (~500G out of 768G). The spark.executor.memory setting appears to directly impact the amount of memory that the job holds on to. In this case, it was set to 200G in the low priority task and 100G in the high priority task. The nature of these jobs is such that setting the numbers to smaller values (say 32g) resulted in job failures with OutOfMemoryError. It appears that the Spark framework is retaining memory (across tasks) proportional to spark.executor.memory for the duration of the job and not releasing memory as tasks complete. This defeats the purpose of fine-grained mode execution, as the memory occupancy is preventing the high priority job from accepting the prioritized CPU offers and beginning execution. Can this be explained / documented better please? Thanks, Bharath On Sat, Apr 11, 2015 at 10:59 PM, Tim Chen wrote: > (Adding Spark user list) > > Hi Tom, > > If I understand correctly you're saying that you're running into memory > problems because the scheduler is allocating too many CPUs and not enough > memory to accommodate them, right? > > In the case of fine-grained mode I don't think that's a problem since we > have a fixed amount of CPU and memory per task. > However, in coarse-grained mode you can run into that problem if you're within > the spark.cores.max limit, and memory is a fixed number. > > I have a patch out to configure how many CPUs a coarse-grained > executor should use at most, and it also allows multiple executors in coarse-grained mode. > So you could, say, try to launch multiple executors of at most 4 cores each with > spark.executor.memory (+ overhead, etc.) on a slave. ( > https://github.com/apache/spark/pull/4027) > > It also might be interesting to include a cores-to-memory multiplier so > that with a larger number of cores we try to scale the memory with some > factor, but I'm not entirely sure that's intuitive to use and whether people > would know what to set it to, as that can likely change with different workloads. > > Tim > > > > > > > > On Sat, Apr 11, 2015 at 9:51 AM, Tom Arnfeld wrote: > >> We're running Spark 1.3.0 (with a couple of patches over the top for >> Docker-related bits). >> >> I don't think SPARK-4158 is related to what we're seeing; things do run >> fine on the cluster, given a ridiculously large executor memory >> configuration. As for SPARK-3535, although that looks useful I think we're >> seeing something else. >> >> Put a different way, the amount of memory required at any given time by >> the Spark JVM process is directly proportional to the amount of CPU it has, >> because more CPU means more tasks and more tasks means more memory.
Even if >> we're using coarse mode, the amount of executor memory should be >> proportionate to the amount of CPUs in the offer. >> >> On 11 April 2015 at 17:39, Brenden Matthews wrote: >> >>> I ran into some issues with it a while ago, and submitted a couple PRs >>> to fix it: >>> >>> https://github.com/apache/spark/pull/2401 >>> https://github.com/apache/spark/pull/3024 >>> >>> Do these look relevant? What version of Spark are you running? >>> >>> On Sat, Apr 11, 2015 at 9:33 AM, Tom Arnfeld wrote: >>> Hey, Not sure whether it's best to ask this on the spark mailing list or the mesos one, so I'll try here first :-) I'm having a bit of trouble with out of memory errors in my spark jobs... it seems fairly odd to me that memory resources can only be set at the executor level, and not also at the task level. For example, as far as I can tell there's only a *spark.executor.memory* config option. Surely the memory requirements of a single executor are quite dramatically influenced by the number of concurrent tasks running? Given a shared cluster, I have no idea what % of an individual slave my executor is going to get, so I basically have to set the executor memory to a value that's correct when the whole machine is in use... Has anyone else running Spark on Mesos come across this, or maybe someone could correct my understanding of the config options?
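For context, the settings discussed in this thread would look roughly like the following in spark-defaults.conf (the values are the ones mentioned above and are shown only for illustration, not as a recommendation):

  # fine-grained mode (spark.mesos.coarse defaults to false in Spark 1.x)
  spark.mesos.coarse         false
  # reserved by each executor JVM for the lifetime of the job
  spark.executor.memory      200g
  spark.default.parallelism  72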
Re: HDP 2.2 AM abort : Unable to find ExecutorLauncher class
Hi Doug, I did try setting that config parameter to a larger number (several minutes), but still wasn't able to retrieve additional context logs. Let us know if you have any success with it. Thanks, Bharath On Fri, Mar 20, 2015 at 3:21 AM, Doug Balog doug.sparku...@dugos.com wrote: I’m seeing the same problem. I’ve set logging to DEBUG, and I think some hints are in the “Yarn AM launch context” that is printed out before Yarn runs java. My next step is to talk to the admins and get them to set yarn.nodemanager.delete.debug-delay-sec in the config, as recommended in http://spark.apache.org/docs/latest/running-on-yarn.html Then I can see exactly whats in the directory. Doug ps Sorry for the dup message Bharath and Todd, used wrong email address. On Mar 19, 2015, at 1:19 AM, Bharath Ravi Kumar reachb...@gmail.com wrote: Thanks for clarifying Todd. This may then be an issue specific to the HDP version we're using. Will continue to debug and post back if there's any resolution. On Thu, Mar 19, 2015 at 3:40 AM, Todd Nist tsind...@gmail.com wrote: Yes I believe you are correct. For the build you may need to specify the specific HDP version of hadoop to use with the -Dhadoop.version=. I went with the default 2.6.0, but Horton may have a vendor specific version that needs to go here. I know I saw a similar post today where the solution was to use -Dhadoop.version=2.5.0-cdh5.3.2 but that was for a cloudera installation. I am not sure what the HDP version would be to put here. -Todd On Wed, Mar 18, 2015 at 12:49 AM, Bharath Ravi Kumar reachb...@gmail.com wrote: Hi Todd, Yes, those entries were present in the conf under the same SPARK_HOME that was used to run spark-submit. On a related note, I'm assuming that the additional spark yarn options (like spark.yarn.jar) need to be set in the same properties file that is passed to spark-submit. That apart, I assume that no other host on the cluster should require a deployment of the spark distribution or any other config change to support a spark job. Isn't that correct? On Tue, Mar 17, 2015 at 6:19 PM, Todd Nist tsind...@gmail.com wrote: Hi Bharath, Do you have these entries in your $SPARK_HOME/conf/spark-defaults.conf file? spark.driver.extraJavaOptions -Dhdp.version=2.2.0.0-2041 spark.yarn.am.extraJavaOptions -Dhdp.version=2.2.0.0-2041 On Tue, Mar 17, 2015 at 1:04 AM, Bharath Ravi Kumar reachb...@gmail.com wrote: Still no luck running purpose-built 1.3 against HDP 2.2 after following all the instructions. Anyone else faced this issue? On Mon, Mar 16, 2015 at 8:53 PM, Bharath Ravi Kumar reachb...@gmail.com wrote: Hi Todd, Thanks for the help. I'll try again after building a distribution with the 1.3 sources. However, I wanted to confirm what I mentioned earlier: is it sufficient to copy the distribution only to the client host from where spark-submit is invoked(with spark.yarn.jar set), or is there a need to ensure that the entire distribution is deployed made available pre-deployed on every host in the yarn cluster? I'd assume that the latter shouldn't be necessary. On Mon, Mar 16, 2015 at 8:38 PM, Todd Nist tsind...@gmail.com wrote: Hi Bharath, I ran into the same issue a few days ago, here is a link to a post on Horton's fourm. http://hortonworks.com/community/forums/search/spark+1.2.1/ Incase anyone else needs to perform this these are the steps I took to get it to work with Spark 1.2.1 as well as Spark 1.3.0-RC3: 1. Pull 1.2.1 Source 2. Apply the following patches a. Address jackson version, https://github.com/apache/spark/pull/3938 b. 
Address the propagation of the hdp.version set in the spark-default.conf, https://github.com/apache/spark/pull/3409 3. build with $SPARK_HOME./make-distribution.sh –name hadoop2.6 –tgz -Pyarn -Phadoop-2.4 -Dhadoop.version=2.6.0 -Phive -Phive-thriftserver -DskipTests package Then deploy the resulting artifact = spark-1.2.1-bin-hadoop2.6.tgz following instructions in the HDP Spark preview http://hortonworks.com/hadoop-tutorial/using-apache-spark-hdp/ FWIW spark-1.3.0 appears to be working fine with HDP as well and steps 2a and 2b are not required. HTH -Todd On Mon, Mar 16, 2015 at 10:13 AM, Bharath Ravi Kumar reachb...@gmail.com wrote: Hi, Trying to run spark ( 1.2.1 built for hdp 2.2) against a yarn cluster results in the AM failing to start with following error on stderr: Error: Could not find or load main class org.apache.spark.deploy.yarn.ExecutorLauncher An application id was assigned to the job, but there were no logs. Note that the spark distribution has not been installed on every host in the cluster and the aforementioned spark build was copied to one of the hadoop client hosts in the cluster to launch the job. Spark-submit was run with --master yarn-client and spark.yarn.jar was set to the assembly jar from the above distribution
Re: HDP 2.2 AM abort : Unable to find ExecutorLauncher class
Thanks for clarifying Todd. This may then be an issue specific to the HDP version we're using. Will continue to debug and post back if there's any resolution. On Thu, Mar 19, 2015 at 3:40 AM, Todd Nist tsind...@gmail.com wrote: Yes I believe you are correct. For the build you may need to specify the specific HDP version of hadoop to use with the -Dhadoop.version=. I went with the default 2.6.0, but Horton may have a vendor specific version that needs to go here. I know I saw a similar post today where the solution was to use -Dhadoop.version=2.5.0-cdh5.3.2 but that was for a cloudera installation. I am not sure what the HDP version would be to put here. -Todd On Wed, Mar 18, 2015 at 12:49 AM, Bharath Ravi Kumar reachb...@gmail.com wrote: Hi Todd, Yes, those entries were present in the conf under the same SPARK_HOME that was used to run spark-submit. On a related note, I'm assuming that the additional spark yarn options (like spark.yarn.jar) need to be set in the same properties file that is passed to spark-submit. That apart, I assume that no other host on the cluster should require a deployment of the spark distribution or any other config change to support a spark job. Isn't that correct? On Tue, Mar 17, 2015 at 6:19 PM, Todd Nist tsind...@gmail.com wrote: Hi Bharath, Do you have these entries in your $SPARK_HOME/conf/spark-defaults.conf file? spark.driver.extraJavaOptions -Dhdp.version=2.2.0.0-2041 spark.yarn.am.extraJavaOptions -Dhdp.version=2.2.0.0-2041 On Tue, Mar 17, 2015 at 1:04 AM, Bharath Ravi Kumar reachb...@gmail.com wrote: Still no luck running purpose-built 1.3 against HDP 2.2 after following all the instructions. Anyone else faced this issue? On Mon, Mar 16, 2015 at 8:53 PM, Bharath Ravi Kumar reachb...@gmail.com wrote: Hi Todd, Thanks for the help. I'll try again after building a distribution with the 1.3 sources. However, I wanted to confirm what I mentioned earlier: is it sufficient to copy the distribution only to the client host from where spark-submit is invoked(with spark.yarn.jar set), or is there a need to ensure that the entire distribution is deployed made available pre-deployed on every host in the yarn cluster? I'd assume that the latter shouldn't be necessary. On Mon, Mar 16, 2015 at 8:38 PM, Todd Nist tsind...@gmail.com wrote: Hi Bharath, I ran into the same issue a few days ago, here is a link to a post on Horton's fourm. http://hortonworks.com/community/forums/search/spark+1.2.1/ Incase anyone else needs to perform this these are the steps I took to get it to work with Spark 1.2.1 as well as Spark 1.3.0-RC3: 1. Pull 1.2.1 Source 2. Apply the following patches a. Address jackson version, https://github.com/apache/spark/pull/3938 b. Address the propagation of the hdp.version set in the spark-default.conf, https://github.com/apache/spark/pull/3409 3. build with $SPARK_HOME./make-distribution.sh –name hadoop2.6 –tgz -Pyarn -Phadoop-2.4 -Dhadoop.version=2.6.0 -Phive -Phive-thriftserver -DskipTests package Then deploy the resulting artifact = spark-1.2.1-bin-hadoop2.6.tgz following instructions in the HDP Spark preview http://hortonworks.com/hadoop-tutorial/using-apache-spark-hdp/ FWIW spark-1.3.0 appears to be working fine with HDP as well and steps 2a and 2b are not required. 
HTH -Todd On Mon, Mar 16, 2015 at 10:13 AM, Bharath Ravi Kumar reachb...@gmail.com wrote: Hi, Trying to run spark ( 1.2.1 built for hdp 2.2) against a yarn cluster results in the AM failing to start with following error on stderr: Error: Could not find or load main class org.apache.spark.deploy.yarn.ExecutorLauncher An application id was assigned to the job, but there were no logs. Note that the spark distribution has not been installed on every host in the cluster and the aforementioned spark build was copied to one of the hadoop client hosts in the cluster to launch the job. Spark-submit was run with --master yarn-client and spark.yarn.jar was set to the assembly jar from the above distribution. Switching the spark distribution to the HDP recommended version and following the instructions on this page http://hortonworks.com/hadoop-tutorial/using-apache-spark-hdp/ did not fix the problem either. Any idea what may have caused this error ? Thanks, Bharath
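For reference, the client-side configuration this thread converges on would sit in $SPARK_HOME/conf/spark-defaults.conf and look roughly like this (the hdp.version build number is the one used in the thread; the assembly-jar location is a placeholder to be replaced with wherever the custom-built assembly actually lives):

  spark.driver.extraJavaOptions    -Dhdp.version=2.2.0.0-2041
  spark.yarn.am.extraJavaOptions   -Dhdp.version=2.2.0.0-2041
  # placeholder path: point this at the assembly jar from the custom 1.2.1/1.3 build
  spark.yarn.jar                   hdfs:///apps/spark/spark-assembly-1.2.1-hadoop2.6.0.jar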
Re: HDP 2.2 AM abort : Unable to find ExecutorLauncher class
Hi Todd, Yes, those entries were present in the conf under the same SPARK_HOME that was used to run spark-submit. On a related note, I'm assuming that the additional spark yarn options (like spark.yarn.jar) need to be set in the same properties file that is passed to spark-submit. That apart, I assume that no other host on the cluster should require a deployment of the spark distribution or any other config change to support a spark job. Isn't that correct? On Tue, Mar 17, 2015 at 6:19 PM, Todd Nist tsind...@gmail.com wrote: Hi Bharath, Do you have these entries in your $SPARK_HOME/conf/spark-defaults.conf file? spark.driver.extraJavaOptions -Dhdp.version=2.2.0.0-2041 spark.yarn.am.extraJavaOptions -Dhdp.version=2.2.0.0-2041 On Tue, Mar 17, 2015 at 1:04 AM, Bharath Ravi Kumar reachb...@gmail.com wrote: Still no luck running purpose-built 1.3 against HDP 2.2 after following all the instructions. Anyone else faced this issue? On Mon, Mar 16, 2015 at 8:53 PM, Bharath Ravi Kumar reachb...@gmail.com wrote: Hi Todd, Thanks for the help. I'll try again after building a distribution with the 1.3 sources. However, I wanted to confirm what I mentioned earlier: is it sufficient to copy the distribution only to the client host from where spark-submit is invoked(with spark.yarn.jar set), or is there a need to ensure that the entire distribution is deployed made available pre-deployed on every host in the yarn cluster? I'd assume that the latter shouldn't be necessary. On Mon, Mar 16, 2015 at 8:38 PM, Todd Nist tsind...@gmail.com wrote: Hi Bharath, I ran into the same issue a few days ago, here is a link to a post on Horton's fourm. http://hortonworks.com/community/forums/search/spark+1.2.1/ Incase anyone else needs to perform this these are the steps I took to get it to work with Spark 1.2.1 as well as Spark 1.3.0-RC3: 1. Pull 1.2.1 Source 2. Apply the following patches a. Address jackson version, https://github.com/apache/spark/pull/3938 b. Address the propagation of the hdp.version set in the spark-default.conf, https://github.com/apache/spark/pull/3409 3. build with $SPARK_HOME./make-distribution.sh –name hadoop2.6 –tgz -Pyarn -Phadoop-2.4 -Dhadoop.version=2.6.0 -Phive -Phive-thriftserver -DskipTests package Then deploy the resulting artifact = spark-1.2.1-bin-hadoop2.6.tgz following instructions in the HDP Spark preview http://hortonworks.com/hadoop-tutorial/using-apache-spark-hdp/ FWIW spark-1.3.0 appears to be working fine with HDP as well and steps 2a and 2b are not required. HTH -Todd On Mon, Mar 16, 2015 at 10:13 AM, Bharath Ravi Kumar reachb...@gmail.com wrote: Hi, Trying to run spark ( 1.2.1 built for hdp 2.2) against a yarn cluster results in the AM failing to start with following error on stderr: Error: Could not find or load main class org.apache.spark.deploy.yarn.ExecutorLauncher An application id was assigned to the job, but there were no logs. Note that the spark distribution has not been installed on every host in the cluster and the aforementioned spark build was copied to one of the hadoop client hosts in the cluster to launch the job. Spark-submit was run with --master yarn-client and spark.yarn.jar was set to the assembly jar from the above distribution. Switching the spark distribution to the HDP recommended version and following the instructions on this page http://hortonworks.com/hadoop-tutorial/using-apache-spark-hdp/ did not fix the problem either. Any idea what may have caused this error ? Thanks, Bharath
HDP 2.2 AM abort : Unable to find ExecutorLauncher class
Hi, Trying to run Spark (1.2.1 built for HDP 2.2) against a YARN cluster results in the AM failing to start with the following error on stderr: Error: Could not find or load main class org.apache.spark.deploy.yarn.ExecutorLauncher An application ID was assigned to the job, but there were no logs. Note that the Spark distribution has not been installed on every host in the cluster, and the aforementioned Spark build was copied to one of the Hadoop client hosts in the cluster to launch the job. Spark-submit was run with --master yarn-client, and spark.yarn.jar was set to the assembly jar from the above distribution. Switching the Spark distribution to the HDP-recommended version and following the instructions on this page http://hortonworks.com/hadoop-tutorial/using-apache-spark-hdp/ did not fix the problem either. Any idea what may have caused this error? Thanks, Bharath
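Roughly how the job described above would have been launched (the properties file, application class, and jar path are placeholders; only --master yarn-client and the use of spark.yarn.jar come from the thread):

  ./bin/spark-submit \
    --master yarn-client \
    --properties-file /path/to/spark-defaults.conf \
    --class com.example.SomeSparkJob \
    /path/to/some-spark-job.jar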
Re: HDP 2.2 AM abort : Unable to find ExecutorLauncher class
Hi Todd, Thanks for the help. I'll try again after building a distribution with the 1.3 sources. However, I wanted to confirm what I mentioned earlier: is it sufficient to copy the distribution only to the client host from where spark-submit is invoked (with spark.yarn.jar set), or is there a need to ensure that the entire distribution is pre-deployed / made available on every host in the YARN cluster? I'd assume that the latter shouldn't be necessary. On Mon, Mar 16, 2015 at 8:38 PM, Todd Nist tsind...@gmail.com wrote: Hi Bharath, I ran into the same issue a few days ago; here is a link to a post on Horton's forum. http://hortonworks.com/community/forums/search/spark+1.2.1/ In case anyone else needs to perform this, these are the steps I took to get it to work with Spark 1.2.1 as well as Spark 1.3.0-RC3: 1. Pull the 1.2.1 source. 2. Apply the following patches: a. Address the Jackson version, https://github.com/apache/spark/pull/3938 b. Address the propagation of the hdp.version set in spark-defaults.conf, https://github.com/apache/spark/pull/3409 3. Build with $SPARK_HOME/make-distribution.sh --name hadoop2.6 --tgz -Pyarn -Phadoop-2.4 -Dhadoop.version=2.6.0 -Phive -Phive-thriftserver -DskipTests package Then deploy the resulting artifact (spark-1.2.1-bin-hadoop2.6.tgz) following the instructions in the HDP Spark preview http://hortonworks.com/hadoop-tutorial/using-apache-spark-hdp/ FWIW spark-1.3.0 appears to be working fine with HDP as well, and steps 2a and 2b are not required. HTH -Todd On Mon, Mar 16, 2015 at 10:13 AM, Bharath Ravi Kumar reachb...@gmail.com wrote: Hi, Trying to run Spark (1.2.1 built for HDP 2.2) against a YARN cluster results in the AM failing to start with the following error on stderr: Error: Could not find or load main class org.apache.spark.deploy.yarn.ExecutorLauncher An application ID was assigned to the job, but there were no logs. Note that the Spark distribution has not been installed on every host in the cluster, and the aforementioned Spark build was copied to one of the Hadoop client hosts in the cluster to launch the job. Spark-submit was run with --master yarn-client, and spark.yarn.jar was set to the assembly jar from the above distribution. Switching the Spark distribution to the HDP-recommended version and following the instructions on this page http://hortonworks.com/hadoop-tutorial/using-apache-spark-hdp/ did not fix the problem either. Any idea what may have caused this error? Thanks, Bharath
Re: HDP 2.2 AM abort : Unable to find ExecutorLauncher class
Still no luck running purpose-built 1.3 against HDP 2.2 after following all the instructions. Anyone else faced this issue? On Mon, Mar 16, 2015 at 8:53 PM, Bharath Ravi Kumar reachb...@gmail.com wrote: Hi Todd, Thanks for the help. I'll try again after building a distribution with the 1.3 sources. However, I wanted to confirm what I mentioned earlier: is it sufficient to copy the distribution only to the client host from where spark-submit is invoked(with spark.yarn.jar set), or is there a need to ensure that the entire distribution is deployed made available pre-deployed on every host in the yarn cluster? I'd assume that the latter shouldn't be necessary. On Mon, Mar 16, 2015 at 8:38 PM, Todd Nist tsind...@gmail.com wrote: Hi Bharath, I ran into the same issue a few days ago, here is a link to a post on Horton's fourm. http://hortonworks.com/community/forums/search/spark+1.2.1/ Incase anyone else needs to perform this these are the steps I took to get it to work with Spark 1.2.1 as well as Spark 1.3.0-RC3: 1. Pull 1.2.1 Source 2. Apply the following patches a. Address jackson version, https://github.com/apache/spark/pull/3938 b. Address the propagation of the hdp.version set in the spark-default.conf, https://github.com/apache/spark/pull/3409 3. build with $SPARK_HOME./make-distribution.sh –name hadoop2.6 –tgz -Pyarn -Phadoop-2.4 -Dhadoop.version=2.6.0 -Phive -Phive-thriftserver -DskipTests package Then deploy the resulting artifact = spark-1.2.1-bin-hadoop2.6.tgz following instructions in the HDP Spark preview http://hortonworks.com/hadoop-tutorial/using-apache-spark-hdp/ FWIW spark-1.3.0 appears to be working fine with HDP as well and steps 2a and 2b are not required. HTH -Todd On Mon, Mar 16, 2015 at 10:13 AM, Bharath Ravi Kumar reachb...@gmail.com wrote: Hi, Trying to run spark ( 1.2.1 built for hdp 2.2) against a yarn cluster results in the AM failing to start with following error on stderr: Error: Could not find or load main class org.apache.spark.deploy.yarn.ExecutorLauncher An application id was assigned to the job, but there were no logs. Note that the spark distribution has not been installed on every host in the cluster and the aforementioned spark build was copied to one of the hadoop client hosts in the cluster to launch the job. Spark-submit was run with --master yarn-client and spark.yarn.jar was set to the assembly jar from the above distribution. Switching the spark distribution to the HDP recommended version and following the instructions on this page http://hortonworks.com/hadoop-tutorial/using-apache-spark-hdp/ did not fix the problem either. Any idea what may have caused this error ? Thanks, Bharath
Re: ALS failure with size Integer.MAX_VALUE
Ok. We'll try using it in a test cluster running 1.2. On 16-Dec-2014 1:36 am, Xiangrui Meng men...@gmail.com wrote: Unfortunately, it will depends on the Sorter API in 1.2. -Xiangrui On Mon, Dec 15, 2014 at 11:48 AM, Bharath Ravi Kumar reachb...@gmail.com wrote: Hi Xiangrui, The block size limit was encountered even with reduced number of item blocks as you had expected. I'm wondering if I could try the new implementation as a standalone library against a 1.1 deployment. Does it have dependencies on any core API's in the current master? Thanks, Bharath On Wed, Dec 3, 2014 at 10:10 PM, Bharath Ravi Kumar reachb...@gmail.com wrote: Thanks Xiangrui. I'll try out setting a smaller number of item blocks. And yes, I've been following the JIRA for the new ALS implementation. I'll try it out when it's ready for testing. . On Wed, Dec 3, 2014 at 4:24 AM, Xiangrui Meng men...@gmail.com wrote: Hi Bharath, You can try setting a small item blocks in this case. 1200 is definitely too large for ALS. Please try 30 or even smaller. I'm not sure whether this could solve the problem because you have 100 items connected with 10^8 users. There is a JIRA for this issue: https://issues.apache.org/jira/browse/SPARK-3735 which I will try to implement in 1.3. I'll ping you when it is ready. Best, Xiangrui On Tue, Dec 2, 2014 at 10:40 AM, Bharath Ravi Kumar reachb...@gmail.com wrote: Yes, the issue appears to be due to the 2GB block size limitation. I am hence looking for (user, product) block sizing suggestions to work around the block size limitation. On Sun, Nov 30, 2014 at 3:01 PM, Sean Owen so...@cloudera.com wrote: (It won't be that, since you see that the error occur when reading a block from disk. I think this is an instance of the 2GB block size limitation.) On Sun, Nov 30, 2014 at 4:36 AM, Ganelin, Ilya ilya.gane...@capitalone.com wrote: Hi Bharath – I’m unsure if this is your problem but the MatrixFactorizationModel in MLLIB which is the underlying component for ALS expects your User/Product fields to be integers. Specifically, the input to ALS is an RDD[Rating] and Rating is an (Int, Int, Double). I am wondering if perhaps one of your identifiers exceeds MAX_INT, could you write a quick check for that? I have been running a very similar use case to yours (with more constrained hardware resources) and I haven’t seen this exact problem but I’m sure we’ve seen similar issues. Please let me know if you have other questions. From: Bharath Ravi Kumar reachb...@gmail.com Date: Thursday, November 27, 2014 at 1:30 PM To: user@spark.apache.org user@spark.apache.org Subject: ALS failure with size Integer.MAX_VALUE We're training a recommender with ALS in mllib 1.1 against a dataset of 150M users and 4.5K items, with the total number of training records being 1.2 Billion (~30GB data). The input data is spread across 1200 partitions on HDFS. For the training, rank=10, and we've configured {number of user data blocks = number of item data blocks}. The number of user/item blocks was varied between 50 to 1200. Irrespective of the block size (e.g. 
at 1200 blocks each), there are at least a couple of tasks that end up shuffle reading 9.7G each in the aggregate stage (ALS.scala:337) and failing with the following exception: java.lang.IllegalArgumentException: Size exceeds Integer.MAX_VALUE at sun.nio.ch.FileChannelImpl.map(FileChannelImpl.java:745) at org.apache.spark.storage.DiskStore.getBytes(DiskStore.scala:108) at org.apache.spark.storage.DiskStore.getValues(DiskStore.scala:124) at org.apache.spark.storage.BlockManager.getLocalFromDisk(BlockManager.scala:332) at org.apache.spark.storage.BlockFetcherIterator$BasicBlockFetcherIterator$$anonfun$getLocalBlocks$1.apply(BlockFetcherIterator.scala:204)
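A rough way to read the failure (an interpretation consistent with Sean's 2GB block size remark quoted above, not something stated explicitly): a single shuffled/cached block is memory-mapped into one ByteBuffer, so it cannot exceed Integer.MAX_VALUE bytes (about 2 GB), and a 9.7G block is several times over that limit:

  val maxBlockBytes = Int.MaxValue.toLong                              // 2147483647, ~2 GB per block
  val observedBlockBytes = 9.7e9                                       // shuffle read of a failing task
  val minSplitFactor = math.ceil(observedBlockBytes / maxBlockBytes)   // ~5x more (i.e. smaller) blocks needed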
Re: ALS failure with size Integer.MAX_VALUE
Hi Xiangrui, The block size limit was encountered even with reduced number of item blocks as you had expected. I'm wondering if I could try the new implementation as a standalone library against a 1.1 deployment. Does it have dependencies on any core API's in the current master? Thanks, Bharath On Wed, Dec 3, 2014 at 10:10 PM, Bharath Ravi Kumar reachb...@gmail.com wrote: Thanks Xiangrui. I'll try out setting a smaller number of item blocks. And yes, I've been following the JIRA for the new ALS implementation. I'll try it out when it's ready for testing. . On Wed, Dec 3, 2014 at 4:24 AM, Xiangrui Meng men...@gmail.com wrote: Hi Bharath, You can try setting a small item blocks in this case. 1200 is definitely too large for ALS. Please try 30 or even smaller. I'm not sure whether this could solve the problem because you have 100 items connected with 10^8 users. There is a JIRA for this issue: https://issues.apache.org/jira/browse/SPARK-3735 which I will try to implement in 1.3. I'll ping you when it is ready. Best, Xiangrui On Tue, Dec 2, 2014 at 10:40 AM, Bharath Ravi Kumar reachb...@gmail.com wrote: Yes, the issue appears to be due to the 2GB block size limitation. I am hence looking for (user, product) block sizing suggestions to work around the block size limitation. On Sun, Nov 30, 2014 at 3:01 PM, Sean Owen so...@cloudera.com wrote: (It won't be that, since you see that the error occur when reading a block from disk. I think this is an instance of the 2GB block size limitation.) On Sun, Nov 30, 2014 at 4:36 AM, Ganelin, Ilya ilya.gane...@capitalone.com wrote: Hi Bharath – I’m unsure if this is your problem but the MatrixFactorizationModel in MLLIB which is the underlying component for ALS expects your User/Product fields to be integers. Specifically, the input to ALS is an RDD[Rating] and Rating is an (Int, Int, Double). I am wondering if perhaps one of your identifiers exceeds MAX_INT, could you write a quick check for that? I have been running a very similar use case to yours (with more constrained hardware resources) and I haven’t seen this exact problem but I’m sure we’ve seen similar issues. Please let me know if you have other questions. From: Bharath Ravi Kumar reachb...@gmail.com Date: Thursday, November 27, 2014 at 1:30 PM To: user@spark.apache.org user@spark.apache.org Subject: ALS failure with size Integer.MAX_VALUE We're training a recommender with ALS in mllib 1.1 against a dataset of 150M users and 4.5K items, with the total number of training records being 1.2 Billion (~30GB data). The input data is spread across 1200 partitions on HDFS. For the training, rank=10, and we've configured {number of user data blocks = number of item data blocks}. The number of user/item blocks was varied between 50 to 1200. Irrespective of the block size (e.g. at 1200 blocks each), there are atleast a couple of tasks that end up shuffle reading 9.7G each in the aggregate stage (ALS.scala:337) and failing with the following exception: java.lang.IllegalArgumentException: Size exceeds Integer.MAX_VALUE at sun.nio.ch.FileChannelImpl.map(FileChannelImpl.java:745) at org.apache.spark.storage.DiskStore.getBytes(DiskStore.scala:108) at org.apache.spark.storage.DiskStore.getValues(DiskStore.scala:124) at org.apache.spark.storage.BlockManager.getLocalFromDisk(BlockManager.scala:332) at org.apache.spark.storage.BlockFetcherIterator$BasicBlockFetcherIterator$$anonfun$getLocalBlocks$1.apply(BlockFetcherIterator.scala:204)
Re: ALS failure with size Integer.MAX_VALUE
Thanks Xiangrui. I'll try out setting a smaller number of item blocks. And yes, I've been following the JIRA for the new ALS implementation. I'll try it out when it's ready for testing. . On Wed, Dec 3, 2014 at 4:24 AM, Xiangrui Meng men...@gmail.com wrote: Hi Bharath, You can try setting a small item blocks in this case. 1200 is definitely too large for ALS. Please try 30 or even smaller. I'm not sure whether this could solve the problem because you have 100 items connected with 10^8 users. There is a JIRA for this issue: https://issues.apache.org/jira/browse/SPARK-3735 which I will try to implement in 1.3. I'll ping you when it is ready. Best, Xiangrui On Tue, Dec 2, 2014 at 10:40 AM, Bharath Ravi Kumar reachb...@gmail.com wrote: Yes, the issue appears to be due to the 2GB block size limitation. I am hence looking for (user, product) block sizing suggestions to work around the block size limitation. On Sun, Nov 30, 2014 at 3:01 PM, Sean Owen so...@cloudera.com wrote: (It won't be that, since you see that the error occur when reading a block from disk. I think this is an instance of the 2GB block size limitation.) On Sun, Nov 30, 2014 at 4:36 AM, Ganelin, Ilya ilya.gane...@capitalone.com wrote: Hi Bharath – I’m unsure if this is your problem but the MatrixFactorizationModel in MLLIB which is the underlying component for ALS expects your User/Product fields to be integers. Specifically, the input to ALS is an RDD[Rating] and Rating is an (Int, Int, Double). I am wondering if perhaps one of your identifiers exceeds MAX_INT, could you write a quick check for that? I have been running a very similar use case to yours (with more constrained hardware resources) and I haven’t seen this exact problem but I’m sure we’ve seen similar issues. Please let me know if you have other questions. From: Bharath Ravi Kumar reachb...@gmail.com Date: Thursday, November 27, 2014 at 1:30 PM To: user@spark.apache.org user@spark.apache.org Subject: ALS failure with size Integer.MAX_VALUE We're training a recommender with ALS in mllib 1.1 against a dataset of 150M users and 4.5K items, with the total number of training records being 1.2 Billion (~30GB data). The input data is spread across 1200 partitions on HDFS. For the training, rank=10, and we've configured {number of user data blocks = number of item data blocks}. The number of user/item blocks was varied between 50 to 1200. Irrespective of the block size (e.g. at 1200 blocks each), there are atleast a couple of tasks that end up shuffle reading 9.7G each in the aggregate stage (ALS.scala:337) and failing with the following exception: java.lang.IllegalArgumentException: Size exceeds Integer.MAX_VALUE at sun.nio.ch.FileChannelImpl.map(FileChannelImpl.java:745) at org.apache.spark.storage.DiskStore.getBytes(DiskStore.scala:108) at org.apache.spark.storage.DiskStore.getValues(DiskStore.scala:124) at org.apache.spark.storage.BlockManager.getLocalFromDisk(BlockManager.scala:332) at org.apache.spark.storage.BlockFetcherIterator$BasicBlockFetcherIterator$$anonfun$getLocalBlocks$1.apply(BlockFetcherIterator.scala:204)
Re: ALS failure with size Integer.MAX_VALUE
Yes, the issue appears to be due to the 2GB block size limitation. I am hence looking for (user, product) block sizing suggestions to work around the block size limitation. On Sun, Nov 30, 2014 at 3:01 PM, Sean Owen so...@cloudera.com wrote: (It won't be that, since you see that the error occur when reading a block from disk. I think this is an instance of the 2GB block size limitation.) On Sun, Nov 30, 2014 at 4:36 AM, Ganelin, Ilya ilya.gane...@capitalone.com wrote: Hi Bharath – I’m unsure if this is your problem but the MatrixFactorizationModel in MLLIB which is the underlying component for ALS expects your User/Product fields to be integers. Specifically, the input to ALS is an RDD[Rating] and Rating is an (Int, Int, Double). I am wondering if perhaps one of your identifiers exceeds MAX_INT, could you write a quick check for that? I have been running a very similar use case to yours (with more constrained hardware resources) and I haven’t seen this exact problem but I’m sure we’ve seen similar issues. Please let me know if you have other questions. From: Bharath Ravi Kumar reachb...@gmail.com Date: Thursday, November 27, 2014 at 1:30 PM To: user@spark.apache.org user@spark.apache.org Subject: ALS failure with size Integer.MAX_VALUE We're training a recommender with ALS in mllib 1.1 against a dataset of 150M users and 4.5K items, with the total number of training records being 1.2 Billion (~30GB data). The input data is spread across 1200 partitions on HDFS. For the training, rank=10, and we've configured {number of user data blocks = number of item data blocks}. The number of user/item blocks was varied between 50 to 1200. Irrespective of the block size (e.g. at 1200 blocks each), there are atleast a couple of tasks that end up shuffle reading 9.7G each in the aggregate stage (ALS.scala:337) and failing with the following exception: java.lang.IllegalArgumentException: Size exceeds Integer.MAX_VALUE at sun.nio.ch.FileChannelImpl.map(FileChannelImpl.java:745) at org.apache.spark.storage.DiskStore.getBytes(DiskStore.scala:108) at org.apache.spark.storage.DiskStore.getValues(DiskStore.scala:124) at org.apache.spark.storage.BlockManager.getLocalFromDisk(BlockManager.scala:332) at org.apache.spark.storage.BlockFetcherIterator$BasicBlockFetcherIterator$$anonfun$getLocalBlocks$1.apply(BlockFetcherIterator.scala:204)
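Regarding the suggestion above to check whether any identifier exceeds the Int range: a minimal sketch, assuming the raw input is available as an RDD of (userId, itemId, rating) triples with 64-bit ids before Rating objects are built; the names are illustrative:

  import org.apache.spark.rdd.RDD

  // Count raw records whose user or item id would not fit in an Int,
  // i.e. would overflow when narrowed to Rating's (Int, Int, Double).
  def countOverflowingIds(rawTriples: RDD[(Long, Long, Double)]): Long =
    rawTriples.filter { case (userId, itemId, _) =>
      userId > Int.MaxValue || itemId > Int.MaxValue
    }.count()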
Re: ALS failure with size Integer.MAX_VALUE
Any suggestions to address the described problem? In particular, given the skewed degree of some of the item nodes in the graph, it should be possible to choose block sizes that reflect that skew, but I'm unsure how to arrive at those sizes. Thanks, Bharath

On Fri, Nov 28, 2014 at 12:00 AM, Bharath Ravi Kumar reachb...@gmail.com wrote:

We're training a recommender with ALS in mllib 1.1 against a dataset of 150M users and 4.5K items, with the total number of training records being 1.2 Billion (~30GB data). The input data is spread across 1200 partitions on HDFS. For the training, rank=10, and we've configured {number of user data blocks = number of item data blocks}. The number of user/item blocks was varied between 50 and 1200. Irrespective of the block count (e.g. at 1200 blocks each), there are at least a couple of tasks that end up shuffle reading 9.7G each in the aggregate stage (ALS.scala:337) and failing with the following exception:
java.lang.IllegalArgumentException: Size exceeds Integer.MAX_VALUE
at sun.nio.ch.FileChannelImpl.map(FileChannelImpl.java:745)
at org.apache.spark.storage.DiskStore.getBytes(DiskStore.scala:108)
at org.apache.spark.storage.DiskStore.getValues(DiskStore.scala:124)
at org.apache.spark.storage.BlockManager.getLocalFromDisk(BlockManager.scala:332)
at org.apache.spark.storage.BlockFetcherIterator$BasicBlockFetcherIterator$$anonfun$getLocalBlocks$1.apply(BlockFetcherIterator.scala:204)

As for the data, on the user side, the degree of a node in the connectivity graph is relatively small. However, on the item side, 3.8K out of the 4.5K items are connected to 10^5 users each on average, with 100 items being connected to nearly 10^8 users. The rest of the items are connected to fewer than 10^5 users. With such a skew in the connectivity graph, I'm unsure if additional memory or variation in the block sizes would help (considering my limited understanding of the implementation in mllib). Any suggestion to address the problem?

The test is being run on a standalone cluster of 3 hosts, each with 100G RAM and 24 cores dedicated to the application. The additional configs I made specific to the shuffle and task failure reduction are as follows:
spark.core.connection.ack.wait.timeout=600
spark.shuffle.consolidateFiles=true
spark.shuffle.manager=SORT

The job execution summary is as follows:
Active Stages:
Stage id 2, aggregate at ALS.scala:337, duration 55 min, Tasks 1197/1200 (3 failed), Shuffle Read: 141.6 GB
Completed Stages (5):
Stage Id | Description | Duration | Tasks: Succeeded/Total | Input | Shuffle Read | Shuffle Write
6 | org.apache.spark.rdd.RDD.flatMap(RDD.scala:277) | 12 min | 1200/1200 | 29.9 GB | 1668.4 MB | 186.8 GB
5 | mapPartitionsWithIndex at ALS.scala:250
3 | map at ALS.scala:231
0 | aggregate at ALS.scala:337
1 | map at ALS.scala:228

Thanks, Bharath
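One way to quantify the item-side skew described above before picking block counts: a minimal sketch, assuming ratings is the RDD[Rating] used for training (names are illustrative), that counts how many items fall into each order-of-magnitude degree bucket:

  import org.apache.spark.SparkContext._
  import org.apache.spark.mllib.recommendation.Rating
  import org.apache.spark.rdd.RDD

  // Degree of an item = number of users connected to it, bucketed by log10.
  def itemDegreeHistogram(ratings: RDD[Rating]): Array[(Int, Long)] =
    ratings
      .map(r => (r.product, 1L))
      .reduceByKey(_ + _)                                            // (item, degree)
      .map { case (_, degree) => (math.log10(degree.toDouble).toInt, 1L) }
      .reduceByKey(_ + _)                                            // (log10 bucket, item count)
      .sortByKey()
      .collect()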
ALS failure with size Integer.MAX_VALUE
We're training a recommender with ALS in mllib 1.1 against a dataset of 150M users and 4.5K items, with the total number of training records being 1.2 Billion (~30GB data). The input data is spread across 1200 partitions on HDFS. For the training, rank=10, and we've configured {number of user data blocks = number of item data blocks}. The number of user/item blocks was varied between 50 and 1200. Irrespective of the block count (e.g. at 1200 blocks each), there are at least a couple of tasks that end up shuffle reading 9.7G each in the aggregate stage (ALS.scala:337) and failing with the following exception:
java.lang.IllegalArgumentException: Size exceeds Integer.MAX_VALUE
at sun.nio.ch.FileChannelImpl.map(FileChannelImpl.java:745)
at org.apache.spark.storage.DiskStore.getBytes(DiskStore.scala:108)
at org.apache.spark.storage.DiskStore.getValues(DiskStore.scala:124)
at org.apache.spark.storage.BlockManager.getLocalFromDisk(BlockManager.scala:332)
at org.apache.spark.storage.BlockFetcherIterator$BasicBlockFetcherIterator$$anonfun$getLocalBlocks$1.apply(BlockFetcherIterator.scala:204)

As for the data, on the user side, the degree of a node in the connectivity graph is relatively small. However, on the item side, 3.8K out of the 4.5K items are connected to 10^5 users each on average, with 100 items being connected to nearly 10^8 users. The rest of the items are connected to fewer than 10^5 users. With such a skew in the connectivity graph, I'm unsure if additional memory or variation in the block sizes would help (considering my limited understanding of the implementation in mllib). Any suggestion to address the problem?

The test is being run on a standalone cluster of 3 hosts, each with 100G RAM and 24 cores dedicated to the application. The additional configs I made specific to the shuffle and task failure reduction are as follows:
spark.core.connection.ack.wait.timeout=600
spark.shuffle.consolidateFiles=true
spark.shuffle.manager=SORT

The job execution summary is as follows:
Active Stages:
Stage id 2, aggregate at ALS.scala:337, duration 55 min, Tasks 1197/1200 (3 failed), Shuffle Read: 141.6 GB
Completed Stages (5):
Stage Id | Description | Duration | Tasks: Succeeded/Total | Input | Shuffle Read | Shuffle Write
6 | org.apache.spark.rdd.RDD.flatMap(RDD.scala:277) | 12 min | 1200/1200 | 29.9 GB | 1668.4 MB | 186.8 GB
5 | mapPartitionsWithIndex at ALS.scala:250
3 | map at ALS.scala:231
0 | aggregate at ALS.scala:337
1 | map at ALS.scala:228

Thanks, Bharath
Re: OOM with groupBy + saveAsTextFile
The result was no different with saveAsHadoopFile. In both cases, I can see that I've misinterpreted the API docs. I'll explore the API's a bit further for ways to save the iterable as chunks rather than one large text/binary. It might also help to clarify this aspect in the API docs. For those (like me) whose first practical experience with data processing is through spark, having skipped the Hadoop MR ecosystem, it might help to clarify interactions with HDFS and the likes. Thanks for all the help. On Sun, Nov 2, 2014 at 10:22 PM, Sean Owen so...@cloudera.com wrote: saveAsText means save every element of the RDD as one line of text. It works like TextOutputFormat in Hadoop MapReduce since that's what it uses. So you are causing it to create one big string out of each Iterable this way. On Sun, Nov 2, 2014 at 4:48 PM, Bharath Ravi Kumar reachb...@gmail.com wrote: Thanks for responding. This is what I initially suspected, and hence asked why the library needed to construct the entire value buffer on a single host before writing it out. The stacktrace appeared to suggest that user code is not constructing the large buffer. I'm simply calling groupBy and saveAsText on the resulting grouped rdd. The value after grouping is an IterableTuple4String, Double, String, String. None of the strings are large. I also do not need a single large string created out of the Iterable for writing to disk. Instead, I expect the iterable to get written out in chunks in response to saveAsText. This shouldn't be the default behaviour of saveAsText perhaps? Hence my original question of the behavior of saveAsText. The tuning / partitioning attempts were aimed at reducing memory pressure so that multiple such buffers aren't constructed at the same time on a host. I'll take a second look at the data and code before updating this thread. Thanks.
Re: OOM with groupBy + saveAsTextFile
I also realized from your description of saveAsText that the API is indeed behaving as expected i.e. it is appropriate (though not optimal) for the API to construct a single string out of the value. If the value turns out to be large, the user of the API needs to reconsider the implementation approach. My bad. On Mon, Nov 3, 2014 at 3:38 PM, Bharath Ravi Kumar reachb...@gmail.com wrote: The result was no different with saveAsHadoopFile. In both cases, I can see that I've misinterpreted the API docs. I'll explore the API's a bit further for ways to save the iterable as chunks rather than one large text/binary. It might also help to clarify this aspect in the API docs. For those (like me) whose first practical experience with data processing is through spark, having skipped the Hadoop MR ecosystem, it might help to clarify interactions with HDFS and the likes. Thanks for all the help. On Sun, Nov 2, 2014 at 10:22 PM, Sean Owen so...@cloudera.com wrote: saveAsText means save every element of the RDD as one line of text. It works like TextOutputFormat in Hadoop MapReduce since that's what it uses. So you are causing it to create one big string out of each Iterable this way. On Sun, Nov 2, 2014 at 4:48 PM, Bharath Ravi Kumar reachb...@gmail.com wrote: Thanks for responding. This is what I initially suspected, and hence asked why the library needed to construct the entire value buffer on a single host before writing it out. The stacktrace appeared to suggest that user code is not constructing the large buffer. I'm simply calling groupBy and saveAsText on the resulting grouped rdd. The value after grouping is an IterableTuple4String, Double, String, String. None of the strings are large. I also do not need a single large string created out of the Iterable for writing to disk. Instead, I expect the iterable to get written out in chunks in response to saveAsText. This shouldn't be the default behaviour of saveAsText perhaps? Hence my original question of the behavior of saveAsText. The tuning / partitioning attempts were aimed at reducing memory pressure so that multiple such buffers aren't constructed at the same time on a host. I'll take a second look at the data and code before updating this thread. Thanks.
Re: OOM with groupBy + saveAsTextFile
Thanks for responding. This is what I initially suspected, and hence asked why the library needed to construct the entire value buffer on a single host before writing it out. The stacktrace appeared to suggest that user code is not constructing the large buffer. I'm simply calling groupBy and saveAsTextFile on the resulting grouped rdd. The value after grouping is an Iterable<Tuple4<String, Double, String, String>>. None of the strings are large. I also do not need a single large string created out of the Iterable for writing to disk. Instead, I expect the iterable to get written out in chunks in response to saveAsTextFile. Perhaps this shouldn't be the default behaviour of saveAsTextFile? Hence my original question about the behavior of saveAsTextFile. The tuning / partitioning attempts were aimed at reducing memory pressure so that multiple such buffers aren't constructed at the same time on a host. I'll take a second look at the data and code before updating this thread. Thanks.

None of your tuning will help here because the problem is actually the way you are saving the output. If you take a look at the stacktrace, it is trying to build a single string that is too large for the VM to allocate memory. The VM is not actually running out of memory; rather, the JVM cannot support a single String that large. I suspect this is because the value in your (key, value) pair after the group by is too long (maybe it concatenates every single record). Do you really want to save the (key, value) output this way using a text file? Maybe you can write them out as multiple strings rather than a single super giant string.

On Sat, Nov 1, 2014 at 9:52 PM, arthur.hk.c...@gmail.com wrote: Hi, FYI as follows. Could you post your heap size settings as well as your Spark app code? Regards Arthur

3.1.3 Detail Message: Requested array size exceeds VM limit
The detail message "Requested array size exceeds VM limit" indicates that the application (or APIs used by that application) attempted to allocate an array that is larger than the heap size. For example, if an application attempts to allocate an array of 512MB but the maximum heap size is 256MB then OutOfMemoryError will be thrown with the reason Requested array size exceeds VM limit. In most cases the problem is either a configuration issue (heap size too small), or a bug that results in an application attempting to create a huge array, for example, when the number of elements in the array are computed using an algorithm that computes an incorrect size.

On 2 Nov, 2014, at 12:25 pm, Bharath Ravi Kumar reachb...@gmail.com wrote: Resurfacing the thread. OOM shouldn't be the norm for a common groupBy / sort use case in a framework that is leading in sorting benchmarks? Or is there something fundamentally wrong in the usage?

On 02-Nov-2014 1:06 am, Bharath Ravi Kumar reachb...@gmail.com wrote: Hi, I'm trying to run groupBy(function) followed by saveAsTextFile on an RDD of count ~ 100 million. The data size is 20GB and groupBy results in an RDD of 1061 keys with values being Iterable<Tuple4<String, Integer, Double, String>>. The job runs on 3 hosts in a standalone setup with each host's executor having 100G RAM and 24 cores dedicated to it. While the groupBy stage completes successfully with ~24GB of shuffle write, the saveAsTextFile fails after repeated retries with each attempt failing due to an out of memory error [1].
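A minimal sketch of the "write them out as multiple strings" suggestion above, building one output line per grouped element instead of letting saveAsTextFile stringify an entire (key, Iterable) pair at once; the record type and names are illustrative, not the actual schema:

  import org.apache.spark.SparkContext._
  import org.apache.spark.rdd.RDD

  // One small line per element of each group, so no single String has to hold
  // all values of a key at once.
  def saveGroupedAsLines(records: RDD[(String, (String, Double, String, String))],
                         outputPath: String): Unit = {
    records
      .groupByKey()
      .flatMap { case (key, values) =>
        values.iterator.map { case (a, score, b, c) => s"$key\t$a\t$score\t$b\t$c" }
      }
      .saveAsTextFile(outputPath)
  }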
I understand that a few partitions may be overloaded as a result of the groupBy and I've tried the following config combinations unsuccessfully: 1) Repartition the initial rdd (44 input partitions but 1061 keys) across 1061 paritions and have max cores = 3 so that each key is a logical partition (though many partitions will end up on very few hosts), and each host likely runs saveAsTextFile on a single key at a time due to max cores = 3 with 3 hosts in the cluster. The level of parallelism is unspecified. 2) Leave max cores unspecified, set the level of parallelism to 72, and leave number of partitions unspecified (in which case the # input partitions was used, which is 44) Since I do not intend to cache RDD's, I have set spark.storage.memoryFraction=0.2 in both cases. My understanding is that if each host is processing a single logical partition to saveAsTextFile and is reading from other hosts to write out the RDD, it is unlikely that it would run out of memory. My interpretation of the spark tuning guide is that the degree of parallelism has little impact in case (1) above since max cores = number of hosts. Can someone explain why there are still OOM's with 100G being available? On a related note, intuitively (though I haven't read the source), it appears that an entire key-value pair needn't fit into memory of a single host for saveAsTextFile since a single shuffle read from a remote can be written to HDFS before the next remote read is carried out. This way, not all data needs
OOM with groupBy + saveAsTextFile
Hi, I'm trying to run groupBy(function) followed by saveAsTextFile on an RDD of count ~ 100 million. The data size is 20GB and groupBy results in an RDD of 1061 keys with values being Iterable<Tuple4<String, Integer, Double, String>>. The job runs on 3 hosts in a standalone setup with each host's executor having 100G RAM and 24 cores dedicated to it. While the groupBy stage completes successfully with ~24GB of shuffle write, the saveAsTextFile fails after repeated retries with each attempt failing due to an out of memory error [1]. I understand that a few partitions may be overloaded as a result of the groupBy and I've tried the following config combinations unsuccessfully:
1) Repartition the initial rdd (44 input partitions but 1061 keys) across 1061 partitions and have max cores = 3 so that each key is a logical partition (though many partitions will end up on very few hosts), and each host likely runs saveAsTextFile on a single key at a time due to max cores = 3 with 3 hosts in the cluster. The level of parallelism is unspecified.
2) Leave max cores unspecified, set the level of parallelism to 72, and leave the number of partitions unspecified (in which case the number of input partitions was used, which is 44).
Since I do not intend to cache RDDs, I have set spark.storage.memoryFraction=0.2 in both cases. My understanding is that if each host is processing a single logical partition to saveAsTextFile and is reading from other hosts to write out the RDD, it is unlikely that it would run out of memory. My interpretation of the spark tuning guide is that the degree of parallelism has little impact in case (1) above since max cores = number of hosts. Can someone explain why there are still OOMs with 100G being available? On a related note, intuitively (though I haven't read the source), it appears that an entire key-value pair needn't fit into the memory of a single host for saveAsTextFile, since a single shuffle read from a remote can be written to HDFS before the next remote read is carried out. This way, not all data needs to be collected at the same time. Lastly, if an OOM is (but shouldn't be) a common occurrence (as per the tuning guide and even as per Datastax's spark introduction), there may need to be more documentation around the internals of spark to help users make better-informed tuning decisions with parallelism, max cores, number of partitions and other tunables. Is there any ongoing effort on that front?
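For reference, a rough sketch of configuration (1) described in the message above; the app name, key-extraction function, and paths are placeholders rather than the actual application code:

  import org.apache.spark.{SparkConf, SparkContext}

  val conf = new SparkConf()
    .setAppName("groupBy-saveAsTextFile")           // illustrative app name
    .set("spark.cores.max", "3")                    // one concurrently running task per host
    .set("spark.storage.memoryFraction", "0.2")     // no RDD caching intended
  val sc = new SparkContext(conf)

  def keyOf(line: String): String = line.split('\t')(0)   // placeholder key extraction

  val input = sc.textFile("hdfs:///path/to/input")        // placeholder path
  val grouped = input.groupBy(keyOf _, 1061)               // 1061 = number of distinct keys
  grouped.saveAsTextFile("hdfs:///path/to/output")        // placeholder path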
Thanks, Bharath

[1] OOM stack trace and logs
14/11/01 12:26:37 WARN TaskSetManager: Lost task 61.0 in stage 36.0 (TID 1264, proc1.foo.bar.com): java.lang.OutOfMemoryError: Requested array size exceeds VM limit
java.util.Arrays.copyOf(Arrays.java:3326)
java.lang.AbstractStringBuilder.expandCapacity(AbstractStringBuilder.java:137)
java.lang.AbstractStringBuilder.ensureCapacityInternal(AbstractStringBuilder.java:121)
java.lang.AbstractStringBuilder.append(AbstractStringBuilder.java:421)
java.lang.StringBuilder.append(StringBuilder.java:136)
scala.collection.mutable.StringBuilder.append(StringBuilder.scala:197)
scala.Tuple2.toString(Tuple2.scala:22)
org.apache.spark.rdd.RDD$$anonfun$saveAsTextFile$1.apply(RDD.scala:1158)
org.apache.spark.rdd.RDD$$anonfun$saveAsTextFile$1.apply(RDD.scala:1158)
scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
org.apache.spark.rdd.PairRDDFunctions$$anonfun$13.apply(PairRDDFunctions.scala:984)
org.apache.spark.rdd.PairRDDFunctions$$anonfun$13.apply(PairRDDFunctions.scala:974)
org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:62)
org.apache.spark.scheduler.Task.run(Task.scala:54)
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:177)
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
java.lang.Thread.run(Thread.java:745)
14/11/01 12:26:40 WARN TaskSetManager: Lost task 92.0 in stage 36.0 (TID 1295, proc1.foo.bar.com): FetchFailed(BlockManagerId(1, proc1.foo.bar.com, 43704, 0), shuffleId=0, mapId=13, reduceId=92)
14/11/01 12:26:40 INFO DAGScheduler: Marking Stage 36 (saveAsTextFile at ModelTrainer.java:141) as failed due to a fetch failure from Stage 37 (groupBy at ModelTrainer.java:133)
14/11/01 12:26:40 INFO DAGScheduler: Stage 36 (saveAsTextFile at ModelTrainer.java:141) failed in 55.259 s
14/11/01 12:26:40 INFO DAGScheduler: Resubmitting Stage 37 (groupBy at ModelTrainer.java:133) and Stage 36 (saveAsTextFile at ModelTrainer.java:141) due to fetch failure
Re: OOM with groupBy + saveAsTextFile
Minor clarification: I'm running spark 1.1.0 on JDK 1.8, Linux 64 bit. On Sun, Nov 2, 2014 at 1:06 AM, Bharath Ravi Kumar reachb...@gmail.com wrote: Hi, I'm trying to run groupBy(function) followed by saveAsTextFile on an RDD of count ~ 100 million. The data size is 20GB and groupBy results in an RDD of 1061 keys with values being IterableTuple4String, Integer, Double, String. The job runs on 3 hosts in a standalone setup with each host's executor having 100G RAM and 24 cores dedicated to it. While the groupBy stage completes successfully with ~24GB of shuffle write, the saveAsTextFile fails after repeated retries with each attempt failing due to an out of memory error *[1]*. I understand that a few partitions may be overloaded as a result of the groupBy and I've tried the following config combinations unsuccessfully: 1) Repartition the initial rdd (44 input partitions but 1061 keys) across 1061 paritions and have max cores = 3 so that each key is a logical partition (though many partitions will end up on very few hosts), and each host likely runs saveAsTextFile on a single key at a time due to max cores = 3 with 3 hosts in the cluster. The level of parallelism is unspecified. 2) Leave max cores unspecified, set the level of parallelism to 72, and leave number of partitions unspecified (in which case the # input partitions was used, which is 44) Since I do not intend to cache RDD's, I have set spark.storage.memoryFraction=0.2 in both cases. My understanding is that if each host is processing a single logical partition to saveAsTextFile and is reading from other hosts to write out the RDD, it is unlikely that it would run out of memory. My interpretation of the spark tuning guide is that the degree of parallelism has little impact in case (1) above since max cores = number of hosts. Can someone explain why there are still OOM's with 100G being available? On a related note, intuitively (though I haven't read the source), it appears that an entire key-value pair needn't fit into memory of a single host for saveAsTextFile since a single shuffle read from a remote can be written to HDFS before the next remote read is carried out. This way, not all data needs to be collected at the same time. Lastly, if an OOM is (but shouldn't be) a common occurrence (as per the tuning guide and even as per Datastax's spark introduction), there may need to be more documentation around the internals of spark to help users take better informed tuning decisions with parallelism, max cores, number partitions and other tunables. Is there any ongoing effort on that front? 
Thanks, Bharath *[1]* OOM stack trace and logs 14/11/01 12:26:37 WARN TaskSetManager: Lost task 61.0 in stage 36.0 (TID 1264, proc1.foo.bar.com): java.lang.OutOfMemoryError: Requested array size exceeds VM limit java.util.Arrays.copyOf(Arrays.java:3326) java.lang.AbstractStringBuilder.expandCapacity(AbstractStringBuilder.java:137) java.lang.AbstractStringBuilder.ensureCapacityInternal(AbstractStringBuilder.java:121) java.lang.AbstractStringBuilder.append(AbstractStringBuilder.java:421) java.lang.StringBuilder.append(StringBuilder.java:136) scala.collection.mutable.StringBuilder.append(StringBuilder.scala:197) scala.Tuple2.toString(Tuple2.scala:22) org.apache.spark.rdd.RDD$$anonfun$saveAsTextFile$1.apply(RDD.scala:1158) org.apache.spark.rdd.RDD$$anonfun$saveAsTextFile$1.apply(RDD.scala:1158) scala.collection.Iterator$$anon$11.next(Iterator.scala:328) org.apache.spark.rdd.PairRDDFunctions$$anonfun$13.apply(PairRDDFunctions.scala:984) org.apache.spark.rdd.PairRDDFunctions$$anonfun$13.apply(PairRDDFunctions.scala:974) org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:62) org.apache.spark.scheduler.Task.run(Task.scala:54) org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:177) java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) java.lang.Thread.run(Thread.java:745) 14/11/01 12:26:40 WARN TaskSetManager: Lost task 92.0 in stage 36.0 (TID 1295, proc1.foo.bar.com): FetchFailed(BlockManagerId(1, proc1.foo.bar.com, 43704, 0), shuffleId=0, mapId=13, reduceId=92) 14/11/01 12:26:40 INFO DAGScheduler: Marking Stage 36 (saveAsTextFile at ModelTrainer.java:141) as failed due to a fetch failure from Stage 37 (groupBy at ModelTrainer.java:133) 14/11/01 12:26:40 INFO DAGScheduler: Stage 36 (saveAsTextFile at ModelTrainer.java:141) failed in 55.259 s 14/11/01 12:26:40 INFO DAGScheduler: Resubmitting Stage 37 (groupBy at ModelTrainer.java:133) and Stage 36 (saveAsTextFile at ModelTrainer.java:141) due to fetch failure
Re: OOM with groupBy + saveAsTextFile
Resurfacing the thread. Oom shouldn't be the norm for a common groupby / sort use case in a framework that is leading in sorting bench marks? Or is there something fundamentally wrong in the usage? On 02-Nov-2014 1:06 am, Bharath Ravi Kumar reachb...@gmail.com wrote: Hi, I'm trying to run groupBy(function) followed by saveAsTextFile on an RDD of count ~ 100 million. The data size is 20GB and groupBy results in an RDD of 1061 keys with values being IterableTuple4String, Integer, Double, String. The job runs on 3 hosts in a standalone setup with each host's executor having 100G RAM and 24 cores dedicated to it. While the groupBy stage completes successfully with ~24GB of shuffle write, the saveAsTextFile fails after repeated retries with each attempt failing due to an out of memory error *[1]*. I understand that a few partitions may be overloaded as a result of the groupBy and I've tried the following config combinations unsuccessfully: 1) Repartition the initial rdd (44 input partitions but 1061 keys) across 1061 paritions and have max cores = 3 so that each key is a logical partition (though many partitions will end up on very few hosts), and each host likely runs saveAsTextFile on a single key at a time due to max cores = 3 with 3 hosts in the cluster. The level of parallelism is unspecified. 2) Leave max cores unspecified, set the level of parallelism to 72, and leave number of partitions unspecified (in which case the # input partitions was used, which is 44) Since I do not intend to cache RDD's, I have set spark.storage.memoryFraction=0.2 in both cases. My understanding is that if each host is processing a single logical partition to saveAsTextFile and is reading from other hosts to write out the RDD, it is unlikely that it would run out of memory. My interpretation of the spark tuning guide is that the degree of parallelism has little impact in case (1) above since max cores = number of hosts. Can someone explain why there are still OOM's with 100G being available? On a related note, intuitively (though I haven't read the source), it appears that an entire key-value pair needn't fit into memory of a single host for saveAsTextFile since a single shuffle read from a remote can be written to HDFS before the next remote read is carried out. This way, not all data needs to be collected at the same time. Lastly, if an OOM is (but shouldn't be) a common occurrence (as per the tuning guide and even as per Datastax's spark introduction), there may need to be more documentation around the internals of spark to help users take better informed tuning decisions with parallelism, max cores, number partitions and other tunables. Is there any ongoing effort on that front? 
Thanks, Bharath *[1]* OOM stack trace and logs 14/11/01 12:26:37 WARN TaskSetManager: Lost task 61.0 in stage 36.0 (TID 1264, proc1.foo.bar.com): java.lang.OutOfMemoryError: Requested array size exceeds VM limit java.util.Arrays.copyOf(Arrays.java:3326) java.lang.AbstractStringBuilder.expandCapacity(AbstractStringBuilder.java:137) java.lang.AbstractStringBuilder.ensureCapacityInternal(AbstractStringBuilder.java:121) java.lang.AbstractStringBuilder.append(AbstractStringBuilder.java:421) java.lang.StringBuilder.append(StringBuilder.java:136) scala.collection.mutable.StringBuilder.append(StringBuilder.scala:197) scala.Tuple2.toString(Tuple2.scala:22) org.apache.spark.rdd.RDD$$anonfun$saveAsTextFile$1.apply(RDD.scala:1158) org.apache.spark.rdd.RDD$$anonfun$saveAsTextFile$1.apply(RDD.scala:1158) scala.collection.Iterator$$anon$11.next(Iterator.scala:328) org.apache.spark.rdd.PairRDDFunctions$$anonfun$13.apply(PairRDDFunctions.scala:984) org.apache.spark.rdd.PairRDDFunctions$$anonfun$13.apply(PairRDDFunctions.scala:974) org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:62) org.apache.spark.scheduler.Task.run(Task.scala:54) org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:177) java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) java.lang.Thread.run(Thread.java:745) 14/11/01 12:26:40 WARN TaskSetManager: Lost task 92.0 in stage 36.0 (TID 1295, proc1.foo.bar.com): FetchFailed(BlockManagerId(1, proc1.foo.bar.com, 43704, 0), shuffleId=0, mapId=13, reduceId=92) 14/11/01 12:26:40 INFO DAGScheduler: Marking Stage 36 (saveAsTextFile at ModelTrainer.java:141) as failed due to a fetch failure from Stage 37 (groupBy at ModelTrainer.java:133) 14/11/01 12:26:40 INFO DAGScheduler: Stage 36 (saveAsTextFile at ModelTrainer.java:141) failed in 55.259 s 14/11/01 12:26:40 INFO DAGScheduler: Resubmitting Stage 37 (groupBy at ModelTrainer.java:133) and Stage 36 (saveAsTextFile at ModelTrainer.java:141) due to fetch failure
Re: OOM writing out sorted RDD
Update: as expected, switching to kryo merely delays the inevitable. Does anyone have experience controlling memory consumption while processing (e.g. writing out) imbalanced partitions? On 09-Aug-2014 10:41 am, Bharath Ravi Kumar reachb...@gmail.com wrote: Our prototype application reads a 20GB dataset from HDFS (nearly 180 partitions), groups it by key, sorts by rank and write out to HDFS in that order. The job runs against two nodes (16G, 24 cores per node available to the job). I noticed that the execution plan results in two sortByKey stages, followed by groupByKey and a saveAsTextFile. The shuffle write for groupByKey is ~20G in size. During the saveAsTextFile stage, however, after writing out 50% of the partitions, the memory on one of the executors shoots up to 16G and the executor spends all its time in GC's. Eventually, the logs show an OOM [1] included at the end of the mail followed by another TID loss to FileSystem closed errors indicated in stacktrace [2]. I noticed that the partitions may be skewed as a result of the sort, with one or two paritions having upto 10% of all rows. I also noticed that the data written out until the 50% stage (when memory shoots up) had a large number of empty part- files followed by a few files of 200M in size. It could hence be that the attempt to write out one large partition is causing the OOM. The tuning documentation states that a larger level of parallelism might help mitigate the problem, but setting default parallelism to 2x the number of cores didn't help either. While I could attempt to partition manually (choosing custom ranges for a range partitioner), it appears that limiting read sizes (from the earlier shuffle) during the write to HDFS should help successfully write out even overloaded partitions as well. Are there parameters that might help achieve that? (On a related note, any idea if using Kryo serialization would help in this case?) 
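For context, enabling Kryo as mentioned in the update was presumably along these lines; a sketch only, with an illustrative registrator class rather than the actual job code:

  import com.esotericsoftware.kryo.Kryo
  import org.apache.spark.SparkConf
  import org.apache.spark.serializer.KryoRegistrator

  // Illustrative registrator for the application's record classes.
  class AppKryoRegistrator extends KryoRegistrator {
    override def registerClasses(kryo: Kryo): Unit = {
      kryo.register(classOf[Array[String]])   // placeholder; register the actual record classes here
    }
  }

  val conf = new SparkConf()
    .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    .set("spark.kryo.registrator", "AppKryoRegistrator")   // fully qualified name of the registrator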
Thanks, Bharath [1] 14/08/09 04:59:26 WARN TaskSetManager: Lost TID 711 (task 33.0:137) 14/08/09 04:59:26 WARN TaskSetManager: Loss was due to java.lang.OutOfMemoryError java.lang.OutOfMemoryError: Java heap space at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1989) at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1918) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351) at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1993) at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1918) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351) at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1993) at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1918) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351) at java.io.ObjectInputStream.readObject(ObjectInputStream.java:371) at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:63) at org.apache.spark.serializer.DeserializationStream$$anon$1.getNext(Serializer.scala:125) at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:71) at org.apache.spark.storage.BlockManager$LazyProxyIterator$1.hasNext(BlockManager.scala:1031) at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371) at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:30) at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39) at scala.collection.Iterator$class.foreach(Iterator.scala:727) at org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28) at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48) at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103) at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47) at scala.collection.TraversableOnce$class.to (TraversableOnce.scala:273) at org.apache.spark.InterruptibleIterator.to (InterruptibleIterator.scala:28) at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265) at org.apache.spark.InterruptibleIterator.toBuffer(InterruptibleIterator.scala:28) at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252) at org.apache.spark.InterruptibleIterator.toArray(InterruptibleIterator.scala:28) at org.apache.spark.rdd.OrderedRDDFunctions$$anonfun$sortByKey$1.apply(OrderedRDDFunctions.scala:62) [2] 14/08/09 04:59:26
OOM writing out sorted RDD
Our prototype application reads a 20GB dataset from HDFS (nearly 180 partitions), groups it by key, sorts by rank, and writes out to HDFS in that order. The job runs against two nodes (16G, 24 cores per node available to the job). I noticed that the execution plan results in two sortByKey stages, followed by groupByKey and a saveAsTextFile. The shuffle write for groupByKey is ~20G in size. During the saveAsTextFile stage, however, after writing out 50% of the partitions, the memory on one of the executors shoots up to 16G and the executor spends all its time in GCs. Eventually, the logs show an OOM [1] included at the end of the mail, followed by another TID loss due to "FileSystem closed" errors indicated in stacktrace [2]. I noticed that the partitions may be skewed as a result of the sort, with one or two partitions having up to 10% of all rows. I also noticed that the data written out until the 50% stage (when memory shoots up) had a large number of empty part- files followed by a few files of 200M in size. It could hence be that the attempt to write out one large partition is causing the OOM. The tuning documentation states that a larger level of parallelism might help mitigate the problem, but setting default parallelism to 2x the number of cores didn't help either. While I could attempt to partition manually (choosing custom ranges for a range partitioner), it appears that limiting read sizes (from the earlier shuffle) during the write to HDFS should help successfully write out even overloaded partitions as well. Are there parameters that might help achieve that? (On a related note, any idea if using Kryo serialization would help in this case?)

Thanks, Bharath

[1]
14/08/09 04:59:26 WARN TaskSetManager: Lost TID 711 (task 33.0:137)
14/08/09 04:59:26 WARN TaskSetManager: Loss was due to java.lang.OutOfMemoryError
java.lang.OutOfMemoryError: Java heap space
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1989)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1918)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1993)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1918)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1993)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1918)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:371)
at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:63)
at org.apache.spark.serializer.DeserializationStream$$anon$1.getNext(Serializer.scala:125)
at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:71)
at org.apache.spark.storage.BlockManager$LazyProxyIterator$1.hasNext(BlockManager.scala:1031)
at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:30)
at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
at scala.collection.Iterator$class.foreach(Iterator.scala:727)
at org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28)
at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
at org.apache.spark.InterruptibleIterator.to(InterruptibleIterator.scala:28)
at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
at org.apache.spark.InterruptibleIterator.toBuffer(InterruptibleIterator.scala:28)
at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
at org.apache.spark.InterruptibleIterator.toArray(InterruptibleIterator.scala:28)
at org.apache.spark.rdd.OrderedRDDFunctions$$anonfun$sortByKey$1.apply(OrderedRDDFunctions.scala:62)

[2]
14/08/09 04:59:26 WARN TaskSetManager: Lost TID 774 (task 33.0:198)
14/08/09 04:59:26 WARN TaskSetManager: Loss was due to java.io.FileNotFoundException
java.io.FileNotFoundException: /tmp/spark-local-20140809033328-e374/31/shuffle_0_5_198 (No such file or directory)
at java.io.RandomAccessFile.open(Native Method)
at
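On the "partition manually" idea above, a minimal sketch of isolating a few known heavy keys into their own partitions while hashing the rest. Note this gives up the global sort order that a range partitioner provides, so it is only a starting point; heavyKeys and the partition counts are illustrative:

  import org.apache.spark.Partitioner

  // Route each known heavy key to a dedicated partition and hash everything else.
  class SkewAwarePartitioner(heavyKeys: Seq[String], basePartitions: Int) extends Partitioner {
    private val heavyIndex: Map[String, Int] = heavyKeys.zipWithIndex.toMap
    override def numPartitions: Int = basePartitions + heavyKeys.size
    override def getPartition(key: Any): Int = heavyIndex.get(key.toString) match {
      case Some(i) => basePartitions + i
      case None =>
        // non-negative modulo of the key's hash code
        ((key.toString.hashCode % basePartitions) + basePartitions) % basePartitions
    }
  }

  // Usage (illustrative), with pairs being an RDD keyed by the group key:
  // val repartitioned = pairs.partitionBy(new SkewAwarePartitioner(Seq("hotKey1", "hotKey2"), 96))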
Implementing percentile through top Vs take
I'm looking to select the top n records (by rank) from a dataset of a few hundred GBs. My understanding is that JavaRDD.top(n, comparator) is entirely a driver-side operation in that all records are sorted in the driver's memory. I prefer an approach where the records are sorted on the cluster and only the top ones are sent to the driver. I'm hence leaning towards creating a JavaPairRDD keyed on the rank, then sorting the rdd by key and invoking take(N). I'd like to know whether rdd.top achieves the same result (while being executed on the cluster) as take, or whether my assumption that it's a driver-side operation is correct. Thanks, Bharath
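To make the comparison concrete, a sketch of the two approaches in Scala; Record and its rank field are illustrative stand-ins for the actual schema. If I'm reading the implementation right, top gathers only a bounded set of top-N candidates per partition to the driver rather than sorting every record there, but the distributed-sort variant is shown as well:

  import org.apache.spark.SparkContext._
  import org.apache.spark.rdd.RDD

  case class Record(id: String, rank: Double)

  // Distributed sort by rank, then pull only the first n records to the driver.
  def topBySort(records: RDD[Record], n: Int): Array[Record] =
    records.map(r => (r.rank, r)).sortByKey(ascending = false).map(_._2).take(n)

  // RDD.top with an explicit ordering on rank.
  def topByTop(records: RDD[Record], n: Int): Array[Record] =
    records.top(n)(Ordering.by((r: Record) => r.rank))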
Re: Hadoop client protocol mismatch with spark 1.0.1, cdh3u5
Any suggestions to work around this issue ? The pre built spark binaries don't appear to work against cdh as documented, unless there's a build issue, which seems unlikely. On 25-Jul-2014 3:42 pm, Bharath Ravi Kumar reachb...@gmail.com wrote: I'm encountering a hadoop client protocol mismatch trying to read from HDFS (cdh3u5) using the pre-build spark from the downloads page (linked under For Hadoop 1 (HDP1, CDH3)). I've also followed the instructions at http://spark.apache.org/docs/latest/hadoop-third-party-distributions.html (i.e. building the app against hadoop-client 0.20.2-cdh3u5), but continue to see the following error regardless of whether I link the app with the cdh client: 14/07/25 09:53:43 INFO client.AppClient$ClientActor: Executor updated: app-20140725095343-0016/1 is now RUNNING 14/07/25 09:53:43 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable 14/07/25 09:53:43 WARN snappy.LoadSnappy: Snappy native library not loaded Exception in thread main org.apache.hadoop.ipc.RPC$VersionMismatch: Protocol org.apache.hadoop.hdfs.protocol.ClientProtocol version mismatch. (client = 61, server = 63) at org.apache.hadoop.ipc.RPC.getProxy(RPC.java:401) at org.apache.hadoop.ipc.RPC.getProxy(RPC.java:379) While I can build spark against the exact hadoop distro version, I'd rather work with the standard prebuilt binaries, making additional changes while building the app if necessary. Any workarounds/recommendations? Thanks, Bharath
Re: Hadoop client protocol mismatch with spark 1.0.1, cdh3u5
Thanks for responding. I used the pre built spark binaries meant for hadoop1,cdh3u5. I do not intend to build spark against a specific distribution. Irrespective of whether I build my app with the explicit cdh hadoop client dependency, I get the same error message. I also verified that my app's uber jar had pulled in the cdh hadoop client dependencies. On 25-Jul-2014 9:26 pm, Sean Owen so...@cloudera.com wrote: This indicates your app is not actually using the version of the HDFS client you think. You built Spark from source with the right deps it seems, but are you sure you linked to your build in your app? On Fri, Jul 25, 2014 at 4:32 PM, Bharath Ravi Kumar reachb...@gmail.com wrote: Any suggestions to work around this issue ? The pre built spark binaries don't appear to work against cdh as documented, unless there's a build issue, which seems unlikely. On 25-Jul-2014 3:42 pm, Bharath Ravi Kumar reachb...@gmail.com wrote: I'm encountering a hadoop client protocol mismatch trying to read from HDFS (cdh3u5) using the pre-build spark from the downloads page (linked under For Hadoop 1 (HDP1, CDH3)). I've also followed the instructions at http://spark.apache.org/docs/latest/hadoop-third-party-distributions.html (i.e. building the app against hadoop-client 0.20.2-cdh3u5), but continue to see the following error regardless of whether I link the app with the cdh client: 14/07/25 09:53:43 INFO client.AppClient$ClientActor: Executor updated: app-20140725095343-0016/1 is now RUNNING 14/07/25 09:53:43 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable 14/07/25 09:53:43 WARN snappy.LoadSnappy: Snappy native library not loaded Exception in thread main org.apache.hadoop.ipc.RPC$VersionMismatch: Protocol org.apache.hadoop.hdfs.protocol.ClientProtocol version mismatch. (client = 61, server = 63) at org.apache.hadoop.ipc.RPC.getProxy(RPC.java:401) at org.apache.hadoop.ipc.RPC.getProxy(RPC.java:379) While I can build spark against the exact hadoop distro version, I'd rather work with the standard prebuilt binaries, making additional changes while building the app if necessary. Any workarounds/recommendations? Thanks, Bharath
Re: Hadoop client protocol mismatch with spark 1.0.1, cdh3u5
That's right, I'm looking to depend on spark in general and change only the hadoop client deps. The spark master and slaves use the spark-1.0.1-bin-hadoop1 binaries from the downloads page. The relevant snippet from the app's maven pom is as follows:

  <dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_2.10</artifactId>
    <version>1.0.1</version>
    <scope>provided</scope>
  </dependency>
  <dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-client</artifactId>
    <version>0.20.2-cdh3u5</version>
    <type>jar</type>
  </dependency>
</dependencies>
<repositories>
  <repository>
    <id>Cloudera repository</id>
    <url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>
  </repository>
  <repository>
    <id>Akka repository</id>
    <url>http://repo.akka.io/releases</url>
  </repository>
</repositories>

Thanks, Bharath

On Fri, Jul 25, 2014 at 10:29 PM, Sean Owen so...@cloudera.com wrote: If you link against the pre-built binary, that's for Hadoop 1.0.4. Can you show your deps to clarify what you are depending on? Building custom Spark and depending on it is a different thing from depending on plain Spark and changing its deps. I think you want the latter.

On Fri, Jul 25, 2014 at 5:46 PM, Bharath Ravi Kumar reachb...@gmail.com wrote: Thanks for responding. I used the pre-built spark binaries meant for hadoop1, cdh3u5. I do not intend to build spark against a specific distribution. Irrespective of whether I build my app with the explicit cdh hadoop client dependency, I get the same error message. I also verified that my app's uber jar had pulled in the cdh hadoop client dependencies.

On 25-Jul-2014 9:26 pm, Sean Owen so...@cloudera.com wrote: This indicates your app is not actually using the version of the HDFS client you think. You built Spark from source with the right deps it seems, but are you sure you linked to your build in your app?

On Fri, Jul 25, 2014 at 4:32 PM, Bharath Ravi Kumar reachb...@gmail.com wrote: Any suggestions to work around this issue? The pre-built spark binaries don't appear to work against cdh as documented, unless there's a build issue, which seems unlikely.

On 25-Jul-2014 3:42 pm, Bharath Ravi Kumar reachb...@gmail.com wrote: I'm encountering a hadoop client protocol mismatch trying to read from HDFS (cdh3u5) using the pre-built spark from the downloads page (linked under "For Hadoop 1 (HDP1, CDH3)"). I've also followed the instructions at http://spark.apache.org/docs/latest/hadoop-third-party-distributions.html (i.e. building the app against hadoop-client 0.20.2-cdh3u5), but continue to see the following error regardless of whether I link the app with the cdh client:
14/07/25 09:53:43 INFO client.AppClient$ClientActor: Executor updated: app-20140725095343-0016/1 is now RUNNING
14/07/25 09:53:43 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
14/07/25 09:53:43 WARN snappy.LoadSnappy: Snappy native library not loaded
Exception in thread "main" org.apache.hadoop.ipc.RPC$VersionMismatch: Protocol org.apache.hadoop.hdfs.protocol.ClientProtocol version mismatch. (client = 61, server = 63)
at org.apache.hadoop.ipc.RPC.getProxy(RPC.java:401)
at org.apache.hadoop.ipc.RPC.getProxy(RPC.java:379)
While I can build spark against the exact hadoop distro version, I'd rather work with the standard prebuilt binaries, making additional changes while building the app if necessary. Any workarounds/recommendations? Thanks, Bharath
Re: Execution stalls in LogisticRegressionWithSGD
Hi Xiangrui, The issue with aggregating/counting over large feature vectors (as part of LogisticRegressionWithSGD) continues to exist, but now in another form: while the execution doesn't freeze (due to SPARK-1112), it now fails consistently at the second or third gradient descent iteration with an error-level log message, but no stacktrace. I'm running against 1.0.1-rc1, and have tried setting spark.akka.frameSize as high as 500. When the execution fails, each of the two executors logs the following messages (corresponding to aggregate at GradientDescent.scala:178):
14/07/02 14:09:09 INFO BlockFetcherIterator$BasicBlockFetcherIterator: maxBytesInFlight: 50331648, targetRequestSize: 10066329
14/07/02 14:09:09 INFO BlockFetcherIterator$BasicBlockFetcherIterator: Getting 2 non-empty blocks out of 2 blocks
14/07/02 14:09:09 INFO BlockFetcherIterator$BasicBlockFetcherIterator: Started 1 remote fetches in 0 ms
14/07/02 14:09:11 INFO Executor: Serialized size of result for 737 is 5959086
14/07/02 14:09:11 INFO Executor: Sending result for 737 directly to driver
14/07/02 14:09:11 INFO Executor: Finished task ID 737
14/07/02 14:09:18 ERROR CoarseGrainedExecutorBackend: Driver Disassociated [akka.tcp://sparkExecutor@(slave1,slave2):51941] - [akka.tcp://spark@master:59487] disassociated! Shutting down.

There is no separate stacktrace on the driver side. Each input record is of the form (p1, p2, (p1,p2), X), where p1, p2, and (p1,p2) are categorical features with large cardinality, and X is the double label with a continuous value. The categorical variables are converted to binary variables, which results in a feature vector of size 741092 (composed of all unique categories across p1, p2 and (p1,p2)). Thus, the labeled point for an input record is a sparse vector of size 741092 with only 3 variables set in the record. The total number of records is 683233 after aggregating the input data on (p1, p2). When attempting to train on the unaggregated records (1337907 in number, spread across 455 files), the execution fails at count at GradientDescent.scala:161 with the following log (lines corresponding to other input files snipped):
14/07/02 16:02:03 INFO HadoopRDD: Input split: file:~/part-r-00012:2834590+2834590
14/07/02 16:02:03 INFO HadoopRDD: Input split: file:~/part-r-5:0+2845559
14/07/02 16:02:03 INFO HadoopRDD: Input split: file:~/part-r-5:2845559+2845560
14/07/02 16:02:03 INFO Executor: Serialized size of result for 726 is 615
14/07/02 16:02:03 INFO Executor: Sending result for 726 directly to driver
14/07/02 16:02:03 INFO Executor: Finished task ID 726
14/07/02 16:02:12 ERROR CoarseGrainedExecutorBackend: Driver Disassociated [akka.tcp://sparkExecutor@slave1:48423] - [akka.tcp://spark@master:55792] disassociated! Shutting down.

A count() attempted on the input RDD before beginning training has the following metrics:
Metric | Min | 25th | Median | 75th | Max
Result serialization time | 0 ms | 0 ms | 0 ms | 0 ms | 0 ms
Duration | 33 s | 33 s | 35 s | 35 s | 35 s
Time spent fetching task results | 0 ms | 0 ms | 0 ms | 0 ms | 0 ms
Scheduler delay | 0.1 s | 0.1 s | 0.3 s | 0.3 s | 0.3 s

Aggregated Metrics by Executor:
ID | Address | Task Time | Total | Failed | Succeeded | Shuffle Read | Shuffle Write | Shuf Spill (Mem) | Shuf Spill (Disk)
0 | CANNOT FIND ADDRESS | 34 s | 1 | 0 | 1 | 0.0 B | 0.0 B | 0.0 B | 0.0 B
1 | CANNOT FIND ADDRESS | 36 s | 1 | 0 | 1 | 0.0 B | 0.0 B | 0.0 B | 0.0 B

Tasks:
Task Index   Task ID   Status    Locality Level   Executor   Launch Time           Duration   GC Time   Result Ser Time   Errors
0            726       SUCCESS   PROCESS_LOCAL    slave1     2014/07/02 16:01:28   35 s       0.1 s
1            727       SUCCESS   PROCESS_LOCAL    slave2     2014/07/02 16:01:28   33 s       99 ms

Any pointers / diagnosis please?
On Thu, Jun 19, 2014 at 10:03 AM, Bharath Ravi Kumar reachb...@gmail.com wrote: Thanks. I'll await the fix to re-run my test. On Thu, Jun 19, 2014 at 8:28 AM, Xiangrui Meng men...@gmail.com wrote: Hi Bharath, This is related to SPARK-1112, which we already found the root cause. I will let you know when this is fixed. Best, Xiangrui On Tue, Jun 17, 2014 at 7:37 PM, Bharath Ravi Kumar reachb...@gmail.com wrote: Couple more points: 1)The inexplicable stalling of execution with large feature sets appears similar to that reported with the news-20 dataset: http://mail-archives.apache.org/mod_mbox/spark-user/201406.mbox/%3c53a03542.1010...@gmail.com%3E 2) The NPE trying to call mapToPair convert an RDDLong, Long, Integer, Integer into a JavaPairRDDTuple2Long,Long, Tuple2Integer,Integer is unrelated to mllib. Thanks, Bharath On Wed, Jun 18, 2014 at 7:14 AM, Bharath Ravi Kumar reachb...@gmail.com wrote: Hi Xiangrui , I'm using
Re: Execution stalls in LogisticRegressionWithSGD
Thanks. I'll await the fix to re-run my test. On Thu, Jun 19, 2014 at 8:28 AM, Xiangrui Meng men...@gmail.com wrote: Hi Bharath, This is related to SPARK-1112, which we already found the root cause. I will let you know when this is fixed. Best, Xiangrui On Tue, Jun 17, 2014 at 7:37 PM, Bharath Ravi Kumar reachb...@gmail.com wrote: Couple more points: 1)The inexplicable stalling of execution with large feature sets appears similar to that reported with the news-20 dataset: http://mail-archives.apache.org/mod_mbox/spark-user/201406.mbox/%3c53a03542.1010...@gmail.com%3E 2) The NPE trying to call mapToPair convert an RDDLong, Long, Integer, Integer into a JavaPairRDDTuple2Long,Long, Tuple2Integer,Integer is unrelated to mllib. Thanks, Bharath On Wed, Jun 18, 2014 at 7:14 AM, Bharath Ravi Kumar reachb...@gmail.com wrote: Hi Xiangrui , I'm using 1.0.0. Thanks, Bharath On 18-Jun-2014 1:43 am, Xiangrui Meng men...@gmail.com wrote: Hi Bharath, Thanks for posting the details! Which Spark version are you using? Best, Xiangrui On Tue, Jun 17, 2014 at 6:48 AM, Bharath Ravi Kumar reachb...@gmail.com wrote: Hi, (Apologies for the long mail, but it's necessary to provide sufficient details considering the number of issues faced.) I'm running into issues testing LogisticRegressionWithSGD a two node cluster (each node with 24 cores and 16G available to slaves out of 24G on the system). Here's a description of the application: The model is being trained based on categorical features x, y, and (x,y). The categorical features are mapped to binary features by converting each distinct value in the category enum into a binary feature by itself (i.e presence of that value in a record implies corresponding feature = 1, else feature = 0. So, there'd be as many distinct features as enum values) . The training vector is laid out as [x1,x2...xn,y1,y2yn,(x1,y1),(x2,y2)...(xn,yn)]. Each record in the training data has only one combination (Xk,Yk) and a label appearing in the record. Thus, the corresponding labeledpoint sparse vector would only have 3 values Xk, Yk, (Xk,Yk) set for a record. The total length of the vector (though parse) would be nearly 614000. The number of records is about 1.33 million. The records have been coalesced into 20 partitions across two nodes. The input data has not been cached. (NOTE: I do realize the records features may seem large for a two node setup, but given the memory cpu, and the fact that I'm willing to give up some turnaround time, I don't see why tasks should inexplicably fail) Additional parameters include: spark.executor.memory = 14G spark.default.parallelism = 1 spark.cores.max=20 spark.storage.memoryFraction=0.8 //No cache space required (Trying to set spark.akka.frameSize to a larger number, say, 20 didn't help either) The model training was initialized as : new LogisticRegressionWithSGD(1, maxIterations, 0.0, 0.05) However, after 4 iterations of gradient descent, the entire execution appeared to stall inexplicably. 
Execution stalls in LogisticRegressionWithSGD
Hi,

(Apologies for the long mail, but it's necessary to provide sufficient details considering the number of issues faced.)

I'm running into issues testing LogisticRegressionWithSGD on a two-node cluster (each node with 24 cores and 16G available to slaves out of 24G on the system). Here's a description of the application: the model is being trained on categorical features x, y, and (x,y). The categorical features are mapped to binary features by converting each distinct value in the category enum into a binary feature by itself (i.e. presence of that value in a record implies the corresponding feature = 1, else feature = 0; so there'd be as many distinct features as enum values). The training vector is laid out as [x1,x2...xn, y1,y2...yn, (x1,y1),(x2,y2)...(xn,yn)]. Each record in the training data has only one combination (Xk,Yk) and a label appearing in the record. Thus, the corresponding LabeledPoint sparse vector would only have 3 values set for a record: Xk, Yk, (Xk,Yk). The total length of the vector (though sparse) would be nearly 614000. The number of records is about 1.33 million. The records have been coalesced into 20 partitions across the two nodes. The input data has not been cached. (NOTE: I do realize the number of records and features may seem large for a two-node setup, but given the memory and CPU, and the fact that I'm willing to give up some turnaround time, I don't see why tasks should inexplicably fail.)

Additional parameters include:
spark.executor.memory = 14G
spark.default.parallelism = 1
spark.cores.max = 20
spark.storage.memoryFraction = 0.8 // No cache space required
(Trying to set spark.akka.frameSize to a larger number, say 20, didn't help either.)

The model training was initialized as: new LogisticRegressionWithSGD(1, maxIterations, 0.0, 0.05)

However, after 4 iterations of gradient descent, the entire execution appeared to stall inexplicably.
The corresponding executor details and details of the stalled stage (number 14) are as follows:

Summary metrics for stage 14 (aggregate at GradientDescent.scala:178):
Metric                            Min     25th    Median  75th    Max
Result serialization time         12 ms   13 ms   14 ms   16 ms   18 ms
Duration                          4 s     4 s     5 s     5 s     5 s
Time spent fetching task results  0 ms    0 ms    0 ms    0 ms    0 ms
Scheduler delay                   6 s     6 s     6 s     6 s     12 s

Tasks (all PROCESS_LOCAL, all launched 2014/06/17 10:32:27):
Index  ID   Status   Executor                     Duration  GC Time  Result Ser Time
0      600  RUNNING  serious.dataone.foo.bar.com  1.1 h
1      601  RUNNING  casual.dataone.foo.bar.com   1.1 h
2      602  RUNNING  serious.dataone.foo.bar.com  1.1 h
3      603  RUNNING  casual.dataone.foo.bar.com   1.1 h
4      604  RUNNING  serious.dataone.foo.bar.com  1.1 h
5      605  SUCCESS  casual.dataone.foo.bar.com   4 s       2 s      12 ms
6      606  SUCCESS  serious.dataone.foo.bar.com  4 s       1 s      14 ms
7      607  SUCCESS  casual.dataone.foo.bar.com   4 s       2 s      12 ms
8      608  SUCCESS  serious.dataone.foo.bar.com  5 s       1 s      15 ms
9      609  SUCCESS  casual.dataone.foo.bar.com   5 s       1 s      14 ms
10     610  SUCCESS  serious.dataone.foo.bar.com  5 s       1 s      15 ms
11     611  SUCCESS  casual.dataone.foo.bar.com   4 s       1 s      13 ms
12     612  SUCCESS  serious.dataone.foo.bar.com  5 s       1 s      18 ms
13     613  SUCCESS  casual.dataone.foo.bar.com   5 s       1 s      13 ms
14     614  SUCCESS  serious.dataone.foo.bar.com  4 s       1 s      14 ms
15     615  SUCCESS  casual.dataone.foo.bar.com   4 s       1 s      12 ms
16     616  SUCCESS  serious.dataone.foo.bar.com  5 s       1 s      15 ms
17     617  SUCCESS  casual.dataone.foo.bar.com   5 s       1 s      18 ms
18     618  SUCCESS  serious.dataone.foo.bar.com  5 s       1 s      16 ms
19     619  SUCCESS  casual.dataone.foo.bar.com   4 s       1 s      18 ms

Executor stats: RDD Blocks | Memory Used | Disk Used | Active Tasks | Failed Tasks | Complete Tasks | Total Tasks | Task Time
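To make the feature encoding described in the message above concrete, here is a minimal, self-contained Java sketch of how one such record could be turned into a three-non-zero sparse LabeledPoint and fed to LogisticRegressionWithSGD via its static train helper. This is not the poster's actual code: NUM_X, NUM_Y, the index layout within the vector, the toy input, and the iteration count are assumptions made purely for illustration.

import java.util.Arrays;
import java.util.List;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.mllib.classification.LogisticRegressionModel;
import org.apache.spark.mllib.classification.LogisticRegressionWithSGD;
import org.apache.spark.mllib.linalg.Vectors;
import org.apache.spark.mllib.regression.LabeledPoint;

public class SparseEncodingSketch {
    // Hypothetical cardinalities for the x and y category enums.
    static final int NUM_X = 1000, NUM_Y = 700;
    // Total vector length: x block + y block + (x,y) cross block.
    static final int VECTOR_SIZE = NUM_X + NUM_Y + NUM_X * NUM_Y;

    // Encode one record (xIndex, yIndex, label) as a sparse vector with exactly
    // three non-zeros, laid out as [x1..xn, y1..yn, (x1,y1)..(xn,yn)].
    static LabeledPoint encode(int xIndex, int yIndex, double label) {
        int[] indices = new int[] {
            xIndex,                                  // x block
            NUM_X + yIndex,                          // y block
            NUM_X + NUM_Y + xIndex * NUM_Y + yIndex  // (x,y) cross block
        };
        double[] values = new double[] {1.0, 1.0, 1.0};
        return new LabeledPoint(label, Vectors.sparse(VECTOR_SIZE, indices, values));
    }

    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setAppName("lr-encoding-sketch");
        JavaSparkContext sc = new JavaSparkContext(conf);

        // Toy input of "xIndex,yIndex,label" lines; a real job would read from HDFS.
        List<String> lines = Arrays.asList("3,5,1.0", "7,2,0.0");
        JavaRDD<LabeledPoint> points = sc.parallelize(lines).map(
            new Function<String, LabeledPoint>() {
                public LabeledPoint call(String line) {
                    String[] f = line.split(",");
                    return encode(Integer.parseInt(f[0]), Integer.parseInt(f[1]),
                                  Double.parseDouble(f[2]));
                }
            }).coalesce(20);

        // Step size 1.0 and mini-batch fraction 0.05 mirror the values in the mail;
        // 100 iterations is an arbitrary stand-in for maxIterations.
        LogisticRegressionModel model =
            LogisticRegressionWithSGD.train(points.rdd(), 100, 1.0, 0.05);
        System.out.println("weights length: " + model.weights().size());
        sc.stop();
    }
}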
Re: Execution stalls in LogisticRegressionWithSGD
Hi Xiangrui, I'm using 1.0.0. Thanks, Bharath

On 18-Jun-2014 1:43 am, Xiangrui Meng <men...@gmail.com> wrote:
> Hi Bharath, Thanks for posting the details! Which Spark version are you using? Best, Xiangrui
Re: Execution stalls in LogisticRegressionWithSGD
Couple more points:

1) The inexplicable stalling of execution with large feature sets appears similar to that reported with the news-20 dataset: http://mail-archives.apache.org/mod_mbox/spark-user/201406.mbox/%3c53a03542.1010...@gmail.com%3E

2) The NPE hit while calling mapToPair to convert an RDD of (Long, Long, Integer, Integer) records into a JavaPairRDD<Tuple2<Long, Long>, Tuple2<Integer, Integer>> is unrelated to MLlib.

Thanks, Bharath
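For readers unfamiliar with the conversion mentioned in point 2, the following minimal Java sketch shows the shape of such a mapToPair call. It is not the poster's code: the Row record class, its field names, and the sample values are invented for illustration, and the actual cause of the NPE is not shown in the thread.

import java.util.Arrays;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.PairFunction;
import scala.Tuple2;

public class MapToPairSketch {
    // Hypothetical 4-field record standing in for the (Long, Long, Integer, Integer) rows.
    public static class Row implements java.io.Serializable {
        public final Long a, b;
        public final Integer c, d;
        public Row(Long a, Long b, Integer c, Integer d) { this.a = a; this.b = b; this.c = c; this.d = d; }
    }

    public static void main(String[] args) {
        JavaSparkContext sc = new JavaSparkContext("local", "mapToPair-sketch");
        JavaRDD<Row> rows = sc.parallelize(Arrays.asList(new Row(1L, 2L, 3, 4)));

        // (Long, Long, Integer, Integer) -> ((Long, Long), (Integer, Integer))
        JavaPairRDD<Tuple2<Long, Long>, Tuple2<Integer, Integer>> pairs =
            rows.mapToPair(new PairFunction<Row, Tuple2<Long, Long>, Tuple2<Integer, Integer>>() {
                public Tuple2<Tuple2<Long, Long>, Tuple2<Integer, Integer>> call(Row r) {
                    // A null field or a null return value here would surface as an NPE
                    // downstream; whether that matches the reported failure is unknown.
                    return new Tuple2<Tuple2<Long, Long>, Tuple2<Integer, Integer>>(
                        new Tuple2<Long, Long>(r.a, r.b),
                        new Tuple2<Integer, Integer>(r.c, r.d));
                }
            });

        System.out.println(pairs.first());
        sc.stop();
    }
}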
Standalone client failing with docker deployed cluster
Hi,

I'm running the Spark server with a single worker on a laptop using the docker images. The spark-shell examples run fine with this setup. However, for a standalone Java client that tries to run wordcount on a local file (1 MB in size), the execution fails with the following error on the stdout of the worker:

14/05/15 10:31:21 INFO Slf4jLogger: Slf4jLogger started
14/05/15 10:31:21 INFO Remoting: Starting remoting
14/05/15 10:31:22 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://sparkExecutor@worker1:55924]
14/05/15 10:31:22 INFO Remoting: Remoting now listens on addresses: [akka.tcp://sparkExecutor@worker1:55924]
14/05/15 10:31:22 INFO CoarseGrainedExecutorBackend: Connecting to driver: akka.tcp://spark@R9FX97h.local:56720/user/CoarseGrainedScheduler
14/05/15 10:31:22 INFO WorkerWatcher: Connecting to worker akka.tcp://sparkWorker@worker1:50040/user/Worker
14/05/15 10:31:22 WARN Remoting: Tried to associate with unreachable remote address [akka.tcp://spark@R9FX97h.local:56720]. Address is now gated for 6 ms, all messages to this address will be delivered to dead letters.
14/05/15 10:31:22 ERROR CoarseGrainedExecutorBackend: Driver Disassociated [akka.tcp://sparkExecutor@worker1:55924] -> [akka.tcp://spark@R9FX97h.local:56720] disassociated! Shutting down.

I noticed the following messages on the worker console when I attached through docker:

14/05/15 11:24:33 INFO Worker: Asked to launch executor app-20140515112408-0005/7 for billingLogProcessor
14/05/15 11:24:33 ERROR EndpointWriter: AssociationError [akka.tcp://sparkWorker@worker1:50040] -> [akka.tcp://sparkExecutor@worker1:42437]: Error [Association failed with [akka.tcp://sparkExecutor@worker1:42437]] [ akka.remote.EndpointAssociationException: Association failed with [akka.tcp://sparkExecutor@worker1:42437] Caused by: akka.remote.transport.netty.NettyTransport$$anonfun$associate$1$$anon$2: Connection refused: worker1/172.17.0.4:42437 ]
14/05/15 11:24:33 ERROR EndpointWriter: AssociationError [akka.tcp://sparkWorker@worker1:50040] -> [akka.tcp://sparkExecutor@worker1:42437]: Error [Association failed with [akka.tcp://sparkExecutor@worker1:42437]] [ akka.remote.EndpointAssociationException: Association failed with [akka.tcp://sparkExecutor@worker1:42437] Caused by: akka.remote.transport.netty.NettyTransport$$anonfun$associate$1$$anon$2: Connection refused: worker1/172.17.0.4:42437 ]
14/05/15 11:24:33 INFO ExecutorRunner: Launch command: /usr/lib/jvm/java-7-openjdk-amd64/bin/java -cp :/opt/spark-0.9.0/conf:/opt/spark-0.9.0/assembly/target/scala-2.10/spark-assembly_2.10-0.9.0-incubating-hadoop1.0.4.jar -Xms512M -Xmx512M org.apache.spark.executor.CoarseGrainedExecutorBackend akka.tcp://spark@R9FX97h.local:46986/user/CoarseGrainedScheduler 7 worker1 1 akka.tcp://sparkWorker@worker1:50040/user/Worker app-20140515112408-0005
14/05/15 11:24:35 INFO Worker: Executor app-20140515112408-0005/7 finished with state FAILED message Command exited with code 1 exitStatus 1
14/05/15 11:24:35 INFO LocalActorRef: Message [akka.remote.transport.ActorTransportAdapter$DisassociateUnderlying] from Actor[akka://sparkWorker/deadLetters] to Actor[akka://sparkWorker/system/transports/akkaprotocolmanager.tcp0/akkaProtocol-tcp%3A%2F%2FsparkWorker%40172.17.0.4%3A33648-135#310170905] was not delivered. [34] dead letters encountered. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.
14/05/15 11:24:35 ERROR EndpointWriter: AssociationError [akka.tcp://sparkWorker@worker1:50040] -> [akka.tcp://sparkExecutor@worker1:56594]: Error [Association failed with [akka.tcp://sparkExecutor@worker1:56594]] [ akka.remote.EndpointAssociationException: Association failed with [akka.tcp://sparkExecutor@worker1:56594] Caused by: akka.remote.transport.netty.NettyTransport$$anonfun$associate$1$$anon$2: Connection refused: worker1/172.17.0.4:56594 ]
14/05/15 11:24:35 ERROR EndpointWriter: AssociationError [akka.tcp://sparkWorker@worker1:50040] -> [akka.tcp://sparkExecutor@worker1:56594]: Error [Association failed with [akka.tcp://sparkExecutor@worker1:56594]] [ akka.remote.EndpointAssociationException: Association failed with [akka.tcp://sparkExecutor@worker1:56594] Caused by: akka.remote.transport.netty.NettyTransport$$anonfun$associate$1$$anon$2: Connection refused: worker1/172.17.0.4:56594 ]

The significant code snippets from the standalone Java client are as follows:

JavaSparkContext ctx = new JavaSparkContext(masterAddr, "log_processor", sparkHome, jarFileLoc);
JavaRDD<String> rawLog = ctx.textFile("/tmp/some.log");
List<Tuple2<String, Long>> topRecords = rawLog.map(fieldSplitter).map(fieldExtractor).top(5, tupleComparator);

However, running the sample code provided on github (amplab docker page) over the spark shell went through fine with the following stdout message:

14/05/15 10:39:41 INFO Slf4jLogger: Slf4jLogger started
14/05/15 10:39:42 INFO Remoting: Starting remoting
14/05/15 10:39:42
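The executor stdout above shows it trying to call back to the driver at akka.tcp://spark@R9FX97h.local:56720 and reporting the address unreachable. One thing worth checking (this is only a guess from the log, not a diagnosis confirmed in the thread) is whether the dockerized worker can resolve and route to the laptop's hostname; if it cannot, pinning the driver to an address the containers can reach before creating the context is a possible workaround. The sketch below assumes Spark 0.9-style system-property configuration; the IP addresses, ports, paths, and jar name are placeholders:

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

public class DriverAddressSketch {
    public static void main(String[] args) {
        // Placeholder: an IP of the laptop that is routable from the docker bridge
        // network (e.g. the docker0 interface address), instead of an mDNS hostname
        // such as R9FX97h.local that the containers may not be able to resolve.
        System.setProperty("spark.driver.host", "172.17.42.1");
        System.setProperty("spark.driver.port", "56720");

        JavaSparkContext ctx = new JavaSparkContext(
            "spark://172.17.0.2:7077",      // placeholder master URL
            "log_processor",
            "/opt/spark-0.9.0",             // placeholder spark home
            "target/log-processor.jar");    // placeholder application jar

        JavaRDD<String> rawLog = ctx.textFile("/tmp/some.log");
        System.out.println("lines: " + rawLog.count());
        ctx.stop();
    }
}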