Hello users,

In one of my use cases, I need to launch a Spark job from spark-shell. My
input file is in HDFS, and I am using NewHadoopRDD to construct an RDD out
of this input file because it uses a custom input format.

    import org.apache.hadoop.fs.Path
    import org.apache.hadoop.io.{BytesWritable, IntWritable}
    import org.apache.hadoop.mapreduce.Job
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat
    import org.apache.spark.rdd.NewHadoopRDD

    // build a Hadoop Job that carries the input path, then wrap the custom
    // input format (RandomAccessInputFormat) in a NewHadoopRDD
    val hConf = sc.hadoopConfiguration
    var job = new Job(hConf)
    FileInputFormat.setInputPaths(job, new Path(path))
    var hRDD = new NewHadoopRDD(sc, classOf[RandomAccessInputFormat],
      classOf[IntWritable], classOf[BytesWritable],
      job.getConfiguration())


However, when I run these commands in spark-shell, especially the Job
constructor line (var job = new Job(hConf)), I get the exception below:

java.lang.IllegalStateException: Job in state DEFINE instead of RUNNING
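
For what it is worth, my guess (unverified) is that the exception is triggered
by spark-shell echoing the Job value rather than by the construction itself:
Job.toString appears to require the job to be in the RUNNING state. If that is
right, anything that keeps the Job from becoming a top-level value in the
shell should avoid it, for example something like:

    // same construction, but the only value the shell echoes is the RDD,
    // so Job.toString is never invoked on the still-DEFINE job
    val hRDD = {
      val job = new Job(sc.hadoopConfiguration)
      FileInputFormat.setInputPaths(job, new Path(path))
      new NewHadoopRDD(sc, classOf[RandomAccessInputFormat],
        classOf[IntWritable], classOf[BytesWritable], job.getConfiguration())
    }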


So, I put these lines inside a method in another class, and I call that
method from my driver code.
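
For reference, here is roughly what that helper looks like (a simplified
sketch rather than the exact code; the class name CASExtractor and the body
of extractCAS are placeholders, but the shape matches what I am running, and
"cas" below is an instance of this class):

    import org.apache.hadoop.fs.Path
    import org.apache.hadoop.io.{BytesWritable, IntWritable}
    import org.apache.hadoop.mapreduce.{InputSplit, Job}
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat
    import org.apache.spark.SparkContext
    import org.apache.spark.rdd.NewHadoopRDD

    // simplified sketch of the helper; note that it keeps a reference to the SparkContext
    class CASExtractor(sc: SparkContext) extends Serializable {

      // builds the NewHadoopRDD so the Job never appears at the shell's top level
      def getRDD(path: String): NewHadoopRDD[IntWritable, BytesWritable] = {
        val job = new Job(sc.hadoopConfiguration)
        FileInputFormat.setInputPaths(job, new Path(path))
        new NewHadoopRDD(sc, classOf[RandomAccessInputFormat],
          classOf[IntWritable], classOf[BytesWritable], job.getConfiguration())
      }

      // placeholder body; the real method does the per-split CAS extraction
      def extractCAS(split: InputSplit,
                     iter: Iterator[(IntWritable, BytesWritable)]): Iterator[Long] =
        Iterator.single(iter.size.toLong)
    }

    val cas = new CASExtractor(sc)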


      // obtain the RDD for the input file in HDFS
      var hRDD = cas.getRDD("/user/bala/RDF_STORE/MERGED_DAR.DAT_256")



The above line constructs the Hadoop RDD and returns the handle to the driver
code. This works fine now, which means I could work around the
IllegalStateException issue. However, when I run the command below


      // run the map and collect the results from the distributed machines
      val result = hRDD.mapPartitionsWithInputSplit { (split, iter) =>
        cas.extractCAS(split, iter)
      }.collect()




I get an error "Caused by: java.io.NotSerializableException:
org.apache.spark.SparkContext"
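
If I read the error correctly, my guess is that extractCAS being an instance
method is the problem: the closure passed to mapPartitionsWithInputSplit
references cas, so Spark tries to serialize the whole helper object, and with
it the SparkContext it holds. I have been experimenting with keeping the
worker-side logic in a standalone object that never touches the SparkContext,
something along these lines (placeholder body, and I have not verified that
this behaves any better from the shell):

    import org.apache.hadoop.io.{BytesWritable, IntWritable}
    import org.apache.hadoop.mapreduce.InputSplit

    // worker-side logic only; no reference to the SparkContext anywhere in here
    object CASFunctions extends Serializable {
      def extractCAS(split: InputSplit,
                     iter: Iterator[(IntWritable, BytesWritable)]): Iterator[Long] =
        Iterator.single(iter.size.toLong)   // placeholder for the real extraction
    }

    val result = hRDD.mapPartitionsWithInputSplit { (split, iter) =>
      CASFunctions.extractCAS(split, iter)
    }.collect()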


Can someone help me work around this problem? This code works perfectly when
I use spark-submit, but I cannot use that here. My ultimate goal is to run
the driver code from Zeppelin, and hence I am testing this code in
spark-shell.



with regards

Bala
