Re: How to use scala.tools.nsc.interpreter.IMain in Spark, just like calling eval in Perl.
Hi Fanchao,

This happens because the executors are unable to find the anonymous classes generated by the interpreter. Adding the code below worked for me. I found the details here: https://github.com/cloudera/livy/blob/master/repl/src/main/scala/com/cloudera/livy/repl/SparkInterpreter.scala

    // Spark 1.6 does not have "classServerUri"; instead, the local directory where class files
    // are stored needs to be registered in SparkConf. See comment in
    // SparkILoop::createSparkContext().
    Try(sparkIMain.getClass().getMethod("classServerUri")) match {
      case Success(method) =>
        method.setAccessible(true)
        conf.set("spark.repl.class.uri", method.invoke(sparkIMain).asInstanceOf[String])
      case Failure(_) =>
        val outputDir = sparkIMain.getClass().getMethod("getClassOutputDirectory")
        outputDir.setAccessible(true)
        conf.set("spark.repl.class.outputDir",
          outputDir.invoke(sparkIMain).asInstanceOf[File].getAbsolutePath())
    }

Thanks,
Jayant

On Thu, Jun 30, 2016 at 12:34 AM, Fanchao Meng wrote:
> Hi Spark Community,
>
> I am trying to dynamically interpret code given as a String in Spark, just
> like calling eval in Perl. However, I ran into a problem when running the
> program. I would really appreciate your help.
>
> **Requirement:**
>
> The requirement is to make the Spark processing chain configurable. For
> example, a customer could define the processing steps in a configuration
> file as below.
>
> Steps:
> 1) textFile("file:///")
> 2) flatMap(line => line.split(" "))
> 3) map(word => (word, 1))
> 4) reduceByKey(_ + _)
> 5) foreach(println)
>
> All of the above steps are defined in a configuration file. The Spark driver
> then loads the configuration file and assembles the processing steps into a
> string, such as:
>
> val processFlow =
>   """
>   sc.textFile("file:///input.txt").flatMap(line => line.split(" ")).map(word => (word, 1)).reduceByKey(_ + _).foreach(println)
>   """
>
> Spark should then execute the piece of code held in the processFlow variable
> above.
>
> **Here is my Spark source code:**
>
> It is based on the word count sample; I just moved the RDD method calls into
> a string that is run by the interpreter.
>
> import org.apache.spark.SparkConf
> import org.apache.spark.SparkContext
> import scala.collection.mutable.{Map, ArraySeq}
> import scala.tools.nsc.GenericRunnerSettings
> import scala.tools.nsc.interpreter.IMain
>
> class TestMain {
>   def exec(): Unit = {
>     val out = System.out
>     val flusher = new java.io.PrintWriter(out)
>     val interpreter = {
>       val settings = new GenericRunnerSettings(println _)
>       settings.usejavacp.value = true
>       new IMain(settings, flusher)
>     }
>     val conf = new SparkConf().setAppName("TestMain")
>     val sc = new SparkContext(conf)
>     val methodChain =
>       """
>       val textFile = sc.textFile("file:///input.txt")
>       textFile.flatMap(line => line.split(" ")).map(word => (word, 1)).reduceByKey(_ + _).foreach(println)
>       """
>     interpreter.bind("sc", sc)
>     val resultFlag = interpreter.interpret(methodChain)
>   }
> }
>
> object TestMain {
>   def main(args: Array[String]) {
>     val testMain = new TestMain()
>     testMain.exec()
>     System.exit(0)
>   }
> }
>
> **Problem:**
>
> However, I got an error when running the above Spark code (master=local); the
> logs are below.
>
> sc: org.apache.spark.SparkContext = org.apache.spark.SparkContext@7d87addd
> org.apache.spark.SparkException: Job aborted due to stage failure:
> Task 0 in stage 0.0 failed 1 times, most recent failure: Lost task 0.0 in
> stage 0.0 (TID 0, localhost): java.lang.ClassNotFoundException: $anonfun$1
>         at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
>         at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
>         at java.security.AccessController.doPrivileged(Native Method)
>         at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
>         at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
>         at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
>         at java.lang.Class.forName0(Native Method)
>         at java.lang.Class.forName(Class.java:270)
>         at org.apache.spark.serializer.JavaDeserializationStream$$anon$1.resolveClass(JavaSerializer.scala:68)
>         at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1612)
>         at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1517)
>         at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1771)
>         at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>         at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
>         at
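For completeness, below is a minimal, self-contained sketch of the same idea applied to the plain scala.tools.nsc.interpreter.IMain used in the question, assuming a Scala 2.11 interpreter and a Spark version whose SparkContext honors spark.repl.class.outputDir (Spark 2.x does; the EvalWordCount name and the temporary directory are illustrative, not from the thread). The interpreter is told to write its generated class files to a real directory on disk, using the same -Yrepl-class-based / -Yrepl-outdir flags that Spark's own REPL passes, and that directory is registered in the SparkConf before the SparkContext is created, so executors can load the generated anonymous classes.

    import java.nio.file.Files
    import org.apache.spark.{SparkConf, SparkContext}
    import scala.tools.nsc.GenericRunnerSettings
    import scala.tools.nsc.interpreter.IMain

    object EvalWordCount {
      def main(args: Array[String]): Unit = {
        // Real directory on disk that will hold the interpreter-generated class files.
        val outputDir = Files.createTempDirectory("repl-classes").toFile

        val settings = new GenericRunnerSettings(println _)
        settings.usejavacp.value = true
        // Same flags Spark's own REPL passes: wrap snippets in classes and write them to disk.
        settings.processArguments(
          List("-Yrepl-class-based", "-Yrepl-outdir", outputDir.getAbsolutePath), true)

        val interpreter = new IMain(settings, new java.io.PrintWriter(System.out))

        val conf = new SparkConf()
          .setAppName("EvalWordCount")
          .setMaster("local[*]")  // quick local test, as in the original post
          // Register the class output directory so executors can fetch generated classes.
          .set("spark.repl.class.outputDir", outputDir.getAbsolutePath)

        val sc = new SparkContext(conf)
        interpreter.bind("sc", sc)

        interpreter.interpret(
          """sc.textFile("file:///input.txt")
            |  .flatMap(line => line.split(" "))
            |  .map(word => (word, 1))
            |  .reduceByKey(_ + _)
            |  .foreach(println)""".stripMargin)

        sc.stop()
      }
    }

The ordering is the key point, just as in the reflection snippet above: spark.repl.class.outputDir has to be set in the SparkConf before new SparkContext(conf) runs.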
How to use scala.tools.nsc.interpreter.IMain in Spark, just like calling eval in Perl.
Hi Spark Community,

I am trying to dynamically interpret code given as a String in Spark, just like calling eval in Perl. However, I ran into a problem when running the program. I would really appreciate your help.

**Requirement:**

The requirement is to make the Spark processing chain configurable. For example, a customer could define the processing steps in a configuration file as below.

Steps:
1) textFile("file:///")
2) flatMap(line => line.split(" "))
3) map(word => (word, 1))
4) reduceByKey(_ + _)
5) foreach(println)

All of the above steps are defined in a configuration file. The Spark driver then loads the configuration file and assembles the processing steps into a string, such as:

val processFlow =
  """
  sc.textFile("file:///input.txt").flatMap(line => line.split(" ")).map(word => (word, 1)).reduceByKey(_ + _).foreach(println)
  """

Spark should then execute the piece of code held in the processFlow variable above.

**Here is my Spark source code:**

It is based on the word count sample; I just moved the RDD method calls into a string that is run by the interpreter.

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import scala.collection.mutable.{Map, ArraySeq}
import scala.tools.nsc.GenericRunnerSettings
import scala.tools.nsc.interpreter.IMain

class TestMain {
  def exec(): Unit = {
    val out = System.out
    val flusher = new java.io.PrintWriter(out)
    val interpreter = {
      val settings = new GenericRunnerSettings(println _)
      settings.usejavacp.value = true
      new IMain(settings, flusher)
    }
    val conf = new SparkConf().setAppName("TestMain")
    val sc = new SparkContext(conf)
    val methodChain =
      """
      val textFile = sc.textFile("file:///input.txt")
      textFile.flatMap(line => line.split(" ")).map(word => (word, 1)).reduceByKey(_ + _).foreach(println)
      """
    interpreter.bind("sc", sc)
    val resultFlag = interpreter.interpret(methodChain)
  }
}

object TestMain {
  def main(args: Array[String]) {
    val testMain = new TestMain()
    testMain.exec()
    System.exit(0)
  }
}

**Problem:**

However, I got an error when running the above Spark code (master=local); the logs are below.
sc: org.apache.spark.SparkContext = org.apache.spark.SparkContext@7d87addd
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 1 times, most recent failure: Lost task 0.0 in stage 0.0 (TID 0, localhost): java.lang.ClassNotFoundException: $anonfun$1
        at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
        at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
        at java.security.AccessController.doPrivileged(Native Method)
        at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
        at java.lang.Class.forName0(Native Method)
        at java.lang.Class.forName(Class.java:270)
        at org.apache.spark.serializer.JavaDeserializationStream$$anon$1.resolveClass(JavaSerializer.scala:68)
        at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1612)
        at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1517)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1771)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
        at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
        at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
        at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
        at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
        at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
        at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
        at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
        at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:76)
        at
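Coming back to the configurable-chain requirement at the top of the post, here is a small illustrative sketch of turning the configured steps into the single string that is handed to interpreter.interpret; the ProcessFlowBuilder name and its buildProcessFlow/run helpers are hypothetical, not from the thread.

    import org.apache.spark.SparkContext
    import scala.tools.nsc.interpreter.IMain

    object ProcessFlowBuilder {
      // Joins the steps read from the configuration file into one expression rooted
      // at the bound "sc". For the five steps in the requirement section this yields
      // sc.textFile("file:///input.txt").flatMap(...).map(...).reduceByKey(...).foreach(println)
      def buildProcessFlow(steps: Seq[String]): String =
        steps.mkString("sc.", ".", "")

      // Bind the SparkContext under the name the generated string refers to, then evaluate it.
      def run(interpreter: IMain, sc: SparkContext, steps: Seq[String]): Unit = {
        interpreter.bind("sc", sc)
        interpreter.interpret(buildProcessFlow(steps))
      }
    }

This keeps the interpreted string identical to the hand-written processFlow above, so the class-loading fix from the reply applies unchanged.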