[GitHub] flink pull request #757: [FLINK-2131][ml]: Initialization schemes for k-mean...
Github user skonto commented on a diff in the pull request: https://github.com/apache/flink/pull/757#discussion_r81429036 --- Diff: flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/clustering/KMeans.scala --- @@ -0,0 +1,614 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.ml.clustering + +import org.apache.flink.api.common.functions.RichFilterFunction +import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFields +import org.apache.flink.api.scala.{DataSet, _} +import org.apache.flink.configuration.Configuration +import org.apache.flink.ml._ +import org.apache.flink.ml.common.FlinkMLTools.ModuloKeyPartitioner +import org.apache.flink.ml.common.{LabeledVector, _} +import org.apache.flink.ml.math.Breeze._ +import org.apache.flink.ml.math.{BLAS, Vector} +import org.apache.flink.ml.metrics.distances.EuclideanDistanceMetric +import org.apache.flink.ml.pipeline._ + +import scala.collection.JavaConverters._ +import scala.util.Random + + +/** + * Implements the KMeans algorithm which calculates cluster centroids based on set of training data + * points and a set of k initial centroids. 
+ * + * [[KMeans]] is a [[Predictor]] which needs to be trained on a set of data points and can then be + * used to assign new points to the learned cluster centroids. + * + * The KMeans algorithm works as described on Wikipedia + * (http://en.wikipedia.org/wiki/K-means_clustering): + * + * Given an initial set of k means m1(1),…,mk(1) (see below), the algorithm proceeds by alternating + * between two steps: + * + * ===Assignment step:=== + * + * Assign each observation to the cluster whose mean yields the least within-cluster sum of + * squares (WCSS). Since the sum of squares is the squared Euclidean distance, this is intuitively + * the "nearest" mean. (Mathematically, this means partitioning the observations according to the + * Voronoi diagram generated by the means). + * + * `S_i^(t) = { x_p : || x_p - m_i^(t) ||^2 ≤ || x_p - m_j^(t) ||^2 \forall j, 1 ≤ j ≤ k}`, + * where each `x_p` is assigned to exactly one `S^{(t)}`, even if it could be assigned to two or + * more of them. + * + * ===Update step:=== + * + * Calculate the new means to be the centroids of the observations in the new clusters. + * + * `m^{(t+1)}_i = ( 1 / |S^{(t)}_i| ) \sum_{x_j \in S^{(t)}_i} x_j` + * + * Since the arithmetic mean is a least-squares estimator, this also minimizes the within-cluster + * sum of squares (WCSS) objective. 
+ * + * @example + * {{{ + * val trainingDS: DataSet[Vector] = env.fromCollection(Clustering.trainingData) + * val initialCentroids: DataSet[LabeledVector] = env.fromCollection(Clustering.initCentroids) + * + * val kmeans = KMeans() + * .setInitialCentroids(initialCentroids) + * .setNumIterations(10) + * + * kmeans.fit(trainingDS) + * + * // getting the computed centroids + * val centroidsResult = kmeans.centroids.get.collect() + * + * // get matching clusters for new points + * val testDS: DataSet[Vector] = env.fromCollection(Clustering.testData) + * val clusters: DataSet[LabeledVector] = kmeans.predict(testDS) + * }}} + * + * =Parameters= + * + * - [[org.apache.flink.ml.clustering.KMeans.NumIterations]]: + * Defines the number of iterations to recalculate the centroids of the clusters. As it + * is a heuristic algorithm, there is no guarantee that it will converge to the global optimum. The + * centroids of the clusters and the reassignment of the data points will be repeated till the + * given number of iterations is reached. + * (Default value: '''10''') + * + * - [[org.apache.flink.ml.clustering.KMeans.InitialCentroids]]: + * Defines the initial k ce
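For readers following the quoted Scaladoc, the assignment and update steps it describes can be sketched in plain Scala collections, without Flink. This is only an illustration under stated assumptions, not the code in the PR: the names `LloydSketch`, `Point`, `nearest`, `lloydStep` and `run` are hypothetical, `Point` is a plain `scala.collection.immutable.Vector[Double]` rather than Flink's `Vector`, and keeping an empty cluster's centroid unchanged is a choice made for this sketch.

```scala
object LloydSketch {
  // Hypothetical point type for this sketch (not org.apache.flink.ml.math.Vector).
  type Point = Vector[Double]

  // Squared Euclidean distance between two points of equal dimension.
  def sqDist(a: Point, b: Point): Double =
    a.zip(b).map { case (x, y) => (x - y) * (x - y) }.sum

  // Assignment step: index of the centroid nearest to p.
  def nearest(p: Point, centroids: Seq[Point]): Int =
    centroids.indices.minBy(i => sqDist(p, centroids(i)))

  // Update step: each centroid becomes the mean of the points assigned to it.
  // A centroid with no assigned points is kept unchanged (assumption of this sketch).
  def lloydStep(points: Seq[Point], centroids: Seq[Point]): Seq[Point] = {
    val groups = points.groupBy(p => nearest(p, centroids))
    centroids.indices.map { i =>
      groups.get(i) match {
        case Some(ps) => ps.transpose.map(_.sum / ps.size).toVector
        case None     => centroids(i)
      }
    }
  }

  // Repeat assignment + update a fixed number of times, as NumIterations does.
  def run(points: Seq[Point], init: Seq[Point], numIterations: Int): Seq[Point] =
    (1 to numIterations).foldLeft(init)((cs, _) => lloydStep(points, cs))
}
```

A fixed iteration count mirrors the `NumIterations` parameter in the quoted documentation; a convergence check on centroid movement would be an alternative stopping rule.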
[GitHub] flink pull request #757: [FLINK-2131][ml]: Initialization schemes for k-mean...
Github user skonto commented on a diff in the pull request: https://github.com/apache/flink/pull/757#discussion_r81429258 --- Diff: flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/clustering/KMeans.scala --- @@ -0,0 +1,614 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.ml.clustering + +import org.apache.flink.api.common.functions.RichFilterFunction +import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFields +import org.apache.flink.api.scala.{DataSet, _} +import org.apache.flink.configuration.Configuration +import org.apache.flink.ml._ +import org.apache.flink.ml.common.FlinkMLTools.ModuloKeyPartitioner +import org.apache.flink.ml.common.{LabeledVector, _} +import org.apache.flink.ml.math.Breeze._ +import org.apache.flink.ml.math.{BLAS, Vector} +import org.apache.flink.ml.metrics.distances.EuclideanDistanceMetric +import org.apache.flink.ml.pipeline._ + +import scala.collection.JavaConverters._ +import scala.util.Random + + +/** + * Implements the KMeans algorithm which calculates cluster centroids based on set of training data + * points and a set of k initial centroids. 
[GitHub] flink issue #757: [FLINK-2131][ml]: Initialization schemes for k-means clust...
Github user skonto commented on the issue: https://github.com/apache/flink/pull/757 Any progress with this PR? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request #757: [FLINK-2131][ml]: Initialization schemes for k-mean...
Github user skonto commented on a diff in the pull request: https://github.com/apache/flink/pull/757#discussion_r81429867 --- Diff: flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/clustering/KMeans.scala --- @@ -0,0 +1,614 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.ml.clustering + +import org.apache.flink.api.common.functions.RichFilterFunction +import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFields +import org.apache.flink.api.scala.{DataSet, _} +import org.apache.flink.configuration.Configuration +import org.apache.flink.ml._ +import org.apache.flink.ml.common.FlinkMLTools.ModuloKeyPartitioner +import org.apache.flink.ml.common.{LabeledVector, _} +import org.apache.flink.ml.math.Breeze._ +import org.apache.flink.ml.math.{BLAS, Vector} +import org.apache.flink.ml.metrics.distances.EuclideanDistanceMetric +import org.apache.flink.ml.pipeline._ + +import scala.collection.JavaConverters._ +import scala.util.Random + + +/** + * Implements the KMeans algorithm which calculates cluster centroids based on set of training data + * points and a set of k initial centroids. 
[GitHub] flink pull request #757: [FLINK-2131][ml]: Initialization schemes for k-mean...
Github user skonto commented on a diff in the pull request: https://github.com/apache/flink/pull/757#discussion_r81430070 --- Diff: flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/clustering/KMeans.scala --- @@ -0,0 +1,614 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.ml.clustering + +import org.apache.flink.api.common.functions.RichFilterFunction +import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFields +import org.apache.flink.api.scala.{DataSet, _} +import org.apache.flink.configuration.Configuration +import org.apache.flink.ml._ +import org.apache.flink.ml.common.FlinkMLTools.ModuloKeyPartitioner +import org.apache.flink.ml.common.{LabeledVector, _} +import org.apache.flink.ml.math.Breeze._ +import org.apache.flink.ml.math.{BLAS, Vector} +import org.apache.flink.ml.metrics.distances.EuclideanDistanceMetric +import org.apache.flink.ml.pipeline._ + +import scala.collection.JavaConverters._ +import scala.util.Random + + +/** + * Implements the KMeans algorithm which calculates cluster centroids based on set of training data + * points and a set of k initial centroids. 
[GitHub] flink pull request #757: [FLINK-2131][ml]: Initialization schemes for k-mean...
Github user skonto commented on a diff in the pull request: https://github.com/apache/flink/pull/757#discussion_r81430196 --- Diff: flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/clustering/KMeans.scala --- @@ -0,0 +1,614 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.ml.clustering + +import org.apache.flink.api.common.functions.RichFilterFunction +import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFields +import org.apache.flink.api.scala.{DataSet, _} +import org.apache.flink.configuration.Configuration +import org.apache.flink.ml._ +import org.apache.flink.ml.common.FlinkMLTools.ModuloKeyPartitioner +import org.apache.flink.ml.common.{LabeledVector, _} +import org.apache.flink.ml.math.Breeze._ +import org.apache.flink.ml.math.{BLAS, Vector} +import org.apache.flink.ml.metrics.distances.EuclideanDistanceMetric +import org.apache.flink.ml.pipeline._ + +import scala.collection.JavaConverters._ +import scala.util.Random + + +/** + * Implements the KMeans algorithm which calculates cluster centroids based on set of training data + * points and a set of k initial centroids. 
[GitHub] flink pull request #757: [FLINK-2131][ml]: Initialization schemes for k-mean...
Github user skonto commented on a diff in the pull request: https://github.com/apache/flink/pull/757#discussion_r81430683 --- Diff: flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/clustering/KMeansITSuite.scala --- @@ -0,0 +1,142 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.ml.clustering + +import org.apache.flink.api.scala._ +import org.apache.flink.ml._ +import org.apache.flink.ml.math +import org.apache.flink.ml.math.DenseVector +import org.apache.flink.test.util.FlinkTestBase +import org.scalatest.{FlatSpec, Matchers} + +class KMeansITSuite extends FlatSpec with Matchers with FlinkTestBase { + + behavior of "The KMeans implementation" + + def fixture = new { +val env = ExecutionEnvironment.getExecutionEnvironment +val kmeans = KMeans(). + setInitialCentroids(ClusteringData.centroidData). 
+ setNumIterations(ClusteringData.iterations) + +val trainingDS = env.fromCollection(ClusteringData.trainingData) + +kmeans.fit(trainingDS) + } + + it should "cluster data points into 'K' cluster centers" in { +val f = fixture + +val centroidsResult = f.kmeans.centroids.get.collect().apply(0) + +val centroidsExpected = ClusteringData.expectedCentroids + +// the sizes must match +centroidsResult.length should be === centroidsExpected.length + +// create a lookup table for better matching +val expectedMap = centroidsExpected map (e => e.label->e.vector.asInstanceOf[DenseVector]) toMap + +// each of the results must be in lookup table +centroidsResult.iterator.foreach(result => { + val expectedVector = expectedMap.get(result.label).get + + // the type must match (not None) + expectedVector shouldBe a [math.DenseVector] + + val expectedData = expectedVector.asInstanceOf[DenseVector].data + val resultData = result.vector.asInstanceOf[DenseVector].data + + // match the individual values of the vector + expectedData zip resultData foreach { +case (expectedVector, entryVector) => + entryVector should be(expectedVector +- 0.1) + } +}) + } + + it should "predict points to cluster centers" in { +val f = fixture + +val vectorsWithExpectedLabels = ClusteringData.testData +// create a lookup table for better matching +val expectedMap = vectorsWithExpectedLabels map (v => + v.vector.asInstanceOf[DenseVector] -> v.label + ) toMap + +// calculate the vector to cluster mapping on the plain vectors +val plainVectors = vectorsWithExpectedLabels.map(v => v.vector) +val predictedVectors = f.kmeans.predict(f.env.fromCollection(plainVectors)) + +// check if all vectors were labeled correctly +predictedVectors.collect() foreach (result => { + val expectedLabel = expectedMap.get(result._1.asInstanceOf[DenseVector]).get + result._2 should be(expectedLabel) +}) + + } + + it should "initialize k cluster centers randomly" in { + +val env = ExecutionEnvironment.getExecutionEnvironment +val 
kmeans = KMeans() + .setNumClusters(10) + .setNumIterations(ClusteringData.iterations) + .setInitializationStrategy("random") + +val trainingDS = env.fromCollection(ClusteringData.trainingData) +kmeans.fit(trainingDS) + +println(trainingDS.mapWithBcVariable(kmeans.centroids.get) { --- End diff -- Shouldn't this be an assertion rather than a println?
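To make the review remark concrete, here is a minimal sketch of what asserting, rather than printing, could look like for a random initialization. It uses plain Scala collections instead of Flink `DataSet`s, and everything in it is an assumption for illustration: `RandomInitCheck`, `randomInit` and `cost` are hypothetical names, and picking k distinct data points is only one possible reading of the "random" strategy, not necessarily what the PR does.

```scala
import scala.util.Random

object RandomInitCheck {
  // Hypothetical point type for this sketch.
  type Point = Vector[Double]

  // Squared Euclidean distance between two points of equal dimension.
  def sqDist(a: Point, b: Point): Double =
    a.zip(b).map { case (x, y) => (x - y) * (x - y) }.sum

  // Hypothetical stand-in for a "random" strategy: shuffle and take k data points.
  def randomInit(points: Seq[Point], k: Int, seed: Long): Seq[Point] =
    new Random(seed).shuffle(points).take(k)

  // Sum over all points of the squared distance to the nearest centroid.
  def cost(points: Seq[Point], centroids: Seq[Point]): Double =
    points.map(p => centroids.map(c => sqDist(p, c)).min).sum

  def main(args: Array[String]): Unit = {
    // 50 pairwise distinct synthetic points.
    val points: Seq[Point] = (0 until 50).map(i => Vector(i.toDouble, (i % 7).toDouble))
    val k = 10
    val centroids = randomInit(points, k, seed = 42L)

    // Checks that replace the println: exactly k centers, all taken from the data,
    // and a finite, non-negative clustering cost.
    assert(centroids.size == k)
    assert(centroids.forall(points.contains))
    val c = cost(points, centroids)
    assert(c >= 0.0 && !c.isNaN && !c.isInfinity)
  }
}
```

In the actual suite the same checks would presumably be written with the ScalaTest matchers already imported there (`should be`, `+-`), applied to the collected centroids.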
[GitHub] flink pull request #757: [FLINK-2131][ml]: Initialization schemes for k-mean...
Github user skonto commented on a diff in the pull request: https://github.com/apache/flink/pull/757#discussion_r81430771 --- Diff: flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/clustering/KMeansITSuite.scala --- @@ -0,0 +1,142 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.ml.clustering + +import org.apache.flink.api.scala._ +import org.apache.flink.ml._ +import org.apache.flink.ml.math +import org.apache.flink.ml.math.DenseVector +import org.apache.flink.test.util.FlinkTestBase +import org.scalatest.{FlatSpec, Matchers} + +class KMeansITSuite extends FlatSpec with Matchers with FlinkTestBase { + + behavior of "The KMeans implementation" + + def fixture = new { +val env = ExecutionEnvironment.getExecutionEnvironment +val kmeans = KMeans(). + setInitialCentroids(ClusteringData.centroidData). 
+ setNumIterations(ClusteringData.iterations) + +val trainingDS = env.fromCollection(ClusteringData.trainingData) + +kmeans.fit(trainingDS) + } + + it should "cluster data points into 'K' cluster centers" in { +val f = fixture + +val centroidsResult = f.kmeans.centroids.get.collect().apply(0) + +val centroidsExpected = ClusteringData.expectedCentroids + +// the sizes must match +centroidsResult.length should be === centroidsExpected.length + +// create a lookup table for better matching +val expectedMap = centroidsExpected map (e => e.label->e.vector.asInstanceOf[DenseVector]) toMap + +// each of the results must be in lookup table +centroidsResult.iterator.foreach(result => { + val expectedVector = expectedMap.get(result.label).get + + // the type must match (not None) + expectedVector shouldBe a [math.DenseVector] + + val expectedData = expectedVector.asInstanceOf[DenseVector].data + val resultData = result.vector.asInstanceOf[DenseVector].data + + // match the individual values of the vector + expectedData zip resultData foreach { +case (expectedVector, entryVector) => + entryVector should be(expectedVector +- 0.1) + } +}) + } + + it should "predict points to cluster centers" in { +val f = fixture + +val vectorsWithExpectedLabels = ClusteringData.testData +// create a lookup table for better matching +val expectedMap = vectorsWithExpectedLabels map (v => + v.vector.asInstanceOf[DenseVector] -> v.label + ) toMap + +// calculate the vector to cluster mapping on the plain vectors +val plainVectors = vectorsWithExpectedLabels.map(v => v.vector) +val predictedVectors = f.kmeans.predict(f.env.fromCollection(plainVectors)) + +// check if all vectors were labeled correctly +predictedVectors.collect() foreach (result => { + val expectedLabel = expectedMap.get(result._1.asInstanceOf[DenseVector]).get + result._2 should be(expectedLabel) +}) + + } + + it should "initialize k cluster centers randomly" in { + +val env = ExecutionEnvironment.getExecutionEnvironment +val 
kmeans = KMeans() + .setNumClusters(10) + .setNumIterations(ClusteringData.iterations) + .setInitializationStrategy("random") + +val trainingDS = env.fromCollection(ClusteringData.trainingData) +kmeans.fit(trainingDS) + +println(trainingDS.mapWithBcVariable(kmeans.centroids.get) { + (vector, centroid) => Math.pow(ClusteringData.MinClusterDistance(vector, centroid)._1, 2) +}.reduce(_ + _).collect().toArray.apply(0)) + } + + it should "initialize k cluster centers using kmeans++" in { + +val env = ExecutionEnvironment.getExecutionEnvironment
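The quoted test above is cut off just as it reaches the kmeans++ case. As background only, the usual k-means++ seeding rule (first center uniformly at random, each further center drawn with probability proportional to its squared distance from the nearest center chosen so far) can be sketched in plain Scala. This is not the PR's implementation; `KMeansPlusPlusSketch` and `seed` are hypothetical names, and the sketch runs on a local collection rather than a distributed `DataSet`.

```scala
import scala.util.Random

object KMeansPlusPlusSketch {
  // Hypothetical point type for this sketch.
  type Point = Vector[Double]

  // Squared Euclidean distance between two points of equal dimension.
  def sqDist(a: Point, b: Point): Double =
    a.zip(b).map { case (x, y) => (x - y) * (x - y) }.sum

  def seed(points: IndexedSeq[Point], k: Int, rng: Random): Vector[Point] = {
    require(k >= 1 && k <= points.size, "k must be between 1 and the number of points")
    // First center: uniformly at random.
    var centers = Vector(points(rng.nextInt(points.size)))
    while (centers.size < k) {
      // Squared distance of every point to its nearest chosen center.
      val d2 = points.map(p => centers.map(c => sqDist(p, c)).min)
      val total = d2.sum
      val next =
        if (total == 0.0) {
          // Every point coincides with a chosen center; fall back to a uniform pick.
          points(rng.nextInt(points.size))
        } else {
          // Sample an index with probability d2(i) / total via cumulative weights.
          val target = rng.nextDouble() * total
          val cumulative = d2.scanLeft(0.0)(_ + _).tail
          val idx = cumulative.indexWhere(_ > target)
          points(if (idx >= 0) idx else points.size - 1)
        }
      centers :+= next
    }
    centers
  }
}
```

Because an already chosen point has weight zero, it cannot be drawn again while any other point is still at a positive distance, so with distinct input points the k seeds are distinct.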
[GitHub] flink pull request #757: [FLINK-2131][ml]: Initialization schemes for k-mean...
Github user skonto commented on a diff in the pull request: https://github.com/apache/flink/pull/757#discussion_r81432816 --- Diff: flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/clustering/KMeans.scala --- @@ -0,0 +1,614 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.ml.clustering + +import org.apache.flink.api.common.functions.RichFilterFunction +import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFields +import org.apache.flink.api.scala.{DataSet, _} +import org.apache.flink.configuration.Configuration +import org.apache.flink.ml._ +import org.apache.flink.ml.common.FlinkMLTools.ModuloKeyPartitioner +import org.apache.flink.ml.common.{LabeledVector, _} +import org.apache.flink.ml.math.Breeze._ +import org.apache.flink.ml.math.{BLAS, Vector} +import org.apache.flink.ml.metrics.distances.EuclideanDistanceMetric +import org.apache.flink.ml.pipeline._ + +import scala.collection.JavaConverters._ +import scala.util.Random + + +/** + * Implements the KMeans algorithm which calculates cluster centroids based on set of training data + * points and a set of k initial centroids. 
+ * + * [[KMeans]] is a [[Predictor]] which needs to be trained on a set of data points and can then be + * used to assign new points to the learned cluster centroids. + * + * The KMeans algorithm works as described on Wikipedia + * (http://en.wikipedia.org/wiki/K-means_clustering): + * + * Given an initial set of k means m1(1),…,mk(1) (see below), the algorithm proceeds by alternating + * between two steps: + * + * ===Assignment step:=== + * + * Assign each observation to the cluster whose mean yields the least within-cluster sum of + * squares (WCSS). Since the sum of squares is the squared Euclidean distance, this is intuitively + * the "nearest" mean. (Mathematically, this means partitioning the observations according to the + * Voronoi diagram generated by the means). + * + * `S_i^(t) = { x_p : || x_p - m_i^(t) ||^2 ≤ || x_p - m_j^(t) ||^2 \forall j, 1 ≤ j ≤ k}`, + * where each `x_p` is assigned to exactly one `S^{(t)}`, even if it could be assigned to two or + * more of them. + * + * ===Update step:=== + * + * Calculate the new means to be the centroids of the observations in the new clusters. + * + * `m^{(t+1)}_i = ( 1 / |S^{(t)}_i| ) \sum_{x_j \in S^{(t)}_i} x_j` + * + * Since the arithmetic mean is a least-squares estimator, this also minimizes the within-cluster + * sum of squares (WCSS) objective. 
+ * + * @example + * {{{ + * val trainingDS: DataSet[Vector] = env.fromCollection(Clustering.trainingData) + * val initialCentroids: DataSet[LabledVector] = env.fromCollection(Clustering.initCentroids) + * + * val kmeans = KMeans() + * .setInitialCentroids(initialCentroids) + * .setNumIterations(10) + * + * kmeans.fit(trainingDS) + * + * // getting the computed centroids + * val centroidsResult = kmeans.centroids.get.collect() + * + * // get matching clusters for new points + * val testDS: DataSet[Vector] = env.fromCollection(Clustering.testData) + * val clusters: DataSet[LabeledVector] = kmeans.predict(testDS) + * }}} + * + * =Parameters= + * + * - [[org.apache.flink.ml.clustering.KMeans.NumIterations]]: + * Defines the number of iterations to recalculate the centroids of the clusters. As it + * is a heuristic algorithm, there is no guarantee that it will converge to the global optimum. The + * centroids of the clusters and the reassignment of the data points will be repeated till the + * given number of iterations is reached. + * (Default value: '''10''') + * + * - [[org.apache.flink.ml.clustering.KMeans.InitialCentroids]]: + * Defines the initial k ce
[GitHub] flink pull request #757: [FLINK-2131][ml]: Initialization schemes for k-mean...
Github user skonto commented on a diff in the pull request: https://github.com/apache/flink/pull/757#discussion_r81433030 --- Diff: flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/clustering/KMeans.scala --- @@ -0,0 +1,614 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.ml.clustering + +import org.apache.flink.api.common.functions.RichFilterFunction +import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFields +import org.apache.flink.api.scala.{DataSet, _} +import org.apache.flink.configuration.Configuration +import org.apache.flink.ml._ +import org.apache.flink.ml.common.FlinkMLTools.ModuloKeyPartitioner +import org.apache.flink.ml.common.{LabeledVector, _} +import org.apache.flink.ml.math.Breeze._ +import org.apache.flink.ml.math.{BLAS, Vector} +import org.apache.flink.ml.metrics.distances.EuclideanDistanceMetric +import org.apache.flink.ml.pipeline._ + +import scala.collection.JavaConverters._ +import scala.util.Random + + +/** + * Implements the KMeans algorithm which calculates cluster centroids based on set of training data + * points and a set of k initial centroids. 
+ * + * [[KMeans]] is a [[Predictor]] which needs to be trained on a set of data points and can then be + * used to assign new points to the learned cluster centroids. + * + * The KMeans algorithm works as described on Wikipedia + * (http://en.wikipedia.org/wiki/K-means_clustering): + * --- End diff -- Add a reference for kmeans|| too, e.g. Bahmani et al. Same for kmeans++. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink issue #757: [FLINK-2131][ml]: Initialization schemes for k-means clust...
Github user skonto commented on the issue: https://github.com/apache/flink/pull/757 @sachingoel0101 @tillrohrmann any plans for this PR?
[GitHub] flink issue #3659: [FLINK-5785] Add an Imputer for preparing data
Github user skonto commented on the issue: https://github.com/apache/flink/pull/3659 @p4nna although Scala and Java certainly interoperate, could you first try adding the Imputer to the Scala API? I will add some comments on the current Java implementation shortly.
[GitHub] flink issue #757: [FLINK-2131][ml]: Initialization schemes for k-means clust...
Github user skonto commented on the issue: https://github.com/apache/flink/pull/757 @sachingoel0101 @tillrohrmann ?
[GitHub] flink issue #757: [FLINK-2131][ml]: Initialization schemes for k-means clust...
Github user skonto commented on the issue: https://github.com/apache/flink/pull/757 @sachingoel0101 no problem, been there ;) That would be good, thanks!
[GitHub] flink issue #1849: [FLINK-2157] [ml] Create evaluation framework for ML libr...
Github user skonto commented on the issue: https://github.com/apache/flink/pull/1849 Hey @thvasilo, is this under development? From what I see, many other tasks depend on it, right?
[GitHub] flink pull request #1849: [FLINK-2157] [ml] Create evaluation framework for ...
Github user skonto commented on a diff in the pull request: https://github.com/apache/flink/pull/1849#discussion_r96403197 --- Diff: flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/evaluation/Score.scala --- @@ -0,0 +1,145 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.ml.evaluation + +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.api.scala._ +import org.apache.flink.ml._ + +import scala.reflect.ClassTag + +/** + * Evaluation score + * + * Can be used to calculate a performance score for an algorithm, when provided with a DataSet + * of (truth, prediction) tuples + * + * @tparam PredictionType output type + */ +trait Score[PredictionType] { --- End diff -- What is the benefit of having the scores independent of the models? For example, each model could implement its own score function within its implementation class. I may be missing something here...
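For readers following the question, the decoupled design can be pictured with a small local sketch. This is not the PR's code: the `Score` trait below is a simplified stand-in that takes a plain `Seq` of (truth, prediction) pairs instead of a Flink `DataSet`, and `ScoreSketch`, `meanSquaredError`, `constantModel`, `doublingModel` and `score` are hypothetical names chosen for illustration.

```scala
object ScoreSketch {
  // Simplified stand-in for the PR's Score trait (assumption): it works on a
  // local Seq of (truth, prediction) pairs rather than a Flink DataSet.
  trait Score[P] {
    def evaluate(pairs: Seq[(P, P)]): Double
  }

  // One score definition, written once and independent of any model.
  val meanSquaredError: Score[Double] = new Score[Double] {
    def evaluate(pairs: Seq[(Double, Double)]): Double =
      pairs.map { case (truth, pred) => (truth - pred) * (truth - pred) }.sum / pairs.size
  }

  // Two unrelated hypothetical "models"; neither knows anything about scoring.
  val constantModel: Double => Double = _ => 2.0
  val doublingModel: Double => Double = x => 2.0 * x

  // Applies any model to (input, truth) data and evaluates it with any score.
  def score(model: Double => Double, data: Seq[(Double, Double)], s: Score[Double]): Double =
    s.evaluate(data.map { case (x, truth) => (truth, model(x)) })
}
```

In this sketch the same `meanSquaredError` value scores both models, which is one possible argument for keeping scores separate; whether that outweighs a per-model score method is exactly the open question in the comment above.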
[GitHub] flink issue #1849: [FLINK-2157] [ml] Create evaluation framework for ML libr...
Github user skonto commented on the issue: https://github.com/apache/flink/pull/1849 @thvasilo thanks, I will have a look.
[GitHub] flink issue #1849: [FLINK-2157] [ml] Create evaluation framework for ML libr...
Github user skonto commented on the issue: https://github.com/apache/flink/pull/1849 Hi guys, my intention was to review #2838, but my feeling is that it overlaps with this one. @thvasilo, we can push this one first as you said, so I will have a look at it and comment on it. The benefit is unblocking other tasks in this area.
[GitHub] flink pull request #3192: [FLINK-1731][ml] Add KMeans clustering(Lloyd's alg...
Github user skonto commented on a diff in the pull request: https://github.com/apache/flink/pull/3192#discussion_r98453679 --- Diff: flink-libraries/flink-ml/src/test/scala/org/apache/flink/ml/clustering/ClusteringData.scala --- @@ -0,0 +1,277 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.ml.clustering + +import org.apache.flink.ml.common.LabeledVector +import org.apache.flink.ml.math.{DenseVector, Vector} +import org.apache.flink.ml.metrics.distances.EuclideanDistanceMetric + +/** + * Trainings- and test-data set for the K-Means implementation + * [[org.apache.flink.ml.clustering.KMeans]]. + */ +object ClusteringData { + + /* + * Number of iterations for the K-Means algorithm. + */ + val iterations = 10 + + /* + * Sequence of initial centroids. + */ + val centroidData: Seq[LabeledVector] = Seq( --- End diff -- These could go to an external test resource file.
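One way the resource-file suggestion could be realised is sketched below. Everything here is an assumption made for illustration: the CSV layout `label,x1,x2,...`, the resource path mentioned in the comment, and the `LabeledPoint` case class, which stands in for Flink ML's `LabeledVector` so that the sketch runs without Flink on the classpath.

```scala
import scala.io.Source

object CentroidResourceSketch {
  // Stand-in for Flink ML's LabeledVector (assumption), so no Flink dependency is needed.
  final case class LabeledPoint(label: Double, features: Vector[Double])

  // Parses one CSV line of the assumed form "label,x1,x2,...".
  def parseLine(line: String): LabeledPoint = {
    val fields = line.split(',').map(_.trim.toDouble)
    LabeledPoint(fields.head, fields.tail.toVector)
  }

  // Parses every line that is neither blank nor a '#' comment.
  def parse(source: Source): Seq[LabeledPoint] =
    source.getLines()
      .map(_.trim)
      .filter(l => l.nonEmpty && !l.startsWith("#"))
      .map(parseLine)
      .toList

  // Loads a classpath resource, e.g. the hypothetical "/clustering/centroids.csv".
  def load(resource: String): Seq[LabeledPoint] = {
    val stream = getClass.getResourceAsStream(resource)
    require(stream != null, s"resource not found: $resource")
    val source = Source.fromInputStream(stream, "UTF-8")
    try parse(source) finally source.close()
  }
}
```

A test object could then call `load` once and convert the result to `LabeledVector` values, keeping the long literal sequences out of the Scala source.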
[GitHub] flink pull request #3192: [FLINK-1731][ml] Add KMeans clustering(Lloyd's alg...
Github user skonto commented on a diff in the pull request: https://github.com/apache/flink/pull/3192#discussion_r98454397 --- Diff: flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/package.scala --- @@ -18,13 +18,16 @@ package org.apache.flink + --- End diff -- delete space here please.
[GitHub] flink pull request #3192: [FLINK-1731][ml] Add KMeans clustering(Lloyd's alg...
Github user skonto commented on a diff in the pull request: https://github.com/apache/flink/pull/3192#discussion_r98456054 --- Diff: flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/clustering/KMeans.scala --- @@ -0,0 +1,263 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.ml.clustering + +import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFields +import org.apache.flink.api.scala.{DataSet, _} +import org.apache.flink.ml._ +import org.apache.flink.ml.common.{LabeledVector, _} +import org.apache.flink.ml.math.Breeze._ +import org.apache.flink.ml.math.{BLAS, Vector} +import org.apache.flink.ml.metrics.distances.EuclideanDistanceMetric +import org.apache.flink.ml.pipeline._ + + +/** + * Implements the KMeans algorithm which calculates cluster centroids based on set of training data + * points and a set of k initial centroids. + * + * [[KMeans]] is a [[Predictor]] which needs to be trained on a set of data points and can then be + * used to assign new points to the learned cluster centroids. 
+ * + * The KMeans algorithm works as described on Wikipedia + * (http://en.wikipedia.org/wiki/K-means_clustering): + * + * Given an initial set of k means m1(1),…,mk(1) (see below), the algorithm proceeds by alternating + * between two steps: + * + * ===Assignment step:=== + * + * Assign each observation to the cluster whose mean yields the least within-cluster sum of + * squares (WCSS). Since the sum of squares is the squared Euclidean distance, this is intuitively + * the "nearest" mean. (Mathematically, this means partitioning the observations according to the + * Voronoi diagram generated by the means). + * + * `S_i^(t) = { x_p : || x_p - m_i^(t) ||^2 ≤ || x_p - m_j^(t) ||^2 \forall j, 1 ≤ j ≤ k}`, + * where each `x_p` is assigned to exactly one `S^{(t)}`, even if it could be assigned to two or + * more of them. + * + * ===Update step:=== + * + * Calculate the new means to be the centroids of the observations in the new clusters. + * + * `m^{(t+1)}_i = ( 1 / |S^{(t)}_i| ) \sum_{x_j \in S^{(t)}_i} x_j` + * + * Since the arithmetic mean is a least-squares estimator, this also minimizes the within-cluster + * sum of squares (WCSS) objective. + * + * @example + * {{{ + * val trainingDS: DataSet[Vector] = env.fromCollection(Clustering.trainingData) + * val initialCentroids: DataSet[LabledVector] = env.fromCollection(Clustering.initCentroids) + * + * val kmeans = KMeans() + * .setInitialCentroids(initialCentroids) + * .setNumIterations(10) + * + * kmeans.fit(trainingDS) --- End diff -- A side note: in general I am fine with the pipelines API, but in this case, for example, `train` would be a more appropriate method name than `fit`.
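To make the assignment and update steps from the quoted Scaladoc concrete, here is a minimal local sketch using plain Scala collections. It is not the PR's Flink implementation and uses no `DataSet` API; `LloydSketch`, `Point`, `nearest`, `step` and `run` are names assumed for illustration, and keeping an empty cluster's centroid unchanged is a choice made here, not something the source specifies.

```scala
object LloydSketch {
  type Point = Vector[Double]

  // Squared Euclidean distance between two points of equal dimension.
  def squaredDistance(a: Point, b: Point): Double =
    a.zip(b).map { case (x, y) => (x - y) * (x - y) }.sum

  // Assignment step: minBy returns a single index, so every point lands in
  // exactly one cluster even when two centroids are equally close.
  def nearest(p: Point, centroids: Seq[Point]): Int =
    centroids.indices.minBy(i => squaredDistance(p, centroids(i)))

  // Update step: each centroid becomes the coordinate-wise mean of the points
  // assigned to it. A centroid with no assigned points is kept unchanged
  // (an assumption of this sketch).
  def step(points: Seq[Point], centroids: Seq[Point]): Seq[Point] = {
    val groups = points.groupBy(p => nearest(p, centroids))
    centroids.indices.map { i =>
      groups.get(i) match {
        case Some(members) => members.transpose.map(_.sum / members.size).toVector
        case None          => centroids(i)
      }
    }
  }

  // Repeats the two steps a fixed number of times, like NumIterations.
  def run(points: Seq[Point], initial: Seq[Point], iterations: Int): Seq[Point] =
    (1 to iterations).foldLeft(initial)((current, _) => step(points, current))
}
```

As in the documented parameter, the loop stops after a fixed iteration count rather than checking for convergence.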
[GitHub] flink pull request #3192: [FLINK-1731][ml] Add KMeans clustering(Lloyd's alg...
Github user skonto commented on a diff in the pull request: https://github.com/apache/flink/pull/3192#discussion_r98454850 --- Diff: flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/clustering/KMeans.scala --- @@ -0,0 +1,263 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.ml.clustering + +import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFields +import org.apache.flink.api.scala.{DataSet, _} +import org.apache.flink.ml._ +import org.apache.flink.ml.common.{LabeledVector, _} +import org.apache.flink.ml.math.Breeze._ +import org.apache.flink.ml.math.{BLAS, Vector} +import org.apache.flink.ml.metrics.distances.EuclideanDistanceMetric +import org.apache.flink.ml.pipeline._ + + +/** + * Implements the KMeans algorithm which calculates cluster centroids based on set of training data + * points and a set of k initial centroids. --- End diff -- It might be useful in the future to have a method for estimating the best k when k is unknown, e.g. by running parallel tests and comparing the error.
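The idea of running parallel tests for the error could be sketched as follows. This is only an illustration under stated assumptions: the clustering routine is passed in as a caller-supplied function `fit` (hypothetical, returning the centroids for a given k), the error is taken to be the within-cluster sum of squares, and parallelism comes from `scala.concurrent.Future` on the global execution context rather than from Flink.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object ChooseKSketch {
  type Point = Vector[Double]

  // Squared Euclidean distance between two points of equal dimension.
  def squaredDistance(a: Point, b: Point): Double =
    a.zip(b).map { case (x, y) => (x - y) * (x - y) }.sum

  // Within-cluster sum of squares: every point contributes its squared
  // distance to the nearest centroid.
  def wcss(points: Seq[Point], centroids: Seq[Point]): Double =
    points.map(p => centroids.map(c => squaredDistance(p, c)).min).sum

  // Runs the caller-supplied `fit` for each candidate k concurrently and
  // returns (k, error) pairs sorted by k.
  def errorsByK(points: Seq[Point], ks: Seq[Int])(fit: Int => Seq[Point]): Seq[(Int, Double)] = {
    val jobs = ks.map(k => Future(k -> wcss(points, fit(k))))
    Await.result(Future.sequence(jobs), 1.minute).sortBy(_._1)
  }
}
```

Because the error generally shrinks as k grows, picking the smallest error would simply favour the largest k; a caller would more likely look for the k after which the improvement levels off.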
[GitHub] flink issue #3192: [FLINK-1731][ml] Add KMeans clustering(Lloyd's algorithm)
Github user skonto commented on the issue: https://github.com/apache/flink/pull/3192 @sachingoel0101 I am having a look.
[GitHub] flink issue #3192: [FLINK-1731][ml] Add KMeans clustering(Lloyd's algorithm)
Github user skonto commented on the issue: https://github.com/apache/flink/pull/3192 @sachingoel0101 please add an example at least under flink examples. Is the code documentation enough? @thvasilo ?
[GitHub] flink issue #3192: [FLINK-1731][ml] Add KMeans clustering(Lloyd's algorithm)
Github user skonto commented on the issue: https://github.com/apache/flink/pull/3192 Ok ... there is one [example](https://github.com/apache/flink/blob/master/flink-examples/flink-examples-batch/src/main/scala/org/apache/flink/examples/scala/ml/LinearRegression.scala).
[GitHub] flink pull request #3192: [FLINK-1731][ml] Add KMeans clustering(Lloyd's alg...
Github user skonto commented on a diff in the pull request: https://github.com/apache/flink/pull/3192#discussion_r98511916 --- Diff: flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/clustering/KMeans.scala --- @@ -0,0 +1,263 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.ml.clustering + +import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFields +import org.apache.flink.api.scala.{DataSet, _} +import org.apache.flink.ml._ +import org.apache.flink.ml.common.{LabeledVector, _} +import org.apache.flink.ml.math.Breeze._ +import org.apache.flink.ml.math.{BLAS, Vector} +import org.apache.flink.ml.metrics.distances.EuclideanDistanceMetric +import org.apache.flink.ml.pipeline._ + + +/** + * Implements the KMeans algorithm which calculates cluster centroids based on set of training data + * points and a set of k initial centroids. --- End diff -- :+1:
[GitHub] flink issue #3192: [FLINK-1731][ml] Add KMeans clustering(Lloyd's algorithm)
Github user skonto commented on the issue: https://github.com/apache/flink/pull/3192 Correct, just saying we will need to create a set of examples for the ml library.
[GitHub] flink pull request #3192: [FLINK-1731][ml] Add KMeans clustering(Lloyd's alg...
Github user skonto commented on a diff in the pull request: https://github.com/apache/flink/pull/3192#discussion_r98557895 --- Diff: flink-libraries/flink-ml/src/test/scala/org/apache/flink/ml/clustering/ClusteringData.scala --- @@ -0,0 +1,277 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.ml.clustering + +import org.apache.flink.ml.common.LabeledVector +import org.apache.flink.ml.math.{DenseVector, Vector} +import org.apache.flink.ml.metrics.distances.EuclideanDistanceMetric + +/** + * Trainings- and test-data set for the K-Means implementation --- End diff -- I would prefer to keep all comments in the same format. Again, I see the rest of the code base has a mix of scaladocs and javadocs.
[GitHub] flink pull request #3192: [FLINK-1731][ml] Add KMeans clustering(Lloyd's alg...
Github user skonto commented on a diff in the pull request: https://github.com/apache/flink/pull/3192#discussion_r98570744 --- Diff: flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/clustering/KMeans.scala --- @@ -0,0 +1,263 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.ml.clustering + +import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFields +import org.apache.flink.api.scala.{DataSet, _} +import org.apache.flink.ml._ +import org.apache.flink.ml.common.{LabeledVector, _} +import org.apache.flink.ml.math.Breeze._ +import org.apache.flink.ml.math.{BLAS, Vector} +import org.apache.flink.ml.metrics.distances.EuclideanDistanceMetric +import org.apache.flink.ml.pipeline._ + + +/** + * Implements the KMeans algorithm which calculates cluster centroids based on set of training data + * points and a set of k initial centroids. + * + * [[KMeans]] is a [[Predictor]] which needs to be trained on a set of data points and can then be + * used to assign new points to the learned cluster centroids. 
+ * + * The KMeans algorithm works as described on Wikipedia + * (http://en.wikipedia.org/wiki/K-means_clustering): + * + * Given an initial set of k means m1(1),…,mk(1) (see below), the algorithm proceeds by alternating + * between two steps: + * + * ===Assignment step:=== + * + * Assign each observation to the cluster whose mean yields the least within-cluster sum of + * squares (WCSS). Since the sum of squares is the squared Euclidean distance, this is intuitively + * the "nearest" mean. (Mathematically, this means partitioning the observations according to the + * Voronoi diagram generated by the means). + * + * `S_i^(t) = { x_p : || x_p - m_i^(t) ||^2 ≤ || x_p - m_j^(t) ||^2 \forall j, 1 ≤ j ≤ k}`, + * where each `x_p` is assigned to exactly one `S^{(t)}`, even if it could be assigned to two or + * more of them. + * + * ===Update step:=== + * + * Calculate the new means to be the centroids of the observations in the new clusters. + * + * `m^{(t+1)}_i = ( 1 / |S^{(t)}_i| ) \sum_{x_j \in S^{(t)}_i} x_j` + * + * Since the arithmetic mean is a least-squares estimator, this also minimizes the within-cluster + * sum of squares (WCSS) objective. + * + * @example + * {{{ + * val trainingDS: DataSet[Vector] = env.fromCollection(Clustering.trainingData) + * val initialCentroids: DataSet[LabledVector] = env.fromCollection(Clustering.initCentroids) + * + * val kmeans = KMeans() + * .setInitialCentroids(initialCentroids) + * .setNumIterations(10) + * + * kmeans.fit(trainingDS) + * + * // getting the computed centroids + * val centroidsResult = kmeans.centroids.get.collect() + * + * // get matching clusters for new points + * val testDS: DataSet[Vector] = env.fromCollection(Clustering.testData) + * val clusters: DataSet[LabeledVector] = kmeans.predict(testDS) + * }}} + * + * =Parameters= + * + * - [[org.apache.flink.ml.clustering.KMeans.NumIterations]]: + * Defines the number of iterations to recalculate the centroids of the clusters. 
As it + * is a heuristic algorithm, there is no guarantee that it will converge to the global optimum. The + * centroids of the clusters and the reassignment of the data points will be repeated till the + * given number of iterations is reached. + * (Default value: '''10''') + * + * - [[org.apache.flink.ml.clustering.KMeans.InitialCentroids]]: + * Defines the initial k centroids of the k clusters. They are used as start off point of the + * algorithm for clustering the data set. The centroids are recalculated as often as set in + * [[org.apache.flink.ml.clustering.
[GitHub] flink pull request #3192: [FLINK-1731][ml] Add KMeans clustering(Lloyd's alg...
Github user skonto commented on a diff in the pull request: https://github.com/apache/flink/pull/3192#discussion_r98650525 --- Diff: flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/clustering/KMeans.scala --- @@ -0,0 +1,263 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.ml.clustering + +import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFields +import org.apache.flink.api.scala.{DataSet, _} +import org.apache.flink.ml._ +import org.apache.flink.ml.common.{LabeledVector, _} +import org.apache.flink.ml.math.Breeze._ +import org.apache.flink.ml.math.{BLAS, Vector} +import org.apache.flink.ml.metrics.distances.EuclideanDistanceMetric +import org.apache.flink.ml.pipeline._ + + +/** + * Implements the KMeans algorithm which calculates cluster centroids based on set of training data + * points and a set of k initial centroids. + * + * [[KMeans]] is a [[Predictor]] which needs to be trained on a set of data points and can then be + * used to assign new points to the learned cluster centroids. 
+ * + * The KMeans algorithm works as described on Wikipedia + * (http://en.wikipedia.org/wiki/K-means_clustering): + * + * Given an initial set of k means m1(1),…,mk(1) (see below), the algorithm proceeds by alternating + * between two steps: + * + * ===Assignment step:=== + * + * Assign each observation to the cluster whose mean yields the least within-cluster sum of + * squares (WCSS). Since the sum of squares is the squared Euclidean distance, this is intuitively + * the "nearest" mean. (Mathematically, this means partitioning the observations according to the + * Voronoi diagram generated by the means). + * + * `S_i^(t) = { x_p : || x_p - m_i^(t) ||^2 ≤ || x_p - m_j^(t) ||^2 \forall j, 1 ≤ j ≤ k}`, + * where each `x_p` is assigned to exactly one `S^{(t)}`, even if it could be assigned to two or + * more of them. + * + * ===Update step:=== + * + * Calculate the new means to be the centroids of the observations in the new clusters. + * + * `m^{(t+1)}_i = ( 1 / |S^{(t)}_i| ) \sum_{x_j \in S^{(t)}_i} x_j` + * + * Since the arithmetic mean is a least-squares estimator, this also minimizes the within-cluster + * sum of squares (WCSS) objective. + * + * @example + * {{{ + * val trainingDS: DataSet[Vector] = env.fromCollection(Clustering.trainingData) + * val initialCentroids: DataSet[LabeledVector] = env.fromCollection(Clustering.initCentroids) + * + * val kmeans = KMeans() + * .setInitialCentroids(initialCentroids) + * .setNumIterations(10) + * + * kmeans.fit(trainingDS) + * + * // getting the computed centroids + * val centroidsResult = kmeans.centroids.get.collect() + * + * // get matching clusters for new points + * val testDS: DataSet[Vector] = env.fromCollection(Clustering.testData) + * val clusters: DataSet[LabeledVector] = kmeans.predict(testDS) + * }}} + * + * =Parameters= + * + * - [[org.apache.flink.ml.clustering.KMeans.NumIterations]]: + * Defines the number of iterations to recalculate the centroids of the clusters. 
As it + * is a heuristic algorithm, there is no guarantee that it will converge to the global optimum. The + * centroids of the clusters and the reassignment of the data points will be repeated till the + * given number of iterations is reached. + * (Default value: '''10''') + * + * - [[org.apache.flink.ml.clustering.KMeans.InitialCentroids]]: + * Defines the initial k centroids of the k clusters. They are used as start off point of the + * algorithm for clustering the data set. The centroids are recalculated as often as set in + * [[org.apache.flink.ml.clustering.
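For readers following the quoted Scaladoc, the assignment and update steps it describes can be sketched on plain Scala collections. This is only an illustration, not the code under review in the PR: it does not use Flink's DataSet API, and the names `LloydSketch`, `nearest`, `mean` and `lloyd` are hypothetical. It also assumes that a centroid with no assigned points keeps its previous position, which the quoted text does not specify.

```scala
// Minimal sketch of Lloyd's algorithm on in-memory collections (assumed names, no Flink).
object LloydSketch {
  type Point = Vector[Double]

  // Squared Euclidean distance between two points of equal dimension.
  def squaredDistance(a: Point, b: Point): Double =
    a.zip(b).map { case (x, y) => (x - y) * (x - y) }.sum

  // Assignment step: index of the centroid with the smallest squared distance.
  // Exactly one index is returned, even when several centroids are equally close.
  def nearest(p: Point, centroids: Seq[Point]): Int =
    centroids.indices.minBy(i => squaredDistance(p, centroids(i)))

  // Update step: component-wise arithmetic mean of a non-empty cluster.
  def mean(points: Seq[Point]): Point =
    points.transpose.map(_.sum / points.size).toVector

  // Alternates both steps for a fixed number of iterations, as NumIterations does.
  def lloyd(data: Seq[Point], initial: Seq[Point], numIterations: Int): Seq[Point] =
    (1 to numIterations).foldLeft(initial) { (centroids, _) =>
      val clusters = data.groupBy(p => nearest(p, centroids))
      // Assumption: a centroid without assigned points keeps its previous position.
      centroids.indices.map(i => clusters.get(i).map(mean).getOrElse(centroids(i)))
    }
}
```

Calling `LloydSketch.lloyd(data, initialCentroids, 10)` returns the centroids after ten rounds. Because the iteration count is fixed, the loop does not stop early once the centroids stop moving.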
[GitHub] flink issue #3192: [FLINK-1731][ml] Add KMeans clustering(Lloyd's algorithm)
Github user skonto commented on the issue: https://github.com/apache/flink/pull/3192 I have run the tests and verified the code, seems to work fine. I need to check the centers generated. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink issue #3192: [FLINK-1731][ml] Add KMeans clustering(Lloyd's algorithm)
Github user skonto commented on the issue: https://github.com/apache/flink/pull/3192 @sachingoel0101 could you update the PR so I can do a final review and request a merge? @tillrohrmann could assist with the forwardedfields question?
[GitHub] flink issue #3192: [FLINK-1731][ml] Add KMeans clustering(Lloyd's algorithm)
Github user skonto commented on the issue: https://github.com/apache/flink/pull/3192 @sachingoel0101 np as soon as you are ready let me know. Also @tillrohrmann made some comments to take into consideration.
[GitHub] flink pull request #3313: [FLINK-5588][ml] add a data normalizer to ml libra...
GitHub user skonto opened a pull request: https://github.com/apache/flink/pull/3313 [FLINK-5588][ml] add a data normalizer to ml library - Adds a Normalizer. - Still need to add the Unit Scaler for the features. You can merge this pull request into a Git repository by running: $ git pull https://github.com/skonto/flink unit_scaler Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/3313.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #3313 commit e25af5c8331214167277a982f0ec8de5b41a202d Author: Stavos Kontopoulos Date: 2017-02-13T02:16:13Z add the normalizer commit f85ed43aadbc3bb3233d9c274dd352ad759cfec9 Author: Stavos Kontopoulos Date: 2017-02-14T21:55:46Z add docs
[GitHub] flink issue #3313: [FLINK-5588][ml] add a data normalizer to ml library
Github user skonto commented on the issue: https://github.com/apache/flink/pull/3313 @thvasilo @tillrohrmann pls review.
[GitHub] flink issue #3313: [FLINK-5588][ml] add a data normalizer to ml library
Github user skonto commented on the issue: https://github.com/apache/flink/pull/3313 @tillrohrmann the tests never finished :( Is there a way to re-trigger it besides a commit?
[GitHub] flink issue #3313: [FLINK-5588][ml] add a data normalizer to ml library
Github user skonto commented on the issue: https://github.com/apache/flink/pull/3313 Ok sorry for that I did squash the commits, also I am used to it from other projects where the comments are invalidated.
[GitHub] flink issue #3313: [FLINK-5588][ml] add a data normalizer to ml library
Github user skonto commented on the issue: https://github.com/apache/flink/pull/3313 WIP I will add the unit scaler here.