This is an automated email from the ASF dual-hosted git repository.

srowen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 660423d  [SPARK-23469][ML] HashingTF should use corrected MurmurHash3 implementation
660423d is described below

commit 660423d71769a42176f7cbe867eb23ec8afe7592
Author: Huaxin Gao <huax...@us.ibm.com>
AuthorDate: Fri Aug 2 10:53:36 2019 -0500

    [SPARK-23469][ML] HashingTF should use corrected MurmurHash3 implementation
    
    ## What changes were proposed in this pull request?
    
    Update HashingTF to use the new implementation of MurmurHash3.
    Make HashingTF use the old MurmurHash3 when a model saved by a pre-3.0 Spark version is loaded.
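
    For illustration, a minimal sketch (not part of this patch) of the user-visible
    change, using the values from the updated PySpark doctest below:

        import org.apache.spark.ml.feature.HashingTF

        val tf = new HashingTF().setNumFeatures(10)
        // With the corrected MurmurHash3, "b" maps to index 5;
        // the old mllib hash mapped it to index 1.
        tf.indexOf("b")  // 5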
    
    ## How was this patch tested?
    
    Change existing unit tests. Also add one unit test to make sure HashingTF uses the old MurmurHash3 when a model saved by a pre-3.0 Spark version is loaded.
    
    Closes #25303 from huaxingao/spark-23469.
    
    Authored-by: Huaxin Gao <huax...@us.ibm.com>
    Signed-off-by: Sean Owen <sean.o...@databricks.com>
---
 .../org/apache/spark/ml/feature/HashingTF.scala    |  28 +++++++++++++++++++--
 .../hashingTF-pre3.0/metadata/.part-00000.crc      | Bin 0 -> 12 bytes
 .../test-data/hashingTF-pre3.0/metadata/_SUCCESS   |   0
 .../test-data/hashingTF-pre3.0/metadata/part-00000 |   1 +
 .../apache/spark/ml/feature/HashingTFSuite.scala   |  25 +++++++++++++-----
 python/pyspark/ml/feature.py                       |   8 +++---
 python/pyspark/ml/tests/test_feature.py            |   2 +-
 7 files changed, 51 insertions(+), 13 deletions(-)

diff --git a/mllib/src/main/scala/org/apache/spark/ml/feature/HashingTF.scala b/mllib/src/main/scala/org/apache/spark/ml/feature/HashingTF.scala
index 27b8bdc..0e6c43a 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/feature/HashingTF.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/feature/HashingTF.scala
@@ -26,11 +26,12 @@ import org.apache.spark.ml.linalg.Vectors
 import org.apache.spark.ml.param._
 import org.apache.spark.ml.param.shared.{HasInputCol, HasOutputCol}
 import org.apache.spark.ml.util._
-import org.apache.spark.mllib.feature.HashingTF.murmur3Hash
+import org.apache.spark.mllib.feature.{HashingTF => OldHashingTF}
 import org.apache.spark.sql.{DataFrame, Dataset}
 import org.apache.spark.sql.functions.{col, udf}
 import org.apache.spark.sql.types.{ArrayType, StructType}
 import org.apache.spark.util.Utils
+import org.apache.spark.util.VersionUtils.majorMinorVersion
 
 /**
  * Maps a sequence of terms to their term frequencies using the hashing trick.
@@ -44,7 +45,7 @@ import org.apache.spark.util.Utils
 class HashingTF @Since("1.4.0") (@Since("1.4.0") override val uid: String)
   extends Transformer with HasInputCol with HasOutputCol with DefaultParamsWritable {
 
-  private[this] val hashFunc: Any => Int = murmur3Hash
+  private var hashFunc: Any => Int = FeatureHasher.murmur3Hash
 
   @Since("1.2.0")
   def this() = this(Identifiable.randomUID("hashingTF"))
@@ -142,6 +143,29 @@ class HashingTF @Since("1.4.0") (@Since("1.4.0") override val uid: String)
 @Since("1.6.0")
 object HashingTF extends DefaultParamsReadable[HashingTF] {
 
+  private class HashingTFReader extends MLReader[HashingTF] {
+
+    private val className = classOf[HashingTF].getName
+
+    override def load(path: String): HashingTF = {
+      val metadata = DefaultParamsReader.loadMetadata(path, sc, className)
+      val hashingTF = new HashingTF(metadata.uid)
+      metadata.getAndSetParams(hashingTF)
+
+      // We support loading old `HashingTF` saved by previous Spark versions.
+      // Previous `HashingTF` uses `mllib.feature.HashingTF.murmur3Hash`, but new `HashingTF` uses
+      // `ml.feature.FeatureHasher.murmur3Hash`.
+      val (majorVersion, _) = majorMinorVersion(metadata.sparkVersion)
+      if (majorVersion < 3) {
+        hashingTF.hashFunc = OldHashingTF.murmur3Hash
+      }
+      hashingTF
+    }
+  }
+
+  @Since("3.0.0")
+  override def read: MLReader[HashingTF] = new HashingTFReader
+
   @Since("1.6.0")
   override def load(path: String): HashingTF = super.load(path)
 }
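
The new reader dispatches on the Spark version recorded in the saved model's
metadata. As a minimal sketch (values taken from the pre-3.0 test resource
below), the existing `org.apache.spark.util.VersionUtils` helper parses that
version string:

    import org.apache.spark.util.VersionUtils.majorMinorVersion

    // "2.3.0-SNAPSHOT" is the sparkVersion stored in the pre-3.0 test metadata
    val (major, minor) = majorMinorVersion("2.3.0-SNAPSHOT")  // (2, 3)
    // major < 3, so HashingTFReader falls back to OldHashingTF.murmur3Hash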
diff --git a/mllib/src/test/resources/test-data/hashingTF-pre3.0/metadata/.part-00000.crc b/mllib/src/test/resources/test-data/hashingTF-pre3.0/metadata/.part-00000.crc
new file mode 100644
index 0000000..1ac377a
Binary files /dev/null and b/mllib/src/test/resources/test-data/hashingTF-pre3.0/metadata/.part-00000.crc differ
diff --git a/mllib/src/test/resources/test-data/hashingTF-pre3.0/metadata/_SUCCESS b/mllib/src/test/resources/test-data/hashingTF-pre3.0/metadata/_SUCCESS
new file mode 100644
index 0000000..e69de29
diff --git a/mllib/src/test/resources/test-data/hashingTF-pre3.0/metadata/part-00000 b/mllib/src/test/resources/test-data/hashingTF-pre3.0/metadata/part-00000
new file mode 100644
index 0000000..492a07a
--- /dev/null
+++ b/mllib/src/test/resources/test-data/hashingTF-pre3.0/metadata/part-00000
@@ -0,0 +1 @@
+{"class":"org.apache.spark.ml.feature.HashingTF","timestamp":1564446310495,"sparkVersion":"2.3.0-SNAPSHOT","uid":"hashingTF_8ced2ab477c1","paramMap":{"binary":true,"numFeatures":100,"outputCol":"features","inputCol":"words"}}
diff --git a/mllib/src/test/scala/org/apache/spark/ml/feature/HashingTFSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/feature/HashingTFSuite.scala
index d768a40..d65646e 100644
--- a/mllib/src/test/scala/org/apache/spark/ml/feature/HashingTFSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/ml/feature/HashingTFSuite.scala
@@ -83,11 +83,24 @@ class HashingTFSuite extends MLTest with DefaultReadWriteTest {
       .setInputCol("words")
       .setOutputCol("features")
       .setNumFeatures(n)
-    val mLlibHashingTF = new MLlibHashingTF(n)
-    assert(hashingTF.indexOf("a") === mLlibHashingTF.indexOf("a"))
-    assert(hashingTF.indexOf("b") === mLlibHashingTF.indexOf("b"))
-    assert(hashingTF.indexOf("c") === mLlibHashingTF.indexOf("c"))
-    assert(hashingTF.indexOf("d") === mLlibHashingTF.indexOf("d"))
+    assert(hashingTF.indexOf("a") === 67)
+    assert(hashingTF.indexOf("b") === 65)
+    assert(hashingTF.indexOf("c") === 68)
+    assert(hashingTF.indexOf("d") === 90)
+  }
+
+  test("SPARK-23469: Load HashingTF prior to Spark 3.0") {
+    val hashingTFPath = testFile("test-data/hashingTF-pre3.0")
+    val loadedHashingTF = HashingTF.load(hashingTFPath)
+    val mLlibHashingTF = new MLlibHashingTF(100)
+    assert(loadedHashingTF.indexOf("a") === mLlibHashingTF.indexOf("a"))
+    assert(loadedHashingTF.indexOf("b") === mLlibHashingTF.indexOf("b"))
+    assert(loadedHashingTF.indexOf("c") === mLlibHashingTF.indexOf("c"))
+    assert(loadedHashingTF.indexOf("d") === mLlibHashingTF.indexOf("d"))
+
+    val metadata = spark.read.json(s"$hashingTFPath/metadata")
+    val sparkVersionStr = metadata.select("sparkVersion").first().getString(0)
+    assert(sparkVersionStr == "2.3.0-SNAPSHOT")
   }
 
   test("read/write") {
@@ -103,7 +116,7 @@ class HashingTFSuite extends MLTest with DefaultReadWriteTest {
 object HashingTFSuite {
 
   private[feature] def murmur3FeatureIdx(numFeatures: Int)(term: Any): Int = {
-    Utils.nonNegativeMod(MLlibHashingTF.murmur3Hash(term), numFeatures)
+    Utils.nonNegativeMod(FeatureHasher.murmur3Hash(term), numFeatures)
   }
 
 }
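
The suite's `murmur3FeatureIdx` helper shows how a term's feature index is
derived from the raw hash. A standalone sketch of the same computation (note
that `FeatureHasher.murmur3Hash` and `Utils.nonNegativeMod` are Spark-internal,
so this assumes code compiled inside the `org.apache.spark.ml.feature` package):

    import org.apache.spark.ml.feature.FeatureHasher
    import org.apache.spark.util.Utils

    // The raw murmur3 hash can be negative; nonNegativeMod maps it into [0, numFeatures)
    def featureIdx(numFeatures: Int)(term: Any): Int =
      Utils.nonNegativeMod(FeatureHasher.murmur3Hash(term), numFeatures)

    featureIdx(10)("b")  // 5, matching the updated doctest in feature.py below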
diff --git a/python/pyspark/ml/feature.py b/python/pyspark/ml/feature.py
index 1196da5..fe8ac62 100755
--- a/python/pyspark/ml/feature.py
+++ b/python/pyspark/ml/feature.py
@@ -902,19 +902,19 @@ class HashingTF(JavaTransformer, HasInputCol, HasOutputCol, HasNumFeatures, Java
     >>> df = spark.createDataFrame([(["a", "b", "c"],)], ["words"])
     >>> hashingTF = HashingTF(numFeatures=10, inputCol="words", outputCol="features")
     >>> hashingTF.transform(df).head().features
-    SparseVector(10, {0: 1.0, 1: 1.0, 2: 1.0})
+    SparseVector(10, {5: 1.0, 7: 1.0, 8: 1.0})
     >>> hashingTF.setParams(outputCol="freqs").transform(df).head().freqs
-    SparseVector(10, {0: 1.0, 1: 1.0, 2: 1.0})
+    SparseVector(10, {5: 1.0, 7: 1.0, 8: 1.0})
     >>> params = {hashingTF.numFeatures: 5, hashingTF.outputCol: "vector"}
     >>> hashingTF.transform(df, params).head().vector
-    SparseVector(5, {0: 1.0, 1: 1.0, 2: 1.0})
+    SparseVector(5, {0: 1.0, 2: 1.0, 3: 1.0})
     >>> hashingTFPath = temp_path + "/hashing-tf"
     >>> hashingTF.save(hashingTFPath)
     >>> loadedHashingTF = HashingTF.load(hashingTFPath)
     >>> loadedHashingTF.getNumFeatures() == hashingTF.getNumFeatures()
     True
     >>> hashingTF.indexOf("b")
-    1
+    5
 
     .. versionadded:: 1.3.0
     """
diff --git a/python/pyspark/ml/tests/test_feature.py b/python/pyspark/ml/tests/test_feature.py
index e2fc4e5..6b0d1dc 100644
--- a/python/pyspark/ml/tests/test_feature.py
+++ b/python/pyspark/ml/tests/test_feature.py
@@ -296,7 +296,7 @@ class HashingTFTest(SparkSessionTestCase):
         hashingTF.setInputCol("words").setOutputCol("features").setNumFeatures(n).setBinary(True)
         output = hashingTF.transform(df)
         features = output.select("features").first().features.toArray()
-        expected = Vectors.dense([1.0, 1.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0]).toArray()
+        expected = Vectors.dense([0.0, 0.0, 0.0, 0.0, 0.0, 1.0, 0.0, 1.0, 1.0, 0.0]).toArray()
         for i in range(0, n):
             self.assertAlmostEqual(features[i], expected[i], 14, "Error at " + str(i) +
                                    ": expected " + str(expected[i]) + ", got " + str(features[i]))
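
With `setBinary(True)`, term counts are capped at 1.0, so the expected dense
vector is just ones at the corrected indices 5, 7, and 8 (the same positions as
in the doctest above). The equivalent Scala setting, as a sketch:

    // Binary term frequency: each distinct present term contributes exactly 1.0
    val binaryTF = new org.apache.spark.ml.feature.HashingTF()
      .setInputCol("words")
      .setOutputCol("features")
      .setNumFeatures(10)
      .setBinary(true)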


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
