Hi Shengshan,

In the first code, ‘newAPIJobConfiguration’ is created once on the driver and shared across all the RDDs: the closure you pass to foreachRDD captures it, so Spark has to serialize it, and org.apache.hadoop.mapreduce.Job is not serializable.
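To make that failure concrete, here is a minimal, self-contained sketch of the rule (‘NotSer’ and ‘ClosureDemo’ are hypothetical names, not from your code): anything a closure references gets serialized along with it, so a single non-serializable capture fails the job even if the object is never actually used on the executors.

```
import org.apache.spark.{SparkConf, SparkContext}

// A stand-in for org.apache.hadoop.mapreduce.Job: any class that does
// not extend java.io.Serializable.
class NotSer

object ClosureDemo {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(
      new SparkConf().setAppName("closure-demo").setMaster("local[2]"))
    val shared = new NotSer // created on the driver
    // The lambda below captures `shared`, so Spark tries to serialize it
    // before running the tasks and throws:
    //   org.apache.spark.SparkException: Task not serializable
    //   Caused by: java.io.NotSerializableException: NotSer
    sc.parallelize(1 to 10).foreach(i => println(shared))
    sc.stop()
  }
}
```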
In the second code, ‘mytest_config’ is a Scala singleton object, so it is never serialized and shipped with the closure at all. It is initialized independently inside each JVM the first time it is referenced, and each process builds its own ‘newAPIJobConfiguration’ instead of sharing one instance, so nothing needs to be serializable.

If it's possible, maybe you can also try to save the result of mydata.foreachRDD(…) once instead of inside each rdd, something like:

```
val result = mydata.foreachRDD( rdd => {
  val json_rdd = rdd.map(Json.parse _)
    .map(_.validate[Scan])
    .map(Scan.transformScanRestult _)
    .filter(_.nonEmpty)
    .map(_.get)
    .map(Scan.convertForHbase _)
})
result.write.save(...)
```

On Tue, Oct 17, 2017 at 7:00 PM, Shengshan Zhang <zsslv...@gmail.com> wrote:

> Hello guys!
> java.io.NotSerializableException troubles me a lot when I process data with Spark:
>
> ```
> val hbase_conf = HBaseConfiguration.create()
> hbase_conf.set("hbase.zookeeper.property.clientPort", "2181")
> hbase_conf.set("hbase.zookeeper.quorum", "hadoop-zk0.s.qima-inc.com,hadoop-zk1.s.qima-inc.com,hadoop-zk2.s.qima-inc.com")
> val newAPIJobConfiguration = Job.getInstance(hbase_conf)
> newAPIJobConfiguration.getConfiguration().set(TableOutputFormat.OUTPUT_TABLE, "mytest_table")
> newAPIJobConfiguration.setOutputFormatClass(classOf[org.apache.hadoop.hbase.mapreduce.TableOutputFormat[ImmutableBytesWritable]])
> newAPIJobConfiguration.getConfiguration().set("mapreduce.output.fileoutputformat.outputdir", "/tmp")
>
> // TODO: this code is not very elegant; consider refactoring this OUTPUT part
> mydata.foreachRDD( rdd => {
>   val json_rdd = rdd.map(Json.parse _)
>     .map(_.validate[Scan])
>     .map(Scan.transformScanRestult _)
>     .filter(_.nonEmpty)
>     .map(_.get)
>     .map(Scan.convertForHbase _)
>   json_rdd.saveAsNewAPIHadoopDataset(newAPIJobConfiguration.getConfiguration)
> })
> ```
>
> However, it fails because of *java.io.NotSerializableException*, and the error info follows:
>
> 17/10/16 18:56:50 ERROR Utils: Exception encountered
> java.io.NotSerializableException: org.apache.hadoop.mapreduce.Job
>         at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
>         at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
>         at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
>         at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
>         at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
>         at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
>
> *So I changed my code as follows:*
>
> ```
> object mytest_config {
>   val hbase_conf = HBaseConfiguration.create()
>   hbase_conf.set("hbase.zookeeper.property.clientPort", "2181")
>   hbase_conf.set("hbase.zookeeper.quorum", "zk1,zk2")
>   val newAPIJobConfiguration = Job.getInstance(hbase_conf)
>   newAPIJobConfiguration.getConfiguration().set(TableOutputFormat.OUTPUT_TABLE, "mytest_table")
>   newAPIJobConfiguration.setOutputFormatClass(classOf[org.apache.hadoop.hbase.mapreduce.TableOutputFormat[ImmutableBytesWritable]])
>   newAPIJobConfiguration.getConfiguration().set("mapreduce.output.fileoutputformat.outputdir", "/tmp")
> }
>
> mydata.foreachRDD( rdd => {
>   val json_rdd = rdd.map(Json.parse _)
>     .map(_.validate[Scan])
>     .map(Scan.transformScanRestult _)
>     .filter(_.nonEmpty)
>     .map(_.get)
>     .map(Scan.convertForHbase _)
>   json_rdd.saveAsNewAPIHadoopDataset(mytest_config.newAPIJobConfiguration.getConfiguration)
> })
> ```
>
> And this works!
> Does anybody have an idea why this works, and what is the officially recommended way?
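On the "recommended way" question above: besides the singleton object, a common pattern is to build the Job inside the foreachRDD body, so the DStream closure never captures a Job instance at all; each batch just creates a fresh one on the driver. A rough sketch below, reusing the names from your post (Scan, mydata, and Play JSON's Json are assumed from your snippet; this is untested, not official guidance):

```
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.TableOutputFormat
import org.apache.hadoop.mapreduce.Job
import play.api.libs.json.Json // assuming Play JSON, as in your snippet

mydata.foreachRDD { rdd =>
  // Build the non-serializable Job fresh for each batch, inside the
  // function body, so it is never captured and never serialized.
  val hbaseConf = HBaseConfiguration.create()
  hbaseConf.set("hbase.zookeeper.property.clientPort", "2181")
  hbaseConf.set("hbase.zookeeper.quorum", "zk1,zk2")
  val job = Job.getInstance(hbaseConf)
  job.getConfiguration.set(TableOutputFormat.OUTPUT_TABLE, "mytest_table")
  job.setOutputFormatClass(classOf[TableOutputFormat[ImmutableBytesWritable]])
  job.getConfiguration.set("mapreduce.output.fileoutputformat.outputdir", "/tmp")

  val json_rdd = rdd.map(Json.parse _)
    .map(_.validate[Scan])
    .map(Scan.transformScanRestult _)
    .filter(_.nonEmpty)
    .map(_.get)
    .map(Scan.convertForHbase _)
  json_rdd.saveAsNewAPIHadoopDataset(job.getConfiguration)
}
```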
--
Gao JiaXiang
Data Analyst, GCBI <http://www.gcbi.com.cn>