Repository: mahout
Updated Branches:
  refs/heads/master c34e8a84c -> 61c9e1a92


MAHOUT-1615: fix up HDFS ClassTag reading from H2O bindings. this closes 
apache/mahout#58


Project: http://git-wip-us.apache.org/repos/asf/mahout/repo
Commit: http://git-wip-us.apache.org/repos/asf/mahout/commit/61c9e1a9
Tree: http://git-wip-us.apache.org/repos/asf/mahout/tree/61c9e1a9
Diff: http://git-wip-us.apache.org/repos/asf/mahout/diff/61c9e1a9

Branch: refs/heads/master
Commit: 61c9e1a92975de5891f082068fb708dc30cfafa1
Parents: c34e8a8
Author: Andrew Palumbo <[email protected]>
Authored: Fri Oct 10 16:43:36 2014 -0400
Committer: Andrew Palumbo <[email protected]>
Committed: Fri Oct 10 16:43:36 2014 -0400

----------------------------------------------------------------------
 CHANGELOG                                       |  2 +-
 .../org/apache/mahout/common/DrmMetadata.scala  | 56 +++++++++++++++++
 .../org/apache/mahout/common/HDFSUtil.scala     | 26 ++++++++
 .../apache/mahout/common/Hadoop1HDFSUtil.scala  | 65 ++++++++++++++++++++
 .../apache/mahout/h2obindings/H2OEngine.scala   | 11 +++-
 pom.xml                                         |  2 +-
 6 files changed, 158 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mahout/blob/61c9e1a9/CHANGELOG
----------------------------------------------------------------------
diff --git a/CHANGELOG b/CHANGELOG
index bc1edcf..079d216 100644
--- a/CHANGELOG
+++ b/CHANGELOG
@@ -2,7 +2,7 @@ Mahout Change Log
 
 Release 1.0 - unreleased
 
-  MAHOUT-1615: SparkEngine drmFromHDFS returning the same Key for all Key,Vec 
Pairs for Text-Keyed SequenceFiles (dlyubimov & apalumbo)
+  MAHOUT-1615: SparkEngine drmFromHDFS returning the same Key for all Key,Vec 
Pairs for Text-Keyed SequenceFiles (Anand Avati, dlyubimov, apalumbo)
 
   MAHOUT-1610: Update tests to pass in Java 8 (srowen)
 

