Repository: spark
Updated Branches:
  refs/heads/branch-2.0 b75542603 -> f277cdf78


[SPARK-15945][MLLIB] Conversion between old/new vector columns in a DataFrame (Scala/Java)

## What changes were proposed in this pull request?

This PR provides conversion utilities between old and new vector columns in a 
DataFrame, so users can migrate their datasets and pipelines manually. The 
methods are implemented under `MLUtils` and named `convertVectorColumnsToML` 
and `convertVectorColumnsFromML`. Both take a DataFrame and a list of vector 
columns to be converted; columns that are already in the target vector type 
are left untouched (a no-op), and a warning message is logged whenever an 
actual conversion happens.
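
For reference, a minimal usage sketch (not part of this patch; the toy data and the "label"/"features" column names below are illustrative assumptions):

```scala
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.util.MLUtils
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("VectorColumnConversionExample").getOrCreate()

// A toy DataFrame whose "features" column uses the old spark.mllib vector type.
val df = spark.createDataFrame(Seq(
  (1.0, Vectors.dense(0.5, 1.5)),
  (0.0, Vectors.sparse(2, Array(1), Array(2.0)))
)).toDF("label", "features")

// Old (spark.mllib) vectors -> new (spark.ml) vectors; only the listed columns are converted.
val mlDF = MLUtils.convertVectorColumnsToML(df, "features")

// New vectors -> old vectors; with no columns listed, all new vector columns are converted.
val mllibDF = MLUtils.convertVectorColumnsFromML(mlDF)
```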

This is the first sub-task under SPARK-15944 to make it easier to migrate 
existing pipelines to Spark 2.0.

## How was this patch tested?

Unit tests in Scala and Java.

cc: yanboliang

Author: Xiangrui Meng <m...@databricks.com>

Closes #13662 from mengxr/SPARK-15945.

(cherry picked from commit 63e0aebe22ba41c636ecaddd8647721d7690a1ec)
Signed-off-by: Yanbo Liang <yblia...@gmail.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/f277cdf7
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/f277cdf7
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/f277cdf7

Branch: refs/heads/branch-2.0
Commit: f277cdf787de4402cd6cdba5e15e38bb71d8c5c7
Parents: b755426
Author: Xiangrui Meng <m...@databricks.com>
Authored: Tue Jun 14 18:57:45 2016 -0700
Committer: Yanbo Liang <yblia...@gmail.com>
Committed: Tue Jun 14 18:58:02 2016 -0700

----------------------------------------------------------------------
 .../org/apache/spark/mllib/util/MLUtils.scala   | 117 ++++++++++++++++++-
 .../spark/mllib/util/JavaMLUtilsSuite.java      |  49 ++++++++
 .../apache/spark/mllib/util/MLUtilsSuite.scala  |  60 +++++++++-
 3 files changed, 218 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/f277cdf7/mllib/src/main/scala/org/apache/spark/mllib/util/MLUtils.scala
----------------------------------------------------------------------
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/util/MLUtils.scala b/mllib/src/main/scala/org/apache/spark/mllib/util/MLUtils.scala
index f0346e6..7d5bdff 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/util/MLUtils.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/util/MLUtils.scala
@@ -17,14 +17,19 @@
 
 package org.apache.spark.mllib.util
 
+import scala.annotation.varargs
 import scala.reflect.ClassTag
 
 import org.apache.spark.SparkContext
 import org.apache.spark.annotation.Since
-import org.apache.spark.mllib.linalg.{DenseVector, SparseVector, Vector, Vectors}
+import org.apache.spark.internal.Logging
+import org.apache.spark.ml.linalg.{VectorUDT => MLVectorUDT}
+import org.apache.spark.mllib.linalg._
 import org.apache.spark.mllib.linalg.BLAS.dot
 import org.apache.spark.mllib.regression.LabeledPoint
 import org.apache.spark.rdd.{PartitionwiseSampledRDD, RDD}
+import org.apache.spark.sql.{DataFrame, Dataset}
+import org.apache.spark.sql.functions.{col, udf}
 import org.apache.spark.storage.StorageLevel
 import org.apache.spark.util.random.BernoulliCellSampler
 
@@ -32,7 +37,7 @@ import org.apache.spark.util.random.BernoulliCellSampler
  * Helper methods to load, save and pre-process data used in ML Lib.
  */
 @Since("0.8.0")
-object MLUtils {
+object MLUtils extends Logging {
 
   private[mllib] lazy val EPSILON = {
     var eps = 1.0
@@ -50,7 +55,6 @@ object MLUtils {
    * where the indices are one-based and in ascending order.
   * This method parses each line into a [[org.apache.spark.mllib.regression.LabeledPoint]],
    * where the feature indices are converted to zero-based.
-   *
    * @param sc Spark context
    * @param path file or directory path in any Hadoop-supported file system URI
   * @param numFeatures number of features, which will be determined from the input data if a
@@ -145,7 +149,6 @@ object MLUtils {
    * Save labeled data in LIBSVM format.
    * @param data an RDD of LabeledPoint to be saved
    * @param dir directory to save the data
-   *
    * @see [[org.apache.spark.mllib.util.MLUtils#loadLibSVMFile]]
    */
   @Since("1.0.0")
@@ -254,6 +257,110 @@ object MLUtils {
   }
 
   /**
+   * Converts vector columns in an input Dataset from the [[org.apache.spark.mllib.linalg.Vector]]
+   * type to the new [[org.apache.spark.ml.linalg.Vector]] type under the `spark.ml` package.
+   * @param dataset input dataset
+   * @param cols a list of vector columns to be converted. New vector columns will be ignored. If
+   *             unspecified, all old vector columns will be converted except nested ones.
+   * @return the input [[DataFrame]] with old vector columns converted to the new vector type
+   */
+  @Since("2.0.0")
+  @varargs
+  def convertVectorColumnsToML(dataset: Dataset[_], cols: String*): DataFrame = {
+    val schema = dataset.schema
+    val colSet = if (cols.nonEmpty) {
+      cols.flatMap { c =>
+        val dataType = schema(c).dataType
+        if (dataType.getClass == classOf[VectorUDT]) {
+          Some(c)
+        } else {
+          // ignore new vector columns and raise an exception on other column types
+          require(dataType.getClass == classOf[MLVectorUDT],
+            s"Column $c must be old Vector type to be converted to new type but got $dataType.")
+          None
+        }
+      }.toSet
+    } else {
+      schema.fields
+        .filter(_.dataType.getClass == classOf[VectorUDT])
+        .map(_.name)
+        .toSet
+    }
+
+    if (colSet.isEmpty) {
+      return dataset.toDF()
+    }
+
+    logWarning("Vector column conversion has serialization overhead. " +
+      "Please migrate your datasets and workflows to use the spark.ml package.")
+
+    // TODO: This implementation has performance issues due to unnecessary serialization.
+    // TODO: It is better (but trickier) if we can cast the old vector type to new type directly.
+    val convertToML = udf { v: Vector => v.asML }
+    val exprs = schema.fields.map { field =>
+      val c = field.name
+      if (colSet.contains(c)) {
+        convertToML(col(c)).as(c, field.metadata)
+      } else {
+        col(c)
+      }
+    }
+    dataset.select(exprs: _*)
+  }
+
+  /**
+   * Converts vector columns in an input Dataset to the [[org.apache.spark.mllib.linalg.Vector]]
+   * type from the new [[org.apache.spark.ml.linalg.Vector]] type under the `spark.ml` package.
+   * @param dataset input dataset
+   * @param cols a list of vector columns to be converted. Old vector columns will be ignored. If
+   *             unspecified, all new vector columns will be converted except nested ones.
+   * @return the input [[DataFrame]] with new vector columns converted to the old vector type
+   */
+  @Since("2.0.0")
+  @varargs
+  def convertVectorColumnsFromML(dataset: Dataset[_], cols: String*): DataFrame = {
+    val schema = dataset.schema
+    val colSet = if (cols.nonEmpty) {
+      cols.flatMap { c =>
+        val dataType = schema(c).dataType
+        if (dataType.getClass == classOf[MLVectorUDT]) {
+          Some(c)
+        } else {
+          // ignore old vector columns and raise an exception on other column types
+          require(dataType.getClass == classOf[VectorUDT],
+            s"Column $c must be new Vector type to be converted to old type but got $dataType.")
+          None
+        }
+      }.toSet
+    } else {
+      schema.fields
+        .filter(_.dataType.getClass == classOf[MLVectorUDT])
+        .map(_.name)
+        .toSet
+    }
+
+    if (colSet.isEmpty) {
+      return dataset.toDF()
+    }
+
+    logWarning("Vector column conversion has serialization overhead. " +
+      "Please migrate your datasets and workflows to use the spark.ml package.")
+
+    // TODO: This implementation has performance issues due to unnecessary serialization.
+    // TODO: It is better (but trickier) if we can cast the new vector type to old type directly.
+    val convertFromML = udf { Vectors.fromML _ }
+    val exprs = schema.fields.map { field =>
+      val c = field.name
+      if (colSet.contains(c)) {
+        convertFromML(col(c)).as(c, field.metadata)
+      } else {
+        col(c)
+      }
+    }
+    dataset.select(exprs: _*)
+  }
+
+  /**
    * Returns the squared Euclidean distance between two vectors. The following formula will be used
    * if it does not introduce too much numerical error:
    * <pre>
@@ -261,7 +368,6 @@ object MLUtils {
    * </pre>
    * When both vector norms are given, this is faster than computing the squared distance directly,
    * especially when one of the vectors is a sparse vector.
-   *
    * @param v1 the first vector
    * @param norm1 the norm of the first vector, non-negative
    * @param v2 the second vector
@@ -314,7 +420,6 @@ object MLUtils {
   * When `x` is positive and large, computing `math.log(1 + math.exp(x))` will lead to arithmetic
   * overflow. This will happen when `x > 709.78` which is not a very large number.
   * It can be addressed by rewriting the formula into `x + math.log1p(math.exp(-x))` when `x > 0`.
-   *
    * @param x a floating-point value as input.
    * @return the result of `math.log(1 + math.exp(x))`.
    */

http://git-wip-us.apache.org/repos/asf/spark/blob/f277cdf7/mllib/src/test/java/org/apache/spark/mllib/util/JavaMLUtilsSuite.java
----------------------------------------------------------------------
diff --git a/mllib/src/test/java/org/apache/spark/mllib/util/JavaMLUtilsSuite.java b/mllib/src/test/java/org/apache/spark/mllib/util/JavaMLUtilsSuite.java
new file mode 100644
index 0000000..2fa0bd2
--- /dev/null
+++ b/mllib/src/test/java/org/apache/spark/mllib/util/JavaMLUtilsSuite.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.mllib.util;
+
+import java.util.Collections;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import org.apache.spark.SharedSparkSession;
+import org.apache.spark.mllib.linalg.Vector;
+import org.apache.spark.mllib.linalg.Vectors;
+import org.apache.spark.mllib.regression.LabeledPoint;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.RowFactory;
+
+public class JavaMLUtilsSuite extends SharedSparkSession {
+
+  @Test
+  public void testConvertVectorColumnsToAndFromML() {
+    Vector x = Vectors.dense(2.0);
+    Dataset<Row> dataset = spark.createDataFrame(
+      Collections.singletonList(new LabeledPoint(1.0, x)), LabeledPoint.class
+    ).select("label", "features");
+    Dataset<Row> newDataset1 = MLUtils.convertVectorColumnsToML(dataset);
+    Row new1 = newDataset1.first();
+    Assert.assertEquals(RowFactory.create(1.0, x.asML()), new1);
+    Row new2 = MLUtils.convertVectorColumnsToML(dataset, "features").first();
+    Assert.assertEquals(new1, new2);
+    Row old1 = MLUtils.convertVectorColumnsFromML(newDataset1).first();
+    Assert.assertEquals(RowFactory.create(1.0, x), old1);
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/f277cdf7/mllib/src/test/scala/org/apache/spark/mllib/util/MLUtilsSuite.scala
----------------------------------------------------------------------
diff --git a/mllib/src/test/scala/org/apache/spark/mllib/util/MLUtilsSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/util/MLUtilsSuite.scala
index 7b6bfee..3801bd1 100644
--- a/mllib/src/test/scala/org/apache/spark/mllib/util/MLUtilsSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/mllib/util/MLUtilsSuite.scala
@@ -25,12 +25,14 @@ import scala.io.Source
 import breeze.linalg.{squaredDistance => breezeSquaredDistance}
 import com.google.common.io.Files
 
-import org.apache.spark.SparkException
-import org.apache.spark.SparkFunSuite
+import org.apache.spark.{SparkException, SparkFunSuite}
 import org.apache.spark.mllib.linalg.{DenseVector, SparseVector, Vectors}
 import org.apache.spark.mllib.regression.LabeledPoint
 import org.apache.spark.mllib.util.MLUtils._
 import org.apache.spark.mllib.util.TestingUtils._
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.functions.col
+import org.apache.spark.sql.types.MetadataBuilder
 import org.apache.spark.util.Utils
 
 class MLUtilsSuite extends SparkFunSuite with MLlibTestSparkContext {
@@ -245,4 +247,58 @@ class MLUtilsSuite extends SparkFunSuite with MLlibTestSparkContext {
     assert(log1pExp(-13.8) ~== math.log1p(math.exp(-13.8)) absTol 1E-10)
    assert(log1pExp(-238423789.865) ~== math.log1p(math.exp(-238423789.865)) absTol 1E-10)
   }
+
+  test("convertVectorColumnsToML") {
+    val x = Vectors.sparse(2, Array(1), Array(1.0))
+    val metadata = new MetadataBuilder().putLong("numFeatures", 2L).build()
+    val y = Vectors.dense(2.0, 3.0)
+    val z = Vectors.dense(4.0)
+    val p = (5.0, z)
+    val w = Vectors.dense(6.0).asML
+    val df = spark.createDataFrame(Seq(
+      (0, x, y, p, w)
+    )).toDF("id", "x", "y", "p", "w")
+      .withColumn("x", col("x"), metadata)
+    val newDF1 = convertVectorColumnsToML(df)
+    assert(newDF1.schema("x").metadata === metadata, "Metadata should be preserved.")
+    val new1 = newDF1.first()
+    assert(new1 === Row(0, x.asML, y.asML, Row(5.0, z), w))
+    val new2 = convertVectorColumnsToML(df, "x", "y").first()
+    assert(new2 === new1)
+    val new3 = convertVectorColumnsToML(df, "y", "w").first()
+    assert(new3 === Row(0, x, y.asML, Row(5.0, z), w))
+    intercept[IllegalArgumentException] {
+      convertVectorColumnsToML(df, "p")
+    }
+    intercept[IllegalArgumentException] {
+      convertVectorColumnsToML(df, "p._2")
+    }
+  }
+
+  test("convertVectorColumnsFromML") {
+    val x = Vectors.sparse(2, Array(1), Array(1.0)).asML
+    val metadata = new MetadataBuilder().putLong("numFeatures", 2L).build()
+    val y = Vectors.dense(2.0, 3.0).asML
+    val z = Vectors.dense(4.0).asML
+    val p = (5.0, z)
+    val w = Vectors.dense(6.0)
+    val df = spark.createDataFrame(Seq(
+      (0, x, y, p, w)
+    )).toDF("id", "x", "y", "p", "w")
+      .withColumn("x", col("x"), metadata)
+    val newDF1 = convertVectorColumnsFromML(df)
+    assert(newDF1.schema("x").metadata === metadata, "Metadata should be preserved.")
+    val new1 = newDF1.first()
+    assert(new1 === Row(0, Vectors.fromML(x), Vectors.fromML(y), Row(5.0, z), w))
+    val new2 = convertVectorColumnsFromML(df, "x", "y").first()
+    assert(new2 === new1)
+    val new3 = convertVectorColumnsFromML(df, "y", "w").first()
+    assert(new3 === Row(0, x, Vectors.fromML(y), Row(5.0, z), w))
+    intercept[IllegalArgumentException] {
+      convertVectorColumnsFromML(df, "p")
+    }
+    intercept[IllegalArgumentException] {
+      convertVectorColumnsFromML(df, "p._2")
+    }
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
