Thank u so much for the solution. I run the code like this,

JavaRDD<User> rdd = context.parallelize(usersList);
 JavaRDD<User> rdd_sorted_users= rdd.sortBy(new Function<User,String>(){

                    @Override
                    public String call(User usr1) throws Exception {
                        String userName = usr1.getUserName().toUpperCase();
                        return userName ;
                    }

                 }, false, 1);

                User user_top=rdd_sorted_users.first();
                System.out.println("The top user is
:"+user_top.getUserName());


But it is giving me this exception, however my class has already
implemented the java.io.Serializable
15/07/07 08:13:07 ERROR TaskSetManager: Failed to serialize task 0, not
attempting to retry it.
java.io.NotSerializableException: Model.User
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
    at java.io.ObjectOutputStream.writeArray(ObjectOutputStream.java:1378)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1174)
    at
java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
    at
java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
    at
java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
    at
java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
    at
java.io.ObjectOutputStream.defaultWriteObject(ObjectOutputStream.java:441)
    at
org.apache.spark.rdd.ParallelCollectionPartition$$anonfun$writeObject$1.apply$mcV$sp(ParallelCollectionRDD.scala:59)
    at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1239)
    at
org.apache.spark.rdd.ParallelCollectionPartition.writeObject(ParallelCollectionRDD.scala:51)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:497)
    at
java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:988)
    at
java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1496)
    at
java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
    at
java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
    at
java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
    at
java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
    at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
    at
org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:44)
    at
org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:81)
    at
org.apache.spark.scheduler.Task$.serializeWithDependencies(Task.scala:168)
    at
org.apache.spark.scheduler.TaskSetManager.resourceOffer(TaskSetManager.scala:467)
    at
org.apache.spark.scheduler.TaskSchedulerImpl$$anonfun$org$apache$spark$scheduler$TaskSchedulerImpl$$resourceOfferSingleTaskSet$1.apply$mcVI$sp(TaskSchedulerImpl.scala:231)
    at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:141)
    at org.apache.spark.scheduler.TaskSchedulerImpl.org
$apache$spark$scheduler$TaskSchedulerImpl$$resourceOfferSingleTaskSet(TaskSchedulerImpl.scala:226)
    at
org.apache.spark.scheduler.TaskSchedulerImpl$$anonfun$resourceOffers$3$$anonfun$apply$6.apply(TaskSchedulerImpl.scala:295)
    at
org.apache.spark.scheduler.TaskSchedulerImpl$$anonfun$resourceOffers$3$$anonfun$apply$6.apply(TaskSchedulerImpl.scala:293)
    at
scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
    at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
    at
org.apache.spark.scheduler.TaskSchedulerImpl$$anonfun$resourceOffers$3.apply(TaskSchedulerImpl.scala:293)
    at
org.apache.spark.scheduler.TaskSchedulerImpl$$anonfun$resourceOffers$3.apply(TaskSchedulerImpl.scala:293)
    at
scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
    at
org.apache.spark.scheduler.TaskSchedulerImpl.resourceOffers(TaskSchedulerImpl.scala:293)
    at
org.apache.spark.scheduler.local.LocalEndpoint.reviveOffers(LocalBackend.scala:79)
    at
org.apache.spark.scheduler.local.LocalEndpoint$$anonfun$receive$1.applyOrElse(LocalBackend.scala:58)
    at org.apache.spark.rpc.akka.AkkaRpcEnv.org
$apache$spark$rpc$akka$AkkaRpcEnv$$processMessage(AkkaRpcEnv.scala:178)
    at
org.apache.spark.rpc.akka.AkkaRpcEnv$$anonfun$actorRef$lzycompute$1$1$$anon$1$$anonfun$receiveWithLogging$1$$anonfun$applyOrElse$4.apply$mcV$sp(AkkaRpcEnv.scala:127)
    at org.apache.spark.rpc.akka.AkkaRpcEnv.org
$apache$spark$rpc$akka$AkkaRpcEnv$$safelyCall(AkkaRpcEnv.scala:198)
    at
org.apache.spark.rpc.akka.AkkaRpcEnv$$anonfun$actorRef$lzycompute$1$1$$anon$1$$anonfun$receiveWithLogging$1.applyOrElse(AkkaRpcEnv.scala:126)
    at
scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
    at
scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
    at
scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
    at
org.apache.spark.util.ActorLogReceive$$anon$1.apply(ActorLogReceive.scala:59)
    at
org.apache.spark.util.ActorLogReceive$$anon$1.apply(ActorLogReceive.scala:42)
    at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:118)
    at
org.apache.spark.util.ActorLogReceive$$anon$1.applyOrElse(ActorLogReceive.scala:42)
    at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
    at
