code is below. in the code rdd.count() works, but rdd2.count() fails.

public class AnalyticsEngine  implements Serializable {

    private static AnalyticsEngine engine=null;
    private JavaSparkContext sc;

    final Logger logger = LoggerFactory.getLogger(AnalyticsEngine.class);
    private Properties prop;

    String db_host;

    private AnalyticsEngine()
    {
System.setProperty("spark.serializer", "org.apache.spark.serializer.KryoSerializer"); System.setProperty("spark.kryo.registrator", "edu.mit.bsense.MyRegistrator");
        sc = new JavaSparkContext("local[4]","TestSpark");
        Properties prop = new Properties();
        try {
            prop.load(new FileInputStream("config.properties"));


            db_host = prop.getProperty("database_host1");
            logger.info("Database host: {}", db_host);
        }  catch (FileNotFoundException ex)
                {
logger.info("Could not read config.properties: " + ex.toString());

                } catch (IOException ex)
                {
logger.info("Could not read config.properties: " + ex.toString());

                }



        public void getData(void)
        {
        Configuration conf = new Configuration();

String conf_url = "mongodb://" + db_host + "/test.data1"; //this is the data partition
        conf.set("mongo.input.uri", conf_url);


conf.set("mongo.input.query", "{\"streamId\":\""+"13"+"\"},{\"data\":1}");
        conf.set("mongo.input.split_size","64");

JavaPairRDD<Object,BSONObject> rdd = sc.newAPIHadoopRDD(conf, MongoInputFormat.class, Object.class, BSONObject.class);

        rdd.cache();

        logger.info("Count of rdd: {}", rdd.count());

logger.info("==========================================================================");



JavaDoubleRDD rdd2 = rdd.flatMap( new DoubleFlatMapFunction<Tuple2<Object, BSONObject>>() {
        @Override
        public Iterable<Double> call(Tuple2<Object, BSONObject> e) {
          BSONObject doc = e._2();
          BasicDBList vals = (BasicDBList)doc.get("data");
          List<Double> results = new ArrayList<Double>();
          for (int i=0; i< vals.size();i++ )
results.add((Double)((BasicDBList)vals.get(i)).get(0));
          return results;

        }
        });

        logger.info("Take: {}", rdd2.take(100));
        logger.info("Count: {}", rdd2.count());


    }

    }


On 11/3/13 8:19 PM, Patrick Wendell wrote:
Thanks that would help. This would be consistent with there being a
reference to the SparkContext itself inside of the closure. Just want
to make sure that's not the case.

On Sun, Nov 3, 2013 at 5:13 PM, Yadid Ayzenberg <ya...@media.mit.edu> wrote:
Im running in local[4] mode - so there are no slave machines. Full stack
trace:


(run-main) org.apache.spark.SparkException: Job failed:
java.io.NotSerializableException: edu.mit.bsense.AnalyticsEngine
org.apache.spark.SparkException: Job failed:
java.io.NotSerializableException: edu.mit.bsense.AnalyticsEngine
     at
org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:760)
     at
org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:758)
     at
scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:60)
     at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
     at
org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:758)
     at
org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitMissingTasks(DAGScheduler.scala:556)
     at
org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitStage(DAGScheduler.scala:503)
     at
org.apache.spark.scheduler.DAGScheduler.processEvent(DAGScheduler.scala:361)
     at
org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$run(DAGScheduler.scala:441)
     at
org.apache.spark.scheduler.DAGScheduler$$anon$1.run(DAGScheduler.scala:149)
[debug]     Thread run-main exited.
[debug] Interrupting remaining threads (should be all daemons).
[debug] Sandboxed run complete..
java.lang.RuntimeException: Nonzero exit code: 1
     at scala.sys.package$.error(package.scala:27)
     at sbt.BuildCommon$$anonfun$toError$1.apply(Defaults.scala:1628)
     at sbt.BuildCommon$$anonfun$toError$1.apply(Defaults.scala:1628)
     at scala.Option.foreach(Option.scala:236)
     at sbt.BuildCommon$class.toError(Defaults.scala:1628)
     at sbt.Defaults$.toError(Defaults.scala:34)
     at
sbt.Defaults$$anonfun$runTask$1$$anonfun$apply$36$$anonfun$apply$37.apply(Defaults.scala:647)
     at
sbt.Defaults$$anonfun$runTask$1$$anonfun$apply$36$$anonfun$apply$37.apply(Defaults.scala:645)
     at scala.Function1$$anonfun$compose$1.apply(Function1.scala:47)
     at sbt.$tilde$greater$$anonfun$$u2219$1.apply(TypeFunctions.scala:42)
     at sbt.std.Transform$$anon$4.work(System.scala:64)
     at
sbt.Execute$$anonfun$submit$1$$anonfun$apply$1.apply(Execute.scala:237)
     at