http://git-wip-us.apache.org/repos/asf/mahout/blob/61c9e1a9/h2o/src/main/scala/org/apache/mahout/common/DrmMetadata.scala
----------------------------------------------------------------------
diff --git a/h2o/src/main/scala/org/apache/mahout/common/DrmMetadata.scala 
b/h2o/src/main/scala/org/apache/mahout/common/DrmMetadata.scala
new file mode 100644
index 0000000..5bbccb1
--- /dev/null
+++ b/h2o/src/main/scala/org/apache/mahout/common/DrmMetadata.scala
@@ -0,0 +1,56 @@
+package org.apache.mahout.common
+
+import scala.reflect.ClassTag
+import org.apache.hadoop.io._
+import java.util.Arrays
+
+/**
+ * Key and value type information for a DRM, expressed as the Writable classes it is stored with.
+ *
+ * Construction fails with an IllegalArgumentException when the key Writable class is not one of
+ * the types handled in the mapping below.
+ */
+class DrmMetadata(
+
+    /** Writable class the DRM keys are stored as. */
+    val keyTypeWritable: Class[_],
+
+    /** Writable class the DRM values are stored as. */
+    val valueTypeWritable: Class[_]
+
+    ) {
+
+  import DrmMetadata._
+
+  val (
+
+      /** Class tag of the DRM key after it has been converted out of its Writable. */
+      keyClassTag: ClassTag[_],
+
+      /** Function turning a key Writable into the plain value used as the DRM key. */
+      keyW2ValFunc: ((Writable) => Any)
+
+      ) =
+    // Each branch pairs the key class tag with the matching unwrapping function.
+    if (keyTypeWritable == classOf[IntWritable]) (ClassTag.Int, w2int _)
+    else if (keyTypeWritable == classOf[LongWritable]) (ClassTag.Long, w2long _)
+    else if (keyTypeWritable == classOf[DoubleWritable]) (ClassTag.Double, w2double _)
+    else if (keyTypeWritable == classOf[FloatWritable]) (ClassTag.Float, w2float _)
+    else if (keyTypeWritable == classOf[Text]) (ClassTag(classOf[String]), w2string _)
+    else if (keyTypeWritable == classOf[BooleanWritable]) (ClassTag(classOf[Boolean]), w2bool _)
+    else if (keyTypeWritable == classOf[BytesWritable]) (ClassTag(classOf[Array[Byte]]), w2bytes _)
+    else throw new IllegalArgumentException(s"Unsupported DRM key type:${keyTypeWritable.getName}")
+
+}
+
+/** Converters that unwrap a key Writable into the plain value it carries. */
+object DrmMetadata {
+
+  /** Casts `w` to IntWritable and returns its value. */
+  private[common] def w2int(w: Writable): Int = w.asInstanceOf[IntWritable].get
+
+  /** Casts `w` to LongWritable and returns its value. */
+  private[common] def w2long(w: Writable): Long = w.asInstanceOf[LongWritable].get
+
+  /** Casts `w` to DoubleWritable and returns its value. */
+  private[common] def w2double(w: Writable): Double = w.asInstanceOf[DoubleWritable].get
+
+  /** Casts `w` to FloatWritable and returns its value. */
+  private[common] def w2float(w: Writable): Float = w.asInstanceOf[FloatWritable].get
+
+  /** Casts `w` to Text and returns its string form. */
+  private[common] def w2string(w: Writable): String = w.asInstanceOf[Text].toString
+
+  /** Casts `w` to BooleanWritable and returns its value. */
+  private[common] def w2bool(w: Writable): Boolean = w.asInstanceOf[BooleanWritable].get
+
+  /**
+   * Casts `w` to BytesWritable and returns a fresh array holding the first `getLength` bytes
+   * of the buffer returned by `getBytes`.
+   */
+  private[common] def w2bytes(w: Writable): Array[Byte] = {
+    val bytesWritable = w.asInstanceOf[BytesWritable]
+    Arrays.copyOf(bytesWritable.getBytes, bytesWritable.getLength)
+  }
+}

http://git-wip-us.apache.org/repos/asf/mahout/blob/61c9e1a9/h2o/src/main/scala/org/apache/mahout/common/HDFSUtil.scala
----------------------------------------------------------------------
diff --git a/h2o/src/main/scala/org/apache/mahout/common/HDFSUtil.scala 
b/h2o/src/main/scala/org/apache/mahout/common/HDFSUtil.scala
new file mode 100644
index 0000000..f5f87d7
--- /dev/null
+++ b/h2o/src/main/scala/org/apache/mahout/common/HDFSUtil.scala
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.common
+
+/**
+ * High level, Hadoop version-specific HDFS manipulations needed in the context of our operations.
+ */
+trait HDFSUtil {
+
+  /**
+   * Read DRM header information off (H)DFS.
+   *
+   * @param path path of the DRM on the (H)DFS file system
+   * @return the metadata read from the DRM header
+   */
+  def readDrmHeader(path: String): DrmMetadata
+}
+

