MLlib KMeans Exception

2014-11-20 Thread Alan Prando
Hi Folks!

I'm running a Python Spark job on a cluster with 1 master and 10 slaves
(64 GB RAM and 32 cores per machine).
The job reads a 1.2 TB file (1,128,201,847 lines) from HDFS and calls
KMeans as follows:

# SLAVE CODE - Reading features from HDFS
def get_features_from_images_hdfs(self, timestamp):
    # Flatten partitions of rows into a stream of individual feature vectors
    def shallow(lista):
        for row in lista:
            for col in row:
                yield col

    # Each line is a Python literal; element [1] holds the feature vectors
    features = self.sc.textFile("hdfs://999.999.99:/FOLDER/")
    return features.map(lambda row: eval(row)[1]).mapPartitions(shallow)


from pyspark.mllib.clustering import KMeans

# SLAVE CODE - Extract centroids with KMeans
def extract_centroids_on_slaves(self, features, kmeans_clusters,
                                kmeans_max_iterations, kmeans_mode):
    # Error line
    clusters = KMeans.train(
        features,
        kmeans_clusters,
        maxIterations=kmeans_max_iterations,
        runs=1,
        initializationMode=kmeans_mode
    )
    return clusters.clusterCenters

# MASTER CODE - Main
features = get_features_from_images_hdfs(kwargs.get("timestamp"))
kmeans_clusters = 1
kmeans_max_iterations = 13
kmeans_mode = "random"

centroids = extract_centroids_on_slaves(
    features,
    kmeans_clusters,
    kmeans_max_iterations,
    kmeans_mode
)

centroids_rdd = sc.parallelize(centroids)
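
A safer drop-in for the eval call in the reader above, assuming each line
really is a plain Python literal such as (key, [vector, ...]), would be
ast.literal_eval, which parses literals only and refuses to run code:

import ast

# Same body as get_features_from_images_hdfs, but a malformed line
# raises ValueError instead of executing arbitrary code.
features = self.sc.textFile("hdfs://999.999.99:/FOLDER/")
return features.map(lambda row: ast.literal_eval(row)[1]) \
               .mapPartitions(shallow)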


I'm getting the following exception when I call KMeans.train:

14/11/20 13:19:34 INFO TaskSetManager: Starting task 2539.0 in stage 0.0
(TID 2327, ip-172-31-7-120.ec2.internal, NODE_LOCAL, 1649 bytes)
14/11/20 13:19:34 WARN TaskSetManager: Lost task 2486.0 in stage 0.0 (TID
2257, ip-172-31-7-120.ec2.internal): java.io.IOException: Filesystem closed
org.apache.hadoop.hdfs.DFSClient.checkOpen(DFSClient.java:765)
org.apache.hadoop.hdfs.DFSInputStream.readWithStrategy(DFSInputStream.java:783)
org.apache.hadoop.hdfs.DFSInputStream.read(DFSInputStream.java:844)
java.io.DataInputStream.read(DataInputStream.java:100)
org.apache.hadoop.util.LineReader.fillBuffer(LineReader.java:180)
org.apache.hadoop.util.LineReader.readDefaultLine(LineReader.java:216)
org.apache.hadoop.util.LineReader.readLine(LineReader.java:174)
org.apache.hadoop.mapred.LineRecordReader.next(LineRecordReader.java:246)
org.apache.hadoop.mapred.LineRecordReader.next(LineRecordReader.java:47)
org.apache.spark.rdd.HadoopRDD$$anon$1.getNext(HadoopRDD.scala:220)
org.apache.spark.rdd.HadoopRDD$$anon$1.getNext(HadoopRDD.scala:189)
org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:71)
org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:350)
scala.collection.Iterator$class.foreach(Iterator.scala:727)
scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:340)
org.apache.spark.api.python.PythonRDD$WriterThread$$anonfun$run$1.apply$mcV$sp(PythonRDD.scala:209)
org.apache.spark.api.python.PythonRDD$WriterThread$$anonfun$run$1.apply(PythonRDD.scala:184)
org.apache.spark.api.python.PythonRDD$WriterThread$$anonfun$run$1.apply(PythonRDD.scala:184)
org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1314)
org.apache.spark.api.python.PythonRDD$WriterThread.run(PythonRDD.scala:183)
14/11/20 13:19:34 INFO BlockManagerInfo: Added rdd_4_2174 in memory on
ip-172-31-7-121.ec2.internal:57211 (size: 5.3 MB, free: 23.0 GB)
14/11/20 13:19:34 INFO BlockManagerInfo: Added rdd_2_2349 in memory on
ip-172-31-7-124.ec2.internal:56258 (size: 47.6 MB, free: 23.5 GB)
14/11/20 13:19:34 INFO BlockManagerInfo: Added rdd_2_2386 in memory on
ip-172-31-7-124.ec2.internal:56258 (size: 46.0 MB, free: 23.5 GB)
14/11/20 13:19:34 INFO BlockManagerInfo: Added rdd_2_2341 in memory on
ip-172-31-7-124.ec2.internal:56258 (size: 47.3 MB, free: 23.4 GB)
14/11/20 13:19:34 INFO BlockManagerInfo: Added rdd_4_2279 in memory on
ip-172-31-7-124.ec2.internal:56258 (size: 5.1 MB, free: 23.4 GB)
14/11/20 13:19:34 INFO BlockManagerInfo: Added rdd_2_2324 in memory on
ip-172-31-7-124.ec2.internal:56258 (size: 46.1 MB, free: 23.4 GB)
14/11/20 13:19:34 INFO TaskSetManager: Starting task 2525.0 in stage 0.0
(TID 2328, ip-172-31-7-122.ec2.internal, NODE_LOCAL, 1649 bytes)
14/11/20 13:19:34 INFO TaskSetManager: Finished task 2351.0 in stage 0.0
(TID 2103) in 77554 ms on ip-172-31-7-122.ec2.internal (1484/10220)
14/11/20 13:19:34 INFO BlockManagerInfo: Added rdd_2_2373 in memory on
ip-172-31-7-122.ec2.internal:37861 (size: 48.1 MB, free: 23.2 GB)
14/11/20 13:19:34 INFO BlockManagerInfo: Added rdd_4_2319 in memory on
ip-172-31-7-124.ec2.internal:56258 (size: 5.2 MB, free: 23.4 GB)
14/11/20 13:19:35 INFO BlockManagerInfo: Added rdd_4_2302 in memory on
ip-172-31-7-122.ec2.internal:37861 

Re: MLlib KMeans Exception

2014-11-20 Thread Xiangrui Meng
How many features and how many partitions? You set kmeans_clusters to
1. If the feature dimension is large, it would be really
expensive. You can check the WebUI and see task failures there. The
stack trace you posted is from the driver. Btw, the total memory you
have is 64GB * 10, so you can cache about 300GB of data under the
default setting, which is not enough for the 1.2TB data you want to
process. -Xiangrui
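
A quick way to check those numbers from the driver (a sketch against the
features RDD in the original post; getNumPartitions, first, and
StorageLevel.MEMORY_AND_DISK are standard PySpark APIs, and spilling to
disk is one option when the input exceeds the cache):

from pyspark import StorageLevel

# Partition count and the dimension of one feature vector --
# the two numbers asked about above.
print(features.getNumPartitions())
print(len(features.first()))

# Spill blocks that don't fit in memory to local disk instead of
# recomputing them; relevant when 1.2 TB > ~300 GB of cache capacity.
features.persist(StorageLevel.MEMORY_AND_DISK)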
