Hi Christoph,

I am not an expert in ML and have not used Spark KMeans, but your problem seems to be an issue of a local minimum vs. the global minimum. You should run K-means multiple times with random starting points, and also try multiple values of K (unless you are already sure of K).
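For what it's worth, that restart strategy can be sketched roughly like this against the Spark 2.x `ml` API used in the scripts below (an untested sketch; `bestOfRestarts` and the seed values are made-up names for illustration):

```scala
import org.apache.spark.ml.clustering.{KMeans, KMeansModel}
import org.apache.spark.sql.DataFrame

// Sketch of the restart strategy: fit once per seed and keep the model
// with the lowest cost. Assumes `data` has a "features" vector column,
// as produced by the VectorAssembler in the script below.
def bestOfRestarts(data: DataFrame, k: Int, seeds: Seq[Long]): KMeansModel = {
  val models = seeds.map { seed =>
    new KMeans().setK(k).setSeed(seed).fit(data)
  }
  // computeCost returns the within-cluster sum of squared distances (Spark 2.x)
  models.minBy(_.computeCost(data))
}

// usage: val model = bestOfRestarts(data, 10, Seq(1L, 2L, 3L, 4L, 5L))
```

Picking the lowest-cost model over several seeds does not make a single run partition-independent, but it reduces the variance coming from unlucky initializations.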
Hope this helps.

Thanks,
Ankur

On Wed, May 24, 2017 at 2:15 AM, Christoph Bruecke <carabo...@gmail.com> wrote:
> Hi Anastasios,
>
> thanks for the reply, but caching doesn’t seem to change anything.
>
> After further investigation it really seems that the RDD#takeSample method
> is the cause of the non-reproducibility.
>
> Is this considered a bug, and should I open an issue for it?
>
> BTW: my example script contains a little typo in line 3: it is `feature`,
> not `features` (mind the `s`).
>
> Best,
> Christoph
>
> The script with caching:
>
> ```
> import org.apache.spark.sql.DataFrame
> import org.apache.spark.ml.clustering.KMeans
> import org.apache.spark.ml.feature.VectorAssembler
> import org.apache.spark.storage.StorageLevel
>
> // generate random data for clustering
> val randomData = spark.range(1, 1000).withColumn("a", rand(123)).withColumn("b", rand(321))
>
> val vecAssembler = new VectorAssembler().setInputCols(Array("a", "b")).setOutputCol("features")
>
> val data = vecAssembler.transform(randomData)
>
> // instantiate KMeans with fixed seed
> val kmeans = new KMeans().setK(10).setSeed(9876L)
>
> // train the model with different partitioning
> val dataWith1Partition = data.repartition(1)
> // cache the data
> dataWith1Partition.persist(StorageLevel.MEMORY_AND_DISK)
> println("1 Partition: " + kmeans.fit(dataWith1Partition).computeCost(dataWith1Partition))
>
> val dataWith4Partition = data.repartition(4)
> // cache the data
> dataWith4Partition.persist(StorageLevel.MEMORY_AND_DISK)
> println("4 Partition: " + kmeans.fit(dataWith4Partition).computeCost(dataWith4Partition))
> ```
>
> Output:
>
> ```
> 1 Partition: 16.028212597888057
> 4 Partition: 16.14758460544976
> ```
>
> > On 22 May 2017, at 16:33, Anastasios Zouzias <zouz...@gmail.com> wrote:
> >
> > Hi Christoph,
> >
> > Take a look at this, you might end up having a similar case:
> >
> > http://www.spark.tc/using-sparks-cache-for-correctness-not-just-performance/
> >
> > If this is not the case, then I agree with you that the KMeans should be
> > partitioning-agnostic (although I haven't checked the code yet).
> >
> > Best,
> > Anastasios
> >
> > On Mon, May 22, 2017 at 3:42 PM, Christoph Bruecke <carabo...@gmail.com> wrote:
> > Hi,
> >
> > I’m trying to figure out how to use KMeans in order to achieve reproducible
> > results. I have found that running the same KMeans instance on the same
> > data with different partitioning will produce different clusterings.
> >
> > That is, a simple KMeans run with a fixed seed returns different results
> > on the same training data if the training data is partitioned differently.
> >
> > Consider the following example. The same KMeans clustering setup is run on
> > identical data. The only difference is the partitioning of the training
> > data (one partition vs. four partitions).
> >
> > ```
> > import org.apache.spark.sql.DataFrame
> > import org.apache.spark.ml.clustering.KMeans
> > import org.apache.spark.ml.features.VectorAssembler
> >
> > // generate random data for clustering
> > val randomData = spark.range(1, 1000).withColumn("a", rand(123)).withColumn("b", rand(321))
> >
> > val vecAssembler = new VectorAssembler().setInputCols(Array("a", "b")).setOutputCol("features")
> >
> > val data = vecAssembler.transform(randomData)
> >
> > // instantiate KMeans with fixed seed
> > val kmeans = new KMeans().setK(10).setSeed(9876L)
> >
> > // train the model with different partitioning
> > val dataWith1Partition = data.repartition(1)
> > println("1 Partition: " + kmeans.fit(dataWith1Partition).computeCost(dataWith1Partition))
> >
> > val dataWith4Partition = data.repartition(4)
> > println("4 Partition: " + kmeans.fit(dataWith4Partition).computeCost(dataWith4Partition))
> > ```
> >
> > I get the following related cost:
> >
> > ```
> > 1 Partition: 16.028212597888057
> > 4 Partition: 16.14758460544976
> > ```
> >
> > What I want to achieve is that repeated computations of the KMeans
> > clustering yield identical results on identical training data, regardless
> > of the partitioning.
> >
> > Looking through the Spark source code, I guess the cause is the
> > initialization method of KMeans, which in turn uses the `takeSample`
> > method, which does not seem to be partition-agnostic.
> >
> > Is this behaviour expected? Is there anything I could do to achieve
> > reproducible results?
> >
> > Best,
> > Christoph
> > ---------------------------------------------------------------------
> > To unsubscribe e-mail: user-unsubscr...@spark.apache.org
> >
> > --
> > -- Anastasios Zouzias
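The partition dependence Christoph suspects can be checked directly on `takeSample` itself. The following is an illustrative sketch for the spark-shell (the actual sampled values will vary with the Spark version and cluster; no particular output is guaranteed):

```scala
// Illustrative sketch: the same seeded takeSample on the same data may
// return different elements when the RDD is partitioned differently,
// since sampling is evaluated across partitions.
val rdd = spark.sparkContext.parallelize(1 to 1000)

val sample1 = rdd.repartition(1).takeSample(withReplacement = false, num = 5, seed = 42L)
val sample4 = rdd.repartition(4).takeSample(withReplacement = false, num = 5, seed = 42L)

println(sample1.mkString(", "))
println(sample4.mkString(", "))
// if the two printed samples differ despite the fixed seed, the
// KMeans initialization that builds on takeSample will differ too
```

If the two samples disagree, that would support the hypothesis that KMeans initialization, not the training iterations, is the source of the non-reproducibility.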