Hi Folks! I'm running a Python Spark job on a cluster with 1 master and 10 slaves (64 GB RAM and 32 cores per machine). The job reads a 1.2 TB file with 1,128,201,847 lines from HDFS and calls the KMeans method as follows:
# SLAVE CODE - Reading features from HDFS def get_features_from_images_hdfs(self, timestamp): def shallow(lista): for row in lista: for col in row: yield col features = self.sc.textFile("hdfs://999.999.99:8888/FOLDER/") return features.map(lambda row: eval(row)[1]).mapPartitions(shallow) # SLAVE CODE - Extract centroids with Kmeans def extract_centroids_on_slaves(self, features, kmeans_clusters, kmeans_max_iterations, kmeans_mode): #Error line clusters = KMeans.train( features, kmeans_clusters, maxIterations=kmeans_max_iterations, runs=1, initializationMode=kmeans_mode ) return clusters.clusterCenters # MASTER CODE - Main features = get_features_from_images_hdfs(kwargs.get("timestamp")) kmeans_clusters = 10000 kmeans_max_interations = 13 kmeans_mode = random centroids = extract_centroids_on_slaves( features, kmeans_clusters, kmeans_max_interations, kmeans_mode ) centroids_rdd = sc.parallelize(centroids) I'm getting the following exception when I call "KMeans.train": 14/11/20 13:19:34 INFO TaskSetManager: Starting task 2539.0 in stage 0.0 (TID 2327, ip-172-31-7-120.ec2.internal, NODE_LOCAL, 1649 bytes) 14/11/20 13:19:34 WARN TaskSetManager: Lost task 2486.0 in stage 0.0 (TID 2257, ip-172-31-7-120.ec2.internal): java.io.IOException: Filesystem closed org.apache.hadoop.hdfs.DFSClient.checkOpen(DFSClient.java:765) org.apache.hadoop.hdfs.DFSInputStream.readWithStrategy(DFSInputStream.java:783) org.apache.hadoop.hdfs.DFSInputStream.read(DFSInputStream.java:844) java.io.DataInputStream.read(DataInputStream.java:100) org.apache.hadoop.util.LineReader.fillBuffer(LineReader.java:180) org.apache.hadoop.util.LineReader.readDefaultLine(LineReader.java:216) org.apache.hadoop.util.LineReader.readLine(LineReader.java:174) org.apache.hadoop.mapred.LineRecordReader.next(LineRecordReader.java:246) org.apache.hadoop.mapred.LineRecordReader.next(LineRecordReader.java:47) org.apache.spark.rdd.HadoopRDD$$anon$1.getNext(HadoopRDD.scala:220) 
org.apache.spark.rdd.HadoopRDD$$anon$1.getNext(HadoopRDD.scala:189) org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:71) org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39) scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327) scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:350) scala.collection.Iterator$class.foreach(Iterator.scala:727) scala.collection.AbstractIterator.foreach(Iterator.scala:1157) org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:340) org.apache.spark.api.python.PythonRDD$WriterThread$$anonfun$run$1.apply$mcV$sp(PythonRDD.scala:209) org.apache.spark.api.python.PythonRDD$WriterThread$$anonfun$run$1.apply(PythonRDD.scala:184) org.apache.spark.api.python.PythonRDD$WriterThread$$anonfun$run$1.apply(PythonRDD.scala:184) org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1314) org.apache.spark.api.python.PythonRDD$WriterThread.run(PythonRDD.scala:183) 14/11/20 13:19:34 INFO BlockManagerInfo: Added rdd_4_2174 in memory on ip-172-31-7-121.ec2.internal:57211 (size: 5.3 MB, free: 23.0 GB) 14/11/20 13:19:34 INFO BlockManagerInfo: Added rdd_2_2349 in memory on ip-172-31-7-124.ec2.internal:56258 (size: 47.6 MB, free: 23.5 GB) 14/11/20 13:19:34 INFO BlockManagerInfo: Added rdd_2_2386 in memory on ip-172-31-7-124.ec2.internal:56258 (size: 46.0 MB, free: 23.5 GB) 14/11/20 13:19:34 INFO BlockManagerInfo: Added rdd_2_2341 in memory on ip-172-31-7-124.ec2.internal:56258 (size: 47.3 MB, free: 23.4 GB) 14/11/20 13:19:34 INFO BlockManagerInfo: Added rdd_4_2279 in memory on ip-172-31-7-124.ec2.internal:56258 (size: 5.1 MB, free: 23.4 GB) 14/11/20 13:19:34 INFO BlockManagerInfo: Added rdd_2_2324 in memory on ip-172-31-7-124.ec2.internal:56258 (size: 46.1 MB, free: 23.4 GB) 14/11/20 13:19:34 INFO TaskSetManager: Starting task 2525.0 in stage 0.0 (TID 2328, ip-172-31-7-122.ec2.internal, NODE_LOCAL, 1649 bytes) 14/11/20 13:19:34 INFO TaskSetManager: Finished task 2351.0 in 
stage 0.0 (TID 2103) in 77554 ms on ip-172-31-7-122.ec2.internal (1484/10220) 14/11/20 13:19:34 INFO BlockManagerInfo: Added rdd_2_2373 in memory on ip-172-31-7-122.ec2.internal:37861 (size: 48.1 MB, free: 23.2 GB) 14/11/20 13:19:34 INFO BlockManagerInfo: Added rdd_4_2319 in memory on ip-172-31-7-124.ec2.internal:56258 (size: 5.2 MB, free: 23.4 GB) 14/11/20 13:19:35 INFO BlockManagerInfo: Added rdd_4_2302 in memory on ip-172-31-7-122.ec2.internal:37861 (size: 5.2 MB, free: 23.2 GB) 14/11/20 13:19:35 INFO BlockManagerInfo: Added rdd_4_2368 in memory on ip-172-31-7-122.ec2.internal:37861 (size: 5.1 MB, free: 23.2 GB) 14/11/20 13:19:35 INFO BlockManagerInfo: Added rdd_2_2382 in memory on ip-172-31-7-125.ec2.internal:45530 (size: 46.8 MB, free: 23.0 GB) 14/11/20 13:19:35 INFO BlockManagerInfo: Added rdd_4_2233 in memory on ip-172-31-7-125.ec2.internal:45530 (size: 5.2 MB, free: 23.0 GB) 14/11/20 13:19:35 INFO BlockManagerInfo: Added rdd_4_2282 in memory on ip-172-31-7-125.ec2.internal:45530 (size: 5.1 MB, free: 23.0 GB) 14/11/20 13:19:35 INFO BlockManagerInfo: Added rdd_2_2280 in memory on ip-172-31-7-121.ec2.internal:57211 (size: 47.8 MB, free: 23.0 GB) 14/11/20 13:19:35 INFO TaskSetManager: Starting task 2549.0 in stage 0.0 (TID 2329, ip-172-31-7-121.ec2.internal, NODE_LOCAL, 1649 bytes) 14/11/20 13:19:35 INFO TaskSetManager: Finished task 2239.0 in stage 0.0 (TID 1994) in 83186 ms on ip-172-31-7-121.ec2.internal (1485/10220) 14/11/20 13:19:35 INFO BlockManagerInfo: Added rdd_4_2251 in memory on ip-172-31-7-121.ec2.internal:57211 (size: 5.2 MB, free: 23.0 GB) 14/11/20 13:19:35 INFO TaskSetManager: Starting task 2486.1 in stage 0.0 (TID 2330, ip-172-31-7-125.ec2.internal, NODE_LOCAL, 1649 bytes) 14/11/20 13:19:35 INFO TaskSetManager: Finished task 2333.0 in stage 0.0 (TID 2079) in 79142 ms on ip-172-31-7-125.ec2.internal (1486/10220) 14/11/20 13:19:35 INFO BlockManagerInfo: Added rdd_2_2352 in memory on ip-172-31-7-125.ec2.internal:45530 (size: 46.9 MB, free: 23.0 GB) 
14/11/20 13:19:36 ERROR YarnClientSchedulerBackend: Yarn application already ended: FINISHED 14/11/20 13:19:36 INFO BlockManagerInfo: Added rdd_2_2433 in memory on ip-172-31-7-117.ec2.internal:44404 (size: 46.9 MB, free: 23.4 GB) 14/11/20 13:19:36 INFO BlockManagerInfo: Added rdd_4_2229 in memory on ip-172-31-7-121.ec2.internal:57211 (size: 5.2 MB, free: 23.0 GB) 14/11/20 13:19:36 INFO BlockManagerInfo: Added rdd_2_2457 in memory on ip-172-31-7-117.ec2.internal:44404 (size: 46.7 MB, free: 23.4 GB) 14/11/20 13:19:36 INFO SparkUI: Stopped Spark web UI at http://ip-172-31-7-115.ec2.internal:4040 14/11/20 13:19:36 INFO DAGScheduler: Stopping DAGScheduler 14/11/20 13:19:36 INFO YarnClientSchedulerBackend: Shutting down all executors 14/11/20 13:19:36 INFO YarnClientSchedulerBackend: Asking each executor to shut down 14/11/20 13:19:36 INFO DAGScheduler: Failed to run takeSample at KMeans.scala:244 14/11/20 13:19:36 INFO YarnClientSchedulerBackend: Stopped Traceback (most recent call last): File "/home/ubuntu/spark-feature-clusterization/feature-clusterization.py", line 51, in <module> main() File "/home/ubuntu/.local/lib/python2.7/site-packages/click/core.py", line 610, in __call__ return self.main(*args, **kwargs) File "/home/ubuntu/.local/lib/python2.7/site-packages/click/core.py", line 590, in main rv = self.invoke(ctx) File "/home/ubuntu/.local/lib/python2.7/site-packages/click/core.py", line 782, in invoke return ctx.invoke(self.callback, **ctx.params) File "/home/ubuntu/.local/lib/python2.7/site-packages/click/core.py", line 416, in invoke return callback(*args, **kwargs) File "/home/ubuntu/spark-feature-clusterization/feature-clusterization.py", line 40, in main kwargs.get("kmeansmode") File "/home/ubuntu/spark-feature-clusterization/boospark.py", line 39, in extract_centroids_on_slaves initializationMode=kmeans_mode File "/opt/cloudera/parcels/CDH-5.2.0-1.cdh5.2.0.p0.36/lib/spark/python/pyspark/mllib/clustering.py", line 88, in train dataBytes._jrdd, k, 
maxIterations, runs, initializationMode) File "/opt/cloudera/parcels/CDH-5.2.0-1.cdh5.2.0.p0.36/lib/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py", line 538, in __call__ File "/opt/cloudera/parcels/CDH-5.2.0-1.cdh5.2.0.p0.36/lib/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/protocol.py", line 300, in get_return_value py4j.protocol.Py4JJavaError: An error occurred while calling o40.trainKMeansModel. : org.apache.spark.SparkException: Job cancelled because SparkContext was shut down at org.apache.spark.scheduler.DAGScheduler$$anonfun$cleanUpAfterSchedulerStop$1.apply(DAGScheduler.scala:694) at org.apache.spark.scheduler.DAGScheduler$$anonfun$cleanUpAfterSchedulerStop$1.apply(DAGScheduler.scala:693) at scala.collection.mutable.HashSet.foreach(HashSet.scala:79) at org.apache.spark.scheduler.DAGScheduler.cleanUpAfterSchedulerStop(DAGScheduler.scala:693) at org.apache.spark.scheduler.DAGSchedulerEventProcessActor.postStop(DAGScheduler.scala:1399) at akka.actor.dungeon.FaultHandling$class.akka$actor$dungeon$FaultHandling$$finishTerminate(FaultHandling.scala:201) at akka.actor.dungeon.FaultHandling$class.terminate(FaultHandling.scala:163) at akka.actor.ActorCell.terminate(ActorCell.scala:338) at akka.actor.ActorCell.invokeAll$1(ActorCell.scala:431) at akka.actor.ActorCell.systemInvoke(ActorCell.scala:447) at akka.dispatch.Mailbox.processAllSystemMessages(Mailbox.scala:262) at akka.dispatch.Mailbox.run(Mailbox.scala:218) at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386) at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) Thanks in advance! --- Regards, Alan Vidotti Prando.