Thanks Sean. Please find attached my code. Let me know your suggestions/ideas.
Regards,
Sarath

On Wed, Sep 10, 2014 at 8:05 PM, Sean Owen <so...@cloudera.com> wrote:
> You mention that you are creating a UserGroupInformation inside your
> function, but something is still serializing it. You should show your
> code, since it may not be doing what you think.
>
> If you instantiate an object, it happens every time your function is
> called: map() is called once per data element; mapPartitions() once
> per partition. So it depends.
>
> On Wed, Sep 10, 2014 at 3:25 PM, Sarath Chandra
> <sarathchandra.jos...@algofusiontech.com> wrote:
> > Hi Sean,
> >
> > The solution of instantiating the non-serializable class inside the
> > map is working fine, but I have hit a roadblock: the solution does
> > not work for singleton classes like UserGroupInformation.
> >
> > As part of processing an HDFS file, my logic needs to refer to some
> > reference files which are also stored in HDFS. So inside the map
> > method I'm trying to instantiate UserGroupInformation to get a
> > FileSystem instance; using that FileSystem instance I then read the
> > reference files and use their data in my processing logic.
> >
> > This throws "task not serializable" exceptions for the
> > UserGroupInformation and FileSystem classes. I also tried using
> > SparkHadoopUtil instead of UserGroupInformation, but that didn't
> > resolve the issue.
> >
> > Could you provide some pointers in this regard?
> >
> > I also have a question: when we instantiate a class inside the map
> > method, does it create a new instance for every record of the RDD it
> > is processing?
> >
> > Thanks & Regards,
> > Sarath
> >
> > On Sat, Sep 6, 2014 at 4:32 PM, Sean Owen <so...@cloudera.com> wrote:
> >>
> >> I disagree that the generally right change is to try to make the
> >> classes serializable. Usually, classes that are not serializable are
> >> not supposed to be serialized. You're using them in a way that's
> >> causing them to be serialized, and that's probably not desired.
> >>
> >> For example, this is wrong:
> >>
> >>   val foo: SomeUnserializableManagerClass = ...
> >>   rdd.map(d => foo.bar(d))
> >>
> >> This is right:
> >>
> >>   rdd.map { d =>
> >>     val foo: SomeUnserializableManagerClass = ...
> >>     foo.bar(d)
> >>   }
> >>
> >> In the first instance, you create the object on the driver and try
> >> to serialize and copy it to the workers. In the second, you're
> >> creating SomeUnserializableManagerClass inside the function, and
> >> therefore on the worker.
> >>
> >> mapPartitions is better if this creation is expensive.
> >>
> >> On Fri, Sep 5, 2014 at 3:06 PM, Sarath Chandra
> >> <sarathchandra.jos...@algofusiontech.com> wrote:
> >> > Hi,
> >> >
> >> > I'm trying to migrate a map-reduce program to work with Spark. I
> >> > migrated the program from Java to Scala. The map-reduce program
> >> > basically loads an HDFS file, and for each line in the file it
> >> > applies several transformation functions available in various
> >> > external libraries.
> >> >
> >> > When I execute this over Spark, it throws "task not serializable"
> >> > exceptions for each and every class used from these external
> >> > libraries. I added serialization to a few classes which are in my
> >> > scope, but there are several other classes which are out of my
> >> > scope, like org.apache.hadoop.io.Text.
> >> >
> >> > How do I overcome these exceptions?
> >> >
> >> > ~Sarath.
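[To make the map()-vs-mapPartitions() point above concrete, here is a
minimal sketch; SomeUnserializableManagerClass and bar() are the
placeholder names from Sean's example, not a real API:]

    // map(): the body runs once per record, so the manager is
    // constructed for every element of the RDD.
    rdd.map { d =>
      val foo = new SomeUnserializableManagerClass()
      foo.bar(d)
    }

    // mapPartitions(): the body runs once per partition, so the manager
    // is constructed once and reused for every record in that partition.
    // Usually the better choice when construction is expensive.
    rdd.mapPartitions { iter =>
      val foo = new SomeUnserializableManagerClass()
      iter.map(d => foo.bar(d))
    }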
import java.security.PrivilegedExceptionAction
import java.util.logging.Level

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat
import org.apache.hadoop.security.UserGroupInformation
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.rdd.RDD
// RecordValidator, RulesLoader, TransFilEngine and ConfigurationHadoop
// come from the external rb* libraries; their imports are omitted here.

class MapperNew extends Serializable {

  var hadoopConf: Configuration = _
  var recordValidator: RecordValidator = _
  var rulesLoader: RulesLoader = _
  var recordTransformer: TransFilEngine = _
  var ach: ConfigurationHadoop = _
  var file: RDD[(Long, String)] = _

  // Configuration is passed in by the caller
  def run(c: Configuration): Unit = {
    hadoopConf = c
    val sparkConf = new SparkConf()
      .setMaster(hadoopConf.get("sparkMaster"))
      .setAppName("NewMapper")
      .setSparkHome(hadoopConf.get("sparkHome"))
      .setJars(Seq("rbcommon.jar", "rbengine.jar"))
    val sparkContext = new SparkContext(sparkConf)

    val util = SparkHadoopUtil.get
    util.runAsSparkUser(() => {
      file = sparkContext.newAPIHadoopFile(hadoopConf.get("inputPath"),
        classOf[TextInputFormat], classOf[LongWritable], classOf[Text],
        hadoopConf).map(r => (r._1.get(), r._2.toString()))
    })

    // Works fine up to this line
    println("File line count = " + file.count)

    // Without the explicit hdfs:// prefix Spark tries to read from the
    // local file system
    val rulesFilePath = "hdfs://slave:54310/user/hduser/" +
      hadoopConf.get("rulesFilePath") + "/" + hadoopConf.get("rulesFile")

    val processed = file.map(line => {
      // Doesn't work - throws 'task not serializable' exception for
      // UserGroupInformation
      val ugi = UserGroupInformation.createRemoteUser(hadoopConf.get("remoteUser"))
      // Doesn't work - throws 'task not serializable' exception for FileSystem
      val fs = ugi.doAs(new PrivilegedExceptionAction[FileSystem] {
        def run(): FileSystem = FileSystem.get(hadoopConf)
      })
      // RulesLoader and TransFilEngine are classes from external libraries
      rulesLoader = new RulesLoader(fs.open(new Path(rulesFilePath)))
      recordTransformer = TransFilEngine.getInstance(rulesLoader, Level.FINER)
      // Doesn't work this way either - throws 'task not serializable' exception
      // util.runAsSparkUser(() => {
      //   val fs = FileSystem.get(hadoopConf)
      //   rulesLoader = new RulesLoader(fs.open(new Path(rulesFilePath)))
      //   recordTransformer = TransFilEngine.getInstance(rulesLoader, Level.FINER)
      // })
      process(line)
    })
    println("Processed count = " + processed.count)
  }

  def process(line: (Long, String)): String = {
    // More processing to do here using other third-party libraries
    line._2
  }
}
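[For reference, a sketch of one way the capture could be avoided, based
only on the code above; it is not a drop-in fix. Two assumptions: the
closure must not touch any field or method of MapperNew, because
referencing hadoopConf or calling process() drags the whole
non-serializable instance into the closure, and the workers have the
Hadoop configuration on their classpath so a fresh Configuration() can
resolve the HDFS URIs:]

    // Inside run(): copy everything the closure needs into local vals
    // of plain, serializable types (String) before the transformation.
    val remoteUser = hadoopConf.get("remoteUser")
    val rulesPath = rulesFilePath

    val processed = file.mapPartitions { iter =>
      // Runs on the worker, once per partition; nothing from the
      // driver-side MapperNew instance is captured.
      val conf = new Configuration() // built fresh, never serialized
      val ugi = UserGroupInformation.createRemoteUser(remoteUser)
      val fs = ugi.doAs(new PrivilegedExceptionAction[FileSystem] {
        def run(): FileSystem = FileSystem.get(conf)
      })
      val loader = new RulesLoader(fs.open(new Path(rulesPath)))
      val transformer = TransFilEngine.getInstance(loader, Level.FINER)
      // Inline the per-line work here; calling this.process(line) would
      // capture the MapperNew instance again.
      iter.map(line => line._2)
    }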