MLlib KMeans Exception
Hi folks!

I'm running a Python Spark job on a cluster with 1 master and 10 slaves (64 GB RAM and 32 cores per machine). The job reads a 1.2 TB file with 1,128,201,847 lines from HDFS and calls KMeans as follows:

    # SLAVE CODE - Reading features from HDFS
    from pyspark.mllib.clustering import KMeans

    def get_features_from_images_hdfs(self, timestamp):
        def shallow(lista):
            # flatten one level: each element is a list of feature vectors
            for row in lista:
                for col in row:
                    yield col

        features = self.sc.textFile("hdfs://999.999.99:/FOLDER/")
        return features.map(lambda row: eval(row)[1]).mapPartitions(shallow)

    # SLAVE CODE - Extract centroids with KMeans
    def extract_centroids_on_slaves(self, features, kmeans_clusters,
                                    kmeans_max_iterations, kmeans_mode):
        # Error line
        clusters = KMeans.train(
            features,
            kmeans_clusters,
            maxIterations=kmeans_max_iterations,
            runs=1,
            initializationMode=kmeans_mode
        )
        return clusters.clusterCenters

    # MASTER CODE - Main
    features = get_features_from_images_hdfs(kwargs.get("timestamp"))

    kmeans_clusters = 1
    kmeans_max_iterations = 13
    kmeans_mode = "random"

    centroids = extract_centroids_on_slaves(
        features,
        kmeans_clusters,
        kmeans_max_iterations,
        kmeans_mode
    )

    centroids_rdd = sc.parallelize(centroids)

I'm getting the following exception when I call KMeans.train:

    14/11/20 13:19:34 INFO TaskSetManager: Starting task 2539.0 in stage 0.0 (TID 2327, ip-172-31-7-120.ec2.internal, NODE_LOCAL, 1649 bytes)
    14/11/20 13:19:34 WARN TaskSetManager: Lost task 2486.0 in stage 0.0 (TID 2257, ip-172-31-7-120.ec2.internal): java.io.IOException: Filesystem closed
        org.apache.hadoop.hdfs.DFSClient.checkOpen(DFSClient.java:765)
        org.apache.hadoop.hdfs.DFSInputStream.readWithStrategy(DFSInputStream.java:783)
        org.apache.hadoop.hdfs.DFSInputStream.read(DFSInputStream.java:844)
        java.io.DataInputStream.read(DataInputStream.java:100)
        org.apache.hadoop.util.LineReader.fillBuffer(LineReader.java:180)
        org.apache.hadoop.util.LineReader.readDefaultLine(LineReader.java:216)
        org.apache.hadoop.util.LineReader.readLine(LineReader.java:174)
        org.apache.hadoop.mapred.LineRecordReader.next(LineRecordReader.java:246)
        org.apache.hadoop.mapred.LineRecordReader.next(LineRecordReader.java:47)
        org.apache.spark.rdd.HadoopRDD$$anon$1.getNext(HadoopRDD.scala:220)
        org.apache.spark.rdd.HadoopRDD$$anon$1.getNext(HadoopRDD.scala:189)
        org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:71)
        org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
        scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
        scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:350)
        scala.collection.Iterator$class.foreach(Iterator.scala:727)
        scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
        org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:340)
        org.apache.spark.api.python.PythonRDD$WriterThread$$anonfun$run$1.apply$mcV$sp(PythonRDD.scala:209)
        org.apache.spark.api.python.PythonRDD$WriterThread$$anonfun$run$1.apply(PythonRDD.scala:184)
        org.apache.spark.api.python.PythonRDD$WriterThread$$anonfun$run$1.apply(PythonRDD.scala:184)
        org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1314)
        org.apache.spark.api.python.PythonRDD$WriterThread.run(PythonRDD.scala:183)
    14/11/20 13:19:34 INFO BlockManagerInfo: Added rdd_4_2174 in memory on ip-172-31-7-121.ec2.internal:57211 (size: 5.3 MB, free: 23.0 GB)
    14/11/20 13:19:34 INFO BlockManagerInfo: Added rdd_2_2349 in memory on ip-172-31-7-124.ec2.internal:56258 (size: 47.6 MB, free: 23.5 GB)
    14/11/20 13:19:34 INFO BlockManagerInfo: Added rdd_2_2386 in memory on ip-172-31-7-124.ec2.internal:56258 (size: 46.0 MB, free: 23.5 GB)
    14/11/20 13:19:34 INFO BlockManagerInfo: Added rdd_2_2341 in memory on ip-172-31-7-124.ec2.internal:56258 (size: 47.3 MB, free: 23.4 GB)
    14/11/20 13:19:34 INFO BlockManagerInfo: Added rdd_4_2279 in memory on ip-172-31-7-124.ec2.internal:56258 (size: 5.1 MB, free: 23.4 GB)
    14/11/20 13:19:34 INFO BlockManagerInfo: Added rdd_2_2324 in memory on ip-172-31-7-124.ec2.internal:56258 (size: 46.1 MB, free: 23.4 GB)
    14/11/20 13:19:34 INFO TaskSetManager: Starting task 2525.0 in stage 0.0 (TID 2328, ip-172-31-7-122.ec2.internal, NODE_LOCAL, 1649 bytes)
    14/11/20 13:19:34 INFO TaskSetManager: Finished task 2351.0 in stage 0.0 (TID 2103) in 77554 ms on ip-172-31-7-122.ec2.internal (1484/10220)
    14/11/20 13:19:34 INFO BlockManagerInfo: Added rdd_2_2373 in memory on ip-172-31-7-122.ec2.internal:37861 (size: 48.1 MB, free: 23.2 GB)
    14/11/20 13:19:34 INFO BlockManagerInfo: Added rdd_4_2319 in memory on ip-172-31-7-124.ec2.internal:56258 (size: 5.2 MB, free: 23.4 GB)
    14/11/20 13:19:35 INFO BlockManagerInfo: Added rdd_4_2302 in memory on ip-172-31-7-122.ec2.internal:37861
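A side note on the parsing step in the question: calling eval on every line is both slow and unsafe. The same map-then-flatten logic can be sketched locally (no Spark needed) with ast.literal_eval, which parses Python literals without executing arbitrary code. The sample lines below are made up; the real line format is an assumption (a tuple literal whose second element is a list of feature vectors):

```python
import ast

def parse_features(line):
    # literal_eval accepts only Python literals (tuples, lists, numbers, strings),
    # so a malicious or malformed line raises instead of executing code
    return ast.literal_eval(line)[1]

def shallow(lista):
    # flatten one level: each element is a list of feature vectors
    for row in lista:
        for col in row:
            yield col

# hypothetical sample lines standing in for the HDFS text file
lines = ['("img1", [[1.0, 2.0], [3.0, 4.0]])', '("img2", [[5.0, 6.0]])']
rows = [parse_features(l) for l in lines]   # what features.map(...) would produce
flat = list(shallow(rows))                  # what mapPartitions(shallow) would yield
# flat == [[1.0, 2.0], [3.0, 4.0], [5.0, 6.0]]
```

In a PySpark job the same two functions would slot into the map and mapPartitions calls unchanged.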
Re: MLlib KMeans Exception
How many features and how many partitions? You set kmeans_clusters to 1. If the feature dimension is large, it would be really expensive. You can check the WebUI and see task failures there. The stack trace you posted is from the driver.

Btw, the total memory you have is 64GB * 10, so you can cache about 300GB of data under the default setting, which is not enough for the 1.2TB data you want to process.

-Xiangrui

On Thu, Nov 20, 2014 at 5:57 AM, Alan Prando a...@scanboo.com.br wrote:
> [snip]
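The ~300 GB figure in the reply can be checked with back-of-the-envelope arithmetic. The fractions below are the Spark 1.x defaults (spark.storage.memoryFraction = 0.6, spark.storage.safetyFraction = 0.9); the per-worker executor heap of ~54 GB (out of 64 GB of RAM) is an assumption, since the thread does not state the actual heap size:

```python
num_workers = 10
executor_heap_gb = 54.0    # assumed JVM heap per worker (of 64 GB RAM)
memory_fraction = 0.6      # spark.storage.memoryFraction, Spark 1.x default
safety_fraction = 0.9      # spark.storage.safetyFraction, Spark 1.x default

# total cluster memory usable for cached RDD blocks under the default setting
cacheable_gb = num_workers * executor_heap_gb * memory_fraction * safety_fraction
print(round(cacheable_gb))  # prints 292 -- well short of the 1.2 TB input
```

So roughly a quarter of the input can stay in memory; the rest is recomputed or spilled, which is consistent with the suggestion that the default caching setup cannot hold this dataset.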