How many features does each vector have, and how many partitions are there?
You set kmeans_clusters to 10000; if the feature dimension is also large,
training will be really expensive. You can check the web UI and see the task
failures there. The stack trace you posted is only from the driver.
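
A quick way to check both numbers before retraining (just a sketch; I'm
assuming features is the RDD of vectors built by your code below, one
feature vector per element):

    # Hypothetical sanity check on problem size:
    dim = len(features.first())              # feature dimension
    num_parts = features.getNumPartitions()  # partition count
    print("dim=%d, partitions=%d" % (dim, num_parts))

Each k-means iteration costs roughly k * dim distance computations per
point, so with k=10000 a wide vector makes all 13 iterations very expensive.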
Btw, the total memory you have is 64GB * 10 = 640GB, so under the default
settings you can cache only about 300GB of data, which is not enough for the
1.2TB you want to process.
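
If you do want to push the full 1.2TB through, one option (an untested
sketch, not a guaranteed fix) is to persist with a storage level that spills
to local disk instead of dropping blocks:

    from pyspark import StorageLevel

    # 10 nodes * 64GB = 640GB total; under the default storage fraction
    # only about 300GB of that holds cached blocks, so the rest of the
    # 1.2TB would otherwise be recomputed on every iteration.
    features.persist(StorageLevel.MEMORY_AND_DISK)

Sampling the input down, or adding nodes/memory, would also help.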
-Xiangrui

On Thu, Nov 20, 2014 at 5:57 AM, Alan Prando <a...@scanboo.com.br> wrote:
> Hi Folks!
>
> I'm running a Python Spark job on a cluster with 1 master and 10 slaves (64GB
> RAM and 32 cores per machine).
> The job reads a 1.2TB file with 1,128,201,847 lines from HDFS and calls the
> KMeans method as follows:
>
>     # SLAVE CODE - Reading features from HDFS
>     def get_features_from_images_hdfs(self, timestamp):
>         # Flatten one level: each element of the partition is itself a
>         # list, so yield its items one by one.
>         def shallow(lista):
>             for row in lista:
>                 for col in row:
>                     yield col
>
>         features = self.sc.textFile("hdfs://999.999.99:8888/FOLDER/")
>         # eval() parses each text line; [1] keeps only the feature part.
>         return features.map(lambda row: eval(row)[1]).mapPartitions(shallow)
>
>
>      # SLAVE CODE - Extract centroids with KMeans
>      # (assumes: from pyspark.mllib.clustering import KMeans)
>      def extract_centroids_on_slaves(self, features, kmeans_clusters,
>                                      kmeans_max_iterations, kmeans_mode):
>
>         # Error line: the exception below is raised from this call
>         clusters = KMeans.train(
>             features,
>             kmeans_clusters,
>             maxIterations=kmeans_max_iterations,
>             runs=1,
>             initializationMode=kmeans_mode
>         )
>         return clusters.clusterCenters
>
>     # MASTER CODE - Main
>     features = get_features_from_images_hdfs(kwargs.get("timestamp"))
>     kmeans_clusters = 10000
>     kmeans_max_iterations = 13
>     kmeans_mode = "random"
>
>     centroids = extract_centroids_on_slaves(
>         features,
>         kmeans_clusters,
>         kmeans_max_iterations,
>         kmeans_mode
>     )
>
>     centroids_rdd = sc.parallelize(centroids)
>
>
> I'm getting the following exception when I call "KMeans.train":
>
> 14/11/20 13:19:34 INFO TaskSetManager: Starting task 2539.0 in stage 0.0 (TID 2327, ip-172-31-7-120.ec2.internal, NODE_LOCAL, 1649 bytes)
> 14/11/20 13:19:34 WARN TaskSetManager: Lost task 2486.0 in stage 0.0 (TID 2257, ip-172-31-7-120.ec2.internal): java.io.IOException: Filesystem closed
>         org.apache.hadoop.hdfs.DFSClient.checkOpen(DFSClient.java:765)
>         org.apache.hadoop.hdfs.DFSInputStream.readWithStrategy(DFSInputStream.java:783)
>         org.apache.hadoop.hdfs.DFSInputStream.read(DFSInputStream.java:844)
>         java.io.DataInputStream.read(DataInputStream.java:100)
>         org.apache.hadoop.util.LineReader.fillBuffer(LineReader.java:180)
>         org.apache.hadoop.util.LineReader.readDefaultLine(LineReader.java:216)
>         org.apache.hadoop.util.LineReader.readLine(LineReader.java:174)
>         org.apache.hadoop.mapred.LineRecordReader.next(LineRecordReader.java:246)
>         org.apache.hadoop.mapred.LineRecordReader.next(LineRecordReader.java:47)
>         org.apache.spark.rdd.HadoopRDD$$anon$1.getNext(HadoopRDD.scala:220)
>         org.apache.spark.rdd.HadoopRDD$$anon$1.getNext(HadoopRDD.scala:189)
>         org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:71)
>         org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
>         scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>         scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:350)
>         scala.collection.Iterator$class.foreach(Iterator.scala:727)
>         scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
>         org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:340)
>         org.apache.spark.api.python.PythonRDD$WriterThread$$anonfun$run$1.apply$mcV$sp(PythonRDD.scala:209)
>         org.apache.spark.api.python.PythonRDD$WriterThread$$anonfun$run$1.apply(PythonRDD.scala:184)
>         org.apache.spark.api.python.PythonRDD$WriterThread$$anonfun$run$1.apply(PythonRDD.scala:184)
>         org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1314)
>         org.apache.spark.api.python.PythonRDD$WriterThread.run(PythonRDD.scala:183)
> 14/11/20 13:19:34 INFO BlockManagerInfo: Added rdd_4_2174 in memory on ip-172-31-7-121.ec2.internal:57211 (size: 5.3 MB, free: 23.0 GB)
> 14/11/20 13:19:34 INFO BlockManagerInfo: Added rdd_2_2349 in memory on ip-172-31-7-124.ec2.internal:56258 (size: 47.6 MB, free: 23.5 GB)
> 14/11/20 13:19:34 INFO BlockManagerInfo: Added rdd_2_2386 in memory on ip-172-31-7-124.ec2.internal:56258 (size: 46.0 MB, free: 23.5 GB)
> 14/11/20 13:19:34 INFO BlockManagerInfo: Added rdd_2_2341 in memory on ip-172-31-7-124.ec2.internal:56258 (size: 47.3 MB, free: 23.4 GB)
> 14/11/20 13:19:34 INFO BlockManagerInfo: Added rdd_4_2279 in memory on ip-172-31-7-124.ec2.internal:56258 (size: 5.1 MB, free: 23.4 GB)
> 14/11/20 13:19:34 INFO BlockManagerInfo: Added rdd_2_2324 in memory on ip-172-31-7-124.ec2.internal:56258 (size: 46.1 MB, free: 23.4 GB)
> 14/11/20 13:19:34 INFO TaskSetManager: Starting task 2525.0 in stage 0.0 (TID 2328, ip-172-31-7-122.ec2.internal, NODE_LOCAL, 1649 bytes)
> 14/11/20 13:19:34 INFO TaskSetManager: Finished task 2351.0 in stage 0.0 (TID 2103) in 77554 ms on ip-172-31-7-122.ec2.internal (1484/10220)
> 14/11/20 13:19:34 INFO BlockManagerInfo: Added rdd_2_2373 in memory on ip-172-31-7-122.ec2.internal:37861 (size: 48.1 MB, free: 23.2 GB)
> 14/11/20 13:19:34 INFO BlockManagerInfo: Added rdd_4_2319 in memory on ip-172-31-7-124.ec2.internal:56258 (size: 5.2 MB, free: 23.4 GB)
> 14/11/20 13:19:35 INFO BlockManagerInfo: Added rdd_4_2302 in memory on ip-172-31-7-122.ec2.internal:37861 (size: 5.2 MB, free: 23.2 GB)
> 14/11/20 13:19:35 INFO BlockManagerInfo: Added rdd_4_2368 in memory on ip-172-31-7-122.ec2.internal:37861 (size: 5.1 MB, free: 23.2 GB)
> 14/11/20 13:19:35 INFO BlockManagerInfo: Added rdd_2_2382 in memory on ip-172-31-7-125.ec2.internal:45530 (size: 46.8 MB, free: 23.0 GB)
> 14/11/20 13:19:35 INFO BlockManagerInfo: Added rdd_4_2233 in memory on ip-172-31-7-125.ec2.internal:45530 (size: 5.2 MB, free: 23.0 GB)
> 14/11/20 13:19:35 INFO BlockManagerInfo: Added rdd_4_2282 in memory on ip-172-31-7-125.ec2.internal:45530 (size: 5.1 MB, free: 23.0 GB)
> 14/11/20 13:19:35 INFO BlockManagerInfo: Added rdd_2_2280 in memory on ip-172-31-7-121.ec2.internal:57211 (size: 47.8 MB, free: 23.0 GB)
> 14/11/20 13:19:35 INFO TaskSetManager: Starting task 2549.0 in stage 0.0 (TID 2329, ip-172-31-7-121.ec2.internal, NODE_LOCAL, 1649 bytes)
> 14/11/20 13:19:35 INFO TaskSetManager: Finished task 2239.0 in stage 0.0 (TID 1994) in 83186 ms on ip-172-31-7-121.ec2.internal (1485/10220)
> 14/11/20 13:19:35 INFO BlockManagerInfo: Added rdd_4_2251 in memory on ip-172-31-7-121.ec2.internal:57211 (size: 5.2 MB, free: 23.0 GB)
> 14/11/20 13:19:35 INFO TaskSetManager: Starting task 2486.1 in stage 0.0 (TID 2330, ip-172-31-7-125.ec2.internal, NODE_LOCAL, 1649 bytes)
> 14/11/20 13:19:35 INFO TaskSetManager: Finished task 2333.0 in stage 0.0 (TID 2079) in 79142 ms on ip-172-31-7-125.ec2.internal (1486/10220)
> 14/11/20 13:19:35 INFO BlockManagerInfo: Added rdd_2_2352 in memory on ip-172-31-7-125.ec2.internal:45530 (size: 46.9 MB, free: 23.0 GB)
> 14/11/20 13:19:36 ERROR YarnClientSchedulerBackend: Yarn application already ended: FINISHED
> 14/11/20 13:19:36 INFO BlockManagerInfo: Added rdd_2_2433 in memory on ip-172-31-7-117.ec2.internal:44404 (size: 46.9 MB, free: 23.4 GB)
> 14/11/20 13:19:36 INFO BlockManagerInfo: Added rdd_4_2229 in memory on ip-172-31-7-121.ec2.internal:57211 (size: 5.2 MB, free: 23.0 GB)
> 14/11/20 13:19:36 INFO BlockManagerInfo: Added rdd_2_2457 in memory on ip-172-31-7-117.ec2.internal:44404 (size: 46.7 MB, free: 23.4 GB)
> 14/11/20 13:19:36 INFO SparkUI: Stopped Spark web UI at http://ip-172-31-7-115.ec2.internal:4040
> 14/11/20 13:19:36 INFO DAGScheduler: Stopping DAGScheduler
> 14/11/20 13:19:36 INFO YarnClientSchedulerBackend: Shutting down all executors
> 14/11/20 13:19:36 INFO YarnClientSchedulerBackend: Asking each executor to shut down
> 14/11/20 13:19:36 INFO DAGScheduler: Failed to run takeSample at KMeans.scala:244
> 14/11/20 13:19:36 INFO YarnClientSchedulerBackend: Stopped
> Traceback (most recent call last):
>   File "/home/ubuntu/spark-feature-clusterization/feature-clusterization.py", line 51, in <module>
>     main()
>   File "/home/ubuntu/.local/lib/python2.7/site-packages/click/core.py", line 610, in __call__
>     return self.main(*args, **kwargs)
>   File "/home/ubuntu/.local/lib/python2.7/site-packages/click/core.py", line 590, in main
>     rv = self.invoke(ctx)
>   File "/home/ubuntu/.local/lib/python2.7/site-packages/click/core.py", line 782, in invoke
>     return ctx.invoke(self.callback, **ctx.params)
>   File "/home/ubuntu/.local/lib/python2.7/site-packages/click/core.py", line 416, in invoke
>     return callback(*args, **kwargs)
>   File "/home/ubuntu/spark-feature-clusterization/feature-clusterization.py", line 40, in main
>     kwargs.get("kmeansmode")
>   File "/home/ubuntu/spark-feature-clusterization/boospark.py", line 39, in extract_centroids_on_slaves
>     initializationMode=kmeans_mode
>   File "/opt/cloudera/parcels/CDH-5.2.0-1.cdh5.2.0.p0.36/lib/spark/python/pyspark/mllib/clustering.py", line 88, in train
>     dataBytes._jrdd, k, maxIterations, runs, initializationMode)
>   File "/opt/cloudera/parcels/CDH-5.2.0-1.cdh5.2.0.p0.36/lib/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py", line 538, in __call__
>   File "/opt/cloudera/parcels/CDH-5.2.0-1.cdh5.2.0.p0.36/lib/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/protocol.py", line 300, in get_return_value
> py4j.protocol.Py4JJavaError: An error occurred while calling o40.trainKMeansModel.
> : org.apache.spark.SparkException: Job cancelled because SparkContext was shut down
>     at org.apache.spark.scheduler.DAGScheduler$$anonfun$cleanUpAfterSchedulerStop$1.apply(DAGScheduler.scala:694)
>     at org.apache.spark.scheduler.DAGScheduler$$anonfun$cleanUpAfterSchedulerStop$1.apply(DAGScheduler.scala:693)
>     at scala.collection.mutable.HashSet.foreach(HashSet.scala:79)
>     at org.apache.spark.scheduler.DAGScheduler.cleanUpAfterSchedulerStop(DAGScheduler.scala:693)
>     at org.apache.spark.scheduler.DAGSchedulerEventProcessActor.postStop(DAGScheduler.scala:1399)
>     at akka.actor.dungeon.FaultHandling$class.akka$actor$dungeon$FaultHandling$$finishTerminate(FaultHandling.scala:201)
>     at akka.actor.dungeon.FaultHandling$class.terminate(FaultHandling.scala:163)
>     at akka.actor.ActorCell.terminate(ActorCell.scala:338)
>     at akka.actor.ActorCell.invokeAll$1(ActorCell.scala:431)
>     at akka.actor.ActorCell.systemInvoke(ActorCell.scala:447)
>     at akka.dispatch.Mailbox.processAllSystemMessages(Mailbox.scala:262)
>     at akka.dispatch.Mailbox.run(Mailbox.scala:218)
>     at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
>     at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>     at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>     at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>     at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>
> Thanks in advance!
>
> ---
> Regards,
> Alan Vidotti Prando.
>
