[GitHub] flink pull request: [FLINK-1731] [ml] Implementation of Feature K-...

2015-10-06 Thread peedeeX21
Github user peedeeX21 closed the pull request at:

https://github.com/apache/flink/pull/700


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-1731] [ml] Implementation of Feature K-...

2015-05-20 Thread peedeeX21
GitHub user peedeeX21 opened a pull request:

https://github.com/apache/flink/pull/700

[FLINK-1731] [ml] Implementation of Feature K-Means and Test Suite

As part of the IMPRO-3 warm-up task, K-Means and the corresponding test suite 
were implemented.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/peedeeX21/flink feature_kmeans

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/700.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #700


commit 02fe6b2c7ebc6bf4b55e832681286994b03c4d40
Author: Florian Goessler 
Date:   2015-05-20T09:12:20Z

[FLINK-1731] [ml] unit test for KMeans

commit 71aa47bd06ad2e051749ea1b9df923b8eb5bf6e4
Author: Peter Schrott 
Date:   2015-05-20T11:08:36Z

[FLINK-1731] [ml] Implementation of K-Means






[GitHub] flink pull request: [FLINK-1731] [ml] Implementation of Feature K-...

2015-05-29 Thread aalexandrov
Github user aalexandrov commented on the pull request:

https://github.com/apache/flink/pull/700#issuecomment-106911412
  
Can anybody with more Apache insight answer @peedeeX21's concerns? 
Otherwise I suggest merging this and opening a follow-up issue that extends the 
current implementation to KMeans++. 




[GitHub] flink pull request: [FLINK-1731] [ml] Implementation of Feature K-...

2015-06-01 Thread sachingoel0101
Github user sachingoel0101 commented on the pull request:

https://github.com/apache/flink/pull/700#issuecomment-107831123
  
Hey guys. You might wanna look at the initialization schemes here: 
https://github.com/apache/flink/pull/757




[GitHub] flink pull request: [FLINK-1731] [ml] Implementation of Feature K-...

2015-06-03 Thread FGoessler
Github user FGoessler commented on the pull request:

https://github.com/apache/flink/pull/700#issuecomment-108310843
  
The Travis build is failing on Oracle JDK 8. According to the build log, 
Maven or Flink is hanging. Can anyone help, or at least restart the build? 
Are there any known flaky ("flipping") tests? IMO the failure isn't related to 
our changes.




[GitHub] flink pull request: [FLINK-1731] [ml] Implementation of Feature K-...

2015-06-10 Thread peedeeX21
Github user peedeeX21 commented on the pull request:

https://github.com/apache/flink/pull/700#issuecomment-110621613
  
@tillrohrmann 
Would you please help me out with that pending pull request? 




[GitHub] flink pull request: [FLINK-1731] [ml] Implementation of Feature K-...

2015-06-10 Thread tillrohrmann
Github user tillrohrmann commented on the pull request:

https://github.com/apache/flink/pull/700#issuecomment-110658920
  
Will do @peedeeX21. Currently I'm busy with the upcoming release, but once 
we're done with it, I'll work on this PR.




[GitHub] flink pull request: [FLINK-1731] [ml] Implementation of Feature K-...

2015-06-10 Thread peedeeX21
Github user peedeeX21 commented on the pull request:

https://github.com/apache/flink/pull/700#issuecomment-110681501
  
@tillrohrmann great, no worries. I just wasn't sure what was going on. :) 
Good luck with the new release!




[GitHub] flink pull request: [FLINK-1731] [ml] Implementation of Feature K-...

2015-06-22 Thread thvasilo
Github user thvasilo commented on the pull request:

https://github.com/apache/flink/pull/700#issuecomment-114135260
  
Hello @peedeeX21, most of the failing Travis tests have been fixed in the 
current master. Could you try rebasing this PR and force-pushing to this 
branch?
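For reference, the suggested rebase-and-force-push follows the usual fork workflow. The sketch below builds a throwaway sandbox under `mktemp -d` so the steps are reproducible end to end; the remote layout (`upstream` standing in for apache/flink, `origin` for the contributor's fork) and all paths, branch names, and identities are assumptions for the demo. In a real checkout, only the last three commands matter.

```shell
# Throwaway sandbox illustrating: rebase the PR branch onto the latest
# master, then force-push it. All names below are invented for the demo.
set -eu
tmp=$(mktemp -d)

# "upstream" stands in for apache/flink, "fork" for the contributor's fork.
git init -q --bare "$tmp/upstream.git"
git -C "$tmp/upstream.git" symbolic-ref HEAD refs/heads/master
git init -q --bare "$tmp/fork.git"

# Seed upstream's master with an initial commit.
git clone -q "$tmp/upstream.git" "$tmp/seed"
git -C "$tmp/seed" config user.email dev@example.com
git -C "$tmp/seed" config user.name Dev
git -C "$tmp/seed" symbolic-ref HEAD refs/heads/master
echo a > "$tmp/seed/a.txt"
git -C "$tmp/seed" add a.txt
git -C "$tmp/seed" commit -qm "initial"
git -C "$tmp/seed" push -q origin master

# The contributor branches off master and pushes the feature to the fork.
git clone -q "$tmp/upstream.git" "$tmp/work"
git -C "$tmp/work" config user.email dev@example.com
git -C "$tmp/work" config user.name Dev
git -C "$tmp/work" remote rename origin upstream
git -C "$tmp/work" remote add origin "$tmp/fork.git"
git -C "$tmp/work" checkout -qb feature_kmeans
echo kmeans > "$tmp/work/kmeans.txt"
git -C "$tmp/work" add kmeans.txt
git -C "$tmp/work" commit -qm "kmeans feature"
git -C "$tmp/work" push -q origin feature_kmeans

# Meanwhile, master moves ahead (e.g. the Travis test fixes land).
echo fix > "$tmp/seed/fix.txt"
git -C "$tmp/seed" add fix.txt
git -C "$tmp/seed" commit -qm "fix flaky tests"
git -C "$tmp/seed" push -q origin master

# The actual suggestion: fetch the latest master, rebase, force-push.
git -C "$tmp/work" fetch -q upstream
git -C "$tmp/work" rebase -q upstream/master
git -C "$tmp/work" push -q --force origin feature_kmeans
```

`git push --force-with-lease` is a safer variant of the last step when someone else may have pushed to the branch in the meantime.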




[GitHub] flink pull request: [FLINK-1731] [ml] Implementation of Feature K-...

2015-06-24 Thread FGoessler
Github user FGoessler commented on the pull request:

https://github.com/apache/flink/pull/700#issuecomment-114921879
  
Just rebased and force-pushed -> hoping for good Travis results :smiley: 




[GitHub] flink pull request: [FLINK-1731] [ml] Implementation of Feature K-...

2015-06-25 Thread thvasilo
Github user thvasilo commented on the pull request:

https://github.com/apache/flink/pull/700#issuecomment-115133956
  
Thanks, seems like all is fine now. We will start reviewing this in the 
next few days.




[GitHub] flink pull request: [FLINK-1731] [ml] Implementation of Feature K-...

2015-06-29 Thread thvasilo
Github user thvasilo commented on a diff in the pull request:

https://github.com/apache/flink/pull/700#discussion_r33460336
  
--- Diff: flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/clustering/KMeans.scala ---
@@ -0,0 +1,247 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.clustering
+
+import org.apache.flink.api.common.functions.RichMapFunction
+import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFields
+import org.apache.flink.api.scala.{DataSet, _}
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.ml.common.{LabeledVector, _}
+import org.apache.flink.ml.math.Breeze._
+import org.apache.flink.ml.math.Vector
+import org.apache.flink.ml.metrics.distances.EuclideanDistanceMetric
+import org.apache.flink.ml.pipeline._
+
+import scala.collection.JavaConverters._
+
+
+/**
+ * Implements the KMeans algorithm, which calculates cluster centroids based on a set of
+ * training data points and a set of k initial centroids.
+ *
+ * [[KMeans]] is a [[Predictor]] which needs to be trained on a set of data points and can
+ * then be used to assign new points to the learned cluster centroids.
+ *
+ * The KMeans algorithm works as described on Wikipedia
+ * (http://en.wikipedia.org/wiki/K-means_clustering):
+ *
+ * Given an initial set of k means m_1^(1), …, m_k^(1) (see below), the algorithm proceeds
+ * by alternating between two steps:
+ *
+ * ===Assignment step:===
+ *
+ * Assign each observation to the cluster whose mean yields the least within-cluster sum of
+ * squares (WCSS). Since the sum of squares is the squared Euclidean distance, this is
+ * intuitively the "nearest" mean. (Mathematically, this means partitioning the observations
+ * according to the Voronoi diagram generated by the means.)
+ *
+ * `S_i^(t) = { x_p : || x_p - m_i^(t) ||^2 ≤ || x_p - m_j^(t) ||^2 \forall j, 1 ≤ j ≤ k}`,
+ * where each `x_p` is assigned to exactly one `S^{(t)}`, even if it could be assigned to two
+ * or more of them.
+ *
+ * ===Update step:===
+ *
+ * Calculate the new means to be the centroids of the observations in the new clusters.
+ *
+ * `m^{(t+1)}_i = ( 1 / |S^{(t)}_i| ) \sum_{x_j \in S^{(t)}_i} x_j`
+ *
+ * Since the arithmetic mean is a least-squares estimator, this also minimizes the
+ * within-cluster sum of squares (WCSS) objective.
+ *
+ * @example
+ * {{{
+ *   val trainingDS: DataSet[Vector] = env.fromCollection(Clustering.trainingData)
+ *   val initialCentroids: DataSet[LabeledVector] = env.fromCollection(Clustering.initCentroids)
+ *
+ *   val kmeans = KMeans()
+ *     .setInitialCentroids(initialCentroids)
+ *     .setNumIterations(10)
+ *
+ *   kmeans.fit(trainingDS)
+ *
+ *   // get the computed centroids
+ *   val centroidsResult = kmeans.centroids.get.collect()
+ *
+ *   // get matching clusters for new points
+ *   val testDS: DataSet[Vector] = env.fromCollection(Clustering.testData)
+ *   val clusters: DataSet[LabeledVector] = kmeans.predict(testDS)
+ * }}}
+ *
+ * =Parameters=
+ *
+ * - [[org.apache.flink.ml.clustering.KMeans.NumIterations]]:
+ * Defines the number of iterations in which the centroids of the clusters are recalculated.
+ * As KMeans is a heuristic algorithm, there is no guarantee that it will converge to the
+ * global optimum. The recalculation of the centroids and the reassignment of the data
+ * points are repeated until the given number of iterations is reached.
+ * (Default value: '''10''')
+ *
+ * - [[org.apache.flink.ml.clustering.KMeans.InitialCentroids]]:
+ * Defines the initial k centroids of the k clusters. They are used as the starting point
+ * of the algorithm for clustering the data set. The centroids are recalculated as often as
+ * set in [[org.apache.flink.ml.clustering.KMeans.NumI
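The two alternating steps quoted in the Scaladoc (assign each point to its nearest centroid, then recompute each centroid as the mean of its cluster) can be illustrated outside of Flink as plain Lloyd iterations. The Python sketch below is illustrative only, not the PR's Scala DataSet implementation; the function name and signature are invented for the example:

```python
from typing import List, Tuple

Point = Tuple[float, ...]

def kmeans(points: List[Point],
           centroids: List[Point],
           num_iterations: int = 10) -> List[Point]:
    """Plain Lloyd's algorithm with a fixed iteration count."""
    for _ in range(num_iterations):
        # Assignment step: each point joins the cluster whose centroid
        # minimizes the squared Euclidean distance (WCSS contribution);
        # ties go to the lowest-index centroid, so each point lands in
        # exactly one cluster.
        clusters: List[List[Point]] = [[] for _ in centroids]
        for p in points:
            dists = [sum((a - b) ** 2 for a, b in zip(p, c)) for c in centroids]
            clusters[dists.index(min(dists))].append(p)
        # Update step: each centroid becomes the arithmetic mean of its
        # cluster; an empty cluster keeps its previous centroid.
        centroids = [
            tuple(sum(dim) / len(cluster) for dim in zip(*cluster)) if cluster else c
            for cluster, c in zip(clusters, centroids)
        ]
    return centroids
```

As with the `NumIterations` parameter described above, the loop simply runs for a fixed number of iterations rather than testing for convergence.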

[GitHub] flink pull request: [FLINK-1731] [ml] Implementation of Feature K-...

2015-06-29 Thread thvasilo
Github user thvasilo commented on a diff in the pull request:

https://github.com/apache/flink/pull/700#discussion_r33460529
  
--- Diff: flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/clustering/KMeans.scala ---

[GitHub] flink pull request: [FLINK-1731] [ml] Implementation of Feature K-...

2015-06-29 Thread thvasilo
Github user thvasilo commented on a diff in the pull request:

https://github.com/apache/flink/pull/700#discussion_r33461173
  
--- Diff: flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/clustering/KMeans.scala ---

[GitHub] flink pull request: [FLINK-1731] [ml] Implementation of Feature K-...

2015-06-29 Thread thvasilo
Github user thvasilo commented on a diff in the pull request:

https://github.com/apache/flink/pull/700#discussion_r33462036
  
--- Diff: flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/clustering/KMeans.scala ---

[GitHub] flink pull request: [FLINK-1731] [ml] Implementation of Feature K-...

2015-06-29 Thread thvasilo
Github user thvasilo commented on a diff in the pull request:

https://github.com/apache/flink/pull/700#discussion_r33462286
  
--- Diff: flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/clustering/KMeans.scala ---

[GitHub] flink pull request: [FLINK-1731] [ml] Implementation of Feature K-...

2015-06-29 Thread thvasilo
Github user thvasilo commented on a diff in the pull request:

https://github.com/apache/flink/pull/700#discussion_r33463192
  
--- Diff: flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/clustering/KMeans.scala ---
@@ -0,0 +1,247 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.clustering
+
+import org.apache.flink.api.common.functions.RichMapFunction
+import 
org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFields
+import org.apache.flink.api.scala.{DataSet, _}
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.ml.common.{LabeledVector, _}
+import org.apache.flink.ml.math.Breeze._
+import org.apache.flink.ml.math.Vector
+import org.apache.flink.ml.metrics.distances.EuclideanDistanceMetric
+import org.apache.flink.ml.pipeline._
+
+import scala.collection.JavaConverters._
+
+
+/**
+ * Implements the KMeans algorithm, which calculates cluster centroids
+ * based on a set of training data points and a set of k initial centroids.
+ *
+ * [[KMeans]] is a [[Predictor]] which needs to be trained on a set of 
data points and can then be
+ * used to assign new points to the learned cluster centroids.
+ *
+ * The KMeans algorithm works as described on Wikipedia
+ * (http://en.wikipedia.org/wiki/K-means_clustering):
+ *
+ * Given an initial set of k means m1(1),…,mk(1) (see below), the 
algorithm proceeds by alternating
+ * between two steps:
+ *
+ * ===Assignment step:===
+ *
+ * Assign each observation to the cluster whose mean yields the least 
within-cluster sum of
+ * squares (WCSS). Since the sum of squares is the squared Euclidean 
distance, this is intuitively
+ * the "nearest" mean. (Mathematically, this means partitioning the 
observations according to the
+ * Voronoi diagram generated by the means).
+ *
+ * `S_i^(t) = { x_p : || x_p - m_i^(t) ||^2 ≤ || x_p - m_j^(t) ||^2 
\forall j, 1 ≤ j ≤ k}`,
+ * where each `x_p`  is assigned to exactly one `S^{(t)}`, even if it 
could be assigned to two or
+ * more of them.
+ *
+ * ===Update step:===
+ *
+ * Calculate the new means to be the centroids of the observations in the 
new clusters.
+ *
+ * `m^{(t+1)}_i = ( 1 / |S^{(t)}_i| ) \sum_{x_j \in S^{(t)}_i} x_j`
+ *
+ * Since the arithmetic mean is a least-squares estimator, this also 
minimizes the within-cluster
+ * sum of squares (WCSS) objective.
+ *
+ * @example
+ * {{{
+ *  val trainingDS: DataSet[Vector] = 
env.fromCollection(Clustering.trainingData)
+ *  val initialCentroids: DataSet[LabeledVector] = 
env.fromCollection(Clustering.initCentroids)
+ *
+ *  val kmeans = KMeans()
+ *.setInitialCentroids(initialCentroids)
+ *.setNumIterations(10)
+ *
+ *  kmeans.fit(trainingDS)
+ *
+ *  // getting the computed centroids
+ *  val centroidsResult = kmeans.centroids.get.collect()
+ *
+ *  // get matching clusters for new points
+ *  val testDS: DataSet[Vector] = 
env.fromCollection(Clustering.testData)
+ *  val clusters: DataSet[LabeledVector] = kmeans.predict(testDS)
+ * }}}
+ *
+ * =Parameters=
+ *
+ * - [[org.apache.flink.ml.clustering.KMeans.NumIterations]]:
+ * Defines the number of iterations to recalculate the centroids of the
+ * clusters. As K-Means is a heuristic algorithm, there is no guarantee
+ * that it will converge to the global optimum. The recalculation of the
+ * centroids and the reassignment of the data points are repeated until
+ * the given number of iterations is reached.
+ * (Default value: '''10''')
+ *
+ * - [[org.apache.flink.ml.clustering.KMeans.InitialCentroids]]:
+ * Defines the k initial centroids of the k clusters. They are used as the
+ * starting point of the algorithm for clustering the data set. The
+ * centroids are recalculated as often as set in
+ * [[org.apache.flink.ml.clustering.KMeans.NumI
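The assignment and update steps described in the quoted scaladoc can be sketched as a small, local (non-distributed) Scala program. This is only an illustration of the algorithm under our own names (`KMeansSketch`, `assign`, `update`, `run`); it is not the PR's Flink implementation:

```scala
// Minimal local sketch of Lloyd's k-means iteration, as described in the
// scaladoc above. Illustration only; all names here are our own, not the PR's.
object KMeansSketch {
  type Point = Vector[Double]

  // Squared Euclidean distance between two points.
  def sqDist(a: Point, b: Point): Double =
    a.zip(b).map { case (x, y) => (x - y) * (x - y) }.sum

  // Assignment step: each point goes to the centroid that minimizes the
  // squared Euclidean distance (the WCSS criterion).
  def assign(points: Seq[Point], centroids: Seq[Point]): Map[Int, Seq[Point]] =
    points.groupBy(p => centroids.indices.minBy(i => sqDist(p, centroids(i))))

  // Update step: each centroid becomes the arithmetic mean of its cluster;
  // an empty cluster keeps its previous centroid.
  def update(clusters: Map[Int, Seq[Point]], old: Seq[Point]): Seq[Point] =
    old.indices.map { i =>
      clusters.get(i) match {
        case Some(ps) =>
          ps.reduce((a, b) => a.zip(b).map(t => t._1 + t._2)).map(_ / ps.size)
        case None => old(i)
      }
    }

  // Repeat assignment + update for a fixed number of iterations, matching
  // the NumIterations parameter semantics documented above.
  def run(points: Seq[Point], init: Seq[Point], iterations: Int): Seq[Point] =
    (0 until iterations).foldLeft(init)((cs, _) => update(assign(points, cs), cs))
}
```

With two well-separated groups of points and one initial centroid near each group, `run` converges to the group means after a few iterations.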

[GitHub] flink pull request: [FLINK-1731] [ml] Implementation of Feature K-...

2015-06-29 Thread thvasilo
Github user thvasilo commented on a diff in the pull request:

https://github.com/apache/flink/pull/700#discussion_r33463484
  
--- Diff: 
flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/clustering/Clustering.scala
 ---
@@ -0,0 +1,256 @@
+/*
--- End diff --

Rename to ClusteringData.scala


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-1731] [ml] Implementation of Feature K-...

2015-06-29 Thread thvasilo
Github user thvasilo commented on a diff in the pull request:

https://github.com/apache/flink/pull/700#discussion_r33463634
  
--- Diff: 
flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/clustering/Clustering.scala
 ---
@@ -0,0 +1,256 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.clustering
+
+import breeze.linalg.{DenseVector => BreezeDenseVector, Vector => 
BreezeVector}
+import org.apache.flink.ml.common.LabeledVector
+import org.apache.flink.ml.math.{DenseVector, Vector}
+
+/**
+ * Training and test data sets for the K-Means implementation
+ * [[org.apache.flink.ml.clustering.KMeans]].
+ */
+object Clustering {
+
+  /*
+   * Number of iterations for the K-Means algorithm.
+   */
+  val iterations = 10
+
+  /*
+   * Sequence of initial centroids.
+   */
+  val centroidData: Seq[LabeledVector] = Seq(
+LabeledVector(1, DenseVector(-0.1369104662767052, 0.2949172396037093, 
-0.01070450818187003)),
+LabeledVector(2, DenseVector(0.43643950041582885, 0.30117329671833215, 
0.20965108353159922)),
+LabeledVector(3, DenseVector(0.26011627041438423, 0.22954649683337805, 
0.2936286262276151)),
+LabeledVector(4, DenseVector(-0.041980932305508145, 
0.03116256923634109, 0.31065743174542293)),
+LabeledVector(5, DenseVector(0.0984398491976613, -0.21227718242541602, 
-0.45083084300074255)),
+LabeledVector(6, DenseVector(-0.216526923545, 
-0.47142840804338293, -0.02298954070830948)),
+LabeledVector(7, DenseVector(-0.0632307695567563, 0.2387221400443612, 
0.09416850805771804)),
+LabeledVector(8, DenseVector(0.16383680898916775, 
-0.24586810465119346, 0.08783590589294081)),
+LabeledVector(9, DenseVector(-0.24763544645492513, 
0.19688995732231254, 0.4520904742796472)),
+LabeledVector(10, DenseVector(0.16468044138881932, 
0.06259522206982082, 0.12145870313604247))
+
+  )
+
+  /*
+   * Three-dimensional DenseVectors from a part of the Cosmo-Gas dataset.
+   * Reference: http://nuage.cs.washington.edu/benchmark/
+   */
+  val trainingData: Seq[Vector] = Seq(
+DenseVector(-0.489811986685, 0.496883004904, -0.483860999346),
+DenseVector(-0.485296010971, 0.496421992779, -0.484212994576),
+DenseVector(-0.481514006853, 0.496134012938, -0.48508900404),
+DenseVector(-0.47854255, 0.496246010065, -0.486301004887),
+DenseVector(-0.475461006165, 0.496093004942, -0.487686008215),
+DenseVector(-0.471846997738, 0.496558994055, -0.488242000341),
+DenseVector(-0.467496991158, 0.497166007757, -0.48861899972),
+DenseVector(-0.463036000729, 0.497680991888, -0.489721000195),
+DenseVector(-0.458972990513, 0.4984369874, -0.490575999022),
+DenseVector(-0.455772012472, 0.499684005976, -0.491737008095),
+DenseVector(-0.453074991703, -0.499433010817, -0.492006987333),
+DenseVector(-0.450913995504, -0.499316990376, -0.492769002914),
+DenseVector(-0.448724985123, -0.499406009912, -0.493508011103),
+DenseVector(-0.44715899229, -0.499680995941, -0.494500011206),
+DenseVector(-0.445362001657, -0.499630987644, -0.495151996613),
+DenseVector(-0.442811012268, -0.499303996563, -0.495151013136),
+DenseVector(-0.439810991287, -0.499332994223, -0.49529799819),
+DenseVector(-0.43678098917, -0.499361991882, -0.49545699358),
+DenseVector(-0.433919012547, -0.499334007502, -0.495705991983),
+DenseVector(-0.43117800355, -0.499345004559, -0.496196985245),
+DenseVector(-0.428333997726, -0.499083012342, -0.496385991573),
+DenseVector(-0.425300985575, -0.49844199419, -0.496405988932),
+DenseVector(-0.421882003546, -0.497743010521, -0.496706992388),
+DenseVector(-0.418137013912, -0.497193992138, -0.496524989605),
+DenseVector(-0.414458990097, -0.496717989445, -0.49600699544),
+DenseVector(-0.411509007215, -0.495965003967, -0.495519012213),
+DenseVector(-0.40851598978, -0.49593898654, -0

[GitHub] flink pull request: [FLINK-1731] [ml] Implementation of Feature K-...

2015-06-29 Thread thvasilo
Github user thvasilo commented on the pull request:

https://github.com/apache/flink/pull/700#issuecomment-116662421
  
Hello, I've left some initial comments. Once those have been addressed I'll
try to do some more integration testing and then pass the review over to a
committer.




[GitHub] flink pull request: [FLINK-1731] [ml] Implementation of Feature K-...

2015-06-29 Thread peedeeX21
Github user peedeeX21 commented on a diff in the pull request:

https://github.com/apache/flink/pull/700#discussion_r33465223
  
--- Diff: 
flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/clustering/Clustering.scala
 ---
@@ -0,0 +1,256 @@
+/*
--- End diff --

done




[GitHub] flink pull request: [FLINK-1731] [ml] Implementation of Feature K-...

2015-06-29 Thread thvasilo
Github user thvasilo commented on the pull request:

https://github.com/apache/flink/pull/700#issuecomment-116669400
  
Another note: it should not be necessary for the user to provide the
initial centroids; it should be possible to generate them within the
algorithm itself, ideally with a scheme like k-means++.
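The suggested seeding can be sketched as follows. This is a hedged illustration of k-means++ under stated assumptions, not code from this PR or from the referenced initialization PR; all names (e.g. `KMeansPlusPlusSketch.seed`) are ours:

```scala
import scala.util.Random

// Sketch of k-means++ seeding: pick the first centroid uniformly at random,
// then pick each further centroid with probability proportional to its
// squared distance D(x)^2 from the nearest centroid chosen so far.
// Illustration only; names are our own, not from the PR.
object KMeansPlusPlusSketch {
  type Point = Vector[Double]

  def sqDist(a: Point, b: Point): Double =
    a.zip(b).map { case (x, y) => (x - y) * (x - y) }.sum

  def seed(points: Seq[Point], k: Int, rnd: Random): Seq[Point] = {
    val first = points(rnd.nextInt(points.size))
    (1 until k).foldLeft(Vector(first)) { (chosen, _) =>
      // D(x)^2 for every point: squared distance to its nearest chosen centroid
      val d2 = points.map(p => chosen.map(c => sqDist(p, c)).min)
      // Sample the next centroid index proportionally to D(x)^2
      val target = rnd.nextDouble() * d2.sum
      val idx = d2.scanLeft(0.0)(_ + _).tail.indexWhere(_ >= target)
      chosen :+ points(if (idx < 0) points.size - 1 else idx)
    }
  }
}
```

Because already-chosen points have D(x)^2 = 0, the sampling strongly favors centroids spread across distant regions of the data.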




[GitHub] flink pull request: [FLINK-1731] [ml] Implementation of Feature K-...

2015-06-29 Thread thvasilo
Github user thvasilo commented on a diff in the pull request:

https://github.com/apache/flink/pull/700#discussion_r33466208
  
--- Diff: 
flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/clustering/KMeans.scala
 ---

[GitHub] flink pull request: [FLINK-1731] [ml] Implementation of Feature K-...

2015-06-29 Thread sachingoel0101
Github user sachingoel0101 commented on the pull request:

https://github.com/apache/flink/pull/700#issuecomment-116685600
  
I've been following this PR since my PR on initialization schemes can't be
merged before this one. I already have three initialization mechanisms
[namely random, k-means++, and k-means||]. I've referenced that PR on this
thread earlier.




[GitHub] flink pull request: [FLINK-1731] [ml] Implementation of Feature K-...

2015-06-29 Thread peedeeX21
Github user peedeeX21 commented on a diff in the pull request:

https://github.com/apache/flink/pull/700#discussion_r33468244
  
--- Diff: 
flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/clustering/KMeans.scala
 ---

[GitHub] flink pull request: [FLINK-1731] [ml] Implementation of Feature K-...

2015-06-29 Thread peedeeX21
Github user peedeeX21 commented on a diff in the pull request:

https://github.com/apache/flink/pull/700#discussion_r33469076
  
--- Diff: 
flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/clustering/KMeans.scala
 ---

[GitHub] flink pull request: [FLINK-1731] [ml] Implementation of Feature K-...

2015-06-29 Thread peedeeX21
Github user peedeeX21 commented on a diff in the pull request:

https://github.com/apache/flink/pull/700#discussion_r33471983
  
--- Diff: 
flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/clustering/KMeans.scala
 ---

[GitHub] flink pull request: [FLINK-1731] [ml] Implementation of Feature K-...

2015-06-29 Thread peedeeX21
Github user peedeeX21 commented on a diff in the pull request:

https://github.com/apache/flink/pull/700#discussion_r33475298
  
--- Diff: flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/clustering/KMeans.scala ---
@@ -0,0 +1,247 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.clustering
+
+import org.apache.flink.api.common.functions.RichMapFunction
+import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFields
+import org.apache.flink.api.scala.{DataSet, _}
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.ml.common.{LabeledVector, _}
+import org.apache.flink.ml.math.Breeze._
+import org.apache.flink.ml.math.Vector
+import org.apache.flink.ml.metrics.distances.EuclideanDistanceMetric
+import org.apache.flink.ml.pipeline._
+
+import scala.collection.JavaConverters._
+
+
+/**
+ * Implements the KMeans algorithm, which calculates cluster centroids based on a set of
+ * training data points and a set of k initial centroids.
+ *
+ * [[KMeans]] is a [[Predictor]] which needs to be trained on a set of data points and can then be
+ * used to assign new points to the learned cluster centroids.
+ *
+ * The KMeans algorithm works as described on Wikipedia
+ * (http://en.wikipedia.org/wiki/K-means_clustering):
+ *
+ * Given an initial set of k means m1(1),…,mk(1) (see below), the algorithm proceeds by alternating
+ * between two steps:
+ *
+ * ===Assignment step:===
+ *
+ * Assign each observation to the cluster whose mean yields the least within-cluster sum of
+ * squares (WCSS). Since the sum of squares is the squared Euclidean distance, this is intuitively
+ * the "nearest" mean. (Mathematically, this means partitioning the observations according to the
+ * Voronoi diagram generated by the means.)
+ *
+ * `S_i^(t) = { x_p : || x_p - m_i^(t) ||^2 ≤ || x_p - m_j^(t) ||^2 \forall j, 1 ≤ j ≤ k}`,
+ * where each `x_p` is assigned to exactly one `S^{(t)}`, even if it could be assigned to two or
+ * more of them.
+ *
+ * ===Update step:===
+ *
+ * Calculate the new means to be the centroids of the observations in the new clusters.
+ *
+ * `m^{(t+1)}_i = ( 1 / |S^{(t)}_i| ) \sum_{x_j \in S^{(t)}_i} x_j`
+ *
+ * Since the arithmetic mean is a least-squares estimator, this also minimizes the within-cluster
+ * sum of squares (WCSS) objective.
+ *
+ * @example
+ * {{{
+ *   val trainingDS: DataSet[Vector] = env.fromCollection(Clustering.trainingData)
+ *   val initialCentroids: DataSet[LabeledVector] = env.fromCollection(Clustering.initCentroids)
+ *
+ *   val kmeans = KMeans()
+ *     .setInitialCentroids(initialCentroids)
+ *     .setNumIterations(10)
+ *
+ *   kmeans.fit(trainingDS)
+ *
+ *   // get the computed centroids
+ *   val centroidsResult = kmeans.centroids.get.collect()
+ *
+ *   // get matching clusters for new points
+ *   val testDS: DataSet[Vector] = env.fromCollection(Clustering.testData)
+ *   val clusters: DataSet[LabeledVector] = kmeans.predict(testDS)
+ * }}}
+ *
+ * =Parameters=
+ *
+ * - [[org.apache.flink.ml.clustering.KMeans.NumIterations]]:
+ * Defines the number of iterations to recalculate the centroids of the clusters. As it
+ * is a heuristic algorithm, there is no guarantee that it will converge to the global optimum. The
+ * centroids of the clusters and the reassignment of the data points will be repeated until the
+ * given number of iterations is reached.
+ * (Default value: '''10''')
+ *
+ * - [[org.apache.flink.ml.clustering.KMeans.InitialCentroids]]:
+ * Defines the initial k centroids of the k clusters. They are used as the starting point of the
+ * algorithm for clustering the data set. The centroids are recalculated as often as set in
+ * [[org.apache.flink.ml.clustering.KMeans.Num
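The assignment and update steps described in the scaladoc above can be sketched end to end. The following is a hedged, plain-Python illustration of the same two-step (Lloyd's) iteration with a fixed iteration count, mirroring the documented `NumIterations` default of 10; it is not the Flink `DataSet` implementation, and the function names are illustrative only.

```python
# Minimal k-means sketch: centroids are (label, vector) pairs, as in the
# LabeledVector-based model discussed in this thread.

def squared_dist(p, q):
    # squared Euclidean distance, the WCSS term from the assignment step
    return sum((a - b) ** 2 for a, b in zip(p, q))

def nearest(centroids, point):
    # assignment step: label of the centroid minimizing squared distance
    return min(centroids, key=lambda c: squared_dist(c[1], point))[0]

def kmeans(points, initial_centroids, num_iterations=10):
    centroids = list(initial_centroids)  # [(label, vector), ...]
    for _ in range(num_iterations):
        # assignment step: partition the points by nearest centroid
        clusters = {}
        for p in points:
            clusters.setdefault(nearest(centroids, p), []).append(p)
        # update step: each new mean is the centroid of its cluster
        # (empty clusters simply drop out in this sketch)
        centroids = [
            (label, tuple(sum(xs) / len(members) for xs in zip(*members)))
            for label, members in sorted(clusters.items())
        ]
    return centroids

points = [(0.0, 0.0), (0.0, 1.0), (10.0, 10.0), (10.0, 11.0)]
init = [(0, (1.0, 1.0)), (1, (9.0, 9.0))]
print(kmeans(points, init))  # -> [(0, (0.0, 0.5)), (1, (10.0, 10.5))]
```

With the two well-separated pairs above, the centroids stabilize after the first iteration; running the remaining iterations leaves them unchanged, which is why a fixed iteration count (rather than a convergence test) is an acceptable heuristic here.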

[GitHub] flink pull request: [FLINK-1731] [ml] Implementation of Feature K-...

2015-06-29 Thread peedeeX21
Github user peedeeX21 commented on a diff in the pull request:

https://github.com/apache/flink/pull/700#discussion_r33475321
  

[GitHub] flink pull request: [FLINK-1731] [ml] Implementation of Feature K-...

2015-06-29 Thread peedeeX21
Github user peedeeX21 commented on a diff in the pull request:

https://github.com/apache/flink/pull/700#discussion_r33476507
  

[GitHub] flink pull request: [FLINK-1731] [ml] Implementation of Feature K-...

2015-06-29 Thread peedeeX21
Github user peedeeX21 commented on the pull request:

https://github.com/apache/flink/pull/700#issuecomment-116850459
  
I am having some trouble fitting our predictor into the new API. 
The problem is that with `PredictOperation` the type of the model has to 
be defined. A `DataSet` of this type is the output of `getModel`, while for the 
`predict` method the input is just a single object of this type.

In our case our model is a `DataSet` of `LabeledVector`s (the centroids). 
This means I cannot implement a `PredictOperation` due to that restriction.

To me the API feels a bit inconsistent in this case.

For now I implemented only a `PredictDataSetOperation`.
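The type mismatch described above can be made concrete without any Flink code. This is a hedged, Flink-free sketch: the class name and signature below only illustrate the shape of a per-element predict operation (one model value per prediction), not the actual flink-ml traits. The point is that a collection of centroids only fits that shape once it has been collected into a single local value.

```python
# Illustrative per-element predict operation: `predict` receives ONE model
# value. A distributed DataSet of centroids does not fit this signature; a
# locally collected list of (label, vector) pairs does.
from typing import Callable, Generic, TypeVar

Model = TypeVar("Model")
T = TypeVar("T")

class PredictOperation(Generic[Model, T]):
    """Hypothetical per-element operation: predict(model, x) -> float."""
    def __init__(self, predict: Callable[[Model, T], float]) -> None:
        self.predict = predict

def nearest_label(centroids, x):
    # model = local list of (label, vector) centroids
    def sqd(p, q):
        return sum((a - b) ** 2 for a, b in zip(p, q))
    return float(min(centroids, key=lambda c: sqd(c[1], x))[0])

op = PredictOperation(nearest_label)
local_model = [(0, (0.0, 0.0)), (1, (10.0, 10.0))]
print(op.predict(local_model, (9.0, 9.0)))  # -> 1.0
```

A `PredictDataSetOperation`, by contrast, works on whole `DataSet`s at once, which is why it can accommodate a distributed model where the per-element operation cannot.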


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-1731] [ml] Implementation of Feature K-...

2015-06-30 Thread thvasilo
Github user thvasilo commented on the pull request:

https://github.com/apache/flink/pull/700#issuecomment-117053448
  
Hello @peedeeX21. The API does not deal with distributed models at the 
moment. In the K-means case having the model distributed is overkill: it is 
highly unlikely that you will have >1000 centroids, so the model is tiny, and 
distributing it actually creates unnecessary overhead.

We can keep the current implementation, but in the future we should really 
test against a non-distributed model, which can be broadcast as a 
DataSet[Seq[LabeledVector]], and compare performance.

Also, could you add an evaluate operation (EvaluateDataSetOperation) for 
KMeans (and a corresponding test)? It would be parametrized as 
EvaluateDataSetOperation[KMeans, Vector, Double]
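The suggested evaluate semantics can be sketched locally. This is a hedged illustration, not the flink-ml API: it assumes (as suggested above) that the centroids have been collected into a small local list acting as the "broadcast" model, and it emits the (true label, predicted label) pairs that an evaluate operation over a labeled test set would produce.

```python
# Hedged sketch of evaluate semantics for k-means: for each labeled test
# point, pair its true cluster label with the label of the nearest centroid.
def sqd(p, q):
    return sum((a - b) ** 2 for a, b in zip(p, q))

def evaluate(centroids, labeled_points):
    # centroids: small local [(label, vector)] list -- the "broadcast" model
    # labeled_points: [(true_label, vector)] test pairs
    out = []
    for true_label, x in labeled_points:
        pred = min(centroids, key=lambda c: sqd(c[1], x))[0]
        out.append((true_label, float(pred)))
    return out

centroids = [(0, (0.0, 0.0)), (1, (10.0, 10.0))]
test_set = [(0.0, (1.0, 1.0)), (1.0, (8.0, 9.0)), (1.0, (2.0, 2.0))]
print(evaluate(centroids, test_set))  # -> [(0.0, 0.0), (1.0, 1.0), (1.0, 0.0)]
```

The third pair shows a misassignment: a point labeled 1 lands in cluster 0, which is exactly the kind of discrepancy the downstream score computation would count.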




[GitHub] flink pull request: [FLINK-1731] [ml] Implementation of Feature K-...

2015-06-30 Thread sachingoel0101
Github user sachingoel0101 commented on the pull request:

https://github.com/apache/flink/pull/700#issuecomment-117060737
  
Hi. IMO, the purpose of learning is to develop a model which compactly 
represents the data somehow. Thus, having a distributed model doesn't make 
sense. Besides, the user might just want to take the model and use it somewhere 
else, in which case it makes sense to have it available not as a distributed 
DataSet but just as a Java/Scala object which the user can easily operate on.




[GitHub] flink pull request: [FLINK-1731] [ml] Implementation of Feature K-...

2015-06-30 Thread peedeeX21
Github user peedeeX21 commented on the pull request:

https://github.com/apache/flink/pull/700#issuecomment-117068361
  
I totally agree with you guys' points. We have a small number of centroids, 
and the model is not supposed to be distributed in the end.

The question now is: should the resulting `DataSet` of centroids just be 
collected, or should the whole iteration be rewritten to work on a 
non-distributed collection?

Note: Unfortunately I am quite busy right now with other projects, so I 
won't have time to do lots of changes right now. Either the people from my group 
(who might actually have the same workload right now) or @sachingoel0101 can 
work on that if it's really urgent.




[GitHub] flink pull request: [FLINK-1731] [ml] Implementation of Feature K-...

2015-06-30 Thread thvasilo
Github user thvasilo commented on the pull request:

https://github.com/apache/flink/pull/700#issuecomment-117194517
  
What we would like to see actually is this PR and #757 to be merged into 
one, so that we can review them as a whole. @sachingoel0101 do you think you 
will be able to do that?




[GitHub] flink pull request: [FLINK-1731] [ml] Implementation of Feature K-...

2015-07-01 Thread sachingoel0101
Github user sachingoel0101 commented on the pull request:

https://github.com/apache/flink/pull/700#issuecomment-117723450
  
@thvasilo , how do I merge this PR into mine? Maybe @peedeeX21 can create a 
pull request to my branch at 
https://github.com/sachingoel0101/flink/tree/clustering_initializations or is 
there a better option?




[GitHub] flink pull request: [FLINK-1731] [ml] Implementation of Feature K-...

2015-07-01 Thread peedeeX21
Github user peedeeX21 commented on the pull request:

https://github.com/apache/flink/pull/700#issuecomment-117730349
  
@sachingoel0101 me creating a pull request against your repo would be best, 
but for some reason I can't choose your repo as the base fork. 




[GitHub] flink pull request: [FLINK-1731] [ml] Implementation of Feature K-...

2015-07-01 Thread sachingoel0101
Github user sachingoel0101 commented on the pull request:

https://github.com/apache/flink/pull/700#issuecomment-117731195
  
@peedeeX21, try this link: 
https://github.com/sachingoel0101/flink/compare/clustering_initializations...peedeeX21:feature_kmeans
I had a lot of trouble creating a PR to your repo yesterday.




[GitHub] flink pull request: [FLINK-1731] [ml] Implementation of Feature K-...

2015-07-02 Thread thvasilo
Github user thvasilo commented on the pull request:

https://github.com/apache/flink/pull/700#issuecomment-118017596
  
Hello @peedeeX21, one thing you could try is to rebase this branch on 
@sachingoel0101's branch, and then force-push to this one.
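The rebase-then-force-push workflow suggested here can be simulated in a throwaway local repository. This is a hedged sketch: the branch names mirror the discussion, the commits are dummies, and the actual force push is shown only as a comment since there is no real remote in the sandbox.

```shell
set -e
repo=$(mktemp -d)
cd "$repo"
git init -q
git config user.email dev@example.com
git config user.name dev
git checkout -qb main
echo base > base.txt && git add . && git commit -qm "base"
# stand-in for sachingoel0101's clustering_initializations branch
git checkout -qb clustering_initializations
echo init > init.txt && git add . && git commit -qm "clustering initializations"
# stand-in for peedeeX21's feature_kmeans branch, started from main
git checkout -q main
git checkout -qb feature_kmeans
echo kmeans > kmeans.txt && git add . && git commit -qm "kmeans"
# replay the feature_kmeans commits on top of the other branch
git rebase -q clustering_initializations
# a real fork would now need: git push --force origin feature_kmeans
git log --oneline
```

After the rebase, `feature_kmeans` contains both the initialization work and the k-means commits, which is what lets the two PRs be reviewed as one.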




[GitHub] flink pull request: [FLINK-1731] [ml] Implementation of Feature K-...

2015-07-02 Thread peedeeX21
Github user peedeeX21 commented on the pull request:

https://github.com/apache/flink/pull/700#issuecomment-118020035
  
@thvasilo I actually could create a pull request for @sachingoel0101, so 
everything should be fine now. We can even close this PR.




[GitHub] flink pull request: [FLINK-1731] [ml] Implementation of Feature K-...

2015-07-02 Thread thvasilo
Github user thvasilo commented on the pull request:

https://github.com/apache/flink/pull/700#issuecomment-118023871
  
Sure, feel free to close this, and link to the new one.

