The problem is that you are referencing a class that does not implement Serializable in the data that you shuffle. Spark needs to send all shuffle data over the network, so it needs to know how to serialize it.
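To illustrate the failure in plain Java, without Spark, here is a minimal sketch. It assumes the function object is an anonymous inner class created inside an instance method of a non-serializable class, which would explain why the exception names AnalyticsEngine. Fn, Engine, and FirstElement are hypothetical stand-ins made up for this example, not Spark or AnalyticsEngine code.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class ClosureCaptureDemo {

    /** Hypothetical stand-in for a serializable function interface. */
    interface Fn extends Serializable {
        double call(double[] row);
    }

    /** Hypothetical stand-in for a driver class that is NOT Serializable. */
    static class Engine {
        private final int column = 0;

        // The anonymous class reads a field of the enclosing Engine, so it
        // holds a hidden reference to that Engine instance.
        Fn anonymousFn() {
            return new Fn() {
                @Override
                public double call(double[] row) {
                    return row[column];
                }
            };
        }
    }

    /** A static nested class has no enclosing instance to drag along. */
    static class FirstElement implements Fn {
        @Override
        public double call(double[] row) {
            return row[0];
        }
    }

    /** Returns true if Java serialization can write the object. */
    static boolean isJavaSerializable(Object o) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(o);
            return true;
        } catch (NotSerializableException e) {
            return false;
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    public static void main(String[] args) {
        // The anonymous function pulls in the non-serializable Engine.
        System.out.println(isJavaSerializable(new Engine().anonymousFn())); // false
        // The static nested class serializes on its own.
        System.out.println(isJavaSerializable(new FirstElement()));         // true
    }
}
```

If this is what is happening in your code, moving the function into a static nested (or top-level) class, or making the enclosing class Serializable, avoids the exception.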
One option is to use Kryo for network serialization as described here - you'll have to register all the classes that get serialized, though.

http://spark.incubator.apache.org/docs/latest/tuning.html

Another option is to write a wrapper class that implements Externalizable and write the serialization yourself.

- Patrick

On Sun, Nov 3, 2013 at 10:33 AM, Yadid Ayzenberg <ya...@media.mit.edu> wrote:
> Hi All,
>
> My original RDD contains arrays of doubles. When applying a count() operator
> to the original RDD I get the result as expected.
> However, when I run a map on the original RDD in order to generate a new RDD
> with only the first element of each array, and try to apply count() to the
> newly generated RDD, I get the following exception:
>
> 19829 [run-main] INFO org.apache.spark.scheduler.DAGScheduler - Failed to run count at AnalyticsEngine.java:133
> [error] (run-main) org.apache.spark.SparkException: Job failed: java.io.NotSerializableException: edu.mit.bsense.AnalyticsEngine
> org.apache.spark.SparkException: Job failed: java.io.NotSerializableException: edu.mit.bsense.AnalyticsEngine
>     at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:760)
>     at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:758)
>     at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:60)
>     at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
>     at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:758)
>     at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitMissingTasks(DAGScheduler.scala:556)
>     at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitStage(DAGScheduler.scala:503)
>     at org.apache.spark.scheduler.DAGScheduler.processEvent(DAGScheduler.scala:361)
>     at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$run(DAGScheduler.scala:441)
>     at org.apache.spark.scheduler.DAGScheduler$$anon$1.run(DAGScheduler.scala:149)
>
> If I run a take() operation on the new RDD I receive the results as
> expected. Here is my code:
>
> JavaRDD<Double> rdd2 = rdd.flatMap(new FlatMapFunction<Tuple2<Object, BSONObject>, Double>() {
>     @Override
>     public Iterable<Double> call(Tuple2<Object, BSONObject> e) {
>         BSONObject doc = e._2();
>         List<List<Double>> vals = (List<List<Double>>) doc.get("data");
>         List<Double> results = new ArrayList<Double>();
>         for (int i = 0; i < vals.size(); i++)
>             results.add((Double) vals.get(i).get(0));
>         return results;
>     }
> });
>
> logger.info("Take: {}", rdd2.take(100));
> logger.info("Count: {}", rdd2.count());
>
> Any ideas on what I am doing wrong?
>
> Thanks,
>
> Yadid
>
> --
> Yadid Ayzenberg
> Graduate Student and Research Assistant
> Affective Computing
> Phone: 617-866-7226
> Room: E14-274G
> MIT Media Lab
> 75 Amherst st, Cambridge, MA, 02139