This is an automated email from the ASF dual-hosted git repository.

ruifengz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 0c38765  [SPARK-32974][ML] FeatureHasher transform optimization
0c38765 is described below

commit 0c38765b297337c3d80496db09ae7f79d2acf778
Author: zhengruifeng <ruife...@foxmail.com>
AuthorDate: Sun Sep 27 09:35:05 2020 +0800

    [SPARK-32974][ML] FeatureHasher transform optimization
    
    ### What changes were proposed in this pull request?
    pre-compute the output indices of numerical columns, instead of computing 
them on each row.
    
    ### Why are the changes needed?
    for a numerical column, its output index is a hash of its `col_name`, we 
can pre-compute it at first, instead of computing it on each row.
    
    ### Does this PR introduce _any_ user-facing change?
    No
    
    ### How was this patch tested?
    existing test suites
    
    Closes #29850 from zhengruifeng/hash_opt.
    
    Authored-by: zhengruifeng <ruife...@foxmail.com>
    Signed-off-by: zhengruifeng <ruife...@foxmail.com>
---
 .../apache/spark/ml/feature/FeatureHasher.scala    | 66 +++++++++++++---------
 1 file changed, 38 insertions(+), 28 deletions(-)

diff --git 
a/mllib/src/main/scala/org/apache/spark/ml/feature/FeatureHasher.scala 
b/mllib/src/main/scala/org/apache/spark/ml/feature/FeatureHasher.scala
index 3986255..0bb0b05 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/feature/FeatureHasher.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/feature/FeatureHasher.scala
@@ -125,19 +125,24 @@ class FeatureHasher(@Since("2.3.0") override val uid: 
String) extends Transforme
 
   @Since("2.3.0")
   override def transform(dataset: Dataset[_]): DataFrame = {
+    val outputSchema = transformSchema(dataset.schema)
     val hashFunc: Any => Int = FeatureHasher.murmur3Hash
+
     val n = $(numFeatures)
     val localInputCols = $(inputCols)
-    val catCols = if (isSet(categoricalCols)) {
-      $(categoricalCols).toSet
-    } else {
-      Set[String]()
+
+    var catCols = dataset.schema(localInputCols.toSet)
+      .filterNot(_.dataType.isInstanceOf[NumericType]).map(_.name).toArray
+    if (isSet(categoricalCols)) {
+      // categoricalCols may contain columns not set in inputCols
+      catCols = (catCols ++ 
$(categoricalCols).intersect(localInputCols)).distinct
     }
+    val catIndices = catCols.map(c => localInputCols.indexOf(c))
 
-    val outputSchema = transformSchema(dataset.schema)
-    val realFields = outputSchema.fields.filter { f =>
-      f.dataType.isInstanceOf[NumericType] && !catCols.contains(f.name)
-    }.map(_.name).toSet
+    val realCols = (localInputCols.toSet -- catCols).toArray
+    val realIndices = realCols.map(c => localInputCols.indexOf(c))
+    // pre-compute output indices of real columns
+    val realOutputIndices = realCols.map(c => 
Utils.nonNegativeMod(hashFunc(c), n))
 
     def getDouble(x: Any): Double = {
       x match {
@@ -151,33 +156,38 @@ class FeatureHasher(@Since("2.3.0") override val uid: 
String) extends Transforme
 
     val hashFeatures = udf { row: Row =>
       val map = new OpenHashMap[Int, Double]()
-      localInputCols.foreach { colName =>
-        val fieldIndex = row.fieldIndex(colName)
-        if (!row.isNullAt(fieldIndex)) {
-          val (rawIdx, value) = if (realFields(colName)) {
-            // numeric values are kept as is, with vector index based on hash 
of "column_name"
-            val value = getDouble(row.get(fieldIndex))
-            val hash = hashFunc(colName)
-            (hash, value)
-          } else {
-            // string, boolean and numeric values that are in catCols are 
treated as categorical,
-            // with an indicator value of 1.0 and vector index based on hash 
of "column_name=value"
-            val value = row.get(fieldIndex).toString
-            val fieldName = s"$colName=$value"
-            val hash = hashFunc(fieldName)
-            (hash, 1.0)
-          }
-          val idx = Utils.nonNegativeMod(rawIdx, n)
+
+      var i = 0
+      while (i < realIndices.length) {
+        val realIdx = realIndices(i)
+        if (!row.isNullAt(realIdx)) {
+          // numeric values are kept as is, with vector index based on hash of 
"column_name"
+          val value = getDouble(row.get(realIdx))
+          val idx = realOutputIndices(i)
           map.changeValue(idx, value, v => v + value)
         }
+        i += 1
       }
+
+      i = 0
+      while (i < catIndices.length) {
+        val catIdx = catIndices(i)
+        if (!row.isNullAt(catIdx)) {
+          // string, boolean and numeric values that are in catCols are 
treated as categorical,
+          // with an indicator value of 1.0 and vector index based on hash of 
"column_name=value"
+          val string = row.get(catIdx).toString
+          val rawIdx = hashFunc(s"${catCols(i)}=$string")
+          val idx = Utils.nonNegativeMod(rawIdx, n)
+          map.changeValue(idx, 1.0, v => v + 1.0)
+        }
+        i += 1
+      }
+
       Vectors.sparse(n, map.toSeq)
     }
 
     val metadata = outputSchema($(outputCol)).metadata
-    dataset.select(
-      col("*"),
-      hashFeatures(struct($(inputCols).map(col): _*)).as($(outputCol), 
metadata))
+    dataset.withColumn($(outputCol), 
hashFeatures(struct($(inputCols).map(col): _*)), metadata)
   }
 
   @Since("2.3.0")


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to