> 
> On Apr 14, 2015, at 11:44 AM, Michael Kelly <mich...@onespot.com> wrote:
> 
> Hi Pat,
> 
> Had some work to do on the input side for the item-similarity, so just
> getting around to running it on a cluster now.
> I'm building from source from the 0.10 release against Hadoop 2.4.0.
> The version of Spark we are using is 1.1; we use the EMR install-spark
> script to install Spark.
> The EMR AMI version is 3.6.0.

OK sounds good

> I haven't got a complete run yet, but will update here when I have.
> We're running into some issues with getting the job to run on all
> nodes of the cluster and were wondering -
> 
> 1. Does the itemsimilarity job run on all nodes when using yarn?

That is a good question. It can, but that is decided by the Spark partitioning, 
and that is tied to the input passed in. On my micro-cluster I’ve only seen it 
run on the master using all cores. Is that what you see?

Let me do some research about this.
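
For what it’s worth, Spark’s parallelism settings can be passed through the same 
-D: mechanism you are using for the frame size, assuming the driver accepts more 
than one -D: option. A minimal, untested sketch; spark.default.parallelism is a 
standard Spark setting, but the value 16 is only a guess for your cluster:

    ./bin/mahout spark-itemsimilarity --input hdfs://xx.xx.xx.xx:9000/userUrls \
      --output hdfs://xx.xx.xx.xx:9000/output --master yarn-client -sem 16g \
      -D:spark.akka.frameSize=30 -D:spark.default.parallelism=16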

> 2. We are passing in yarn-client as the master, is that correct?

yes

> This is the command we are running -
> ./bin/mahout spark-itemsimilarity --input
> hdfs://xx.xx.xx.xx:9000/userUrls --output
> hdfs://xx.xx.xx.xx:9000/output --master yarn-client -sem 16g
> -D:spark.akka.frameSize=30
> 
> We ran into a problem with the akka frame size which is why it is increased.

OK, interesting. This seems fine assuming the URIs find the data. Also, did 
-D:spark.akka.frameSize=30 fix your problem there?


> Thanks
> 
> On Mon, Apr 6, 2015 at 7:11 PM, Pat Ferrel <p...@occamsmachete.com> wrote:
>> Michael,
>> 
>> There is a fix in the latest source on Github.
>> 
>> If you’d like to try it
>> 
>>    mvn clean install -DskipTests #there is a failing test at present so skip 
>> them
>> 
>> Add your version of Hadoop if needed; consult here: 
>> http://mahout.apache.org/developers/buildingmahout.html
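>> 
>> As a minimal sketch of the full sequence, assuming you build from the 
>> apache/mahout GitHub mirror (the Hadoop version property, if you need it, is 
>> described in the build doc above):
>> 
>>    git clone https://github.com/apache/mahout.git
>>    cd mahout
>>    # skip the currently failing test
>>    mvn clean install -DskipTests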
>> 
>> From my side, spark-itemsimilarity completes on the data you gave me. If you 
>> get a chance to try it, can you report what happened, along with your Spark and 
>> Hadoop versions and the number of cluster nodes?
>> 
>> Thanks
>> 
>> On Apr 5, 2015, at 10:30 AM, Pat Ferrel <p...@occamsmachete.com> wrote:
>> 
>> Michael,
>> 
>> The problem is in partitioning the data; if you start with one file, the 
>> partitions are created fine. With a bunch of small files, the optimizer 
>> trips up by not catching a range of size=0. This will be fixed in 0.10.1, but 
>> for now (0.10.0) you can:
>> 
>> 1) concat the files into one (see the sketch below)
>> 2) I can repartition explicitly with a specified number of partitions (I 
>> verified this works on your input) and then let you pass that number in, since 
>> it will depend somewhat on your cluster and data; not sure this will always work.
>> 3) we are still looking for a better workaround, so wait a few days...
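>> 
>> For option 1, a minimal sketch of doing the concat on HDFS; the directory 
>> names here are placeholders for whatever you pass to --input:
>> 
>>    # merge all part files into one local file, then put that single file back into HDFS
>>    hadoop fs -getmerge userItems userItems-merged.csv
>>    hadoop fs -mkdir userItems-single
>>    hadoop fs -put userItems-merged.csv userItems-single/
>>    # then run spark-itemsimilarity with --input userItems-single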
>> 
>> 
>> On Apr 3, 2015, at 12:48 PM, Dmitriy Lyubimov <dlie...@gmail.com> wrote:
>> 
>> No, I don't think there's a workaround. It needs a fix; however, in the public
>> version there are many more fixes needed, so I think this part will be
>> refactored completely in 0.10.1
>> 
>> On Fri, Apr 3, 2015 at 12:38 PM, Pat Ferrel <p...@occamsmachete.com> wrote:
>> 
>>> OK, it was. Is there a workaround I can try?
>>> 
>>> 
>>> On Apr 3, 2015, at 12:22 PM, Dmitriy Lyubimov <dlie...@gmail.com> wrote:
>>> 
>>> Although... I am not aware of one in A'A.
>>> 
>>> It could be a faulty vector length in a matrix, if the matrix was created by drmWrap
>>> with an explicit specification of ncol.
>>> 
>>> On Fri, Apr 3, 2015 at 12:20 PM, Dmitriy Lyubimov <dlie...@gmail.com>
>>> wrote:
>>> 
>>>> It's a bug. There are a number of similar ones in operator A'B.
>>>> 
>>>> On Fri, Apr 3, 2015 at 6:23 AM, Michael Kelly <mich...@onespot.com>
>>> wrote:
>>>> 
>>>>> Hi Pat,
>>>>> 
>>>>> I've done some further digging and it looks like the problem occurs
>>>>> when the input files are split up into parts. The input
>>>>> to the item-similarity job is the output of a Spark job, and it
>>>>> ends up in about 2000 parts (on the Hadoop file system). I have
>>>>> reproduced the error locally using a small subset of the rows.
>>>>> 
>>>>> This is a snippet of the file I am using -
>>>>> 
>>>>> ...
>>>>> 
>>>>> 5138353282348067470,1891081885
>>>>> 4417954190713934181,1828065687
>>>>> 133682221673920382,1454844406
>>>>> 133682221673920382,1129053737
>>>>> 133682221673920382,548627241
>>>>> 133682221673920382,1048452021
>>>>> 8547417492653230933,1121310481
>>>>> 7693904559640861382,1333374361
>>>>> 7204049418352603234,606209305
>>>>> 139299176617553863,467181330
>>>>> ...
>>>>> 
>>>>> 
>>>>> When I run the item-similarity job against a single input file which
>>>>> contains all the rows, the job succeeds without error.
>>>>> 
>>>>> When I break the input file up into 100 parts and use the directory
>>>>> containing them as input, I get the 'Index outside allowable
>>>>> range' exception.
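>>>>> 
>>>>> For reference, a single input file can be split into 100 line-based parts with
>>>>> GNU split, e.g. (a sketch; input.csv is a placeholder for the single file):
>>>>> 
>>>>>    # split into 100 parts without breaking lines, with numeric suffixes part-00 ... part-99
>>>>>    split -n l/100 -d input.csv part-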
>>>>> 
>>>>> Here are the input files that I used, tarred and gzipped -
>>>>> 
>>>>> https://s3.amazonaws.com/static.onespot.com/mahout/passing_single_file.tar.gz
>>>>> https://s3.amazonaws.com/static.onespot.com/mahout/failing_split_into_100_parts.tar.gz
>>>>> 
>>>>> There are 44067 rows in total, 11858 unique userIds and 24166 unique
>>>>> itemIds.
>>>>> 
>>>>> This is the exception that I see on the 100 part run -
>>>>> 15/04/03 12:07:09 ERROR Executor: Exception in task 0.0 in stage 9.0 (TID 707)
>>>>> org.apache.mahout.math.IndexException: Index 24190 is outside allowable range of [0,24166)
>>>>>    at org.apache.mahout.math.AbstractVector.viewPart(AbstractVector.java:147)
>>>>>    at org.apache.mahout.math.scalabindings.VectorOps.apply(VectorOps.scala:37)
>>>>>    at org.apache.mahout.sparkbindings.blas.AtA$$anonfun$5$$anonfun$apply$6.apply(AtA.scala:152)
>>>>>    at org.apache.mahout.sparkbindings.blas.AtA$$anonfun$5$$anonfun$apply$6.apply(AtA.scala:149)
>>>>>    at scala.collection.immutable.Stream$$anonfun$map$1.apply(Stream.scala:376)
>>>>>    at scala.collection.immutable.Stream$$anonfun$map$1.apply(Stream.scala:376)
>>>>>    at scala.collection.immutable.Stream$Cons.tail(Stream.scala:1085)
>>>>>    at scala.collection.immutable.Stream$Cons.tail(Stream.scala:1077)
>>>>>    at scala.collection.immutable.StreamIterator$$anonfun$next$1.apply(Stream.scala:980)
>>>>>    at scala.collection.immutable.StreamIterator$$anonfun$next$1.apply(Stream.scala:980)
>>>>>    at scala.collection.immutable.StreamIterator$LazyCell.v$lzycompute(Stream.scala:969)
>>>>>    at scala.collection.immutable.StreamIterator$LazyCell.v(Stream.scala:969)
>>>>>    at scala.collection.immutable.StreamIterator.hasNext(Stream.scala:974)
>>>>>    at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
>>>>>    at org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:202)
>>>>>    at org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:56)
>>>>>    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)
>>>>>    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
>>>>>    at org.apache.spark.scheduler.Task.run(Task.scala:56)
>>>>>    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:200)
>>>>>    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>>>>>    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>>>>>    at java.lang.Thread.run(Thread.java:745)
>>>>> 
>>>>> 
>>>>> I tried splitting the file up into 10, 20 and 50 parts and the job completed.
>>>>> Also, should the resulting similarity matrix be the same whether the
>>>>> input is split up or not? I passed in the same random seed for the
>>>>> Spark job, but the matrices were different.
>>>>> 
>>>>> Thanks,
>>>>> 
>>>>> Michael
>>>>> 
>>>>> 
>>>>> 
>>>>> On Thu, Apr 2, 2015 at 6:56 PM, Pat Ferrel <p...@occamsmachete.com>
>>> wrote:
>>>>>> The input must be tuples (if not using a filter), so the CLI you have
>>>>>> expects user and item ids that are
>>>>>> 
>>>>>> user-id1,item-id1
>>>>>> user-id500,item-id3000
>>>>>> …
>>>>>> 
>>>>>> The ids must be tokenized because it doesn’t use a full csv parser,
>>>>>> only lines of delimited text.
>>>>>> 
>>>>>> If this doesn’t help, can you supply a snippet of the input?
>>>>>> 
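>>>>>> A quick way to spot input lines that don't split into exactly two
>>>>>> comma-delimited fields, assuming the comma delimiter from the example above
>>>>>> (a sketch; adjust the path if userItems is a single file rather than a directory):
>>>>>> 
>>>>>>    # print any lines that do not have exactly two comma-separated fields
>>>>>>    hadoop fs -cat userItems/* | awk -F',' 'NF != 2' | head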
>>>>>> 
>>>>>> On Apr 2, 2015, at 10:39 AM, Michael Kelly <mich...@onespot.com>
>>> wrote:
>>>>>> 
>>>>>> Hi all,
>>>>>> 
>>>>>> I'm running the spark-itemsimilarity job from the CLI on an AWS EMR
>>>>>> cluster, and I'm running into an exception.
>>>>>> 
>>>>>> The input file format is
>>>>>> UserId<tab>ItemId1<tab>ItemId2<tab>ItemId3......
>>>>>> 
>>>>>> There is only one row per user, and a total of 97,000 rows.
>>>>>> 
>>>>>> I also tried input with one row per UserId/ItemId pair, which had
>>>>>> about 250,000 rows, but I saw a similar exception; this time the
>>>>>> out-of-bounds index was around 110,000.
>>>>>> 
>>>>>> The input is stored in HDFS and this is the command I used to start the job -
>>>>>> 
>>>>>> mahout spark-itemsimilarity --input userItems --output output --master
>>>>>> yarn-client
>>>>>> 
>>>>>> Any idea what the problem might be?
>>>>>> 
>>>>>> Thanks,
>>>>>> 
>>>>>> Michael
>>>>>> 
>>>>>> 
>>>>>> 
>>>>>> 15/04/02 16:37:40 WARN TaskSetManager: Lost task 1.0 in stage 10.0 (TID 7631, ip-XX.XX.ec2.internal):
>>>>>> org.apache.mahout.math.IndexException: Index 22050 is outside allowable range of [0,21997)
>>>>>>    org.apache.mahout.math.AbstractVector.viewPart(AbstractVector.java:147)
>>>>>>    org.apache.mahout.math.scalabindings.VectorOps.apply(VectorOps.scala:37)
>>>>>>    org.apache.mahout.sparkbindings.blas.AtA$$anonfun$5$$anonfun$apply$6.apply(AtA.scala:152)
>>>>>>    org.apache.mahout.sparkbindings.blas.AtA$$anonfun$5$$anonfun$apply$6.apply(AtA.scala:149)
>>>>>>    scala.collection.immutable.Stream$$anonfun$map$1.apply(Stream.scala:376)
>>>>>>    scala.collection.immutable.Stream$$anonfun$map$1.apply(Stream.scala:376)
>>>>>>    scala.collection.immutable.Stream$Cons.tail(Stream.scala:1085)
>>>>>>    scala.collection.immutable.Stream$Cons.tail(Stream.scala:1077)
>>>>>>    scala.collection.immutable.StreamIterator$$anonfun$next$1.apply(Stream.scala:980)
>>>>>>    scala.collection.immutable.StreamIterator$$anonfun$next$1.apply(Stream.scala:980)
>>>>>>    scala.collection.immutable.StreamIterator$LazyCell.v$lzycompute(Stream.scala:969)
>>>>>>    scala.collection.immutable.StreamIterator$LazyCell.v(Stream.scala:969)
>>>>>>    scala.collection.immutable.StreamIterator.hasNext(Stream.scala:974)
>>>>>>    scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
>>>>>>    org.apache.spark.util.collection.ExternalAppendOnlyMap.insertAll(ExternalAppendOnlyMap.scala:144)
>>>>>>    org.apache.spark.Aggregator.combineValuesByKey(Aggregator.scala:58)
>>>>>>    org.apache.spark.shuffle.hash.HashShuffleWriter.write(HashShuffleWriter.scala:55)
>>>>>>    org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)
>>>>>>    org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
>>>>>>    org.apache.spark.scheduler.Task.run(Task.scala:54)
>>>>>>    org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:177)
>>>>>>    java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>>>>>>    java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>>>>>>    java.lang.Thread.run(Thread.java:745)
>>>>>> 
>>>>> 
>>>> 
>>>> 
>>> 
>>> 
>> 
>> 
> 
