This is an automated email from the ASF dual-hosted git repository. ruifengz pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push: new 0c38765 [SPARK-32974][ML] FeatureHasher transform optimization 0c38765 is described below commit 0c38765b297337c3d80496db09ae7f79d2acf778 Author: zhengruifeng <ruife...@foxmail.com> AuthorDate: Sun Sep 27 09:35:05 2020 +0800 [SPARK-32974][ML] FeatureHasher transform optimization ### What changes were proposed in this pull request? Pre-compute the output indices of numerical columns instead of computing them for each row. ### Why are the changes needed? For a numerical column, the output index depends only on a hash of its `col_name`, so it can be computed once up front instead of being recomputed for each row. ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? Existing test suites. Closes #29850 from zhengruifeng/hash_opt. Authored-by: zhengruifeng <ruife...@foxmail.com> Signed-off-by: zhengruifeng <ruife...@foxmail.com> --- .../apache/spark/ml/feature/FeatureHasher.scala | 66 +++++++++++++--------- 1 file changed, 38 insertions(+), 28 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/ml/feature/FeatureHasher.scala b/mllib/src/main/scala/org/apache/spark/ml/feature/FeatureHasher.scala index 3986255..0bb0b05 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/feature/FeatureHasher.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/feature/FeatureHasher.scala @@ -125,19 +125,24 @@ class FeatureHasher(@Since("2.3.0") override val uid: String) extends Transforme @Since("2.3.0") override def transform(dataset: Dataset[_]): DataFrame = { + val outputSchema = transformSchema(dataset.schema) val hashFunc: Any => Int = FeatureHasher.murmur3Hash + val n = $(numFeatures) val localInputCols = $(inputCols) - val catCols = if (isSet(categoricalCols)) { - $(categoricalCols).toSet - } else { - Set[String]() + + var catCols = dataset.schema(localInputCols.toSet) + .filterNot(_.dataType.isInstanceOf[NumericType]).map(_.name).toArray + if (isSet(categoricalCols)) { + //
categoricalCols may contain columns not set in inputCols + catCols = (catCols ++ $(categoricalCols).intersect(localInputCols)).distinct } + val catIndices = catCols.map(c => localInputCols.indexOf(c)) - val outputSchema = transformSchema(dataset.schema) - val realFields = outputSchema.fields.filter { f => - f.dataType.isInstanceOf[NumericType] && !catCols.contains(f.name) - }.map(_.name).toSet + val realCols = (localInputCols.toSet -- catCols).toArray + val realIndices = realCols.map(c => localInputCols.indexOf(c)) + // pre-compute output indices of real columns + val realOutputIndices = realCols.map(c => Utils.nonNegativeMod(hashFunc(c), n)) def getDouble(x: Any): Double = { x match { @@ -151,33 +156,38 @@ class FeatureHasher(@Since("2.3.0") override val uid: String) extends Transforme val hashFeatures = udf { row: Row => val map = new OpenHashMap[Int, Double]() - localInputCols.foreach { colName => - val fieldIndex = row.fieldIndex(colName) - if (!row.isNullAt(fieldIndex)) { - val (rawIdx, value) = if (realFields(colName)) { - // numeric values are kept as is, with vector index based on hash of "column_name" - val value = getDouble(row.get(fieldIndex)) - val hash = hashFunc(colName) - (hash, value) - } else { - // string, boolean and numeric values that are in catCols are treated as categorical, - // with an indicator value of 1.0 and vector index based on hash of "column_name=value" - val value = row.get(fieldIndex).toString - val fieldName = s"$colName=$value" - val hash = hashFunc(fieldName) - (hash, 1.0) - } - val idx = Utils.nonNegativeMod(rawIdx, n) + + var i = 0 + while (i < realIndices.length) { + val realIdx = realIndices(i) + if (!row.isNullAt(realIdx)) { + // numeric values are kept as is, with vector index based on hash of "column_name" + val value = getDouble(row.get(realIdx)) + val idx = realOutputIndices(i) map.changeValue(idx, value, v => v + value) } + i += 1 } + + i = 0 + while (i < catIndices.length) { + val catIdx = catIndices(i) + if 
(!row.isNullAt(catIdx)) { + // string, boolean and numeric values that are in catCols are treated as categorical, + // with an indicator value of 1.0 and vector index based on hash of "column_name=value" + val string = row.get(catIdx).toString + val rawIdx = hashFunc(s"${catCols(i)}=$string") + val idx = Utils.nonNegativeMod(rawIdx, n) + map.changeValue(idx, 1.0, v => v + 1.0) + } + i += 1 + } + Vectors.sparse(n, map.toSeq) } val metadata = outputSchema($(outputCol)).metadata - dataset.select( - col("*"), - hashFeatures(struct($(inputCols).map(col): _*)).as($(outputCol), metadata)) + dataset.withColumn($(outputCol), hashFeatures(struct($(inputCols).map(col): _*)), metadata) } @Since("2.3.0") --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org