Repository: mahout Updated Branches: refs/heads/master a70f48537 -> a6c8346dd
(NOJIRA) fixing picking up mahout jars for distributed set up (Note: how was it even working before moment, and when did it go broken moment) Project: http://git-wip-us.apache.org/repos/asf/mahout/repo Commit: http://git-wip-us.apache.org/repos/asf/mahout/commit/a6c8346d Tree: http://git-wip-us.apache.org/repos/asf/mahout/tree/a6c8346d Diff: http://git-wip-us.apache.org/repos/asf/mahout/diff/a6c8346d Branch: refs/heads/master Commit: a6c8346ddb82390e2f5b5bd8c5968c436264e8a6 Parents: a70f485 Author: Dmitriy Lyubimov <[email protected]> Authored: Thu Aug 14 19:30:44 2014 -0700 Committer: Dmitriy Lyubimov <[email protected]> Committed: Thu Aug 14 19:30:44 2014 -0700 ---------------------------------------------------------------------- bin/mahout | 4 + .../mahout/math/drm/DrmLikeSuiteBase.scala | 13 +- .../apache/mahout/sparkbindings/package.scala | 126 ++++++++++--------- .../sparkbindings/SparkBindingsSuite.scala | 34 +++++ 4 files changed, 116 insertions(+), 61 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/mahout/blob/a6c8346d/bin/mahout ---------------------------------------------------------------------- diff --git a/bin/mahout b/bin/mahout index 0174b31..fafd5c3 100755 --- a/bin/mahout +++ b/bin/mahout @@ -173,6 +173,10 @@ then CLASSPATH=${CLASSPATH}:$f; done + for f in $MAHOUT_HOME/math/target/mahout-math-*.jar ; do + CLASSPATH=${CLASSPATH}:$f; + done + for f in $MAHOUT_HOME/spark/target/mahout-spark_*.jar ; do CLASSPATH=${CLASSPATH}:$f; done http://git-wip-us.apache.org/repos/asf/mahout/blob/a6c8346d/math-scala/src/test/scala/org/apache/mahout/math/drm/DrmLikeSuiteBase.scala ---------------------------------------------------------------------- diff --git a/math-scala/src/test/scala/org/apache/mahout/math/drm/DrmLikeSuiteBase.scala b/math-scala/src/test/scala/org/apache/mahout/math/drm/DrmLikeSuiteBase.scala index 80fb285..7a13124 100644 --- 
a/math-scala/src/test/scala/org/apache/mahout/math/drm/DrmLikeSuiteBase.scala +++ b/math-scala/src/test/scala/org/apache/mahout/math/drm/DrmLikeSuiteBase.scala @@ -48,6 +48,7 @@ trait DrmLikeSuiteBase extends DistributedMahoutSuite with Matchers { // Print out to see what it is we collected: println(inCoreB) + (inCoreA - inCoreB).norm should be < 1e-7 } test("DRM parallelizeEmpty") { @@ -57,13 +58,15 @@ trait DrmLikeSuiteBase extends DistributedMahoutSuite with Matchers { // collect back into in-core val inCoreEmpty = drmEmpty.collect - //print out to see what it is we collected: - println(inCoreEmpty) - printf("drm nrow:%d, ncol:%d\n", drmEmpty.nrow, drmEmpty.ncol) - printf("in core nrow:%d, ncol:%d\n", inCoreEmpty.nrow, inCoreEmpty.ncol) + inCoreEmpty.sum.abs should be < 1e-7 + drmEmpty.nrow shouldBe 100 + drmEmpty.ncol shouldBe 50 + inCoreEmpty.nrow shouldBe 100 + inCoreEmpty.ncol shouldBe 50 + - } + } } http://git-wip-us.apache.org/repos/asf/mahout/blob/a6c8346d/spark/src/main/scala/org/apache/mahout/sparkbindings/package.scala ---------------------------------------------------------------------- diff --git a/spark/src/main/scala/org/apache/mahout/sparkbindings/package.scala b/spark/src/main/scala/org/apache/mahout/sparkbindings/package.scala index 6639a34..311cf82 100644 --- a/spark/src/main/scala/org/apache/mahout/sparkbindings/package.scala +++ b/spark/src/main/scala/org/apache/mahout/sparkbindings/package.scala @@ -59,69 +59,15 @@ package object sparkbindings { sparkConf: SparkConf = new SparkConf(), addMahoutJars: Boolean = true ): SparkDistributedContext = { + val closeables = new java.util.ArrayDeque[Closeable]() try { if (addMahoutJars) { - var mhome = System.getenv("MAHOUT_HOME") - if (mhome == null) mhome = System.getProperty("mahout.home") - - if (mhome == null) - throw new IllegalArgumentException("MAHOUT_HOME is required to spawn mahout-based spark jobs.") - - // Figure Mahout classpath using $MAHOUT_HOME/mahout classpath command. 
- - val fmhome = new File(mhome) - val bin = new File(fmhome, "bin") - val exec = new File(bin, "mahout") - if (!exec.canExecute) - throw new IllegalArgumentException("Cannot execute %s.".format(exec.getAbsolutePath)) - - val p = Runtime.getRuntime.exec(Array(exec.getAbsolutePath, "-spark", "classpath")) - - closeables.addFirst(new Closeable { - def close() { - p.destroy() - } - }) - - val r = new BufferedReader(new InputStreamReader(p.getInputStream)) - closeables.addFirst(r) - - val w = new StringWriter() - closeables.addFirst(w) - - var continue = true; - val jars = new ArrayBuffer[String]() - do { - val cp = r.readLine() - if (cp == null) - throw new IllegalArgumentException( - "Unable to read output from \"mahout -spark classpath\". Is SPARK_HOME defined?" - ) - - val j = cp.split(File.pathSeparatorChar) - if (j.size > 10) { - // assume this is a valid classpath line - jars ++= j - continue = false - } - } while (continue) - - // if (s_log.isDebugEnabled) { - // s_log.debug("Mahout jars:") - // jars.foreach(j => s_log.debug(j)) - // } // context specific jars - val mcjars = jars.filter(j => - j.matches(".*mahout-math-.*\\.jar") || - j.matches(".*mahout-math-scala-.*\\.jar") || - j.matches(".*mahout-mrlegacy-.*\\.jar") || - j.matches(".*mahout-spark-.*\\.jar") - ).filter(n => !n.matches(".*-tests.jar") && !n.matches(".*-sources.jar")) ++ - SparkContext.jarOfClass(classOf[DrmLike[_]]) + val mcjars = findMahoutContextJars(closeables) if (log.isDebugEnabled) { log.debug("Mahout jars:") @@ -208,5 +154,73 @@ package object sparkbindings { _canHaveMissingRows = canHaveMissingRows ) + /** Acquire proper Mahout jars to be added to task context based on current MAHOUT_HOME. 
*/ + private[sparkbindings] def findMahoutContextJars(closeables:java.util.Deque[Closeable]) = { + var mhome = System.getenv("MAHOUT_HOME") + if (mhome == null) mhome = System.getProperty("mahout.home") + + if (mhome == null) + throw new IllegalArgumentException("MAHOUT_HOME is required to spawn mahout-based spark jobs.") + + // Figure Mahout classpath using $MAHOUT_HOME/mahout classpath command. + + val fmhome = new File(mhome) + val bin = new File(fmhome, "bin") + val exec = new File(bin, "mahout") + if (!exec.canExecute) + throw new IllegalArgumentException("Cannot execute %s.".format(exec.getAbsolutePath)) + + val p = Runtime.getRuntime.exec(Array(exec.getAbsolutePath, "-spark", "classpath")) + + closeables.addFirst(new Closeable { + def close() { + p.destroy() + } + }) + + val r = new BufferedReader(new InputStreamReader(p.getInputStream)) + closeables.addFirst(r) + + val w = new StringWriter() + closeables.addFirst(w) + + var continue = true; + val jars = new ArrayBuffer[String]() + do { + val cp = r.readLine() + if (cp == null) + throw new IllegalArgumentException( + "Unable to read output from \"mahout -spark classpath\". Is SPARK_HOME defined?" 
+ ) + + val j = cp.split(File.pathSeparatorChar) + if (j.size > 10) { + // assume this is a valid classpath line + jars ++= j + continue = false + } + } while (continue) + +// jars.foreach(j => log.info(j)) + + // context specific jars + val mcjars = jars.filter(j => + j.matches(".*mahout-math-\\d.*\\.jar") || + j.matches(".*mahout-math-scala_\\d.*\\.jar") || + j.matches(".*mahout-mrlegacy-\\d.*\\.jar") || + j.matches(".*mahout-spark_\\d.*\\.jar") + ) + // Tune out "bad" classifiers + .filter(n => + !n.matches(".*-tests.jar") && + !n.matches(".*-sources.jar") && + !n.matches(".*-job.jar") && + // During maven tests, the maven classpath also creeps in for some reason + !n.matches(".*/.m2/.*") + ) + + mcjars + } + } http://git-wip-us.apache.org/repos/asf/mahout/blob/a6c8346d/spark/src/test/scala/org/apache/mahout/sparkbindings/SparkBindingsSuite.scala ---------------------------------------------------------------------- diff --git a/spark/src/test/scala/org/apache/mahout/sparkbindings/SparkBindingsSuite.scala b/spark/src/test/scala/org/apache/mahout/sparkbindings/SparkBindingsSuite.scala new file mode 100644 index 0000000..b5974bd --- /dev/null +++ b/spark/src/test/scala/org/apache/mahout/sparkbindings/SparkBindingsSuite.scala @@ -0,0 +1,34 @@ +package org.apache.mahout.sparkbindings + +import org.scalatest.FunSuite +import java.util +import java.io.{File, Closeable} +import org.apache.mahout.common.IOUtils +import org.apache.mahout.sparkbindings.test.DistributedSparkSuite + +/** + * @author dmitriy + */ +class SparkBindingsSuite extends FunSuite with DistributedSparkSuite { + + // This test will succeed only when MAHOUT_HOME is set in the environment. So we keep it around for + // diagnostic purposes, but we probably don't want it to run in Jenkins, so we'd + // let it be ignored. 
+ test("context jars") { + System.setProperty("mahout.home", new File("..").getAbsolutePath/*"/home/dmitriy/projects/github/mahout-commits"*/) + val closeables = new util.ArrayDeque[Closeable]() + try { + val mahoutJars = findMahoutContextJars(closeables) + mahoutJars.foreach { + println(_) + } + + mahoutJars.size should be > 0 + mahoutJars.size shouldBe 4 + } finally { + IOUtils.close(closeables) + } + + } + +}
