Repository: mahout Updated Branches: refs/heads/master c34e8a84c -> 61c9e1a92
MAHOUT-1615: fix up HDFS ClassTag reading from H2O bindings. this closes apache/mahout#58 Project: http://git-wip-us.apache.org/repos/asf/mahout/repo Commit: http://git-wip-us.apache.org/repos/asf/mahout/commit/61c9e1a9 Tree: http://git-wip-us.apache.org/repos/asf/mahout/tree/61c9e1a9 Diff: http://git-wip-us.apache.org/repos/asf/mahout/diff/61c9e1a9 Branch: refs/heads/master Commit: 61c9e1a92975de5891f082068fb708dc30cfafa1 Parents: c34e8a8 Author: Andrew Palumbo <[email protected]> Authored: Fri Oct 10 16:43:36 2014 -0400 Committer: Andrew Palumbo <[email protected]> Committed: Fri Oct 10 16:43:36 2014 -0400 ---------------------------------------------------------------------- CHANGELOG | 2 +- .../org/apache/mahout/common/DrmMetadata.scala | 56 +++++++++++++++++ .../org/apache/mahout/common/HDFSUtil.scala | 26 ++++++++ .../apache/mahout/common/Hadoop1HDFSUtil.scala | 65 ++++++++++++++++++++ .../apache/mahout/h2obindings/H2OEngine.scala | 11 +++- pom.xml | 2 +- 6 files changed, 158 insertions(+), 4 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/mahout/blob/61c9e1a9/CHANGELOG ---------------------------------------------------------------------- diff --git a/CHANGELOG b/CHANGELOG index bc1edcf..079d216 100644 --- a/CHANGELOG +++ b/CHANGELOG @@ -2,7 +2,7 @@ Mahout Change Log Release 1.0 - unreleased - MAHOUT-1615: SparkEngine drmFromHDFS returning the same Key for all Key,Vec Pairs for Text-Keyed SequenceFiles (dlyubimov & apalumbo) + MAHOUT-1615: SparkEngine drmFromHDFS returning the same Key for all Key,Vec Pairs for Text-Keyed SequenceFiles (Anand Avati, dlyubimov, apalumbo) MAHOUT-1610: Update tests to pass in Java 8 (srowen) http://git-wip-us.apache.org/repos/asf/mahout/blob/61c9e1a9/h2o/src/main/scala/org/apache/mahout/common/DrmMetadata.scala ---------------------------------------------------------------------- diff --git a/h2o/src/main/scala/org/apache/mahout/common/DrmMetadata.scala b/h2o/src/main/scala/org/apache/mahout/common/DrmMetadata.scala new file mode 100644 index 0000000..5bbccb1 --- /dev/null +++ b/h2o/src/main/scala/org/apache/mahout/common/DrmMetadata.scala @@ -0,0 +1,56 @@ +package org.apache.mahout.common + +import scala.reflect.ClassTag +import org.apache.hadoop.io._ +import java.util.Arrays + +class DrmMetadata( + + /** Writable key type as a sub-type of Writable */ + val keyTypeWritable: Class[_], + + /** Value writable type, as a sub-type of Writable */ + val valueTypeWritable: Class[_] + + ) { + + import DrmMetadata._ + + val ( + + /** Actual drm key class tag once converted out of writable */ + keyClassTag: ClassTag[_], + + /** Conversion from Writable to value type of the DRM key */ + keyW2ValFunc: ((Writable) => Any) + + ) = keyTypeWritable match { + case cz if (cz == classOf[IntWritable]) => ClassTag.Int -> w2int _ + case cz if (cz == classOf[LongWritable]) => ClassTag.Long -> w2long _ + case cz if (cz == classOf[DoubleWritable]) => ClassTag.Double -> w2double _ + case cz if (cz == classOf[FloatWritable]) => ClassTag.Float -> w2float _ + case cz if (cz == classOf[Text]) => ClassTag(classOf[String]) -> w2string _ + case cz if (cz == classOf[BooleanWritable]) => ClassTag(classOf[Boolean]) -> w2bool _ + case cz if (cz == classOf[BytesWritable]) => ClassTag(classOf[Array[Byte]]) -> w2bytes _ + case _ => throw new IllegalArgumentException(s"Unsupported DRM key type:${keyTypeWritable.getName}") + } + +} + +object DrmMetadata { + + private[common] def w2int(w: Writable) = w.asInstanceOf[IntWritable].get() + + private[common] def w2long(w: Writable) = w.asInstanceOf[LongWritable].get() + + private[common] def w2double(w: Writable) = w.asInstanceOf[DoubleWritable].get() + + private[common] def w2float(w: Writable) = w.asInstanceOf[FloatWritable].get() + + private[common] def w2string(w: Writable) = w.asInstanceOf[Text].toString() + + private[common] def w2bool(w: Writable) = w.asInstanceOf[BooleanWritable].get() + + private[common] def w2bytes(w: Writable) = Arrays.copyOf(w.asInstanceOf[BytesWritable].getBytes(), + w.asInstanceOf[BytesWritable].getLength()) +} http://git-wip-us.apache.org/repos/asf/mahout/blob/61c9e1a9/h2o/src/main/scala/org/apache/mahout/common/HDFSUtil.scala ---------------------------------------------------------------------- diff --git a/h2o/src/main/scala/org/apache/mahout/common/HDFSUtil.scala b/h2o/src/main/scala/org/apache/mahout/common/HDFSUtil.scala new file mode 100644 index 0000000..f5f87d7 --- /dev/null +++ b/h2o/src/main/scala/org/apache/mahout/common/HDFSUtil.scala @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.mahout.common + +/** High level Hadoop version-specific hdfs manipulations we need in context of our operations. */ +trait HDFSUtil { + + /** Read DRM header information off (H)DFS. */ + def readDrmHeader(path:String):DrmMetadata +} + http://git-wip-us.apache.org/repos/asf/mahout/blob/61c9e1a9/h2o/src/main/scala/org/apache/mahout/common/Hadoop1HDFSUtil.scala ---------------------------------------------------------------------- diff --git a/h2o/src/main/scala/org/apache/mahout/common/Hadoop1HDFSUtil.scala b/h2o/src/main/scala/org/apache/mahout/common/Hadoop1HDFSUtil.scala new file mode 100644 index 0000000..87977ff --- /dev/null +++ b/h2o/src/main/scala/org/apache/mahout/common/Hadoop1HDFSUtil.scala @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.mahout.common + +import org.apache.hadoop.io.{Writable, SequenceFile} +import org.apache.hadoop.fs.{FileSystem, Path} +import org.apache.hadoop.conf.Configuration +import collection._ +import JavaConversions._ + +/** + * Deprecated Hadoop 1 api which we currently explicitly import via Mahout dependencies. May not work + * with Hadoop 2.0 + */ +object Hadoop1HDFSUtil extends HDFSUtil { + + + def readDrmHeader(path: String): DrmMetadata = { + val dfsPath = new Path(path) + val fs = dfsPath.getFileSystem(new Configuration()) + + val partFilePath:Path = fs.listStatus(dfsPath) + + // Filter out anything starting with . + .filter { s => (!s.getPath.getName.startsWith("\\.") && !s.getPath.getName.startsWith("_") && !s.isDir)} + + // Take path + .map(_.getPath) + + // Take only one, if any + .headOption + + // Require there's at least one partition file found. + .getOrElse { + throw new IllegalArgumentException(s"No partition files found in ${dfsPath.toString}.") + } + + val reader = new SequenceFile.Reader(fs, partFilePath, fs.getConf) + try { + new DrmMetadata( + keyTypeWritable = reader.getKeyClass.asSubclass(classOf[Writable]), + valueTypeWritable = reader.getValueClass.asSubclass(classOf[Writable]) + ) + } finally { + reader.close() + } + + } + +} http://git-wip-us.apache.org/repos/asf/mahout/blob/61c9e1a9/h2o/src/main/scala/org/apache/mahout/h2obindings/H2OEngine.scala ---------------------------------------------------------------------- diff --git a/h2o/src/main/scala/org/apache/mahout/h2obindings/H2OEngine.scala b/h2o/src/main/scala/org/apache/mahout/h2obindings/H2OEngine.scala index 06125fe..28214c6 100644 --- a/h2o/src/main/scala/org/apache/mahout/h2obindings/H2OEngine.scala +++ b/h2o/src/main/scala/org/apache/mahout/h2obindings/H2OEngine.scala @@ -23,9 +23,13 @@ import org.apache.mahout.math.drm._ import org.apache.mahout.math.drm.logical._ import org.apache.mahout.h2obindings.ops._ import org.apache.mahout.h2obindings.drm._ +import org.apache.mahout.common.{Hadoop1HDFSUtil, HDFSUtil} /** H2O specific non-DRM operations */ object H2OEngine extends DistributedEngine { + // By default, use Hadoop 1 utils + var hdfsUtils: HDFSUtil = Hadoop1HDFSUtil + def colMeans[K:ClassTag](drm: CheckpointedDrm[K]): Vector = H2OHelper.colMeans(drm.h2odrm.frame) @@ -54,8 +58,11 @@ object H2OEngine extends DistributedEngine { * * @return DRM[Any] where Any is automatically translated to value type */ - def drmDfsRead(path: String, parMin: Int = 0)(implicit dc: DistributedContext): CheckpointedDrm[_] = - new CheckpointedDrmH2O(H2OHdfs.drmFromFile(path, parMin), dc) + def drmDfsRead(path: String, parMin: Int = 0)(implicit dc: DistributedContext): CheckpointedDrm[_] = { + val drmMetadata = hdfsUtils.readDrmHeader(path) + + new CheckpointedDrmH2O(H2OHdfs.drmFromFile(path, parMin), dc)(drmMetadata.keyClassTag.asInstanceOf[ClassTag[Any]]) + } /** This creates an empty DRM with specified number of partitions and cardinality. */ def drmParallelizeEmpty(nrow: Int, ncol: Int, numPartitions: Int)(implicit dc: DistributedContext): CheckpointedDrm[Int] = http://git-wip-us.apache.org/repos/asf/mahout/blob/61c9e1a9/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index 3696eb5..89ed1a7 100644 --- a/pom.xml +++ b/pom.xml @@ -701,7 +701,7 @@ <module>math-scala</module> <module>spark</module> <module>spark-shell</module> - <!--module>h2o</module --> + <module>h2o</module> </modules> <profiles> <profile>
