Look at the exception: the class it names is edu.mit.bsense.AnalyticsEngine. Basically, you'll need to register every class type that is recursively used by BSONObject.
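For illustration, a registrator along these lines would register BSONObject's concrete types explicitly. The classes listed below are only a guess at what such documents usually contain (the exact set depends on what your data holds), and the name MyKryoRegistrator is made up for the example:

import com.esotericsoftware.kryo.Kryo;
import org.apache.spark.serializer.KryoRegistrator;
import org.bson.BasicBSONObject;
import org.bson.types.BasicBSONList;
import java.util.ArrayList;

public class MyKryoRegistrator implements KryoRegistrator {
    @Override
    public void registerClasses(Kryo kryo) {
        // Registering the BSONObject interface alone is not enough; register the
        // concrete types that actually end up inside the shuffled records.
        kryo.register(BasicBSONObject.class);  // concrete BSONObject implementation
        kryo.register(BasicBSONList.class);    // BSON arrays
        kryo.register(ArrayList.class);        // nested value lists (e.g. the "data" field)
        kryo.register(Double.class);           // element type of the value lists
    }
}

Kryo also has to be switched on before the SparkContext is created, using the properties described in the tuning guide linked below (shown here for the 0.8-era property-based configuration; the registrator class name is hypothetical):

System.setProperty("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
System.setProperty("spark.kryo.registrator", "edu.mit.bsense.MyKryoRegistrator");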
On Sun, Nov 3, 2013 at 4:27 PM, Yadid Ayzenberg <ya...@media.mit.edu> wrote:
> Hi Patrick,
>
> I am in fact using Kryo, and I'm registering BSONObject.class (which is the class
> holding the data) in my KryoRegistrator.
> I'm not sure what other classes I should be registering.
>
> Thanks,
>
> Yadid
>
>
> On 11/3/13 7:23 PM, Patrick Wendell wrote:
>>
>> The problem is you are referencing a class that does not "extend
>> serializable" in the data that you shuffle. Spark needs to send all
>> shuffle data over the network, so it needs to know how to serialize
>> them.
>>
>> One option is to use Kryo for network serialization as described here
>> - you'll have to register all the classes that get serialized, though.
>>
>> http://spark.incubator.apache.org/docs/latest/tuning.html
>>
>> Another option is to write a wrapper class that "extends
>> externalizable" and write the serialization yourself.
>>
>> - Patrick
>>
>> On Sun, Nov 3, 2013 at 10:33 AM, Yadid Ayzenberg <ya...@media.mit.edu> wrote:
>>>
>>> Hi All,
>>>
>>> My original RDD contains arrays of doubles. When applying a count()
>>> operator to the original RDD, I get the result as expected.
>>> However, when I run a map on the original RDD in order to generate a new
>>> RDD with only the first element of each array, and try to apply count() to
>>> the newly generated RDD, I get the following exception:
>>>
>>> 19829 [run-main] INFO org.apache.spark.scheduler.DAGScheduler - Failed to
>>> run count at AnalyticsEngine.java:133
>>> [error] (run-main) org.apache.spark.SparkException: Job failed:
>>> java.io.NotSerializableException: edu.mit.bsense.AnalyticsEngine
>>> org.apache.spark.SparkException: Job failed:
>>> java.io.NotSerializableException: edu.mit.bsense.AnalyticsEngine
>>>     at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:760)
>>>     at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:758)
>>>     at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:60)
>>>     at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
>>>     at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:758)
>>>     at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitMissingTasks(DAGScheduler.scala:556)
>>>     at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitStage(DAGScheduler.scala:503)
>>>     at org.apache.spark.scheduler.DAGScheduler.processEvent(DAGScheduler.scala:361)
>>>     at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$run(DAGScheduler.scala:441)
>>>     at org.apache.spark.scheduler.DAGScheduler$$anon$1.run(DAGScheduler.scala:149)
>>>
>>> If I run a take() operation on the new RDD, I receive the results as
>>> expected. Here is my code:
>>>
>>> JavaRDD<Double> rdd2 = rdd.flatMap(new FlatMapFunction<Tuple2<Object, BSONObject>, Double>() {
>>>     @Override
>>>     public Iterable<Double> call(Tuple2<Object, BSONObject> e) {
>>>         BSONObject doc = e._2();
>>>         List<List<Double>> vals = (List<List<Double>>) doc.get("data");
>>>         List<Double> results = new ArrayList<Double>();
>>>         for (int i = 0; i < vals.size(); i++)
>>>             results.add((Double) vals.get(i).get(0));
>>>         return results;
>>>     }
>>> });
>>>
>>> logger.info("Take: {}", rdd2.take(100));
>>> logger.info("Count: {}", rdd2.count());
>>>
>>> Any ideas on what I am doing wrong?
>>>
>>> Thanks,
>>>
>>> Yadid
>>>
>>> --
>>> Yadid Ayzenberg
>>> Graduate Student and Research Assistant
>>> Affective Computing
>>> Phone: 617-866-7226
>>> Room: E14-274G
>>> MIT Media Lab
>>> 75 Amherst St, Cambridge, MA 02139
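The other option mentioned above, a wrapper class that implements Externalizable, could look roughly like the sketch below. The class name SerializableBSONObject is invented for illustration, and it assumes the BSON codec classes (BasicBSONEncoder / BasicBSONDecoder) from the mongo-java-driver are on the classpath; the wrapper simply round-trips the document through raw BSON bytes:

import java.io.Externalizable;
import java.io.IOException;
import java.io.ObjectInput;
import java.io.ObjectOutput;
import org.bson.BSONObject;
import org.bson.BasicBSONDecoder;
import org.bson.BasicBSONEncoder;

public class SerializableBSONObject implements Externalizable {
    private BSONObject doc;

    public SerializableBSONObject() { }                        // required no-arg constructor
    public SerializableBSONObject(BSONObject doc) { this.doc = doc; }

    public BSONObject get() { return doc; }

    @Override
    public void writeExternal(ObjectOutput out) throws IOException {
        byte[] bytes = new BasicBSONEncoder().encode(doc);     // document -> raw BSON bytes
        out.writeInt(bytes.length);
        out.write(bytes);
    }

    @Override
    public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
        byte[] bytes = new byte[in.readInt()];
        in.readFully(bytes);
        doc = new BasicBSONDecoder().readObject(bytes);        // raw BSON bytes -> document
    }
}

The RDD would then carry SerializableBSONObject values instead of BSONObject, for example by wrapping each document in a map step before any shuffle takes place.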