Repository: zeppelin
Updated Branches:
  refs/heads/master 96d78ee57 -> 1ff8c1475
[ZEPPELIN-2675] Distributing Jars when using an external flink cluster

### What is this PR for?
This PR makes the Flink interpreter able to distribute external dependencies to the cluster when they are loaded through the Web UI. The code simply collects the jar paths downloaded by DependencyResolver and passes them to the FlinkILoop constructor.
Loading external dependencies in the Flink interpreter only works with MiniCluster and Flink versions lower than 1.3.0.
Only Spark is able to distribute jars at the moment.

### What type of PR is it?
Improvement

### What is the Jira issue?
https://issues.apache.org/jira/browse/ZEPPELIN-2675

### How should this be tested?
1. [Download Flink](https://flink.apache.org/downloads.html) and run a local cluster with the following command:
```
./bin/start-cluster.sh
```
2. Go to the interpreter page and, in the dependencies section, add the artifact:
```
joda-time:joda-time:jar:2.9.9
```
3. Change `local` to `localhost` on the interpreter page.
4. Run the code below in a paragraph:
```
%flink
import org.joda.time.{DateTime, DateTimeZone}

val text = benv.fromElements("To be or not to be")
text
  .flatMap { _.toLowerCase.split(" ") }
  .map(word => (word, new DateTime(System.currentTimeMillis(), DateTimeZone.UTC)))
  .print()
```

## Questions:
* Do the license files need an update? No
* Are there breaking changes for older versions? No
* Does this need documentation?
No

Author: marcotagliabue <marco.taglia...@icloud.com>

Closes #2429 from marcotagliabue/feature/distribute-jars-flink and squashes the following commits:

49bd1f59 [marcotagliabue] Enable distribution of jars in Flink cluster


Project: http://git-wip-us.apache.org/repos/asf/zeppelin/repo
Commit: http://git-wip-us.apache.org/repos/asf/zeppelin/commit/1ff8c147
Tree: http://git-wip-us.apache.org/repos/asf/zeppelin/tree/1ff8c147
Diff: http://git-wip-us.apache.org/repos/asf/zeppelin/diff/1ff8c147

Branch: refs/heads/master
Commit: 1ff8c1475eebf67fbc2a6a9b0a09c3ac273bc4e5
Parents: 96d78ee
Author: marcotagliabue <marco.taglia...@icloud.com>
Authored: Thu Jun 22 14:21:28 2017 +0200
Committer: 1ambda <1am...@gmail.com>
Committed: Mon Jul 10 10:18:03 2017 +0900

----------------------------------------------------------------------
 .../apache/zeppelin/flink/FlinkInterpreter.java | 26 +++++++++++++++++---
 1 file changed, 22 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/zeppelin/blob/1ff8c147/flink/src/main/java/org/apache/zeppelin/flink/FlinkInterpreter.java
----------------------------------------------------------------------
diff --git a/flink/src/main/java/org/apache/zeppelin/flink/FlinkInterpreter.java b/flink/src/main/java/org/apache/zeppelin/flink/FlinkInterpreter.java
index 91ffb9c..710eace 100644
--- a/flink/src/main/java/org/apache/zeppelin/flink/FlinkInterpreter.java
+++ b/flink/src/main/java/org/apache/zeppelin/flink/FlinkInterpreter.java
@@ -91,11 +91,29 @@ public class FlinkInterpreter extends Interpreter {
       startFlinkMiniCluster();
     }
 
+    String[] externalJars = new String[0];
+    String localRepo = getProperty("zeppelin.interpreter.localRepo");
+    if (localRepo != null) {
+      File localRepoDir = new File(localRepo);
+      if (localRepoDir.exists()) {
+        File[] files = localRepoDir.listFiles();
+        if (files != null) {
+          externalJars = new String[files.length];
+          for (int i = 0; i < files.length; i++) {
+            if (externalJars.length > 0) {
+              externalJars[i] = files[i].getAbsolutePath();
+            }
+          }
+        }
+      }
+    }
+
     flinkIloop = new FlinkILoop(getHost(),
-                                getPort(),
-                                flinkConf,
-                                (BufferedReader) null,
-                                new PrintWriter(out));
+        getPort(),
+        flinkConf,
+        new Some<>(externalJars),
+        (BufferedReader) null,
+        new PrintWriter(out));
     flinkIloop.settings_$eq(createSettings());
     flinkIloop.createInterpreter();
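
The jar-collection step in the patch above can be tried in isolation with a small standalone sketch. It is not the committed code: the class name `ExternalJars` and the method `collect` are hypothetical, and the `.jar` suffix filter and the sorting are assumptions added here for illustration (the patch itself passes the path of every entry in the local repo directory).

```java
import java.io.File;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical helper, not part of Zeppelin: mirrors the directory scan in the patch.
final class ExternalJars {
  private ExternalJars() {}

  /**
   * Returns the absolute paths of the *.jar files directly under localRepo,
   * sorted for a stable order. Returns an empty array when the path is null,
   * missing, or not a directory.
   */
  static String[] collect(String localRepo) {
    if (localRepo == null) {
      return new String[0];
    }
    // listFiles() returns null when the path is not a directory or cannot be read.
    File[] files = new File(localRepo).listFiles();
    if (files == null) {
      return new String[0];
    }
    List<String> jars = new ArrayList<>();
    for (File f : files) {
      // Assumption: keep only regular files ending in ".jar" (the patch keeps everything).
      if (f.isFile() && f.getName().endsWith(".jar")) {
        jars.add(f.getAbsolutePath());
      }
    }
    String[] result = jars.toArray(new String[0]);
    Arrays.sort(result);
    return result;
  }

  public static void main(String[] args) {
    // Prints the jars found in the directory given as the first argument (default: ".").
    for (String jar : collect(args.length > 0 ? args[0] : ".")) {
      System.out.println(jar);
    }
  }
}
```

Building the result from a list avoids the pre-sized array used in the patch, so skipped entries never leave `null` slots in the array handed to the Flink shell.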