No problem - thanks for helping us diagnose this!
On Tue, Nov 5, 2013 at 5:04 AM, Yadid Ayzenberg <ya...@media.mit.edu> wrote:
> Ah, I see. Thanks very much for your assistance, Patrick and Reynold.
> As a workaround for now, I implemented the SC field as transient and it's
> working fine.
>
> Yadid
>
> On 11/3/13 9:05 PM, Reynold Xin wrote:
> Yea, so every inner class actually contains a field referencing the outer
> class. In your case, the anonymous class DoubleFlatMapFunction actually has
> a this$0 field referencing the outer class AnalyticsEngine, which is why
> Spark is trying to serialize AnalyticsEngine.
>
> In the Scala API, the closure (which is really just implemented as anonymous
> classes) has a field called "$outer", and Spark uses a "closure cleaner"
> that goes into the anonymous class to remove the $outer field if it is not
> used in the closure itself. In Java, the compiler generates a field called
> "this$0" instead, so the closure cleaner doesn't find it and can't "clean"
> the closure properly.
>
> I will work on a fix for the closure cleaner to handle this case as well. In
> the meantime, you can work around it by either defining your anonymous class
> as a static class, or marking the JavaSparkContext field as transient.
>
> On Sun, Nov 3, 2013 at 7:32 PM, Patrick Wendell <pwend...@gmail.com> wrote:
>>
>> Hm, I think you are triggering a bug in the Java API where closures
>> may not be properly cleaned. I think @rxin has reproduced this,
>> deferring to him.
>>
>> - Patrick
>>
>> On Sun, Nov 3, 2013 at 5:25 PM, Yadid Ayzenberg <ya...@media.mit.edu>
>> wrote:
>> > The code is below. In it, rdd.count() works, but rdd2.count() fails.
>> >
>> > public class AnalyticsEngine implements Serializable {
>> >
>> >     private static AnalyticsEngine engine = null;
>> >     private JavaSparkContext sc;
>> >
>> >     final Logger logger = LoggerFactory.getLogger(AnalyticsEngine.class);
>> >     private Properties prop;
>> >
>> >     String db_host;
>> >
>> >     private AnalyticsEngine()
>> >     {
>> >         System.setProperty("spark.serializer",
>> >                 "org.apache.spark.serializer.KryoSerializer");
>> >         System.setProperty("spark.kryo.registrator",
>> >                 "edu.mit.bsense.MyRegistrator");
>> >         sc = new JavaSparkContext("local[4]", "TestSpark");
>> >         Properties prop = new Properties();
>> >         try {
>> >             prop.load(new FileInputStream("config.properties"));
>> >
>> >             db_host = prop.getProperty("database_host1");
>> >             logger.info("Database host: {}", db_host);
>> >         } catch (FileNotFoundException ex) {
>> >             logger.info("Could not read config.properties: " + ex.toString());
>> >         } catch (IOException ex) {
>> >             logger.info("Could not read config.properties: " + ex.toString());
>> >         }
>> >     }
>> >
>> >     public void getData()
>> >     {
>> >         Configuration conf = new Configuration();
>> >
>> >         String conf_url = "mongodb://" + db_host + "/test.data1"; // this is the data partition
>> >         conf.set("mongo.input.uri", conf_url);
>> >
>> >         conf.set("mongo.input.query", "{\"streamId\":\"" + "13" + "\"},{\"data\":1}");
>> >         conf.set("mongo.input.split_size", "64");
>> >
>> >         JavaPairRDD<Object, BSONObject> rdd = sc.newAPIHadoopRDD(conf,
>> >                 MongoInputFormat.class, Object.class, BSONObject.class);
>> >
>> >         rdd.cache();
>> >
>> >         logger.info("Count of rdd: {}", rdd.count());
>> >         logger.info("==========================================================================");
>> >
>> >         JavaDoubleRDD rdd2 = rdd.flatMap(new DoubleFlatMapFunction<Tuple2<Object, BSONObject>>() {
>> >
>> >             @Override
>> >             public Iterable<Double> call(Tuple2<Object, BSONObject> e) {
>> >                 BSONObject doc = e._2();
>> >                 BasicDBList vals = (BasicDBList) doc.get("data");
>> >
>> >                 List<Double> results = new ArrayList<Double>();
>> >                 for (int i = 0; i < vals.size(); i++)
>> >                     results.add((Double) ((BasicDBList) vals.get(i)).get(0));
>> >
>> >                 return results;
>> >             }
>> >         });
>> >
>> >         logger.info("Take: {}", rdd2.take(100));
>> >         logger.info("Count: {}", rdd2.count());
>> >     }
>> > }
>> >
>> > On 11/3/13 8:19 PM, Patrick Wendell wrote:
>> >>
>> >> Thanks, that would help. This would be consistent with there being a
>> >> reference to the SparkContext itself inside of the closure. Just want
>> >> to make sure that's not the case.
>> >>
>> >> On Sun, Nov 3, 2013 at 5:13 PM, Yadid Ayzenberg <ya...@media.mit.edu>
>> >> wrote:
>> >>>
>> >>> I'm running in local[4] mode - so there are no slave machines. Full
>> >>> stack trace:
>> >>>
>> >>> [error] (run-main) org.apache.spark.SparkException: Job failed:
>> >>> java.io.NotSerializableException: edu.mit.bsense.AnalyticsEngine
>> >>> org.apache.spark.SparkException: Job failed:
>> >>> java.io.NotSerializableException: edu.mit.bsense.AnalyticsEngine
>> >>>     at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:760)
>> >>>     at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:758)
>> >>>     at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:60)
>> >>>     at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
>> >>>     at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:758)
>> >>>     at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitMissingTasks(DAGScheduler.scala:556)
>> >>>     at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitStage(DAGScheduler.scala:503)
>> >>>     at org.apache.spark.scheduler.DAGScheduler.processEvent(DAGScheduler.scala:361)
>> >>>     at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$run(DAGScheduler.scala:441)
>> >>>     at org.apache.spark.scheduler.DAGScheduler$$anon$1.run(DAGScheduler.scala:149)
>> >>> [debug] Thread run-main exited.
>> >>> [debug] Interrupting remaining threads (should be all daemons).
>> >>> [debug] Sandboxed run complete..
>> >>> java.lang.RuntimeException: Nonzero exit code: 1
>> >>>     at scala.sys.package$.error(package.scala:27)
>> >>>     at sbt.BuildCommon$$anonfun$toError$1.apply(Defaults.scala:1628)
>> >>>     at sbt.BuildCommon$$anonfun$toError$1.apply(Defaults.scala:1628)
>> >>>     at scala.Option.foreach(Option.scala:236)
>> >>>     at sbt.BuildCommon$class.toError(Defaults.scala:1628)
>> >>>     at sbt.Defaults$.toError(Defaults.scala:34)
>> >>>     at sbt.Defaults$$anonfun$runTask$1$$anonfun$apply$36$$anonfun$apply$37.apply(Defaults.scala:647)
>> >>>     at sbt.Defaults$$anonfun$runTask$1$$anonfun$apply$36$$anonfun$apply$37.apply(Defaults.scala:645)
>> >>>     at scala.Function1$$anonfun$compose$1.apply(Function1.scala:47)
>> >>>     at sbt.$tilde$greater$$anonfun$$u2219$1.apply(TypeFunctions.scala:42)
>> >>>     at sbt.std.Transform$$anon$4.work(System.scala:64)
>> >>>     at sbt.Execute$$anonfun$submit$1$$anonfun$apply$1.apply(Execute.scala:237)
>> >>>     at sbt.Execute$$anonfun$submit$1$$anonfun$apply$1.apply(Execute.scala:237)
>> >>>     at sbt.ErrorHandling$.wideConvert(ErrorHandling.scala:18)
>> >>>     at sbt.Execute.work(Execute.scala:244)
>> >>>     at sbt.Execute$$anonfun$submit$1.apply(Execute.scala:237)
>> >>>     at sbt.Execute$$anonfun$submit$1.apply(Execute.scala:237)
>> >>>     at sbt.ConcurrentRestrictions$$anon$4$$anonfun$1.apply(ConcurrentRestrictions.scala:160)
>> >>>     at sbt.CompletionService$$anon$2.call(CompletionService.scala:30)
>> >>>     at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:303)
>> >>>     at java.util.concurrent.FutureTask.run(FutureTask.java:138)
>> >>>     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:439)
>> >>>     at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:303)
>> >>>     at java.util.concurrent.FutureTask.run(FutureTask.java:138)
>> >>>     at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:895)
>> >>>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:918)
>> >>>     at java.lang.Thread.run(Thread.java:695)
>> >>>
>> >>> When I add "implements Serializable" to my class, I get the following
>> >>> stack trace:
>> >>>
>> >>> [error] (run-main) org.apache.spark.SparkException: Job failed:
>> >>> java.io.NotSerializableException: org.apache.spark.api.java.JavaSparkContext
>> >>> org.apache.spark.SparkException: Job failed:
>> >>> java.io.NotSerializableException: org.apache.spark.api.java.JavaSparkContext
>> >>>     at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:760)
>> >>>     at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:758)
>> >>>     at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:60)
>> >>>     at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
>> >>>     at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:758)
>> >>>     at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitMissingTasks(DAGScheduler.scala:556)
>> >>>     at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitStage(DAGScheduler.scala:503)
>> >>>     at org.apache.spark.scheduler.DAGScheduler.processEvent(DAGScheduler.scala:361)
>> >>>     at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$run(DAGScheduler.scala:441)
>> >>>     at org.apache.spark.scheduler.DAGScheduler$$anon$1.run(DAGScheduler.scala:149)
>> >>>
>> >>> I can post my code if that helps.
>> >>>
>> >>> On 11/3/13 8:05 PM, Patrick Wendell wrote:
>> >>>>
>> >>>> If you look in the UI, are there failures on any of the slaves that
>> >>>> you can give a stack trace for? That would narrow down where the
>> >>>> serialization error is happening.
>> >>>>
>> >>>> Unfortunately this code path doesn't print a full stack trace, which
>> >>>> makes it harder to debug where the serialization error comes from.
>> >>>>
>> >>>> Could you post all of your code?
>> >>>>
>> >>>> Also, just wondering, what happens if you just go ahead and add
>> >>>> "extends Serializable" to the AnalyticsEngine class? It's possible
>> >>>> this is happening during closure serialization, which will use the
>> >>>> closure serializer (which is by default Java).
>> >>>>
>> >>>> - Patrick
>> >>>>
>> >>>> On Sun, Nov 3, 2013 at 4:49 PM, Yadid Ayzenberg <ya...@media.mit.edu>
>> >>>> wrote:
>> >>>>>
>> >>>>> Yes, I tried that as well (it is currently registered with Kryo) -
>> >>>>> although it doesn't make sense to me (and doesn't solve the problem).
>> >>>>> I also made sure my registration was running:
>> >>>>>
>> >>>>> 7841 [spark-akka.actor.default-dispatcher-3] DEBUG
>> >>>>> org.apache.spark.serializer.KryoSerializer - Running user
>> >>>>> registrator: edu.mit.bsense.MyRegistrator
>> >>>>>
>> >>>>> edu.mit.bsense.AnalyticsEngine is the class containing the SC, which
>> >>>>> instantiates the RDDs and runs the map() and count().
>> >>>>> Can you explain why it needs to be serialized?
>> >>>>>
>> >>>>> Also, when running count() on my original RDD (pre-map) I get the
>> >>>>> right answer - this means the classes of the data in the RDD are
>> >>>>> serializable. It's only when I run map() and then count() on the new
>> >>>>> RDD that I get this exception. My map does not introduce any new
>> >>>>> classes - it just iterates over the existing data.
>> >>>>>
>> >>>>> Any ideas?
>> >>>>>
>> >>>>> On 11/3/13 7:43 PM, Patrick Wendell wrote:
>> >>>>>>
>> >>>>>> edu.mit.bsense.AnalyticsEngine
>> >>>>>>
>> >>>>>> Look at the exception. Basically, you'll need to register every
>> >>>>>> class type that is recursively used by BSONObject.
>> >>>>>>
>> >>>>>> On Sun, Nov 3, 2013 at 4:27 PM, Yadid Ayzenberg
>> >>>>>> <ya...@media.mit.edu> wrote:
>> >>>>>>>
>> >>>>>>> Hi Patrick,
>> >>>>>>>
>> >>>>>>> I am in fact using Kryo, and I'm registering BSONObject.class
>> >>>>>>> (which is the class holding the data) in my KryoRegistrator.
>> >>>>>>> I'm not sure what other classes I should be registering.
>> >>>>>>>
>> >>>>>>> Thanks,
>> >>>>>>>
>> >>>>>>> Yadid
>> >>>>>>>
>> >>>>>>> On 11/3/13 7:23 PM, Patrick Wendell wrote:
>> >>>>>>>>
>> >>>>>>>> The problem is you are referencing a class that does not "extend
>> >>>>>>>> Serializable" in the data that you shuffle. Spark needs to send
>> >>>>>>>> all shuffle data over the network, so it needs to know how to
>> >>>>>>>> serialize them.
>> >>>>>>>>
>> >>>>>>>> One option is to use Kryo for network serialization as described
>> >>>>>>>> here - you'll have to register all the classes that get
>> >>>>>>>> serialized, though.
>> >>>>>>>>
>> >>>>>>>> http://spark.incubator.apache.org/docs/latest/tuning.html
>> >>>>>>>>
>> >>>>>>>> Another option is to write a wrapper class that "extends
>> >>>>>>>> Externalizable" and write the serialization yourself.
>> >>>>>>>>
>> >>>>>>>> - Patrick
>> >>>>>>>>
>> >>>>>>>> On Sun, Nov 3, 2013 at 10:33 AM, Yadid Ayzenberg
>> >>>>>>>> <ya...@media.mit.edu> wrote:
>> >>>>>>>>>
>> >>>>>>>>> Hi All,
>> >>>>>>>>>
>> >>>>>>>>> My original RDD contains arrays of doubles. When applying a
>> >>>>>>>>> count() operator to the original RDD, I get the result as
>> >>>>>>>>> expected.
>> >>>>>>>>> However, when I run a map on the original RDD in order to
>> >>>>>>>>> generate a new RDD with only the first element of each array,
>> >>>>>>>>> and try to apply count() to the newly generated RDD, I get the
>> >>>>>>>>> following exception:
>> >>>>>>>>>
>> >>>>>>>>> 19829 [run-main] INFO org.apache.spark.scheduler.DAGScheduler -
>> >>>>>>>>> Failed to run count at AnalyticsEngine.java:133
>> >>>>>>>>> [error] (run-main) org.apache.spark.SparkException: Job failed:
>> >>>>>>>>> java.io.NotSerializableException: edu.mit.bsense.AnalyticsEngine
>> >>>>>>>>> org.apache.spark.SparkException: Job failed:
>> >>>>>>>>> java.io.NotSerializableException: edu.mit.bsense.AnalyticsEngine
>> >>>>>>>>>     at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:760)
>> >>>>>>>>>     at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:758)
>> >>>>>>>>>     at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:60)
>> >>>>>>>>>     at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
>> >>>>>>>>>     at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:758)
>> >>>>>>>>>     at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitMissingTasks(DAGScheduler.scala:556)
>> >>>>>>>>>     at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitStage(DAGScheduler.scala:503)
>> >>>>>>>>>     at org.apache.spark.scheduler.DAGScheduler.processEvent(DAGScheduler.scala:361)
>> >>>>>>>>>     at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$run(DAGScheduler.scala:441)
>> >>>>>>>>>     at org.apache.spark.scheduler.DAGScheduler$$anon$1.run(DAGScheduler.scala:149)
>> >>>>>>>>>
>> >>>>>>>>> If I run a take() operation on the new RDD, I receive the
>> >>>>>>>>> results as expected. Here is my code:
>> >>>>>>>>>
>> >>>>>>>>> JavaRDD<Double> rdd2 = rdd.flatMap(new FlatMapFunction<Tuple2<Object, BSONObject>, Double>() {
>> >>>>>>>>>     @Override
>> >>>>>>>>>     public Iterable<Double> call(Tuple2<Object, BSONObject> e) {
>> >>>>>>>>>         BSONObject doc = e._2();
>> >>>>>>>>>         List<List<Double>> vals = (List<List<Double>>) doc.get("data");
>> >>>>>>>>>         List<Double> results = new ArrayList<Double>();
>> >>>>>>>>>         for (int i = 0; i < vals.size(); i++)
>> >>>>>>>>>             results.add((Double) vals.get(i).get(0));
>> >>>>>>>>>         return results;
>> >>>>>>>>>     }
>> >>>>>>>>> });
>> >>>>>>>>>
>> >>>>>>>>> logger.info("Take: {}", rdd2.take(100));
>> >>>>>>>>> logger.info("Count: {}", rdd2.count());
>> >>>>>>>>>
>> >>>>>>>>> Any ideas on what I am doing wrong?
>> >>>>>>>>>
>> >>>>>>>>> Thanks,
>> >>>>>>>>>
>> >>>>>>>>> Yadid
>> >>>>>>>>>
>> >>>>>>>>> --
>> >>>>>>>>> Yadid Ayzenberg
>> >>>>>>>>> Graduate Student and Research Assistant
>> >>>>>>>>> Affective Computing
>> >>>>>>>>> Phone: 617-866-7226
>> >>>>>>>>> Room: E14-274G
>> >>>>>>>>> MIT Media Lab
>> >>>>>>>>> 75 Amherst St, Cambridge, MA, 02139
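[Editor's note: the mechanics Reynold describes above - the hidden this$0 field that javac adds to anonymous inner classes, and the two workarounds (a static nested class, or a transient context field) - can be demonstrated without Spark. The sketch below uses hypothetical class names (Engine stands in for AnalyticsEngine); plain Java serialization plays the role of Spark's closure serializer. The anonymous class here reads an outer field so that the outer-instance capture is guaranteed on every compiler version.]

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class CaptureDemo {

    // Stands in for AnalyticsEngine: deliberately NOT Serializable.
    static class Engine {
        String dbHost = "localhost";

        // Anonymous inner class: javac gives it a hidden this$0 field
        // referencing the enclosing Engine instance. Serializing the
        // returned object therefore tries to serialize Engine too.
        Serializable anonymousFn() {
            return new Serializable() {
                @Override
                public String toString() {
                    return dbHost; // touches the outer instance
                }
            };
        }

        // Static nested class: no outer reference, serializes on its own.
        static class StaticFn implements Serializable {
        }

        Serializable staticFn() {
            return new StaticFn();
        }
    }

    // Attempts Java serialization of o - the same walk Spark's default
    // closure serializer performs before shipping a task.
    static boolean serializes(Object o) {
        try (ObjectOutputStream out =
                 new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(o);
            return true;
        } catch (NotSerializableException e) {
            // The message names the outer class (Engine), not the closure
            // itself - exactly the shape of the exception in this thread.
            return false;
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    public static void main(String[] args) {
        Engine engine = new Engine();
        System.out.println("anonymous: " + serializes(engine.anonymousFn()));
        System.out.println("static:    " + serializes(engine.staticFn()));
    }
}
```

Marking the context field transient works for the same underlying reason: Java serialization skips transient fields, so the object walk never reaches the non-serializable JavaSparkContext.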