Repository: zeppelin
Updated Branches:
  refs/heads/master 96d78ee57 -> 1ff8c1475


[ZEPPELIN-2675] Distributing Jars when using an external flink cluster

### What is this PR for?

This PR enables the Flink interpreter to distribute external dependencies to 
the cluster when they are loaded through the Web UI.

The code simply collects the jar paths downloaded by DependencyResolver and 
passes them to the FlinkILoop constructor.
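
The collection step can be sketched in isolation as follows. This is a minimal 
illustration, not the committed code: `ExternalJarCollector` and 
`collectJarPaths` are hypothetical names, and the directory argument stands in 
for the value of the `zeppelin.interpreter.localRepo` property. Unlike the 
change in this PR, which passes every entry of the directory, the sketch keeps 
only regular files ending in `.jar`; that filter is an assumption about what 
is useful, not something the PR does.

```java
import java.io.File;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class ExternalJarCollector {

  // Hypothetical helper (not part of Zeppelin): returns the absolute paths of
  // the .jar files found directly inside localRepo, or an empty array when
  // the path is null, missing, or not a directory.
  public static String[] collectJarPaths(String localRepo) {
    if (localRepo == null) {
      return new String[0];
    }
    // listFiles() returns null if the path is not a readable directory.
    File[] files = new File(localRepo).listFiles();
    if (files == null) {
      return new String[0];
    }
    List<String> jars = new ArrayList<>();
    for (File file : files) {
      // Assumption: only regular files named *.jar should be shipped.
      if (file.isFile() && file.getName().endsWith(".jar")) {
        jars.add(file.getAbsolutePath());
      }
    }
    return jars.toArray(new String[0]);
  }

  public static void main(String[] args) {
    String localRepo = args.length > 0 ? args[0] : null;
    System.out.println(Arrays.toString(collectJarPaths(localRepo)));
  }
}
```

Returning an empty array instead of `null` means the result can always be 
handed to the shell constructor without a further null check.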

Currently, loading external dependencies in the Flink interpreter only works 
with the MiniCluster and Flink versions lower than 1.3.0. Only Spark is able 
to distribute jars at the moment.

### What type of PR is it?

Improvement

### What is the Jira issue?

https://issues.apache.org/jira/browse/ZEPPELIN-2675

### How should this be tested?

1. [Download Flink](https://flink.apache.org/downloads.html) and start a local 
cluster with the following command:

```./bin/start-cluster.sh```

2. Go to the interpreter page and, in the dependencies section, add the artifact:

```joda-time:joda-time:jar:2.9.9```

3. Replace `local` with `localhost` on the interpreter page.

4. Run the code below in a paragraph:

```
%flink
import org.joda.time.{DateTime, DateTimeZone}

val text = benv.fromElements("To be or not to be")
text
  .flatMap { _.toLowerCase.split(" ") }
  .map(word => (word, new DateTime(System.currentTimeMillis(), DateTimeZone.UTC)))
  .print()
```

## Questions:
* Do the license files need an update? No
* Are there breaking changes for older versions? No
* Does this need documentation? No

Author: marcotagliabue <marco.taglia...@icloud.com>

Closes #2429 from marcotagliabue/feature/distribute-jars-flink and squashes the 
following commits:

49bd1f59 [marcotagliabue] Enable distribution of jars in Flink cluster


Project: http://git-wip-us.apache.org/repos/asf/zeppelin/repo
Commit: http://git-wip-us.apache.org/repos/asf/zeppelin/commit/1ff8c147
Tree: http://git-wip-us.apache.org/repos/asf/zeppelin/tree/1ff8c147
Diff: http://git-wip-us.apache.org/repos/asf/zeppelin/diff/1ff8c147

Branch: refs/heads/master
Commit: 1ff8c1475eebf67fbc2a6a9b0a09c3ac273bc4e5
Parents: 96d78ee
Author: marcotagliabue <marco.taglia...@icloud.com>
Authored: Thu Jun 22 14:21:28 2017 +0200
Committer: 1ambda <1am...@gmail.com>
Committed: Mon Jul 10 10:18:03 2017 +0900

----------------------------------------------------------------------
 .../apache/zeppelin/flink/FlinkInterpreter.java | 26 +++++++++++++++++---
 1 file changed, 22 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/zeppelin/blob/1ff8c147/flink/src/main/java/org/apache/zeppelin/flink/FlinkInterpreter.java
----------------------------------------------------------------------
diff --git a/flink/src/main/java/org/apache/zeppelin/flink/FlinkInterpreter.java b/flink/src/main/java/org/apache/zeppelin/flink/FlinkInterpreter.java
index 91ffb9c..710eace 100644
--- a/flink/src/main/java/org/apache/zeppelin/flink/FlinkInterpreter.java
+++ b/flink/src/main/java/org/apache/zeppelin/flink/FlinkInterpreter.java
@@ -91,11 +91,29 @@ public class FlinkInterpreter extends Interpreter {
       startFlinkMiniCluster();
     }
 
+    String[] externalJars = new String[0];
+    String localRepo = getProperty("zeppelin.interpreter.localRepo");
+    if (localRepo != null) {
+      File localRepoDir = new File(localRepo);
+      if (localRepoDir.exists()) {
+        File[] files = localRepoDir.listFiles();
+        if (files != null) {
+          externalJars = new String[files.length];
+          for (int i = 0; i < files.length; i++) {
+            if (externalJars.length > 0) {
+              externalJars[i] = files[i].getAbsolutePath();
+            }
+          }
+        }
+      }
+    }
+
     flinkIloop = new FlinkILoop(getHost(),
-                                getPort(),
-                                flinkConf,
-                                (BufferedReader) null,
-                                new PrintWriter(out));
+        getPort(),
+        flinkConf,
+        new Some<>(externalJars),
+        (BufferedReader) null,
+        new PrintWriter(out));
 
     flinkIloop.settings_$eq(createSettings());
     flinkIloop.createInterpreter();
