bq. my class has already implemented the java.io.Serializable Can you show the code for Model.User class ?
Cheers On Tue, Jul 7, 2015 at 8:18 AM, Hafsa Asif <hafsa.a...@matchinguu.com> wrote: > Thank u so much for the solution. I run the code like this, > > JavaRDD<User> rdd = context.parallelize(usersList); > JavaRDD<User> rdd_sorted_users= rdd.sortBy(new Function<User,String>(){ > > @Override > public String call(User usr1) throws Exception { > String userName = usr1.getUserName().toUpperCase(); > return userName ; > } > > }, false, 1); > > User user_top=rdd_sorted_users.first(); > System.out.println("The top user is > :"+user_top.getUserName()); > > > But it is giving me this exception, however my class has already > implemented the java.io.Serializable > 15/07/07 08:13:07 ERROR TaskSetManager: Failed to serialize task 0, not > attempting to retry it. > java.io.NotSerializableException: Model.User > at > java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184) > at java.io.ObjectOutputStream.writeArray(ObjectOutputStream.java:1378) > at > java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1174) > at > java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548) > at > java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509) > at > java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432) > at > java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178) > at > java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548) > at > java.io.ObjectOutputStream.defaultWriteObject(ObjectOutputStream.java:441) > at > org.apache.spark.rdd.ParallelCollectionPartition$$anonfun$writeObject$1.apply$mcV$sp(ParallelCollectionRDD.scala:59) > at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1239) > at > org.apache.spark.rdd.ParallelCollectionPartition.writeObject(ParallelCollectionRDD.scala:51) > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > at > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > at > 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:497) > at > java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:988) > at > java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1496) > at > java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432) > at > java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178) > at > java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548) > at > java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509) > at > java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432) > at > java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178) > at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348) > at > org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:44) > at > org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:81) > at > org.apache.spark.scheduler.Task$.serializeWithDependencies(Task.scala:168) > at > org.apache.spark.scheduler.TaskSetManager.resourceOffer(TaskSetManager.scala:467) > at > org.apache.spark.scheduler.TaskSchedulerImpl$$anonfun$org$apache$spark$scheduler$TaskSchedulerImpl$$resourceOfferSingleTaskSet$1.apply$mcVI$sp(TaskSchedulerImpl.scala:231) > at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:141) > at org.apache.spark.scheduler.TaskSchedulerImpl.org > $apache$spark$scheduler$TaskSchedulerImpl$$resourceOfferSingleTaskSet(TaskSchedulerImpl.scala:226) > at > org.apache.spark.scheduler.TaskSchedulerImpl$$anonfun$resourceOffers$3$$anonfun$apply$6.apply(TaskSchedulerImpl.scala:295) > at > org.apache.spark.scheduler.TaskSchedulerImpl$$anonfun$resourceOffers$3$$anonfun$apply$6.apply(TaskSchedulerImpl.scala:293) > at > scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33) > at 
scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108) > at > org.apache.spark.scheduler.TaskSchedulerImpl$$anonfun$resourceOffers$3.apply(TaskSchedulerImpl.scala:293) > at > org.apache.spark.scheduler.TaskSchedulerImpl$$anonfun$resourceOffers$3.apply(TaskSchedulerImpl.scala:293) > at > scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) > at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) > at > org.apache.spark.scheduler.TaskSchedulerImpl.resourceOffers(TaskSchedulerImpl.scala:293) > at > org.apache.spark.scheduler.local.LocalEndpoint.reviveOffers(LocalBackend.scala:79) > at > org.apache.spark.scheduler.local.LocalEndpoint$$anonfun$receive$1.applyOrElse(LocalBackend.scala:58) > at org.apache.spark.rpc.akka.AkkaRpcEnv.org > $apache$spark$rpc$akka$AkkaRpcEnv$$processMessage(AkkaRpcEnv.scala:178) > at > org.apache.spark.rpc.akka.AkkaRpcEnv$$anonfun$actorRef$lzycompute$1$1$$anon$1$$anonfun$receiveWithLogging$1$$anonfun$applyOrElse$4.apply$mcV$sp(AkkaRpcEnv.scala:127) > at org.apache.spark.rpc.akka.AkkaRpcEnv.org > $apache$spark$rpc$akka$AkkaRpcEnv$$safelyCall(AkkaRpcEnv.scala:198) > at > org.apache.spark.rpc.akka.AkkaRpcEnv$$anonfun$actorRef$lzycompute$1$1$$anon$1$$anonfun$receiveWithLogging$1.applyOrElse(AkkaRpcEnv.scala:126) > at > scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33) > at > scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33) > at > scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25) > at > org.apache.spark.util.ActorLogReceive$$anon$1.apply(ActorLogReceive.scala:59) > at > org.apache.spark.util.ActorLogReceive$$anon$1.apply(ActorLogReceive.scala:42) > at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:118) > at > org.apache.spark.util.ActorLogReceive$$anon$1.applyOrElse(ActorLogReceive.scala:42) > at akka.actor.Actor$class.aroundReceive(Actor.scala:465) > at > 
org.apache.spark.rpc.akka.AkkaRpcEnv$$anonfun$actorRef$lzycompute$1$1$$anon$1.aroundReceive(AkkaRpcEnv.scala:93) > at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516) > at akka.actor.ActorCell.invoke(ActorCell.scala:487) > at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238) > at akka.dispatch.Mailbox.run(Mailbox.scala:220) > at > akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:393) > at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) > at > scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) > at > scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) > at > scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) > 15/07/07 08:13:07 INFO TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks > have all completed, from pool > 15/07/07 08:13:07 ERROR TaskSchedulerImpl: Resource offer failed, task set > TaskSet_0 was not serializable > 15/07/07 08:13:07 INFO TaskSchedulerImpl: Cancelling stage 0 > 15/07/07 08:13:07 INFO DAGScheduler: ResultStage 0 (first at > HelloWorldnAddition.java:300) failed in 0.136 s > Exception - Message: Job aborted due to stage failure: Failed to serialize > task 0, not attempting to retry it. 
Exception during serialization: > java.io.NotSerializableException: Model.User > > > > > 2015-07-07 17:07 GMT+02:00 oubrik [via Apache Spark User List] <[hidden > email] <http:///user/SendEmail.jtp?type=node&node=23698&i=0>>: > >> JavaRDD<User> rdd_sorted_users= rdd.sortBy(new Function<User,String>(){ >> >> @Override >> public String call(User usr1) throws Exception { >> String userName = usr1.getUserName().toUpperCase(); >> return userName ; >> } >> >> }, false, 1); >> >> false :ascending >> true:descending >> >> >> for top : User user_top=rdd_sorted_users.first() >> >> 2015-07-07 17:05 GMT+02:00 Brahim Oubrik <[hidden email] >> <http:///user/SendEmail.jtp?type=node&node=23696&i=0>>: >> >>> sorry User not PPP >>> >>> 2015-07-07 17:04 GMT+02:00 Brahim Oubrik <[hidden email] >>> <http:///user/SendEmail.jtp?type=node&node=23696&i=1>>: >>> >>>> JavaRDD<User> rdd_sorted_users= rdd.sortBy(new Function<User,String>(){ >>>> >>>> @Override >>>> public String call(PPP arg0) throws Exception { >>>> String userName = usr1.getUserName().toUpperCase(); >>>> return userName ; >>>> } >>>> >>>> }, false, 1); >>>> >>>> false :ascending >>>> true:descending >>>> >>>> >>>> for top : User user_top=rdd_sorted_users.first() >>>> >>>> 2015-07-07 16:54 GMT+02:00 Hafsa Asif [via Apache Spark User List] <[hidden >>>> email] <http:///user/SendEmail.jtp?type=node&node=23696&i=2>>: >>>> >>>>> Rusty, >>>>> >>>>> I am very thankful for your help. Actually, I am facing difficulty in >>>>> objects. My plan is that, I have an object list containing list of User >>>>> objects. After parallelizing it through spark context, I apply comparator >>>>> on user.getUserName(). As usernames are sorted, their related user object >>>>> are sorted according to user names. >>>>> In the end when I apply top, I get the whole object of user . 
>>>>> >>>>> Something like this: >>>>> >>>>> public static Comparator<User> UserComparator >>>>> = new Comparator<User>() { >>>>> >>>>> public int compare(User usr1, User usr2) { >>>>> String userName1 = usr1.getUserName().toUpperCase(); >>>>> String userName2 = usr2.getUserName().toUpperCase(); >>>>> >>>>> //ascending order >>>>> return userName1.compareTo(userName2); >>>>> >>>>> //descending order >>>>> //return userName2.compareTo(userName1); >>>>> } >>>>> >>>>> }; >>>>> >>>>> JavaRDD<User> rdd = context.parallelize(usersList); >>>>> >>>>> 2015-07-07 16:05 GMT+02:00 rusty [via Apache Spark User List] <[hidden >>>>> email] <http:///user/SendEmail.jtp?type=node&node=23688&i=0>>: >>>>> >>>>>> JavaRDD<String> lines2 = ctx.parallelize(Arrays.asList("3", "6", "2", >>>>>> "5", "8", "6", "7")); >>>>>> List<String> top =lines2.top(7, new >>>>>> CustomComaprator<String>()); >>>>>> for (String integer : top) { >>>>>> System.out.println(integer); >>>>>> } >>>>>> >>>>>> >>>>>> >>>>>> >>>>>> class CustomComaprator<T> implements Serializable, Comparator<T> { >>>>>> /** >>>>>> * >>>>>> */ >>>>>> public CustomComaprator() { >>>>>> // TODO Auto-generated constructor stub >>>>>> >>>>>> } >>>>>> >>>>>> private static final long serialVersionUID = >>>>>> 2004092520677431781L; >>>>>> >>>>>> @Override >>>>>> public int compare(T o11, T o12) { >>>>>> int o1 = Integer.parseInt(String.valueOf(o11)); >>>>>> int o2 = Integer.parseInt(String.valueOf(o12)); >>>>>> >>>>>> return o1 > o2 ? 1 : o1 == o2 ? 0 : -1; >>>>>> } >>>>>> >>>>>> } >>>>>> >>>>>> >>>>>> ------------------------------ >>>>>> If you reply to this email, your message will be added to the >>>>>> discussion below: >>>>>> >>>>>> http://apache-spark-user-list.1001560.n3.nabble.com/How-to-implement-top-and-filter-on-object-List-for-JavaRDD-tp23669p23684.html >>>>>> To unsubscribe from How to implement top() and filter() on object >>>>>> List for JavaRDD, click here. 
>>>>>> NAML >>>>>> <http://apache-spark-user-list.1001560.n3.nabble.com/template/NamlServlet.jtp?macro=macro_viewer&id=instant_html%21nabble%3Aemail.naml&base=nabble.naml.namespaces.BasicNamespace-nabble.view.web.template.NabbleNamespace-nabble.view.web.template.NodeNamespace&breadcrumbs=notify_subscribers%21nabble%3Aemail.naml-instant_emails%21nabble%3Aemail.naml-send_instant_email%21nabble%3Aemail.naml> >>>>>> >>>>> >>>>> >>>>> >>>>> ------------------------------ >>>>> If you reply to this email, your message will be added to the >>>>> discussion below: >>>>> >>>>> http://apache-spark-user-list.1001560.n3.nabble.com/How-to-implement-top-and-filter-on-object-List-for-JavaRDD-tp23669p23688.html >>>>> To unsubscribe from How to implement top() and filter() on object >>>>> List for JavaRDD, click here. >>>>> NAML >>>>> <http://apache-spark-user-list.1001560.n3.nabble.com/template/NamlServlet.jtp?macro=macro_viewer&id=instant_html%21nabble%3Aemail.naml&base=nabble.naml.namespaces.BasicNamespace-nabble.view.web.template.NabbleNamespace-nabble.view.web.template.NodeNamespace&breadcrumbs=notify_subscribers%21nabble%3Aemail.naml-instant_emails%21nabble%3Aemail.naml-send_instant_email%21nabble%3Aemail.naml> >>>>> >>>> >>>> >>>> >>>> -- >>>> Bien cordialement, >>>> >>>> >>>> *-------------------------------------------------------------------------------------* >>>> *Brahim OUBRIK* >>>> *Elève-Ingénieur en Informatique à Polytech Paris Sud.* >>>> *Cell <a href="tel:%2B33%200651137644" value="<a >>>> href="tel:%2B33651137644" value="+33651137644 <%2B33651137644>" >>>> target="_blank">+33651137644 <%2B33651137644>" target="_blank"><a >>>> href="tel:%2B33%200651137644" value="+33651137644 <%2B33651137644>" >>>> target="_blank">+33 0651137644 <%2B33%200651137644>* >>>> >>>> >>>> *-------------------------------------------------------------------------------------* >>>> >>> >>> >>> >>> -- >>> Bien cordialement, >>> >>> >>> 
*-------------------------------------------------------------------------------------* >>> *Brahim OUBRIK* >>> *Elève-Ingénieur en Informatique à Polytech Paris Sud.* >>> *Cell <a href="tel:%2B33%200651137644" value="<a >>> href="tel:%2B33651137644" value="+33651137644 <%2B33651137644>" >>> target="_blank">+33651137644 <%2B33651137644>" target="_blank"><a >>> href="tel:%2B33%200651137644" value="+33651137644 <%2B33651137644>" >>> target="_blank">+33 0651137644 <%2B33%200651137644>* >>> >>> >>> *-------------------------------------------------------------------------------------* >>> >> >> >> >> -- >> Bien cordialement, >> >> >> *-------------------------------------------------------------------------------------* >> *Brahim OUBRIK* >> *Elève-Ingénieur en Informatique à Polytech Paris Sud.* >> *Cell <a href="tel:%2B33%200651137644" value="+33651137644 >> <%2B33651137644>" target="_blank">+33 0651137644 <%2B33%200651137644>* >> >> >> *-------------------------------------------------------------------------------------* >> >> >> ------------------------------ >> If you reply to this email, your message will be added to the >> discussion below: >> >> http://apache-spark-user-list.1001560.n3.nabble.com/How-to-implement-top-and-filter-on-object-List-for-JavaRDD-tp23669p23696.html >> To unsubscribe from How to implement top() and filter() on object List >> for JavaRDD, click here. 
>> NAML >> <http://apache-spark-user-list.1001560.n3.nabble.com/template/NamlServlet.jtp?macro=macro_viewer&id=instant_html%21nabble%3Aemail.naml&base=nabble.naml.namespaces.BasicNamespace-nabble.view.web.template.NabbleNamespace-nabble.view.web.template.NodeNamespace&breadcrumbs=notify_subscribers%21nabble%3Aemail.naml-instant_emails%21nabble%3Aemail.naml-send_instant_email%21nabble%3Aemail.naml> >> > > > ------------------------------ > View this message in context: Re: How to implement top() and filter() on > object List for JavaRDD > <http://apache-spark-user-list.1001560.n3.nabble.com/How-to-implement-top-and-filter-on-object-List-for-JavaRDD-tp23669p23698.html> > Sent from the Apache Spark User List mailing list archive > <http://apache-spark-user-list.1001560.n3.nabble.com/> at Nabble.com. >