Sorry, am I missing something? There is a sortBy method on both RDD and PairRDD:
def sortBy[K](f: (T) ⇒ K, ascending: Boolean = true, numPartitions: Int = this.partitions.length)(implicit ord: Ordering[K], ctag: ClassTag[K]): RDD[T]

Return this RDD sorted by the given key function.
(See http://spark.apache.org/docs/latest/api/scala/org/apache/spark/rdd/RDD.html)

Best Regards,
Sonal
Founder, Nube Technologies <http://www.nubetech.co>
Check out Reifier at Spark Summit 2015
<https://spark-summit.org/2015/events/real-time-fuzzy-matching-with-spark-and-elastic-search/>
<http://in.linkedin.com/in/sonalgoyal>

On Tue, Aug 25, 2015 at 12:08 PM, Spark Enthusiast <sparkenthusi...@yahoo.in> wrote:

> But, there is no sort() primitive for an RDD. How do I sort?
>
> On Tuesday, 25 August 2015 11:10 AM, Sonal Goyal <sonalgoy...@gmail.com> wrote:
>
> I think you could try sorting the endPointsCount and then doing a take.
> This should be a distributed process and only the result would get returned
> to the driver.
> Best Regards,
> Sonal
> Founder, Nube Technologies <http://www.nubetech.co/>
> Check out Reifier at Spark Summit 2015
> <https://spark-summit.org/2015/events/real-time-fuzzy-matching-with-spark-and-elastic-search/>
> <http://in.linkedin.com/in/sonalgoyal>
>
> On Tue, Aug 25, 2015 at 10:22 AM, Spark Enthusiast <sparkenthusi...@yahoo.in> wrote:
>
> I was running a Spark job to crunch a 9 GB Apache log file when I saw the following error:
>
> 15/08/25 04:25:16 WARN scheduler.TaskSetManager: Lost task 99.0 in stage 37.0 (TID 4115, ip-10-150-137-100.ap-southeast-1.compute.internal): ExecutorLostFailure (executor 29 lost)
> 15/08/25 04:25:16 INFO scheduler.DAGScheduler: Resubmitted ShuffleMapTask(37, 40), so marking it as still running
> 15/08/25 04:25:16 INFO scheduler.DAGScheduler: Resubmitted ShuffleMapTask(37, 86), so marking it as still running
> 15/08/25 04:25:16 INFO scheduler.DAGScheduler: Resubmitted ShuffleMapTask(37, 84), so marking it as still running
> 15/08/25 04:25:16 INFO scheduler.DAGScheduler: Resubmitted ShuffleMapTask(37, 22), so marking it as still running
> 15/08/25 04:25:16 INFO scheduler.DAGScheduler: Resubmitted ShuffleMapTask(37, 48), so marking it as still running
> 15/08/25 04:25:16 INFO scheduler.DAGScheduler: Resubmitted ShuffleMapTask(37, 12), so marking it as still running
> 15/08/25 04:25:16 INFO scheduler.DAGScheduler: Executor lost: 29 (epoch 59)
> 15/08/25 04:25:16 INFO storage.BlockManagerMasterActor: Trying to remove executor 29 from BlockManagerMaster.
> 15/08/25 04:25:16 INFO storage.BlockManagerMasterActor: Removing block manager BlockManagerId(29, ip-10-150-137-100.ap-southeast-1.compute.internal, 39411)
>
> .
> .
>
> Encountered Exception An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe.
> : org.apache.spark.SparkException: Job cancelled because SparkContext was shut down
> at org.apache.spark.scheduler.DAGScheduler$$anonfun$cleanUpAfterSchedulerStop$1.apply(DAGScheduler.scala:699)
> at org.apache.spark.scheduler.DAGScheduler$$anonfun$cleanUpAfterSchedulerStop$1.apply(DAGScheduler.scala:698)
> at scala.collection.mutable.HashSet.foreach(HashSet.scala:79)
> at org.apache.spark.scheduler.DAGScheduler.cleanUpAfterSchedulerStop(DAGScheduler.scala:698)
> at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onStop(DAGScheduler.scala:1411)
> at org.apache.spark.util.EventLoop.stop(EventLoop.scala:84)
> at org.apache.spark.scheduler.DAGScheduler.stop(DAGScheduler.scala:1346)
> at org.apache.spark.SparkContext.stop(SparkContext.scala:1380)
> at org.apache.spark.scheduler.cluster.YarnClientSchedulerBackend$$anon$1.run(YarnClientSchedulerBackend.scala:143)
>
> .
> .
>
> Looking further, it seems like takeOrdered (called by my application) uses collect() internally and hence drains out all the driver memory.
> line 361, in top10EndPoints
>   topEndpoints = endpointCounts.takeOrdered(10, lambda s: -1 * s[1])
> File "/home/hadoop/spark/python/pyspark/rdd.py", line 1174, in takeOrdered
>   return self.mapPartitions(lambda it: [heapq.nsmallest(num, it, key)]).reduce(merge)
> File "/home/hadoop/spark/python/pyspark/rdd.py", line 739, in reduce
>   vals = self.mapPartitions(func).collect()
> File "/home/hadoop/spark/python/pyspark/rdd.py", line 713, in collect
>   port = self.ctx._jvm.PythonRDD.collectAndServe(self._jrdd.rdd())
> File "/home/hadoop/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py", line 538, in __call__
>   self.target_id, self.name)
> File "/home/hadoop/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/protocol.py", line 300, in get_return_value
>   format(target_id, '.', name), value)
>
> How can I rewrite this code
>
> endpointCounts = (access_logs
>                   .map(lambda log: (log.endpoint, 1))
>                   .reduceByKey(lambda a, b: a + b))
>
> # endpointCounts is now an RDD of tuples [(endpoint1, count1), (endpoint2, count2), ...]
>
> topEndpoints = endpointCounts.takeOrdered(10, lambda s: -1 * s[1])
>
> so that this error does not happen?
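Following the sort-then-take suggestion above, one rewrite (an untested sketch, not a verified fix) would be `topEndpoints = endpointCounts.sortBy(lambda kv: kv[1], ascending=False).take(10)`, which sorts across the cluster and returns only 10 rows to the driver. Without a cluster handy, here is the same sort-then-take logic illustrated on a plain Python list (the endpoint data is made up):

```python
# Local, Spark-free illustration of the sortBy(..., ascending=False).take(N)
# pattern. The (endpoint, count) pairs below are made-up sample data.
endpoint_counts = [
    ("/home", 120),
    ("/login", 45),
    ("/api/v1", 300),
    ("/about", 12),
    ("/search", 200),
]

# sortBy(lambda kv: kv[1], ascending=False) ~ sorted(..., key=..., reverse=True)
# take(3) ~ slicing the first 3 elements of the sorted result
top_endpoints = sorted(endpoint_counts, key=lambda kv: kv[1], reverse=True)[:3]
print(top_endpoints)  # [('/api/v1', 300), ('/search', 200), ('/home', 120)]
```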
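For what it's worth, the traceback quoted above also shows what takeOrdered does internally: each partition keeps only its top `num` elements via heapq.nsmallest, and only those short per-partition lists are collected and merged. A local sketch of that two-step merge, using made-up partition data:

```python
import heapq

# Made-up (endpoint, count) pairs split across two "partitions".
partitions = [
    [("/home", 120), ("/login", 45), ("/api/v1", 300)],
    [("/about", 12), ("/search", 200), ("/assets", 7)],
]

num = 2
key = lambda s: -1 * s[1]  # same key function as takeOrdered(10, lambda s: -1 * s[1])

# Step 1 (per partition): keep only the `num` smallest items by key,
# i.e. the `num` largest counts, since the key negates the count.
per_partition = [heapq.nsmallest(num, part, key=key) for part in partitions]

# Step 2 (merge): combine the short per-partition lists and again keep
# only the `num` smallest by key.
candidates = [item for lst in per_partition for item in lst]
merged = heapq.nsmallest(num, candidates, key=key)
print(merged)  # [('/api/v1', 300), ('/search', 200)]
```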