org.apache.spark.rpc.akka.AkkaRpcEnv$$anonfun$actorRef$lzycompute$1$1$$anon$1.aroundReceive(AkkaRpcEnv.scala:93)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
    at akka.actor.ActorCell.invoke(ActorCell.scala:487)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)
    at akka.dispatch.Mailbox.run(Mailbox.scala:220)
    at
akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:393)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at
scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at
scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at
scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
15/07/07 08:13:07 INFO TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks
have all completed, from pool
15/07/07 08:13:07 ERROR TaskSchedulerImpl: Resource offer failed, task set
TaskSet_0 was not serializable
15/07/07 08:13:07 INFO TaskSchedulerImpl: Cancelling stage 0
15/07/07 08:13:07 INFO DAGScheduler: ResultStage 0 (first at
HelloWorldnAddition.java:300) failed in 0.136 s
Exception - Message: Job aborted due to stage failure: Failed to serialize
task 0, not attempting to retry it. Exception during serialization:
java.io.NotSerializableException: Model.User




2015-07-07 17:07 GMT+02:00 oubrik [via Apache Spark User List] <
ml-node+s1001560n2369...@n3.nabble.com>:

> JavaRDD<User> rdd_sorted_users= rdd.sortBy(new Function<User,String>(){
>
>                         @Override
>                         public String call(User arg0) throws Exception {
>             String userName = usr1.getUserName().toUpperCase();
>                             return userName ;
>                         }
>
>                     }, false, 1);
>
> false :ascending
> true:descending
>
>
> for top : User user_top=rdd_sorted_users.first()
>
> 2015-07-07 17:05 GMT+02:00 Brahim Oubrik <[hidden email]
> <http:///user/SendEmail.jtp?type=node&node=23696&i=0>>:
>
>> sorry User not PPP
>>
>> 2015-07-07 17:04 GMT+02:00 Brahim Oubrik <[hidden email]
>> <http:///user/SendEmail.jtp?type=node&node=23696&i=1>>:
>>
>>> JavaRDD<User> rdd_sorted_users= rdd.sortBy(new Function<User,String>(){
>>>
>>>                         @Override
>>>                         public String call(PPP arg0) throws Exception {
>>>             String userName = usr1.getUserName().toUpperCase();
>>>                             return userName ;
>>>                         }
>>>
>>>                     }, false, 1);
>>>
>>> false :ascending
>>> true:descending
>>>
>>>
>>> for top : User user_top=rdd_sorted_users.first()
>>>
>>> 2015-07-07 16:54 GMT+02:00 Hafsa Asif [via Apache Spark User List] <[hidden
>>> email] <http:///user/SendEmail.jtp?type=node&node=23696&i=2>>:
>>>
>>>> Rusty,
>>>>
>>>> I am very thankful for your help. Actually, I am facing difficulty in
>>>> objects. My plan is that, I have an object list containing list of User
>>>> objects. After parallelizing it through spark context, I apply comparator
>>>> on user.getUserName(). As usernames are sorted, their related user object
>>>> are sorted according to user names.
>>>>  In the end when I apply top, I get the whole object of user .
>>>>
>>>> Some like this:
>>>>
>>>> public static Comparator<User> UserComparator
>>>>             = new Comparator<User>() {
>>>>
>>>>         public int compare(User usr1, User usr2) {
>>>>             String userName1 = usr1.getUserName().toUpperCase();
>>>>             String userName2 = usr1.getUserName().toUpperCase();
>>>>
>>>>             //ascending order
>>>>             return userName1.compareTo(userName2);
>>>>
>>>>             //descending order
>>>>             //return fruitName2.compareTo(fruitName1);
>>>>         }
>>>>
>>>>     };
>>>>
>>>> JavaRDD<User> rdd = context.parallelize(usersList);
>>>>
>>>> 2015-07-07 16:05 GMT+02:00 rusty [via Apache Spark User List] <[hidden
>>>> email] <http:///user/SendEmail.jtp?type=node&node=23688&i=0>>:
>>>>
>>>>> JavaRDD<String> lines2 = ctx.parallelize(Arrays.asList("3", "6", "2",
>>>>>                                 "5", "8", "6", "7"));
>>>>>                 List<String> top =lines2.top(7, new
>>>>> CustomComaprator<String>());
>>>>>                 for (String integer : top) {
>>>>>                         System.out.println(integer);
>>>>> }
>>>>>
>>>>>
>>>>>
>>>>>
>>>>> class CustomComaprator<T> implements Serializable, Comparator<T> {
>>>>>         /**
>>>>>          *
>>>>>          */
>>>>>         public CustomComaprator() {
>>>>>                 // TODO Auto-generated constructor stub
>>>>>
>>>>>         }
>>>>>
>>>>>         private static final long serialVersionUID =
>>>>> 2004092520677431781L;
>>>>>
>>>>>         @Override
>>>>>         public int compare(T o11, T o12) {
>>>>>                 int o1 = Integer.parseInt(String.valueOf(o11));
>>>>>                 int o2 = Integer.parseInt(String.valueOf(o12));
>>>>>
>>>>>                 return o1 > o2 ? 1 : o1 == o2 ? 0 : -1;
>>>>>         }
>>>>>
>>>>> }
>>>>>
>>>>>
>>>>> ------------------------------
>>>>>  If you reply to this email, your message will be added to the
>>>>> discussion below:
>>>>>
>>>>> http://apache-spark-user-list.1001560.n3.nabble.com/How-to-implement-top-and-filter-on-object-List-for-JavaRDD-tp23669p23684.html
>>>>>  To unsubscribe from How to implement top() and filter() on object
>>>>> List for JavaRDD, click here.
>>>>> NAML
>>>>> <http://apache-spark-user-list.1001560.n3.nabble.com/template/NamlServlet.jtp?macro=macro_viewer&id=instant_html%21nabble%3Aemail.naml&base=nabble.naml.namespaces.BasicNamespace-nabble.view.web.template.NabbleNamespace-nabble.view.web.template.NodeNamespace&breadcrumbs=notify_subscribers%21nabble%3Aemail.naml-instant_emails%21nabble%3Aemail.naml-send_instant_email%21nabble%3Aemail.naml>
>>>>>
>>>>
>>>>
>>>>
>>>> ------------------------------
>>>>  If you reply to this email, your message will be added to the
>>>> discussion below:
>>>>
>>>> http://apache-spark-user-list.1001560.n3.nabble.com/How-to-implement-top-and-filter-on-object-List-for-JavaRDD-tp23669p23688.html
>>>>  To unsubscribe from How to implement top() and filter() on object List
>>>> for JavaRDD, click here.
>>>> NAML
>>>> <http://apache-spark-user-list.1001560.n3.nabble.com/template/NamlServlet.jtp?macro=macro_viewer&id=instant_html%21nabble%3Aemail.naml&base=nabble.naml.namespaces.BasicNamespace-nabble.view.web.template.NabbleNamespace-nabble.view.web.template.NodeNamespace&breadcrumbs=notify_subscribers%21nabble%3Aemail.naml-instant_emails%21nabble%3Aemail.naml-send_instant_email%21nabble%3Aemail.naml>
>>>>
>>>
>>>
>>>
>>> --
>>> Bien cordialement,
>>>
>>>
>>> *-------------------------------------------------------------------------------------*
>>> *Brahim OUBRIK*
>>> *Elève-Ingénieur en Informatique à Polytech Paris Sud.*
>>> *Cell <a href="tel:%2B33%200651137644" value="+33651137644
>>> <%2B33651137644>" target="_blank">+33 0651137644 <%2B33%200651137644>*
>>>
>>>
>>> *-------------------------------------------------------------------------------------*
>>>
>>
>>
>>
>> --
>> Bien cordialement,
>>
>>
>> *-------------------------------------------------------------------------------------*
>> *Brahim OUBRIK*
>> *Elève-Ingénieur en Informatique à Polytech Paris Sud.*
>> *Cell <a href="tel:%2B33%200651137644" value="+33651137644
>> <%2B33651137644>" target="_blank">+33 0651137644 <%2B33%200651137644>*
>>
>>
>> *-------------------------------------------------------------------------------------*
>>
>
>
>
> --
> Bien cordialement,
>
>
> *-------------------------------------------------------------------------------------*
> *Brahim OUBRIK*
> *Elève-Ingénieur en Informatique à Polytech Paris Sud.*
> *Cell +33 0651137644 <%2B33%200651137644>*
>
>
> *-------------------------------------------------------------------------------------*
>
>
> ------------------------------
>  If you reply to this email, your message will be added to the discussion
> below:
>
> http://apache-spark-user-list.1001560.n3.nabble.com/How-to-implement-top-and-filter-on-object-List-for-JavaRDD-tp23669p23696.html
>  To unsubscribe from How to implement top() and filter() on object List
> for JavaRDD, click here
> <http://apache-spark-user-list.1001560.n3.nabble.com/template/NamlServlet.jtp?macro=unsubscribe_by_code&node=23669&code=aGFmc2EuYXNpZkBtYXRjaGluZ3V1LmNvbXwyMzY2OXwtMTA0ODgyNjY3NA==>
> .
> NAML
> <http://apache-spark-user-list.1001560.n3.nabble.com/template/NamlServlet.jtp?macro=macro_viewer&id=instant_html%21nabble%3Aemail.naml&base=nabble.naml.namespaces.BasicNamespace-nabble.view.web.template.NabbleNamespace-nabble.view.web.template.NodeNamespace&breadcrumbs=notify_subscribers%21nabble%3Aemail.naml-instant_emails%21nabble%3Aemail.naml-send_instant_email%21nabble%3Aemail.naml>
>




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/How-to-implement-top-and-filter-on-object-List-for-JavaRDD-tp23669p23698.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

Reply via email to