Hi Pat,

I've taken some screenshots of my Spark UI to hopefully shed some light on the
behavior I'm seeing. Do you mind if I send you a link via direct email (would
rather not post it here)? It's just a shared Dropbox folder.
Thanks,
Matt

On 8/14/17, 11:34 PM, "Scruggs, Matt" <matt.scru...@bronto.com> wrote:

>I'm running a custom Scala app (distributed in a shaded jar) that calls
>SimilarityAnalysis.cooccurrencesIDSs() directly, not the CLI.
>
>The input data already gets explicitly repartitioned to spark.cores.max
>(defaultParallelism) in our code. I'll try increasing that by the factor
>of 4 you suggest, but all our cores are already utilized, so I'm not sure
>that will help. It gets bogged down in the post-shuffle (shuffle read /
>combine / reduce) phase even with all cores busy the whole time, which is
>why I've been trying various values for spark.sql.shuffle.partitions. The
>O(log n) operations I mentioned seem to take >95% of the runtime.
>
>Thanks,
>Matt
>________________________________
>From: Pat Ferrel <p...@occamsmachete.com>
>Sent: Monday, August 14, 2017 11:02:42 PM
>To: user@mahout.apache.org
>Subject: Re: spark-itemsimilarity scalability / Spark parallelism issues
>(SimilarityAnalysis.cooccurrencesIDSs)
>
>Are you using the CLI? If so, it’s likely that there is only one partition
>of the data. If you use Mahout in the Spark shell or as a lib, do a
>repartition on the input data before passing it into
>SimilarityAnalysis.cooccurrencesIDSs. I repartition to 4 * total cores to
>start with, and set max parallelism for Spark to the same. The CLI isn’t
>really production worthy, just for super easy experiments with CSVs.
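A minimal sketch of the repartition-then-build flow Pat describes, against the
Mahout 0.13 Spark bindings. The rawInteractions handle and the ??? placeholders
are illustrative stand-ins, not code from this thread; IndexedDatasetSpark and
SimilarityAnalysis.cooccurrencesIDSs are the real APIs:

    import org.apache.spark.SparkContext
    import org.apache.spark.rdd.RDD
    import org.apache.mahout.math.cf.SimilarityAnalysis
    import org.apache.mahout.sparkbindings.indexeddataset.IndexedDatasetSpark

    implicit val sc: SparkContext = ???                   // your existing context
    val rawInteractions: RDD[(String, String)] = ???      // raw (user, item) pairs

    // Pat's rule of thumb: start at 4 * total cores, and set Spark's
    // parallelism knobs (spark.default.parallelism and, per the timings
    // below, spark.sql.shuffle.partitions) to the same value.
    val numPartitions = 4 * sc.getConf.getInt("spark.cores.max", 8)

    // Repartition *before* building the IndexedDataset so the downstream
    // A'A computation starts from evenly spread partitions.
    val primaryIDS = IndexedDatasetSpark(rawInteractions.repartition(numPartitions))(sc)

    val itemSimilarities = SimilarityAnalysis.cooccurrencesIDSs(
      Array(primaryIDS),
      maxInterestingItemsPerThing = 20,
      maxNumInteractions = 500)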
>On Aug 14, 2017, at 2:31 PM, Scruggs, Matt <matt.scru...@bronto.com> wrote:
>
>Howdy,
>
>I'm running SimilarityAnalysis.cooccurrencesIDSs on a fairly small dataset
>(about 870k [user, item] rows in the primary action IDS…no cross
>co-occurrence IDS) and I noticed it scales strangely. This is with Mahout
>0.13.0, although the same behavior happens in 0.12.x as well (I haven't
>tested earlier releases).
>
>TLDR - regardless of the Spark parallelism (CPUs) I throw at this routine,
>every Spark task within the final / busy stage seems to take the same
>amount of time, which leads me to guess that every shuffle partition
>contains the same amount of data (perhaps the full dataset matrix in
>shape/size, albeit with different values). I'm reaching out to see if this
>is a known algorithmic complexity issue in this routine, or if my config
>is to blame (or both).
>
>Regarding our hardware, we have identical physical machines in a Mesos
>cluster with 6 workers and a few masters. Each worker has ~500GB of SSD,
>32 cores and 128GB RAM. We run lots of Spark jobs and have generally
>ironed out the kinks in terms of hardware and cluster config, so I don't
>suspect any hardware-related issues.
>
>Here are some timings for SimilarityAnalysis.cooccurrencesIDSs on this
>dataset with maxNumInteractions = 500, maxInterestingItemsPerThing = 20,
>randomSeed = default, parOpts = default (there's lots of other Spark
>config, this is just what I'm varying to check for effects). In
>particular, notice how the ratio of (spark.sql.shuffle.partitions /
>spark.cores.max) affects the runtime:
>
> * 8 executors w/8 cores each, takes about 45 minutes
>   * note that spark.sql.shuffle.partitions > spark.cores.max
>   spark.cores.max = 64
>   spark.executor.cores = 8
>   spark.sql.shuffle.partitions = 200 (default)
>
> * 1 executor w/24 cores, takes about 65 minutes
>   * note that spark.sql.shuffle.partitions >>> spark.cores.max
>   spark.cores.max = 24
>   spark.executor.cores = 24
>   spark.sql.shuffle.partitions = 200 (default)
>
> * 1 executor w/8 cores, takes about 8 minutes
>   * note that spark.sql.shuffle.partitions = spark.cores.max
>   spark.cores.max = 8
>   spark.executor.cores = 8 (1 executor w/8 cores)
>   spark.sql.shuffle.partitions = 8
>
> * 1 executor w/24 cores, takes about 8 minutes (same as 8 cores!)
>   * note that spark.sql.shuffle.partitions = spark.cores.max
>   spark.cores.max = 24
>   spark.executor.cores = 24 (1 executor w/24 cores)
>   spark.sql.shuffle.partitions = 24
>
> * 32 executors w/2 cores each, takes about 8 minutes (same as 8 cores!)
>   * note that the final stage still runs spark.cores.max tasks
>   spark.cores.max = 64
>   spark.executor.cores = 2
>   spark.sql.shuffle.partitions = 88 (results in 64 tasks for final stage)
>
>Adjusting the "maxNumInteractions" parameter down to 100 and 50 results in
>a minor improvement (5-10%). I've also played around with removing [user,
>item] rows from the input dataset for users with only 1 interaction (I
>read a suggestion to try that in another thread). That yielded maybe a
>40-50% speed improvement, but I'd rather not toss out data (unless it
>truly is totally useless, of course :D ).
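For concreteness, a sketch of that single-interaction filter, reusing the
hypothetical rawInteractions pair RDD from the snippet above (the cutoff of 2
and all names are illustrative):

    // Count events per user and keep only users with 2+ interactions. A user
    // with a single interaction contributes no item pairs to A'A (co-occurrence
    // needs at least two items from the same user), though it does still feed
    // the per-item totals used by LLR - hence the hesitation to drop them.
    val activeUsers = rawInteractions
      .mapValues(_ => 1L)
      .reduceByKey(_ + _)
      .filter { case (_, count) => count >= 2 }

    val filteredInteractions = rawInteractions
      .join(activeUsers)                      // drops single-interaction users
      .mapValues { case (item, _) => item }   // back to (user, item) pairs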
>When I look at the thread dump within the Spark UI's Executors -> thread
>dump pages, it seems all the executors are very busy in the code pasted
>below for >95% of the run. GC throughput is very good, so we're not bogged
>down there...it's just super busy running the code below. I am intrigued
>by the comments on the SequentialAccessSparseVector methods I see being
>called (getQuick and setQuick), which state that they take O(log n) time
>(https://github.com/apache/mahout/blob/08e02602e947ff945b9bd73ab5f0b45863df3e53/math/src/main/java/org/apache/mahout/math/SequentialAccessSparseVector.java).
>
>Thanks all for your time and feedback!
>
>Matt Scruggs
>
>org.apache.mahout.math.OrderedIntDoubleMapping.find(OrderedIntDoubleMapping.java:105)
>org.apache.mahout.math.OrderedIntDoubleMapping.get(OrderedIntDoubleMapping.java:110)
>org.apache.mahout.math.SequentialAccessSparseVector.getQuick(SequentialAccessSparseVector.java:157)
>org.apache.mahout.math.SparseRowMatrix.getQuick(SparseRowMatrix.java:90)
>org.apache.mahout.math.AbstractMatrix.assign(AbstractMatrix.java:240)
>org.apache.mahout.math.scalabindings.MatrixOps.$plus$eq(MatrixOps.scala:45)
>org.apache.mahout.sparkbindings.blas.AtA$$anonfun$19.apply(AtA.scala:258)
>org.apache.mahout.sparkbindings.blas.AtA$$anonfun$19.apply(AtA.scala:258)
>org.apache.spark.util.collection.ExternalAppendOnlyMap$$anonfun$3.apply(ExternalAppendOnlyMap.scala:151)
>org.apache.spark.util.collection.ExternalAppendOnlyMap$$anonfun$3.apply(ExternalAppendOnlyMap.scala:150)
>org.apache.spark.util.collection.AppendOnlyMap.changeValue(AppendOnlyMap.scala:144)
>org.apache.spark.util.collection.SizeTrackingAppendOnlyMap.changeValue(SizeTrackingAppendOnlyMap.scala:32)
>org.apache.spark.util.collection.ExternalAppendOnlyMap.insertAll(ExternalAppendOnlyMap.scala:163)
>org.apache.spark.Aggregator.combineCombinersByKey(Aggregator.scala:50)
>org.apache.spark.shuffle.BlockStoreShuffleReader.read(BlockStoreShuffleReader.scala:85)
>org.apache.spark.rdd.ShuffledRDD.compute(ShuffledRDD.scala:109)
>org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
>org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
>org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
>org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
>org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
>org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
>org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
>org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
>org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
>org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
>org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
>org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
>org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
>org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
>org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
>org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
>org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
>org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
>org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
>org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
>org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:79)
>org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:47)
>org.apache.spark.scheduler.Task.run(Task.scala:86)
>org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
>java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>java.lang.Thread.run(Thread.java:745)
>
>……or this code……
>
>org.apache.mahout.math.SparseRowMatrix.setQuick(SparseRowMatrix.java:105)
>org.apache.mahout.math.AbstractMatrix.assign(AbstractMatrix.java:240)
>org.apache.mahout.math.scalabindings.MatrixOps.$plus$eq(MatrixOps.scala:45)
>org.apache.mahout.sparkbindings.blas.AtA$$anonfun$19.apply(AtA.scala:258)
>org.apache.mahout.sparkbindings.blas.AtA$$anonfun$19.apply(AtA.scala:258)
>org.apache.spark.util.collection.ExternalAppendOnlyMap$$anonfun$3.apply(ExternalAppendOnlyMap.scala:151)
>org.apache.spark.util.collection.ExternalAppendOnlyMap$$anonfun$3.apply(ExternalAppendOnlyMap.scala:150)
>org.apache.spark.util.collection.AppendOnlyMap.changeValue(AppendOnlyMap.scala:144)
>org.apache.spark.util.collection.SizeTrackingAppendOnlyMap.changeValue(SizeTrackingAppendOnlyMap.scala:32)
>org.apache.spark.util.collection.ExternalAppendOnlyMap.insertAll(ExternalAppendOnlyMap.scala:163)
>org.apache.spark.Aggregator.combineCombinersByKey(Aggregator.scala:50)
>org.apache.spark.shuffle.BlockStoreShuffleReader.read(BlockStoreShuffleReader.scala:85)
>org.apache.spark.rdd.ShuffledRDD.compute(ShuffledRDD.scala:109)
>org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
>org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
>org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
>org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
>org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
>org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
>org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
>org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
>org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
>org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
>org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
>org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
>org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
>org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
>org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
>org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
>org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
>org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
>org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
>org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
>org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:79)
>org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:47)
>org.apache.spark.scheduler.Task.run(Task.scala:86)
>org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
>java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>java.lang.Thread.run(Thread.java:745)
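Reading the two stacks: the shuffle combiner (Aggregator.combineCombinersByKey)
is summing partial A'A blocks with Mahout's += (MatrixOps.$plus$eq), which
delegates to AbstractMatrix.assign, a cell-by-cell loop over the full matrix
shape in which every getQuick/setQuick on a SequentialAccessSparseVector pays
the O(log n) binary search in OrderedIntDoubleMapping.find. A standalone
micro-benchmark sketch of that access pattern (the sizes are arbitrary;
SequentialAccessSparseVector, getQuick and nonZeroes are Mahout's actual math
API):

    import org.apache.mahout.math.SequentialAccessSparseVector
    import scala.collection.JavaConverters._

    object SparseAccessDemo extends App {
      val cardinality = 1 << 20   // one very wide row, most cells zero
      val nnz = 10000             // ~1% of cells actually populated

      val v = new SequentialAccessSparseVector(cardinality)
      val rng = new java.util.Random(42)
      for (_ <- 0 until nnz) v.setQuick(rng.nextInt(cardinality), 1.0)

      // Dense-style sweep, roughly what an element-wise assign does: every
      // getQuick is a binary search, so this costs O(cardinality * log nnz)
      // even though almost every cell is zero.
      var t0 = System.nanoTime()
      var sum = 0.0
      var i = 0
      while (i < cardinality) { sum += v.getQuick(i); i += 1 }
      println(f"dense getQuick sweep: ${(System.nanoTime() - t0) / 1e6}%.1f ms")

      // Sparse-aware sweep: touches only the stored nonzeros.
      t0 = System.nanoTime()
      sum = 0.0
      for (e <- v.nonZeroes().asScala) sum += e.get()
      println(f"nonZeroes sweep: ${(System.nanoTime() - t0) / 1e6}%.1f ms")
    }

If that reading is right, each combine task does work proportional to the full
block shape rather than to its nonzero count, which would explain why every
task in the final stage takes about the same time no matter how the data is
partitioned.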