Github user Krimit commented on a diff in the pull request:
    --- Diff: mllib/src/main/scala/org/apache/spark/ml/feature/Word2Vec.scala 
    @@ -194,6 +232,285 @@ final class Word2Vec @Since("1.4.0") (
       override def copy(extra: ParamMap): Word2Vec = defaultCopy(extra)
    +  /**
    +   * Similar to InitUnigramTable in the original code. Instead of using an 
array of size 100 million
    +   * like the original, we size it to be 20 times the vocabulary size.
    +   * We sacrifice memory here, to get constant time lookups into this 
array when generating
    +   * negative samples.
    +   */
    +  private def generateUnigramTable(normalizedWeights: Array[Double], 
tableSize: Int): Array[Int] = {
    +    val table = Array.fill(tableSize)(0)
    +    var a = 0
    +    var i = 0
    +    while (a < table.length) {
    +      table.update(a, i)
    +      if (a.toFloat / table.length >= normalizedWeights(i)) {
    +        i = math.min(normalizedWeights.length - 1, i + 1)
    +      }
    +      a += 1
    +    }
    +    table
    +  }
    +  private def generateVocab[S <: Iterable[String]](input: RDD[S]):
    +      (Int, Long, Map[String, Int], Array[Int]) = {
    +    val sc = input.context
    +    val words = input.flatMap(x => x)
    +    val vocab = => (w, 1L))
    +      .reduceByKey(_ + _)
    +      .filter{case (w, c) => c >= $(minCount)}
    +      .collect()
    +      .sortWith{case ((w1, c1), (w2, c2)) => c1 > c2}
    +    val totalWordCount =
    +    val vocabMap ={case ((w, c), i) =>
    +      w -> i
    +    }.toMap
    +    // We create a cumulative distribution array, unlike the original 
    +    // and use binary search to get insertion points. This should 
replicate the same
    +    // behavior as the table in original implementation.
    +    val weights = => scala.math.pow(x._2, power))
    +    val totalWeight = weights.sum
    +    val normalizedCumWeights = weights.scanLeft(0.0)(_ + _) => 
