Hi Fanchao,

This is because it is unable to find the anonymous classes generated.

Adding the below code worked for me. I found the details here :
https://github.com/cloudera/livy/blob/master/repl/src/main/scala/com/cloudera/livy/repl/SparkInterpreter.scala

// Spark 1.6 does not have "classServerUri"; instead, the local
directory where class files
// are stored needs to be registered in SparkConf. See comment in
// SparkILoop::createSparkContext().

Try(sparkIMain.getClass().getMethod("classServerUri")) match {
  case Success(method) =>
    method.setAccessible(true)
    conf.set("spark.repl.class.uri",
method.invoke(sparkIMain).asInstanceOf[String])

  case Failure(_) =>
    val outputDir = sparkIMain.getClass().getMethod("getClassOutputDirectory")
    outputDir.setAccessible(true)
    conf.set("spark.repl.class.outputDir",
      outputDir.invoke(sparkIMain).asInstanceOf[File].getAbsolutePath())
}


Thanks,

Jayant



On Thu, Jun 30, 2016 at 12:34 AM, Fanchao Meng <fanchao.m...@hotmail.com>
wrote:

> Hi Spark Community,
>
> I am trying to dynamically interpret code given as a String in Spark, just
> like calling the eval in Perl language. However, I got problem when running
> the program. Really appreciate for your help.
>
> **Requirement:**
>
> The requirement is to make the spark processing chain configurable. For
> example, customer could set the processing steps in configuration file as
> below. Steps:
>      1) textFile("files///<file_full_path>")
>      2) flatMap(line => line.split(" "))
>      3) map(word => (word, 1))
>      4) reduceByKey(_ + _)
>      5) foreach(println)
>
> All above steps are defined in a configuration file.
> Then, the spark driver will load the configuration file and make the
> processing steps as a string, such as:
>
>      val processFlow =
>      """
>      sc.textFile("file:///input.txt").flatMap(line => line.split("
> ")).map(word => (word, 1)).reduceByKey(_ + _).foreach(println)
>      """
>
> Then, Spark will execute the piece of code defined in above variable
> processFlow.
>
> **Here is my Spark source code:**
>
> It is from word count sample, I just make the RDD methods invoked by
> interpreter as a string.
>
>      import org.apache.spark.SparkConf
>      import org.apache.spark.SparkContext
>      import scala.collection.mutable.{Map, ArraySeq}
>      import scala.tools.nsc.GenericRunnerSettings
>      import scala.tools.nsc.interpreter.IMain
>      class TestMain {
>        def exec(): Unit = {
>          val out = System.out
>          val flusher = new java.io.PrintWriter(out)
>          val interpreter = {
>            val settings = new GenericRunnerSettings( println _ )
>            settings.usejavacp.value = true
>            new IMain(settings, flusher)
>          }
>          val conf = new SparkConf().setAppName("TestMain")
>          val sc = new SparkContext(conf)
>          val methodChain =
>            """
>            val textFile = sc.textFile("file:///input.txt")
>            textFile.flatMap(line => line.split(" ")).map(word => (word,
> 1)).reduceByKey(_ + _).foreach(println)
>            """
>          interpreter.bind("sc", sc);
>          val resultFlag = interpreter.interpret(methodChain)
>        }
>      }
>      object TestMain {
>        def main(args: Array[String]) {
>          val testMain = new TestMain()
>          testMain.exec()
>          System.exit(0)
>        }
>      }
>
> **Problem:**
>
> However, I got an error when running above Spark code (master=local), logs
> as below.
>
>      sc: org.apache.spark.SparkContext =
> org.apache.spark.SparkContext@7d87addd
>      org.apache.spark.SparkException: Job aborted due to stage failure:
> Task 0 in stage 0.0 failed 1 times, most recent failure: Lost task 0.0 in
> stage 0.0 (TID 0, localhost): java.lang.ClassNotFoundException: $anonfun$1
>              at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
>              at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
>              at java.security.AccessController.doPrivileged(Native Method)
>              at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
>              at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
>              at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
>              at java.lang.Class.forName0(Native Method)
>              at java.lang.Class.forName(Class.java:270)
>              at
> org.apache.spark.serializer.JavaDeserializationStream$$anon$1.resolveClass(JavaSerializer.scala:68)
>              at
> java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1612)
>              at
> java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1517)
>              at
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1771)
>              at
> java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>              at
> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
>              at
> java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
>              at
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
>              at
> java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>              at
> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
>              at
> java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
>              at
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
>              at
> java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>              at
> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
>              at
> java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
>              at
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
>              at
> java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>              at
> java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
>              at
> org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:76)
>              at
> org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:115)
>              at
> org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
>              at org.apache.spark.scheduler.Task.run(Task.scala:89)
>              at
> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
>              at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>              at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>              at java.lang.Thread.run(Thread.java:745)
>
>      Driver stacktrace:
>              at org.apache.spark.scheduler.DAGScheduler.org
> $apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1431)
>              at
> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1419)
>              at
> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1418)
>              at
> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>              at
> scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
>              at
> org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1418)
>              at
> org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:799)
>              at
> org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:799)
>              at scala.Option.foreach(Option.scala:236)
>              at
> org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:799)
>              at
> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1640)
>              at
> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1599)
>              at
> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1588)
>              at
> org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
>              at
> org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:620)
>              at
> org.apache.spark.SparkContext.runJob(SparkContext.scala:1843)
>              at
> org.apache.spark.SparkContext.runJob(SparkContext.scala:1856)
>              at
> org.apache.spark.SparkContext.runJob(SparkContext.scala:1869)
>              at
> org.apache.spark.SparkContext.runJob(SparkContext.scala:1940)
>              at
> org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:927)
>              at
> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
>              at
> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111)
>              at org.apache.spark.rdd.RDD.withScope(RDD.scala:316)
>              at org.apache.spark.rdd.RDD.collect(RDD.scala:926)
>              at .<init>(<console>:12)
>              at .<clinit>(<console>)
>              at .<init>(<console>:7)
>              at .<clinit>(<console>)
>              at $print(<console>)
>              at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>              at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
>              at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>              at java.lang.reflect.Method.invoke(Method.java:606)
>              at
> scala.tools.nsc.interpreter.IMain$ReadEvalPrint.call(IMain.scala:734)
>              at
> scala.tools.nsc.interpreter.IMain$Request.loadAndRun(IMain.scala:983)
>              at
> scala.tools.nsc.interpreter.IMain.loadAndRunReq$1(IMain.scala:573)
>              at
> scala.tools.nsc.interpreter.IMain.interpret(IMain.scala:604)
>              at
> scala.tools.nsc.interpreter.IMain.interpret(IMain.scala:568)
>              at com.tr.ecp.test.TestMain.exec(TestMain.scala:44)
>              at com.tr.ecp.test.TestMain$.main(TestMain.scala:57)
>              at com.tr.ecp.test.TestMain.main(TestMain.scala)
>              at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>              at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
>              at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>              at java.lang.reflect.Method.invoke(Method.java:606)
>              at
> org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:731)
>              at
> org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:181)
>              at
> org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:206)
>              at
> org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:121)
>              at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
>      Caused by: java.lang.ClassNotFoundException: $anonfun$1
>              at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
>              at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
>              at java.security.AccessController.doPrivileged(Native Method)
>              at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
>              at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
>              at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
>              at java.lang.Class.forName0(Native Method)
>              at java.lang.Class.forName(Class.java:270)
>              at
> org.apache.spark.serializer.JavaDeserializationStream$$anon$1.resolveClass(JavaSerializer.scala:68)
>              at
> java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1612)
>              at
> java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1517)
>              at
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1771)
>              at
> java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>              at
> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
>              at
> java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
>              at
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
>              at
> java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>              at
> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
>              at
> java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
>              at
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
>              at
> java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>              at
> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
>              at
> java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
>              at
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
>              at
> java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>              at
> java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
>              at
> org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:76)
>              at
> org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:115)
>              at
> org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
>              at org.apache.spark.scheduler.Task.run(Task.scala:89)
>              at
> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
>              at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>              at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>              at java.lang.Thread.run(Thread.java:745)
>
>
>
> Thanks,
> Fanchao
>

Reply via email to