Hmm, the code of a Spark Streaming app doesn't execute in the linear fashion
your previous snippet assumes. To achieve your objectives, you should do
something like the code below.

As for your second objective, avoiding repeated initialization and
serialization of the params, you can:

a) broadcast them, or
b) keep them in a singleton on each executor (initialized from e.g. params
in a file on HDFS)

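    messageBodies.foreachRDD(new Function<JavaRDD<String>, Void>() {

      // Created once, when this function object is constructed.
      Param param = new Param();
      {
        // Instance initializer block: a bare statement is not legal
        // directly inside a class body.
        param.initialize();
      }

      @Override
      public Void call(JavaRDD<String> rdd) throws Exception {
        // func (and the param it holds) is serialized and shipped to the
        // executors by foreachPartition.
        ProcessPartitionFunction func = new ProcessPartitionFunction(param);
        rdd.foreachPartition(func);
        return null;
      }

    });

    // Put this in e.g. the cleanup method of the object that owns param
    // (param is not in scope at this point):
    // param.deinitialize();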
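
Option (b) could be sketched roughly as below. This is only an illustration
in plain Java, with no Spark dependency: the Param class here is a hypothetical
stand-in for your real one, and ParamHolder and processPartition are names I
made up. The idea is that the function you pass to foreachPartition carries no
Param at all, so nothing heavy gets serialized; it looks the instance up on
the executor, where it is created lazily once per JVM.

```java
import java.util.Arrays;
import java.util.Iterator;
import java.util.concurrent.atomic.AtomicInteger;

public class ParamHolderSketch {

    /** Hypothetical stand-in for the expensive Param object from the thread. */
    static class Param {
        static final AtomicInteger INIT_COUNT = new AtomicInteger();

        void initialize() { INIT_COUNT.incrementAndGet(); } // expensive setup goes here
        void deinitialize() { }                              // expensive teardown goes here
        String process(String msg) { return msg.toUpperCase(); }
    }

    /** Lazily creates a single Param per JVM, i.e. per executor process. */
    static final class ParamHolder {
        private static Param instance;

        private ParamHolder() { }

        static synchronized Param get() {
            if (instance == null) {
                Param p = new Param();
                p.initialize();
                // Assumption: tearing down at JVM exit is acceptable for this object.
                Runtime.getRuntime().addShutdownHook(new Thread(p::deinitialize));
                instance = p;
            }
            return instance;
        }
    }

    /**
     * What the body of a foreachPartition function would do: fetch the
     * per-JVM instance instead of receiving a serialized copy.
     * Returns the number of messages processed.
     */
    static int processPartition(Iterator<String> messages) {
        Param param = ParamHolder.get();
        int n = 0;
        while (messages.hasNext()) {
            param.process(messages.next());
            n++;
        }
        return n;
    }

    public static void main(String[] args) {
        // Two "partitions" handled in the same JVM share one initialized Param.
        processPartition(Arrays.asList("a", "b").iterator());
        processPartition(Arrays.asList("c").iterator());
        System.out.println("initialize() calls: " + Param.INIT_COUNT.get());
    }
}
```

In a real job the body of processPartition would live in the call method of
your ProcessPartitionFunction, and initialize() would read its settings from
wherever you keep them (e.g. the file on HDFS mentioned above).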

-----Original Message-----
From: dgoldenberg [mailto:dgoldenberg...@gmail.com] 
Sent: Wednesday, June 3, 2015 1:56 PM
To: user@spark.apache.org
Subject: Objects serialized before foreachRDD/foreachPartition ?

I'm looking at https://spark.apache.org/docs/latest/tuning.html.  Basically
the takeaway is that all objects passed into the code processing RDD's must
be serializable. So if I've got a few objects that I'd rather initialize
once and deinitialize once outside of the logic processing the RDD's, I'd
need to think twice about the costs of serializing such objects, it would
seem.

In the below, does the Spark serialization happen before calling foreachRDD
or before calling foreachPartition?

    Param param = new Param();
    param.initialize();
    messageBodies.foreachRDD(new Function<JavaRDD<String>, Void>() {
      @Override
      public Void call(JavaRDD<String> rdd) throws Exception {
        ProcessPartitionFunction func = new ProcessPartitionFunction(param);
        rdd.foreachPartition(func);
        return null;
      }
    });
    param.deinitialize();

If param gets initialized to a significant memory footprint, are we better
off creating/initializing it before calling new ProcessPartitionFunction()
or perhaps in the 'call' method within that function?

I'm trying to avoid calling expensive init()/deinit() methods while
balancing against the serialization costs. Thanks.



--
View this message in context:
http://apache-spark-user-list.1001560.n3.nabble.com/Objects-serialized-before-foreachRDD-foreachPartition-tp23134.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

---------------------------------------------------------------------
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional
commands, e-mail: user-h...@spark.apache.org


