[ 
https://issues.apache.org/jira/browse/SPARK-15582?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15308036#comment-15308036
 ] 

Catalin Alexandru Zamfir edited comment on SPARK-15582 at 6/1/16 5:41 AM:
--------------------------------------------------------------------------

So here's the work-around; I wish there were a way to avoid doing it like this. In
short, we are:
- taking the script as source text and using CompilationUnit (from Groovy) to
compile it to bytecode. This must be done on the driver node before evaluating the
script (a minimal sketch of this step follows the list);
- appending each class produced by the above CU to a JAR on a distributed FS. Each
JAR gets a unique name, later passed to addJar; this is purely for isolation, so
that each script gets its own JAR;
- adding the resulting JAR to the Spark context via addJar;
- dehydrating every closure (Closure.dehydrate ()), which comes with its
limitations; nested closures also don't seem to work (a one-line example appears
after the code block below);
- however, if you design your code around the Groovy-closure problem, using
interfaces/abstract classes that you extend or implement in Groovy code, there
seems to be no limitation on what you can do (e.g. an AbstractFilter
class/interface; a sketch of that pattern closes this comment).
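
For reference, a minimal sketch of roughly what compileGroovyScript (called in the
snippet below) boils down to; the exact shape in our code differs, but the
CompilationUnit usage is the part the first bullet refers to:

{noformat}
import java.util.List;

import org.codehaus.groovy.control.CompilationUnit;
import org.codehaus.groovy.control.CompilerConfiguration;
import org.codehaus.groovy.control.Phases;

// Compiles the script's source text to bytecode on the driver and returns the
// generated GroovyClass instances, ready to be packaged into the per-script JAR.
private List<?> compileGroovyScript (String nameOfScript, String sourceCode) {
    CompilationUnit unit = new CompilationUnit (new CompilerConfiguration ());
    unit.addSource (nameOfScript, sourceCode);   // register the script text as a source unit
    unit.compile (Phases.CLASS_GENERATION);      // run all phases up to bytecode generation
    return unit.getClasses ();                   // one GroovyClass per generated class
}
{noformat}

The JAR packaging and addJar part then looks like this: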

{noformat}
        // Needs on the classpath: Joda-Time (DateTime), Guava (Files),
        // Apache Commons Compress (JarArchiveOutputStream/JarArchiveEntry)
        // and Groovy (GroovyClass).

        // Build a unique, per-script JAR path on the distributed FS
        Long dateOfNow = DateTime.now ().getMillis ();
        String nameOfScript = String.format ("ScriptOf%d", dateOfNow);
        String pathOfJar = String.format ("distributed/fs/path/to/your/groovy-scripts/%s.jar", String.format ("JarOf%d", dateOfNow));
        File archiveFile = new File (pathOfJar);
        Files.createParentDirs (archiveFile);

        // Compile the script text to bytecode on the driver
        List<?> compilationList = compileGroovyScript (nameOfScript, sourceCode);

        // Write every compiled class into the JAR; try-with-resources closes the stream
        try (JarArchiveOutputStream oneJar = new JarArchiveOutputStream (new FileOutputStream (archiveFile))) {
            for (Object compileClass : compilationList) {
                // One JAR entry per generated class
                GroovyClass groovyClass = (GroovyClass) compileClass;
                byte[] bytecodeOfClass = groovyClass.getBytes ();
                JarArchiveEntry oneJarEntry = new JarArchiveEntry (String.format ("%s.class", groovyClass.getName ()));
                oneJarEntry.setSize (bytecodeOfClass.length);
                oneJar.putArchiveEntry (oneJarEntry);
                oneJar.write (bytecodeOfClass);
                oneJar.closeArchiveEntry ();
            }

            // Write the archive trailer before the stream is closed
            oneJar.finish ();
        } catch (Exception e) {
            // Handle/log the failure instead of swallowing it
        }

        // Make the per-script JAR available to the executors
        sparkService.getSparkContext ().addJar (pathOfJar);

        // GroovyShell.evaluate (scriptText, nameOfScript) follows below
{noformat}
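
On the closure side, the dehydration mentioned above is just a call on the closure
before it is handed to Spark; an illustrative one-liner (the variable name is made up):

{noformat}
import groovy.lang.Closure;

// scriptClosure is a hypothetical closure handed back by the evaluated Groovy script.
// dehydrate () returns a copy detached from its owner/delegate/thisObject, so the
// serialized Spark task does not drag the whole enclosing script object graph along.
Closure<Boolean> detached = scriptClosure.dehydrate ();
{noformat}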

Any ideas on how this could be improved? (e.g. not needing the addJar call and not
having to dehydrate the Groovy closures)

For now this works for us, now that SPARK-13599 has been fixed. However, the extra
code required to make it work could be part of Spark itself.
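
To illustrate the last bullet in the list above, this is roughly the kind of
extension point we mean (the names and generics here are illustrative, not our
exact code): the Groovy script extends a plain class instead of handing Spark a
closure, and the compiled subclass travels in the per-script JAR added via addJar.

{noformat}
import org.apache.spark.api.java.function.Function;

// A base class exposed by the driver application. Groovy scripts subclass it, the
// compiled subclass ships in the per-script JAR, and Spark only ever serializes an
// ordinary object, never a Groovy Closure.
public abstract class AbstractFilter<T> implements Function<T, Boolean> {
    @Override
    public abstract Boolean call (T record) throws Exception;
}
{noformat}

A script then declares something like "class MyFilter extends AbstractFilter<String> { Boolean call (String s) { s.contains ('needle') } }" and passes new MyFilter () to javaRdd.filter (...) instead of a raw closure.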


> Support for Groovy closures
> ---------------------------
>
>                 Key: SPARK-15582
>                 URL: https://issues.apache.org/jira/browse/SPARK-15582
>             Project: Spark
>          Issue Type: Improvement
>          Components: Input/Output, Java API
>    Affects Versions: 1.6.1, 1.6.2, 2.0.0
>         Environment: 6 node Debian 8 based Spark cluster
>            Reporter: Catalin Alexandru Zamfir
>
> After SPARK-13599 was fixed and we ran one of our jobs against that fix for
> Groovy dependencies (which it did fix), the Spark executors get stuck on a
> ClassNotFoundException when the job runs as a script (via
> GroovyShell.evaluate (scriptText)). It seems Spark cannot deserialize the
> closure, or the closure never reaches the executor.
> {noformat}
> sparkContext.binaryFiles (ourPath).flatMap ({ onePathEntry -> code-block } as FlatMapFunction).count ();
> { onePathEntry -> code-block } denotes a Groovy closure.
> {noformat}
> There is a Groovy/Spark example at
> https://github.com/bunions1/groovy-spark-example ... however, it uses a
> modified Groovy. If my understanding is correct, Groovy compiles to regular
> JVM bytecode, so it should be straightforward for Spark to pick up and use
> the closures. The example code above fails with this stack trace:
> {noformat}
> Caused by: java.lang.ClassNotFoundException: Script1$_run_closure1
>       at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
>       at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
>       at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
>       at java.lang.Class.forName0(Native Method)
>       at java.lang.Class.forName(Class.java:348)
>       at 
> org.apache.spark.serializer.JavaDeserializationStream$$anon$1.resolveClass(JavaSerializer.scala:68)
>       at 
> java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1613)
>       at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1518)
>       at 
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1774)
>       at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
>       at 
> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2000)
>       at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1924)
>       at 
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
>       at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
>       at 
> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2000)
>       at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1924)
>       at 
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
>       at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
>       at 
> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2000)
>       at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1924)
>       at 
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
>       at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
>       at 
> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2000)
>       at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1924)
>       at 
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
>       at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
>       at 
> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2000)
>       at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1924)
>       at 
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
>       at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
>       at 
> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2000)
>       at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1924)
>       at 
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
>       at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
>       at java.io.ObjectInputStream.readObject(ObjectInputStream.java:371)
>       at 
> org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:76)
>       at 
> org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:115)
>       at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
>       at org.apache.spark.scheduler.Task.run(Task.scala:89)
>       at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214
> {noformat}
> Any ideas on how to tackle this are welcome. I've tried Googling for similar
> issues, but nobody seems to have found a solution.
> At the very least, point me at where to "hack" to make Spark support closures
> and I'd put in some of my time to make it work. SPARK-2171 argues that this
> is supported out of the box, but for projects of a relatively complex size,
> where the driver application is contained in (or is part of) a bigger
> application and runs on a cluster, things do not seem to work. I don't know
> whether SPARK-2171 was tried outside of a local[] cluster set-up, where such
> issues can arise.
> I saw a couple of people trying to make this work, but again, they resort to
> work-arounds (e.g. distributing bytecode manually ahead of time, adding a JAR
> with addJar, and so on).
> Can this be done? Where can we look?


