Re: serialization issue with mapPartitions
You cannot pass your jobConf object into any of Spark's transformation functions (map, mapPartitions, etc.), since org.apache.hadoop.mapreduce.Job is not Serializable. You can use Kryo serialization (see the tuning guide: http://spark.apache.org/docs/latest/tuning.html#data-serialization). What we usually do is convert the JobConf into a byte array, pass that byte array into the map function, and there construct a new JobConf from the data in the byte array.

I should rephrase my question as follows: how can I use the Hadoop Configuration that belongs to a HadoopRDD inside a function passed to mapPartitions?

Thanks.

Ey-Chih Chow
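A minimal sketch of the byte-array approach described above. The names `job` (a driver-side org.apache.hadoop.mapreduce.Job) and `rdd` are assumed placeholders; the trick relies on Hadoop's Configuration implementing Writable, so it can write itself to a DataOutput:

    import java.io.{ByteArrayInputStream, ByteArrayOutputStream, DataInputStream, DataOutputStream}
    import org.apache.hadoop.conf.Configuration

    // On the driver: serialize the Configuration (a Hadoop Writable)
    // into a plain Array[Byte], which is java-Serializable.
    val baos = new ByteArrayOutputStream()
    job.getConfiguration.write(new DataOutputStream(baos))
    val confBytes = baos.toByteArray

    rdd.mapPartitions { iter =>
      // On the executor: rebuild a fresh Configuration from the bytes.
      // Only `confBytes` is captured by the closure, not `job` itself.
      val conf = new Configuration(false)
      conf.readFields(new DataInputStream(new ByteArrayInputStream(confBytes)))
      // ... use `conf` here ...
      iter
    }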
Re: serialization issue with mapPartitions
Hi,

On Fri, Dec 26, 2014 at 1:32 AM, ey-chih chow eyc...@hotmail.com wrote:

> I got some issues with mapPartitions with the following piece of code:
>
>     val sessions = sc
>       .newAPIHadoopFile(
>         ... path to an avro file ...,
>         classOf[org.apache.avro.mapreduce.AvroKeyInputFormat[ByteBuffer]],
>         classOf[AvroKey[ByteBuffer]],
>         classOf[NullWritable],
>         job.getConfiguration())
>       .mapPartitions { valueIterator =>
>         val config = job.getConfiguration()
>         ...
>       }
>       .collect()
>
> Why does job.getConfiguration() inside the mapPartitions function generate the following message?
>
>     Cause: java.io.NotSerializableException: org.apache.hadoop.mapreduce.Job

The function passed to mapPartitions() is executed on the Spark executors, not on the Spark driver. Therefore, the function body needs to be serialized and sent to the executors over the network. If that is not possible (in your case, `job` cannot be serialized), you get a NotSerializableException. The same call works inside newAPIHadoopFile() because that one is executed on the driver.

Tobias
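To make the capture explicit, here is a hypothetical, minimal sketch of that failure mode (`rdd` is an assumed placeholder, and Job.getInstance() assumes Hadoop 2). Any object referenced inside the closure gets pulled into the serialized task:

    import org.apache.hadoop.mapreduce.Job

    val job = Job.getInstance()  // lives on the driver; Job is NOT Serializable

    rdd.mapPartitions { iter =>
      // Referencing `job` here captures it in the closure, so Spark must
      // serialize it when shipping the task, which throws
      // java.io.NotSerializableException: org.apache.hadoop.mapreduce.Job
      val config = job.getConfiguration()
      iter
    }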
Re: serialization issue with mapPartitions
I should rephrase my question as follows: how can I use the Hadoop Configuration that belongs to a HadoopRDD inside a function passed to mapPartitions?

Thanks.

Ey-Chih Chow
Re: serialization issue with mapPartitions
Hi,

On Fri, Dec 26, 2014 at 10:13 AM, ey-chih chow eyc...@hotmail.com wrote:

> I should rephrase my question as follows: how can I use the Hadoop Configuration that belongs to a HadoopRDD inside a function passed to mapPartitions?

Well, you could try to pull the `val config = job.getConfiguration()` out of the function and just use `config` inside the function, hoping that this one is serializable.

Tobias
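A sketch of that suggestion, again with the hypothetical `rdd` and `job` names. Note the caveat in the next reply: this only helps if Configuration itself can be serialized, which with plain Java serialization it cannot:

    // Evaluate on the driver, outside the closure, so that only `config`
    // is captured, not the whole Job:
    val config = job.getConfiguration()

    rdd.mapPartitions { iter =>
      // refer to `config` here instead of calling job.getConfiguration();
      // with the default Java serializer this still fails, because Hadoop's
      // Configuration is Writable but not java.io.Serializable
      iter
    }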
RE: serialization issue with mapPartitions
Hi,

Hadoop Configuration is only Writable, not Java Serializable. You can use SerializableWritable (in Spark) to wrap the Configuration and make it serializable, and use a broadcast variable to ship this conf to all the nodes; then you can use it in mapPartitions rather than serializing it within the closure. You can refer to org.apache.spark.rdd.HadoopRDD, where there is a similar usage scenario to yours.

Thanks,
Jerry
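A minimal sketch of Jerry's suggestion, modeled on what org.apache.spark.rdd.HadoopRDD does internally (it broadcasts the conf wrapped in a SerializableWritable). The `rdd` and `job` names are the placeholders from the earlier code, and SerializableWritable is assumed to be the public Spark 1.x class:

    import org.apache.spark.SerializableWritable

    // On the driver: wrap the Writable-only Configuration so it becomes
    // java-Serializable, then broadcast it once to all executors.
    val confBroadcast = sc.broadcast(new SerializableWritable(job.getConfiguration()))

    rdd.mapPartitions { iter =>
      // On the executor: unwrap the Configuration from the broadcast.
      val conf = confBroadcast.value.value
      // ... use `conf` here ...
      iter
    }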