Repository: mahout Updated Branches: refs/heads/master ebb39fe4a -> 97a920f72
partial fix for index out of range for vectors in AtA, added an example script to run spark-itemsimilarity locally Project: http://git-wip-us.apache.org/repos/asf/mahout/repo Commit: http://git-wip-us.apache.org/repos/asf/mahout/commit/97a920f7 Tree: http://git-wip-us.apache.org/repos/asf/mahout/tree/97a920f7 Diff: http://git-wip-us.apache.org/repos/asf/mahout/diff/97a920f7 Branch: refs/heads/master Commit: 97a920f72c6655a615522272b752e6d0cbc1d429 Parents: ebb39fe Author: pferrel <[email protected]> Authored: Sun Apr 5 21:37:27 2015 -0700 Committer: pferrel <[email protected]> Committed: Sun Apr 5 21:37:27 2015 -0700 ---------------------------------------------------------------------- examples/bin/run-item-sim.sh | 45 ++++++++++++++++++++ .../src/main/resources/cf-data-purchase.txt | 7 +++ examples/src/main/resources/cf-data-view.txt | 12 ++++++ .../drivers/TextDelimitedReaderWriter.scala | 16 ++++--- .../apache/mahout/sparkbindings/blas/AtA.scala | 25 +++++++++++ .../drivers/ItemSimilarityDriverSuite.scala | 4 +- 6 files changed, 100 insertions(+), 9 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/mahout/blob/97a920f7/examples/bin/run-item-sim.sh ---------------------------------------------------------------------- diff --git a/examples/bin/run-item-sim.sh b/examples/bin/run-item-sim.sh new file mode 100755 index 0000000..b71573b --- /dev/null +++ b/examples/bin/run-item-sim.sh @@ -0,0 +1,45 @@ +#!/bin/bash +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. 
You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+# To run: change into the mahout directory and type (the `mahout` launcher must be on PATH):
+#   ./examples/bin/run-item-sim.sh
+
+OUTPUT="item-sim-out" # output dir for cooccurrence and cross-cooccurrence matrices
+INPUT="examples/src/main/resources/cf-data-purchase.txt" # purchase actions
+INPUT2="examples/src/main/resources/cf-data-view.txt" # view actions
+
+#setup env -- the variable is MAHOUT_LOCAL: '-' is not legal in a shell variable name
+LOCAL="$MAHOUT_LOCAL" # save state
+export MAHOUT_LOCAL=true #set so the raw local file system is used
+
+echo "This runs spark-itemsimilarity on two user actions in two input files"
+echo "The example uses fake purchases and views of products, calculating"
+echo "a cooccurrence indicator for purchase and a cross-cooccurrence indicator"
+echo "for view (product detail view). The example is tiny so it can be followed"
+echo "with a little intuition."
+
+# Remove old files
+echo
+echo "Removing old output file if it exists"
+echo
+rm -rf "$OUTPUT" # -f: do not fail when there is no previous output yet
+
+mahout spark-itemsimilarity -i "$INPUT" -i2 "$INPUT2" -o "$OUTPUT" -ma local
+
+export MAHOUT_LOCAL="$LOCAL" #restore state (an empty value means "not local")
+
+echo "Look in " $OUTPUT " for spark-itemsimilarity indicator data."
http://git-wip-us.apache.org/repos/asf/mahout/blob/97a920f7/examples/src/main/resources/cf-data-purchase.txt ---------------------------------------------------------------------- diff --git a/examples/src/main/resources/cf-data-purchase.txt b/examples/src/main/resources/cf-data-purchase.txt new file mode 100644 index 0000000..d87c031 --- /dev/null +++ b/examples/src/main/resources/cf-data-purchase.txt @@ -0,0 +1,7 @@ +u1,iphone +u1,ipad +u2,nexus +u2,galaxy +u3,surface +u4,iphone +u4,galaxy http://git-wip-us.apache.org/repos/asf/mahout/blob/97a920f7/examples/src/main/resources/cf-data-view.txt ---------------------------------------------------------------------- diff --git a/examples/src/main/resources/cf-data-view.txt b/examples/src/main/resources/cf-data-view.txt new file mode 100644 index 0000000..09ad9b6 --- /dev/null +++ b/examples/src/main/resources/cf-data-view.txt @@ -0,0 +1,12 @@ +u1,ipad +u1,nexus +u1,galaxy +u2,iphone +u2,ipad +u2,nexus +u2,galaxy +u3,surface +u3,nexus +u4,iphone +u4,ipad +u4,galaxy http://git-wip-us.apache.org/repos/asf/mahout/blob/97a920f7/spark/src/main/scala/org/apache/mahout/drivers/TextDelimitedReaderWriter.scala ---------------------------------------------------------------------- diff --git a/spark/src/main/scala/org/apache/mahout/drivers/TextDelimitedReaderWriter.scala b/spark/src/main/scala/org/apache/mahout/drivers/TextDelimitedReaderWriter.scala index a90e672..6c7992a 100644 --- a/spark/src/main/scala/org/apache/mahout/drivers/TextDelimitedReaderWriter.scala +++ b/spark/src/main/scala/org/apache/mahout/drivers/TextDelimitedReaderWriter.scala @@ -22,7 +22,7 @@ import org.apache.mahout.sparkbindings.indexeddataset.IndexedDatasetSpark import org.apache.spark.SparkContext._ import org.apache.mahout.math.RandomAccessSparseVector import com.google.common.collect.{BiMap, HashBiMap} -import org.apache.mahout.math.drm.{DistributedContext, CheckpointedDrm} +import org.apache.mahout.math.drm.{DrmLike, DrmLikeOps, DistributedContext, 
CheckpointedDrm} import org.apache.mahout.sparkbindings._ import scala.collection.JavaConversions._ @@ -108,7 +108,8 @@ trait TDIndexedDatasetReader extends Reader[IndexedDatasetSpark]{ .asInstanceOf[DrmRdd[Int]] // wrap the DrmRdd and a CheckpointedDrm, which can be used anywhere a DrmLike[Int] is needed - val drmInteractions = drmWrap[Int](indexedInteractions, nrow, ncol) + //val drmInteractions = drmWrap[Int](indexedInteractions, nrow, ncol) + val drmInteractions = drmWrap[Int](indexedInteractions) new IndexedDatasetSpark(drmInteractions, rowIDDictionary, columnIDDictionary) @@ -186,17 +187,18 @@ trait TDIndexedDatasetReader extends Reader[IndexedDatasetSpark]{ val id = element.split(columnIdStrengthDelim)(0) val columnID = columnIDDictionary_bcast.value.get(id).get val pair = element.split(columnIdStrengthDelim) - if (pair.size == 2)// there was a strength - row.setQuick(columnID,pair(1).toDouble) + if (pair.size == 2) // there was a strength + row.setQuick(columnID, pair(1).toDouble) else // no strength so set DRM value to 1.0d, this ignores 'omitScore', which is a write param - row.setQuick(columnID,1.0d) + row.setQuick(columnID, 1.0d) } rowIndex -> row } .asInstanceOf[DrmRdd[Int]] - // wrap the DrmRdd and a CheckpointedDrm, which can be used anywhere a DrmLike[Int] is needed - val drmInteractions = drmWrap[Int](indexedInteractions, nrow, ncol) + // wrap the DrmRdd in a CheckpointedDrm, which can be used anywhere a DrmLike[Int] is needed + //val drmInteractions = drmWrap[Int](indexedInteractions, nrow, ncol) + val drmInteractions = drmWrap[Int](indexedInteractions) new IndexedDatasetSpark(drmInteractions, rowIDDictionary, columnIDDictionary) http://git-wip-us.apache.org/repos/asf/mahout/blob/97a920f7/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/AtA.scala ---------------------------------------------------------------------- diff --git a/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/AtA.scala 
b/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/AtA.scala index 450e836..be4f08c 100644 --- a/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/AtA.scala +++ b/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/AtA.scala @@ -135,8 +135,25 @@ object AtA { val m = op.A.nrow val n = op.A.ncol +/* possible fix for index out of range for vector range val numParts = (srcRdd.partitions.size.toDouble * n / m).ceil.round.toInt max 1 val blockHeight = (n - 1) / numParts + 1 +*/ + val numParts = (srcRdd.partitions.size.toDouble * n / m).ceil.round.toInt max 1 min n + + // Computing evenly split ranges to denote each partition size. + + // Base size. + val baseSize = n / numParts + + // How many partitions needs to be baseSize +1. + val slack = n - baseSize * numParts + + val ranges = + // Start with partition offsets... total numParts + 1. + (0 to numParts).view.map { i => (baseSize + 1) * i - (i - slack max 0)} + // And convert offsets to ranges. + .sliding(2).map(s => s(0) until s(1)).toIndexedSeq val rddAtA = srcRdd @@ -147,9 +164,13 @@ object AtA { .flatMap { v => for (blockKey <- Stream.range(0, numParts)) yield { +/* patch to fix index out of range for vector access val blockStart = blockKey * blockHeight val blockEnd = n min (blockStart + blockHeight) blockKey -> (v(blockStart until blockEnd) cross v) +*/ + val range = ranges(blockKey) + blockKey -> (v(range) cross v) } } // Combine outer blocks @@ -158,8 +179,12 @@ object AtA { // Restore proper block keys .map { case (blockKey, block) => +/* patch to fix index out of range for vector access val blockStart = blockKey * blockHeight val rowKeys = Array.tabulate(block.nrow)(blockStart + _) +*/ + val range = ranges(blockKey) + val rowKeys = Array.tabulate(block.nrow)(range.start + _) rowKeys -> block } http://git-wip-us.apache.org/repos/asf/mahout/blob/97a920f7/spark/src/test/scala/org/apache/mahout/drivers/ItemSimilarityDriverSuite.scala 
---------------------------------------------------------------------- diff --git a/spark/src/test/scala/org/apache/mahout/drivers/ItemSimilarityDriverSuite.scala b/spark/src/test/scala/org/apache/mahout/drivers/ItemSimilarityDriverSuite.scala index ea6b40f..4800a32 100644 --- a/spark/src/test/scala/org/apache/mahout/drivers/ItemSimilarityDriverSuite.scala +++ b/spark/src/test/scala/org/apache/mahout/drivers/ItemSimilarityDriverSuite.scala @@ -368,10 +368,10 @@ class ItemSimilarityDriverSuite extends FunSuite with DistributedSparkSuite { "u2\tpurchase\tgalaxy", "u3\tpurchase\tsurface", "u4\tpurchase\tiphone", - "u4\tpurchase\tgalaxy", - "u1\tview\tiphone") + "u4\tpurchase\tgalaxy") val M2Lines = Array( + "u1\tview\tiphone", "u1\tview\tipad", "u1\tview\tnexus", "u1\tview\tgalaxy",
