Sorry, am I missing something? There is a method sortBy on both RDD and

def sortBy[K](f: (T) ⇒ K, ascending: Boolean = true, numPartitions: Int =
)(implicit ord: Ordering[K], ctag: ClassTag[K]): RDD[T]
Return this RDD sorted by the given key function.
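
In PySpark the equivalent call should be something like endpointCounts.sortBy(lambda s: s[1], ascending=False).take(10), though I have not run that against your job. To show why a distributed top-N only needs to return a small result to the driver, here is a rough plain-Python sketch (no Spark involved) that mimics the per-partition heapq.nsmallest step visible in the takeOrdered traceback. The function name top_n_by_count, the two "partitions" and their sample counts are all made up for illustration.

```python
import heapq

def top_n_by_count(partitions, n):
    """Return (top-n pairs by count, number of pairs that reached the 'driver')."""
    key = lambda pair: -pair[1]  # negate the count so nsmallest yields the largest counts
    # Step 1: in Spark this part would run on the executors, one call per partition.
    # Each partition keeps at most n candidates.
    candidates = [heapq.nsmallest(n, part, key=key) for part in partitions]
    # Step 2: only the small candidate lists are merged in one place (the driver).
    merged = [pair for cand in candidates for pair in cand]
    return heapq.nsmallest(n, merged, key=key), len(merged)

# Hypothetical (endpoint, count) pairs, split into two pretend partitions.
partitions = [
    [("/index.html", 50), ("/about", 7), ("/login", 31), ("/faq", 3)],
    [("/img/logo.png", 44), ("/cart", 12), ("/search", 19), ("/help", 5)],
]

top2, seen_by_driver = top_n_by_count(partitions, 2)
print(top2)            # the two endpoints with the highest counts
print(seen_by_driver)  # at most n * number_of_partitions pairs were merged
```

In this sketch the merge step handles at most n pairs per partition, however large each partition is.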

Best Regards,
Founder, Nube Technologies
Check out Reifier at Spark Summit 2015


On Tue, Aug 25, 2015 at 12:08 PM, Spark Enthusiast wrote:

> But, there is no sort() primitive for an RDD. How do I sort?
> On Tuesday, 25 August 2015 11:10 AM, Sonal Goyal wrote:
> I think you could try sorting the endPointsCount and then doing a take.
> This should be a distributed process and only the result would get returned
> to the driver.
> Best Regards,
> Sonal
> Founder, Nube Technologies
> Check out Reifier at Spark Summit 2015
> On Tue, Aug 25, 2015 at 10:22 AM, Spark Enthusiast wrote:
> I was running a Spark job to crunch a 9 GB Apache log file when I saw the
> following error:
> 15/08/25 04:25:16 WARN scheduler.TaskSetManager: Lost task 99.0 in stage
> 37.0 (TID 4115, ip-10-150-137-100.ap-southeast-1.compute.internal):
> ExecutorLostFailure (executor 29 lost)
> 15/08/25 04:25:16 INFO scheduler.DAGScheduler: Resubmitted
> ShuffleMapTask(37, 40), so marking it as still running
> 15/08/25 04:25:16 INFO scheduler.DAGScheduler: Resubmitted
> ShuffleMapTask(37, 86), so marking it as still running
> 15/08/25 04:25:16 INFO scheduler.DAGScheduler: Resubmitted
> ShuffleMapTask(37, 84), so marking it as still running
> 15/08/25 04:25:16 INFO scheduler.DAGScheduler: Resubmitted
> ShuffleMapTask(37, 22), so marking it as still running
> 15/08/25 04:25:16 INFO scheduler.DAGScheduler: Resubmitted
> ShuffleMapTask(37, 48), so marking it as still running
> 15/08/25 04:25:16 INFO scheduler.DAGScheduler: Resubmitted
> ShuffleMapTask(37, 12), so marking it as still running
> 15/08/25 04:25:16 INFO scheduler.DAGScheduler: Executor lost: 29 (epoch 59)
> 15/08/25 04:25:16 INFO storage.BlockManagerMasterActor: Trying to remove
> executor 29 from BlockManagerMaster.
> 15/08/25 04:25:16 INFO storage.BlockManagerMasterActor: Removing block
> manager BlockManagerId(29,
> ip-10-150-137-100.ap-southeast-1.compute.internal, 39411)
>                       .
>                       .
> Encountered Exception An error occurred while calling
> z:org.apache.spark.api.python.PythonRDD.collectAndServe.
> : org.apache.spark.SparkException: Job cancelled because SparkContext was
> shut down
> at
> org.apache.spark.scheduler.DAGScheduler$$anonfun$cleanUpAfterSchedulerStop$1.apply(DAGScheduler.scala:699)
> at
> org.apache.spark.scheduler.DAGScheduler$$anonfun$cleanUpAfterSchedulerStop$1.apply(DAGScheduler.scala:698)
> at scala.collection.mutable.HashSet.foreach(HashSet.scala:79)
> at
> org.apache.spark.scheduler.DAGScheduler.cleanUpAfterSchedulerStop(DAGScheduler.scala:698)
> at
> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onStop(DAGScheduler.scala:1411)
> at org.apache.spark.util.EventLoop.stop(EventLoop.scala:84)
> at org.apache.spark.scheduler.DAGScheduler.stop(DAGScheduler.scala:1346)
> at org.apache.spark.SparkContext.stop(SparkContext.scala:1380)
> at
> org.apache.spark.scheduler.cluster.YarnClientSchedulerBackend$$anon$
>                     .
>                     .
> Looking further, it seems that takeOrdered (called by my application) uses
> collect() internally and hence exhausts the driver's memory.
> line 361, in top10EndPoints
>     topEndpoints = endpointCounts.takeOrdered(10, lambda s: -1 * s[1])
>   File "/home/hadoop/spark/python/pyspark/", line 1174, in
> takeOrdered
>     return self.mapPartitions(lambda it: [heapq.nsmallest(num, it,
> key)]).reduce(merge)
>   File "/home/hadoop/spark/python/pyspark/", line 739, in reduce
>     vals = self.mapPartitions(func).collect()
>   File "/home/hadoop/spark/python/pyspark/", line 713, in collect
>     port = self.ctx._jvm.PythonRDD.collectAndServe(self._jrdd.rdd())
>   File
> "/home/hadoop/spark/python/lib/",
> line 538, in __call__
>     self.target_id,
>   File
> "/home/hadoop/spark/python/lib/", line
> 300, in get_return_value
>     format(target_id, '.', name), value)
> How can I rewrite this code
> endpointCounts = (access_logs
>                   .map(lambda log: (log.endpoint, 1))
>                   .reduceByKey(lambda a, b : a + b))
> # endpointCounts now holds tuples: [(endpoint1, count1), (endpoint2, count2), ...]
> topEndpoints = endpointCounts.takeOrdered(10, lambda s: -1 * s[1])
> so that this error does not happen?
