Repository: mahout Updated Branches: refs/heads/master 3c88f5043 -> c34e8a84c
MAHOUT-1615: SparkEngine drmFromHDFS returning the same Key for all Key,Vec Pairs for Text-Keyed SequenceFiles. This closes apache/mahout#52 Project: http://git-wip-us.apache.org/repos/asf/mahout/repo Commit: http://git-wip-us.apache.org/repos/asf/mahout/commit/c34e8a84 Tree: http://git-wip-us.apache.org/repos/asf/mahout/tree/c34e8a84 Diff: http://git-wip-us.apache.org/repos/asf/mahout/diff/c34e8a84 Branch: refs/heads/master Commit: c34e8a84c39b7bd2175cdbcb8583d487548a34b4 Parents: 3c88f50 Author: Andrew Palumbo <[email protected]> Authored: Thu Oct 9 16:16:14 2014 -0400 Committer: Andrew Palumbo <[email protected]> Committed: Thu Oct 9 16:16:14 2014 -0400 ---------------------------------------------------------------------- CHANGELOG | 2 + .../apache/mahout/h2obindings/H2OEngine.scala | 2 +- .../h2obindings/drm/CheckpointedDrmH2O.scala | 4 +- .../mahout/math/drm/CheckpointedDrm.scala | 10 ++- .../mahout/math/drm/DistributedEngine.scala | 2 +- .../org/apache/mahout/math/drm/DrmLike.scala | 2 + .../org/apache/mahout/math/drm/package.scala | 2 +- .../mahout/math/drm/DrmLikeSuiteBase.scala | 8 ++- pom.xml | 2 +- .../sparkbindings/shell/MahoutSparkILoop.scala | 3 + .../org/apache/mahout/common/DrmMetadata.scala | 56 +++++++++++++++++ .../org/apache/mahout/common/HDFSUtil.scala | 26 ++++++++ .../apache/mahout/common/Hadoop1HDFSUtil.scala | 65 ++++++++++++++++++++ .../mahout/sparkbindings/SparkEngine.scala | 57 ++++++----------- .../drm/CheckpointedDrmSpark.scala | 5 +- 15 files changed, 198 insertions(+), 48 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/mahout/blob/c34e8a84/CHANGELOG ---------------------------------------------------------------------- diff --git a/CHANGELOG b/CHANGELOG index e650428..bc1edcf 100644 --- a/CHANGELOG +++ b/CHANGELOG @@ -2,6 +2,8 @@ Mahout Change Log Release 1.0 - unreleased + MAHOUT-1615: SparkEngine drmFromHDFS returning the same Key for all Key,Vec Pairs for Text-Keyed SequenceFiles (dlyubimov & apalumbo) + MAHOUT-1610: Update tests to pass in Java 8 (srowen) MAHOUT-1608: Add option in WikipediaToSequenceFile to remove category labels from documents (apalumbo) http://git-wip-us.apache.org/repos/asf/mahout/blob/c34e8a84/h2o/src/main/scala/org/apache/mahout/h2obindings/H2OEngine.scala ---------------------------------------------------------------------- diff --git a/h2o/src/main/scala/org/apache/mahout/h2obindings/H2OEngine.scala b/h2o/src/main/scala/org/apache/mahout/h2obindings/H2OEngine.scala index 54d950b..06125fe 100644 --- a/h2o/src/main/scala/org/apache/mahout/h2obindings/H2OEngine.scala +++ b/h2o/src/main/scala/org/apache/mahout/h2obindings/H2OEngine.scala @@ -54,7 +54,7 @@ object H2OEngine extends DistributedEngine { * * @return DRM[Any] where Any is automatically translated to value type */ - def drmFromHDFS(path: String, parMin: Int = 0)(implicit dc: DistributedContext): CheckpointedDrm[_] = + def drmDfsRead(path: String, parMin: Int = 0)(implicit dc: DistributedContext): CheckpointedDrm[_] = new CheckpointedDrmH2O(H2OHdfs.drmFromFile(path, parMin), dc) /** This creates an empty DRM with specified number of partitions and cardinality. */ http://git-wip-us.apache.org/repos/asf/mahout/blob/c34e8a84/h2o/src/main/scala/org/apache/mahout/h2obindings/drm/CheckpointedDrmH2O.scala ---------------------------------------------------------------------- diff --git a/h2o/src/main/scala/org/apache/mahout/h2obindings/drm/CheckpointedDrmH2O.scala b/h2o/src/main/scala/org/apache/mahout/h2obindings/drm/CheckpointedDrmH2O.scala index 15af5e2..c06455a 100644 --- a/h2o/src/main/scala/org/apache/mahout/h2obindings/drm/CheckpointedDrmH2O.scala +++ b/h2o/src/main/scala/org/apache/mahout/h2obindings/drm/CheckpointedDrmH2O.scala @@ -29,6 +29,8 @@ class CheckpointedDrmH2O[K: ClassTag]( */ def collect: Matrix = H2OHelper.matrixFromDrm(h2odrm) + /** Explicit extraction of key class Tag */ + def keyClassTag: ClassTag[K] = implicitly[ClassTag[K]] /* XXX: call frame.remove */ def uncache(): this.type = this @@ -36,7 +38,7 @@ class CheckpointedDrmH2O[K: ClassTag]( /** * Persist DRM to on-disk over HDFS in Mahout DRM format. */ - def writeDRM(path: String): Unit = H2OHdfs.drmToFile(path, h2odrm) + def dfsWrite(path: String): Unit = H2OHdfs.drmToFile(path, h2odrm) /** * Action operator - Eagerly evaluate the lazily built operator graph to create http://git-wip-us.apache.org/repos/asf/mahout/blob/c34e8a84/math-scala/src/main/scala/org/apache/mahout/math/drm/CheckpointedDrm.scala ---------------------------------------------------------------------- diff --git a/math-scala/src/main/scala/org/apache/mahout/math/drm/CheckpointedDrm.scala b/math-scala/src/main/scala/org/apache/mahout/math/drm/CheckpointedDrm.scala index 28fb7fd..082e5b9 100644 --- a/math-scala/src/main/scala/org/apache/mahout/math/drm/CheckpointedDrm.scala +++ b/math-scala/src/main/scala/org/apache/mahout/math/drm/CheckpointedDrm.scala @@ -18,6 +18,7 @@ package org.apache.mahout.math.drm import org.apache.mahout.math.Matrix +import scala.reflect.ClassTag /** * Checkpointed DRM API. This is a matrix that has optimized RDD lineage behind it and can be @@ -28,9 +29,16 @@ trait CheckpointedDrm[K] extends DrmLike[K] { def collect: Matrix - def writeDRM(path: String) + def dfsWrite(path: String) /** If this checkpoint is already declared cached, uncache. */ def uncache(): this.type + /** + * Explicit extraction of key class Tag since traits don't support context bound access; but actual + * implementation knows it + */ + def keyClassTag: ClassTag[K] + + } http://git-wip-us.apache.org/repos/asf/mahout/blob/c34e8a84/math-scala/src/main/scala/org/apache/mahout/math/drm/DistributedEngine.scala ---------------------------------------------------------------------- diff --git a/math-scala/src/main/scala/org/apache/mahout/math/drm/DistributedEngine.scala b/math-scala/src/main/scala/org/apache/mahout/math/drm/DistributedEngine.scala index d89cc53..eaf5aeb 100644 --- a/math-scala/src/main/scala/org/apache/mahout/math/drm/DistributedEngine.scala +++ b/math-scala/src/main/scala/org/apache/mahout/math/drm/DistributedEngine.scala @@ -68,7 +68,7 @@ trait DistributedEngine { * @param path The DFS path to load from * @param parMin Minimum parallelism after load (equivalent to #par(min=...)). */ - def drmFromHDFS(path: String, parMin: Int = 0)(implicit sc: DistributedContext): CheckpointedDrm[_] + def drmDfsRead(path: String, parMin: Int = 0)(implicit sc: DistributedContext): CheckpointedDrm[_] /** Parallelize in-core matrix as spark distributed matrix, using row ordinal indices as data set keys. */ def drmParallelizeWithRowIndices(m: Matrix, numPartitions: Int = 1) http://git-wip-us.apache.org/repos/asf/mahout/blob/c34e8a84/math-scala/src/main/scala/org/apache/mahout/math/drm/DrmLike.scala ---------------------------------------------------------------------- diff --git a/math-scala/src/main/scala/org/apache/mahout/math/drm/DrmLike.scala b/math-scala/src/main/scala/org/apache/mahout/math/drm/DrmLike.scala index 97fe989..b9c50b0 100644 --- a/math-scala/src/main/scala/org/apache/mahout/math/drm/DrmLike.scala +++ b/math-scala/src/main/scala/org/apache/mahout/math/drm/DrmLike.scala @@ -17,6 +17,8 @@ package org.apache.mahout.math.drm +import scala.reflect.ClassTag + /** * http://git-wip-us.apache.org/repos/asf/mahout/blob/c34e8a84/math-scala/src/main/scala/org/apache/mahout/math/drm/package.scala ---------------------------------------------------------------------- diff --git a/math-scala/src/main/scala/org/apache/mahout/math/drm/package.scala b/math-scala/src/main/scala/org/apache/mahout/math/drm/package.scala index 02e8b7a..b787ec0 100644 --- a/math-scala/src/main/scala/org/apache/mahout/math/drm/package.scala +++ b/math-scala/src/main/scala/org/apache/mahout/math/drm/package.scala @@ -49,7 +49,7 @@ package object drm { def drmBroadcast(v:Vector)(implicit ctx:DistributedContext):BCast[Vector] = ctx.drmBroadcast(v) /** Load DRM from hdfs (as in Mahout DRM format) */ - def drmFromHDFS (path: String)(implicit ctx: DistributedContext): CheckpointedDrm[_] = ctx.drmFromHDFS(path) + def drmDfsRead (path: String)(implicit ctx: DistributedContext): CheckpointedDrm[_] = ctx.drmDfsRead(path) /** Shortcut to parallelizing matrices with indices, ignore row labels. */ def drmParallelize(m: Matrix, numPartitions: Int = 1) http://git-wip-us.apache.org/repos/asf/mahout/blob/c34e8a84/math-scala/src/test/scala/org/apache/mahout/math/drm/DrmLikeSuiteBase.scala ---------------------------------------------------------------------- diff --git a/math-scala/src/test/scala/org/apache/mahout/math/drm/DrmLikeSuiteBase.scala b/math-scala/src/test/scala/org/apache/mahout/math/drm/DrmLikeSuiteBase.scala index 7a13124..6c9313c 100644 --- a/math-scala/src/test/scala/org/apache/mahout/math/drm/DrmLikeSuiteBase.scala +++ b/math-scala/src/test/scala/org/apache/mahout/math/drm/DrmLikeSuiteBase.scala @@ -23,6 +23,7 @@ import org.apache.mahout.math._ import scalabindings._ import RLikeOps._ import RLikeDrmOps._ +import scala.reflect.ClassTag /** Common DRM tests to be run by all distributed engines. */ trait DrmLikeSuiteBase extends DistributedMahoutSuite with Matchers { @@ -35,12 +36,15 @@ trait DrmLikeSuiteBase extends DistributedMahoutSuite with Matchers { val inCoreA = dense((1, 2, 3), (3, 4, 5)) val drmA = drmParallelize(inCoreA) - drmA.writeDRM(path = uploadPath) + drmA.dfsWrite(path = uploadPath) println(inCoreA) // Load back from hdfs - val drmB = drmFromHDFS(path = uploadPath) + val drmB = drmDfsRead(path = uploadPath) + + // Make sure keys are correctly identified as ints + drmB.checkpoint(CacheHint.NONE).keyClassTag shouldBe ClassTag.Int // Collect back into in-core val inCoreB = drmB.collect http://git-wip-us.apache.org/repos/asf/mahout/blob/c34e8a84/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index 89ed1a7..3696eb5 100644 --- a/pom.xml +++ b/pom.xml @@ -701,7 +701,7 @@ <module>math-scala</module> <module>spark</module> <module>spark-shell</module> - <module>h2o</module> + <!--module>h2o</module --> </modules> <profiles> <profile> http://git-wip-us.apache.org/repos/asf/mahout/blob/c34e8a84/spark-shell/src/main/scala/org/apache/mahout/sparkbindings/shell/MahoutSparkILoop.scala ---------------------------------------------------------------------- diff --git a/spark-shell/src/main/scala/org/apache/mahout/sparkbindings/shell/MahoutSparkILoop.scala b/spark-shell/src/main/scala/org/apache/mahout/sparkbindings/shell/MahoutSparkILoop.scala index 0df42a3..107fb1e 100644 --- a/spark-shell/src/main/scala/org/apache/mahout/sparkbindings/shell/MahoutSparkILoop.scala +++ b/spark-shell/src/main/scala/org/apache/mahout/sparkbindings/shell/MahoutSparkILoop.scala @@ -45,6 +45,9 @@ class MahoutSparkILoop extends SparkILoop { conf.set("spark.executor.uri", execUri) } + // TODO:XXX remove this beforre pushing to apache/master + conf.set("spark.kryoserializer.buffer.mb", "100") + sparkContext = mahoutSparkContext( masterUrl = master, appName = "Mahout Spark Shell", http://git-wip-us.apache.org/repos/asf/mahout/blob/c34e8a84/spark/src/main/scala/org/apache/mahout/common/DrmMetadata.scala ---------------------------------------------------------------------- diff --git a/spark/src/main/scala/org/apache/mahout/common/DrmMetadata.scala b/spark/src/main/scala/org/apache/mahout/common/DrmMetadata.scala new file mode 100644 index 0000000..5bbccb1 --- /dev/null +++ b/spark/src/main/scala/org/apache/mahout/common/DrmMetadata.scala @@ -0,0 +1,56 @@ +package org.apache.mahout.common + +import scala.reflect.ClassTag +import org.apache.hadoop.io._ +import java.util.Arrays + +class DrmMetadata( + + /** Writable key type as a sub-type of Writable */ + val keyTypeWritable: Class[_], + + /** Value writable type, as a sub-type of Writable */ + val valueTypeWritable: Class[_] + + ) { + + import DrmMetadata._ + + val ( + + /** Actual drm key class tag once converted out of writable */ + keyClassTag: ClassTag[_], + + /** Conversion from Writable to value type of the DRM key */ + keyW2ValFunc: ((Writable) => Any) + + ) = keyTypeWritable match { + case cz if (cz == classOf[IntWritable]) => ClassTag.Int -> w2int _ + case cz if (cz == classOf[LongWritable]) => ClassTag.Long -> w2long _ + case cz if (cz == classOf[DoubleWritable]) => ClassTag.Double -> w2double _ + case cz if (cz == classOf[FloatWritable]) => ClassTag.Float -> w2float _ + case cz if (cz == classOf[Text]) => ClassTag(classOf[String]) -> w2string _ + case cz if (cz == classOf[BooleanWritable]) => ClassTag(classOf[Boolean]) -> w2bool _ + case cz if (cz == classOf[BytesWritable]) => ClassTag(classOf[Array[Byte]]) -> w2bytes _ + case _ => throw new IllegalArgumentException(s"Unsupported DRM key type:${keyTypeWritable.getName}") + } + +} + +object DrmMetadata { + + private[common] def w2int(w: Writable) = w.asInstanceOf[IntWritable].get() + + private[common] def w2long(w: Writable) = w.asInstanceOf[LongWritable].get() + + private[common] def w2double(w: Writable) = w.asInstanceOf[DoubleWritable].get() + + private[common] def w2float(w: Writable) = w.asInstanceOf[FloatWritable].get() + + private[common] def w2string(w: Writable) = w.asInstanceOf[Text].toString() + + private[common] def w2bool(w: Writable) = w.asInstanceOf[BooleanWritable].get() + + private[common] def w2bytes(w: Writable) = Arrays.copyOf(w.asInstanceOf[BytesWritable].getBytes(), + w.asInstanceOf[BytesWritable].getLength()) +} http://git-wip-us.apache.org/repos/asf/mahout/blob/c34e8a84/spark/src/main/scala/org/apache/mahout/common/HDFSUtil.scala ---------------------------------------------------------------------- diff --git a/spark/src/main/scala/org/apache/mahout/common/HDFSUtil.scala b/spark/src/main/scala/org/apache/mahout/common/HDFSUtil.scala new file mode 100644 index 0000000..f5f87d7 --- /dev/null +++ b/spark/src/main/scala/org/apache/mahout/common/HDFSUtil.scala @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.mahout.common + +/** High level Hadoop version-specific hdfs manipulations we need in context of our operations. */ +trait HDFSUtil { + + /** Read DRM header information off (H)DFS. */ + def readDrmHeader(path:String):DrmMetadata +} + http://git-wip-us.apache.org/repos/asf/mahout/blob/c34e8a84/spark/src/main/scala/org/apache/mahout/common/Hadoop1HDFSUtil.scala ---------------------------------------------------------------------- diff --git a/spark/src/main/scala/org/apache/mahout/common/Hadoop1HDFSUtil.scala b/spark/src/main/scala/org/apache/mahout/common/Hadoop1HDFSUtil.scala new file mode 100644 index 0000000..87977ff --- /dev/null +++ b/spark/src/main/scala/org/apache/mahout/common/Hadoop1HDFSUtil.scala @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.mahout.common + +import org.apache.hadoop.io.{Writable, SequenceFile} +import org.apache.hadoop.fs.{FileSystem, Path} +import org.apache.hadoop.conf.Configuration +import collection._ +import JavaConversions._ + +/** + * Deprecated Hadoop 1 api which we currently explicitly import via Mahout dependencies. May not work + * with Hadoop 2.0 + */ +object Hadoop1HDFSUtil extends HDFSUtil { + + + def readDrmHeader(path: String): DrmMetadata = { + val dfsPath = new Path(path) + val fs = dfsPath.getFileSystem(new Configuration()) + + val partFilePath:Path = fs.listStatus(dfsPath) + + // Filter out anything starting with . + .filter { s => (!s.getPath.getName.startsWith("\\.") && !s.getPath.getName.startsWith("_") && !s.isDir)} + + // Take path + .map(_.getPath) + + // Take only one, if any + .headOption + + // Require there's at least one partition file found. + .getOrElse { + throw new IllegalArgumentException(s"No partition files found in ${dfsPath.toString}.") + } + + val reader = new SequenceFile.Reader(fs, partFilePath, fs.getConf) + try { + new DrmMetadata( + keyTypeWritable = reader.getKeyClass.asSubclass(classOf[Writable]), + valueTypeWritable = reader.getValueClass.asSubclass(classOf[Writable]) + ) + } finally { + reader.close() + } + + } + +} http://git-wip-us.apache.org/repos/asf/mahout/blob/c34e8a84/spark/src/main/scala/org/apache/mahout/sparkbindings/SparkEngine.scala ---------------------------------------------------------------------- diff --git a/spark/src/main/scala/org/apache/mahout/sparkbindings/SparkEngine.scala b/spark/src/main/scala/org/apache/mahout/sparkbindings/SparkEngine.scala index 54f33ef..08b2c34 100644 --- a/spark/src/main/scala/org/apache/mahout/sparkbindings/SparkEngine.scala +++ b/spark/src/main/scala/org/apache/mahout/sparkbindings/SparkEngine.scala @@ -17,7 +17,10 @@ package org.apache.mahout.sparkbindings +import java.io.IOException + import org.apache.mahout.math._ +import org.apache.spark.deploy.SparkHadoopUtil import scalabindings._ import RLikeOps._ import org.apache.mahout.math.drm.logical._ @@ -26,17 +29,20 @@ import org.apache.mahout.math._ import scala.reflect.ClassTag import org.apache.spark.storage.StorageLevel import org.apache.mahout.sparkbindings.blas._ -import org.apache.hadoop.io.{LongWritable, Text, IntWritable, Writable} +import org.apache.hadoop.io._ import scala.Some import scala.collection.JavaConversions._ -import org.apache.spark.SparkContext import org.apache.mahout.math.drm._ import org.apache.mahout.math.drm.RLikeDrmOps._ import org.apache.spark.rdd.RDD +import org.apache.mahout.common.{Hadoop1HDFSUtil, HDFSUtil} /** Spark-specific non-drm-method operations */ object SparkEngine extends DistributedEngine { + // By default, use Hadoop 1 utils + var hdfsUtils: HDFSUtil = Hadoop1HDFSUtil + def colSums[K:ClassTag](drm: CheckpointedDrm[K]): Vector = { val n = drm.ncol @@ -125,47 +131,20 @@ object SparkEngine extends DistributedEngine { * * @return DRM[Any] where Any is automatically translated to value type */ - def drmFromHDFS (path: String, parMin:Int = 0)(implicit sc: DistributedContext): CheckpointedDrm[_] = { - - val rdd = sc.sequenceFile(path, classOf[Writable], classOf[VectorWritable], minPartitions = parMin) - // Get rid of VectorWritable - .map(t => (t._1, t._2.get())) - - def getKeyClassTag[K: ClassTag, V](rdd: RDD[(K, V)]) = implicitly[ClassTag[K]] - - // Spark should've loaded the type info from the header, right? - val keyTag = getKeyClassTag(rdd) - - val (key2valFunc, val2keyFunc, unwrappedKeyTag) = keyTag match { + def drmDfsRead (path: String, parMin:Int = 0)(implicit sc: DistributedContext): CheckpointedDrm[_] = { - case xx: ClassTag[Writable] if (xx == implicitly[ClassTag[IntWritable]]) => ( - (v: AnyRef) => v.asInstanceOf[IntWritable].get, - (x: Any) => new IntWritable(x.asInstanceOf[Int]), - implicitly[ClassTag[Int]]) + val drmMetadata = hdfsUtils.readDrmHeader(path) + val k2vFunc = drmMetadata.keyW2ValFunc - case xx: ClassTag[Writable] if (xx == implicitly[ClassTag[Text]]) => ( - (v: AnyRef) => v.asInstanceOf[Text].toString, - (x: Any) => new Text(x.toString), - implicitly[ClassTag[String]]) - - case xx: ClassTag[Writable] if (xx == implicitly[ClassTag[LongWritable]]) => ( - (v: AnyRef) => v.asInstanceOf[LongWritable].get, - (x: Any) => new LongWritable(x.asInstanceOf[Int]), - implicitly[ClassTag[Long]]) - - case xx: ClassTag[Writable] => ( - (v: AnyRef) => v, - (x: Any) => x.asInstanceOf[Writable], - ClassTag(classOf[Writable])) - } - - { - implicit def getWritable(x: Any): Writable = val2keyFunc() + // Load RDD and convert all Writables to value types right away (due to reuse of writables in + // Hadoop we must do it right after read operation). + val rdd = sc.sequenceFile(path, classOf[Writable], classOf[VectorWritable], minPartitions = parMin) - val drmRdd = rdd.map { t => (key2valFunc(t._1), t._2)} + // Immediately convert keys and value writables into value types. + .map { case (wKey, wVec) => k2vFunc(wKey) -> wVec.get()} - drmWrap(rdd = drmRdd, cacheHint = CacheHint.MEMORY_ONLY)(unwrappedKeyTag.asInstanceOf[ClassTag[Any]]) - } + // Wrap into a DRM type with correct matrix row key class tag evident. + drmWrap(rdd = rdd, cacheHint = CacheHint.NONE)(drmMetadata.keyClassTag.asInstanceOf[ClassTag[Any]]) } /** Parallelize in-core matrix as spark distributed matrix, using row ordinal indices as data set keys. */ http://git-wip-us.apache.org/repos/asf/mahout/blob/c34e8a84/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/CheckpointedDrmSpark.scala ---------------------------------------------------------------------- diff --git a/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/CheckpointedDrmSpark.scala b/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/CheckpointedDrmSpark.scala index cc5ebf2..b753f6f 100644 --- a/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/CheckpointedDrmSpark.scala +++ b/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/CheckpointedDrmSpark.scala @@ -65,6 +65,9 @@ class CheckpointedDrmSpark[K: ClassTag]( private var cached: Boolean = false override val context: DistributedContext = rdd.context + /** Explicit extraction of key class Tag */ + def keyClassTag: ClassTag[K] = implicitly[ClassTag[K]] + /** * Action operator -- does not necessary means Spark action; but does mean running BLAS optimizer * and writing down Spark graph lineage since last checkpointed DRM. @@ -152,7 +155,7 @@ class CheckpointedDrmSpark[K: ClassTag]( * Dump matrix as computed Mahout's DRM into specified (HD)FS path * @param path */ - def writeDRM(path: String) = { + def dfsWrite(path: String) = { val ktag = implicitly[ClassTag[K]] implicit val k2wFunc: (K) => Writable =