http://git-wip-us.apache.org/repos/asf/mahout/blob/61c9e1a9/h2o/src/main/scala/org/apache/mahout/common/Hadoop1HDFSUtil.scala
----------------------------------------------------------------------
diff --git a/h2o/src/main/scala/org/apache/mahout/common/Hadoop1HDFSUtil.scala 
b/h2o/src/main/scala/org/apache/mahout/common/Hadoop1HDFSUtil.scala
new file mode 100644
index 0000000..87977ff
--- /dev/null
+++ b/h2o/src/main/scala/org/apache/mahout/common/Hadoop1HDFSUtil.scala
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.common
+
+import org.apache.hadoop.io.{Writable, SequenceFile}
+import org.apache.hadoop.fs.{FileSystem, Path}
+import org.apache.hadoop.conf.Configuration
+import collection._
+import JavaConversions._
+
+/**
+ * Deprecated Hadoop 1 api which we currently explicitly import via Mahout dependencies. May not work
+ * with Hadoop 2.0
+ */
+object Hadoop1HDFSUtil extends HDFSUtil {
+
+  /**
+   * Read the DRM header from the first partition file listed under `path`.
+   *
+   * Entries whose name starts with "." or "_" and entries that are directories are not
+   * considered partition files.
+   *
+   * @param path path whose listing contains the DRM partition (sequence) files
+   * @return metadata carrying the key and value Writable classes reported by the file reader
+   * @throws IllegalArgumentException if no partition file is found under `path`
+   */
+  def readDrmHeader(path: String): DrmMetadata = {
+    val dfsPath = new Path(path)
+    val fs = dfsPath.getFileSystem(new Configuration())
+
+    val partFilePath: Path = fs.listStatus(dfsPath)
+
+        // Filter out anything starting with "." or "_", as well as directories.
+        // String.startsWith takes a literal prefix, not a regex, so the dot must not be escaped.
+        .filter { s =>
+          val name = s.getPath.getName
+          !name.startsWith(".") && !name.startsWith("_") && !s.isDir
+        }
+
+        // Take path
+        .map(_.getPath)
+
+        // Take only one, if any
+        .headOption
+
+        // Require there's at least one partition file found.
+        .getOrElse {
+      throw new IllegalArgumentException(s"No partition files found in ${dfsPath.toString}.")
+    }
+
+    // Only the header is needed; always release the reader, even if metadata creation fails.
+    val reader = new SequenceFile.Reader(fs, partFilePath, fs.getConf)
+    try {
+      new DrmMetadata(
+        keyTypeWritable = reader.getKeyClass.asSubclass(classOf[Writable]),
+        valueTypeWritable = reader.getValueClass.asSubclass(classOf[Writable])
+      )
+    } finally {
+      reader.close()
+    }
+
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/mahout/blob/61c9e1a9/h2o/src/main/scala/org/apache/mahout/h2obindings/H2OEngine.scala
----------------------------------------------------------------------
diff --git a/h2o/src/main/scala/org/apache/mahout/h2obindings/H2OEngine.scala 
b/h2o/src/main/scala/org/apache/mahout/h2obindings/H2OEngine.scala
index 06125fe..28214c6 100644
--- a/h2o/src/main/scala/org/apache/mahout/h2obindings/H2OEngine.scala
+++ b/h2o/src/main/scala/org/apache/mahout/h2obindings/H2OEngine.scala
@@ -23,9 +23,13 @@ import org.apache.mahout.math.drm._
 import org.apache.mahout.math.drm.logical._
 import org.apache.mahout.h2obindings.ops._
 import org.apache.mahout.h2obindings.drm._
+import org.apache.mahout.common.{Hadoop1HDFSUtil, HDFSUtil}
 
 /** H2O specific non-DRM operations */
 object H2OEngine extends DistributedEngine {
+  // By default, use Hadoop 1 utils
+  var hdfsUtils: HDFSUtil = Hadoop1HDFSUtil
+
   def colMeans[K:ClassTag](drm: CheckpointedDrm[K]): Vector =
     H2OHelper.colMeans(drm.h2odrm.frame)
 
@@ -54,8 +58,11 @@ object H2OEngine extends DistributedEngine {
     *
     *  @return DRM[Any] where Any is automatically translated to value type
     */
-  def drmDfsRead(path: String, parMin: Int = 0)(implicit dc: 
DistributedContext): CheckpointedDrm[_] =
-    new CheckpointedDrmH2O(H2OHdfs.drmFromFile(path, parMin), dc)
+  def drmDfsRead(path: String, parMin: Int = 0)(implicit dc: DistributedContext): CheckpointedDrm[_] = {
+    // Read the header first so the resulting DRM carries the key class tag found in the file.
+    val keyTag = hdfsUtils.readDrmHeader(path).keyClassTag.asInstanceOf[ClassTag[Any]]
+    val h2oDrm = H2OHdfs.drmFromFile(path, parMin)
+
+    new CheckpointedDrmH2O(h2oDrm, dc)(keyTag)
+  }
 
   /** This creates an empty DRM with specified number of partitions and 
cardinality. */
   def drmParallelizeEmpty(nrow: Int, ncol: Int, numPartitions: Int)(implicit 
dc: DistributedContext): CheckpointedDrm[Int] =

http://git-wip-us.apache.org/repos/asf/mahout/blob/61c9e1a9/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 3696eb5..89ed1a7 100644
--- a/pom.xml
+++ b/pom.xml
@@ -701,7 +701,7 @@
     <module>math-scala</module>
     <module>spark</module>
     <module>spark-shell</module>
-    <!--module>h2o</module -->
+    <module>h2o</module>
   </modules>
   <profiles>
     <profile>

Reply via email to