Hi Folks!

I'm running a Python Spark job on a cluster with 1 master and 10 slaves
(64 GB RAM and 32 cores on each machine).
The job reads a 1.2 TB file with 1,128,201,847 lines from HDFS and calls
MLlib's KMeans as follows:

    # SLAVE CODE - Reading features from HDFS
    def get_features_from_images_hdfs(self, timestamp):
        # Flatten one level: each parsed line holds a list of feature
        # vectors, and KMeans needs one vector per RDD element
        def shallow(lista):
            for row in lista:
                for col in row:
                    yield col

        # Each line is a serialized tuple; element [1] is the list of
        # feature vectors
        features = self.sc.textFile("hdfs://999.999.99:8888/FOLDER/")
        return features.map(lambda row: eval(row)[1]).mapPartitions(shallow)
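
A note on the input format, since the eval() call depends on it: each
line is assumed to be a str()-serialized Python tuple whose second
element is the list of feature vectors. A safer equivalent parse, under
that assumption, would be ast.literal_eval:

    import ast

    # Sketch under the assumed line format
    # "(image_id, [[1.0, 2.0, ...], [3.0, 4.0, ...]])" -- image_id is a
    # hypothetical name. literal_eval only accepts Python literals, so
    # it cannot execute arbitrary code the way eval() can.
    def parse_line(line):
        return ast.literal_eval(line)[1]

    features = self.sc.textFile("hdfs://999.999.99:8888/FOLDER/") \
                      .map(parse_line) \
                      .mapPartitions(shallow)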


    # SLAVE CODE - Extract centroids with KMeans
    def extract_centroids_on_slaves(self, features, kmeans_clusters,
                                    kmeans_max_iterations, kmeans_mode):
        # The exception below is raised on this call
        clusters = KMeans.train(
            features,
            kmeans_clusters,
            maxIterations=kmeans_max_iterations,
            runs=1,
            initializationMode=kmeans_mode
        )
        return clusters.clusterCenters
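
For clarity, that is the standard MLlib signature, i.e.
KMeans.train(rdd, k, maxIterations, runs, initializationMode). A toy
version of the same call, with made-up 2-D vectors rather than my real
features, looks like this:

    from pyspark.mllib.clustering import KMeans

    # Toy sketch: four 2-D points forming two obvious clusters
    toy = sc.parallelize([[0.0, 0.0], [0.1, 0.1], [9.0, 9.0], [9.1, 9.1]])
    model = KMeans.train(toy, 2, maxIterations=10, runs=1,
                         initializationMode="random")
    print model.clusterCenters  # two centers near (0, 0) and (9, 9)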

    # MASTER CODE - Main
    features = get_features_from_images_hdfs(kwargs.get("timestamp"))
    kmeans_clusters = 10000
    kmeans_max_iterations = 13
    kmeans_mode = "random"  # initializationMode must be a string

    centroids = extract_centroids_on_slaves(
        features,
        kmeans_clusters,
        kmeans_max_iterations,
        kmeans_mode
    )

    centroids_rdd = sc.parallelize(centroids)
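
One detail that may be relevant: features is never persisted, and the
failure below happens while KMeans re-reads the input (the takeSample
at KMeans.scala:244). A cached variant, in case it matters, would be:

    from pyspark import StorageLevel

    # Hypothetical variant: keep the parsed features around so KMeans'
    # repeated passes don't re-scan the 1.2 TB input from HDFS each time
    features = get_features_from_images_hdfs(kwargs.get("timestamp"))
    features.persist(StorageLevel.MEMORY_AND_DISK)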


I'm getting the following exception when I call "KMeans.train":

14/11/20 13:19:34 INFO TaskSetManager: Starting task 2539.0 in stage 0.0 (TID 2327, ip-172-31-7-120.ec2.internal, NODE_LOCAL, 1649 bytes)
14/11/20 13:19:34 WARN TaskSetManager: Lost task 2486.0 in stage 0.0 (TID 2257, ip-172-31-7-120.ec2.internal): java.io.IOException: Filesystem closed
        org.apache.hadoop.hdfs.DFSClient.checkOpen(DFSClient.java:765)
        org.apache.hadoop.hdfs.DFSInputStream.readWithStrategy(DFSInputStream.java:783)
        org.apache.hadoop.hdfs.DFSInputStream.read(DFSInputStream.java:844)
        java.io.DataInputStream.read(DataInputStream.java:100)
        org.apache.hadoop.util.LineReader.fillBuffer(LineReader.java:180)
        org.apache.hadoop.util.LineReader.readDefaultLine(LineReader.java:216)
        org.apache.hadoop.util.LineReader.readLine(LineReader.java:174)
        org.apache.hadoop.mapred.LineRecordReader.next(LineRecordReader.java:246)
        org.apache.hadoop.mapred.LineRecordReader.next(LineRecordReader.java:47)
        org.apache.spark.rdd.HadoopRDD$$anon$1.getNext(HadoopRDD.scala:220)
        org.apache.spark.rdd.HadoopRDD$$anon$1.getNext(HadoopRDD.scala:189)
        org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:71)
        org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
        scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
        scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:350)
        scala.collection.Iterator$class.foreach(Iterator.scala:727)
        scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
        org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:340)
        org.apache.spark.api.python.PythonRDD$WriterThread$$anonfun$run$1.apply$mcV$sp(PythonRDD.scala:209)
        org.apache.spark.api.python.PythonRDD$WriterThread$$anonfun$run$1.apply(PythonRDD.scala:184)
        org.apache.spark.api.python.PythonRDD$WriterThread$$anonfun$run$1.apply(PythonRDD.scala:184)
        org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1314)
        org.apache.spark.api.python.PythonRDD$WriterThread.run(PythonRDD.scala:183)
14/11/20 13:19:34 INFO BlockManagerInfo: Added rdd_4_2174 in memory on ip-172-31-7-121.ec2.internal:57211 (size: 5.3 MB, free: 23.0 GB)
14/11/20 13:19:34 INFO BlockManagerInfo: Added rdd_2_2349 in memory on ip-172-31-7-124.ec2.internal:56258 (size: 47.6 MB, free: 23.5 GB)
14/11/20 13:19:34 INFO BlockManagerInfo: Added rdd_2_2386 in memory on ip-172-31-7-124.ec2.internal:56258 (size: 46.0 MB, free: 23.5 GB)
14/11/20 13:19:34 INFO BlockManagerInfo: Added rdd_2_2341 in memory on ip-172-31-7-124.ec2.internal:56258 (size: 47.3 MB, free: 23.4 GB)
14/11/20 13:19:34 INFO BlockManagerInfo: Added rdd_4_2279 in memory on ip-172-31-7-124.ec2.internal:56258 (size: 5.1 MB, free: 23.4 GB)
14/11/20 13:19:34 INFO BlockManagerInfo: Added rdd_2_2324 in memory on ip-172-31-7-124.ec2.internal:56258 (size: 46.1 MB, free: 23.4 GB)
14/11/20 13:19:34 INFO TaskSetManager: Starting task 2525.0 in stage 0.0 (TID 2328, ip-172-31-7-122.ec2.internal, NODE_LOCAL, 1649 bytes)
14/11/20 13:19:34 INFO TaskSetManager: Finished task 2351.0 in stage 0.0 (TID 2103) in 77554 ms on ip-172-31-7-122.ec2.internal (1484/10220)
14/11/20 13:19:34 INFO BlockManagerInfo: Added rdd_2_2373 in memory on ip-172-31-7-122.ec2.internal:37861 (size: 48.1 MB, free: 23.2 GB)
14/11/20 13:19:34 INFO BlockManagerInfo: Added rdd_4_2319 in memory on ip-172-31-7-124.ec2.internal:56258 (size: 5.2 MB, free: 23.4 GB)
14/11/20 13:19:35 INFO BlockManagerInfo: Added rdd_4_2302 in memory on ip-172-31-7-122.ec2.internal:37861 (size: 5.2 MB, free: 23.2 GB)
14/11/20 13:19:35 INFO BlockManagerInfo: Added rdd_4_2368 in memory on ip-172-31-7-122.ec2.internal:37861 (size: 5.1 MB, free: 23.2 GB)
14/11/20 13:19:35 INFO BlockManagerInfo: Added rdd_2_2382 in memory on ip-172-31-7-125.ec2.internal:45530 (size: 46.8 MB, free: 23.0 GB)
14/11/20 13:19:35 INFO BlockManagerInfo: Added rdd_4_2233 in memory on ip-172-31-7-125.ec2.internal:45530 (size: 5.2 MB, free: 23.0 GB)
14/11/20 13:19:35 INFO BlockManagerInfo: Added rdd_4_2282 in memory on ip-172-31-7-125.ec2.internal:45530 (size: 5.1 MB, free: 23.0 GB)
14/11/20 13:19:35 INFO BlockManagerInfo: Added rdd_2_2280 in memory on ip-172-31-7-121.ec2.internal:57211 (size: 47.8 MB, free: 23.0 GB)
14/11/20 13:19:35 INFO TaskSetManager: Starting task 2549.0 in stage 0.0 (TID 2329, ip-172-31-7-121.ec2.internal, NODE_LOCAL, 1649 bytes)
14/11/20 13:19:35 INFO TaskSetManager: Finished task 2239.0 in stage 0.0 (TID 1994) in 83186 ms on ip-172-31-7-121.ec2.internal (1485/10220)
14/11/20 13:19:35 INFO BlockManagerInfo: Added rdd_4_2251 in memory on ip-172-31-7-121.ec2.internal:57211 (size: 5.2 MB, free: 23.0 GB)
14/11/20 13:19:35 INFO TaskSetManager: Starting task 2486.1 in stage 0.0 (TID 2330, ip-172-31-7-125.ec2.internal, NODE_LOCAL, 1649 bytes)
14/11/20 13:19:35 INFO TaskSetManager: Finished task 2333.0 in stage 0.0 (TID 2079) in 79142 ms on ip-172-31-7-125.ec2.internal (1486/10220)
14/11/20 13:19:35 INFO BlockManagerInfo: Added rdd_2_2352 in memory on ip-172-31-7-125.ec2.internal:45530 (size: 46.9 MB, free: 23.0 GB)
14/11/20 13:19:36 ERROR YarnClientSchedulerBackend: Yarn application already ended: FINISHED
14/11/20 13:19:36 INFO BlockManagerInfo: Added rdd_2_2433 in memory on ip-172-31-7-117.ec2.internal:44404 (size: 46.9 MB, free: 23.4 GB)
14/11/20 13:19:36 INFO BlockManagerInfo: Added rdd_4_2229 in memory on ip-172-31-7-121.ec2.internal:57211 (size: 5.2 MB, free: 23.0 GB)
14/11/20 13:19:36 INFO BlockManagerInfo: Added rdd_2_2457 in memory on ip-172-31-7-117.ec2.internal:44404 (size: 46.7 MB, free: 23.4 GB)
14/11/20 13:19:36 INFO SparkUI: Stopped Spark web UI at http://ip-172-31-7-115.ec2.internal:4040
14/11/20 13:19:36 INFO DAGScheduler: Stopping DAGScheduler
14/11/20 13:19:36 INFO YarnClientSchedulerBackend: Shutting down all executors
14/11/20 13:19:36 INFO YarnClientSchedulerBackend: Asking each executor to shut down
14/11/20 13:19:36 INFO DAGScheduler: Failed to run takeSample at KMeans.scala:244
14/11/20 13:19:36 INFO YarnClientSchedulerBackend: Stopped
Traceback (most recent call last):
  File "/home/ubuntu/spark-feature-clusterization/feature-clusterization.py", line 51, in <module>
    main()
  File "/home/ubuntu/.local/lib/python2.7/site-packages/click/core.py", line 610, in __call__
    return self.main(*args, **kwargs)
  File "/home/ubuntu/.local/lib/python2.7/site-packages/click/core.py", line 590, in main
    rv = self.invoke(ctx)
  File "/home/ubuntu/.local/lib/python2.7/site-packages/click/core.py", line 782, in invoke
    return ctx.invoke(self.callback, **ctx.params)
  File "/home/ubuntu/.local/lib/python2.7/site-packages/click/core.py", line 416, in invoke
    return callback(*args, **kwargs)
  File "/home/ubuntu/spark-feature-clusterization/feature-clusterization.py", line 40, in main
    kwargs.get("kmeansmode")
  File "/home/ubuntu/spark-feature-clusterization/boospark.py", line 39, in extract_centroids_on_slaves
    initializationMode=kmeans_mode
  File "/opt/cloudera/parcels/CDH-5.2.0-1.cdh5.2.0.p0.36/lib/spark/python/pyspark/mllib/clustering.py", line 88, in train
    dataBytes._jrdd, k, maxIterations, runs, initializationMode)
  File "/opt/cloudera/parcels/CDH-5.2.0-1.cdh5.2.0.p0.36/lib/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py", line 538, in __call__
  File "/opt/cloudera/parcels/CDH-5.2.0-1.cdh5.2.0.p0.36/lib/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/protocol.py", line 300, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o40.trainKMeansModel.
: org.apache.spark.SparkException: Job cancelled because SparkContext was shut down
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$cleanUpAfterSchedulerStop$1.apply(DAGScheduler.scala:694)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$cleanUpAfterSchedulerStop$1.apply(DAGScheduler.scala:693)
    at scala.collection.mutable.HashSet.foreach(HashSet.scala:79)
    at org.apache.spark.scheduler.DAGScheduler.cleanUpAfterSchedulerStop(DAGScheduler.scala:693)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessActor.postStop(DAGScheduler.scala:1399)
    at akka.actor.dungeon.FaultHandling$class.akka$actor$dungeon$FaultHandling$$finishTerminate(FaultHandling.scala:201)
    at akka.actor.dungeon.FaultHandling$class.terminate(FaultHandling.scala:163)
    at akka.actor.ActorCell.terminate(ActorCell.scala:338)
    at akka.actor.ActorCell.invokeAll$1(ActorCell.scala:431)
    at akka.actor.ActorCell.systemInvoke(ActorCell.scala:447)
    at akka.dispatch.Mailbox.processAllSystemMessages(Mailbox.scala:262)
    at akka.dispatch.Mailbox.run(Mailbox.scala:218)
    at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

Thanks in advance!

---
Regards,
Alan Vidotti Prando.
