Github user mengxr commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2942#discussion_r19490284
  
    --- Diff: docs/mllib-clustering.md ---
    @@ -153,3 +153,75 @@ provided in the [Self-Contained Applications](quick-start.html#self-contained-ap
     section of the Spark
     Quick Start guide. Be sure to also include *spark-mllib* to your build file as
     a dependency.
    +
    +## Streaming clustering
    +
    +When data arrive in a stream, we may want to estimate clusters dynamically, updating them as new data arrive. MLlib provides support for streaming k-means clustering, with parameters to control the decay (or "forgetfulness") of the estimates. The algorithm uses a generalization of the mini-batch k-means update rule. For each batch of data, we assign all points to their nearest cluster, compute the new cluster centers, and then update each cluster using:
    +
    +`\begin{equation}
    +    c_{t+1} = \frac{c_t n_t \alpha + x_t m_t}{n_t \alpha + m_t}
    +\end{equation}`
    +`\begin{equation}
    +    n_{t+1} = n_t + m_t
    +\end{equation}`
    +
    +Where `$c_t$` is the previous center for the cluster, `$n_t$` is the number of points assigned to the cluster thus far, `$x_t$` is the new cluster center from the current batch, and `$m_t$` is the number of points added to the cluster in the current batch. The decay factor `$\alpha$` can be used to ignore the past: with `$\alpha = 1$` all data will be used from the beginning; with `$\alpha = 0$` only the most recent data will be used. This is analogous to an exponentially-weighted moving average.
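    +
    +As a rough illustration of the update rule (a sketch only, not part of the MLlib API; the function and parameter names here are ours), the per-cluster computation in one dimension could be written as:
    +
    +{% highlight scala %}
    +// Sketch of the update rule above for a single one-dimensional cluster.
    +// center = c_t, count = n_t, batchMean = x_t, batchCount = m_t.
    +def updateCluster(
    +    center: Double,
    +    count: Double,
    +    batchMean: Double,
    +    batchCount: Double,
    +    alpha: Double): (Double, Double) = {
    +  val newCenter = (center * count * alpha + batchMean * batchCount) /
    +    (count * alpha + batchCount)
    +  // The effective count simply accumulates the new points.
    +  (newCenter, count + batchCount)
    +}
    +{% endhighlight %}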
    +
    +### Examples
    +
    +This example shows how to estimate clusters on streaming data.
    +
    +<div class="codetabs">
    +
    +<div data-lang="scala" markdown="1">
    +
    +First we import the necessary classes.
    +
    +{% highlight scala %}
    +
    +import org.apache.spark.mllib.linalg.Vectors
    +import org.apache.spark.mllib.clustering.StreamingKMeans
    +
    +{% endhighlight %}
    +
    +Then we make an input stream of vectors for training, as well as one for testing. We assume a StreamingContext `ssc` has been created; see the [Spark Streaming Programming Guide](streaming-programming-guide.html#initializing) for more info. For this example, we use vector data.
    +
    +{% highlight scala %}
    +
    +val trainingData = ssc.textFileStream("/training/data/dir").map(Vectors.parse)
    +val testData = ssc.textFileStream("/testing/data/dir").map(Vectors.parse)
    +
    +{% endhighlight %}
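    +
    +Here each line of the text files is expected to be a vector in the format accepted by `Vectors.parse`, e.g. `[0.0,1.1,0.1]` for a dense vector.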
    +
    +We create a model with random clusters and specify the number of clusters to find.
    +
    +{% highlight scala %}
    +
    +val numDimensions = 3
    +val numClusters = 2
    +val model = new StreamingKMeans()
    +    .setK(numClusters)
    +    .setDecayFactor(1.0)
    +    .setRandomCenters(numDimensions, 0.0)
    +
    +{% endhighlight %}
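    +
    +Here `setDecayFactor(1.0)` corresponds to `$\alpha = 1$` above, so all data are weighted equally from the beginning; a value less than 1 would down-weight older batches in favor of more recent data.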
    +
    +Now register the streams for training and testing and start the job, printing the predicted cluster assignments on new data points as they arrive.
    +
    +{% highlight scala %}
    +
    +model.trainOn(trainingData)
    +model.predictOn(testData).print()
    --- End diff --
    
    `predictOn` only outputs the prediction, which is not very useful. Maybe we should use `predictOnValues` here.
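    
    For example, something along these lines (just a sketch, assuming the test files contain labeled points in `LabeledPoint.parse` format):
    
    ```scala
    import org.apache.spark.mllib.regression.LabeledPoint
    
    // Keep the label as the key so each prediction prints as (label, cluster).
    val labeledTestData = ssc.textFileStream("/testing/data/dir").map(LabeledPoint.parse)
    model.predictOnValues(labeledTestData.map(lp => (lp.label, lp.features))).print()
    ```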

