Good question :D

For the dataset I mentioned in my first message, the entire run is almost 10x 
faster (I expect that speedup to be non-linear since it nearly eliminates a for 
loop...bigger gains for bigger datasets). It's possible there are other 
sections of the code I can't override (e.g. before serialization of the 
matrices) that could take advantage of the sparse matrix / vector APIs.

I didn't make these changes in Mahout itself. I created a custom Kryo 
deserializer within my app to deserialize SparseRowMatrix instances to a custom 
subclass of SparseRowMatrix; this new class overrides the AbstractMatrix 
implementation of assign(matrix, function). FWIW, all of my app's existing tests 
pass and it produces the same results as before these changes.
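
To illustrate why touching only the non-zero cells helps, here is a minimal, 
self-contained sketch. It is not the Mahout source or the subclass described 
above: SparseRow, plusEqDense and plusEqSparse are hypothetical names, and the 
row is modelled as two sorted arrays. It also assumes the combining function is 
addition, for which f(x, 0) = x; for other functions, skipping the zero cells 
may change the result.

```java
import java.util.Arrays;

/** Hypothetical stand-in for a sequential-access sparse row (not a Mahout class). */
final class SparseRow {
    final int size;                // logical number of columns
    int[] idx = new int[0];        // sorted column indices of the stored cells
    double[] val = new double[0];  // values of the stored cells, parallel to idx

    SparseRow(int size) { this.size = size; }

    /** Lookup by binary search over the stored indices: O(log n). */
    double get(int col) {
        int p = Arrays.binarySearch(idx, col);
        return p >= 0 ? val[p] : 0.0;
    }

    /** Binary search, plus an array copy whenever a new cell has to be inserted. */
    void set(int col, double v) {
        int p = Arrays.binarySearch(idx, col);
        if (p >= 0) { val[p] = v; return; }
        if (v == 0.0) return;      // never create a stored cell for a zero
        int at = -p - 1;           // insertion point that keeps idx sorted
        int[] ni = new int[idx.length + 1];
        double[] nv = new double[val.length + 1];
        System.arraycopy(idx, 0, ni, 0, at);
        System.arraycopy(val, 0, nv, 0, at);
        ni[at] = col;
        nv[at] = v;
        System.arraycopy(idx, at, ni, at + 1, idx.length - at);
        System.arraycopy(val, at, nv, at + 1, val.length - at);
        idx = ni;
        val = nv;
    }

    /** Cell-by-cell accumulate: visits every column, stored or not. */
    void plusEqDense(SparseRow other) {
        for (int c = 0; c < size; c++) {
            set(c, get(c) + other.get(c));
        }
    }

    /** Sparse accumulate: visits only the stored cells of the other row. */
    void plusEqSparse(SparseRow other) {
        for (int k = 0; k < other.idx.length; k++) {
            int c = other.idx[k];
            set(c, get(c) + other.val[k]);
        }
    }

    public static void main(String[] args) {
        SparseRow acc = new SparseRow(1_000_000);
        acc.set(3, 1.0);
        SparseRow other = new SparseRow(1_000_000);
        other.set(3, 2.0);
        other.set(500_000, 4.0);
        acc.plusEqSparse(other);   // two loop iterations instead of a million
        System.out.println(acc.get(3) + " " + acc.get(500_000));
    }
}
```

In this sketch the dense version does one get/set pair per column of every row, 
while the sparse version does one pair per stored cell, so the gap widens as the 
matrix gets wider and sparser.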


On 8/21/17, 1:53 PM, "Pat Ferrel" <> wrote:

>Interesting indeed. What is “massive”? Does the change pass all unit tests?
>On Aug 17, 2017, at 1:04 PM, Scruggs, Matt <> wrote:
>Thanks for the remarks guys!
>I profiled the code running locally on my machine and discovered this loop is 
>where these setQuick() and getQuick() calls originate (during matrix Kryo 
>deserialization), and as you can see the complexity of this 2D loop can be 
>very high:
>Recall that this algorithm uses SparseRowMatrix whose rows are 
>SequentialAccessSparseVector, so all this looping seems unnecessary. I created 
>a new subclass of SparseRowMatrix that overrides that assign(matrix, function) 
>method, and instead of looping through all the columns of each row, it calls 
>SequentialAccessSparseVector.iterateNonZero() so it only has to touch the 
>cells with values. I also had to customize MahoutKryoRegistrator a bit with a 
>new default serializer for this new matrix class. This yielded a massive 
>performance boost and I verified that the results match exactly for several 
>test cases and datasets. I realize this could have side-effects in some cases, 
>but I'm not using any other part of Mahout, only 
>Any thoughts / comments?
>On 8/16/17, 8:29 PM, "Ted Dunning" <> wrote:
>> It is common with large numerical codes that things run faster in memory on
>> just a few cores if the communication required outweighs the parallel
>> speedup.
>> The issue is that memory bandwidth is slower than the arithmetic speed by a
>> very good amount. If you just have to move stuff into the CPU and munch on
>> it a bit it is one thing, but if you have to move the data to CPU and back
>> to memory to distribute it around, possibly multiple times, you may wind up
>> with something much slower than you would have had if you were to attack
>> the problem directly.
>> On Wed, Aug 16, 2017 at 4:47 PM, Pat Ferrel <> wrote:
>>> This uses the Mahout blas optimizing solver, which I just use and do not
>>> know well. Mahout virtualizes some things having to do with partitioning
>>> and I’ve never quite understood how they work. There is a .par() on one of
>>> the matrix classes that has a similar function to partition but in all
>>> cases I’ve used .par(auto) and use normal spark repartitioning based on
>>> parallelism. Mahout implements a mapBlock function, which (all things being
>>> equal) looks at a partition at a time in memory.
>>> The reduce is not part of the code I wrote. Maybe someone else can explain
>>> what blas is doing.
>>> BTW hashmap is O(1) on average (worse with heavy collisions for large n);
>>> caveats apply. We use fastutil for many things (I thought this was one
>>> case) because it is faster than the JVM implementations, but feel free to
>>> dig in further. We use
>>> downsampling to maintain an overall rough O(n) calculation speed where n =
>>> # rows (users). As the model gets more dense there are greatly diminishing
>>> returns for the density so after the elements per row threshold is reached
>>> we don’t use more in the model creation math.
>>> Still feel free to dig into what the optimizer is doing.
>>> On Aug 15, 2017, at 11:13 AM, Scruggs, Matt <>
>>> wrote:
>>> Thanks Pat, that's good to know!
>>> This is the "reduce" step (which gets its own stage in my Spark
>>> jobs...this stage takes almost all the runtime) where most of the work is
>>> being done, and takes longer the more shuffle partitions there are
>>> (relative to # of CPUs):
>>> 5863df3e53/spark/src/main/scala/org/apache/mahout/
>>> sparkbindings/blas/AtA.scala#L258
>>> Why does the runtime of this reduce stage (that ultimately calls
>>> SequentialAccessSparseVector.getQuick() and setQuick() a lot) depend on
>>> the ratio of (spark.sql.shuffle.partitions / # Spark CPUs)? Essentially
>>> that ratio determines how many "chunks" of shuffle partition (reduce) tasks
>>> must run, and each of those chunks always takes the same amount of time, so
>>> the stage finishes in less time when that ratio is low (preferably 1).
>>> EXAMPLES - using 32 cores and 200 shuffle partitions, this stage requires
>>> ceil(145 tasks / 32 cores) = 5 "chunks" of work (145 tasks instead of 200
>>> because of the estimateProductPartitions call in AtA). Each chunk takes ~8
>>> minutes, so (5 chunks * 8 min) = ~40 mins. For a job with 32 cores and 32
>>> shuffle partitions (same CPU resources, still 100% utilized), this stage
>>> requires only ceil(23 tasks / 32 cores) = 1 chunk of work, which takes the
>>> same 8 minutes, so the job finishes ~5x faster. You can take this to the
>>> extreme with just 1 core and 1 shuffle partition, and the stage still takes
>>> the same amount of time! I'd love to know if you can reproduce this
>>> behavior.
>>> This goes against most advice and experience I've had with Spark, where
>>> you want to *increase* your partitioning in many cases (or at least leave
>>> it at the default 200, not lower it dramatically) to utilize CPUs better
>>> (and shrink each individual partition's task). There seems to be no
>>> reduction in computational complexity *per task* (within this stage I'm
>>> talking about) even with high values for spark.sql.shuffle.partitions (so
>>> it seems the data isn't actually being partitioned by the shuffle process).
>>> Refer back to the timings w/various configs in my first message.
>>> Is there a possibility of using a faster hash-based implementation
>>> instead of the setQuick() / getQuick() methods of
>>> SequentialAccessSparseVector? The javadoc on those methods mentions they
>>> shouldn't be used unless absolutely necessary due to their O(log n)
>>> complexity.
>>> Thanks for your time...this is fun stuff!
>>> Matt
>>> On 8/15/17, 10:15 AM, "Pat Ferrel" <> wrote:
>>>> Great, this is the best way to use the APIs. The big win with CCO, the
>>> algo you are using, is with multiple user actions. Be aware that when you go
>>> to these methods the input IndexedDatasets must be coerced to have
>>> compatible dimensionality; in this case the primary action defines the
>>> user-set used in calculating the model, not the one for making queries,
>>> which can use anonymous user history. But that is for later and outside
>>> Mahout.
>>>> 1) 4x max parallelism is a rule of thumb since the cores may not need
>>> 100% duty cycle; if they are already at 100% the 4x does no good. 2) You
>>> have found a long-running task but there will always be one; if it weren’t
>>> this one it would be another. Different types of tasks use resources
>>> differently. For instance the collects, which must eventually use the
>>> memory of the Driver to instantiate an in-memory data structure. There is
>>> no magic choice to make this work differently but it avoids several joins,
>>> which are much slower.
>>>> I’m not quite sure what your question is.
>>>> On Aug 15, 2017, at 6:21 AM, Scruggs, Matt <>
>>> wrote:
>>>> Hi Pat,
>>>> I've taken some screenshots of my Spark UI to hopefully shed some light
>>> on the behavior I'm seeing. Do you mind if I send you a link via direct
>>> email (would rather not post it here)? It's just a shared Dropbox folder.
>>>> Thanks,
>>>> Matt
>>>> On 8/14/17, 11:34 PM, "Scruggs, Matt" <> wrote:
>>>>> I'm running a custom Scala app (distributed in a shaded jar) directly
>>> calling SimilarityAnalysis.cooccurrencesIDSs(), not using the CLI.
>>>>> The input data already gets explicitly repartitioned to spark.cores.max
>>> (defaultParallelism) in our code. I'll try increasing that by the factor of
>>> 4 that you suggest, but all our cores are already utilized so I'm not sure
>>> that will help. It gets bogged down in the post-shuffle (shuffle read /
>>> combine / reduce) phase even with all cores busy the whole time, which is
>>> why I've been playing around with various values for
>>> spark.sql.shuffle.partitions. The O(log n) operations I mentioned seem to
>>> take >95% of runtime.
>>>>> Thanks,
>>>>> Matt
>>>>> ________________________________
>>>>> From: Pat Ferrel <>
>>>>> Sent: Monday, August 14, 2017 11:02:42 PM
>>>>> To:
>>>>> Subject: Re: spark-itemsimilarity scalability / Spark parallelism
>>> issues (SimilarityAnalysis.cooccurrencesIDSs)
>>>>> Are you using the CLI? If so it’s likely that there is only one
>>> partition of the data. If you use Mahout in the Spark shell or use it as
>>> a lib, do a repartition on the input data before passing it into
>>> SimilarityAnalysis.cooccurrencesIDSs. I repartition to 4*total cores to
>>> start with and set max parallelism for spark to the same. The CLI isn’t
>>> really production worthy, just for super easy experiments with CSVs.
>>>>> On Aug 14, 2017, at 2:31 PM, Scruggs, Matt <>
>>> wrote:
>>>>> Howdy,
>>>>> I'm running SimilarityAnalysis.cooccurrencesIDSs on a fairly small
>>> dataset (about 870k [user, item] rows in the primary action IDS…no cross
>>> co-occurrence IDS) and I noticed it scales strangely. This is with Mahout
>>> 0.13.0 although the same behavior happens in 0.12.x as well (haven't tested
>>> it before that).
>>>>> TLDR - regardless of the Spark parallelism (CPUs) I throw at this
>>> routine, every Spark task within the final / busy stage seems to take the
>>> same amount of time, which leads me to guess that every shuffle partition
>>> contains the same amount of data (perhaps the full dataset matrix in
>>> shape/size, albeit with different values). I'm reaching out to see if this
>>> is a known algorithmic complexity issue in this routine, or if my config is
>>> to blame (or both).
>>>>> Regarding our hardware, we have identical physical machines in a Mesos
>>> cluster with 6 workers and a few masters. Each worker has ~500GB of SSD, 32
>>> cores and 128g RAM. We run lots of Spark jobs and have generally ironed out
>>> the kinks in terms of hardware and cluster config, so I don't suspect any
>>> hardware-related issues.
>>>>> Here are some timings for SimilarityAnalysis.cooccurrencesIDSs on this
>>> dataset with maxNumInteractions = 500, maxInterestingItemsPerThing = 20,
>>> randomSeed = default, parOpts = default (there's lots of other Spark
>>> config, this is just what I'm varying to check for effects). In particular,
>>> notice how the ratio of (spark.sql.shuffle.partitions / spark.cores.max)
>>> affects the runtime:
>>>>> * 8 executors w/8 cores each, takes about 45 minutes
>>>>>   * note that spark.sql.shuffle.partitions > spark.cores.max
>>>>>     spark.cores.max = 64
>>>>>     spark.executor.cores = 8
>>>>>     spark.sql.shuffle.partitions = 200 (default)
>>>>> * 1 executor w/24 cores, takes about 65 minutes
>>>>>   * note that spark.sql.shuffle.partitions >>> spark.cores.max
>>>>>     spark.cores.max = 24
>>>>>     spark.executor.cores = 24
>>>>>     spark.sql.shuffle.partitions = 200 (default)
>>>>> * 1 executor w/8 cores, takes about 8 minutes
>>>>>   * note that spark.sql.shuffle.partitions = spark.cores.max
>>>>>     spark.cores.max = 8
>>>>>     spark.executor.cores = 8 (1 executor w/8 cores)
>>>>>     spark.sql.shuffle.partitions = 8
>>>>> * 1 executor w/24 cores, takes about 8 minutes (same as 8 cores!)
>>>>>   * note that spark.sql.shuffle.partitions = spark.cores.max
>>>>>     spark.cores.max = 24
>>>>>     spark.executor.cores = 24 (1 executor w/24 cores)
>>>>>     spark.sql.shuffle.partitions = 24
>>>>> * 32 executors w/2 cores each, takes about 8 minutes (same as 8 cores!)
>>>>>   * note that spark.sql.shuffle.partitions = spark.cores.max
>>>>>     spark.cores.max = 64
>>>>>     spark.executor.cores = 2
>>>>>     spark.sql.shuffle.partitions = 88 (results in 64 tasks for final stage)
>>>>> Adjusting the "maxNumInteractions" parameter down to 100 and 50 results
>>> in a minor improvement (5-10%). I've also played around with removing
>>> [user, item] rows from the input dataset for users with only 1
>>> interaction…I read to try that in another thread…that yielded maybe a
>>> 40-50% speed improvement, but I'd rather not toss out data (unless it truly
>>> is totally useless, of course :D ).
>>>>> When I look at the thread dump within the Spark UI's Executors ->
>>> thread dump pages, it seems all the executors are very busy in the code
>>> pasted below for >95% of the run. GC throughput is very good so we're not
>>> bogged down there; it's just super busy running the code below. I am
>>> intrigued about the comments on the SequentialAccessSparseVector methods I
>>> see being called (getQuick and setQuick), which state they take O(log n)
>>> time (
>>> 5863df3e53/math/src/main/java/org/apache/mahout/math/
>>>>> Thanks all for your time and feedback!
>>>>> Matt Scruggs
>>>>> org.apache.mahout.math.OrderedIntDoubleMapping.find(
>>>>> org.apache.mahout.math.OrderedIntDoubleMapping.get(
>>>>> org.apache.mahout.math.SequentialAccessSparseVector.getQuick(
>>>>> org.apache.mahout.math.SparseRowMatrix.getQuick(
>>>>> org.apache.mahout.math.AbstractMatrix.assign(
>>>>> org.apache.mahout.math.scalabindings.MatrixOps.$plus$eq(MatrixOps.scala:45)
>>>>> org.apache.mahout.sparkbindings.blas.AtA$$anonfun$19.apply(AtA.scala:258)
>>>>> org.apache.mahout.sparkbindings.blas.AtA$$anonfun$19.apply(AtA.scala:258)
>>>>> org.apache.spark.util.collection.ExternalAppendOnlyMap$$anonfun$3.apply(ExternalAppendOnlyMap.scala:151)
>>>>> org.apache.spark.util.collection.ExternalAppendOnlyMap$$anonfun$3.apply(ExternalAppendOnlyMap.scala:150)
>>>>> org.apache.spark.util.collection.AppendOnlyMap.changeValue(AppendOnlyMap.scala:144)
>>>>> org.apache.spark.util.collection.SizeTrackingAppendOnlyMap.changeValue(SizeTrackingAppendOnlyMap.scala:32)
>>>>> org.apache.spark.util.collection.ExternalAppendOnlyMap.insertAll(ExternalAppendOnlyMap.scala:163)
>>>>> org.apache.spark.Aggregator.combineCombinersByKey(Aggregator.scala:50)
>>> BlockStoreShuffleReader.scala:85)
>>>>> org.apache.spark.rdd.ShuffledRDD.compute(ShuffledRDD.scala:109)
>>>>> org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
>>>>> org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
>>>>> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>>>>> org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
>>>>> org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
>>>>> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>>>>> org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
>>>>> org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
>>>>> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>>>>> org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
>>>>> org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
>>>>> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>>>>> org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
>>>>> org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
>>>>> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>>>>> org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
>>>>> org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
>>>>> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>>>>> org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
>>>>> org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
>>>>> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>>>>> org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
>>>>> org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
>>>>> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>>>>> org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
>>>>> org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
>>>>> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:79)
>>>>> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:47)
>>>>> org.apache.spark.executor.Executor$
>>>>> java.util.concurrent.ThreadPoolExecutor.runWorker(
>>>>> java.util.concurrent.ThreadPoolExecutor$
>>>>> ……or this code……
>>>>> org.apache.mahout.math.SparseRowMatrix.setQuick(
>>>>> org.apache.mahout.math.AbstractMatrix.assign(
>>>>> org.apache.mahout.math.scalabindings.MatrixOps.$plus$eq(MatrixOps.scala:45)
>>>>> org.apache.mahout.sparkbindings.blas.AtA$$anonfun$19.apply(AtA.scala:258)
>>>>> org.apache.mahout.sparkbindings.blas.AtA$$anonfun$19.apply(AtA.scala:258)
>>>>> org.apache.spark.util.collection.ExternalAppendOnlyMap$$anonfun$3.apply(ExternalAppendOnlyMap.scala:151)
>>>>> org.apache.spark.util.collection.ExternalAppendOnlyMap$$anonfun$3.apply(ExternalAppendOnlyMap.scala:150)
>>>>> org.apache.spark.util.collection.AppendOnlyMap.changeValue(AppendOnlyMap.scala:144)
>>>>> org.apache.spark.util.collection.SizeTrackingAppendOnlyMap.changeValue(SizeTrackingAppendOnlyMap.scala:32)
>>>>> org.apache.spark.util.collection.ExternalAppendOnlyMap.insertAll(ExternalAppendOnlyMap.scala:163)
>>>>> org.apache.spark.Aggregator.combineCombinersByKey(Aggregator.scala:50)
>>> BlockStoreShuffleReader.scala:85)
>>>>> org.apache.spark.rdd.ShuffledRDD.compute(ShuffledRDD.scala:109)
>>>>> org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
>>>>> org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
>>>>> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>>>>> org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
>>>>> org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
>>>>> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>>>>> org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
>>>>> org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
>>>>> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>>>>> org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
>>>>> org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
>>>>> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>>>>> org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
>>>>> org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
>>>>> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>>>>> org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
>>>>> org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
>>>>> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>>>>> org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
>>>>> org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
>>>>> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>>>>> org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
>>>>> org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
>>>>> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>>>>> org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
>>>>> org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
>>>>> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:79)
>>>>> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:47)
>>>>> org.apache.spark.executor.Executor$
>>>>> java.util.concurrent.ThreadPoolExecutor.runWorker(
>>>>> java.util.concurrent.ThreadPoolExecutor$
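
As a footnote to the "chunks" arithmetic in the quoted 8/15 message, the 
estimate can be written down as a small, self-contained sketch. StageEstimate 
and its method names are hypothetical, and the model rests on the observation 
reported in that message (every wave of tasks takes about the same time no 
matter how many shuffle partitions there are), which is an empirical finding for 
this job rather than general Spark behavior.

```java
/** Back-of-the-envelope model of the reduce stage runtime (hypothetical helper). */
final class StageEstimate {

    /** Number of task waves ("chunks"): ceil(tasks / cores) in integer arithmetic. */
    static int chunks(int tasks, int cores) {
        if (tasks <= 0 || cores <= 0) {
            throw new IllegalArgumentException("tasks and cores must be positive");
        }
        return (tasks + cores - 1) / cores;
    }

    /** Estimated stage time, assuming every chunk takes the same number of minutes. */
    static int stageMinutes(int tasks, int cores, int minutesPerChunk) {
        return chunks(tasks, cores) * minutesPerChunk;
    }

    public static void main(String[] args) {
        // 200 shuffle partitions -> 145 tasks on 32 cores, about 8 minutes per chunk
        System.out.println(stageMinutes(145, 32, 8));
        // 32 shuffle partitions -> 23 tasks on 32 cores, about 8 minutes per chunk
        System.out.println(stageMinutes(23, 32, 8));
    }
}
```

With the numbers from that message this gives 5 chunks (about 40 minutes) for 
145 tasks on 32 cores and 1 chunk (about 8 minutes) for 23 tasks on 32 cores.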
