Repository: mahout Updated Branches: refs/heads/master a80974037 -> ee6359f62
MAHOUT-1603: Tweaks for Spark 1.0.x (dlyubimov & pferrel) this closes apache/mahout#40 Squashed commit of the following: commit 2e362dad82aef764bef163a64eb2bfc1b836a07e Author: pferrel <[email protected]> Date: Thu Aug 7 15:29:51 2014 -0700 no need to delete tmp afterAll since doing it afterEach commit 29f73e514570073486b6d8084f4d8a958765fc95 Author: pferrel <[email protected]> Date: Thu Aug 7 15:24:12 2014 -0700 had to fall back to tokenized data comparison in tests since the order of values cannot be relied upon commit ec42fe7a1ba6b26aa39bb99e977af7ebb15d9523 Author: pferrel <[email protected]> Date: Thu Aug 7 13:14:26 2014 -0700 added a check in driver to see if the context should be closed, not when running a test commit 2e24ddc7dd612a91fd36312973b05d715def800d Merge: a809740 1d07cf0 Author: pferrel <[email protected]> Date: Thu Aug 7 12:19:16 2014 -0700 changing tests for drivers to reuse the test context instead of creating new ones commit 1d07cf0d1a4409bab9d29e824543c0dae7c0d903 Author: Dmitriy Lyubimov <[email protected]> Date: Wed Aug 6 15:07:58 2014 -0700 license commit f1a31a77f5a8b4d4961a871a88e1a4f5194df90b Merge: d64146f 00c0149 Author: Dmitriy Lyubimov <[email protected]> Date: Wed Aug 6 13:29:38 2014 -0700 Merge branch 'master' into spark-1.0.x commit d64146ff12be5ffad1832075539397be94c999a3 Author: Dmitriy Lyubimov <[email protected]> Date: Wed Aug 6 12:58:07 2014 -0700 "deleteOnExit" doesn't work in tests, rolling back. needs code added to test base for scala test that handles temporary directories, similarly to how it's been done for junit. commit 69c393a373a6a80b1542a745cca685b2709696b6 Merge: 439e878 7a50a29 Author: Dmitriy Lyubimov <[email protected]> Date: Wed Aug 6 12:39:28 2014 -0700 Merge branch 'master' into spark-1.0.x commit 439e87850d552938d2d0e2c68507c200cabd8d1c Author: Dmitriy Lyubimov <[email protected]> Date: Wed Aug 6 12:31:31 2014 -0700 Temporarily disable ItemSimilarityDriverSuite as failing under Spark 1.0.x for yet unknown reason commit af099a2d53f673ce37a1d483a3424b78e6b9cb9c Author: Dmitriy Lyubimov <[email protected]> Date: Wed Aug 6 12:30:51 2014 -0700 MAHOUT-1597: A + 1.0 (fixes) commit 194b77438f532cc7291f382710aa13d97c07a249 Author: Dmitriy Lyubimov <[email protected]> Date: Wed Aug 6 11:31:56 2014 -0700 Single Blas suite to speed up, share session commit 26a5824ca9bf64430f50f3c524ca14a1c68a04b5 Author: Dmitriy Lyubimov <[email protected]> Date: Wed Aug 6 10:58:48 2014 -0700 Shared context, local[3], bumping up Xmx to 768m to run tests commit e31e9b97724600f806af7e7f861ebdc7943e54bc Author: Dmitriy Lyubimov <[email protected]> Date: Mon Aug 4 17:28:35 2014 -0700 bumping scala to 10.4 commit 13e909b58eaa89e212415318655dbe82ef982323 Author: Dmitriy Lyubimov <[email protected]> Date: Mon Aug 4 15:00:59 2014 -0700 Initial migration. Project: http://git-wip-us.apache.org/repos/asf/mahout/repo Commit: http://git-wip-us.apache.org/repos/asf/mahout/commit/ee6359f6 Tree: http://git-wip-us.apache.org/repos/asf/mahout/tree/ee6359f6 Diff: http://git-wip-us.apache.org/repos/asf/mahout/diff/ee6359f6 Branch: refs/heads/master Commit: ee6359f621b508ab7f21df0316941e68c75eb3e5 Parents: a809740 Author: Dmitriy Lyubimov <[email protected]> Authored: Fri Aug 8 11:52:10 2014 -0700 Committer: Dmitriy Lyubimov <[email protected]> Committed: Fri Aug 8 11:52:10 2014 -0700 ---------------------------------------------------------------------- CHANGELOG | 2 + .../mahout/test/LoggerConfiguration.scala | 8 +- pom.xml | 6 +- .../mahout/drivers/ItemSimilarityDriver.scala | 5 +- .../apache/mahout/drivers/MahoutDriver.scala | 25 +- .../mahout/drivers/MahoutOptionParser.scala | 5 - .../mahout/sparkbindings/SparkEngine.scala | 2 +- .../drivers/ItemSimilarityDriverSuite.scala | 745 +++++++++---------- .../mahout/sparkbindings/blas/ABtSuite.scala | 54 -- .../mahout/sparkbindings/blas/AewBSuite.scala | 101 --- .../mahout/sparkbindings/blas/AtASuite.scala | 48 -- .../mahout/sparkbindings/blas/AtSuite.scala | 44 -- .../mahout/sparkbindings/blas/BlasSuite.scala | 154 ++++ .../test/DistributedSparkSuite.scala | 25 +- .../test/LoggerConfiguration.scala | 8 +- 15 files changed, 557 insertions(+), 675 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/mahout/blob/ee6359f6/CHANGELOG ---------------------------------------------------------------------- diff --git a/CHANGELOG b/CHANGELOG index 538b12b..aefb838 100644 --- a/CHANGELOG +++ b/CHANGELOG @@ -2,6 +2,8 @@ Mahout Change Log Release 1.0 - unreleased + MAHOUT-1603: Tweaks for Spark 1.0.x (dlyubimov & pferrel) + MAHOUT-1596: implement rbind() operator (Anand Avati and dlyubimov) MAHOUT-1597: A + 1.0 (element-wise scala operation) gives wrong result if rdd is missing rows, Spark side (dlyubimov) http://git-wip-us.apache.org/repos/asf/mahout/blob/ee6359f6/math-scala/src/test/scala/org/apache/mahout/test/LoggerConfiguration.scala ---------------------------------------------------------------------- diff --git a/math-scala/src/test/scala/org/apache/mahout/test/LoggerConfiguration.scala b/math-scala/src/test/scala/org/apache/mahout/test/LoggerConfiguration.scala index 95b92b7..7a34aa2 100644 --- a/math-scala/src/test/scala/org/apache/mahout/test/LoggerConfiguration.scala +++ b/math-scala/src/test/scala/org/apache/mahout/test/LoggerConfiguration.scala @@ -1,13 +1,13 @@ package org.apache.mahout.test -import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach, BeforeAndAfter, Suite} +import org.scalatest._ import org.apache.log4j.{Level, Logger, BasicConfigurator} -trait LoggerConfiguration extends BeforeAndAfterAll { +trait LoggerConfiguration extends BeforeAndAfterAllConfigMap { this: Suite => - override protected def beforeAll(): Unit = { - super.beforeAll() + override protected def beforeAll(configMap: ConfigMap): Unit = { + super.beforeAll(configMap) BasicConfigurator.resetConfiguration() BasicConfigurator.configure() Logger.getRootLogger.setLevel(Level.ERROR) http://git-wip-us.apache.org/repos/asf/mahout/blob/ee6359f6/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index aad4c9c..ef9ae03 100644 --- a/pom.xml +++ b/pom.xml @@ -108,8 +108,8 @@ <lucene.version>4.6.1</lucene.version> <slf4j.version>1.7.5</slf4j.version> <scala.major>2.10</scala.major> - <scala.version>2.10.3</scala.version> - <spark.version>0.9.1</spark.version> + <scala.version>2.10.4</scala.version> + <spark.version>1.0.1</spark.version> </properties> <issueManagement> <system>Jira</system> @@ -557,7 +557,7 @@ <configuration> <forkCount>2</forkCount> <reuseForks>false</reuseForks> - <argLine>-Xmx512m -Djava.security.manager -Djava.library.path=${env.HADOOP_HOME}\bin + <argLine>-Xmx768m -Djava.security.manager -Djava.library.path=${env.HADOOP_HOME}\bin -Djava.security.policy=${project.build.directory}/../../buildtools/src/test/resources/java.policy</argLine> <argLine>-Djava.security.auth.login.config=${project.build.directory}/../../buildtools/src/test/resources/jaas.config</argLine> <testFailureIgnore>false</testFailureIgnore> http://git-wip-us.apache.org/repos/asf/mahout/blob/ee6359f6/spark/src/main/scala/org/apache/mahout/drivers/ItemSimilarityDriver.scala ---------------------------------------------------------------------- diff --git a/spark/src/main/scala/org/apache/mahout/drivers/ItemSimilarityDriver.scala b/spark/src/main/scala/org/apache/mahout/drivers/ItemSimilarityDriver.scala index e0eaabc..460106f 100644 --- a/spark/src/main/scala/org/apache/mahout/drivers/ItemSimilarityDriver.scala +++ b/spark/src/main/scala/org/apache/mahout/drivers/ItemSimilarityDriver.scala @@ -123,8 +123,7 @@ object ItemSimilarityDriver extends MahoutDriver { } override def start(masterUrl: String = options("master").asInstanceOf[String], - appName: String = options("appName").asInstanceOf[String], - dontAddMahoutJars: Boolean = options("dontAddMahoutJars").asInstanceOf[Boolean]): + appName: String = options("appName").asInstanceOf[String]): Unit = { // todo: the HashBiMap used in the TextDelimited Reader is hard coded into @@ -134,7 +133,7 @@ object ItemSimilarityDriver extends MahoutDriver { .set("spark.kryoserializer.buffer.mb", "200") .set("spark.executor.memory", options("sparkExecutorMem").asInstanceOf[String]) - super.start(masterUrl, appName, dontAddMahoutJars) + super.start(masterUrl, appName) val readSchema1 = new Schema("delim" -> options("inDelim").asInstanceOf[String], "filter" -> options("filter1").asInstanceOf[String], http://git-wip-us.apache.org/repos/asf/mahout/blob/ee6359f6/spark/src/main/scala/org/apache/mahout/drivers/MahoutDriver.scala ---------------------------------------------------------------------- diff --git a/spark/src/main/scala/org/apache/mahout/drivers/MahoutDriver.scala b/spark/src/main/scala/org/apache/mahout/drivers/MahoutDriver.scala index 796a66a..e92ed37 100644 --- a/spark/src/main/scala/org/apache/mahout/drivers/MahoutDriver.scala +++ b/spark/src/main/scala/org/apache/mahout/drivers/MahoutDriver.scala @@ -58,32 +58,35 @@ abstract class MahoutDriver { implicit var mc: DistributedContext = _ - implicit val sparkConf = new SparkConf() + implicit var sparkConf = new SparkConf() + var _useExistingContext: Boolean = false /** Creates a Spark context to run the job inside. * Creates a Spark context to run the job inside. Override to set the SparkConf values specific to the job, * these must be set before the context is created. * @param masterUrl Spark master URL * @param appName Name to display in Spark UI - * @param customJars List of paths to custom jars * */ - protected def start(masterUrl: String, appName: String, customJars:Traversable[String]) : Unit = { - mc = mahoutSparkContext(masterUrl, appName, customJars, sparkConf) - } - - protected def start(masterUrl: String, appName: String, dontAddMahoutJars: Boolean = false) : Unit = { - val customJars = Traversable.empty[String] - mc = mahoutSparkContext(masterUrl, appName, customJars, sparkConf, !dontAddMahoutJars) + protected def start(masterUrl: String, appName: String) : Unit = { + if (!_useExistingContext) { + mc = mahoutSparkContext(masterUrl, appName, sparkConf = sparkConf) + } } /** Override (optionally) for special cleanup */ protected def stop: Unit = { - mc.close + if (!_useExistingContext) mc.close } - /** This is wher you do the work, call start first, then before exiting call stop */ + /** This is where you do the work, call start first, then before exiting call stop */ protected def process: Unit /** Parse command line and call process */ def main(args: Array[String]): Unit + + def useContext(context: DistributedContext): Unit = { + _useExistingContext = true + mc = context + sparkConf = mc.getConf + } } http://git-wip-us.apache.org/repos/asf/mahout/blob/ee6359f6/spark/src/main/scala/org/apache/mahout/drivers/MahoutOptionParser.scala ---------------------------------------------------------------------- diff --git a/spark/src/main/scala/org/apache/mahout/drivers/MahoutOptionParser.scala b/spark/src/main/scala/org/apache/mahout/drivers/MahoutOptionParser.scala index ba4ca1d..3aada78 100644 --- a/spark/src/main/scala/org/apache/mahout/drivers/MahoutOptionParser.scala +++ b/spark/src/main/scala/org/apache/mahout/drivers/MahoutOptionParser.scala @@ -24,7 +24,6 @@ object MahoutOptionParser { // set up the various default option groups final val GenericOptions = immutable.HashMap[String, Any]( "randomSeed" -> System.currentTimeMillis().toInt, - "dontAddMahoutJars" -> false, "writeAllDatasets" -> false) final val SparkOptions = immutable.HashMap[String, Any]( @@ -102,10 +101,6 @@ class MahoutOptionParser(programName: String) extends OptionParser[Map[String, A if (x > 0) success else failure("Option --randomSeed must be > 0") } - opt[Unit]("dontAddMahoutJars") hidden() action { (_, options) => - options + ("dontAddMahoutJars" -> true) - }//Hidden option, used when executing tests or calling from other code where classes are all loaded explicitly - //output both input DRMs opt[Unit]("writeAllDatasets") hidden() action { (_, options) => options + ("writeAllDatasets" -> true) http://git-wip-us.apache.org/repos/asf/mahout/blob/ee6359f6/spark/src/main/scala/org/apache/mahout/sparkbindings/SparkEngine.scala ---------------------------------------------------------------------- diff --git a/spark/src/main/scala/org/apache/mahout/sparkbindings/SparkEngine.scala b/spark/src/main/scala/org/apache/mahout/sparkbindings/SparkEngine.scala index 36223fc..dedb279 100644 --- a/spark/src/main/scala/org/apache/mahout/sparkbindings/SparkEngine.scala +++ b/spark/src/main/scala/org/apache/mahout/sparkbindings/SparkEngine.scala @@ -127,7 +127,7 @@ object SparkEngine extends DistributedEngine { */ def drmFromHDFS (path: String, parMin:Int = 0)(implicit sc: DistributedContext): CheckpointedDrm[_] = { - val rdd = sc.sequenceFile(path, classOf[Writable], classOf[VectorWritable], minSplits = parMin) + val rdd = sc.sequenceFile(path, classOf[Writable], classOf[VectorWritable], minPartitions = parMin) // Get rid of VectorWritable .map(t => (t._1, t._2.get())) http://git-wip-us.apache.org/repos/asf/mahout/blob/ee6359f6/spark/src/test/scala/org/apache/mahout/drivers/ItemSimilarityDriverSuite.scala ---------------------------------------------------------------------- diff --git a/spark/src/test/scala/org/apache/mahout/drivers/ItemSimilarityDriverSuite.scala b/spark/src/test/scala/org/apache/mahout/drivers/ItemSimilarityDriverSuite.scala index f1981bb..cfabfdb 100644 --- a/spark/src/test/scala/org/apache/mahout/drivers/ItemSimilarityDriverSuite.scala +++ b/spark/src/test/scala/org/apache/mahout/drivers/ItemSimilarityDriverSuite.scala @@ -19,42 +19,30 @@ package org.apache.mahout.drivers import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.{Path, FileSystem} -import org.apache.mahout.math.drm._ -import org.apache.mahout.math.scalabindings._ -import org.apache.mahout.sparkbindings.drm.CheckpointedDrmSpark -import org.scalatest.FunSuite +import org.scalatest.{ConfigMap, FunSuite} import org.apache.mahout.sparkbindings._ import org.apache.mahout.sparkbindings.test.DistributedSparkSuite -import org.apache.mahout.test.MahoutSuite - - +import org.apache.mahout.math.drm._ +import org.apache.mahout.math.scalabindings._ //todo: take out, only for temp tests -import org.apache.mahout.math._ import org.apache.mahout.math.scalabindings._ import RLikeOps._ import org.apache.mahout.math.drm._ import RLikeDrmOps._ import scala.collection.JavaConversions._ -import org.apache.mahout.math.stats.LogLikelihood -import collection._ -import org.apache.mahout.common.RandomUtils -import org.apache.mahout.math.function.{VectorFunction, Functions} - -class ItemSimilarityDriverSuite extends FunSuite with MahoutSuite with DistributedSparkSuite { +class ItemSimilarityDriverSuite extends FunSuite with DistributedSparkSuite { /* - // correct self-cooccurrence with LLR final val matrixLLRCoocAtAControl = dense( - (0.0, 0.6331745808516107, 0.0, 0.0, 0.0), - (0.6331745808516107, 0.0, 0.0, 0.0, 0.0), - (0.0, 0.0, 0.0, 0.6331745808516107, 0.0), - (0.0, 0.0, 0.6331745808516107, 0.0, 0.0), - (0.0, 0.0, 0.0, 0.0, 0.0)) + (0.0, 1.7260924347106847, 0.0, 0.0, 0.0), + (1.7260924347106847, 0.0, 0.0, 0.0, 0.0), + (0.0, 0.0, 0.0, 1.7260924347106847, 0.0), + (0.0, 0.0, 1.7260924347106847, 0.0, 0.0), + (0.0, 0.0, 0.0, 0.0, 0.0)) - // correct cross-cooccurrence with LLR final val matrixLLRCoocBtAControl = dense( (1.7260924347106847, 1.7260924347106847, 1.7260924347106847, 1.7260924347106847, 0.0), (0.6795961471815897, 0.6795961471815897, 0.6795961471815897, 0.6795961471815897, 0.0), @@ -78,20 +66,35 @@ class ItemSimilarityDriverSuite extends FunSuite with MahoutSuite with Distribut "galaxy\tnexus:1.7260924347106847 iphone:1.7260924347106847 ipad:1.7260924347106847 galaxy:1.7260924347106847", "surface\tsurface:4.498681156950466 nexus:0.6795961471815897") + // todo: a better test would be to sort each vector by itemID and compare rows, tokens misses some error cases + final val SelfSimilairtyTokens = tokenize(Iterable( + "galaxy\tnexus:1.7260924347106847", + "ipad\tiphone:1.7260924347106847", + "nexus\tgalaxy:1.7260924347106847", + "iphone\tipad:1.7260924347106847", + "surface")) + + val CrossIndicatorTokens = tokenize(Iterable( + "iphone\tnexus:1.7260924347106847 iphone:1.7260924347106847 ipad:1.7260924347106847 galaxy:1.7260924347106847", + "ipad\tnexus:0.6795961471815897 iphone:0.6795961471815897 ipad:0.6795961471815897 galaxy:0.6795961471815897", + "nexus\tnexus:0.6795961471815897 iphone:0.6795961471815897 ipad:0.6795961471815897 galaxy:0.6795961471815897", + "galaxy\tnexus:1.7260924347106847 iphone:1.7260924347106847 ipad:1.7260924347106847 galaxy:1.7260924347106847", + "surface\tsurface:4.498681156950466 nexus:0.6795961471815897")) + final val TmpDir = "tmp/" // all IO going to whatever the default HDFS config is pointing to /* //Clustered Spark and HDFS, not a good everyday build test ItemSimilarityDriver.main(Array( - "--input", "hdfs://occam4:54310/user/pat/spark-itemsimilarity/cf-data.txt", - "--output", "hdfs://occam4:54310/user/pat/spark-itemsimilarity/indicatorMatrices/", - "--master", "spark://occam4:7077", - "--filter1", "purchase", - "--filter2", "view", - "--inDelim", ",", - "--itemIDPosition", "2", - "--rowIDPosition", "0", - "--filterPosition", "1" + "--input", "hdfs://occam4:54310/user/pat/spark-itemsimilarity/cf-data.txt", + "--output", "hdfs://occam4:54310/user/pat/spark-itemsimilarity/indicatorMatrices/", + "--master", "spark://occam4:7077", + "--filter1", "purchase", + "--filter2", "view", + "--inDelim", ",", + "--itemIDPosition", "2", + "--rowIDPosition", "0", + "--filterPosition", "1" )) */ // local multi-threaded Spark with HDFS using large dataset @@ -109,7 +112,8 @@ class ItemSimilarityDriverSuite extends FunSuite with MahoutSuite with Distribut )) */ - test ("ItemSimilarityDriver, non-full-spec CSV"){ + // TODO: failing, temporarily disabled + test("ItemSimilarityDriver, non-full-spec CSV") { val InFile = TmpDir + "in-file.csv/" //using part files, not single file val OutPath = TmpDir + "indicator-matrices/" @@ -140,147 +144,133 @@ class ItemSimilarityDriverSuite extends FunSuite with MahoutSuite with Distribut // take account of one actual file val linesRdd = mahoutCtx.parallelize(lines).saveAsTextFile(InFile) - afterEach // clean up before running the driver, it should handle the Spark conf and context - // local multi-threaded Spark with default HDFS ItemSimilarityDriver.main(Array( - "--input", InFile, - "--output", OutPath, - "--master", masterUrl, - "--filter1", "purchase", - "--filter2", "view", - "--inDelim", ",", - "--itemIDPosition", "2", - "--rowIDPosition", "0", - "--filterPosition", "1", - "--writeAllDatasets", - "--dontAddMahoutJars")) - - - beforeEach // restart the test context to read the output of the driver + "--input", InFile, + "--output", OutPath, + "--master", masterUrl, + "--filter1", "purchase", + "--filter2", "view", + "--inDelim", ",", + "--itemIDPosition", "2", + "--rowIDPosition", "0", + "--filterPosition", "1", + "--writeAllDatasets")) // todo: these comparisons rely on a sort producing the same lines, which could possibly // fail since the sort is on value and these can be the same for all items in a vector val indicatorLines = mahoutCtx.textFile(OutPath+"/indicator-matrix/").collect.toIterable - indicatorLines should contain theSameElementsAs SelfSimilairtyLines + tokenize(indicatorLines) should contain theSameElementsAs SelfSimilairtyTokens val crossIndicatorLines = mahoutCtx.textFile(OutPath+"/cross-indicator-matrix/").collect.toIterable - crossIndicatorLines should contain theSameElementsAs CrossIndicatorLines + tokenize(crossIndicatorLines) should contain theSameElementsAs CrossIndicatorTokens } - test ("ItemSimilarityDriver TSV "){ + test("ItemSimilarityDriver TSV ") { val InFile = TmpDir + "in-file.tsv/" val OutPath = TmpDir + "indicator-matrices/" val lines = Array( - "u1\tpurchase\tiphone", - "u1\tpurchase\tipad", - "u2\tpurchase\tnexus", - "u2\tpurchase\tgalaxy", - "u3\tpurchase\tsurface", - "u4\tpurchase\tiphone", - "u4\tpurchase\tgalaxy", - "u1\tview\tiphone", - "u1\tview\tipad", - "u1\tview\tnexus", - "u1\tview\tgalaxy", - "u2\tview\tiphone", - "u2\tview\tipad", - "u2\tview\tnexus", - "u2\tview\tgalaxy", - "u3\tview\tsurface", - "u3\tview\tnexus", - "u4\tview\tiphone", - "u4\tview\tipad", - "u4\tview\tgalaxy") + "u1\tpurchase\tiphone", + "u1\tpurchase\tipad", + "u2\tpurchase\tnexus", + "u2\tpurchase\tgalaxy", + "u3\tpurchase\tsurface", + "u4\tpurchase\tiphone", + "u4\tpurchase\tgalaxy", + "u1\tview\tiphone", + "u1\tview\tipad", + "u1\tview\tnexus", + "u1\tview\tgalaxy", + "u2\tview\tiphone", + "u2\tview\tipad", + "u2\tview\tnexus", + "u2\tview\tgalaxy", + "u3\tview\tsurface", + "u3\tview\tnexus", + "u4\tview\tiphone", + "u4\tview\tipad", + "u4\tview\tgalaxy") // this will create multiple part-xxxxx files in the InFile dir but other tests will // take account of one actual file val linesRdd = mahoutCtx.parallelize(lines).saveAsTextFile(InFile) - afterEach // clean up before running the driver, it should handle the Spark conf and context - // local multi-threaded Spark with default HDFS ItemSimilarityDriver.main(Array( - "--input", InFile, - "--output", OutPath, - "--master", masterUrl, - "--filter1", "purchase", - "--filter2", "view", - "--inDelim", "[,\t]", - "--itemIDPosition", "2", - "--rowIDPosition", "0", - "--filterPosition", "1", - "--dontAddMahoutJars")) + "--input", InFile, + "--output", OutPath, + "--master", masterUrl, + "--filter1", "purchase", + "--filter2", "view", + "--inDelim", "[,\t]", + "--itemIDPosition", "2", + "--rowIDPosition", "0", + "--filterPosition", "1")) - beforeEach // restart the test context to read the output of the driver // todo: a better test would be to get sorted vectors and compare rows instead of tokens, this might miss // some error cases val indicatorLines = mahoutCtx.textFile(OutPath+"/indicator-matrix/").collect.toIterable - indicatorLines should contain theSameElementsAs SelfSimilairtyLines + tokenize(indicatorLines) should contain theSameElementsAs SelfSimilairtyTokens val crossIndicatorLines = mahoutCtx.textFile(OutPath+"/cross-indicator-matrix/").collect.toIterable - crossIndicatorLines should contain theSameElementsAs CrossIndicatorLines + tokenize(crossIndicatorLines) should contain theSameElementsAs CrossIndicatorTokens } - test ("ItemSimilarityDriver log-ish files"){ + test("ItemSimilarityDriver log-ish files") { val InFile = TmpDir + "in-file.log/" val OutPath = TmpDir + "indicator-matrices/" val lines = Array( - "2014-06-23 14:46:53.115\tu1\tpurchase\trandom text\tiphone", - "2014-06-23 14:46:53.115\tu1\tpurchase\trandom text\tipad", - "2014-06-23 14:46:53.115\tu2\tpurchase\trandom text\tnexus", - "2014-06-23 14:46:53.115\tu2\tpurchase\trandom text\tgalaxy", - "2014-06-23 14:46:53.115\tu3\tpurchase\trandom text\tsurface", - "2014-06-23 14:46:53.115\tu4\tpurchase\trandom text\tiphone", - "2014-06-23 14:46:53.115\tu4\tpurchase\trandom text\tgalaxy", - "2014-06-23 14:46:53.115\tu1\tview\trandom text\tiphone", - "2014-06-23 14:46:53.115\tu1\tview\trandom text\tipad", - "2014-06-23 14:46:53.115\tu1\tview\trandom text\tnexus", - "2014-06-23 14:46:53.115\tu1\tview\trandom text\tgalaxy", - "2014-06-23 14:46:53.115\tu2\tview\trandom text\tiphone", - "2014-06-23 14:46:53.115\tu2\tview\trandom text\tipad", - "2014-06-23 14:46:53.115\tu2\tview\trandom text\tnexus", - "2014-06-23 14:46:53.115\tu2\tview\trandom text\tgalaxy", - "2014-06-23 14:46:53.115\tu3\tview\trandom text\tsurface", - "2014-06-23 14:46:53.115\tu3\tview\trandom text\tnexus", - "2014-06-23 14:46:53.115\tu4\tview\trandom text\tiphone", - "2014-06-23 14:46:53.115\tu4\tview\trandom text\tipad", - "2014-06-23 14:46:53.115\tu4\tview\trandom text\tgalaxy") + "2014-06-23 14:46:53.115\tu1\tpurchase\trandom text\tiphone", + "2014-06-23 14:46:53.115\tu1\tpurchase\trandom text\tipad", + "2014-06-23 14:46:53.115\tu2\tpurchase\trandom text\tnexus", + "2014-06-23 14:46:53.115\tu2\tpurchase\trandom text\tgalaxy", + "2014-06-23 14:46:53.115\tu3\tpurchase\trandom text\tsurface", + "2014-06-23 14:46:53.115\tu4\tpurchase\trandom text\tiphone", + "2014-06-23 14:46:53.115\tu4\tpurchase\trandom text\tgalaxy", + "2014-06-23 14:46:53.115\tu1\tview\trandom text\tiphone", + "2014-06-23 14:46:53.115\tu1\tview\trandom text\tipad", + "2014-06-23 14:46:53.115\tu1\tview\trandom text\tnexus", + "2014-06-23 14:46:53.115\tu1\tview\trandom text\tgalaxy", + "2014-06-23 14:46:53.115\tu2\tview\trandom text\tiphone", + "2014-06-23 14:46:53.115\tu2\tview\trandom text\tipad", + "2014-06-23 14:46:53.115\tu2\tview\trandom text\tnexus", + "2014-06-23 14:46:53.115\tu2\tview\trandom text\tgalaxy", + "2014-06-23 14:46:53.115\tu3\tview\trandom text\tsurface", + "2014-06-23 14:46:53.115\tu3\tview\trandom text\tnexus", + "2014-06-23 14:46:53.115\tu4\tview\trandom text\tiphone", + "2014-06-23 14:46:53.115\tu4\tview\trandom text\tipad", + "2014-06-23 14:46:53.115\tu4\tview\trandom text\tgalaxy") // this will create multiple part-xxxxx files in the InFile dir but other tests will // take account of one actual file val linesRdd = mahoutCtx.parallelize(lines).saveAsTextFile(InFile) - afterEach // clean up before running the driver, it should handle the Spark conf and context - // local multi-threaded Spark with default HDFS ItemSimilarityDriver.main(Array( - "--input", InFile, - "--output", OutPath, - "--master", masterUrl, - "--filter1", "purchase", - "--filter2", "view", - "--inDelim", "\t", - "--itemIDPosition", "4", - "--rowIDPosition", "1", - "--filterPosition", "2", - "--dontAddMahoutJars")) + "--input", InFile, + "--output", OutPath, + "--master", masterUrl, + "--filter1", "purchase", + "--filter2", "view", + "--inDelim", "\t", + "--itemIDPosition", "4", + "--rowIDPosition", "1", + "--filterPosition", "2")) - beforeEach // restart the test context to read the output of the driver val indicatorLines = mahoutCtx.textFile(OutPath+"/indicator-matrix/").collect.toIterable - indicatorLines should contain theSameElementsAs SelfSimilairtyLines + tokenize(indicatorLines) should contain theSameElementsAs SelfSimilairtyTokens val crossIndicatorLines = mahoutCtx.textFile(OutPath+"/cross-indicator-matrix/").collect.toIterable - crossIndicatorLines should contain theSameElementsAs CrossIndicatorLines + tokenize(crossIndicatorLines) should contain theSameElementsAs CrossIndicatorTokens } - test ("ItemSimilarityDriver legacy supported file format"){ + test("ItemSimilarityDriver legacy supported file format") { val InDir = TmpDir + "in-dir/" val InFilename = "in-file.tsv" @@ -289,20 +279,20 @@ class ItemSimilarityDriverSuite extends FunSuite with MahoutSuite with Distribut val OutPath = TmpDir + "indicator-matrices" val lines = Array( - "0,0,1", - "0,1,1", - "1,2,1", - "1,3,1", - "2,4,1", - "3,0,1", - "3,3,1") - - val Answer = Iterable( - "0\t1:1.7260924347106847", - "3\t2:1.7260924347106847", - "1\t0:1.7260924347106847", - "4", - "2\t3:1.7260924347106847") + "0,0,1", + "0,1,1", + "1,2,1", + "1,3,1", + "2,4,1", + "3,0,1", + "3,3,1") + + val Answer = tokenize(Iterable( + "0\t1:1.7260924347106847", + "3\t2:1.7260924347106847", + "1\t0:1.7260924347106847", + "4", + "2\t3:1.7260924347106847")) // this creates one part-0000 file in the directory mahoutCtx.parallelize(lines).coalesce(1, shuffle=true).saveAsTextFile(InDir) @@ -312,24 +302,18 @@ class ItemSimilarityDriverSuite extends FunSuite with MahoutSuite with Distribut //rename part-00000 to something.tsv fs.rename(new Path(InDir + "part-00000"), new Path(InPath)) - afterEach // clean up before running the driver, it should handle the Spark conf and context - // local multi-threaded Spark with default HDFS ItemSimilarityDriver.main(Array( - "--input", InPath, - "--output", OutPath, - "--master", masterUrl, - "--dontAddMahoutJars")) + "--input", InPath, + "--output", OutPath, + "--master", masterUrl)) - beforeEach // restart the test context to read the output of the driver - // todo: a better test would be to get sorted vectors and compare rows instead of tokens, this might miss - // some error cases val indicatorLines = mahoutCtx.textFile(OutPath+"/indicator-matrix/").collect.toIterable - indicatorLines should contain theSameElementsAs Answer + tokenize(indicatorLines) should contain theSameElementsAs Answer } - test ("ItemSimilarityDriver write search engine output"){ + test("ItemSimilarityDriver write search engine output") { val InDir = TmpDir + "in-dir/" val InFilename = "in-file.tsv" @@ -338,20 +322,20 @@ class ItemSimilarityDriverSuite extends FunSuite with MahoutSuite with Distribut val OutPath = TmpDir + "indicator-matrices" val lines = Array( - "0,0,1", - "0,1,1", - "1,2,1", - "1,3,1", - "2,4,1", - "3,0,1", - "3,3,1") - - val Answer = Iterable( - "0\t1", - "3\t2", - "1\t0", - "4", - "2\t3") + "0,0,1", + "0,1,1", + "1,2,1", + "1,3,1", + "2,4,1", + "3,0,1", + "3,3,1") + + val Answer = tokenize(Iterable( + "0\t1", + "3\t2", + "1\t0", + "4", + "2\t3")) // this creates one part-0000 file in the directory mahoutCtx.parallelize(lines).coalesce(1, shuffle=true).saveAsTextFile(InDir) @@ -361,51 +345,45 @@ class ItemSimilarityDriverSuite extends FunSuite with MahoutSuite with Distribut //rename part-00000 to something.tsv fs.rename(new Path(InDir + "part-00000"), new Path(InPath)) - afterEach // clean up before running the driver, it should handle the Spark conf and context - // local multi-threaded Spark with default HDFS ItemSimilarityDriver.main(Array( - "--input", InPath, - "--output", OutPath, - "--master", masterUrl, - "--dontAddMahoutJars", - "--omitStrength")) + "--input", InPath, + "--output", OutPath, + "--master", masterUrl, + "--omitStrength")) - beforeEach // restart the test context to read the output of the driver - // todo: a better test would be to get sorted vectors and compare rows instead of tokens, this might miss - // some error cases val indicatorLines = mahoutCtx.textFile(OutPath+"/indicator-matrix/").collect.toIterable - indicatorLines should contain theSameElementsAs Answer + tokenize(indicatorLines) should contain theSameElementsAs Answer } - test("ItemSimilarityDriver recursive file discovery using filename patterns"){ + test("ItemSimilarityDriver recursive file discovery using filename patterns") { //directory structure using the following // tmp/data/m1.tsv // tmp/data/more-data/another-dir/m2.tsv val M1Lines = Array( - "u1\tpurchase\tiphone", - "u1\tpurchase\tipad", - "u2\tpurchase\tnexus", - "u2\tpurchase\tgalaxy", - "u3\tpurchase\tsurface", - "u4\tpurchase\tiphone", - "u4\tpurchase\tgalaxy", - "u1\tview\tiphone") + "u1\tpurchase\tiphone", + "u1\tpurchase\tipad", + "u2\tpurchase\tnexus", + "u2\tpurchase\tgalaxy", + "u3\tpurchase\tsurface", + "u4\tpurchase\tiphone", + "u4\tpurchase\tgalaxy", + "u1\tview\tiphone") val M2Lines = Array( - "u1\tview\tipad", - "u1\tview\tnexus", - "u1\tview\tgalaxy", - "u2\tview\tiphone", - "u2\tview\tipad", - "u2\tview\tnexus", - "u2\tview\tgalaxy", - "u3\tview\tsurface", - "u3\tview\tnexus", - "u4\tview\tiphone", - "u4\tview\tipad", - "u4\tview\tgalaxy") + "u1\tview\tipad", + "u1\tview\tnexus", + "u1\tview\tgalaxy", + "u2\tview\tiphone", + "u2\tview\tipad", + "u2\tview\tnexus", + "u2\tview\tgalaxy", + "u3\tview\tsurface", + "u3\tview\tnexus", + "u4\tview\tiphone", + "u4\tview\tipad", + "u4\tview\tgalaxy") val InFilenameM1 = "m1.tsv" val InDirM1 = TmpDir + "data/" @@ -434,27 +412,23 @@ class ItemSimilarityDriverSuite extends FunSuite with MahoutSuite with Distribut // local multi-threaded Spark with default FS, suitable for build tests but need better location for data - afterEach // clean up before running the driver, it should handle the Spark conf and context - ItemSimilarityDriver.main(Array( - "--input", InPathStart, - "--output", OutPath, - "--master", masterUrl, - "--filter1", "purchase", - "--filter2", "view", - "--inDelim", "\t", - "--itemIDPosition", "2", - "--rowIDPosition", "0", - "--filterPosition", "1", - "--filenamePattern", "m..tsv", - "--recursive", - "--dontAddMahoutJars")) + "--input", InPathStart, + "--output", OutPath, + "--master", masterUrl, + "--filter1", "purchase", + "--filter2", "view", + "--inDelim", "\t", + "--itemIDPosition", "2", + "--rowIDPosition", "0", + "--filterPosition", "1", + "--filenamePattern", "m..tsv", + "--recursive")) - beforeEach()// restart the test context to read the output of the driver val indicatorLines = mahoutCtx.textFile(OutPath+"/indicator-matrix/").collect.toIterable - indicatorLines should contain theSameElementsAs SelfSimilairtyLines + tokenize(indicatorLines) should contain theSameElementsAs SelfSimilairtyTokens val crossIndicatorLines = mahoutCtx.textFile(OutPath+"/cross-indicator-matrix/").collect.toIterable - crossIndicatorLines should contain theSameElementsAs CrossIndicatorLines + tokenize(crossIndicatorLines) should contain theSameElementsAs CrossIndicatorTokens } @@ -465,53 +439,49 @@ class ItemSimilarityDriverSuite extends FunSuite with MahoutSuite with Distribut val OutPath = TmpDir + "indicator-matrices/" val lines = Array( - "u1,purchase,iphone", - "u1,purchase,ipad", - "u2,purchase,nexus", - "u2,purchase,galaxy", - "u3,purchase,surface", - "u4,purchase,iphone", - "u4,purchase,galaxy", - "u1,view,iphone", - "u1,view,ipad", - "u1,view,nexus", - "u1,view,galaxy", - "u2,view,iphone", - "u2,view,ipad", - "u2,view,nexus", - "u2,view,galaxy", - "u3,view,surface", - "u3,view,nexus", - "u4,view,iphone", - "u4,view,ipad", - "u4,view,galaxy") + "u1,purchase,iphone", + "u1,purchase,ipad", + "u2,purchase,nexus", + "u2,purchase,galaxy", + "u3,purchase,surface", + "u4,purchase,iphone", + "u4,purchase,galaxy", + "u1,view,iphone", + "u1,view,ipad", + "u1,view,nexus", + "u1,view,galaxy", + "u2,view,iphone", + "u2,view,ipad", + "u2,view,nexus", + "u2,view,galaxy", + "u3,view,surface", + "u3,view,nexus", + "u4,view,iphone", + "u4,view,ipad", + "u4,view,galaxy") // this will create multiple part-xxxxx files in the InFile dir but other tests will // take account of one actual file val linesRdd1 = mahoutCtx.parallelize(lines).saveAsTextFile(InFile1) val linesRdd2 = mahoutCtx.parallelize(lines).saveAsTextFile(InFile2) - afterEach // clean up before running the driver, it should handle the Spark conf and context - // local multi-threaded Spark with default HDFS ItemSimilarityDriver.main(Array( - "--input", InFile1, - "--input2", InFile2, - "--output", OutPath, - "--master", masterUrl, - "--filter1", "purchase", - "--filter2", "view", - "--inDelim", ",", - "--itemIDPosition", "2", - "--rowIDPosition", "0", - "--filterPosition", "1", - "--dontAddMahoutJars")) + "--input", InFile1, + "--input2", InFile2, + "--output", OutPath, + "--master", masterUrl, + "--filter1", "purchase", + "--filter2", "view", + "--inDelim", ",", + "--itemIDPosition", "2", + "--rowIDPosition", "0", + "--filterPosition", "1")) - beforeEach // restart the test context to read the output of the driver val indicatorLines = mahoutCtx.textFile(OutPath+"/indicator-matrix/").collect.toIterable - indicatorLines should contain theSameElementsAs SelfSimilairtyLines + tokenize(indicatorLines) should contain theSameElementsAs SelfSimilairtyTokens val crossIndicatorLines = mahoutCtx.textFile(OutPath+"/cross-indicator-matrix/").collect.toIterable - crossIndicatorLines should contain theSameElementsAs CrossIndicatorLines + tokenize(crossIndicatorLines) should contain theSameElementsAs CrossIndicatorTokens } @@ -522,68 +492,62 @@ class ItemSimilarityDriverSuite extends FunSuite with MahoutSuite with Distribut val OutPath = TmpDir + "indicator-matrices/" val lines = Array( - "u1,purchase,iphone", - "u1,purchase,ipad", - "u2,purchase,nexus", - "u2,purchase,galaxy", - // remove one user so A'B will be of different dimensions - // ItemSimilarityDriver should create one unified user dictionary and so account for this - // discrepancy as a blank row: "u3,purchase,surface", - "u4,purchase,iphone", - "u4,purchase,galaxy", - "u1,view,iphone", - "u1,view,ipad", - "u1,view,nexus", - "u1,view,galaxy", - "u2,view,iphone", - "u2,view,ipad", - "u2,view,nexus", - "u2,view,galaxy", - "u3,view,surface", - "u3,view,nexus", - "u4,view,iphone", - "u4,view,ipad", - "u4,view,galaxy") - - val UnequalDimensionsSelfSimilarity = Iterable( - "ipad\tiphone:1.7260924347106847", - "iphone\tipad:1.7260924347106847", - "nexus\tgalaxy:1.7260924347106847", - "galaxy\tnexus:1.7260924347106847") - - val UnequalDimensionsCrossSimilarity = Iterable( - "galaxy\tnexus:1.7260924347106847 iphone:1.7260924347106847 ipad:1.7260924347106847 galaxy:1.7260924347106847", - "ipad\tnexus:0.6795961471815897 iphone:0.6795961471815897 ipad:0.6795961471815897 galaxy:0.6795961471815897", - "nexus\tnexus:0.6795961471815897 iphone:0.6795961471815897 ipad:0.6795961471815897 galaxy:0.6795961471815897", - "iphone\tnexus:1.7260924347106847 iphone:1.7260924347106847 ipad:1.7260924347106847 galaxy:1.7260924347106847") + "u1,purchase,iphone", + "u1,purchase,ipad", + "u2,purchase,nexus", + "u2,purchase,galaxy", + // remove one user so A'B will be of different dimensions + // ItemSimilarityDriver should create one unified user dictionary and so account for this + // discrepancy as a blank row: "u3,purchase,surface", + "u4,purchase,iphone", + "u4,purchase,galaxy", + "u1,view,iphone", + "u1,view,ipad", + "u1,view,nexus", + "u1,view,galaxy", + "u2,view,iphone", + "u2,view,ipad", + "u2,view,nexus", + "u2,view,galaxy", + "u3,view,surface", + "u3,view,nexus", + "u4,view,iphone", + "u4,view,ipad", + "u4,view,galaxy") + val UnequalDimensionsSelfSimilarity = tokenize(Iterable( + "ipad\tiphone:1.7260924347106847", + "iphone\tipad:1.7260924347106847", + "nexus\tgalaxy:1.7260924347106847", + "galaxy\tnexus:1.7260924347106847")) + + val UnequalDimensionsCrossSimilarity = tokenize(Iterable( + "galaxy\tgalaxy:1.7260924347106847 iphone:1.7260924347106847 ipad:1.7260924347106847 nexus:1.7260924347106847", + "iphone\tgalaxy:1.7260924347106847 iphone:1.7260924347106847 ipad:1.7260924347106847 surface:1.7260924347106847 nexus:1.7260924347106847", + "ipad\tgalaxy:0.6795961471815897 iphone:0.6795961471815897 ipad:0.6795961471815897 nexus:0.6795961471815897", + "nexus\tiphone:0.6795961471815897 ipad:0.6795961471815897 nexus:0.6795961471815897 galaxy:0.6795961471815897")) // this will create multiple part-xxxxx files in the InFile dir but other tests will // take account of one actual file val linesRdd1 = mahoutCtx.parallelize(lines).saveAsTextFile(InFile1) val linesRdd2 = mahoutCtx.parallelize(lines).saveAsTextFile(InFile2) - afterEach // clean up before running the driver, it should handle the Spark conf and context - // local multi-threaded Spark with default HDFS ItemSimilarityDriver.main(Array( - "--input", InFile1, - "--input2", InFile2, - "--output", OutPath, - "--master", masterUrl, - "--filter1", "purchase", - "--filter2", "view", - "--inDelim", ",", - "--itemIDPosition", "2", - "--rowIDPosition", "0", - "--filterPosition", "1", - "--dontAddMahoutJars")) - - beforeEach // restart the test context to read the output of the driver + "--input", InFile1, + "--input2", InFile2, + "--output", OutPath, + "--master", masterUrl, + "--filter1", "purchase", + "--filter2", "view", + "--inDelim", ",", + "--itemIDPosition", "2", + "--rowIDPosition", "0", + "--filterPosition", "1")) val indicatorLines = mahoutCtx.textFile(OutPath+"/indicator-matrix/").collect.toIterable val crossIndicatorLines = mahoutCtx.textFile(OutPath+"/cross-indicator-matrix/").collect.toIterable - indicatorLines should contain theSameElementsAs UnequalDimensionsSelfSimilarity - crossIndicatorLines should contain theSameElementsAs UnequalDimensionsCrossSimilarity + tokenize(indicatorLines) should contain theSameElementsAs UnequalDimensionsSelfSimilarity + tokenize(crossIndicatorLines) should contain theSameElementsAs UnequalDimensionsCrossSimilarity } @@ -600,82 +564,56 @@ class ItemSimilarityDriverSuite extends FunSuite with MahoutSuite with Distribut val OutPath = TmpDir + "indicator-matrices/" val lines = Array( - "u1,purchase,iphone", - "u1,purchase,ipad", - "u2,purchase,nexus", - "u2,purchase,galaxy", - "u3,purchase,surface", - "u4,purchase,iphone", - "u4,purchase,galaxy", - "u1,view,phones", - "u1,view,mobile_acc", - "u2,view,phones", - "u2,view,tablets", - "u2,view,mobile_acc", - "u3,view,mobile_acc", - "u4,view,phones", - "u4,view,tablets", - "u4,view,soap") - - val UnequalDimensionsCrossSimilarityLines = Iterable( + "u1,purchase,iphone", + "u1,purchase,ipad", + "u2,purchase,nexus", + "u2,purchase,galaxy", + "u3,purchase,surface", + "u4,purchase,iphone", + "u4,purchase,galaxy", + "u1,view,phones", + "u1,view,mobile_acc", + "u2,view,phones", + "u2,view,tablets", + "u2,view,mobile_acc", + "u3,view,mobile_acc", + "u4,view,phones", + "u4,view,tablets", + "u4,view,soap") + + val UnequalDimensionsCrossSimilarityLines = tokenize(Iterable( "iphone\tmobile_acc:1.7260924347106847 soap:1.7260924347106847 phones:1.7260924347106847", "surface\tmobile_acc:0.6795961471815897", "nexus\ttablets:1.7260924347106847 mobile_acc:0.6795961471815897 phones:0.6795961471815897", "galaxy\ttablets:5.545177444479561 soap:1.7260924347106847 phones:1.7260924347106847 mobile_acc:1.7260924347106847", - "ipad\tmobile_acc:0.6795961471815897 phones:0.6795961471815897") + "ipad\tmobile_acc:0.6795961471815897 phones:0.6795961471815897")) // this will create multiple part-xxxxx files in the InFile dir but other tests will // take account of one actual file val linesRdd1 = mahoutCtx.parallelize(lines).saveAsTextFile(InFile1) val linesRdd2 = mahoutCtx.parallelize(lines).saveAsTextFile(InFile2) - afterEach // clean up before running the driver, it should handle the Spark conf and context - // local multi-threaded Spark with default HDFS ItemSimilarityDriver.main(Array( - "--input", InFile1, - "--input2", InFile2, - "--output", OutPath, - "--master", masterUrl, - "--filter1", "purchase", - "--filter2", "view", - "--inDelim", ",", - "--itemIDPosition", "2", - "--rowIDPosition", "0", - "--filterPosition", "1", - "--dontAddMahoutJars", - "--writeAllDatasets")) + "--input", InFile1, + "--input2", InFile2, + "--output", OutPath, + "--master", masterUrl, + "--filter1", "purchase", + "--filter2", "view", + "--inDelim", ",", + "--itemIDPosition", "2", + "--rowIDPosition", "0", + "--filterPosition", "1", + "--writeAllDatasets")) - beforeEach // restart the test context to read the output of the driver val indicatorLines = mahoutCtx.textFile(OutPath+"/indicator-matrix/").collect.toIterable val crossIndicatorLines = mahoutCtx.textFile(OutPath+"/cross-indicator-matrix/").collect.toIterable - indicatorLines should contain theSameElementsAs SelfSimilairtyLines - crossIndicatorLines should contain theSameElementsAs UnequalDimensionsCrossSimilarityLines + tokenize(indicatorLines) should contain theSameElementsAs SelfSimilairtyTokens + tokenize(crossIndicatorLines) should contain theSameElementsAs UnequalDimensionsCrossSimilarityLines } - // convert into an Iterable of tokens for 'should contain theSameElementsAs Iterable' - def tokenize(a: Iterable[String]): Iterable[String] = { - var r: Iterable[String] = Iterable() - a.foreach { l => - l.split("\t").foreach{ s => - r = r ++ s.split(",") - } - } - r.asInstanceOf[Iterable[String]] - } - - override def afterAll = { - removeTmpDir - super.afterAll - } - - def removeTmpDir = { - // remove TmpDir - val fs = FileSystem.get(new Configuration()) - fs.delete(new Path(TmpDir), true) // delete recursively - } - test("A.t %*% B after changing row cardinality of A"){ // todo: move to math tests but this is Spark specific @@ -720,57 +658,82 @@ removed ==> u3 0 0 1 0 val OutPath = TmpDir + "indicator-matrices/" val lines = Array( - "u1,purchase,iphone", - "u1,purchase,ipad", - "u2,purchase,nexus", - "u2,purchase,galaxy", - "u3,purchase,surface", - "u4,purchase,iphone", - "u4,purchase,galaxy", - "u1,view,phones", - "u1,view,mobile_acc", - "u2,view,phones", - "u2,view,tablets", - "u2,view,mobile_acc", - //"u3,view,mobile_acc",// if this line is removed the cross-cooccurrence should work - "u4,view,phones", - "u4,view,tablets", - "u4,view,soap") - - val UnequalDimensionsCrossSimilarityLines = Iterable( + "u1,purchase,iphone", + "u1,purchase,ipad", + "u2,purchase,nexus", + "u2,purchase,galaxy", + "u3,purchase,surface", + "u4,purchase,iphone", + "u4,purchase,galaxy", + "u1,view,phones", + "u1,view,mobile_acc", + "u2,view,phones", + "u2,view,tablets", + "u2,view,mobile_acc", + //"u3,view,mobile_acc",// if this line is removed the cross-cooccurrence should work + "u4,view,phones", + "u4,view,tablets", + "u4,view,soap") + + val UnequalDimensionsCrossSimilarityLines = tokenize(Iterable( "galaxy\ttablets:5.545177444479561 soap:1.7260924347106847 phones:1.7260924347106847", "ipad\tmobile_acc:1.7260924347106847 phones:0.6795961471815897", "surface", "nexus\tmobile_acc:1.7260924347106847 tablets:1.7260924347106847 phones:0.6795961471815897", - "iphone\tsoap:1.7260924347106847 phones:1.7260924347106847") + "iphone\tsoap:1.7260924347106847 phones:1.7260924347106847")) // this will create multiple part-xxxxx files in the InFile dir but other tests will // take account of one actual file val linesRdd1 = mahoutCtx.parallelize(lines).saveAsTextFile(InFile1) val linesRdd2 = mahoutCtx.parallelize(lines).saveAsTextFile(InFile2) - afterEach // clean up before running the driver, it should handle the Spark conf and context - // local multi-threaded Spark with default HDFS ItemSimilarityDriver.main(Array( - "--input", InFile1, - "--input2", InFile2, - "--output", OutPath, - "--master", masterUrl, - "--filter1", "purchase", - "--filter2", "view", - "--inDelim", ",", - "--itemIDPosition", "2", - "--rowIDPosition", "0", - "--filterPosition", "1", - "--dontAddMahoutJars", - "--writeAllDatasets")) + "--input", InFile1, + "--input2", InFile2, + "--output", OutPath, + "--master", masterUrl, + "--filter1", "purchase", + "--filter2", "view", + "--inDelim", ",", + "--itemIDPosition", "2", + "--rowIDPosition", "0", + "--filterPosition", "1", + "--writeAllDatasets")) - beforeEach // restart the test context to read the output of the driver val indicatorLines = mahoutCtx.textFile(OutPath+"/indicator-matrix/").collect.toIterable val crossIndicatorLines = mahoutCtx.textFile(OutPath+"/cross-indicator-matrix/").collect.toIterable - indicatorLines should contain theSameElementsAs SelfSimilairtyLines - crossIndicatorLines should contain theSameElementsAs UnequalDimensionsCrossSimilarityLines + tokenize(indicatorLines) should contain theSameElementsAs SelfSimilairtyTokens + tokenize(crossIndicatorLines) should contain theSameElementsAs UnequalDimensionsCrossSimilarityLines + } + + // convert into an Iterable of tokens for 'should contain theSameElementsAs Iterable' + def tokenize(a: Iterable[String]): Iterable[String] = { + var r: Iterable[String] = Iterable() + a.foreach { l => + l.split("\t").foreach{ s => + r = r ++ s.split("[\t ]") + } + } + r + } + + override protected def beforeAll(configMap: ConfigMap) { + super.beforeAll(configMap) + + // just in case there is one left over + val fs = FileSystem.get(new Configuration()) + fs.delete(new Path(TmpDir), true) // delete recursively + + ItemSimilarityDriver.useContext(mahoutCtx) // for testing use the test context + } + + override protected def afterEach() { + + val fs = FileSystem.get(new Configuration()) + fs.delete(new Path(TmpDir), true) // delete recursively + + super.afterEach() } } http://git-wip-us.apache.org/repos/asf/mahout/blob/ee6359f6/spark/src/test/scala/org/apache/mahout/sparkbindings/blas/ABtSuite.scala ---------------------------------------------------------------------- diff --git a/spark/src/test/scala/org/apache/mahout/sparkbindings/blas/ABtSuite.scala b/spark/src/test/scala/org/apache/mahout/sparkbindings/blas/ABtSuite.scala deleted file mode 100644 index 12c9034..0000000 --- a/spark/src/test/scala/org/apache/mahout/sparkbindings/blas/ABtSuite.scala +++ /dev/null @@ -1,54 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.mahout.sparkbindings.blas - -import org.apache.mahout.sparkbindings.test.DistributedSparkSuite -import org.scalatest.FunSuite -import org.apache.mahout.math.scalabindings._ -import org.apache.mahout.math.drm._ -import org.apache.mahout.sparkbindings._ -import org.apache.mahout.sparkbindings.drm._ -import RLikeOps._ -import org.apache.spark.SparkContext._ -import org.apache.mahout.math.drm.logical.OpABt - -/** Tests for AB' operator algorithms */ -class ABtSuite extends FunSuite with DistributedSparkSuite { - - test("ABt") { - val inCoreA = dense((1, 2, 3), (2, 3, 4), (3, 4, 5)) - val inCoreB = dense((3, 4, 5), (5, 6, 7)) - val drmA = drmParallelize(m = inCoreA, numPartitions = 3) - val drmB = drmParallelize(m = inCoreB, numPartitions = 2) - - val op = new OpABt(drmA, drmB) - - val drm = new CheckpointedDrmSpark(ABt.abt(op, srcA = drmA, srcB = drmB), op.nrow, op.ncol) - - printf("AB' num partitions = %d.\n", drm.rdd.partitions.size) - - val inCoreMControl = inCoreA %*% inCoreB.t - val inCoreM = drm.collect - - assert((inCoreM - inCoreMControl).norm < 1E-5) - - println(inCoreM) - - } - -} http://git-wip-us.apache.org/repos/asf/mahout/blob/ee6359f6/spark/src/test/scala/org/apache/mahout/sparkbindings/blas/AewBSuite.scala ---------------------------------------------------------------------- diff --git a/spark/src/test/scala/org/apache/mahout/sparkbindings/blas/AewBSuite.scala b/spark/src/test/scala/org/apache/mahout/sparkbindings/blas/AewBSuite.scala deleted file mode 100644 index be65e32..0000000 --- a/spark/src/test/scala/org/apache/mahout/sparkbindings/blas/AewBSuite.scala +++ /dev/null @@ -1,101 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.mahout.sparkbindings.blas - -import org.scalatest.FunSuite -import org.apache.mahout.sparkbindings.test.DistributedSparkSuite -import org.apache.mahout.math.scalabindings._ -import RLikeOps._ -import org.apache.mahout.math.drm._ -import RLikeDrmOps._ -import org.apache.spark.SparkContext._ -import org.apache.mahout.math.drm.logical.OpAewB -import org.apache.mahout.sparkbindings.drm.CheckpointedDrmSpark - -/** Elementwise matrix operation tests */ -class AewBSuite extends FunSuite with DistributedSparkSuite { - - test("A * B Hadamard") { - val inCoreA = dense((1, 2, 3), (2, 3, 4), (3, 4, 5), (7, 8, 9)) - val inCoreB = dense((3, 4, 5), (5, 6, 7), (0, 0, 0), (9, 8, 7)) - val drmA = drmParallelize(m = inCoreA, numPartitions = 2) - val drmB = drmParallelize(m = inCoreB) - - val op = new OpAewB(drmA, drmB, "*") - - val drmM = new CheckpointedDrmSpark(AewB.a_ew_b(op, srcA = drmA, srcB = drmB), op.nrow, op.ncol) - - val inCoreM = drmM.collect - val inCoreMControl = inCoreA * inCoreB - - assert((inCoreM - inCoreMControl).norm < 1E-10) - - } - - test("A + B Elementwise") { - val inCoreA = dense((1, 2, 3), (2, 3, 4), (3, 4, 5), (7, 8, 9)) - val inCoreB = dense((3, 4, 5), (5, 6, 7), (0, 0, 0), (9, 8, 7)) - val drmA = drmParallelize(m = inCoreA, numPartitions = 2) - val drmB = drmParallelize(m = inCoreB) - - val op = new OpAewB(drmA, drmB, "+") - - val drmM = new CheckpointedDrmSpark(AewB.a_ew_b(op, srcA = drmA, srcB = drmB), op.nrow, op.ncol) - - val inCoreM = drmM.collect - val inCoreMControl = inCoreA + inCoreB - - assert((inCoreM - inCoreMControl).norm < 1E-10) - - } - - test("A - B Elementwise") { - val inCoreA = dense((1, 2, 3), (2, 3, 4), (3, 4, 5), (7, 8, 9)) - val inCoreB = dense((3, 4, 5), (5, 6, 7), (0, 0, 0), (9, 8, 7)) - val drmA = drmParallelize(m = inCoreA, numPartitions = 2) - val drmB = drmParallelize(m = inCoreB) - - val op = new OpAewB(drmA, drmB, "-") - - val drmM = new CheckpointedDrmSpark(AewB.a_ew_b(op, srcA = drmA, srcB = drmB), op.nrow, op.ncol) - - val inCoreM = drmM.collect - val inCoreMControl = inCoreA - inCoreB - - assert((inCoreM - inCoreMControl).norm < 1E-10) - - } - - test("A / B Elementwise") { - val inCoreA = dense((1, 2, 3), (2, 3, 4), (3, 4, 0), (7, 8, 9)) - val inCoreB = dense((3, 4, 5), (5, 6, 7), (10, 20, 30), (9, 8, 7)) - val drmA = drmParallelize(m = inCoreA, numPartitions = 2) - val drmB = drmParallelize(m = inCoreB) - - val op = new OpAewB(drmA, drmB, "/") - - val drmM = new CheckpointedDrmSpark(AewB.a_ew_b(op, srcA = drmA, srcB = drmB), op.nrow, op.ncol) - - val inCoreM = drmM.collect - val inCoreMControl = inCoreA / inCoreB - - assert((inCoreM - inCoreMControl).norm < 1E-10) - - } - -} http://git-wip-us.apache.org/repos/asf/mahout/blob/ee6359f6/spark/src/test/scala/org/apache/mahout/sparkbindings/blas/AtASuite.scala ---------------------------------------------------------------------- diff --git a/spark/src/test/scala/org/apache/mahout/sparkbindings/blas/AtASuite.scala b/spark/src/test/scala/org/apache/mahout/sparkbindings/blas/AtASuite.scala deleted file mode 100644 index c31f27c..0000000 --- a/spark/src/test/scala/org/apache/mahout/sparkbindings/blas/AtASuite.scala +++ /dev/null @@ -1,48 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.mahout.sparkbindings.blas - -import org.scalatest.FunSuite -import org.apache.mahout.sparkbindings.test.DistributedSparkSuite -import org.apache.mahout.math.scalabindings._ -import RLikeOps._ -import org.apache.mahout.math.drm._ -import org.apache.mahout.sparkbindings._ -import org.apache.spark.SparkContext._ -import org.apache.mahout.math.drm.logical.OpAtA - -/** Tests for {@link XtX} */ -class AtASuite extends FunSuite with DistributedSparkSuite { - - test("AtA slim") { - - val inCoreA = dense((1, 2), (2, 3)) - val drmA = drmParallelize(inCoreA) - - val operator = new OpAtA[Int](A = drmA) - val inCoreAtA = AtA.at_a_slim(operator = operator, srcRdd = drmA.rdd) - println(inCoreAtA) - - val expectedAtA = inCoreA.t %*% inCoreA - println(expectedAtA) - - assert(expectedAtA === inCoreAtA) - - } - -} http://git-wip-us.apache.org/repos/asf/mahout/blob/ee6359f6/spark/src/test/scala/org/apache/mahout/sparkbindings/blas/AtSuite.scala ---------------------------------------------------------------------- diff --git a/spark/src/test/scala/org/apache/mahout/sparkbindings/blas/AtSuite.scala b/spark/src/test/scala/org/apache/mahout/sparkbindings/blas/AtSuite.scala deleted file mode 100644 index 16632ec..0000000 --- a/spark/src/test/scala/org/apache/mahout/sparkbindings/blas/AtSuite.scala +++ /dev/null @@ -1,44 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.mahout.sparkbindings.blas - -import org.scalatest.FunSuite -import org.apache.mahout.sparkbindings.test.DistributedSparkSuite -import org.apache.mahout.math.scalabindings._ -import RLikeOps._ -import org.apache.mahout.math.drm._ -import org.apache.mahout.math.drm.logical.OpAt -import org.apache.mahout.sparkbindings.drm.CheckpointedDrmSpark - -/** Tests for A' algorithms */ -class AtSuite extends FunSuite with DistributedSparkSuite { - - test("At") { - val inCoreA = dense((1, 2, 3), (2, 3, 4), (3, 4, 5)) - val drmA = drmParallelize(m = inCoreA, numPartitions = 2) - - val op = new OpAt(drmA) - val drmAt = new CheckpointedDrmSpark(rdd = At.at(op, srcA = drmA), _nrow = op.nrow, _ncol = op.ncol) - val inCoreAt = drmAt.collect - val inCoreControlAt = inCoreA.t - - println(inCoreAt) - assert((inCoreAt - inCoreControlAt).norm < 1E-5) - - } -} http://git-wip-us.apache.org/repos/asf/mahout/blob/ee6359f6/spark/src/test/scala/org/apache/mahout/sparkbindings/blas/BlasSuite.scala ---------------------------------------------------------------------- diff --git a/spark/src/test/scala/org/apache/mahout/sparkbindings/blas/BlasSuite.scala b/spark/src/test/scala/org/apache/mahout/sparkbindings/blas/BlasSuite.scala new file mode 100644 index 0000000..1521cb8 --- /dev/null +++ b/spark/src/test/scala/org/apache/mahout/sparkbindings/blas/BlasSuite.scala @@ -0,0 +1,154 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.mahout.sparkbindings.blas + +import collection._ +import JavaConversions._ +import org.scalatest.FunSuite +import org.apache.mahout.test.DistributedMahoutSuite +import org.apache.mahout.math._ +import scalabindings._ +import RLikeOps._ +import drm._ +import org.apache.mahout.sparkbindings._ +import org.apache.mahout.sparkbindings.drm.CheckpointedDrmSpark +import org.apache.mahout.math.drm.logical.{OpAt, OpAtA, OpAewB, OpABt} +import org.apache.mahout.sparkbindings.test.DistributedSparkSuite + +/** Collection of physical blas operator tests. */ +class BlasSuite extends FunSuite with DistributedSparkSuite { + + test("ABt") { + val inCoreA = dense((1, 2, 3), (2, 3, 4), (3, 4, 5)) + val inCoreB = dense((3, 4, 5), (5, 6, 7)) + val drmA = drmParallelize(m = inCoreA, numPartitions = 3) + val drmB = drmParallelize(m = inCoreB, numPartitions = 2) + + val op = new OpABt(drmA, drmB) + + val drm = new CheckpointedDrmSpark(ABt.abt(op, srcA = drmA, srcB = drmB), op.nrow, op.ncol) + + printf("AB' num partitions = %d.\n", drm.rdd.partitions.size) + + val inCoreMControl = inCoreA %*% inCoreB.t + val inCoreM = drm.collect + + assert((inCoreM - inCoreMControl).norm < 1E-5) + + println(inCoreM) + } + + test("A * B Hadamard") { + val inCoreA = dense((1, 2, 3), (2, 3, 4), (3, 4, 5), (7, 8, 9)) + val inCoreB = dense((3, 4, 5), (5, 6, 7), (0, 0, 0), (9, 8, 7)) + val drmA = drmParallelize(m = inCoreA, numPartitions = 2) + val drmB = drmParallelize(m = inCoreB) + + val op = new OpAewB(drmA, drmB, "*") + + val drmM = new CheckpointedDrmSpark(AewB.a_ew_b(op, srcA = drmA, srcB = drmB), op.nrow, op.ncol) + + val inCoreM = drmM.collect + val inCoreMControl = inCoreA * inCoreB + + assert((inCoreM - inCoreMControl).norm < 1E-10) + + } + + test("A + B Elementwise") { + val inCoreA = dense((1, 2, 3), (2, 3, 4), (3, 4, 5), (7, 8, 9)) + val inCoreB = dense((3, 4, 5), (5, 6, 7), (0, 0, 0), (9, 8, 7)) + val drmA = drmParallelize(m = inCoreA, numPartitions = 2) + val drmB = drmParallelize(m = inCoreB) + + val op = new OpAewB(drmA, drmB, "+") + + val drmM = new CheckpointedDrmSpark(AewB.a_ew_b(op, srcA = drmA, srcB = drmB), op.nrow, op.ncol) + + val inCoreM = drmM.collect + val inCoreMControl = inCoreA + inCoreB + + assert((inCoreM - inCoreMControl).norm < 1E-10) + + } + + test("A - B Elementwise") { + val inCoreA = dense((1, 2, 3), (2, 3, 4), (3, 4, 5), (7, 8, 9)) + val inCoreB = dense((3, 4, 5), (5, 6, 7), (0, 0, 0), (9, 8, 7)) + val drmA = drmParallelize(m = inCoreA, numPartitions = 2) + val drmB = drmParallelize(m = inCoreB) + + val op = new OpAewB(drmA, drmB, "-") + + val drmM = new CheckpointedDrmSpark(AewB.a_ew_b(op, srcA = drmA, srcB = drmB), op.nrow, op.ncol) + + val inCoreM = drmM.collect + val inCoreMControl = inCoreA - inCoreB + + assert((inCoreM - inCoreMControl).norm < 1E-10) + + } + + test("A / B Elementwise") { + val inCoreA = dense((1, 2, 3), (2, 3, 4), (3, 4, 0), (7, 8, 9)) + val inCoreB = dense((3, 4, 5), (5, 6, 7), (10, 20, 30), (9, 8, 7)) + val drmA = drmParallelize(m = inCoreA, numPartitions = 2) + val drmB = drmParallelize(m = inCoreB) + + val op = new OpAewB(drmA, drmB, "/") + + val drmM = new CheckpointedDrmSpark(AewB.a_ew_b(op, srcA = drmA, srcB = drmB), op.nrow, op.ncol) + + val inCoreM = drmM.collect + val inCoreMControl = inCoreA / inCoreB + + assert((inCoreM - inCoreMControl).norm < 1E-10) + + } + + test("AtA slim") { + + val inCoreA = dense((1, 2), (2, 3)) + val drmA = drmParallelize(inCoreA) + + val operator = new OpAtA[Int](A = drmA) + val inCoreAtA = AtA.at_a_slim(operator = operator, srcRdd = drmA.rdd) + println(inCoreAtA) + + val expectedAtA = inCoreA.t %*% inCoreA + println(expectedAtA) + + assert(expectedAtA === inCoreAtA) + + } + + test("At") { + val inCoreA = dense((1, 2, 3), (2, 3, 4), (3, 4, 5)) + val drmA = drmParallelize(m = inCoreA, numPartitions = 2) + + val op = new OpAt(drmA) + val drmAt = new CheckpointedDrmSpark(rdd = At.at(op, srcA = drmA), _nrow = op.nrow, _ncol = op.ncol) + val inCoreAt = drmAt.collect + val inCoreControlAt = inCoreA.t + + println(inCoreAt) + assert((inCoreAt - inCoreControlAt).norm < 1E-5) + + } + +} http://git-wip-us.apache.org/repos/asf/mahout/blob/ee6359f6/spark/src/test/scala/org/apache/mahout/sparkbindings/test/DistributedSparkSuite.scala ---------------------------------------------------------------------- diff --git a/spark/src/test/scala/org/apache/mahout/sparkbindings/test/DistributedSparkSuite.scala b/spark/src/test/scala/org/apache/mahout/sparkbindings/test/DistributedSparkSuite.scala index a0136e0..29c8bea 100644 --- a/spark/src/test/scala/org/apache/mahout/sparkbindings/test/DistributedSparkSuite.scala +++ b/spark/src/test/scala/org/apache/mahout/sparkbindings/test/DistributedSparkSuite.scala @@ -17,7 +17,7 @@ package org.apache.mahout.sparkbindings.test -import org.scalatest.Suite +import org.scalatest.{ConfigMap, BeforeAndAfterAllConfigMap, Suite} import org.apache.spark.SparkConf import org.apache.mahout.sparkbindings._ import org.apache.mahout.test.{DistributedMahoutSuite, MahoutSuite} @@ -29,10 +29,8 @@ trait DistributedSparkSuite extends DistributedMahoutSuite with LoggerConfigurat protected implicit var mahoutCtx: DistributedContext = _ protected var masterUrl = null.asInstanceOf[String] - override protected def beforeEach() { - super.beforeEach() - - masterUrl = "local[2]" + protected def initContext() { + masterUrl = "local[3]" mahoutCtx = mahoutSparkContext(masterUrl = this.masterUrl, appName = "MahoutLocalContext", // Do not run MAHOUT_HOME jars in unit tests. @@ -44,7 +42,7 @@ trait DistributedSparkSuite extends DistributedMahoutSuite with LoggerConfigurat ) } - override protected def afterEach() { + protected def resetContext() { if (mahoutCtx != null) { try { mahoutCtx.close() @@ -52,6 +50,21 @@ trait DistributedSparkSuite extends DistributedMahoutSuite with LoggerConfigurat mahoutCtx = null } } + } + + override protected def beforeEach() { + super.beforeEach() +// initContext() + } + + + override protected def beforeAll(configMap: ConfigMap): Unit = { + super.beforeAll(configMap) + initContext() + } + + override protected def afterEach() { +// resetContext() super.afterEach() } } http://git-wip-us.apache.org/repos/asf/mahout/blob/ee6359f6/spark/src/test/scala/org/apache/mahout/sparkbindings/test/LoggerConfiguration.scala ---------------------------------------------------------------------- diff --git a/spark/src/test/scala/org/apache/mahout/sparkbindings/test/LoggerConfiguration.scala b/spark/src/test/scala/org/apache/mahout/sparkbindings/test/LoggerConfiguration.scala index d5d16a8..e48e7c7 100644 --- a/spark/src/test/scala/org/apache/mahout/sparkbindings/test/LoggerConfiguration.scala +++ b/spark/src/test/scala/org/apache/mahout/sparkbindings/test/LoggerConfiguration.scala @@ -17,14 +17,14 @@ package org.apache.mahout.sparkbindings.test -import org.scalatest.Suite +import org.scalatest.{ConfigMap, Suite} import org.apache.log4j.{Level, Logger, BasicConfigurator} trait LoggerConfiguration extends org.apache.mahout.test.LoggerConfiguration { this: Suite => - override protected def beforeAll(): Unit = { - super.beforeAll() - Logger.getLogger("org.apache.mahout.sparkbindings").setLevel(Level.DEBUG) + override protected def beforeAll(configMap: ConfigMap) { + super.beforeAll(configMap) + Logger.getLogger("org.apache.mahout.sparkbindings").setLevel(Level.INFO) } }
