Good question :D For the dataset I mentioned in my first message, the entire run is almost 10x faster (I expect that speedup to be non-linear since it nearly eliminates a for loop...bigger gains for bigger datasets). It's possible there are other sections of the code I can't override (e.g. before serialization of the matrices) that could take advantage of the sparse matrix / vector APIs.
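To make the win concrete, here is a rough, self-contained sketch of the idea (the ToySparseMatrix class and method names below are simplified stand-ins made up for illustration, not the actual Mahout classes): a dense-style assign touches every (row, col) cell, so its cost is O(rows * cols), while a sparse-aware assign only walks the stored non-zeros, which is the same trick as the iterateNonZero() override.

```java
import java.util.Map;
import java.util.TreeMap;
import java.util.function.DoubleBinaryOperator;

// Toy stand-in for a sparse row matrix: each row stores only its non-zero cells.
class ToySparseMatrix {
    final int rows, cols;
    final TreeMap<Integer, Double>[] data;

    @SuppressWarnings("unchecked")
    ToySparseMatrix(int rows, int cols) {
        this.rows = rows;
        this.cols = cols;
        this.data = new TreeMap[rows];
        for (int r = 0; r < rows; r++) data[r] = new TreeMap<>();
    }

    double get(int r, int c) { return data[r].getOrDefault(c, 0.0); }
    void set(int r, int c, double v) { if (v != 0.0) data[r].put(c, v); }

    // Dense-style assign: visits every (row, col) cell, like the 2D loop
    // in question, regardless of how sparse the operands are.
    void assignDense(ToySparseMatrix other, DoubleBinaryOperator f) {
        for (int r = 0; r < rows; r++)
            for (int c = 0; c < cols; c++)
                set(r, c, f.applyAsDouble(get(r, c), other.get(r, c)));
    }

    // Sparse-aware assign: only valid for functions with f(x, 0) == x (e.g. plus),
    // so it can skip every cell where the other matrix stores nothing.
    void assignSparse(ToySparseMatrix other, DoubleBinaryOperator f) {
        for (int r = 0; r < rows; r++)
            for (Map.Entry<Integer, Double> e : other.data[r].entrySet())
                set(r, e.getKey(), f.applyAsDouble(get(r, e.getKey()), e.getValue()));
    }
}

public class SparseAssignSketch {
    public static void main(String[] args) {
        int n = 1000;
        ToySparseMatrix a = new ToySparseMatrix(n, n), b = new ToySparseMatrix(n, n);
        ToySparseMatrix a2 = new ToySparseMatrix(n, n);
        for (int i = 0; i < n; i++) {
            b.set(i, i, 2.0);
            a.set(i, (i * 7) % n, 1.0);
            a2.set(i, (i * 7) % n, 1.0);
        }

        a.assignDense(b, Double::sum);    // n*n = 1,000,000 cell visits
        a2.assignSparse(b, Double::sum);  // only n = 1,000 stored cells visited

        for (int i = 0; i < n; i++)
            for (int j = 0; j < n; j++)
                if (a.get(i, j) != a2.get(i, j)) throw new AssertionError("mismatch");
        System.out.println("dense and sparse assign agree");
    }
}
```

The sparse version is only safe for functions where f(x, 0) == x, which is why overriding assign for a specific case like this needs the kind of result verification described in this thread.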
I didn't make these changes in Mahout itself. I created a custom Kryo deserializer within my app to deserialize SparseRowMatrix instances to a custom subclass of SparseRowMatrix; this new class overrides the AbstractMatrix implementation of assign(matrix, function). FWIW, all of my app's existing tests pass and it produces the same results as before these changes.

Matt

On 8/21/17, 1:53 PM, "Pat Ferrel" <p...@occamsmachete.com> wrote:

>Interesting indeed. What is “massive”? Does the change pass all unit tests?
>
>
>On Aug 17, 2017, at 1:04 PM, Scruggs, Matt <matt.scru...@bronto.com> wrote:
>
>Thanks for the remarks guys!
>
>I profiled the code running locally on my machine and discovered this loop is where these setQuick() and getQuick() calls originate (during matrix Kryo deserialization), and as you can see the complexity of this 2D loop can be very high:
>
>https://github.com/apache/mahout/blob/08e02602e947ff945b9bd73ab5f0b45863df3e53/math/src/main/java/org/apache/mahout/math/AbstractMatrix.java#L240
>
>
>Recall that this algorithm uses SparseRowMatrix whose rows are SequentialAccessSparseVector, so all this looping seems unnecessary. I created a new subclass of SparseRowMatrix that overrides that assign(matrix, function) method; instead of looping through all the columns of each row, it calls SequentialAccessSparseVector.iterateNonZero() so it only has to touch the cells with values. I also had to customize MahoutKryoRegistrator a bit with a new default serializer for this new matrix class. This yielded a massive performance boost and I verified that the results match exactly for several test cases and datasets. I realize this could have side-effects in some cases, but I'm not using any other part of Mahout, only SimilarityAnalysis.cooccurrencesIDSs().
>
>Any thoughts / comments?
>
>
>Matt
>
>
>
>On 8/16/17, 8:29 PM, "Ted Dunning" <ted.dunn...@gmail.com> wrote:
>
>> It is common with large numerical codes that things run faster in memory on just a few cores if the communication required outweighs the parallel speedup.
>>
>> The issue is that memory bandwidth is slower than the arithmetic speed by a very good amount. If you just have to move stuff into the CPU and munch on it a bit, it is one thing, but if you have to move the data to the CPU and back to memory to distribute it around, possibly multiple times, you may wind up with something much slower than you would have had if you were to attack the problem directly.
>>
>>
>>
>> On Wed, Aug 16, 2017 at 4:47 PM, Pat Ferrel <p...@occamsmachete.com> wrote:
>>
>>> This uses the Mahout blas optimizing solver, which I just use and do not know well. Mahout virtualizes some things having to do with partitioning and I’ve never quite understood how they work. There is a .par() on one of the matrix classes that has a similar function to partition, but in all cases I’ve used .par(auto) and use normal Spark repartitioning based on parallelism. Mahout implements a mapBlock function, which (all things being equal) looks at a partition at a time in memory.
>>>
>>> The reduce is not part of the code I wrote. Maybe someone else can explain what blas is doing.
>>>
>>> BTW hashmap is O(log n) on average for large n—caveats apply. We use fastutils for many things (I thought this was one case) because they are faster than the JVM implementations, but feel free to dig in further. We use downsampling to maintain an overall rough O(n) calculation speed where n = # rows (users). As the model gets more dense there are greatly diminishing returns for the density, so after the elements-per-row threshold is reached we don’t use more in the model creation math.
>>>
>>> Still feel free to dig into what the optimizer is doing.
>>>
>>>
>>> On Aug 15, 2017, at 11:13 AM, Scruggs, Matt <matt.scru...@bronto.com> wrote:
>>>
>>> Thanks Pat, that's good to know!
>>>
>>> This is the "reduce" step (which gets its own stage in my Spark jobs...this stage takes almost all the runtime) where most of the work is being done, and it takes longer the more shuffle partitions there are (relative to # of CPUs):
>>>
>>> https://github.com/apache/mahout/blob/08e02602e947ff945b9bd73ab5f0b45863df3e53/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/AtA.scala#L258
>>>
>>>
>>> Why does the runtime of this reduce stage (which ultimately calls SequentialAccessSparseVector.getQuick() and setQuick() a lot) depend on the ratio of (spark.sql.shuffle.partitions / # Spark CPUs)? Essentially that ratio determines how many "chunks" of shuffle partition (reduce) tasks must run, and each of those chunks always takes the same amount of time, so the stage finishes in less time when that ratio is low (preferably 1).
>>>
>>> EXAMPLES - using 32 cores and 200 shuffle partitions, this stage requires ceil(145 tasks / 32 cores) = 5 "chunks" of work (145 tasks instead of 200 because of the estimateProductPartitions call in AtA). Each chunk takes ~8 minutes, so (5 chunks * 8 min) = ~40 mins. For a job with 32 cores and 32 shuffle partitions (same CPU resources, still 100% utilized), this stage requires only ceil(23 tasks / 32 cores) = 1 chunk of work, which takes the same 8 minutes, so the job finishes ~5x faster. You can take this to the extreme with just 1 core and 1 shuffle partition, and the stage still takes the same amount of time! I'd love to know if you can reproduce this behavior.
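As a sanity check on the chunk arithmetic in the EXAMPLES above, a minimal sketch (the waves() helper is a made-up name; the task counts and the ~8-minute chunk time are figures quoted from this thread, not measured here):

```java
public class ShuffleChunkMath {
    // Spark runs a stage's tasks in waves ("chunks") of at most `cores` at a time;
    // if every task takes roughly the same time, stage time ~= waves * per-task time.
    static int waves(int tasks, int cores) {
        return (tasks + cores - 1) / cores; // ceil(tasks / cores)
    }

    public static void main(String[] args) {
        int minutesPerWave = 8; // observed per-chunk time from the thread

        // 32 cores, 200 shuffle partitions -> 145 actual tasks after
        // estimateProductPartitions: 5 waves * 8 min = ~40 min
        System.out.println(waves(145, 32) * minutesPerWave); // 40

        // 32 cores, 32 shuffle partitions -> 23 actual tasks: 1 wave = ~8 min
        System.out.println(waves(23, 32) * minutesPerWave); // 8
    }
}
```

This only models the observation that every task in the stage costs the same regardless of the partition count, which is exactly the anomaly being reported.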
>>>
>>> This goes against most advice and experience I've had with Spark, where you want to *increase* your partitioning in many cases (or at least leave it at the default 200, not lower it dramatically) to utilize CPUs better (and shrink each individual partition's task). There seems to be no reduction in computational complexity *per task* (within this stage I'm talking about) even with high values for spark.sql.shuffle.partitions (so it seems the data isn't actually being partitioned by the shuffle process). Refer back to the timings w/various configs in my first message.
>>>
>>> Also...is there a possibility of using a faster hash-based implementation instead of the setQuick() / getQuick() methods of SequentialAccessSparseVector? The javadoc on those methods mentions they shouldn't be used unless absolutely necessary due to their O(log n) complexity.
>>>
>>>
>>> Thanks for your time...this is fun stuff!
>>> Matt
>>>
>>>
>>>
>>> On 8/15/17, 10:15 AM, "Pat Ferrel" <p...@occamsmachete.com> wrote:
>>>
>>>> Great, this is the best way to use the APIs. The big win with CCO, the algo you are using, is with multiple user actions. Be aware that when you go to these methods the input IndexedDatasets must be coerced to have compatible dimensionality; in this case the primary action defines the user-set used in calculating the model—not the one for making queries, which can use anonymous user history. But that is for later and outside Mahout.
>>>>
>>>> 1) 4x max parallelism is a rule of thumb since the cores may not need 100% duty cycle; if they are already at 100% the 4x does no good. 2) You have found a long-running task, but there will always be one; if it weren’t this one it would be another. Different types of tasks use resources differently. For instance the collects, which must eventually use the memory of the Driver to instantiate an in-memory data structure.
>>>> There is no magic choice to make this work differently, but it avoids several joins, which are much slower.
>>>>
>>>> I’m not quite sure what your question is.
>>>>
>>>>
>>>> On Aug 15, 2017, at 6:21 AM, Scruggs, Matt <matt.scru...@bronto.com> wrote:
>>>>
>>>> Hi Pat,
>>>>
>>>> I've taken some screenshots of my Spark UI to hopefully shed some light on the behavior I'm seeing. Do you mind if I send you a link via direct email (would rather not post it here)? It's just a shared Dropbox folder.
>>>>
>>>>
>>>> Thanks,
>>>> Matt
>>>>
>>>>
>>>>
>>>> On 8/14/17, 11:34 PM, "Scruggs, Matt" <matt.scru...@bronto.com> wrote:
>>>>
>>>>> I'm running a custom Scala app (distributed in a shaded jar) directly calling SimilarityAnalysis.cooccurrencesIDSs(), not using the CLI.
>>>>>
>>>>> The input data already gets explicitly repartitioned to spark.cores.max (defaultParallelism) in our code. I'll try increasing that by the factor of 4 that you suggest, but all our cores are already utilized so I'm not sure that will help. It gets bogged down in the post-shuffle (shuffle read / combine / reduce) phase even with all cores busy the whole time, which is why I've been playing around with various values for spark.sql.shuffle.partitions. The O(log n) operations I mentioned seem to take >95% of runtime.
>>>>>
>>>>> Thanks,
>>>>> Matt
>>>>> ________________________________
>>>>> From: Pat Ferrel <p...@occamsmachete.com>
>>>>> Sent: Monday, August 14, 2017 11:02:42 PM
>>>>> To: user@mahout.apache.org
>>>>> Subject: Re: spark-itemsimilarity scalability / Spark parallelism issues (SimilarityAnalysis.cooccurrencesIDSs)
>>>>>
>>>>> Are you using the CLI? If so it’s likely that there is only one partition of the data. If you use Mahout in the Spark shell or use it as a lib, do a repartition on the input data before passing it into SimilarityAnalysis.cooccurrencesIDSs.
>>>>> I repartition to 4*total cores to start with and set max parallelism for Spark to the same. The CLI isn’t really production worthy, just for super easy experiments with CSVs.
>>>>>
>>>>>
>>>>> On Aug 14, 2017, at 2:31 PM, Scruggs, Matt <matt.scru...@bronto.com> wrote:
>>>>>
>>>>> Howdy,
>>>>>
>>>>> I'm running SimilarityAnalysis.cooccurrencesIDSs on a fairly small dataset (about 870k [user, item] rows in the primary action IDS…no cross co-occurrence IDS) and I noticed it scales strangely. This is with Mahout 0.13.0, although the same behavior happens in 0.12.x as well (haven't tested it before that).
>>>>>
>>>>> TLDR - regardless of the Spark parallelism (CPUs) I throw at this routine, every Spark task within the final / busy stage seems to take the same amount of time, which leads me to guess that every shuffle partition contains the same amount of data (perhaps the full dataset matrix in shape/size, albeit with different values). I'm reaching out to see if this is a known algorithmic complexity issue in this routine, or if my config is to blame (or both).
>>>>>
>>>>> Regarding our hardware, we have identical physical machines in a Mesos cluster with 6 workers and a few masters. Each worker has ~500GB of SSD, 32 cores and 128g RAM. We run lots of Spark jobs and have generally ironed out the kinks in terms of hardware and cluster config, so I don't suspect any hardware-related issues.
>>>>>
>>>>> Here are some timings for SimilarityAnalysis.cooccurrencesIDSs on this dataset with maxNumInteractions = 500, maxInterestingItemsPerThing = 20, randomSeed = default, parOpts = default (there's lots of other Spark config, this is just what I'm varying to check for effects).
>>>>> In particular, notice how the ratio of (spark.sql.shuffle.partitions / spark.cores.max) affects the runtime:
>>>>>
>>>>> * 8 executors w/8 cores each, takes about 45 minutes
>>>>>   * note that spark.sql.shuffle.partitions > spark.cores.max
>>>>>   spark.cores.max = 64
>>>>>   spark.executor.cores = 8
>>>>>   spark.sql.shuffle.partitions = 200 (default)
>>>>>
>>>>> * 1 executor w/24 cores, takes about 65 minutes
>>>>>   * note that spark.sql.shuffle.partitions > spark.cores.max
>>>>>   spark.cores.max = 24
>>>>>   spark.executor.cores = 24
>>>>>   spark.sql.shuffle.partitions = 200 (default)
>>>>>
>>>>> * 1 executor w/8 cores, takes about 8 minutes
>>>>>   * note that spark.sql.shuffle.partitions = spark.cores.max
>>>>>   spark.cores.max = 8
>>>>>   spark.executor.cores = 8 (1 executor w/8 cores)
>>>>>   spark.sql.shuffle.partitions = 8
>>>>>
>>>>> * 1 executor w/24 cores, takes about 8 minutes (same as 8 cores!)
>>>>>   * note that spark.sql.shuffle.partitions = spark.cores.max
>>>>>   spark.cores.max = 24
>>>>>   spark.executor.cores = 24 (1 executor w/24 cores)
>>>>>   spark.sql.shuffle.partitions = 24
>>>>>
>>>>> * 32 executors w/2 cores each, takes about 8 minutes (same as 8 cores!)
>>>>>   * note that spark.sql.shuffle.partitions = spark.cores.max
>>>>>   spark.cores.max = 64
>>>>>   spark.executor.cores = 2
>>>>>   spark.sql.shuffle.partitions = 88 (results in 64 tasks for final stage)
>>>>>
>>>>> Adjusting the "maxNumInteractions" parameter down to 100 and 50 results in a minor improvement (5-10%). I've also played around with removing [user, item] rows from the input dataset for users with only 1 interaction…I read to try that in another thread…that yielded maybe a 40-50% speed improvement, but I'd rather not toss out data (unless it truly is totally useless, of course :D ).
>>>>>
>>>>> When I look at the thread dump within the Spark UI's Executors -> thread dump pages, it seems all the executors are very busy in the code pasted below for >95% of the run.
>>>>> GC throughput is very good so we're not bogged down there...it's just super busy running the code below. I am intrigued by the comments on the SequentialAccessSparseVector methods I see being called (getQuick and setQuick), which state they take O(log n) time (https://github.com/apache/mahout/blob/08e02602e947ff945b9bd73ab5f0b45863df3e53/math/src/main/java/org/apache/mahout/math/SequentialAccessSparseVector.java).
>>>>>
>>>>>
>>>>> Thanks all for your time and feedback!
>>>>>
>>>>> Matt Scruggs
>>>>>
>>>>> org.apache.mahout.math.OrderedIntDoubleMapping.find(OrderedIntDoubleMapping.java:105)
>>>>> org.apache.mahout.math.OrderedIntDoubleMapping.get(OrderedIntDoubleMapping.java:110)
>>>>> org.apache.mahout.math.SequentialAccessSparseVector.getQuick(SequentialAccessSparseVector.java:157)
>>>>> org.apache.mahout.math.SparseRowMatrix.getQuick(SparseRowMatrix.java:90)
>>>>> org.apache.mahout.math.AbstractMatrix.assign(AbstractMatrix.java:240)
>>>>> org.apache.mahout.math.scalabindings.MatrixOps.$plus$eq(MatrixOps.scala:45)
>>>>> org.apache.mahout.sparkbindings.blas.AtA$$anonfun$19.apply(AtA.scala:258)
>>>>> org.apache.mahout.sparkbindings.blas.AtA$$anonfun$19.apply(AtA.scala:258)
>>>>> org.apache.spark.util.collection.ExternalAppendOnlyMap$$anonfun$3.apply(ExternalAppendOnlyMap.scala:151)
>>>>> org.apache.spark.util.collection.ExternalAppendOnlyMap$$anonfun$3.apply(ExternalAppendOnlyMap.scala:150)
>>>>> org.apache.spark.util.collection.AppendOnlyMap.changeValue(AppendOnlyMap.scala:144)
>>>>> org.apache.spark.util.collection.SizeTrackingAppendOnlyMap.changeValue(SizeTrackingAppendOnlyMap.scala:32)
>>>>> org.apache.spark.util.collection.ExternalAppendOnlyMap.insertAll(ExternalAppendOnlyMap.scala:163)
>>>>> org.apache.spark.Aggregator.combineCombinersByKey(Aggregator.scala:50)
>>>>> org.apache.spark.shuffle.BlockStoreShuffleReader.read(BlockStoreShuffleReader.scala:85)
>>>>> org.apache.spark.rdd.ShuffledRDD.compute(ShuffledRDD.scala:109)
>>>>> org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
>>>>> org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
>>>>> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>>>>> org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
>>>>> org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
>>>>> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>>>>> org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
>>>>> org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
>>>>> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>>>>> org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
>>>>> org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
>>>>> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>>>>> org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
>>>>> org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
>>>>> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>>>>> org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
>>>>> org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
>>>>> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>>>>> org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
>>>>> org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
>>>>> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>>>>> org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
>>>>> org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
>>>>> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>>>>> org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
>>>>> org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
>>>>> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:79)
>>>>> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:47)
>>>>> org.apache.spark.scheduler.Task.run(Task.scala:86)
>>>>> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
>>>>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>>>>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>>>>> java.lang.Thread.run(Thread.java:745)
>>>>>
>>>>> ……or this code……
>>>>>
>>>>> org.apache.mahout.math.SparseRowMatrix.setQuick(SparseRowMatrix.java:105)
>>>>> org.apache.mahout.math.AbstractMatrix.assign(AbstractMatrix.java:240)
>>>>> org.apache.mahout.math.scalabindings.MatrixOps.$plus$eq(MatrixOps.scala:45)
>>>>> org.apache.mahout.sparkbindings.blas.AtA$$anonfun$19.apply(AtA.scala:258)
>>>>> org.apache.mahout.sparkbindings.blas.AtA$$anonfun$19.apply(AtA.scala:258)
>>>>> org.apache.spark.util.collection.ExternalAppendOnlyMap$$anonfun$3.apply(ExternalAppendOnlyMap.scala:151)
>>>>> org.apache.spark.util.collection.ExternalAppendOnlyMap$$anonfun$3.apply(ExternalAppendOnlyMap.scala:150)
>>>>> org.apache.spark.util.collection.AppendOnlyMap.changeValue(AppendOnlyMap.scala:144)
>>>>> org.apache.spark.util.collection.SizeTrackingAppendOnlyMap.changeValue(SizeTrackingAppendOnlyMap.scala:32)
>>>>> org.apache.spark.util.collection.ExternalAppendOnlyMap.insertAll(ExternalAppendOnlyMap.scala:163)
>>>>> org.apache.spark.Aggregator.combineCombinersByKey(Aggregator.scala:50)
>>>>> org.apache.spark.shuffle.BlockStoreShuffleReader.read(BlockStoreShuffleReader.scala:85)
>>>>> org.apache.spark.rdd.ShuffledRDD.compute(ShuffledRDD.scala:109)
>>>>> org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
>>>>> org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
>>>>> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>>>>> org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
>>>>> org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
>>>>> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>>>>> org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
>>>>> org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
>>>>> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>>>>> org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
>>>>> org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
>>>>> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>>>>> org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
>>>>> org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
>>>>> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>>>>> org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
>>>>> org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
>>>>> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>>>>> org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
>>>>> org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
>>>>> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>>>>> org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
>>>>> org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
>>>>> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>>>>> org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
>>>>> org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
>>>>> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:79)
>>>>> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:47)
>>>>> org.apache.spark.scheduler.Task.run(Task.scala:86)
>>>>> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
>>>>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>>>>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>>>>> java.lang.Thread.run(Thread.java:745)
>>>>>
>>>>
>>>
>>>
>
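As a footnote to the O(log n) discussion above: both traces bottom out in a binary-search find(), and the hash-based alternative Matt asks about trades that for expected O(1) lookups. A toy, self-contained illustration (these structures are simplified stand-ins, not Mahout's actual OrderedIntDoubleMapping or SequentialAccessSparseVector):

```java
import java.util.Arrays;
import java.util.HashMap;

public class SparseLookupSketch {
    public static void main(String[] args) {
        // Parallel sorted arrays, as in a sequential-access sparse vector:
        // every random-access get is a binary search over the indices, O(log n).
        int[] indices = {2, 5, 9, 40, 77};
        double[] values = {1.0, 2.0, 3.0, 4.0, 5.0};

        int pos = Arrays.binarySearch(indices, 40); // the O(log n) find() step
        double viaSearch = pos >= 0 ? values[pos] : 0.0;

        // Hash-based alternative: expected O(1) per lookup, but it gives up the
        // cheap in-order iteration that sequential-access vectors are built for.
        HashMap<Integer, Double> hashed = new HashMap<>();
        for (int i = 0; i < indices.length; i++) hashed.put(indices[i], values[i]);
        double viaHash = hashed.getOrDefault(40, 0.0);

        System.out.println(viaSearch == viaHash); // true
    }
}
```

The trade-off is why the javadoc warns against random-access getQuick/setQuick on this vector type rather than simply replacing the structure: sequential (non-zero-iterator) access is the fast path it is designed for.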