edu.mit.bsense.AnalyticsEngine

Look at the exception. Basically, you'll need to register every class
type that is recursively used by BSONObject.
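
To see what "recursively used" means in practice, here is a minimal
sketch. It uses plain Java serialization rather than Kryo (an assumption,
so that the example needs neither Spark nor Kryo on the classpath), and
Record and Payload are hypothetical stand-ins, not classes from this
thread. The exception message names the nested class that could not be
serialized, which is the same kind of hint the stack trace quoted below
gives.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class NestedSerializationSketch {
    // Hypothetical nested type that does NOT implement Serializable.
    static class Payload {
        double value = 1.0;
    }

    // Hypothetical outer type: Serializable itself, but it holds a Payload.
    static class Record implements Serializable {
        Payload payload = new Payload();
    }

    // Tries to serialize o and returns the name of the class that Java
    // serialization rejected, or null if the whole object graph was written.
    static String firstUnserializableClass(Object o) throws IOException {
        ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream());
        try {
            out.writeObject(o);
            return null;
        } catch (NotSerializableException e) {
            // The message of NotSerializableException is the offending class name.
            return e.getMessage();
        }
    }

    public static void main(String[] args) throws IOException {
        // Record is Serializable, yet the write fails on the nested Payload.
        System.out.println(firstUnserializableClass(new Record()));
    }
}
```

Every type reachable from the object being written has to be handled by
the serializer, not only the outermost one.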

On Sun, Nov 3, 2013 at 4:27 PM, Yadid Ayzenberg <ya...@media.mit.edu> wrote:
> Hi Patrick,
>
> I am in fact using Kryo and I'm registering BSONObject.class (the class
> holding the data) in my KryoRegistrator.
> I'm not sure what other classes I should be registering.
>
> Thanks,
>
> Yadid
>
>
>
> On 11/3/13 7:23 PM, Patrick Wendell wrote:
>>
>> The problem is that the data you shuffle references a class that does
>> not implement java.io.Serializable. Spark needs to send all shuffle
>> data over the network, so it needs to know how to serialize it.
>>
>> One option is to use Kryo for network serialization as described here
>> - you'll have to register all the classes that get serialized, though.
>>
>> http://spark.incubator.apache.org/docs/latest/tuning.html
>>
>> Another option is to write a wrapper class that implements
>> java.io.Externalizable and write the serialization yourself.
>>
>> - Patrick
>>
>> On Sun, Nov 3, 2013 at 10:33 AM, Yadid Ayzenberg <ya...@media.mit.edu> wrote:
>>>
>>> Hi All,
>>>
>>> My original RDD contains arrays of doubles. When applying a count()
>>> operator to the original RDD, I get the result as expected.
>>> However, when I run a map on the original RDD in order to generate a new
>>> RDD with only the first element of each array, and try to apply count()
>>> to the newly generated RDD, I get the following exception:
>>>
>>> 19829 [run-main] INFO  org.apache.spark.scheduler.DAGScheduler  - Failed to run count at AnalyticsEngine.java:133
>>> [error] (run-main) org.apache.spark.SparkException: Job failed: java.io.NotSerializableException: edu.mit.bsense.AnalyticsEngine
>>> org.apache.spark.SparkException: Job failed: java.io.NotSerializableException: edu.mit.bsense.AnalyticsEngine
>>>      at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:760)
>>>      at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:758)
>>>      at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:60)
>>>      at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
>>>      at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:758)
>>>      at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitMissingTasks(DAGScheduler.scala:556)
>>>      at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitStage(DAGScheduler.scala:503)
>>>      at org.apache.spark.scheduler.DAGScheduler.processEvent(DAGScheduler.scala:361)
>>>      at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$run(DAGScheduler.scala:441)
>>>      at org.apache.spark.scheduler.DAGScheduler$$anon$1.run(DAGScheduler.scala:149)
>>>
>>>
>>> If I run a take() operation on the new RDD, I receive the results as
>>> expected. Here is my code:
>>>
>>>
>>> JavaRDD<Double> rdd2 = rdd.flatMap(
>>>     new FlatMapFunction<Tuple2<Object, BSONObject>, Double>() {
>>>         @Override
>>>         public Iterable<Double> call(Tuple2<Object, BSONObject> e) {
>>>             BSONObject doc = e._2();
>>>             // "data" holds a list of arrays; keep the first element of each.
>>>             List<List<Double>> vals = (List<List<Double>>) doc.get("data");
>>>             List<Double> results = new ArrayList<Double>();
>>>             for (int i = 0; i < vals.size(); i++) {
>>>                 results.add(vals.get(i).get(0));
>>>             }
>>>             return results;
>>>         }
>>>     });
>>>
>>> logger.info("Take: {}", rdd2.take(100));
>>> logger.info("Count: {}", rdd2.count());
>>>
>>>
>>> Any ideas on what I am doing wrong?
>>>
>>> Thanks,
>>>
>>> Yadid
>>>
>>>
>>>
>>> --
>>> Yadid Ayzenberg
>>> Graduate Student and Research Assistant
>>> Affective Computing
>>> Phone: 617-866-7226
>>> Room: E14-274G
>>> MIT Media Lab
>>> 75 Amherst st, Cambridge, MA, 02139
>>>
>>>
>>>

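The wrapper option from Patrick's quoted reply above could look roughly
like the following sketch. PlainDoc is a hypothetical stand-in for a
third-party class that is not Serializable (it is not BSONObject), and
its field layout is an assumption made for illustration only.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.Externalizable;
import java.io.IOException;
import java.io.ObjectInput;
import java.io.ObjectInputStream;
import java.io.ObjectOutput;
import java.io.ObjectOutputStream;
import java.util.LinkedHashMap;
import java.util.Map;

public class ExternalizableWrapperSketch {
    // Hypothetical third-party type that does not implement Serializable.
    static class PlainDoc {
        final Map<String, Double> fields = new LinkedHashMap<String, Double>();
    }

    // Wrapper that writes and reads the wrapped object's contents by hand.
    public static class DocWrapper implements Externalizable {
        private PlainDoc doc;

        public DocWrapper() { }                     // required by Externalizable
        public DocWrapper(PlainDoc doc) { this.doc = doc; }
        public PlainDoc get() { return doc; }

        @Override
        public void writeExternal(ObjectOutput out) throws IOException {
            out.writeInt(doc.fields.size());        // entry count first
            for (Map.Entry<String, Double> e : doc.fields.entrySet()) {
                out.writeUTF(e.getKey());
                out.writeDouble(e.getValue());
            }
        }

        @Override
        public void readExternal(ObjectInput in) throws IOException {
            doc = new PlainDoc();
            int n = in.readInt();
            for (int i = 0; i < n; i++) {
                String key = in.readUTF();
                doc.fields.put(key, in.readDouble());
            }
        }
    }

    // Serializes the wrapper to bytes and reads it back, as a quick check.
    static DocWrapper roundTrip(DocWrapper w) throws IOException, ClassNotFoundException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        ObjectOutputStream out = new ObjectOutputStream(bytes);
        out.writeObject(w);
        out.close();
        ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()));
        return (DocWrapper) in.readObject();
    }

    public static void main(String[] args) throws Exception {
        PlainDoc d = new PlainDoc();
        d.fields.put("x", 1.5);
        System.out.println(roundTrip(new DocWrapper(d)).get().fields);
    }
}
```

The trade-off is that the wrapper has to know the wrapped type's contents
well enough to write and rebuild them, whereas Kryo registration leaves
that to the library.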