Re: How to use scala.tools.nsc.interpreter.IMain in Spark, just like calling eval in Perl.

2016-06-30 Thread Jayant Shekhar
Hi Fanchao,

This is because the executors are unable to find the anonymous classes generated by the interpreter.

Adding the code below worked for me. I found the details here:
https://github.com/cloudera/livy/blob/master/repl/src/main/scala/com/cloudera/livy/repl/SparkInterpreter.scala

// Imports needed for this snippet; `sparkIMain` is the interpreter instance and
// `conf` is the SparkConf, both created before the SparkContext.
import java.io.File
import scala.util.{Failure, Success, Try}

// Spark 1.6 does not have "classServerUri"; instead, the local directory where
// class files are stored needs to be registered in SparkConf. See comment in
// SparkILoop::createSparkContext().

Try(sparkIMain.getClass().getMethod("classServerUri")) match {
  case Success(method) =>
    method.setAccessible(true)
    conf.set("spark.repl.class.uri",
      method.invoke(sparkIMain).asInstanceOf[String])

  case Failure(_) =>
    val outputDir = sparkIMain.getClass().getMethod("getClassOutputDirectory")
    outputDir.setAccessible(true)
    conf.set("spark.repl.class.outputDir",
      outputDir.invoke(sparkIMain).asInstanceOf[File].getAbsolutePath())
}
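
If you want to keep using the plain scala.tools.nsc.interpreter.IMain from your program rather than Spark's own interpreter classes, a similar effect should be possible by making the interpreter write its class files to a real directory and registering that directory in the SparkConf before the SparkContext is created (in either approach, the conf must be populated first). The sketch below is only an illustration of that idea, assuming Scala 2.11 and Spark 2.x: the -Yrepl-outdir option and the spark.repl.class.outputDir property are what Spark's own shell uses, the object and variable names are mine, and I have not tested it across versions.

 import java.nio.file.Files
 import org.apache.spark.{SparkConf, SparkContext}
 import scala.tools.nsc.GenericRunnerSettings
 import scala.tools.nsc.interpreter.IMain

 object EvalSketch {
   def main(args: Array[String]): Unit = {
     // Real directory for the class files the interpreter generates,
     // instead of the default in-memory virtual directory.
     val replClassDir = Files.createTempDirectory("spark-repl-classes").toFile

     val settings = new GenericRunnerSettings(println _)
     settings.usejavacp.value = true
     // Tell the REPL to write generated classes to disk. (Spark's own shell
     // additionally passes -Yrepl-class-based here.)
     settings.processArguments(
       List("-Yrepl-outdir", replClassDir.getAbsolutePath), processAll = true)
     val interpreter = new IMain(settings, new java.io.PrintWriter(System.out))

     // Register the class directory BEFORE creating the SparkContext so that
     // executors can fetch REPL-generated classes such as $anonfun$1.
     val conf = new SparkConf()
       .setAppName("TestMain")
       .set("spark.repl.class.outputDir", replClassDir.getAbsolutePath)
     val sc = new SparkContext(conf)

     interpreter.bind("sc", sc)
     interpreter.interpret(
       """sc.textFile("file:///input.txt")
         |  .flatMap(line => line.split(" "))
         |  .map(word => (word, 1))
         |  .reduceByKey(_ + _)
         |  .foreach(println)""".stripMargin)

     sc.stop()
   }
 }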


Thanks,

Jayant



On Thu, Jun 30, 2016 at 12:34 AM, Fanchao Meng wrote:


How to use scala.tools.nsc.interpreter.IMain in Spark, just like calling eval in Perl.

2016-06-30 Thread Fanchao Meng
Hi Spark Community,

I am trying to dynamically interpret code given as a String in Spark, just like calling eval in Perl. However, I ran into a problem when running the program. I would really appreciate your help.

**Requirement:**

The requirement is to make the Spark processing chain configurable. For example, a customer could define the processing steps in a configuration file as below. Steps:
 1) textFile("file:///input.txt")
 2) flatMap(line => line.split(" "))
 3) map(word => (word, 1))
 4) reduceByKey(_ + _)
 5) foreach(println)

All of the above steps are defined in a configuration file.
The Spark driver then loads the configuration file and assembles the processing steps into a string, such as:

 val processFlow =
   """
   sc.textFile("file:///input.txt").flatMap(line => line.split(" ")).map(word => (word, 1)).reduceByKey(_ + _).foreach(println)
   """

Spark will then execute the piece of code defined in the processFlow variable above.
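
As a rough sketch of what I have in mind (the steps.conf file name and its one-step-per-line format are just an illustration), the driver could build that string from the configuration like this:

 import scala.io.Source

 // Illustrative steps file (steps.conf), one processing step per line:
 //   flatMap(line => line.split(" "))
 //   map(word => (word, 1))
 //   reduceByKey(_ + _)
 //   foreach(println)
 val steps = Source.fromFile("steps.conf").getLines().map(_.trim).filter(_.nonEmpty)

 // Chain the configured steps onto the source RDD to form the code to evaluate.
 val processFlow = steps.mkString("""sc.textFile("file:///input.txt").""", ".", "")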

**Here is my Spark source code:**

It is based on the word count sample; I just put the chain of RDD methods into a string that is run by the interpreter.

 import org.apache.spark.SparkConf
 import org.apache.spark.SparkContext
 import scala.tools.nsc.GenericRunnerSettings
 import scala.tools.nsc.interpreter.IMain

 class TestMain {
   def exec(): Unit = {
     val out = System.out
     val flusher = new java.io.PrintWriter(out)
     val interpreter = {
       val settings = new GenericRunnerSettings(println _)
       settings.usejavacp.value = true
       new IMain(settings, flusher)
     }
     val conf = new SparkConf().setAppName("TestMain")
     val sc = new SparkContext(conf)
     val methodChain =
       """
       val textFile = sc.textFile("file:///input.txt")
       textFile.flatMap(line => line.split(" ")).map(word => (word, 1)).reduceByKey(_ + _).foreach(println)
       """
     interpreter.bind("sc", sc)
     val resultFlag = interpreter.interpret(methodChain)
   }
 }

 object TestMain {
   def main(args: Array[String]) {
     val testMain = new TestMain()
     testMain.exec()
     System.exit(0)
   }
 }

**Problem:**

However, I got an error when running the above Spark code (master=local); the logs are below.

 sc: org.apache.spark.SparkContext = org.apache.spark.SparkContext@7d87addd

 org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 1 times, most recent failure: Lost task 0.0 in stage 0.0 (TID 0, localhost): java.lang.ClassNotFoundException: $anonfun$1
 at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
 at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
 at java.security.AccessController.doPrivileged(Native Method)
 at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
 at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
 at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
 at java.lang.Class.forName0(Native Method)
 at java.lang.Class.forName(Class.java:270)
 at org.apache.spark.serializer.JavaDeserializationStream$$anon$1.resolveClass(JavaSerializer.scala:68)
 at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1612)
 at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1517)
 at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1771)
 at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
 at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
 at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
 at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
 at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
 at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
 at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
 at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
 at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
 at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
 at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
 at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
 at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
 at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
 at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:76)
 at