sbt.Execute$$anonfun$submit$1$$anonfun$apply$1.apply(Execute.scala:237)
     at sbt.ErrorHandling$.wideConvert(ErrorHandling.scala:18)
     at sbt.Execute.work(Execute.scala:244)
     at sbt.Execute$$anonfun$submit$1.apply(Execute.scala:237)
     at sbt.Execute$$anonfun$submit$1.apply(Execute.scala:237)
     at
sbt.ConcurrentRestrictions$$anon$4$$anonfun$1.apply(ConcurrentRestrictions.scala:160)
     at sbt.CompletionService$$anon$2.call(CompletionService.scala:30)
     at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:303)
     at java.util.concurrent.FutureTask.run(FutureTask.java:138)
     at
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:439)
     at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:303)
     at java.util.concurrent.FutureTask.run(FutureTask.java:138)
     at
java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:895)
     at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:918)
     at java.lang.Thread.run(Thread.java:695)

when I add implements Serializable to my class, I get the following stack
trace:

error] (run-main) org.apache.spark.SparkException: Job failed:
java.io.NotSerializableException: org.apache.spark.api.java.JavaSparkContext
org.apache.spark.SparkException: Job failed:
java.io.NotSerializableException: org.apache.spark.api.java.JavaSparkContext

     at
org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:760)
     at
org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:758)
     at
scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:60)
     at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
     at
org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:758)
     at
org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitMissingTasks(DAGScheduler.scala:556)
     at
org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitStage(DAGScheduler.scala:503)
     at
org.apache.spark.scheduler.DAGScheduler.processEvent(DAGScheduler.scala:361)
     at
org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$run(DAGScheduler.scala:441)
     at
org.apache.spark.scheduler.DAGScheduler$$anon$1.run(DAGScheduler.scala:149)
[debug]     Thread run-main exited.
[debug] Interrupting remaining threads (should be all daemons).
[debug] Sandboxed run complete..
java.lang.RuntimeException: Nonzero exit code: 1
     at scala.sys.package$.error(package.scala:27)
     at sbt.BuildCommon$$anonfun$toError$1.apply(Defaults.scala:1628)
     at sbt.BuildCommon$$anonfun$toError$1.apply(Defaults.scala:1628)
     at scala.Option.foreach(Option.scala:236)
     at sbt.BuildCommon$class.toError(Defaults.scala:1628)
     at sbt.Defaults$.toError(Defaults.scala:34)
     at
sbt.Defaults$$anonfun$runTask$1$$anonfun$apply$36$$anonfun$apply$37.apply(Defaults.scala:647)
     at
sbt.Defaults$$anonfun$runTask$1$$anonfun$apply$36$$anonfun$apply$37.apply(Defaults.scala:645)
     at scala.Function1$$anonfun$compose$1.apply(Function1.scala:47)
     at sbt.$tilde$greater$$anonfun$$u2219$1.apply(TypeFunctions.scala:42)
     at sbt.std.Transform$$anon$4.work(System.scala:64)
     at
sbt.Execute$$anonfun$submit$1$$anonfun$apply$1.apply(Execute.scala:237)
     at
sbt.Execute$$anonfun$submit$1$$anonfun$apply$1.apply(Execute.scala:237)
     at sbt.ErrorHandling$.wideConvert(ErrorHandling.scala:18)
     at sbt.Execute.work(Execute.scala:244)
     at sbt.Execute$$anonfun$submit$1.apply(Execute.scala:237)
     at sbt.Execute$$anonfun$submit$1.apply(Execute.scala:237)
     at
sbt.ConcurrentRestrictions$$anon$4$$anonfun$1.apply(ConcurrentRestrictions.scala:160)
     at sbt.CompletionService$$anon$2.call(CompletionService.scala:30)
     at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:303)
     at java.util.concurrent.FutureTask.run(FutureTask.java:138)
     at
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:439)
     at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:303)
     at java.util.concurrent.FutureTask.run(FutureTask.java:138)
     at
java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:895)
     at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:918)
     at java.lang.Thread.run(Thread.java:695)

I can post my code if that helps



On 11/3/13 8:05 PM, Patrick Wendell wrote:
If you look in the UI, are there failures on any of the slaves that
you can give a  stack trace for? That would narrow down where the
serialization error is happening.

Unfortunately this code path doesn't print a full stack trace which
makes it harder to debug where the serialization error comes from.

Could you post all of your code?

Also, just wondering, what happens if you just go ahead and add
"extends Serializable" to AnalyticsEngine class? It's possible this is
happening during closure serialization, which will use the closure
serializer (which is by default Java).

- Patrick

On Sun, Nov 3, 2013 at 4:49 PM, Yadid Ayzenberg <ya...@media.mit.edu>
wrote:
yes, I tried that as well (it is currently registered with Kryo)-
although
it doesnt make sense to me (and doesnt solve the problem). I also made
sure
my registration was running:
DEBUG org.apache.spark.serializer.KryoSerializer  - Running user
registrator: edu.mit.bsense.MyRegistrator
7841 [spark-akka.actor.default-dispatcher-3] DEBUG
org.apache.spark.serializer.KryoSerializer  - Running user registrator:
edu.mit.bsense.MyRegistrator

