Re: serialization issue with mapPartitions

2014-12-26 Thread Akhil
You cannot pass your jobConf object inside any of the transformation functions
in Spark (like map, mapPartitions, etc.), since
org.apache.hadoop.mapreduce.Job is not Serializable. You can use
KryoSerializer (see this doc:
http://spark.apache.org/docs/latest/tuning.html#data-serialization). We
usually convert the JobConf into a byte array, pass the byte array into the
map, and from there create a new jobConf from the data inside the byte
array, as sketched below.
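
A minimal sketch of that byte-array approach (`job` and `rdd` are stand-ins
for the Job and input RDD on the driver): Configuration is Writable, so it
can be written to bytes on the driver and rebuilt inside the task:

    import java.io.{ByteArrayInputStream, ByteArrayOutputStream,
      DataInputStream, DataOutputStream}
    import org.apache.hadoop.conf.Configuration

    // On the driver: write the Configuration (a Writable) to a byte array.
    val confBytes: Array[Byte] = {
      val bos = new ByteArrayOutputStream()
      val out = new DataOutputStream(bos)
      job.getConfiguration().write(out)
      out.close()
      bos.toByteArray
    }

    val result = rdd.mapPartitions { iter =>
      // Inside the task: rebuild a fresh Configuration from the bytes,
      // which are plain serializable data and so survive closure shipping.
      val conf = new Configuration(false)
      conf.readFields(new DataInputStream(new ByteArrayInputStream(confBytes)))
      // ... use `conf` here ...
      iter
    }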


 I should rephrase my question as follows:
 
 How to use the corresponding Hadoop Configuration of a HadoopRDD in
 defining a function as an input parameter to the MapPartitions function?
 
 Thanks.
 
 Ey-Chih Chow








Re: serialization issue with mapPartitions

2014-12-25 Thread Tobias Pfeiffer
Hi,

On Fri, Dec 26, 2014 at 1:32 AM, ey-chih chow eyc...@hotmail.com wrote:

 I got some issues with mapPartitions with the following piece of code:

 val sessions = sc
   .newAPIHadoopFile(
     "... path to an avro file ...",
     classOf[org.apache.avro.mapreduce.AvroKeyInputFormat[ByteBuffer]],
     classOf[AvroKey[ByteBuffer]],
     classOf[NullWritable],
     job.getConfiguration())
   .mapPartitions { valueIterator =>
     val config = job.getConfiguration()
     // ...
   }
   .collect()

 Why does job.getConfiguration() inside the mapPartitions function generate
 the following message?

 Cause: java.io.NotSerializableException: org.apache.hadoop.mapreduce.Job


The functions inside mapPartitions() are executed on the Spark
executors, not on the Spark driver. Therefore, the function body needs to be
serialized and sent to the executors over the network. If that is not possible
(in your case, `job` cannot be serialized), you will get a
NotSerializableException. It works inside newAPIHadoopFile() because that
call is executed on the driver.
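
To make the failure mode concrete, here is a small, self-contained sketch
(hypothetical names, not from the thread) that fails the same way:

    import org.apache.spark.{SparkConf, SparkContext}

    class NotSerializableThing { def answer: Int = 42 }

    object ClosureDemo {
      def main(args: Array[String]): Unit = {
        val sc = new SparkContext(
          new SparkConf().setAppName("closure-demo").setMaster("local[2]"))
        val thing = new NotSerializableThing  // lives on the driver
        // The closure captures `thing`, so Spark must serialize it to ship
        // the task. Since it is not Serializable, this throws
        // org.apache.spark.SparkException: Task not serializable
        // (caused by java.io.NotSerializableException).
        sc.parallelize(1 to 10)
          .mapPartitions { iter => iter.map(_ + thing.answer) }
          .collect()
        sc.stop()
      }
    }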

Tobias


Re: serialization issue with mapPartitions

2014-12-25 Thread ey-chih chow
I should rephrase my question as follows:

How to use the corresponding Hadoop Configuration of a HadoopRDD in defining
a function as an input parameter to the MapPartitions function?

Thanks.

Ey-Chih Chow






Re: serialization issue with mapPartitions

2014-12-25 Thread Tobias Pfeiffer
Hi,

On Fri, Dec 26, 2014 at 10:13 AM, ey-chih chow eyc...@hotmail.com wrote:

 I should rephrase my question as follows:

 How to use the corresponding Hadoop Configuration of a HadoopRDD in
 defining a function as an input parameter to the MapPartitions function?


Well, you could try to pull the `val config = job.getConfiguration()` out
of the function and just use `config` inside the function, hoping that this
one is serializable.
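
In code, that suggestion would look roughly like this (a sketch reusing
`job` and an RDD placeholder from the original post):

    // Evaluated on the driver, outside the closure, so the closure
    // captures `config` rather than the whole `job` object.
    val config = job.getConfiguration()

    val sessions = rdd.mapPartitions { valueIterator =>
      // Use the captured `config` instead of calling job.getConfiguration().
      // Caveat: Configuration is Writable but not java.io.Serializable, so
      // this can still fail; see the broadcast approach in the next reply.
      // ...
      valueIterator
    }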

Tobias


RE: serialization issue with mapPartitions

2014-12-25 Thread Shao, Saisai
Hi,

Hadoop Configuration is only Writable, not Java Serializable. You can use
SerializableWritable (in Spark) to wrap the Configuration to make it
serializable, and use a broadcast variable to broadcast this conf to all the
nodes; then you can use it in mapPartitions rather than serializing it within
the closure.

You can refer to org.apache.spark.rdd.HadoopRDD; there is a similar usage
scenario to yours.
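
A sketch of that approach (assuming a Spark 1.x
org.apache.spark.SerializableWritable, with `job` and `rdd` as stand-ins
from the thread):

    import org.apache.hadoop.conf.Configuration
    import org.apache.spark.SerializableWritable

    // Wrap the Writable Configuration so Java serialization can handle it,
    // then broadcast it once instead of capturing it in each closure.
    val confBroadcast =
      sc.broadcast(new SerializableWritable(job.getConfiguration()))

    val result = rdd.mapPartitions { iter =>
      // Unwrap on the executor: .value on the broadcast, then on the wrapper.
      val conf: Configuration = confBroadcast.value.value
      // ... use `conf` here ...
      iter
    }

This mirrors the confBroadcast pattern used inside HadoopRDD itself.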

Thanks
Jerry.
