Repository: spark
Updated Branches:
  refs/heads/master 3c6198c86 -> 79a4dab62


[SPARK-21958][ML] Word2VecModel save: transform data in the cluster

## What changes were proposed in this pull request?

Change a data transformation performed while saving a Word2VecModel so that it 
happens on distributed data instead of on local driver data.
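
In short, instead of mapping every word vector to a `Data` row on the driver and then calling `createDataFrame` on the resulting local `Seq`, the writer now creates a `Dataset` of raw `(String, Array[Float])` pairs, repartitions it, and performs the row conversion on the executors. A minimal, self-contained sketch of the pattern is below; the object name, toy vectors, and `/tmp` path are illustrative only and not part of the patch.

```scala
import org.apache.spark.sql.SparkSession

object Word2VecSaveSketch {
  // Stand-in for the Data case class used by the writer in the diff below.
  case class Data(word: String, vector: Array[Float])

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("word2vec-save-sketch").getOrCreate()
    import spark.implicits._

    // Placeholder for instance.wordVectors.getVectors: a local map living on the driver.
    val wordVectors: Map[String, Array[Float]] =
      Map("spark" -> Array(0.1f, 0.2f), "scala" -> Array(0.3f, 0.4f))
    val numPartitions = 2
    val dataPath = "/tmp/word2vec-data"

    // Old approach (removed by the patch): build all Data rows on the driver and hand
    // the whole local Seq to createDataFrame:
    //   val dataSeq = wordVectors.toSeq.map { case (word, vector) => Data(word, vector) }
    //   spark.createDataFrame(dataSeq).repartition(numPartitions).write.parquet(dataPath)

    // New approach: distribute the raw pairs first, then convert to rows on the executors.
    spark.createDataset(wordVectors.toSeq)
      .repartition(numPartitions)
      .map { case (word, vector) => Data(word, vector) }
      .toDF()
      .write
      .mode("overwrite")
      .parquet(dataPath)

    spark.stop()
  }
}
```

Keeping the per-row mapping after the `repartition` means the conversion to `Data` rows and the Parquet encoding are spread across partitions instead of being materialized as one large local collection on the driver.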

## How was this patch tested?

Unit tests for the ML sub-component still pass.
Running this patch against v2.2.0 in a fully distributed production cluster 
allows a 4.0G model to save and load correctly, which it fails to do without 
the patch.
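
For reference, a save/load round trip through the public `ml.feature` API looks roughly like the sketch below (the toy corpus, small vector size, and `/tmp` path are placeholders; the production check described above used a model of roughly 4.0G rather than this tiny one).

```scala
import org.apache.spark.ml.feature.{Word2Vec, Word2VecModel}
import org.apache.spark.sql.SparkSession

object Word2VecSaveLoadCheck {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("word2vec-save-load").getOrCreate()
    import spark.implicits._

    // Tiny toy corpus: each row is a tokenized document.
    val docs = Seq(
      "spark ml word2vec model".split(" "),
      "save and load the model".split(" ")
    ).toDF("text")

    val model: Word2VecModel = new Word2Vec()
      .setInputCol("text")
      .setOutputCol("features")
      .setVectorSize(8)
      .setMinCount(0)
      .fit(docs)

    val path = "/tmp/word2vec-model"        // placeholder path
    model.write.overwrite().save(path)      // exercises the writer changed by this patch
    val restored = Word2VecModel.load(path) // round-trips the word vectors through Parquet

    restored.getVectors.show(truncate = false)
    spark.stop()
  }
}
```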

Author: Travis Hegner <theg...@trilliumit.com>

Closes #19191 from travishegner/master.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/79a4dab6
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/79a4dab6
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/79a4dab6

Branch: refs/heads/master
Commit: 79a4dab6297121c075a310a50d0fc0549a3c1e41
Parents: 3c6198c
Author: Travis Hegner <theg...@trilliumit.com>
Authored: Fri Sep 15 15:17:16 2017 +0200
Committer: Nick Pentreath <ni...@za.ibm.com>
Committed: Fri Sep 15 15:17:16 2017 +0200

----------------------------------------------------------------------
 .../src/main/scala/org/apache/spark/ml/feature/Word2Vec.scala | 7 +++++--
 1 file changed, 5 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/79a4dab6/mllib/src/main/scala/org/apache/spark/ml/feature/Word2Vec.scala
----------------------------------------------------------------------
diff --git a/mllib/src/main/scala/org/apache/spark/ml/feature/Word2Vec.scala b/mllib/src/main/scala/org/apache/spark/ml/feature/Word2Vec.scala
index f6095e2..fe3306e 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/feature/Word2Vec.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/feature/Word2Vec.scala
@@ -337,14 +337,17 @@ object Word2VecModel extends MLReadable[Word2VecModel] {
       DefaultParamsWriter.saveMetadata(instance, path, sc)
 
       val wordVectors = instance.wordVectors.getVectors
-      val dataSeq = wordVectors.toSeq.map { case (word, vector) => Data(word, vector) }
       val dataPath = new Path(path, "data").toString
       val bufferSizeInBytes = Utils.byteStringAsBytes(
         sc.conf.get("spark.kryoserializer.buffer.max", "64m"))
       val numPartitions = Word2VecModelWriter.calculateNumberOfPartitions(
         bufferSizeInBytes, instance.wordVectors.wordIndex.size, instance.getVectorSize)
-      sparkSession.createDataFrame(dataSeq)
+      val spark = sparkSession
+      import spark.implicits._
+      spark.createDataset[(String, Array[Float])](wordVectors.toSeq)
         .repartition(numPartitions)
+        .map { case (word, vector) => Data(word, vector) }
+        .toDF()
         .write
         .parquet(dataPath)
     }