edu.mit.bsense.AnalyticsEngine is the class containing the SC which
instantiates the RDDs and runs the map() and count().
Can you explain why it needs to be serialized?

Also, when running count() on my original RDD (pre map) I get the right
answer - this means the classes of data in the RDD are serializable.
It's only when I run map, and then count() on a new RDD do I get this
exception. My map does not introduce any new classes it - just iterates
over
the existing data.

Any ideas?









On 11/3/13 7:43 PM, Patrick Wendell wrote:
edu.mit.bsense.AnalyticsEngine

Look at the exception. Basically, you'll need to register every class
type that is recursively used by BSONObject.

On Sun, Nov 3, 2013 at 4:27 PM, Yadid Ayzenberg <ya...@media.mit.edu>
wrote:
Hi Patrick,

I am in fact using Kryo and im registering  BSONObject.class (which is
class
holding the data) in my KryoRegistrator.
Im not sure what other classes I should be registering.

Thanks,

Yadid



On 11/3/13 7:23 PM, Patrick Wendell wrote:
The problem is you are referencing a class that does not "extend
serializable" in the data that you shuffle. Spark needs to send all
shuffle data over the network, so it needs to know how to serialize
them.

One option is to use Kryo for network serialization as described here
- you'll have to register all the class that get serialized though.

http://spark.incubator.apache.org/docs/latest/tuning.html

Another option is to write a wrapper class that "extends
externalizable" and write the serialization yourself.

- Patrick

On Sun, Nov 3, 2013 at 10:33 AM, Yadid Ayzenberg <ya...@media.mit.edu>
wrote:
Hi All,

My original RDD contains arrays of doubles. when appying a count()
operator
to the original RDD I get the result as expected.
However when I run a map on the original RDD in order to generate a
new
RDD
with only the first element of each array, and try to apply count()
to
the
new generated RDD I get the following exception:

19829 [run-main] INFO  org.apache.spark.scheduler.DAGScheduler  -
Failed
to
run count at AnalyticsEngine.java:133
[error] (run-main) org.apache.spark.SparkException: Job failed:
java.io.NotSerializableException: edu.mit.bsense.AnalyticsEngine
org.apache.spark.SparkException: Job failed:
java.io.NotSerializableException: edu.mit.bsense.AnalyticsEngine
        at



org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:760)
        at



org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:758)
        at



scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:60)
        at
scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
        at



org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:758)
        at



org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitMissingTasks(DAGScheduler.scala:556)
        at



org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitStage(DAGScheduler.scala:503)
        at



org.apache.spark.scheduler.DAGScheduler.processEvent(DAGScheduler.scala:361)
        at



org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$run(DAGScheduler.scala:441)
        at



org.apache.spark.scheduler.DAGScheduler$$anon$1.run(DAGScheduler.scala:149)


If a run a take() operation on the new RDD I receive the results as
expected. here is my code:


JavaRDD<Double> rdd2 =  rdd.flatMap( new
FlatMapFunction<Tuple2<Object,
BSONObject>, Double>() {
            @Override
            public Iterable<Double> call(Tuple2<Object, BSONObject> e)
{
              BSONObject doc = e._2();
              List<List<Double>> vals =
(List<List<Double>>)doc.get("data");
              List<Double> results = new ArrayList<Double>();
              for (int i=0; i< vals.size();i++ )
                  results.add((Double)vals.get(i).get(0));
              return results;

            }
            });

            logger.info("Take: {}", rdd2.take(100));
            logger.info("Count: {}", rdd2.count());


Any ideas on what I am doing wrong ?

Thanks,

Yadid



--
Yadid Ayzenberg
Graduate Student and Research Assistant
Affective Computing
Phone: 617-866-7226
Room: E14-274G
MIT Media Lab
75 Amherst st, Cambridge, MA, 02139



--
Yadid Ayzenberg
Graduate Student and Research Assistant
Affective Computing
Phone: 617-866-7226
Room: E14-274G
MIT Media Lab
75 Amherst st, Cambridge, MA, 02139



--
Yadid Ayzenberg
Graduate Student and Research Assistant
Affective Computing
Phone: 617-866-7226
Room: E14-274G
MIT Media Lab
75 Amherst st, Cambridge, MA, 02139




--
Yadid Ayzenberg
Graduate Student and Research Assistant
Affective Computing
Phone: 617-866-7226
Room: E14-274G
MIT Media Lab
75 Amherst st, Cambridge, MA, 02139





--
Yadid Ayzenberg
Graduate Student and Research Assistant
Affective Computing
Phone: 617-866-7226
Room: E14-274G
MIT Media Lab
75 Amherst st, Cambridge, MA, 02139



Reply via email to