Repository: spark
Updated Branches:
  refs/heads/master 3c6198c86 -> 79a4dab62
[SPARK-21958][ML] Word2VecModel save: transform data in the cluster

## What changes were proposed in this pull request?

Change a data transformation while saving a Word2VecModel to happen with distributed data instead of local driver data.

## How was this patch tested?

Unit tests for the ML sub-component still pass.
Running this patch against v2.2.0 in a fully distributed production cluster allows a 4.0G model to save and load correctly, where it would not do so without the patch.

Author: Travis Hegner <theg...@trilliumit.com>

Closes #19191 from travishegner/master.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/79a4dab6
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/79a4dab6
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/79a4dab6

Branch: refs/heads/master
Commit: 79a4dab6297121c075a310a50d0fc0549a3c1e41
Parents: 3c6198c
Author: Travis Hegner <theg...@trilliumit.com>
Authored: Fri Sep 15 15:17:16 2017 +0200
Committer: Nick Pentreath <ni...@za.ibm.com>
Committed: Fri Sep 15 15:17:16 2017 +0200

----------------------------------------------------------------------
 .../src/main/scala/org/apache/spark/ml/feature/Word2Vec.scala | 7 +++++--
 1 file changed, 5 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/79a4dab6/mllib/src/main/scala/org/apache/spark/ml/feature/Word2Vec.scala
----------------------------------------------------------------------
diff --git a/mllib/src/main/scala/org/apache/spark/ml/feature/Word2Vec.scala b/mllib/src/main/scala/org/apache/spark/ml/feature/Word2Vec.scala
index f6095e2..fe3306e 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/feature/Word2Vec.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/feature/Word2Vec.scala
@@ -337,14 +337,17 @@ object Word2VecModel extends MLReadable[Word2VecModel] {
       DefaultParamsWriter.saveMetadata(instance, path, sc)
 
       val wordVectors = instance.wordVectors.getVectors
-      val dataSeq = wordVectors.toSeq.map { case (word, vector) => Data(word, vector) }
       val dataPath = new Path(path, "data").toString
       val bufferSizeInBytes = Utils.byteStringAsBytes(
         sc.conf.get("spark.kryoserializer.buffer.max", "64m"))
       val numPartitions = Word2VecModelWriter.calculateNumberOfPartitions(
         bufferSizeInBytes, instance.wordVectors.wordIndex.size, instance.getVectorSize)
-      sparkSession.createDataFrame(dataSeq)
+      val spark = sparkSession
+      import spark.implicits._
+      spark.createDataset[(String, Array[Float])](wordVectors.toSeq)
         .repartition(numPartitions)
+        .map { case (word, vector) => Data(word, vector) }
+        .toDF()
         .write
         .parquet(dataPath)
     }
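For context, the sketch below contrasts the before/after shape of the save path shown in the diff. It is not the actual Spark source: the `Data` case class stand-in, the toy `wordVectors` map, the fixed `numPartitions`, the `local[*]` master, and the `/tmp` output paths are all illustrative assumptions.

```scala
import org.apache.spark.sql.SparkSession

// Hypothetical stand-in for the Data case class used by Word2VecModel's writer.
case class Data(word: String, vector: Array[Float])

object Word2VecSaveSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("word2vec-save-sketch")
      .master("local[*]") // assumption: local mode, just for the sketch
      .getOrCreate()
    import spark.implicits._

    // Stand-in for instance.wordVectors.getVectors: a driver-local Map[String, Array[Float]].
    val wordVectors: Map[String, Array[Float]] =
      Map("spark" -> Array(0.1f, 0.2f), "scala" -> Array(0.3f, 0.4f))
    val numPartitions = 2 // in Spark this comes from calculateNumberOfPartitions

    // Before the patch: the pairs are wrapped in Data on the driver, so the
    // fully materialized Seq[Data] is built locally before becoming a DataFrame.
    val dataSeq = wordVectors.toSeq.map { case (word, vector) => Data(word, vector) }
    spark.createDataFrame(dataSeq)
      .repartition(numPartitions)
      .write
      .parquet("/tmp/word2vec-before") // illustrative output path

    // After the patch: the raw (word, vector) pairs become a Dataset first,
    // are repartitioned, and the map to Data runs on the executors.
    spark.createDataset(wordVectors.toSeq)
      .repartition(numPartitions)
      .map { case (word, vector) => Data(word, vector) }
      .toDF()
      .write
      .parquet("/tmp/word2vec-after") // illustrative output path

    spark.stop()
  }
}
```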