Hi Pat,

I've taken some screenshots of my Spark UI to hopefully shed some light on the 
behavior I'm seeing. Do you mind if I send you a link via direct email (would 
rather not post it here)? It's just a shared Dropbox folder.


Thanks,
Matt



On 8/14/17, 11:34 PM, "Scruggs, Matt" <matt.scru...@bronto.com> wrote:

>I'm running a custom Scala app (distributed in a shaded jar) that calls 
>SimilarityAnalysis.cooccurrencesIDSs() directly, not using the CLI.
>
>The input data already gets explicitly repartitioned to spark.cores.max 
>(defaultParallelism) in our code. I'll try increasing that by the factor of 4 
>that you suggest, but all our cores are already utilized so I'm not sure that 
>will help. It gets bogged down in the post-shuffle (shuffle read / combine / 
>reduce) phase even with all cores busy the whole time, which is why I've been 
>playing around with various values for spark.sql.shuffle.partitions. The O(log 
>n) operations I mentioned seem to take >95% of runtime.
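>
>For reference, this is roughly how the parallelism knobs in question get set in our 
>job (a minimal sketch; the app name, values, and input path are placeholders, not our 
>exact code):
>
> import org.apache.spark.{SparkConf, SparkContext}
>
> // Sketch of the settings being varied (values are placeholders)
> val conf = new SparkConf()
>   .setAppName("cooccurrence-job")                // hypothetical name
>   .set("spark.cores.max", "24")
>   .set("spark.executor.cores", "24")
>   .set("spark.default.parallelism", "24")
>   .set("spark.sql.shuffle.partitions", "24")     // the knob I've been varying
> val sc = new SparkContext(conf)
>
> // (user, item) pairs; the path and format are hypothetical
> val pairs = sc.textFile("hdfs:///path/to/interactions.csv")
>   .map(_.split(","))
>   .map(a => (a(0), a(1)))
>   .repartition(sc.defaultParallelism)            // explicit repartition to spark.cores.max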
>
>Thanks,
>Matt
>________________________________
>From: Pat Ferrel <p...@occamsmachete.com>
>Sent: Monday, August 14, 2017 11:02:42 PM
>To: user@mahout.apache.org
>Subject: Re: spark-itemsimilarity scalability / Spark parallelism issues 
>(SimilarityAnalysis.cooccurrencesIDSs)
>
>Are you using the CLI? If so, it's likely that there is only one partition of 
>the data. If you use Mahout in the Spark shell or as a lib, do a repartition 
>on the input data before passing it into 
>SimilarityAnalysis.cooccurrencesIDSs. I repartition to 4 * total cores to start 
>with and set Spark's max parallelism to the same value. The CLI isn't really 
>production worthy; it's just for super easy experiments with CSVs.
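>
>Roughly like this, as a sketch (untested; "pairs" and "totalCores" are placeholders, and 
>it assumes the IndexedDatasetSpark companion apply that builds an IDS from an RDD of 
>(user, item) string pairs, with a SparkContext passed in):
>
> import org.apache.mahout.math.cf.SimilarityAnalysis
> import org.apache.mahout.sparkbindings.indexeddataset.IndexedDatasetSpark
>
> val totalCores = 24                                      // placeholder: your spark.cores.max
> val repartitioned = pairs.repartition(4 * totalCores)    // pairs: RDD[(String, String)]
> val primaryIDS = IndexedDatasetSpark(repartitioned)(sc)  // IDS built from the repartitioned RDD
> val similarityIDSs = SimilarityAnalysis.cooccurrencesIDSs(Array(primaryIDS))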
>
>
>On Aug 14, 2017, at 2:31 PM, Scruggs, Matt <matt.scru...@bronto.com> wrote:
>
>Howdy,
>
>I'm running SimilarityAnalysis.cooccurrencesIDSs on a fairly small dataset 
>(about 870k [user, item] rows in the primary action IDS…no cross co-occurrence 
>IDS) and I noticed it scales strangely. This is with Mahout 0.13.0 although 
>the same behavior happens in 0.12.x as well (haven't tested it before that).
>
>TLDR - regardless of the Spark parallelism (CPUs) I throw at this routine, 
>every Spark task within the final / busy stage seems to take the same amount 
>of time, which leads me to guess that every shuffle partition contains the 
>same amount of data (perhaps the full dataset matrix in shape/size, albeit 
>with different values). I'm reaching out to see if this is a known algorithmic 
>complexity issue in this routine, or if my config is to blame (or both).
>
>Regarding our hardware, we have identical physical machines in a Mesos cluster 
>with 6 workers and a few masters. Each worker has ~500GB of SSD, 32 cores and 
>128g RAM. We run lots of Spark jobs and have generally ironed out the kinks in 
>terms of hardware and cluster config, so I don't suspect any hardware-related 
>issues.
>
>Here are some timings for SimilarityAnalysis.cooccurrencesIDSs on this dataset 
>with maxNumInteractions = 500, maxInterestingItemsPerThing = 20, randomSeed = 
>default, parOpts = default (there's lots of other Spark config; this is just 
>what I'm varying to check for effects, and a sketch of the call follows the 
>timings below). In particular, notice how the ratio of 
>(spark.sql.shuffle.partitions / spark.cores.max) affects the runtime:
>
> * 8 executors w/8 cores each, takes about 45 minutes
> * note that spark.sql.shuffle.partitions > spark.cores.max
> spark.cores.max = 64
> spark.executor.cores = 8
> spark.sql.shuffle.partitions = 200 (default)
>
> * 1 executor w/24 cores, takes about 65 minutes
> * note that spark.sql.shuffle.partitions >>> spark.cores.max
> spark.cores.max = 24
> spark.executor.cores = 24
> spark.sql.shuffle.partitions = 200 (default)
>
> * 1 executor w/8 cores, takes about 8 minutes
> * note that spark.sql.shuffle.partitions = spark.cores.max
> spark.cores.max = 8
> spark.executor.cores = 8 (1 executor w/8 cores)
> spark.sql.shuffle.partitions = 8
>
> * 1 executor w/24 cores, takes about 8 minutes (same as 8 cores!)
> * note that spark.sql.shuffle.partitions = spark.cores.max
> spark.cores.max = 24
> spark.executor.cores = 24 (1 executor w/24 cores)
> spark.sql.shuffle.partitions = 24
>
> * 32 executors w/2 cores each, takes about 8 minutes (same as 8 cores!)
> * note that the final stage runs spark.cores.max tasks (effective partitions = cores)
> spark.cores.max = 64
> spark.executor.cores = 2
> spark.sql.shuffle.partitions = 88 (results in 64 tasks for final stage)
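>
>For completeness, the call itself looks roughly like this (a sketch; primaryIDS stands in 
>for our single IndexedDataset, and the parameter names are the ones listed above):
>
> import org.apache.mahout.math.cf.SimilarityAnalysis
>
> // primaryIDS: the ~870k-row (user, item) IndexedDataset (no cross co-occurrence IDS)
> val similarityIDSs = SimilarityAnalysis.cooccurrencesIDSs(
>   Array(primaryIDS),                  // primary action only
>   maxInterestingItemsPerThing = 20,
>   maxNumInteractions = 500)           // randomSeed and parOpts left at their defaults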
>
>Adjusting the "maxNumInteractions" parameter down to 100 and 50 results in a 
>minor improvement (5-10%). I've also played around with removing [user, item] 
>rows from the input dataset for users with only 1 interaction (a suggestion I 
>read in another thread). That yielded maybe a 40-50% speed improvement, but I'd 
>rather not toss out data (unless it truly is totally useless, of course :D ).
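>
>(For reference, the single-interaction filter I experimented with was roughly this sketch, 
>where pairs stands in for our RDD[(String, String)] of (user, item) rows:)
>
> // Count interactions per user, then drop rows for users with only one.
> val userCounts = pairs.mapValues(_ => 1L).reduceByKey(_ + _)
> val filtered = pairs.join(userCounts)                        // (user, (item, count))
>   .filter { case (_, (_, count)) => count > 1 }
>   .map { case (user, (item, _)) => (user, item) }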
>
>When I look at the thread dump within the Spark UI's Executors -> thread dump 
>pages, it seems all the executors are very busy in the code pasted below for 
>>95% of the run. GC throughput is very good, so we're not bogged down 
>there...the executors are just busy running the code below. I am intrigued by 
>the comments on the SequentialAccessSparseVector methods I see being called 
>(getQuick and setQuick), which state they take O(log n) time 
>(https://github.com/apache/mahout/blob/08e02602e947ff945b9bd73ab5f0b45863df3e53/math/src/main/java/org/apache/mahout/math/SequentialAccessSparseVector.java).
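>
>For anyone following along, here is a toy illustration in plain Scala (not the Mahout 
>classes) of why each random get/set on a sequential-access sparse vector costs O(log n): 
>finding the position of an index presumably means a binary search over the sorted index 
>array.
>
> // Toy sparse vector: parallel arrays of sorted indices and their values.
> // A random-access get must binary-search the index array, hence O(log n) per call.
> final case class ToySparseVector(indices: Array[Int], values: Array[Double]) {
>   def get(i: Int): Double = {
>     val pos = java.util.Arrays.binarySearch(indices, i)
>     if (pos >= 0) values(pos) else 0.0
>   }
> }
> // e.g. ToySparseVector(Array(2, 5, 9), Array(1.0, 3.0, 7.0)).get(5) == 3.0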
>
>
>Thanks all for your time and feedback!
>
>Matt Scruggs
>
>org.apache.mahout.math.OrderedIntDoubleMapping.find(OrderedIntDoubleMapping.java:105)
>org.apache.mahout.math.OrderedIntDoubleMapping.get(OrderedIntDoubleMapping.java:110)
>org.apache.mahout.math.SequentialAccessSparseVector.getQuick(SequentialAccessSparseVector.java:157)
>org.apache.mahout.math.SparseRowMatrix.getQuick(SparseRowMatrix.java:90)
>org.apache.mahout.math.AbstractMatrix.assign(AbstractMatrix.java:240)
>org.apache.mahout.math.scalabindings.MatrixOps.$plus$eq(MatrixOps.scala:45)
>org.apache.mahout.sparkbindings.blas.AtA$$anonfun$19.apply(AtA.scala:258)
>org.apache.mahout.sparkbindings.blas.AtA$$anonfun$19.apply(AtA.scala:258)
>org.apache.spark.util.collection.ExternalAppendOnlyMap$$anonfun$3.apply(ExternalAppendOnlyMap.scala:151)
>org.apache.spark.util.collection.ExternalAppendOnlyMap$$anonfun$3.apply(ExternalAppendOnlyMap.scala:150)
>org.apache.spark.util.collection.AppendOnlyMap.changeValue(AppendOnlyMap.scala:144)
>org.apache.spark.util.collection.SizeTrackingAppendOnlyMap.changeValue(SizeTrackingAppendOnlyMap.scala:32)
>org.apache.spark.util.collection.ExternalAppendOnlyMap.insertAll(ExternalAppendOnlyMap.scala:163)
>org.apache.spark.Aggregator.combineCombinersByKey(Aggregator.scala:50)
>org.apache.spark.shuffle.BlockStoreShuffleReader.read(BlockStoreShuffleReader.scala:85)
>org.apache.spark.rdd.ShuffledRDD.compute(ShuffledRDD.scala:109)
>org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
>org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
>org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
>org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
>org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
>org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
>org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
>org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
>org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
>org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
>org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
>org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
>org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
>org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
>org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
>org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
>org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
>org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
>org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:79)
>org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:47)
>org.apache.spark.scheduler.Task.run(Task.scala:86)
>org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
>java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>java.lang.Thread.run(Thread.java:745)
>
>……or this code……
>
>org.apache.mahout.math.SparseRowMatrix.setQuick(SparseRowMatrix.java:105)
>org.apache.mahout.math.AbstractMatrix.assign(AbstractMatrix.java:240)
>org.apache.mahout.math.scalabindings.MatrixOps.$plus$eq(MatrixOps.scala:45)
>org.apache.mahout.sparkbindings.blas.AtA$$anonfun$19.apply(AtA.scala:258)
>org.apache.mahout.sparkbindings.blas.AtA$$anonfun$19.apply(AtA.scala:258)
>org.apache.spark.util.collection.ExternalAppendOnlyMap$$anonfun$3.apply(ExternalAppendOnlyMap.scala:151)
>org.apache.spark.util.collection.ExternalAppendOnlyMap$$anonfun$3.apply(ExternalAppendOnlyMap.scala:150)
>org.apache.spark.util.collection.AppendOnlyMap.changeValue(AppendOnlyMap.scala:144)
>org.apache.spark.util.collection.SizeTrackingAppendOnlyMap.changeValue(SizeTrackingAppendOnlyMap.scala:32)
>org.apache.spark.util.collection.ExternalAppendOnlyMap.insertAll(ExternalAppendOnlyMap.scala:163)
>org.apache.spark.Aggregator.combineCombinersByKey(Aggregator.scala:50)
>org.apache.spark.shuffle.BlockStoreShuffleReader.read(BlockStoreShuffleReader.scala:85)
>org.apache.spark.rdd.ShuffledRDD.compute(ShuffledRDD.scala:109)
>org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
>org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
>org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
>org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
>org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
>org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
>org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
>org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
>org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
>org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
>org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
>org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
>org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
>org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
>org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
>org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
>org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
>org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
>org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:79)
>org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:47)
>org.apache.spark.scheduler.Task.run(Task.scala:86)
>org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
>java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>java.lang.Thread.run(Thread.java:745)
>
