The problem is that the data you shuffle references a class that does
not implement java.io.Serializable (per the exception,
edu.mit.bsense.AnalyticsEngine). Spark needs to send all shuffle data
over the network, so it needs to know how to serialize it.

One option is to use Kryo for network serialization, as described at
the link below, though you'll have to register all the classes that
get serialized.

http://spark.incubator.apache.org/docs/latest/tuning.html
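
As a rough sketch of the Kryo route: in Spark releases from around the
time of this thread, the serializer is selected through Java system
properties that must be set before the SparkContext is created (later
releases moved this into SparkConf). The two property names come from
the tuning guide; the helper class and the registrator class name
passed to it are hypothetical, and the registrator itself (an
implementation of Spark's KryoRegistrator that registers each class)
is not shown here.

```java
// Sketch only: switches Spark to Kryo via system properties.
// Call this before constructing the SparkContext.
class KryoSettingsSketch {

    // registratorClassName is the fully qualified name of your own
    // KryoRegistrator implementation (hypothetical, not shown here).
    static void enableKryo(String registratorClassName) {
        System.setProperty("spark.serializer",
                "org.apache.spark.serializer.KryoSerializer");
        System.setProperty("spark.kryo.registrator", registratorClassName);
    }

    public static void main(String[] args) {
        // "edu.mit.bsense.MyRegistrator" is a made-up name for illustration.
        enableKryo("edu.mit.bsense.MyRegistrator");
        System.out.println(System.getProperty("spark.serializer"));
    }
}
```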

Another option is to write a wrapper class that implements
java.io.Externalizable and write the serialization code yourself.
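
A minimal sketch of the wrapper approach, using only the JDK. Reading
stands in for whatever non-serializable class you need to ship; it and
every other name here are hypothetical and not taken from your code.
The wrapper writes the wrapped object's fields by hand and rebuilds
the object on the other side.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.Externalizable;
import java.io.IOException;
import java.io.ObjectInput;
import java.io.ObjectInputStream;
import java.io.ObjectOutput;
import java.io.ObjectOutputStream;

// Hypothetical class that does not implement Serializable.
class Reading {
    final String label;
    final double value;

    Reading(String label, double value) {
        this.label = label;
        this.value = value;
    }
}

// Wrapper that serializes the fields of Reading by hand.
class ReadingWrapper implements Externalizable {
    private Reading reading;

    // Externalizable requires a public no-argument constructor.
    public ReadingWrapper() { }

    ReadingWrapper(Reading reading) {
        this.reading = reading;
    }

    Reading get() {
        return reading;
    }

    @Override
    public void writeExternal(ObjectOutput out) throws IOException {
        out.writeUTF(reading.label);
        out.writeDouble(reading.value);
    }

    @Override
    public void readExternal(ObjectInput in) throws IOException {
        // Fields must be read in the same order they were written.
        String label = in.readUTF();
        double value = in.readDouble();
        reading = new Reading(label, value);
    }
}

class ExternalizableWrapperSketch {
    // Serializes the wrapper to bytes and reads it back, roughly what
    // happens when an object is sent over the network.
    static ReadingWrapper roundTrip(ReadingWrapper wrapper)
            throws IOException, ClassNotFoundException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(wrapper);
        }
        try (ObjectInputStream in = new ObjectInputStream(
                new ByteArrayInputStream(bytes.toByteArray()))) {
            return (ReadingWrapper) in.readObject();
        }
    }

    public static void main(String[] args) throws Exception {
        ReadingWrapper copy = roundTrip(new ReadingWrapper(new Reading("ecg", 0.5)));
        System.out.println(copy.get().label + " " + copy.get().value);
    }
}
```

You would then put ReadingWrapper instances, rather than the raw
objects, into the data that Spark has to serialize.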

- Patrick

On Sun, Nov 3, 2013 at 10:33 AM, Yadid Ayzenberg <ya...@media.mit.edu> wrote:
> Hi All,
>
> My original RDD contains arrays of doubles. When applying a count() operator
> to the original RDD, I get the result as expected.
> However, when I run a map on the original RDD in order to generate a new RDD
> with only the first element of each array, and try to apply count() to the
> newly generated RDD, I get the following exception:
>
> 19829 [run-main] INFO  org.apache.spark.scheduler.DAGScheduler  - Failed to run count at AnalyticsEngine.java:133
> [error] (run-main) org.apache.spark.SparkException: Job failed: java.io.NotSerializableException: edu.mit.bsense.AnalyticsEngine
> org.apache.spark.SparkException: Job failed: java.io.NotSerializableException: edu.mit.bsense.AnalyticsEngine
>     at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:760)
>     at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:758)
>     at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:60)
>     at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
>     at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:758)
>     at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitMissingTasks(DAGScheduler.scala:556)
>     at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitStage(DAGScheduler.scala:503)
>     at org.apache.spark.scheduler.DAGScheduler.processEvent(DAGScheduler.scala:361)
>     at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$run(DAGScheduler.scala:441)
>     at org.apache.spark.scheduler.DAGScheduler$$anon$1.run(DAGScheduler.scala:149)
>
>
> If I run a take() operation on the new RDD, I receive the results as
> expected. Here is my code:
>
>
> JavaRDD<Double> rdd2 =  rdd.flatMap( new FlatMapFunction<Tuple2<Object,
> BSONObject>, Double>() {
>         @Override
>         public Iterable<Double> call(Tuple2<Object, BSONObject> e) {
>           BSONObject doc = e._2();
>           List<List<Double>> vals = (List<List<Double>>)doc.get("data");
>           List<Double> results = new ArrayList<Double>();
>           for (int i=0; i< vals.size();i++ )
>               results.add((Double)vals.get(i).get(0));
>           return results;
>
>         }
>         });
>
>         logger.info("Take: {}", rdd2.take(100));
>         logger.info("Count: {}", rdd2.count());
>
>
> Any ideas on what I am doing wrong?
>
> Thanks,
>
> Yadid