Hi Spark Community,

I am trying to dynamically interpret code given as a String in Spark, much like calling eval in Perl.
However, I ran into a problem when running the program. I would really appreciate your help.

**Requirement:**

The requirement is to make the Spark processing chain configurable. For example, a customer could set 
the processing steps in a configuration file as below. Steps:
     1) textFile("file:///<file_full_path>") 
     2) flatMap(line => line.split(" ")) 
     3) map(word => (word, 1)) 
     4) reduceByKey(_ + _) 
     5) foreach(println)

All of the above steps are defined in a configuration file.
The Spark driver then loads the configuration file and assembles the processing 
steps into a string, such as:

     val processFlow =
       """
       sc.textFile("file:///input.txt").flatMap(line => line.split(" ")).map(word => (word, 1)).reduceByKey(_ + _).foreach(println)
       """

Spark then executes the code held in the processFlow variable above.
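
To make the configuration-to-string step concrete, here is a minimal sketch of how the driver might assemble such a string. The names `ProcessFlowBuilder` and `buildProcessFlow` are hypothetical and only for illustration, and the steps are hard-coded here rather than read from a real configuration file. It uses plain Scala only and does not touch Spark or the interpreter.

```scala
// Hypothetical helper (not part of Spark): joins configured steps
// into a single Scala expression string.
object ProcessFlowBuilder {
  // `contextName` is the name under which the SparkContext is bound
  // in the interpreter (assumed to be "sc", as in the code below).
  def buildProcessFlow(contextName: String, steps: Seq[String]): String =
    (contextName +: steps.map(_.trim).filter(_.nonEmpty)).mkString(".")

  def main(args: Array[String]): Unit = {
    // In the real application these would come from the configuration file.
    val steps = Seq(
      """textFile("file:///input.txt")""",
      """flatMap(line => line.split(" "))""",
      "map(word => (word, 1))",
      "reduceByKey(_ + _)",
      "foreach(println)"
    )
    // Prints the chained expression that would be handed to the interpreter.
    println(buildProcessFlow("sc", steps))
  }
}
```

Blank steps are dropped and each step is trimmed, so stray whitespace in the configuration file should not produce an invalid chain such as `sc..map(...)`.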

**Here is my Spark source code:**

It is based on the word count sample; I only changed it so that the RDD methods are invoked by 
the interpreter from a string.

     import org.apache.spark.SparkConf
     import org.apache.spark.SparkContext
     import scala.collection.mutable.{Map, ArraySeq}
     import scala.tools.nsc.GenericRunnerSettings
     import scala.tools.nsc.interpreter.IMain
     class TestMain {
       def exec(): Unit = {
         val out = System.out
         val flusher = new java.io.PrintWriter(out)
         val interpreter = {
           val settings = new GenericRunnerSettings( println _ )
           settings.usejavacp.value = true
           new IMain(settings, flusher)
         }
         val conf = new SparkConf().setAppName("TestMain")
         val sc = new SparkContext(conf)
         val methodChain =
           """
           val textFile = sc.textFile("file:///input.txt")
           textFile.flatMap(line => line.split(" ")).map(word => (word, 1)).reduceByKey(_ + _).foreach(println)
           """
         interpreter.bind("sc", sc);
         val resultFlag = interpreter.interpret(methodChain)
       }
     }
     object TestMain {
       def main(args: Array[String]) {
         val testMain = new TestMain()
         testMain.exec()
         System.exit(0)
       }
     }

**Problem:**

However, I got an error when running the above Spark code (master=local). The logs are 
below. 

     sc: org.apache.spark.SparkContext = org.apache.spark.SparkContext@7d87addd
    
     org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 1 times, most recent failure: Lost task 0.0 in stage 0.0 (TID 0, localhost): java.lang.ClassNotFoundException: $anonfun$1
             at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
             at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
             at java.security.AccessController.doPrivileged(Native Method)
             at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
             at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
             at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
             at java.lang.Class.forName0(Native Method)
             at java.lang.Class.forName(Class.java:270)
             at org.apache.spark.serializer.JavaDeserializationStream$$anon$1.resolveClass(JavaSerializer.scala:68)
             at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1612)
             at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1517)
             at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1771)
             at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
             at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
             at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
             at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
             at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
             at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
             at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
             at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
             at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
             at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
             at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
             at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
             at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
             at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
             at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:76)
             at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:115)
             at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
             at org.apache.spark.scheduler.Task.run(Task.scala:89)
             at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
             at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
             at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
             at java.lang.Thread.run(Thread.java:745)
     
     Driver stacktrace:
             at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1431)
             at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1419)
             at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1418)
             at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
             at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
             at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1418)
             at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:799)
             at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:799)
             at scala.Option.foreach(Option.scala:236)
             at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:799)
             at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1640)
             at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1599)
             at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1588)
             at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
             at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:620)
             at org.apache.spark.SparkContext.runJob(SparkContext.scala:1843)
             at org.apache.spark.SparkContext.runJob(SparkContext.scala:1856)
             at org.apache.spark.SparkContext.runJob(SparkContext.scala:1869)
             at org.apache.spark.SparkContext.runJob(SparkContext.scala:1940)
             at org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:927)
             at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
             at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111)
             at org.apache.spark.rdd.RDD.withScope(RDD.scala:316)
             at org.apache.spark.rdd.RDD.collect(RDD.scala:926)
             at .<init>(<console>:12)
             at .<clinit>(<console>)
             at .<init>(<console>:7)
             at .<clinit>(<console>)
             at $print(<console>)
             at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
             at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
             at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
             at java.lang.reflect.Method.invoke(Method.java:606)
             at scala.tools.nsc.interpreter.IMain$ReadEvalPrint.call(IMain.scala:734)
             at scala.tools.nsc.interpreter.IMain$Request.loadAndRun(IMain.scala:983)
             at scala.tools.nsc.interpreter.IMain.loadAndRunReq$1(IMain.scala:573)
             at scala.tools.nsc.interpreter.IMain.interpret(IMain.scala:604)
             at scala.tools.nsc.interpreter.IMain.interpret(IMain.scala:568)
             at com.tr.ecp.test.TestMain.exec(TestMain.scala:44)
             at com.tr.ecp.test.TestMain$.main(TestMain.scala:57)
             at com.tr.ecp.test.TestMain.main(TestMain.scala)
             at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
             at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
             at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
             at java.lang.reflect.Method.invoke(Method.java:606)
             at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:731)
             at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:181)
             at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:206)
             at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:121)
             at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
     Caused by: java.lang.ClassNotFoundException: $anonfun$1
             at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
             at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
             at java.security.AccessController.doPrivileged(Native Method)
             at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
             at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
             at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
             at java.lang.Class.forName0(Native Method)
             at java.lang.Class.forName(Class.java:270)
             at org.apache.spark.serializer.JavaDeserializationStream$$anon$1.resolveClass(JavaSerializer.scala:68)
             at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1612)
             at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1517)
             at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1771)
             at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
             at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
             at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
             at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
             at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
             at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
             at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
             at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
             at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
             at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
             at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
             at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
             at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
             at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
             at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:76)
             at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:115)
             at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
             at org.apache.spark.scheduler.Task.run(Task.scala:89)
             at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
             at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
             at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
             at java.lang.Thread.run(Thread.java:745)
     


Thanks,
Fanchao                                           
