I was running a Spark job to crunch a 9 GB Apache log file when I saw the following error:
    15/08/25 04:25:16 WARN scheduler.TaskSetManager: Lost task 99.0 in stage 37.0 (TID 4115, ip-10-150-137-100.ap-southeast-1.compute.internal): ExecutorLostFailure (executor 29 lost)
    15/08/25 04:25:16 INFO scheduler.DAGScheduler: Resubmitted ShuffleMapTask(37, 40), so marking it as still running
    15/08/25 04:25:16 INFO scheduler.DAGScheduler: Resubmitted ShuffleMapTask(37, 86), so marking it as still running
    15/08/25 04:25:16 INFO scheduler.DAGScheduler: Resubmitted ShuffleMapTask(37, 84), so marking it as still running
    15/08/25 04:25:16 INFO scheduler.DAGScheduler: Resubmitted ShuffleMapTask(37, 22), so marking it as still running
    15/08/25 04:25:16 INFO scheduler.DAGScheduler: Resubmitted ShuffleMapTask(37, 48), so marking it as still running
    15/08/25 04:25:16 INFO scheduler.DAGScheduler: Resubmitted ShuffleMapTask(37, 12), so marking it as still running
    15/08/25 04:25:16 INFO scheduler.DAGScheduler: Executor lost: 29 (epoch 59)
    15/08/25 04:25:16 INFO storage.BlockManagerMasterActor: Trying to remove executor 29 from BlockManagerMaster.
    15/08/25 04:25:16 INFO storage.BlockManagerMasterActor: Removing block manager BlockManagerId(29, ip-10-150-137-100.ap-southeast-1.compute.internal, 39411)
    ...
    Encountered Exception An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe.: org.apache.spark.SparkException: Job cancelled because SparkContext was shut down
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$cleanUpAfterSchedulerStop$1.apply(DAGScheduler.scala:699)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$cleanUpAfterSchedulerStop$1.apply(DAGScheduler.scala:698)
        at scala.collection.mutable.HashSet.foreach(HashSet.scala:79)
        at org.apache.spark.scheduler.DAGScheduler.cleanUpAfterSchedulerStop(DAGScheduler.scala:698)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onStop(DAGScheduler.scala:1411)
        at org.apache.spark.util.EventLoop.stop(EventLoop.scala:84)
        at org.apache.spark.scheduler.DAGScheduler.stop(DAGScheduler.scala:1346)
        at org.apache.spark.SparkContext.stop(SparkContext.scala:1380)
        at org.apache.spark.scheduler.cluster.YarnClientSchedulerBackend$$anon$1.run(YarnClientSchedulerBackend.scala:143)
    ...

Looking further, it seems that takeOrdered() (called by my application) uses collect() internally and hence drains all of the driver memory:

      line 361, in top10EndPoints
        topEndpoints = endpointCounts.takeOrdered(10, lambda s: -1 * s[1])
      File "/home/hadoop/spark/python/pyspark/rdd.py", line 1174, in takeOrdered
        return self.mapPartitions(lambda it: [heapq.nsmallest(num, it, key)]).reduce(merge)
      File "/home/hadoop/spark/python/pyspark/rdd.py", line 739, in reduce
        vals = self.mapPartitions(func).collect()
      File "/home/hadoop/spark/python/pyspark/rdd.py", line 713, in collect
        port = self.ctx._jvm.PythonRDD.collectAndServe(self._jrdd.rdd())
      File "/home/hadoop/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py", line 538, in __call__
        self.target_id, self.name)
      File "/home/hadoop/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/protocol.py", line 300, in get_return_value
        format(target_id, '.', name), value)

How can I rewrite this code

    endpointCounts = (access_logs
                      .map(lambda log: (log.endpoint, 1))
                      .reduceByKey(lambda a, b: a + b))

    # endpointCounts is now an RDD of tuples: [(endpoint1, count1), (endpoint2, count2), ...]
    topEndpoints = endpointCounts.takeOrdered(10, lambda s: -1 * s[1])

so that this error does not happen?
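For what it's worth, one rewrite I have been considering (I am not sure it is the right approach) keeps the ordering on the cluster by replacing takeOrdered with a distributed sortBy followed by take, so that only the final ten records are ever sent back to the driver:

    # Sketch only, assuming endpointCounts is the (endpoint, count) RDD built above.
    # sortBy runs the sort as a distributed job; take(10) then pulls only the
    # first ten elements of the sorted RDD back to the driver.
    topEndpoints = (endpointCounts
                    .sortBy(lambda pair: pair[1], ascending=False)
                    .take(10))

Would this actually avoid the collect() call shown in the traceback, or does sortBy shuffle the whole dataset and run into the same memory pressure?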