I was running a Spark job to crunch a 9 GB Apache log file when I saw the
following error:


15/08/25 04:25:16 WARN scheduler.TaskSetManager: Lost task 99.0 in stage 37.0 (TID 4115, ip-10-150-137-100.ap-southeast-1.compute.internal): ExecutorLostFailure (executor 29 lost)
15/08/25 04:25:16 INFO scheduler.DAGScheduler: Resubmitted ShuffleMapTask(37, 40), so marking it as still running
15/08/25 04:25:16 INFO scheduler.DAGScheduler: Resubmitted ShuffleMapTask(37, 86), so marking it as still running
15/08/25 04:25:16 INFO scheduler.DAGScheduler: Resubmitted ShuffleMapTask(37, 84), so marking it as still running
15/08/25 04:25:16 INFO scheduler.DAGScheduler: Resubmitted ShuffleMapTask(37, 22), so marking it as still running
15/08/25 04:25:16 INFO scheduler.DAGScheduler: Resubmitted ShuffleMapTask(37, 48), so marking it as still running
15/08/25 04:25:16 INFO scheduler.DAGScheduler: Resubmitted ShuffleMapTask(37, 12), so marking it as still running
15/08/25 04:25:16 INFO scheduler.DAGScheduler: Executor lost: 29 (epoch 59)
15/08/25 04:25:16 INFO storage.BlockManagerMasterActor: Trying to remove executor 29 from BlockManagerMaster.
15/08/25 04:25:16 INFO storage.BlockManagerMasterActor: Removing block manager BlockManagerId(29, ip-10-150-137-100.ap-southeast-1.compute.internal, 39411)
...
Encountered Exception: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe.
: org.apache.spark.SparkException: Job cancelled because SparkContext was shut down
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$cleanUpAfterSchedulerStop$1.apply(DAGScheduler.scala:699)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$cleanUpAfterSchedulerStop$1.apply(DAGScheduler.scala:698)
	at scala.collection.mutable.HashSet.foreach(HashSet.scala:79)
	at org.apache.spark.scheduler.DAGScheduler.cleanUpAfterSchedulerStop(DAGScheduler.scala:698)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onStop(DAGScheduler.scala:1411)
	at org.apache.spark.util.EventLoop.stop(EventLoop.scala:84)
	at org.apache.spark.scheduler.DAGScheduler.stop(DAGScheduler.scala:1346)
	at org.apache.spark.SparkContext.stop(SparkContext.scala:1380)
	at org.apache.spark.scheduler.cluster.YarnClientSchedulerBackend$$anon$1.run(YarnClientSchedulerBackend.scala:143)
...
Looking further, it seems like takeOrdered() (called by my application) uses
collect() internally and hence drains all of the driver memory. The traceback
below shows the call chain (a short sketch of it follows after the traceback):
  line 361, in top10EndPoints
    topEndpoints = endpointCounts.takeOrdered(10, lambda s: -1 * s[1])
  File "/home/hadoop/spark/python/pyspark/rdd.py", line 1174, in takeOrdered
    return self.mapPartitions(lambda it: [heapq.nsmallest(num, it, key)]).reduce(merge)
  File "/home/hadoop/spark/python/pyspark/rdd.py", line 739, in reduce
    vals = self.mapPartitions(func).collect()
  File "/home/hadoop/spark/python/pyspark/rdd.py", line 713, in collect
    port = self.ctx._jvm.PythonRDD.collectAndServe(self._jrdd.rdd())
  File "/home/hadoop/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py", line 538, in __call__
    self.target_id, self.name)
  File "/home/hadoop/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/protocol.py", line 300, in get_return_value
    format(target_id, '.', name), value)
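For context, here is a minimal sketch of that call chain as I read it from the
traceback into pyspark/rdd.py; the function and helper names below are my own
paraphrase of the trace, not the verbatim Spark source:

import heapq

# Rough sketch of what the traceback shows takeOrdered() doing internally
# (paraphrased from pyspark/rdd.py as quoted in the trace above).
def take_ordered_sketch(rdd, num, key=None):
    # Each partition emits a single list holding only its own top `num`
    # elements (rdd.py line 1174 in the trace).
    def top_of_partition(iterator):
        yield heapq.nsmallest(num, iterator, key=key)

    # reduce() then pulls those per-partition lists back to the driver via
    # collect() (rdd.py lines 739 and 713 in the trace) and merges them.
    def merge(a, b):
        return heapq.nsmallest(num, a + b, key=key)

    return rdd.mapPartitions(top_of_partition).reduce(merge)

That final collect() is why the Java side of the trace ends in
PythonRDD.collectAndServe on the driver.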
How can I rewrite this code



endpointCounts = (access_logs
                  .map(lambda log: (log.endpoint, 1))
                  .reduceByKey(lambda a, b: a + b))
# endpointCounts is now an RDD of tuples: [(endpoint1, count1), (endpoint2, count2), ...]
topEndpoints = endpointCounts.takeOrdered(10, lambda s: -1 * s[1])

so that this error does not happen?