(x / totalWeight))
    +    val unigramTableSize =
    +      math.min(maxUnigramTableSize, unigramTableSizeFactor * 
    +    val unigramTable = generateUnigramTable(normalizedCumWeights, 
    +    (vocabMap.size, totalWordCount, vocabMap, unigramTable)
    +  }
    +  /**
    +   * This method implements Word2Vec Continuous Bag Of Words based 
implementation using
    +   * negative sampling optimization, using BLAS for vectorizing operations 
where applicable.
    +   * The algorithm is parallelized in the same way as the skip-gram based 
    +   * @param input
    +   * @return
    +   */
    +  private def fitCBOW[S <: Iterable[String]](input: RDD[S]): 
feature.Word2VecModel = {
    +    val (vocabSize, totalWordCount, vocabMap, uniTable) = 
    +    val negSamples = $(negativeSamples)
    +    assert(negSamples < vocabSize,
    +      s"Vocab size ($vocabSize) cannot be smaller than negative 
    +    val seed = $(this.seed)
    +    val initRandom = new XORShiftRandom(seed)
    +    val vectorSize = $(this.vectorSize)
    +    val syn0Global = Array.fill(vocabSize * 
vectorSize)(initRandom.nextFloat - 0.5f)
    +    val syn1Global = Array.fill(vocabSize * vectorSize)(0.0f)
    +    val sc = input.context
    +    val vocabMapbc = sc.broadcast(vocabMap)
    +    val unigramTablebc = sc.broadcast(uniTable)
    +    val window = $(windowSize)
    +    val digitSentences = input.flatMap{sentence =>
    +      val wordIndexes = sentence.flatMap(vocabMapbc.value.get)
    +      wordIndexes.grouped($(maxSentenceLength)).map(_.toArray)
    +    }.repartition($(numPartitions)).cache()
    +    val learningRate = $(stepSize)
    +    val wordsPerPartition = totalWordCount / $(numPartitions)
    +    logInfo(s"VocabSize: ${vocabMap.size}, TotalWordCount: 
    +    for {iteration <- 1 to $(maxIter)} {
    +      logInfo(s"Starting iteration: $iteration")
    +      val iterationStartTime = System.nanoTime()
    +      val syn0bc = sc.broadcast(syn0Global)
    +      val syn1bc = sc.broadcast(syn1Global)
    +      val partialFits = digitSentences.mapPartitionsWithIndex{ case (i_, 
iter) =>
    +        logInfo(s"Iteration: $iteration, Partition: $i_")
    +        logInfo(s"Numerical lib class being used : 
    +        val random = new XORShiftRandom(seed ^ ((i_ + 1) << 16) ^ 
((-iteration - 1) << 8))
    +        val contextWordPairs = iter.flatMap(generateContextWordPairs(_, 
window, random))
    +        val groupedBatches = contextWordPairs.grouped(batchSize)
    +        val negLabels = 1.0f +: Array.fill(negSamples)(0.0f)
    +        val syn0 = syn0bc.value
    +        val syn1 = syn1bc.value
    +        val unigramTable = unigramTablebc.value
    +        // initialize intermediate arrays
    +        val contextVector = Array.fill(vectorSize)(0.0f)
    +        val l2Vectors = Array.fill(vectorSize * (negSamples + 1))(0.0f)
    +        val gb = Array.fill(negSamples + 1)(0.0f)
    +        val hiddenLayerUpdate = Array.fill(vectorSize * (negSamples + 
    +        val neu1e = Array.fill(vectorSize)(0.0f)
    +        val wordIndices = Array.fill(negSamples + 1)(0)
    +        val time = System.nanoTime
    +        var batchTime = System.nanoTime
    +        var idx = -1L
    +        for (batch <- groupedBatches) {
    +          idx = idx + 1
    +          val wordRatio =
    +            idx.toFloat * batchSize /
    +            ($(maxIter) * (wordsPerPartition.toFloat + 1)) + ((iteration - 
1).toFloat / $(maxIter))
    +          val alpha = math.max(learningRate * 0.0001, learningRate * (1 - 
    +          if(idx % 10 == 0 && idx > 0) {
    +            logInfo(s"Partition: $i_, wordRatio = $wordRatio, alpha = 
    +            val wordCount = batchSize * idx
    +            val timeTaken = (System.nanoTime - time) / 1e6
    +            val batchWordCount = 10 * batchSize
    +            val currentBatchTime = (System.nanoTime - batchTime) / 1e6
    +            batchTime = System.nanoTime
    +            logInfo(s"Partition: $i_, Batch time: $currentBatchTime ms, 
batch speed: " +
    +              s"${batchWordCount / currentBatchTime * 1000} words/s")
    +            logInfo(s"Partition: $i_, Cumulative time: $timeTaken ms, 
cumulative speed: " +
    +              s"${wordCount / timeTaken * 1000} words/s")
    +          }
    +          val errors = for ((contextIds, word) <- batch) yield {
    +            // initialize vectors to 0
    +            initializeVector(contextVector)
    +            initializeVector(l2Vectors)
    +            initializeVector(gb)
    +            initializeVector(hiddenLayerUpdate)
    +            initializeVector(neu1e)
    +            val scale = 1.0f / contextIds.length
    +            // feed forward
    +            contextIds.foreach { c =>
    +              blas.saxpy(vectorSize, scale, syn0, c * vectorSize, 1, 
contextVector, 0, 1)
    +            }
    +            generateNegativeSamples(random, word, unigramTable, 
negSamples, wordIndices)
    +            wordIndices.view.zipWithIndex.foreach { case (wordId, i) =>
    +              blas.scopy(vectorSize, syn1, vectorSize * wordId, 1, 
l2Vectors, vectorSize * i, 1)
    +            }
    +            val rows = negSamples + 1
    +            val cols = vectorSize
    +            blas
    +              .sgemv("T", cols, rows, 1.0f, l2Vectors, 0, cols, 
contextVector, 0, 1, 0.0f, gb, 0, 1)
    +            (0 to gb.length-1).foreach {i =>
    +              val v = 1.0f / (1 + math.exp(-gb(i)).toFloat)
    +              val err = (negLabels(i) - v) * alpha
    +              gb.update(i, err)
    +            }
    +            // update for hidden -> output layer
    +            blas.sger(cols, rows, 1.0f, contextVector, 1, gb, 1, 
hiddenLayerUpdate, cols)
    +            // update hidden -> output layer, syn1
    +            wordIndices.view.zipWithIndex.foreach {case (w, i) =>
    +              blas.saxpy(vectorSize,
    +                1.0f,
    +                hiddenLayerUpdate,
    +                i * vectorSize,
    +                1,
    +                syn1,
    +                w * vectorSize,
    +                1)
    +            }
    +            // update for word vectors
    +            blas.sgemv("N", cols, rows, scale, l2Vectors, 0, cols, gb, 0, 
1, 1.0f, neu1e, 0, 1)
    +            // update input -> hidden layer, syn0
    +            contextIds.foreach { i =>
    +              blas.saxpy(vectorSize, 1.0f, neu1e, 0, 1, syn0, i * 
vectorSize, 1)
    +            }
    +   / alpha
    +          }
    +          logInfo(s"Partition: $i_, Average Batch Error = ${errors.sum / 
    +        }
    +        Iterator((0, syn0), (1, syn1))
    +      }
    +      val aggedMatrices = partialFits.reduceByKey{case (v1, v2) =>
    +        blas.saxpy(vocabSize, 1.0f, v2, 1, v1, 1)
    +        v1
    +      }.collect
    +      assert(aggedMatrices.length == 2)
    +      val norm = 1.0f / $(numPartitions)
    +      aggedMatrices.foreach {case (i, syn) =>
    +        blas.sscal(syn.length, norm, syn, 0, 1)
    +        if (i == 0) {
    +          // copy syn0
    +          blas.scopy(syn.length, syn, 0, 1, syn0Global, 0, 1)
    +        } else {
    +          // copy syn1
    +          blas.scopy(syn.length, syn, 0, 1, syn1Global, 0, 1)
    +        }
    +      }
    +      syn0bc.destroy(false)
    +      syn1bc.destroy(false)
    +      val timePerIteration = (System.nanoTime() - iterationStartTime) / 1e6
    +      logInfo(s"Total time taken per iteration: ${timePerIteration} ms")
    +    }
    +    digitSentences.unpersist()
    +    vocabMapbc.destroy()
    +    unigramTablebc.destroy()
    +    new feature.Word2VecModel(vocabMap, syn0Global)
    +  }
    +  private def initializeVector(v: Array[Float], value: Float = 0.0f): Unit 
= {
    --- End diff --
    Nit: From what I can tell ``value`` is never actually passed in, so it's 
always equal to ``0.0f``. Maybe call the method ``zeroVector`` and make that 
more explicit?

If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at or file a JIRA ticket
with INFRA.

To unsubscribe, e-mail:
For additional commands, e-mail:

Reply via email to