On Apr 14, 2015, at 11:44 AM, Michael Kelly <mich...@onespot.com> wrote:

> Hi Pat,
>
> Had some work to do on the input side for the item-similarity, so just
> getting around to running it on a cluster now.
> I'm building from source from the 0.10 release against hadoop version 2.4.0.
> The version of spark we are using is 1.1; we use the emr install-spark
> script to install spark.
> The EMR ami version is 3.6.0.

OK sounds good

> I haven't got a complete run yet, but will update here when I have.
> We're running into some issues with getting the job to run on all
> nodes of the cluster and were wondering -
>
> 1. Does the itemsimilarity job run on all nodes when using yarn?

That is a good question. It can, but that is decided by the Spark partitioning, and that is tied to the input passed in. On my micro-cluster I've only seen it run on the master using all cores. Is that what you see? Let me do some research about this.
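One untested idea in the meantime: the -D: option passes arbitrary Spark properties through to the job (that is how the frameSize setting in the command below works), so raising the default shuffle parallelism might spread the work across more executors. A sketch, with a placeholder value of 32 that would need tuning to the cluster (the frameSize setting you already use is left out for brevity):

    # placeholder parallelism value; tune to total cores in the cluster
    ./bin/mahout spark-itemsimilarity --input hdfs://xx.xx.xx.xx:9000/userUrls \
        --output hdfs://xx.xx.xx.xx:9000/output --master yarn-client -sem 16g \
        -D:spark.default.parallelism=32

This probably won't change how the initial HDFS input is split, only the parallelism of the shuffle stages.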
> 2. We are passing in yarn-client as the master, is that correct?

yes

> This is the command we are running -
>
> ./bin/mahout spark-itemsimilarity --input hdfs://xx.xx.xx.xx:9000/userUrls \
>     --output hdfs://xx.xx.xx.xx:9000/output --master yarn-client -sem 16g \
>     -D:spark.akka.frameSize=30
>
> We ran into a problem with the akka frame size, which is why it is increased.

OK, interesting. This seems fine assuming the URIs find the data. Also assuming the -D:spark.akka.frameSize=30 fixed your problem there?

> Thanks
>
> On Mon, Apr 6, 2015 at 7:11 PM, Pat Ferrel <p...@occamsmachete.com> wrote:
>> Michael,
>>
>> There is a fix in the latest source on Github.
>>
>> If you'd like to try it:
>>
>> mvn clean install -DskipTests    # there is a failing test at present so skip them
>>
>> Add your version of hadoop if needed; consult here:
>> http://mahout.apache.org/developers/buildingmahout.html
>>
>> From my side, spark-itemsimilarity completes on the data you gave me. If you
>> get a chance to try it, can you report what happened, along with your Spark
>> and Hadoop versions and your number of cluster nodes?
>>
>> Thanks
>>
>> On Apr 5, 2015, at 10:30 AM, Pat Ferrel <p...@occamsmachete.com> wrote:
>>
>> Michael,
>>
>> The problem is in partitioning the data; if you start with one file the
>> partitions are created fine. With a bunch of small files, the optimizer
>> trips up by not catching a range of size=0. This will be fixed in 0.10.1,
>> but for now (0.10.0) you can:
>>
>> 1) concat the files into one
>> 2) I can repartition explicitly with a specified number of partitions (I
>>    verified this works on your input) and then allow you to pass that in,
>>    since it will depend on your cluster and data somewhat - not sure this
>>    will always work.
>> 3) we are still looking for a better workaround, so wait a few days...
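For option 1) above, a sketch of one way the ~2000 part files could be merged into a single input file before running the job. The paths are placeholders and this has not been verified on EMR:

    # merge all part files into one local file, then push it back up
    # as a directory containing a single file
    hadoop fs -getmerge /userUrls ./userUrls-merged.csv
    hadoop fs -mkdir /userUrlsMerged
    hadoop fs -put ./userUrls-merged.csv /userUrlsMerged/

The merged directory would then be passed as --input instead of the original multi-part directory.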
>>
>> On Apr 3, 2015, at 12:48 PM, Dmitriy Lyubimov <dlie...@gmail.com> wrote:
>>
>> no, i don't think there's a workaround. it needs a fix; however, in the
>> public version there are many more fixes needed, so I think this part will
>> be refactored completely in 0.10.1
>>
>> On Fri, Apr 3, 2015 at 12:38 PM, Pat Ferrel <p...@occamsmachete.com> wrote:
>>
>>> OK, it was. Is there a workaround I can try?
>>>
>>> On Apr 3, 2015, at 12:22 PM, Dmitriy Lyubimov <dlie...@gmail.com> wrote:
>>>
>>> Although... i am not aware of one in A'A
>>>
>>> could be a faulty vector length in a matrix, if the matrix was created by
>>> drmWrap with an explicit specification of ncol
>>>
>>> On Fri, Apr 3, 2015 at 12:20 PM, Dmitriy Lyubimov <dlie...@gmail.com> wrote:
>>>
>>>> it's a bug. There are a number of similar ones in operator A'B.
>>>>
>>>> On Fri, Apr 3, 2015 at 6:23 AM, Michael Kelly <mich...@onespot.com> wrote:
>>>>
>>>>> Hi Pat,
>>>>>
>>>>> I've done some further digging and it looks like the problem is
>>>>> occurring when the input files are split up into parts. The input
>>>>> to the item-similarity job is the output from a spark job and it
>>>>> ends up in about 2000 parts (on the hadoop file system). I have
>>>>> reproduced the error locally using a small subset of the rows.
>>>>>
>>>>> This is a snippet of the file I am using -
>>>>>
>>>>> ...
>>>>> 5138353282348067470,1891081885
>>>>> 4417954190713934181,1828065687
>>>>> 133682221673920382,1454844406
>>>>> 133682221673920382,1129053737
>>>>> 133682221673920382,548627241
>>>>> 133682221673920382,1048452021
>>>>> 8547417492653230933,1121310481
>>>>> 7693904559640861382,1333374361
>>>>> 7204049418352603234,606209305
>>>>> 139299176617553863,467181330
>>>>> ...
>>>>>
>>>>> When I run the item-similarity job against a single input file which
>>>>> contains all the rows, the job succeeds without error.
>>>>>
>>>>> When I break the input file up into 100 parts and use the directory
>>>>> containing them as the input, I get the 'Index outside allowable
>>>>> range' exception.
>>>>>
>>>>> Here are the input files that I used, tarred and gzipped -
>>>>>
>>>>> https://s3.amazonaws.com/static.onespot.com/mahout/passing_single_file.tar.gz
>>>>> https://s3.amazonaws.com/static.onespot.com/mahout/failing_split_into_100_parts.tar.gz
>>>>>
>>>>> There are 44067 rows in total, 11858 unique userIds and 24166 unique itemIds.
>>>>>
>>>>> This is the exception that I see on the 100-part run -
>>>>>
>>>>> 15/04/03 12:07:09 ERROR Executor: Exception in task 0.0 in stage 9.0 (TID 707)
>>>>> org.apache.mahout.math.IndexException: Index 24190 is outside allowable range of [0,24166)
>>>>>   at org.apache.mahout.math.AbstractVector.viewPart(AbstractVector.java:147)
>>>>>   at org.apache.mahout.math.scalabindings.VectorOps.apply(VectorOps.scala:37)
>>>>>   at org.apache.mahout.sparkbindings.blas.AtA$$anonfun$5$$anonfun$apply$6.apply(AtA.scala:152)
>>>>>   at org.apache.mahout.sparkbindings.blas.AtA$$anonfun$5$$anonfun$apply$6.apply(AtA.scala:149)
>>>>>   at scala.collection.immutable.Stream$$anonfun$map$1.apply(Stream.scala:376)
>>>>>   at scala.collection.immutable.Stream$$anonfun$map$1.apply(Stream.scala:376)
>>>>>   at scala.collection.immutable.Stream$Cons.tail(Stream.scala:1085)
>>>>>   at scala.collection.immutable.Stream$Cons.tail(Stream.scala:1077)
>>>>>   at scala.collection.immutable.StreamIterator$$anonfun$next$1.apply(Stream.scala:980)
>>>>>   at scala.collection.immutable.StreamIterator$$anonfun$next$1.apply(Stream.scala:980)
>>>>>   at scala.collection.immutable.StreamIterator$LazyCell.v$lzycompute(Stream.scala:969)
>>>>>   at scala.collection.immutable.StreamIterator$LazyCell.v(Stream.scala:969)
>>>>>   at scala.collection.immutable.StreamIterator.hasNext(Stream.scala:974)
>>>>>   at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
>>>>>   at org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:202)
>>>>>   at org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:56)
>>>>>   at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)
>>>>>   at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
>>>>>   at org.apache.spark.scheduler.Task.run(Task.scala:56)
>>>>>   at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:200)
>>>>>   at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>>>>>   at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>>>>>   at java.lang.Thread.run(Thread.java:745)
>>>>>
>>>>> I tried splitting the file up into 10, 20 and 50 parts and the job completed.
>>>>> Also, should the resulting similarity matrix be the same whether the
>>>>> input is split up or not? I passed in the same random seed for the
>>>>> spark job, but the matrices were different.
>>>>>
>>>>> Thanks,
>>>>>
>>>>> Michael
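For anyone trying to reproduce this locally, one way to split the single passing file into 100 pieces without breaking lines is sketched below (assuming GNU coreutils split; the file name is a placeholder and the part naming will differ from Hadoop's part-xxxxx files):

    # 100 line-aligned chunks named part-00000 ... part-00099
    split -n l/100 -d -a 5 userUrls.csv part-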
>>>>>
>>>>> On Thu, Apr 2, 2015 at 6:56 PM, Pat Ferrel <p...@occamsmachete.com> wrote:
>>>>>> The input must be tuples (if not using a filter), so the CLI you have
>>>>>> expects user and item ids that are
>>>>>>
>>>>>> user-id1,item-id1
>>>>>> user-id500,item-id3000
>>>>>> …
>>>>>>
>>>>>> The ids must be tokenized because it doesn't use a full csv parser,
>>>>>> only lines of delimited text.
>>>>>>
>>>>>> If this doesn't help, can you supply a snippet of the input?
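A quick, untested way to flatten the tab-delimited one-row-per-user format described in the message below into these one-pair-per-line tuples (file names are placeholders):

    # emit one "userId,itemId" line per item column
    awk -F'\t' '{ for (i = 2; i <= NF; i++) print $1 "," $i }' userItems.tsv > userItemPairs.csv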
>>>>>>
>>>>>> On Apr 2, 2015, at 10:39 AM, Michael Kelly <mich...@onespot.com> wrote:
>>>>>>
>>>>>> Hi all,
>>>>>>
>>>>>> I'm running the spark-itemsimilarity job from the cli on an AWS emr
>>>>>> cluster, and I'm running into an exception.
>>>>>>
>>>>>> The input file format is
>>>>>> UserId<tab>ItemId1<tab>ItemId2<tab>ItemId3......
>>>>>>
>>>>>> There is only one row per user, and a total of 97,000 rows.
>>>>>>
>>>>>> I also tried input with one row per UserId/ItemId pair, which had
>>>>>> about 250,000 rows, but I also saw a similar exception; this time the
>>>>>> out of bounds index was around 110,000.
>>>>>>
>>>>>> The input is stored in hdfs and this is the command I used to start the job -
>>>>>>
>>>>>> mahout spark-itemsimilarity --input userItems --output output --master yarn-client
>>>>>>
>>>>>> Any idea what the problem might be?
>>>>>>
>>>>>> Thanks,
>>>>>>
>>>>>> Michael
>>>>>>
>>>>>> 15/04/02 16:37:40 WARN TaskSetManager: Lost task 1.0 in stage 10.0
>>>>>> (TID 7631, ip-XX.XX.ec2.internal):
>>>>>> org.apache.mahout.math.IndexException: Index 22050 is outside
>>>>>> allowable range of [0,21997)
>>>>>>   org.apache.mahout.math.AbstractVector.viewPart(AbstractVector.java:147)
>>>>>>   org.apache.mahout.math.scalabindings.VectorOps.apply(VectorOps.scala:37)
>>>>>>   org.apache.mahout.sparkbindings.blas.AtA$$anonfun$5$$anonfun$apply$6.apply(AtA.scala:152)
>>>>>>   org.apache.mahout.sparkbindings.blas.AtA$$anonfun$5$$anonfun$apply$6.apply(AtA.scala:149)
>>>>>>   scala.collection.immutable.Stream$$anonfun$map$1.apply(Stream.scala:376)
>>>>>>   scala.collection.immutable.Stream$$anonfun$map$1.apply(Stream.scala:376)
>>>>>>   scala.collection.immutable.Stream$Cons.tail(Stream.scala:1085)
>>>>>>   scala.collection.immutable.Stream$Cons.tail(Stream.scala:1077)
>>>>>>   scala.collection.immutable.StreamIterator$$anonfun$next$1.apply(Stream.scala:980)
>>>>>>   scala.collection.immutable.StreamIterator$$anonfun$next$1.apply(Stream.scala:980)
>>>>>>   scala.collection.immutable.StreamIterator$LazyCell.v$lzycompute(Stream.scala:969)
>>>>>>   scala.collection.immutable.StreamIterator$LazyCell.v(Stream.scala:969)
>>>>>>   scala.collection.immutable.StreamIterator.hasNext(Stream.scala:974)
>>>>>>   scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
>>>>>>   org.apache.spark.util.collection.ExternalAppendOnlyMap.insertAll(ExternalAppendOnlyMap.scala:144)
>>>>>>   org.apache.spark.Aggregator.combineValuesByKey(Aggregator.scala:58)
>>>>>>   org.apache.spark.shuffle.hash.HashShuffleWriter.write(HashShuffleWriter.scala:55)
>>>>>>   org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)
>>>>>>   org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
>>>>>>   org.apache.spark.scheduler.Task.run(Task.scala:54)
>>>>>>   org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:177)
>>>>>>   java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>>>>>>   java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>>>>>>   java.lang.Thread.run(Thread.java:745)