Thank you so much for the solution. I ran the code like this:

JavaRDD<User> rdd = context.parallelize(usersList);
JavaRDD<User> rdd_sorted_users = rdd.sortBy(new Function<User, String>() {
    @Override
    public String call(User usr1) throws Exception {
        return usr1.getUserName().toUpperCase();
    }
}, false, 1);
User user_top = rdd_sorted_users.first();
System.out.println("The top user is: " + user_top.getUserName());

But it is giving me this exception, even though my class already implements java.io.Serializable:

15/07/07 08:13:07 ERROR TaskSetManager: Failed to serialize task 0, not attempting to retry it.
java.io.NotSerializableException: Model.User
        at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
        at java.io.ObjectOutputStream.writeArray(ObjectOutputStream.java:1378)
        at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1174)
        at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
        at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
        at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
        at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
        at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
        at java.io.ObjectOutputStream.defaultWriteObject(ObjectOutputStream.java:441)
        at org.apache.spark.rdd.ParallelCollectionPartition$$anonfun$writeObject$1.apply$mcV$sp(ParallelCollectionRDD.scala:59)
        at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1239)
        at org.apache.spark.rdd.ParallelCollectionPartition.writeObject(ParallelCollectionRDD.scala:51)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:497)
        at java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:988)
        at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1496)
        at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
        at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
        at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
        at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
        at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
        at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
        at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
        at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:44)
        at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:81)
        at org.apache.spark.scheduler.Task$.serializeWithDependencies(Task.scala:168)
        at org.apache.spark.scheduler.TaskSetManager.resourceOffer(TaskSetManager.scala:467)
        at org.apache.spark.scheduler.TaskSchedulerImpl$$anonfun$org$apache$spark$scheduler$TaskSchedulerImpl$$resourceOfferSingleTaskSet$1.apply$mcVI$sp(TaskSchedulerImpl.scala:231)
        at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:141)
        at org.apache.spark.scheduler.TaskSchedulerImpl.org$apache$spark$scheduler$TaskSchedulerImpl$$resourceOfferSingleTaskSet(TaskSchedulerImpl.scala:226)
        at org.apache.spark.scheduler.TaskSchedulerImpl$$anonfun$resourceOffers$3$$anonfun$apply$6.apply(TaskSchedulerImpl.scala:295)
        at org.apache.spark.scheduler.TaskSchedulerImpl$$anonfun$resourceOffers$3$$anonfun$apply$6.apply(TaskSchedulerImpl.scala:293)
        at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
        at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
        at org.apache.spark.scheduler.TaskSchedulerImpl$$anonfun$resourceOffers$3.apply(TaskSchedulerImpl.scala:293)
        at org.apache.spark.scheduler.TaskSchedulerImpl$$anonfun$resourceOffers$3.apply(TaskSchedulerImpl.scala:293)
        at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
        at org.apache.spark.scheduler.TaskSchedulerImpl.resourceOffers(TaskSchedulerImpl.scala:293)
        at org.apache.spark.scheduler.local.LocalEndpoint.reviveOffers(LocalBackend.scala:79)
        at org.apache.spark.scheduler.local.LocalEndpoint$$anonfun$receive$1.applyOrElse(LocalBackend.scala:58)
        at org.apache.spark.rpc.akka.AkkaRpcEnv.org$apache$spark$rpc$akka$AkkaRpcEnv$$processMessage(AkkaRpcEnv.scala:178)
        at org.apache.spark.rpc.akka.AkkaRpcEnv$$anonfun$actorRef$lzycompute$1$1$$anon$1$$anonfun$receiveWithLogging$1$$anonfun$applyOrElse$4.apply$mcV$sp(AkkaRpcEnv.scala:127)
        at org.apache.spark.rpc.akka.AkkaRpcEnv.org$apache$spark$rpc$akka$AkkaRpcEnv$$safelyCall(AkkaRpcEnv.scala:198)
        at org.apache.spark.rpc.akka.AkkaRpcEnv$$anonfun$actorRef$lzycompute$1$1$$anon$1$$anonfun$receiveWithLogging$1.applyOrElse(AkkaRpcEnv.scala:126)
        at scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
        at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
        at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
        at org.apache.spark.util.ActorLogReceive$$anon$1.apply(ActorLogReceive.scala:59)
        at org.apache.spark.util.ActorLogReceive$$anon$1.apply(ActorLogReceive.scala:42)
        at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:118)
        at org.apache.spark.util.ActorLogReceive$$anon$1.applyOrElse(ActorLogReceive.scala:42)
        at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
        at org.apache.spark.rpc.akka.AkkaRpcEnv$$anonfun$actorRef$lzycompute$1$1$$anon$1.aroundReceive(AkkaRpcEnv.scala:93)
        at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
        at akka.actor.ActorCell.invoke(ActorCell.scala:487)
        at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)
        at akka.dispatch.Mailbox.run(Mailbox.scala:220)
        at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:393)
        at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
        at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
        at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
        at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
15/07/07 08:13:07 INFO TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks have all completed, from pool
15/07/07 08:13:07 ERROR TaskSchedulerImpl: Resource offer failed, task set TaskSet_0 was not serializable
15/07/07 08:13:07 INFO TaskSchedulerImpl: Cancelling stage 0
15/07/07 08:13:07 INFO DAGScheduler: ResultStage 0 (first at HelloWorldnAddition.java:300) failed in 0.136 s
Exception - Message: Job aborted due to stage failure: Failed to serialize task 0, not attempting to retry it. Exception during serialization: java.io.NotSerializableException: Model.User

2015-07-07 17:07 GMT+02:00 oubrik [via Apache Spark User List]:

> JavaRDD<User> rdd_sorted_users = rdd.sortBy(new Function<User, String>() {
>
>     @Override
>     public String call(User usr1) throws Exception {
>         return usr1.getUserName().toUpperCase();
>     }
>
> }, false, 1);
>
> false: descending
> true: ascending
>
> For the top user: User user_top = rdd_sorted_users.first()
>
> 2015-07-07 17:05 GMT+02:00 Brahim Oubrik:
>
>> sorry, User not PPP
>>
>> 2015-07-07 17:04 GMT+02:00 Brahim Oubrik:
>>
>>> JavaRDD<User> rdd_sorted_users = rdd.sortBy(new Function<User, String>() {
>>>
>>>     @Override
>>>     public String call(PPP arg0) throws Exception {
>>>         String userName = usr1.getUserName().toUpperCase();
>>>         return userName;
>>>     }
>>>
>>> }, false, 1);
>>>
>>> false: descending
>>> true: ascending
>>>
>>> For the top user: User user_top = rdd_sorted_users.first()
>>>
>>> 2015-07-07 16:54 GMT+02:00 Hafsa Asif [via Apache Spark User List]:
>>>> Rusty,
>>>>
>>>> I am very thankful for your help. Actually, I am having difficulty with objects. My plan is this: I have a list of User objects. After parallelizing it through the Spark context, I apply a comparator on user.getUserName(). As the usernames are sorted, their related User objects are sorted along with them. In the end, when I apply top, I get the whole User object.
>>>>
>>>> Something like this:
>>>>
>>>> public static Comparator<User> UserComparator = new Comparator<User>() {
>>>>
>>>>     public int compare(User usr1, User usr2) {
>>>>         String userName1 = usr1.getUserName().toUpperCase();
>>>>         String userName2 = usr2.getUserName().toUpperCase();
>>>>
>>>>         // ascending order
>>>>         return userName1.compareTo(userName2);
>>>>
>>>>         // descending order
>>>>         // return userName2.compareTo(userName1);
>>>>     }
>>>> };
>>>>
>>>> JavaRDD<User> rdd = context.parallelize(usersList);
>>>>
>>>> 2015-07-07 16:05 GMT+02:00 rusty [via Apache Spark User List]:
>>>>
>>>>> JavaRDD<String> lines2 = ctx.parallelize(Arrays.asList("3", "6", "2", "5", "8", "6", "7"));
>>>>> List<String> top = lines2.top(7, new CustomComparator<String>());
>>>>> for (String integer : top) {
>>>>>     System.out.println(integer);
>>>>> }
>>>>>
>>>>> class CustomComparator<T> implements Serializable, Comparator<T> {
>>>>>
>>>>>     private static final long serialVersionUID = 2004092520677431781L;
>>>>>
>>>>>     public CustomComparator() {
>>>>>     }
>>>>>
>>>>>     @Override
>>>>>     public int compare(T o11, T o12) {
>>>>>         int o1 = Integer.parseInt(String.valueOf(o11));
>>>>>         int o2 = Integer.parseInt(String.valueOf(o12));
>>>>>         return o1 > o2 ? 1 : o1 == o2 ? 0 : -1;
>>>>>     }
>>>>> }
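For reference (this is my reading, not something stated in the thread): java.io.NotSerializableException can appear even when the data class declares implements Serializable, e.g. because an anonymous inner Function keeps a hidden reference to its non-serializable enclosing class, or because some field inside User is itself not serializable. A quick Spark-free way to check is a plain Java serialization round trip; the User class below is only a stand-in for Model.User, and the class name SerializationCheck is mine:

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

// Stand-in for Model.User: Serializable, with an explicit serialVersionUID.
class User implements Serializable {
    private static final long serialVersionUID = 1L;
    private final String userName;

    User(String userName) {
        this.userName = userName;
    }

    String getUserName() {
        return userName;
    }
}

public class SerializationCheck {

    // Round-trip an object through Java serialization; this throws
    // NotSerializableException if anything reachable from it is not serializable.
    static User roundTrip(User user) throws IOException, ClassNotFoundException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(user);
        }
        try (ObjectInputStream in = new ObjectInputStream(
                new ByteArrayInputStream(bytes.toByteArray()))) {
            return (User) in.readObject();
        }
    }

    public static void main(String[] args) throws Exception {
        User copy = roundTrip(new User("alice"));
        System.out.println(copy.getUserName()); // prints "alice"
    }
}
```

If User round-trips fine on its own but the job still fails, a likely suspect is the anonymous Function: as an inner class it holds a reference to its enclosing instance, and that instance gets serialized into the task. Defining the function as a static nested class (or a separate top-level class) avoids capturing it.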
>>> --
>>> Best regards,
>>>
>>> Brahim OUBRIK
>>> Computer engineering student at Polytech Paris Sud

--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/How-to-implement-top-and-filter-on-object-List-for-JavaRDD-tp23669p23698.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.
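Pulling the thread's advice together into a runnable, Spark-free sketch (the names UserNameComparator and TopUsers are mine, and User again stands in for Model.User): a comparator passed to top() must implement both Comparator and Serializable, as in rusty's example, and it must compare its two arguments to each other rather than the first argument to itself:

```java
import java.io.Serializable;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;

// Stand-in for Model.User.
class User implements Serializable {
    private static final long serialVersionUID = 1L;
    private final String userName;

    User(String userName) {
        this.userName = userName;
    }

    String getUserName() {
        return userName;
    }
}

// Both Comparator and Serializable, so Spark can ship it to executors;
// note it compares usr1 against usr2, not usr1 against itself.
class UserNameComparator implements Comparator<User>, Serializable {
    private static final long serialVersionUID = 1L;

    @Override
    public int compare(User usr1, User usr2) {
        return usr1.getUserName().toUpperCase()
                .compareTo(usr2.getUserName().toUpperCase());
    }
}

public class TopUsers {
    public static void main(String[] args) {
        List<User> users = Arrays.asList(
                new User("carol"), new User("alice"), new User("bob"));
        // Local equivalent of rdd.top(1, new UserNameComparator()):
        // top() returns the largest elements under the given ordering.
        User top = Collections.max(users, new UserNameComparator());
        System.out.println("The top user is: " + top.getUserName()); // prints "carol"
    }
}
```

With a JavaRDD<User>, the same comparator would be used as rdd.top(1, new UserNameComparator()) to get the top user directly, without sorting the whole RDD first.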