I think you could try sorting endpointCounts by count in descending order
(for example with sortBy) and then doing a take(10). The sort should run as a
distributed job, and only the top rows would get returned to the driver.
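
Something along these lines, as a rough sketch rather than a tested fix. It assumes endpointCounts is the RDD of (endpoint, count) tuples produced by your reduceByKey; the helper name top_endpoints is made up for illustration, while sortBy and take are the standard RDD methods.

```python
def top_endpoints(endpoint_counts, n=10):
    """Return the n (endpoint, count) pairs with the highest counts.

    endpoint_counts is assumed to be an RDD of (endpoint, count) tuples,
    such as the result of reduceByKey in the original code.
    """
    # sortBy orders the pairs by count on the executors (descending here);
    # take(n) then brings only the first n rows back to the driver.
    return (endpoint_counts
            .sortBy(lambda pair: pair[1], ascending=False)
            .take(n))
```

You would call it as topEndpoints = top_endpoints(endpointCounts, 10). If executors are still being lost after this change, the cause may be executor memory during the shuffle rather than the final collection on the driver, so that would be worth checking as well.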

Best Regards,
Sonal
Founder, Nube Technologies <http://www.nubetech.co>

On Tue, Aug 25, 2015 at 10:22 AM, Spark Enthusiast <sparkenthusi...@yahoo.in> wrote:

> I was running a Spark job to crunch a 9 GB Apache log file when I saw the
> following error:
>
>
> 15/08/25 04:25:16 WARN scheduler.TaskSetManager: Lost task 99.0 in stage
> 37.0 (TID 4115, ip-10-150-137-100.ap-southeast-1.compute.internal):
> ExecutorLostFailure (executor 29 lost)
> 15/08/25 04:25:16 INFO scheduler.DAGScheduler: Resubmitted
> ShuffleMapTask(37, 40), so marking it as still running
> 15/08/25 04:25:16 INFO scheduler.DAGScheduler: Resubmitted
> ShuffleMapTask(37, 86), so marking it as still running
> 15/08/25 04:25:16 INFO scheduler.DAGScheduler: Resubmitted
> ShuffleMapTask(37, 84), so marking it as still running
> 15/08/25 04:25:16 INFO scheduler.DAGScheduler: Resubmitted
> ShuffleMapTask(37, 22), so marking it as still running
> 15/08/25 04:25:16 INFO scheduler.DAGScheduler: Resubmitted
> ShuffleMapTask(37, 48), so marking it as still running
> 15/08/25 04:25:16 INFO scheduler.DAGScheduler: Resubmitted
> ShuffleMapTask(37, 12), so marking it as still running
> 15/08/25 04:25:16 INFO scheduler.DAGScheduler: Executor lost: 29 (epoch 59)
> 15/08/25 04:25:16 INFO storage.BlockManagerMasterActor: Trying to remove
> executor 29 from BlockManagerMaster.
> 15/08/25 04:25:16 INFO storage.BlockManagerMasterActor: Removing block
> manager BlockManagerId(29,
> ip-10-150-137-100.ap-southeast-1.compute.internal, 39411)
>
>                       .
>                       .
> Encountered Exception An error occurred while calling
> z:org.apache.spark.api.python.PythonRDD.collectAndServe.
> : org.apache.spark.SparkException: Job cancelled because SparkContext was
> shut down
> at
> org.apache.spark.scheduler.DAGScheduler$$anonfun$cleanUpAfterSchedulerStop$1.apply(DAGScheduler.scala:699)
> at
> org.apache.spark.scheduler.DAGScheduler$$anonfun$cleanUpAfterSchedulerStop$1.apply(DAGScheduler.scala:698)
> at scala.collection.mutable.HashSet.foreach(HashSet.scala:79)
> at
> org.apache.spark.scheduler.DAGScheduler.cleanUpAfterSchedulerStop(DAGScheduler.scala:698)
> at
> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onStop(DAGScheduler.scala:1411)
> at org.apache.spark.util.EventLoop.stop(EventLoop.scala:84)
> at org.apache.spark.scheduler.DAGScheduler.stop(DAGScheduler.scala:1346)
> at org.apache.spark.SparkContext.stop(SparkContext.scala:1380)
> at
> org.apache.spark.scheduler.cluster.YarnClientSchedulerBackend$$anon$1.run(YarnClientSchedulerBackend.scala:143)
>
>                     .
>                     .
>
> Looking further, it seems that takeOrdered (called by my application) uses
> collect() internally and hence exhausts the driver's memory.
>
> line 361, in top10EndPoints
>     topEndpoints = endpointCounts.takeOrdered(10, lambda s: -1 * s[1])
>   File "/home/hadoop/spark/python/pyspark/rdd.py", line 1174, in
> takeOrdered
>     return self.mapPartitions(lambda it: [heapq.nsmallest(num, it,
> key)]).reduce(merge)
>   File "/home/hadoop/spark/python/pyspark/rdd.py", line 739, in reduce
>     vals = self.mapPartitions(func).collect()
>   File "/home/hadoop/spark/python/pyspark/rdd.py", line 713, in collect
>     port = self.ctx._jvm.PythonRDD.collectAndServe(self._jrdd.rdd())
>   File
> "/home/hadoop/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py",
> line 538, in __call__
>     self.target_id, self.name)
>   File
> "/home/hadoop/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/protocol.py", line
> 300, in get_return_value
>     format(target_id, '.', name), value)
>
> How can I rewrite this code
>
>
>
> endpointCounts = (access_logs
>                   .map(lambda log: (log.endpoint, 1))
>                   .reduceByKey(lambda a, b: a + b))
>
> # endpointCounts is now an RDD of tuples: (endpoint1, count1), (endpoint2, count2), ...
>
> topEndpoints = endpointCounts.takeOrdered(10, lambda s: -1 * s[1])
>
>
> so that this error does not happen?
>
>
