Hi Patrick,

I am in fact using Kryo, and I'm registering BSONObject.class (the class holding the data) in my KryoRegistrator.
I'm not sure what other classes I should be registering.

Thanks,

Yadid


On 11/3/13 7:23 PM, Patrick Wendell wrote:
The problem is that you are referencing a class that does not implement
Serializable in the data that you shuffle. Spark needs to send all
shuffle data over the network, so it needs to know how to serialize
it.

One option is to use Kryo for network serialization as described here
- you'll have to register all the classes that get serialized though.

http://spark.incubator.apache.org/docs/latest/tuning.html

Another option is to write a wrapper class that implements
Externalizable and write the serialization yourself.
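
A minimal sketch of the wrapper approach, using only the JDK. The names Reading and ReadingWrapper are hypothetical stand-ins (Reading plays the role of a third-party class that is not Serializable); a real wrapper would write whatever fields the wrapped class actually has.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.Externalizable;
import java.io.IOException;
import java.io.ObjectInput;
import java.io.ObjectInputStream;
import java.io.ObjectOutput;
import java.io.ObjectOutputStream;
import java.io.UncheckedIOException;

// Hypothetical stand-in for a class we cannot change and that is not Serializable.
class Reading {
    final String sensor;
    final double value;

    Reading(String sensor, double value) {
        this.sensor = sensor;
        this.value = value;
    }
}

// Wrapper that writes and reads the wrapped object's fields by hand.
class ReadingWrapper implements Externalizable {
    private Reading reading;

    // Externalizable requires a public no-arg constructor for deserialization.
    public ReadingWrapper() { }

    ReadingWrapper(Reading reading) {
        this.reading = reading;
    }

    Reading get() {
        return reading;
    }

    @Override
    public void writeExternal(ObjectOutput out) throws IOException {
        out.writeUTF(reading.sensor);
        out.writeDouble(reading.value);
    }

    @Override
    public void readExternal(ObjectInput in) throws IOException {
        // Fields must be read in the same order they were written.
        reading = new Reading(in.readUTF(), in.readDouble());
    }

    // Serializes the wrapper to bytes and back, as a network hop would.
    static ReadingWrapper roundTrip(ReadingWrapper w) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(w);
            }
            try (ObjectInputStream in = new ObjectInputStream(
                    new ByteArrayInputStream(bytes.toByteArray()))) {
                return (ReadingWrapper) in.readObject();
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }
}
```

The RDD would then hold ReadingWrapper values instead of the raw objects, and code that needs the data would call get() on each one.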

- Patrick

On Sun, Nov 3, 2013 at 10:33 AM, Yadid Ayzenberg <ya...@media.mit.edu> wrote:
Hi All,

My original RDD contains arrays of doubles. When applying a count() operator
to the original RDD, I get the result as expected.
However, when I run a map on the original RDD in order to generate a new RDD
with only the first element of each array, and try to apply count() to the
newly generated RDD, I get the following exception:

19829 [run-main] INFO  org.apache.spark.scheduler.DAGScheduler  - Failed to run count at AnalyticsEngine.java:133
[error] (run-main) org.apache.spark.SparkException: Job failed: java.io.NotSerializableException: edu.mit.bsense.AnalyticsEngine
org.apache.spark.SparkException: Job failed: java.io.NotSerializableException: edu.mit.bsense.AnalyticsEngine
     at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:760)
     at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:758)
     at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:60)
     at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
     at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:758)
     at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitMissingTasks(DAGScheduler.scala:556)
     at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitStage(DAGScheduler.scala:503)
     at org.apache.spark.scheduler.DAGScheduler.processEvent(DAGScheduler.scala:361)
     at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$run(DAGScheduler.scala:441)
     at org.apache.spark.scheduler.DAGScheduler$$anon$1.run(DAGScheduler.scala:149)


If I run a take() operation on the new RDD, I receive the results as
expected. Here is my code:


JavaRDD<Double> rdd2 = rdd.flatMap(
    new FlatMapFunction<Tuple2<Object, BSONObject>, Double>() {
        @Override
        public Iterable<Double> call(Tuple2<Object, BSONObject> e) {
            BSONObject doc = e._2();
            // "data" holds a list of double arrays; keep the first element of each.
            @SuppressWarnings("unchecked")
            List<List<Double>> vals = (List<List<Double>>) doc.get("data");
            List<Double> results = new ArrayList<Double>();
            for (int i = 0; i < vals.size(); i++) {
                results.add(vals.get(i).get(0));
            }
            return results;
        }
    });

logger.info("Take: {}", rdd2.take(100));
logger.info("Count: {}", rdd2.count());
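
One possible reading of the trace, not confirmed anywhere in this thread: the exception names AnalyticsEngine rather than BSONObject, and a Java anonymous inner class created inside an instance method can carry a hidden reference to its enclosing object. The sketch below reproduces that effect with plain JDK serialization. Engine, DoubleFn and Scale are hypothetical names, with DoubleFn standing in for a serializable function interface. The anonymous class here reads a field of the enclosing object so that the capture is unconditional; javac releases of that era stored the outer reference even when it was unused.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;

// Hypothetical stand-in for a serializable function interface.
interface DoubleFn extends Serializable {
    double call(double x);
}

// Hypothetical stand-in for the enclosing class; note it is NOT Serializable.
class Engine {
    private final double scale = 2.0;

    // Anonymous inner class: it reads this.scale, so it holds a hidden
    // reference to the enclosing Engine, which must then be serialized too.
    DoubleFn anonymousFn() {
        return new DoubleFn() {
            @Override
            public double call(double x) {
                return x * scale;
            }
        };
    }

    // Static nested class: no hidden outer reference; state is passed in explicitly.
    static class Scale implements DoubleFn {
        private final double scale;

        Scale(double scale) {
            this.scale = scale;
        }

        @Override
        public double call(double x) {
            return x * scale;
        }
    }

    DoubleFn staticFn() {
        return new Scale(scale);
    }

    // Returns true if Java serialization accepts the object, false if it
    // fails with NotSerializableException.
    static boolean serializes(Object o) {
        try (ObjectOutputStream out =
                new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(o);
            return true;
        } catch (NotSerializableException e) {
            return false;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

Under this assumption, moving the function into a static nested class (or a top-level class), or making the enclosing class Serializable, would be the things to try.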


Any ideas on what I am doing wrong ?

Thanks,

Yadid



--
Yadid Ayzenberg
Graduate Student and Research Assistant
Affective Computing
Phone: 617-866-7226
Room: E14-274G
MIT Media Lab
75 Amherst st, Cambridge, MA, 02139




