Look at the exception. Basically, you'll need to register every class
that BSONObject uses, recursively.
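One rough way to find those classes is to walk the object graph of a single sample record and collect every concrete class reachable from it. The sketch below assumes plain reflection is acceptable for a one-off check. `ReachableClasses.of` and the `Doc`/`Meta` stand-ins are hypothetical and are not the org.bson types. It follows collection elements, map entries, array elements, and the non-static fields of non-JDK classes, so it can only report classes that actually occur in the sample you pass it.

```java
import java.lang.reflect.Array;
import java.lang.reflect.Field;
import java.lang.reflect.Modifier;
import java.util.*;

// Hypothetical helper: collects the concrete runtime classes reachable from one
// sample object. These are candidates to register with Kryo; classes that only
// appear in other records will not be found.
class ReachableClasses {

    static boolean isJdk(Class<?> c) {
        return c.getName().startsWith("java.") || c.getName().startsWith("javax.");
    }

    static Set<Class<?>> of(Object root) {
        Set<Class<?>> found = new LinkedHashSet<>();
        Set<Object> visited = Collections.newSetFromMap(new IdentityHashMap<>());
        Deque<Object> todo = new ArrayDeque<>();
        todo.push(root); // root must be non-null
        while (!todo.isEmpty()) {
            Object o = todo.pop();
            if (!visited.add(o)) continue; // already expanded; also handles cycles
            Class<?> c = o.getClass();
            found.add(c);
            if (o instanceof Collection) {
                // Follow the elements, not the collection's internal fields.
                for (Object e : (Collection<?>) o) if (e != null) todo.push(e);
            } else if (o instanceof Map) {
                for (Map.Entry<?, ?> e : ((Map<?, ?>) o).entrySet()) {
                    if (e.getKey() != null) todo.push(e.getKey());
                    if (e.getValue() != null) todo.push(e.getValue());
                }
            } else if (c.isArray()) {
                if (!c.getComponentType().isPrimitive()) {
                    for (int i = 0; i < Array.getLength(o); i++) {
                        Object e = Array.get(o, i);
                        if (e != null) todo.push(e);
                    }
                }
            } else {
                // Follow non-static reference fields, walking up the superclasses
                // until a JDK class is reached.
                for (Class<?> k = c; k != null && !isJdk(k); k = k.getSuperclass()) {
                    for (Field f : k.getDeclaredFields()) {
                        if (Modifier.isStatic(f.getModifiers()) || f.getType().isPrimitive()) continue;
                        f.setAccessible(true);
                        try {
                            Object v = f.get(o);
                            if (v != null) todo.push(v);
                        } catch (IllegalAccessException ex) {
                            throw new IllegalStateException(ex);
                        }
                    }
                }
            }
        }
        return found;
    }

    // Hypothetical stand-ins for a BSON-like document.
    static class Meta {
        String source = "mongo";
    }

    static class Doc {
        List<List<Double>> data = new ArrayList<>();
        Meta meta = new Meta();

        Doc() {
            data.add(new ArrayList<>(Arrays.asList(1.0, 2.0)));
        }
    }
}
```

For the sample `Doc`, this reports `Doc`, `Meta`, `String`, `ArrayList`, and `Double`. Each reported class is a candidate for `kryo.register(...)` inside your `KryoRegistrator`'s `registerClasses(Kryo kryo)` method. Because the scan only sees one record, it is worth running it over a few representative documents.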

On Sun, Nov 3, 2013 at 4:27 PM, Yadid Ayzenberg <> wrote:
> Hi Patrick,
> I am in fact using Kryo, and I'm registering BSONObject.class (which is the
> class holding the data) in my KryoRegistrator.
> I'm not sure what other classes I should be registering.
> Thanks,
> Yadid
> On 11/3/13 7:23 PM, Patrick Wendell wrote:
>> The problem is that you are referencing a class that does not implement
>> java.io.Serializable in the data that you shuffle. Spark needs to send all
>> shuffle data over the network, so it needs to know how to serialize
>> it.
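The failure mode can be reproduced outside Spark with plain java.io serialization. This is a minimal sketch and does not follow Spark's own code path. `Opaque` and `Record` are hypothetical stand-ins: a serializable record that holds a field whose class does not implement Serializable. By default, `ObjectOutputStream` puts the offending class name in the `NotSerializableException` message.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;

// Minimal sketch: Java serialization walks every field of a record and aborts
// when it reaches an object whose class does not implement Serializable.
class SerializationCheck {

    // Hypothetical stand-in for a third-party type such as BSONObject.
    static class Opaque {
        double value = 1.0;
    }

    // Serializable itself, but holds a non-Serializable field.
    static class Record implements Serializable {
        private static final long serialVersionUID = 1L;
        Opaque payload = new Opaque();
    }

    // Returns null if o serializes, otherwise the name of the class that was rejected.
    static String firstUnserializable(Object o) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(o);
            return null;
        } catch (NotSerializableException e) {
            return e.getMessage(); // by default, the offending class name
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

Calling `firstUnserializable(new Record())` returns the name of `Opaque`, not `Record`. The error points at the nested field that Java could not handle, which is why the record's outer class being serializable is not enough.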
>> One option is to use Kryo for network serialization as described here
>> - you'll have to register all the classes that get serialized, though.
>> Another option is to write a wrapper class that implements
>> Externalizable and write the serialization yourself.
>> - Patrick
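The wrapper option can be sketched as follows, assuming you only need to ship a list of doubles from each document. `Doc` is a hypothetical stand-in for the non-Serializable third-party type, and `DocWrapper` is a hypothetical wrapper. Externalizable requires a public no-argument constructor, and `readExternal` must read fields back in exactly the order `writeExternal` wrote them.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.Externalizable;
import java.io.IOException;
import java.io.ObjectInput;
import java.io.ObjectInputStream;
import java.io.ObjectOutput;
import java.io.ObjectOutputStream;
import java.util.ArrayList;
import java.util.List;

// Minimal sketch of the wrapper approach: the third-party type is left untouched,
// and the wrapper writes and reads the fields it needs by hand.
class ExternalizableWrapper {

    // Hypothetical stand-in for a non-Serializable type such as BSONObject.
    static class Doc {
        final List<Double> data = new ArrayList<>();
    }

    public static class DocWrapper implements Externalizable {
        private static final long serialVersionUID = 1L;
        Doc doc;

        // Externalizable requires a public no-arg constructor for deserialization.
        public DocWrapper() { this.doc = new Doc(); }

        DocWrapper(Doc doc) { this.doc = doc; }

        @Override
        public void writeExternal(ObjectOutput out) throws IOException {
            out.writeInt(doc.data.size()); // length first, then the values
            for (double d : doc.data) out.writeDouble(d);
        }

        @Override
        public void readExternal(ObjectInput in) throws IOException {
            int n = in.readInt(); // read back in the same order as written
            doc = new Doc();
            for (int i = 0; i < n; i++) doc.data.add(in.readDouble());
        }
    }

    // Serializes and deserializes the wrapper, standing in for a network hop.
    static DocWrapper roundTrip(DocWrapper w) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(w);
            }
            try (ObjectInputStream in =
                     new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (DocWrapper) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }
}
```

In a Spark job, the idea would be to map each BSONObject into such a wrapper, or straight into plain serializable values, before the data has to be serialized. The trade-off is that you own the wire format and must keep `writeExternal` and `readExternal` in sync by hand.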
>> On Sun, Nov 3, 2013 at 10:33 AM, Yadid Ayzenberg <> wrote:
>>> Hi All,
>>> My original RDD contains arrays of doubles. When applying a count()
>>> operator to the original RDD, I get the result as expected.
>>> However, when I run a flatMap on the original RDD in order to generate a
>>> new RDD with only the first element of each array, and try to apply
>>> count() to the newly generated RDD, I get the following exception:
>>> 19829 [run-main] INFO  org.apache.spark.scheduler.DAGScheduler  - Failed to run count at
>>> [error] (run-main) org.apache.spark.SparkException: Job failed:
>>> org.apache.spark.SparkException: Job failed:
>>>      at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:760)
>>>      at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:758)
>>>      at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:60)
>>>      at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
>>>      at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:758)
>>>      at
>>>      at
>>>      at org.apache.spark.scheduler.DAGScheduler.processEvent(DAGScheduler.scala:361)
>>>      at
>>>      at org.apache.spark.scheduler.DAGScheduler$$anon$
>>> If I run a take() operation on the new RDD, I get the results as
>>> expected. Here is my code:
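>>> import java.util.ArrayList;
>>> import java.util.List;
>>> import org.apache.spark.api.java.JavaRDD;
>>> import org.apache.spark.api.java.function.FlatMapFunction;
>>> import org.bson.BSONObject;
>>> import scala.Tuple2;
>>>
>>> // Emit the first element of each inner list stored under the "data" key.
>>> JavaRDD<Double> rdd2 = rdd.flatMap(
>>>     new FlatMapFunction<Tuple2<Object, BSONObject>, Double>() {
>>>       @Override
>>>       public Iterable<Double> call(Tuple2<Object, BSONObject> e) {
>>>         BSONObject doc = e._2();
>>>         @SuppressWarnings("unchecked")
>>>         List<List<Double>> vals = (List<List<Double>>) doc.get("data");
>>>         List<Double> results = new ArrayList<Double>();
>>>         for (List<Double> row : vals)
>>>           results.add(row.get(0));
>>>         return results;
>>>       }
>>>     });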
>>>"Take: {}", rdd2.take(100));
>>>"Count: {}", rdd2.count());
>>> Any ideas on what I am doing wrong?
>>> Thanks,
>>> Yadid
>>> --
>>> Yadid Ayzenberg
>>> Graduate Student and Research Assistant
>>> Affective Computing
>>> Phone: 617-866-7226
>>> Room: E14-274G
>>> MIT Media Lab
>>> 75 Amherst st, Cambridge, MA, 02139
