There has been some work on optimizing in-memory assigns for vectors, but the matrix side of the in-memory Java-backed assigns is admittedly patchy, given the number of variations.
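[Editor's note: to make the density-based dispatch discussed in this thread concrete, here is a minimal sketch, not Mahout's actual code. The method name and the 0.25 cutoff are illustrative placeholders; Mahout's real threshold is "set quite high" per Andrew below.]

    import org.apache.mahout.math.Matrix

    // Illustrative only: decide between a sparse and a dense assign path
    // from the estimated fraction of non-zero cells.
    def useSparsePath(m: Matrix, threshold: Double = 0.25): Boolean = {
      val nnz = (0 until m.rowSize()).map(r => m.viewRow(r).getNumNondefaultElements.toLong).sum
      val density = nnz.toDouble / (m.rowSize().toLong * m.columnSize())
      density < threshold // true => iterate stored cells only; false => full 2D loop
    }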
On Mon, Aug 21, 2017 at 12:05 PM, Pat Ferrel <p...@occamsmachete.com> wrote:

Matt,

I'll create a feature branch of Mahout in my git repo for simplicity (we are in code freeze for Mahout right now). Then if you could peel off your changes and make a PR against it, everyone can have a look before any change is made to the ASF repos.

Do a PR against https://github.com/pferrel/mahout/tree/sparse-speedup; even if it's not working we can take a look. The branch right now is just a snapshot of the current master in code freeze.

Mahout has always had methods to work with different levels of sparsity, and you may have found a missing point to optimize. Let's hope so.

On Aug 21, 2017, at 11:47 AM, Andrew Palumbo <ap....@outlook.com> wrote:

I should mention that the density is currently set quite high, and we've been discussing a user-defined setting for this, something that we have not worked in yet.

From: Andrew Palumbo <ap....@outlook.com>
Sent: Monday, August 21, 2017 2:44:35 PM
To: user@mahout.apache.org
Subject: Re: spark-itemsimilarity scalability / Spark parallelism issues (SimilarityAnalysis.cooccurrencesIDSs)

We do currently have optimizations based on density analysis in use, e.g. in AtB:

https://github.com/apache/mahout/blob/08e02602e947ff945b9bd73ab5f0b45863df3e53/math-scala/src/main/scala/org/apache/mahout/math/scalabindings/package.scala#L431

+1 to PR. Thanks for pointing this out.

--andy

From: Pat Ferrel <p...@occamsmachete.com>
Sent: Monday, August 21, 2017 2:26:58 PM
To: user@mahout.apache.org
Subject: Re: spark-itemsimilarity scalability / Spark parallelism issues (SimilarityAnalysis.cooccurrencesIDSs)

That looks like ancient code from the old mapreduce days. If it passes unit tests, create a PR.

Just a guess here, but there are times when this might not speed things up but slow them down. However, for the very sparse matrices you might see in CF this could work quite well. Some of the GPU optimization will eventually be keyed off the density of a matrix, or selectable from knowing its characteristics.

I use this code all the time and would be very interested in a version that works with CF-style very sparse matrices.

Long story short: create a PR so the optimizer guys can think through the implications. If I can also test it, I have some large real-world data for measuring real-world speedup.

On Aug 21, 2017, at 10:53 AM, Pat Ferrel <p...@occamsmachete.com> wrote:

Interesting indeed. What is "massive"? Does the change pass all unit tests?

On Aug 17, 2017, at 1:04 PM, Scruggs, Matt <matt.scru...@bronto.com> wrote:

Thanks for the remarks guys!

I profiled the code running locally on my machine and discovered this loop is where these setQuick() and getQuick() calls originate (during matrix Kryo deserialization), and as you can see the complexity of this 2D loop can be very high:

https://github.com/apache/mahout/blob/08e02602e947ff945b9bd73ab5f0b45863df3e53/math/src/main/java/org/apache/mahout/math/AbstractMatrix.java#L240

Recall that this algorithm uses SparseRowMatrix whose rows are SequentialAccessSparseVector, so all this looping seems unnecessary. I created a new subclass of SparseRowMatrix that overrides that assign(matrix, function) method, and instead of looping through all the columns of each row, it calls SequentialAccessSparseVector.iterateNonZero() so it only has to touch the cells with values. I also had to customize MahoutKryoRegistrator a bit with a new default serializer for this new matrix class. This yielded a massive performance boost, and I verified that the results match exactly for several test cases and datasets. I realize this could have side effects in some cases, but I'm not using any other part of Mahout, only SimilarityAnalysis.cooccurrencesIDSs().

Any thoughts / comments?

Matt
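[Editor's note: Matt's patch was not posted to the list, but a minimal sketch of the kind of override he describes might look like the following. The class name is hypothetical. Visiting only the non-zero cells of the other matrix is safe only when the assigned function satisfies f(x, 0) = x, which holds for the plus used by MatrixOps.+= in this code path (see the thread dumps at the end of the thread).]

    import org.apache.mahout.math.{Matrix, SparseRowMatrix}
    import org.apache.mahout.math.function.DoubleDoubleFunction

    // Hypothetical sketch, not Matt's actual code. Cells where `other` holds
    // no value are skipped, which assumes f(x, 0) == x (true for plus).
    class NonZeroAssignSparseRowMatrix(rows: Int, cols: Int)
        extends SparseRowMatrix(rows, cols) {

      override def assign(other: Matrix, f: DoubleDoubleFunction): Matrix = {
        var r = 0
        while (r < rowSize()) {
          val row = viewRow(r)                       // a SequentialAccessSparseVector
          val it = other.viewRow(r).iterateNonZero() // touch only stored cells
          while (it.hasNext) {
            val e = it.next()
            row.setQuick(e.index(), f.apply(row.getQuick(e.index()), e.get()))
          }
          r += 1
        }
        this
      }
    }

Matt also mentions registering a default Kryo serializer for the new class in MahoutKryoRegistrator so deserialized matrices come back as this type; that detail is omitted here.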
On 8/16/17, 8:29 PM, "Ted Dunning" <ted.dunn...@gmail.com> wrote:

It is common with large numerical codes that things run faster in memory on just a few cores if the communication required outweighs the parallel speedup.

The issue is that memory bandwidth is slower than the arithmetic speed by a large margin. If you just have to move stuff into the CPU and munch on it a bit, that is one thing; but if you have to move the data to the CPU and back to memory to distribute it around, possibly multiple times, you may wind up with something much slower than you would have had if you were to attack the problem directly.

On Wed, Aug 16, 2017 at 4:47 PM, Pat Ferrel <p...@occamsmachete.com> wrote:

This uses the Mahout blas optimizing solver, which I just use and do not know well. Mahout virtualizes some things having to do with partitioning and I've never quite understood how they work. There is a .par() on one of the matrix classes that has a similar function to partition, but in all cases I've used .par(auto) and use normal Spark repartitioning based on parallelism. Mahout implements a mapBlock function, which (all things being equal) looks at a partition at a time in memory. The reduce is not part of the code I wrote; maybe someone else can explain what blas is doing.

BTW hashmap is O(log n) on average for large n—caveats apply. We use fastutils for many things (I thought this was one case) because they are faster than JVM implementations, but feel free to dig in further. We use downsampling to maintain an overall rough O(n) calculation speed where n = # rows (users). As the model gets more dense there are greatly diminishing returns for the density, so after the elements-per-row threshold is reached we don't use more in the model creation math.

Still feel free to dig into what the optimizer is doing.

On Aug 15, 2017, at 11:13 AM, Scruggs, Matt <matt.scru...@bronto.com> wrote:

Thanks Pat, that's good to know!

This is the "reduce" step (which gets its own stage in my Spark jobs...this stage takes almost all the runtime) where most of the work is being done, and it takes longer the more shuffle partitions there are (relative to # of CPUs):

https://github.com/apache/mahout/blob/08e02602e947ff945b9bd73ab5f0b45863df3e53/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/AtA.scala#L258

Why does the runtime of this reduce stage (which ultimately calls SequentialAccessSparseVector.getQuick() and setQuick() a lot) depend on the ratio of (spark.sql.shuffle.partitions / # Spark CPUs)? Essentially that ratio determines how many "chunks" of shuffle partition (reduce) tasks must run, and each of those chunks always takes the same amount of time, so the stage finishes in less time when that ratio is low (preferably 1).

EXAMPLES - using 32 cores and 200 shuffle partitions, this stage requires ceil(145 tasks / 32 cores) = 5 "chunks" of work (145 tasks instead of 200 because of the estimateProductPartitions call in AtA). Each chunk takes ~8 minutes, so (5 chunks * 8 min) = ~40 mins. For a job with 32 cores and 32 shuffle partitions (same CPU resources, still 100% utilized), this stage requires only ceil(23 tasks / 32 cores) = 1 chunk of work, which takes the same 8 minutes, so the job finishes ~5x faster. You can take this to the extreme with just 1 core and 1 shuffle partition, and the stage still takes the same amount of time! I'd love to know if you can reproduce this behavior.

This goes against most advice and experience I've had with Spark, where you want to *increase* your partitioning in many cases (or at least leave it at the default 200, not lower it dramatically) to utilize CPUs better (and shrink each individual partition's task). There seems to be no reduction in computational complexity *per task* (within this stage I'm talking about) even with high values for spark.sql.shuffle.partitions (so it seems the data isn't actually being partitioned by the shuffle process). Refer back to the timings w/various configs in my first message.

Also...is there a possibility of using a faster hash-based implementation instead of the setQuick() / getQuick() methods of SequentialAccessSparseVector? The javadoc on those methods mentions they shouldn't be used unless absolutely necessary due to their O(log n) complexity.

Thanks for your time...this is fun stuff!
Matt
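[Editor's note: for context on that last question, SequentialAccessSparseVector keeps its stored cells in parallel sorted arrays (the OrderedIntDoubleMapping visible at the top of the thread dumps at the end of the thread), so a random-access get is a binary search. A rough sketch of the idea, with illustrative field names rather than Mahout's actual internals:]

    // Why getQuick()/setQuick() cost O(log n) on a SequentialAccessSparseVector:
    // stored indices are kept sorted, so each lookup is a binary search.
    def sparseGet(indices: Array[Int], values: Array[Double], i: Int): Double = {
      val pos = java.util.Arrays.binarySearch(indices, i)
      if (pos >= 0) values(pos) else 0.0 // absent cells read as zero
    }

A set on a previously absent index is worse than a get, since keeping the arrays sorted requires shifting elements, which is part of why the javadoc warns against random access.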
On 8/15/17, 10:15 AM, "Pat Ferrel" <p...@occamsmachete.com> wrote:

Great, this is the best way to use the APIs. The big win with CCO, the algo you are using, is with multiple user actions. Be aware that when you go to these methods the input IndexedDatasets must be coerced to have compatible dimensionality; in this case the primary action defines the user-set used in calculating the model—not the one for making queries, which can use anonymous user history. But that is for later and outside Mahout.

1) 4x max parallelism is a rule of thumb since the cores may not need 100% duty cycle; if they are already at 100% the 4x does no good. 2) You have found a long-running task, but there will always be one; if it weren't this one it would be another. Different types of tasks use resources differently. For instance the collects, which must eventually use the memory of the Driver to instantiate an in-memory data structure. There is no magic choice to make this work differently, but it avoids several joins, which are much slower.

I'm not quite sure what your question is.

On Aug 15, 2017, at 6:21 AM, Scruggs, Matt <matt.scru...@bronto.com> wrote:

Hi Pat,

I've taken some screenshots of my Spark UI to hopefully shed some light on the behavior I'm seeing. Do you mind if I send you a link via direct email (would rather not post it here)? It's just a shared Dropbox folder.

Thanks,
Matt
On 8/14/17, 11:34 PM, "Scruggs, Matt" <matt.scru...@bronto.com> wrote:

I'm running a custom Scala app (distributed in a shaded jar) directly calling SimilarityAnalysis.cooccurrencesIDSs(), not using the CLI.

The input data already gets explicitly repartitioned to spark.cores.max (defaultParallelism) in our code. I'll try increasing that by the factor of 4 that you suggest, but all our cores are already utilized so I'm not sure that will help. It gets bogged down in the post-shuffle (shuffle read / combine / reduce) phase even with all cores busy the whole time, which is why I've been playing around with various values for spark.sql.shuffle.partitions. The O(log n) operations I mentioned seem to take >95% of runtime.

Thanks,
Matt

From: Pat Ferrel <p...@occamsmachete.com>
Sent: Monday, August 14, 2017 11:02:42 PM
To: user@mahout.apache.org
Subject: Re: spark-itemsimilarity scalability / Spark parallelism issues (SimilarityAnalysis.cooccurrencesIDSs)

Are you using the CLI? If so it's likely that there is only one partition of the data. If you use Mahout in the Spark shell or as a lib, do a repartition on the input data before passing it into SimilarityAnalysis.cooccurrencesIDSs. I repartition to 4*total cores to start with and set max parallelism for Spark to the same. The CLI isn't really production worthy, just for super easy experiments with CSVs.
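[Editor's note: a minimal sketch of the repartition-before-call pattern Pat describes. The IndexedDatasetSpark factory taking an RDD of (user, item) string pairs and the input variables are assumptions for illustration; the parameter names are the ones Matt quotes in his first message below.]

    import org.apache.spark.SparkContext
    import org.apache.spark.rdd.RDD
    import org.apache.mahout.math.cf.SimilarityAnalysis
    import org.apache.mahout.math.indexeddataset.IndexedDataset
    import org.apache.mahout.sparkbindings.indexeddataset.IndexedDatasetSpark

    // Assumed inputs, not from the thread: a SparkContext and an RDD of
    // (user, item) interaction pairs.
    val sc: SparkContext = ???
    val rawPairs: RDD[(String, String)] = ???

    // Repartition to 4x total cores, per Pat's rule of thumb, before building
    // the IndexedDataset and creating the model.
    val pairs = rawPairs.repartition(4 * sc.defaultParallelism)
    val primaryIDS = IndexedDatasetSpark(pairs)(sc)

    // Parameter values are from Matt's runs; randomSeed and parOpts left at defaults.
    val models = SimilarityAnalysis.cooccurrencesIDSs(
      Array[IndexedDataset](primaryIDS),
      maxInterestingItemsPerThing = 20,
      maxNumInteractions = 500)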
In > particular, > >> notice how the ratio of (spark.sql.shuffle.partitions / spark.cores.max) > >> affects the runtime: > >>>> > >>>> * 8 executors w/8 cores each, takes about 45 minutes > >>>> * note that spark.sql.shuffle.partitions > spark.cores.max > >>>> spark.cores.max = 64 > >>>> spark.executor.cores = 8 > >>>> spark.sql.shuffle.partitions = 200 (default) > >>>> > >>>> * 1 executors w/24 cores, takes about 65 minutes > >>>> * note that spark.sql.shuffle.partitions >>> spark.cores.max > >>>> spark.cores.max = 24 > >>>> spark.executor.cores = 24 > >>>> spark.sql.shuffle.partitions = 200 (default) > >>>> > >>>> * 1 executor w/8 cores, takes about 8 minutes > >>>> * note that spark.sql.shuffle.partitions = spark.cores.max > >>>> spark.cores.max = 8 > >>>> spark.executor.cores = 8 (1 executor w/8 cores) > >>>> spark.sql.shuffle.partitions = 8 > >>>> > >>>> * 1 executor w/24 cores, takes about 8 minutes (same as 8 cores!) > >>>> * note that spark.sql.shuffle.partitions = spark.cores.max > >>>> spark.cores.max = 24 > >>>> spark.executor.cores = 24 (1 executor w/24 cores) > >>>> spark.sql.shuffle.partitions = 24 > >>>> > >>>> * 32 executors w/2 cores each, takes about 8 minutes (same as 8 > cores!) > >>>> * note that spark.sql.shuffle.partitions = spark.cores.max > >>>> spark.cores.max = 64 > >>>> spark.executor.cores = 2 > >>>> spark.sql.shuffle.partitions = 88 (results in 64 tasks for final > stage) > >>>> > >>>> Adjusting the "maxNumInteractions" parameter down to 100 and 50 > results > >> in a minor improvement (5-10%). I've also played around with removing > >> [user, item] rows from the input dataset for users with only 1 > >> interaction…I read to try that in another thread…that yielded maybe a > >> 40-50% speed improvement, but I'd rather not toss out data (unless it > truly > >> is totally useless, of course :D ). > >>>> > >>>> When I look at the thread dump within the Spark UI's Executors -> > >> thread dump pages, it seems all the executors are very busy in the code > >> pasted below for >95% of the run. GC throughput is very good so we're > not > >> bogged down there...it's just super busy doing running the code below. > I am > >> intrigued about the comments on the SequentialAccessSparseVector > methods I > >> see being called (getQuick and setQuick), which state they take O(log n) > >> time (https://github.com/apache/mahout/blob/ > 08e02602e947ff945b9bd73ab5f0b4 > >> 5863df3e53/math/src/main/java/org/apache/mahout/math/ > >> SequentialAccessSparseVector.java). > >>>> > >>>> > >>>> Thanks all for your time and feedback! 
> >>>> > >>>> Matt Scruggs > >>>> > >>>> org.apache.mahout.math.OrderedIntDoubleMapping.find( > >> OrderedIntDoubleMapping.java:105) > >>>> org.apache.mahout.math.OrderedIntDoubleMapping.get( > >> OrderedIntDoubleMapping.java:110) > >>>> org.apache.mahout.math.SequentialAccessSparseVector.getQuick( > >> SequentialAccessSparseVector.java:157) > >>>> org.apache.mahout.math.SparseRowMatrix.getQuick( > >> SparseRowMatrix.java:90) > >>>> org.apache.mahout.math.AbstractMatrix.assign(AbstractMatrix.java:240) > >>>> org.apache.mahout.math.scalabindings.MatrixOps.$plus$ > >> eq(MatrixOps.scala:45) > >>>> org.apache.mahout.sparkbindings.blas.AtA$$anonfun$19.apply(AtA.scala: > >> 258) > >>>> org.apache.mahout.sparkbindings.blas.AtA$$anonfun$19.apply(AtA.scala: > >> 258) > >>>> org.apache.spark.util.collection.ExternalAppendOnlyMap$$ > >> anonfun$3.apply(ExternalAppendOnlyMap.scala:151) > >>>> org.apache.spark.util.collection.ExternalAppendOnlyMap$$ > >> anonfun$3.apply(ExternalAppendOnlyMap.scala:150) > >>>> org.apache.spark.util.collection.AppendOnlyMap. > >> changeValue(AppendOnlyMap.scala:144) > >>>> org.apache.spark.util.collection.SizeTrackingAppendOnlyMap. > changeValue( > >> SizeTrackingAppendOnlyMap.scala:32) > >>>> org.apache.spark.util.collection.ExternalAppendOnlyMap.insertAll( > >> ExternalAppendOnlyMap.scala:163) > >>>> org.apache.spark.Aggregator.combineCombinersByKey( > Aggregator.scala:50) > >>>> org.apache.spark.shuffle.BlockStoreShuffleReader.read( > >> BlockStoreShuffleReader.scala:85) > >>>> org.apache.spark.rdd.ShuffledRDD.compute(ShuffledRDD.scala:109) > >>>> org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319) > >>>> org.apache.spark.rdd.RDD.iterator(RDD.scala:283) > >>>> org.apache.spark.rdd.MapPartitionsRDD.compute( > >> MapPartitionsRDD.scala:38) > >>>> org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319) > >>>> org.apache.spark.rdd.RDD.iterator(RDD.scala:283) > >>>> org.apache.spark.rdd.MapPartitionsRDD.compute( > >> MapPartitionsRDD.scala:38) > >>>> org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319) > >>>> org.apache.spark.rdd.RDD.iterator(RDD.scala:283) > >>>> org.apache.spark.rdd.MapPartitionsRDD.compute( > >> MapPartitionsRDD.scala:38) > >>>> org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319) > >>>> org.apache.spark.rdd.RDD.iterator(RDD.scala:283) > >>>> org.apache.spark.rdd.MapPartitionsRDD.compute( > >> MapPartitionsRDD.scala:38) > >>>> org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319) > >>>> org.apache.spark.rdd.RDD.iterator(RDD.scala:283) > >>>> org.apache.spark.rdd.MapPartitionsRDD.compute( > >> MapPartitionsRDD.scala:38) > >>>> org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319) > >>>> org.apache.spark.rdd.RDD.iterator(RDD.scala:283) > >>>> org.apache.spark.rdd.MapPartitionsRDD.compute( > >> MapPartitionsRDD.scala:38) > >>>> org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319) > >>>> org.apache.spark.rdd.RDD.iterator(RDD.scala:283) > >>>> org.apache.spark.rdd.MapPartitionsRDD.compute( > >> MapPartitionsRDD.scala:38) > >>>> org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319) > >>>> org.apache.spark.rdd.RDD.iterator(RDD.scala:283) > >>>> org.apache.spark.rdd.MapPartitionsRDD.compute( > >> MapPartitionsRDD.scala:38) > >>>> org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319) > >>>> org.apache.spark.rdd.RDD.iterator(RDD.scala:283) > >>>> org.apache.spark.scheduler.ShuffleMapTask.runTask( > >> ShuffleMapTask.scala:79) > >>>> 
org.apache.spark.scheduler.ShuffleMapTask.runTask( > >> ShuffleMapTask.scala:47) > >>>> org.apache.spark.scheduler.Task.run(Task.scala:86) > >>>> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274) > >>>> java.util.concurrent.ThreadPoolExecutor.runWorker( > >> ThreadPoolExecutor.java:1142) > >>>> java.util.concurrent.ThreadPoolExecutor$Worker.run( > >> ThreadPoolExecutor.java:617) > >>>> java.lang.Thread.run(Thread.java:745) > >>>> > >>>> ……or this code…… > >>>> > >>>> org.apache.mahout.math.SparseRowMatrix.setQuick( > >> SparseRowMatrix.java:105) > >>>> org.apache.mahout.math.AbstractMatrix.assign(AbstractMatrix.java:240) > >>>> org.apache.mahout.math.scalabindings.MatrixOps.$plus$ > >> eq(MatrixOps.scala:45) > >>>> org.apache.mahout.sparkbindings.blas.AtA$$anonfun$19.apply(AtA.scala: > >> 258) > >>>> org.apache.mahout.sparkbindings.blas.AtA$$anonfun$19.apply(AtA.scala: > >> 258) > >>>> org.apache.spark.util.collection.ExternalAppendOnlyMap$$ > >> anonfun$3.apply(ExternalAppendOnlyMap.scala:151) > >>>> org.apache.spark.util.collection.ExternalAppendOnlyMap$$ > >> anonfun$3.apply(ExternalAppendOnlyMap.scala:150) > >>>> org.apache.spark.util.collection.AppendOnlyMap. > >> changeValue(AppendOnlyMap.scala:144) > >>>> org.apache.spark.util.collection.SizeTrackingAppendOnlyMap. > changeValue( > >> SizeTrackingAppendOnlyMap.scala:32) > >>>> org.apache.spark.util.collection.ExternalAppendOnlyMap.insertAll( > >> ExternalAppendOnlyMap.scala:163) > >>>> org.apache.spark.Aggregator.combineCombinersByKey( > Aggregator.scala:50) > >>>> org.apache.spark.shuffle.BlockStoreShuffleReader.read( > >> BlockStoreShuffleReader.scala:85) > >>>> org.apache.spark.rdd.ShuffledRDD.compute(ShuffledRDD.scala:109) > >>>> org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319) > >>>> org.apache.spark.rdd.RDD.iterator(RDD.scala:283) > >>>> org.apache.spark.rdd.MapPartitionsRDD.compute( > >> MapPartitionsRDD.scala:38) > >>>> org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319) > >>>> org.apache.spark.rdd.RDD.iterator(RDD.scala:283) > >>>> org.apache.spark.rdd.MapPartitionsRDD.compute( > >> MapPartitionsRDD.scala:38) > >>>> org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319) > >>>> org.apache.spark.rdd.RDD.iterator(RDD.scala:283) > >>>> org.apache.spark.rdd.MapPartitionsRDD.compute( > >> MapPartitionsRDD.scala:38) > >>>> org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319) > >>>> org.apache.spark.rdd.RDD.iterator(RDD.scala:283) > >>>> org.apache.spark.rdd.MapPartitionsRDD.compute( > >> MapPartitionsRDD.scala:38) > >>>> org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319) > >>>> org.apache.spark.rdd.RDD.iterator(RDD.scala:283) > >>>> org.apache.spark.rdd.MapPartitionsRDD.compute( > >> MapPartitionsRDD.scala:38) > >>>> org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319) > >>>> org.apache.spark.rdd.RDD.iterator(RDD.scala:283) > >>>> org.apache.spark.rdd.MapPartitionsRDD.compute( > >> MapPartitionsRDD.scala:38) > >>>> org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319) > >>>> org.apache.spark.rdd.RDD.iterator(RDD.scala:283) > >>>> org.apache.spark.rdd.MapPartitionsRDD.compute( > >> MapPartitionsRDD.scala:38) > >>>> org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319) > >>>> org.apache.spark.rdd.RDD.iterator(RDD.scala:283) > >>>> org.apache.spark.rdd.MapPartitionsRDD.compute( > >> MapPartitionsRDD.scala:38) > >>>> org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319) > >>>> 
org.apache.spark.rdd.RDD.iterator(RDD.scala:283) > >>>> org.apache.spark.scheduler.ShuffleMapTask.runTask( > >> ShuffleMapTask.scala:79) > >>>> org.apache.spark.scheduler.ShuffleMapTask.runTask( > >> ShuffleMapTask.scala:47) > >>>> org.apache.spark.scheduler.Task.run(Task.scala:86) > >>>> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274) > >>>> java.util.concurrent.ThreadPoolExecutor.runWorker( > >> ThreadPoolExecutor.java:1142) > >>>> java.util.concurrent.ThreadPoolExecutor$Worker.run( > >> ThreadPoolExecutor.java:617) > >>>> java.lang.Thread.run(Thread.java:745) > >>>> > >>> > >> > >> > > > >
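[Editor's note: reading the traces, both hot paths bottom out in AbstractMatrix.assign via MatrixOps.+=, applied inside the combiner of the AtA shuffle (the Aggregator.combineCombinersByKey frames). In spirit, the reduce sums partial block products of A'A, roughly like the sketch below; this is not the actual AtA code, and the names are illustrative.]

    import org.apache.mahout.math.Matrix
    import org.apache.mahout.math.scalabindings.RLikeOps._
    import org.apache.spark.rdd.RDD

    // Illustrative only: partial block products keyed by block row index are
    // summed with +=, which calls Matrix.assign(other, plus) -- the loop whose
    // cost Matt's override attacks. `partialBlocks` is an assumed placeholder.
    def sumBlocks(partialBlocks: RDD[(Int, Matrix)]): RDD[(Int, Matrix)] =
      partialBlocks.reduceByKey { (a, b) =>
        a += b // MatrixOps.+= delegates to assign with plus
      }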