Thanks, Sean.
Please find my code below. Let me know your suggestions/ideas.

Regards,

*Sarath*

On Wed, Sep 10, 2014 at 8:05 PM, Sean Owen <so...@cloudera.com> wrote:

> You mention that you are creating a UserGroupInformation inside your
> function, but something is still serializing it. You should show your
> code since it may not be doing what you think.
>
> If you instantiate an object, it happens every time your function is
> called. map() is called once per data element; mapPartitions() once
> per partition. It depends.
>
> On Wed, Sep 10, 2014 at 3:25 PM, Sarath Chandra
> <sarathchandra.jos...@algofusiontech.com> wrote:
> > Hi Sean,
> >
> > The solution of instantiating the non-serializable class inside the map
> > is working fine, but I've hit a roadblock: the solution does not work for
> > singleton classes like UserGroupInformation.
> >
> > In my logic, as part of processing an HDFS file, I need to refer to some
> > reference files which are also stored in HDFS. So inside the map method
> > I'm trying to instantiate UserGroupInformation to get an instance of
> > FileSystem. Then, using this FileSystem instance, I read those reference
> > files and use that data in my processing logic.
> >
> > This throws 'Task not serializable' exceptions for the
> > 'UserGroupInformation' and 'FileSystem' classes. I also tried using
> > 'SparkHadoopUtil' instead of 'UserGroupInformation', but it didn't
> > resolve the issue.
> >
> > Could you provide some pointers in this regard?
> >
> > Also, I have a query: when we instantiate a class inside the map method,
> > does it create a new instance for every record it processes?
> >
> > Thanks & Regards,
> > Sarath
> >
> > On Sat, Sep 6, 2014 at 4:32 PM, Sean Owen <so...@cloudera.com> wrote:
> >>
> >> I disagree that the generally right change is to try to make the
> >> classes serializable. Usually, classes that are not serializable are
> >> not supposed to be serialized. You're using them in a way that's
> >> causing them to be serialized, and that's probably not desired.
> >>
> >> For example, this is wrong:
> >>
> >> val foo: SomeUnserializableManagerClass = ...
> >> rdd.map(d => foo.bar(d))
> >>
> >> This is right:
> >>
> >> rdd.map { d =>
> >>   val foo: SomeUnserializableManagerClass = ...
> >>   foo.bar(d)
> >> }
> >>
> >> In the first instance, you create the object on the driver and try to
> >> serialize and copy it to workers. In the second, you're creating
> >> SomeUnserializableManagerClass in the function and therefore on the
> >> worker.
> >>
> >> mapPartitions is better if this creation is expensive.
> >>
> >> On Fri, Sep 5, 2014 at 3:06 PM, Sarath Chandra
> >> <sarathchandra.jos...@algofusiontech.com> wrote:
> >> > Hi,
> >> >
> >> > I'm trying to migrate a map-reduce program to work with Spark, and I
> >> > have migrated the program from Java to Scala. The map-reduce program
> >> > basically loads an HDFS file and, for each line in the file, applies
> >> > several transformation functions available in various external
> >> > libraries.
> >> >
> >> > When I execute this over Spark, it throws "Task not serializable"
> >> > exceptions for each and every class used from these external
> >> > libraries. I added serialization to a few classes which are in my
> >> > scope, but there are several other classes which are out of my scope,
> >> > like org.apache.hadoop.io.Text.
> >> >
> >> > How can I overcome these exceptions?
> >> >
> >> > ~Sarath.
> >
> >
>
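
Regarding your point that map() runs the function once per data element
while mapPartitions() runs it once per partition -- just to check my
understanding, is the difference essentially the one in the sketch below?
(ExpensiveParser is a made-up stand-in for a costly object, not something
from my real code, and I haven't run this.)

import org.apache.spark.rdd.RDD

object FrequencyCheck {
  // Made-up stand-in for a costly (and possibly non-serializable) object.
  class ExpensiveParser {
    def parse(s: String): String = s.trim
  }

  def perRecord(rdd: RDD[String]): RDD[String] =
    rdd.map { line =>
      val parser = new ExpensiveParser()  // constructed once for every record
      parser.parse(line)
    }

  def perPartition(rdd: RDD[String]): RDD[String] =
    rdd.mapPartitions { lines =>
      val parser = new ExpensiveParser()  // constructed once per partition
      lines.map(line => parser.parse(line))
    }
}

My actual code follows below.
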
import java.security.PrivilegedExceptionAction

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat
import org.apache.hadoop.security.UserGroupInformation
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.rdd.RDD
// Imports for the external rules-engine classes (RecordValidator, RulesLoader,
// TransFilEngine, ConfigurationHadoop, Level) are omitted here.

class MapperNew extends Serializable {

  // Fields of the enclosing class; hadoopConf is read, and rulesLoader and
  // recordTransformer are assigned, from inside the map closure below.
  var hadoopConf: Configuration = _
  var recordValidator: RecordValidator = _
  var rulesLoader: RulesLoader = _
  var recordTransformer: TransFilEngine = _
  var ach: ConfigurationHadoop = _
  var file: RDD[(Long, String)] = _

  // Configuration is passed by the caller
  def run(c: Configuration): Unit = {

    hadoopConf = c

    val sparkConf = new SparkConf()
      .setMaster(hadoopConf.get("sparkMaster"))
      .setAppName("NewMapper")
      .setSparkHome(hadoopConf.get("sparkHome"))
      .setJars(Seq("rbcommon.jar", "rbengine.jar"))
    val sparkContext = new SparkContext(sparkConf)
    val util = SparkHadoopUtil.get

    util.runAsSparkUser(() => {
      file = sparkContext.newAPIHadoopFile(hadoopConf.get("inputPath"),
        classOf[TextInputFormat],
        classOf[LongWritable],
        classOf[Text], hadoopConf).map(r => (r._1.get(), r._2.toString()))
    })

    // Works fine up to this line
    println("File line count = " + file.count)

    // Without the hdfs:// prefix, Spark tries to read from the local file system
    val rulesFilePath = "hdfs://slave:54310/user/hduser/" +
      hadoopConf.get("rulesFilePath") + "/" +
      hadoopConf.get("rulesFile")

    val processed = file.map(line => {
      // Doesn't work - throws 'Task not serializable' exception for UserGroupInformation
      val ugi = UserGroupInformation.createRemoteUser(hadoopConf.get("remoteUser"))
      // Doesn't work - throws 'Task not serializable' exception for FileSystem
      val fs = ugi.doAs(new PrivilegedExceptionAction[FileSystem] {
        def run(): FileSystem = {
          FileSystem.get(hadoopConf)
        }
      })
      // RulesLoader and TransFilEngine are classes from external libraries
      rulesLoader = new RulesLoader(fs.open(new Path(rulesFilePath)))
      recordTransformer = TransFilEngine.getInstance(rulesLoader, Level.FINER)
      // Doesn't work this way either - throws 'Task not serializable' exception
//      util.runAsSparkUser(() => {
//        val fs = FileSystem.get(hadoopConf)
//        rulesLoader = new RulesLoader(fs.open(new Path(rulesFilePath)))
//        recordTransformer = TransFilEngine.getInstance(rulesLoader, Level.FINER)
//      })

      process(line)
    })

    println("Processed count = " + processed.count)
  }

  def process(line: (Long, String)): String = {
    // More processing to do using other third-party libraries
    line._2
  }

}
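
Based on your earlier suggestion of creating such objects inside the
function, below is the rough restructuring I'm considering: only plain
strings are captured from the driver, and everything non-serializable
(UserGroupInformation, FileSystem, RulesLoader, TransFilEngine) is created
inside mapPartitions, once per partition. This is just an untested sketch,
not a working version -- imports for the external rules-engine classes are
omitted, and the per-record logic is reduced to a placeholder. Does this
look like the right direction?

import java.security.PrivilegedExceptionAction

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.security.UserGroupInformation
import org.apache.spark.rdd.RDD

object MapperSketch {

  // Only plain Strings are captured by the closure below, so nothing
  // non-serializable needs to be shipped from the driver.
  def processFile(file: RDD[(Long, String)],
                  remoteUser: String,     // hadoopConf.get("remoteUser"), read on the driver
                  rulesFilePath: String   // the full hdfs:// path, built as above
                 ): RDD[String] = {

    file.mapPartitions { lines =>
      // Everything from here on is created on the worker, once per partition.
      val ugi = UserGroupInformation.createRemoteUser(remoteUser)
      val fs = ugi.doAs(new PrivilegedExceptionAction[FileSystem] {
        def run(): FileSystem =
          new Path(rulesFilePath).getFileSystem(new Configuration())
      })
      val rulesLoader = new RulesLoader(fs.open(new Path(rulesFilePath)))
      val recordTransformer = TransFilEngine.getInstance(rulesLoader, Level.FINER)

      lines.map { line =>
        // The real per-record logic would use recordTransformer here.
        line._2
      }
    }
  }
}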
---------------------------------------------------------------------
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org
