ZEPPELIN-3569. Improvement of FlinkInterpreter

### What is this PR for?
This PR refactors the Flink interpreter and introduces several main features.
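For example, after this change a note can combine the batch Scala environment (`benv`), ZeppelinContext (`z`) and the new `%flink.sql` interpreter. The sketch below is adapted from the tests added in this PR; the table name `table_1` and the sample data are only illustrative:

```
%flink
val ds = benv.fromElements((1, "jeff"), (2, "andy"))
// register the DataSet so that the sql interpreter can query it
btenv.registerDataSet("table_1", ds)
// ZeppelinContext can render a DataSet as a Zeppelin table
z.show(ds)

%flink.sql
select * from table_1
```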
Here's the main changes; 1. Upgrade flink to 1.5.2 2. Support ZeppelinContext 3. Support %flink.sql 4. Support yarn mode ### What type of PR is it? [Improvement | Feature] ### Todos * [ ] - Task ### What is the Jira issue? * https://issues.apache.org/jira/browse/ZEPPELIN-3569 ### How should this be tested? * Unit test is added ### Screenshots (if appropriate) ### Questions: * Does the licenses files need update? No * Is there breaking changes for older versions? No * Does this needs documentation? No Author: Jeff Zhang <zjf...@apache.org> Closes #3054 from zjffdu/ZEPPELIN-3569 and squashes the following commits: a256a95fd [Jeff Zhang] ZEPPELIN-3569. Improvement of FlinkInterpreter Project: http://git-wip-us.apache.org/repos/asf/zeppelin/repo Commit: http://git-wip-us.apache.org/repos/asf/zeppelin/commit/1c5b38a9 Tree: http://git-wip-us.apache.org/repos/asf/zeppelin/tree/1c5b38a9 Diff: http://git-wip-us.apache.org/repos/asf/zeppelin/diff/1c5b38a9 Branch: refs/heads/master Commit: 1c5b38a9afb211b4681a9fe99f9740add76d0f37 Parents: 8e5013c Author: Jeff Zhang <zjf...@apache.org> Authored: Tue Jun 5 14:28:21 2018 +0800 Committer: Jeff Zhang <zjf...@apache.org> Committed: Thu Aug 9 11:12:00 2018 +0800 ---------------------------------------------------------------------- .travis.yml | 2 +- bin/interpreter.sh | 15 + flink/pom.xml | 45 +- .../apache/zeppelin/flink/FlinkInterpreter.java | 420 +++---------------- .../zeppelin/flink/FlinkSQLInterpreter.java | 72 ++++ .../src/main/resources/interpreter-setting.json | 19 + .../flink/FlinkSQLScalaInterpreter.scala | 42 ++ .../zeppelin/flink/FlinkScalaInterpreter.scala | 230 ++++++++++ .../zeppelin/flink/FlinkZeppelinContext.scala | 177 ++++++++ .../zeppelin/flink/FlinkInterpreterTest.java | 263 +++++++++--- .../zeppelin/flink/FlinkSQLInterpreterTest.java | 110 +++++ flink/src/test/resources/flink-conf.yaml | 247 +++++++++++ flink/src/test/resources/log4j.properties | 24 ++ .../launcher/StandardInterpreterLauncher.java | 6 + zeppelin-server/pom.xml | 8 - .../interpreter/SparkDownloadUtils.java | 136 ++++++ .../interpreter/remote/RemoteInterpreter.java | 4 +- .../interpreter/FlinkIntegrationTest.java | 116 +++++ .../interpreter/SparkDownloadUtils.java | 110 ----- .../src/test/resources/flink-conf.yaml | 0 20 files changed, 1486 insertions(+), 560 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/zeppelin/blob/1c5b38a9/.travis.yml ---------------------------------------------------------------------- diff --git a/.travis.yml b/.travis.yml index 5249ef2..5ddbaa5 100644 --- a/.travis.yml +++ b/.travis.yml @@ -41,7 +41,7 @@ addons: env: global: # Interpreters does not required by zeppelin-server integration tests - - INTERPRETERS='!beam,!hbase,!pig,!jdbc,!file,!flink,!ignite,!kylin,!lens,!cassandra,!elasticsearch,!bigquery,!alluxio,!scio,!livy,!groovy,!sap,!r,!java' + - INTERPRETERS='!beam,!hbase,!pig,!jdbc,!file,!ignite,!kylin,!lens,!cassandra,!elasticsearch,!bigquery,!alluxio,!scio,!livy,!groovy,!sap,!r,!java' matrix: include: http://git-wip-us.apache.org/repos/asf/zeppelin/blob/1c5b38a9/bin/interpreter.sh ---------------------------------------------------------------------- diff --git a/bin/interpreter.sh b/bin/interpreter.sh index 48cf0f6..c895018 100755 --- a/bin/interpreter.sh +++ b/bin/interpreter.sh @@ -201,6 +201,21 @@ elif [[ "${INTERPRETER_ID}" == "pig" ]]; then else echo "TEZ_CONF_DIR is not set, configuration might not be loaded" fi +elif [[ "${INTERPRETER_ID}" == "flink" 
]]; then + if [[ -n "${HADOOP_CONF_DIR}" ]] && [[ -d "${HADOOP_CONF_DIR}" ]]; then + ZEPPELIN_INTP_CLASSPATH+=":${HADOOP_CONF_DIR}" + export HADOOP_CONF_DIR=${HADOOP_CONF_DIR} + else + # autodetect HADOOP_CONF_HOME by heuristic + if [[ -n "${HADOOP_HOME}" ]] && [[ -z "${HADOOP_CONF_DIR}" ]]; then + if [[ -d "${HADOOP_HOME}/etc/hadoop" ]]; then + export HADOOP_CONF_DIR="${HADOOP_HOME}/etc/hadoop" + elif [[ -d "/etc/hadoop/conf" ]]; then + export HADOOP_CONF_DIR="/etc/hadoop/conf" + fi + fi + fi + fi addJarInDirForIntp "${LOCAL_INTERPRETER_REPO}" http://git-wip-us.apache.org/repos/asf/zeppelin/blob/1c5b38a9/flink/pom.xml ---------------------------------------------------------------------- diff --git a/flink/pom.xml b/flink/pom.xml index 70c076d..217813b 100644 --- a/flink/pom.xml +++ b/flink/pom.xml @@ -27,7 +27,7 @@ </parent> <groupId>org.apache.zeppelin</groupId> - <artifactId>zeppelin-flink_2.10</artifactId> + <artifactId>zeppelin-flink</artifactId> <packaging>jar</packaging> <version>0.9.0-SNAPSHOT</version> <name>Zeppelin: Flink</name> @@ -36,15 +36,18 @@ <properties> <!--library versions--> <interpreter.name>flink</interpreter.name> - <flink.version>1.1.3</flink.version> + <flink.version>1.5.2</flink.version> <flink.akka.version>2.3.7</flink.akka.version> <scala.macros.version>2.0.1</scala.macros.version> + <scala.binary.version>2.11</scala.binary.version> + <scala.version>2.11.8</scala.version> <!--plugin versions--> <plugin.scalamaven.version>3.2.2</plugin.scalamaven.version> <plugin.eclipse.version>2.8</plugin.eclipse.version> <plugin.buildhelper.version>1.7</plugin.buildhelper.version> <plugin.scalastyle.version>0.5.0</plugin.scalastyle.version> + </properties> <dependencies> @@ -90,38 +93,32 @@ <dependency> <groupId>org.apache.flink</groupId> - <artifactId>flink-scala_${scala.binary.version}</artifactId> + <artifactId>flink-yarn_${scala.binary.version}</artifactId> <version>${flink.version}</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> - <artifactId>flink-scala-shell_${scala.binary.version}</artifactId> + <artifactId>flink-table_${scala.binary.version}</artifactId> <version>${flink.version}</version> </dependency> <dependency> - <groupId>com.typesafe.akka</groupId> - <artifactId>akka-actor_${scala.binary.version}</artifactId> - <version>${flink.akka.version}</version> - </dependency> - - <dependency> - <groupId>com.typesafe.akka</groupId> - <artifactId>akka-remote_${scala.binary.version}</artifactId> - <version>${flink.akka.version}</version> + <groupId>org.apache.flink</groupId> + <artifactId>flink-scala_${scala.binary.version}</artifactId> + <version>${flink.version}</version> </dependency> <dependency> - <groupId>com.typesafe.akka</groupId> - <artifactId>akka-slf4j_${scala.binary.version}</artifactId> - <version>${flink.akka.version}</version> + <groupId>org.apache.flink</groupId> + <artifactId>flink-scala-shell_${scala.binary.version}</artifactId> + <version>${flink.version}</version> </dependency> <dependency> - <groupId>com.typesafe.akka</groupId> - <artifactId>akka-testkit_${scala.binary.version}</artifactId> - <version>${flink.akka.version}</version> + <groupId>com.google.guava</groupId> + <artifactId>guava</artifactId> + <version>20.0</version> </dependency> <dependency> @@ -282,6 +279,16 @@ </plugin> <plugin> + <artifactId>maven-surefire-plugin</artifactId> + <configuration> + <forkMode>always</forkMode> + <environmentVariables> + <FLINK_CONF_DIR>${project.build.directory}/test-classes</FLINK_CONF_DIR> + </environmentVariables> + 
</configuration> + </plugin> + + <plugin> <artifactId>maven-enforcer-plugin</artifactId> </plugin> <plugin> http://git-wip-us.apache.org/repos/asf/zeppelin/blob/1c5b38a9/flink/src/main/java/org/apache/zeppelin/flink/FlinkInterpreter.java ---------------------------------------------------------------------- diff --git a/flink/src/main/java/org/apache/zeppelin/flink/FlinkInterpreter.java b/flink/src/main/java/org/apache/zeppelin/flink/FlinkInterpreter.java index 9d66437..c14407d 100644 --- a/flink/src/main/java/org/apache/zeppelin/flink/FlinkInterpreter.java +++ b/flink/src/main/java/org/apache/zeppelin/flink/FlinkInterpreter.java @@ -1,13 +1,12 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -15,403 +14,90 @@ * See the License for the specific language governing permissions and * limitations under the License. 
*/ -package org.apache.zeppelin.flink; - -import org.apache.flink.api.common.JobID; -import org.apache.flink.api.scala.FlinkILoop; -import org.apache.flink.configuration.Configuration; -import org.apache.flink.runtime.akka.AkkaUtils; -import org.apache.flink.runtime.instance.ActorGateway; -import org.apache.flink.runtime.messages.JobManagerMessages; -import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.BufferedReader; -import java.io.ByteArrayOutputStream; -import java.io.File; -import java.io.PrintStream; -import java.io.PrintWriter; -import java.net.URL; -import java.net.URLClassLoader; -import java.util.LinkedList; -import java.util.List; -import java.util.Map; -import java.util.Properties; -import scala.Console; -import scala.Some; -import scala.collection.JavaConversions; -import scala.concurrent.duration.FiniteDuration; -import scala.runtime.AbstractFunction0; -import scala.tools.nsc.Settings; -import scala.tools.nsc.interpreter.IMain; -import scala.tools.nsc.interpreter.Results; -import scala.tools.nsc.settings.MutableSettings; -import scala.tools.nsc.settings.MutableSettings.BooleanSetting; -import scala.tools.nsc.settings.MutableSettings.PathSetting; +package org.apache.zeppelin.flink; +import org.apache.flink.api.scala.ExecutionEnvironment; import org.apache.zeppelin.interpreter.Interpreter; import org.apache.zeppelin.interpreter.InterpreterContext; +import org.apache.zeppelin.interpreter.InterpreterException; import org.apache.zeppelin.interpreter.InterpreterResult; -import org.apache.zeppelin.interpreter.InterpreterResult.Code; -import org.apache.zeppelin.interpreter.InterpreterUtils; import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion; -/** - * Interpreter for Apache Flink (http://flink.apache.org). 
- */ -public class FlinkInterpreter extends Interpreter { - Logger logger = LoggerFactory.getLogger(FlinkInterpreter.class); - private ByteArrayOutputStream out; - private Configuration flinkConf; - private LocalFlinkMiniCluster localFlinkCluster; - private FlinkILoop flinkIloop; - private Map<String, Object> binder; - private IMain imain; - - public FlinkInterpreter(Properties property) { - super(property); - } - - @Override - public void open() { - out = new ByteArrayOutputStream(); - flinkConf = new org.apache.flink.configuration.Configuration(); - Properties intpProperty = getProperties(); - for (Object k : intpProperty.keySet()) { - String key = (String) k; - String val = toString(intpProperty.get(key)); - flinkConf.setString(key, val); - } - - if (localMode()) { - startFlinkMiniCluster(); - } - - String[] externalJars = new String[0]; - String localRepo = getProperty("zeppelin.interpreter.localRepo"); - if (localRepo != null) { - File localRepoDir = new File(localRepo); - if (localRepoDir.exists()) { - File[] files = localRepoDir.listFiles(); - if (files != null) { - externalJars = new String[files.length]; - for (int i = 0; i < files.length; i++) { - if (externalJars.length > 0) { - externalJars[i] = files[i].getAbsolutePath(); - } - } - } - } - } - - flinkIloop = new FlinkILoop(getHost(), - getPort(), - flinkConf, - new Some<>(externalJars), - (BufferedReader) null, - new PrintWriter(out)); - - flinkIloop.settings_$eq(createSettings()); - flinkIloop.createInterpreter(); - - imain = flinkIloop.intp(); - - org.apache.flink.api.scala.ExecutionEnvironment benv = - flinkIloop.scalaBenv(); - org.apache.flink.streaming.api.scala.StreamExecutionEnvironment senv = - flinkIloop.scalaSenv(); - - senv.getConfig().disableSysoutLogging(); - benv.getConfig().disableSysoutLogging(); - - // prepare bindings - imain.interpret("@transient var _binder = new java.util.HashMap[String, Object]()"); - Map<String, Object> binder = (Map<String, Object>) getLastObject(); - - // import libraries - imain.interpret("import scala.tools.nsc.io._"); - imain.interpret("import Properties.userHome"); - imain.interpret("import scala.compat.Platform.EOL"); - - imain.interpret("import org.apache.flink.api.scala._"); - imain.interpret("import org.apache.flink.api.common.functions._"); - - - binder.put("benv", benv); - imain.interpret("val benv = _binder.get(\"benv\").asInstanceOf[" - + benv.getClass().getName() + "]"); - - binder.put("senv", senv); - imain.interpret("val senv = _binder.get(\"senv\").asInstanceOf[" - + senv.getClass().getName() + "]"); - - } - - private boolean localMode() { - String host = getProperty("host"); - return host == null || host.trim().length() == 0 || host.trim().equals("local"); - } - - private String getHost() { - if (localMode()) { - return "localhost"; - } else { - return getProperty("host"); - } - } - - private int getPort() { - if (localMode()) { - return localFlinkCluster.getLeaderRPCPort(); - } else { - return Integer.parseInt(getProperty("port")); - } - } - - private Settings createSettings() { - URL[] urls = getClassloaderUrls(); - Settings settings = new Settings(); - - // set classpath - PathSetting pathSettings = settings.classpath(); - String classpath = ""; - List<File> paths = currentClassPath(); - for (File f : paths) { - if (classpath.length() > 0) { - classpath += File.pathSeparator; - } - classpath += f.getAbsolutePath(); - } - - if (urls != null) { - for (URL u : urls) { - if (classpath.length() > 0) { - classpath += File.pathSeparator; - } - classpath += u.getFile(); - } 
- } - - pathSettings.v_$eq(classpath); - settings.scala$tools$nsc$settings$ScalaSettings$_setter_$classpath_$eq(pathSettings); - settings.explicitParentLoader_$eq(new Some<>(Thread.currentThread() - .getContextClassLoader())); - BooleanSetting b = (BooleanSetting) settings.usejavacp(); - b.v_$eq(true); - settings.scala$tools$nsc$settings$StandardScalaSettings$_setter_$usejavacp_$eq(b); - - // To prevent 'File name too long' error on some file system. - MutableSettings.IntSetting numClassFileSetting = settings.maxClassfileName(); - numClassFileSetting.v_$eq(128); - settings.scala$tools$nsc$settings$ScalaSettings$_setter_$maxClassfileName_$eq( - numClassFileSetting); - - return settings; - } - - - private List<File> currentClassPath() { - List<File> paths = classPath(Thread.currentThread().getContextClassLoader()); - String[] cps = System.getProperty("java.class.path").split(File.pathSeparator); - if (cps != null) { - for (String cp : cps) { - paths.add(new File(cp)); - } - } - return paths; - } +import java.util.ArrayList; +import java.util.List; +import java.util.Properties; - private List<File> classPath(ClassLoader cl) { - List<File> paths = new LinkedList<>(); - if (cl == null) { - return paths; - } +public class FlinkInterpreter extends Interpreter { - if (cl instanceof URLClassLoader) { - URLClassLoader ucl = (URLClassLoader) cl; - URL[] urls = ucl.getURLs(); - if (urls != null) { - for (URL url : urls) { - paths.add(new File(url.getFile())); - } - } - } - return paths; - } + private FlinkScalaInterpreter innerIntp; + private FlinkZeppelinContext z; - public Object getLastObject() { - Object obj = imain.lastRequest().lineRep().call( - "$result", - JavaConversions.asScalaBuffer(new LinkedList<>())); - return obj; + public FlinkInterpreter(Properties properties) { + super(properties); + this.innerIntp = new FlinkScalaInterpreter(getProperties()); } @Override - public void close() { - flinkIloop.closeInterpreter(); + public void open() throws InterpreterException { + this.innerIntp.open(); - if (localMode()) { - stopFlinkMiniCluster(); - } + // bind ZeppelinContext + int maxRow = Integer.parseInt(getProperty("zeppelin.flink.maxResult", "1000")); + this.z = new FlinkZeppelinContext(innerIntp.getBatchTableEnviroment(), + getInterpreterGroup().getInterpreterHookRegistry(), maxRow); + List<String> modifiers = new ArrayList<>(); + modifiers.add("@transient"); + this.innerIntp.bind("z", z.getClass().getCanonicalName(), z, modifiers); } @Override - public InterpreterResult interpret(String line, InterpreterContext context) { - if (line == null || line.trim().length() == 0) { - return new InterpreterResult(Code.SUCCESS); - } - - InterpreterResult result = interpret(line.split("\n"), context); - return result; + public void close() throws InterpreterException { + this.innerIntp.close(); } - public InterpreterResult interpret(String[] lines, InterpreterContext context) { - final IMain imain = flinkIloop.intp(); - - String[] linesToRun = new String[lines.length + 1]; - for (int i = 0; i < lines.length; i++) { - linesToRun[i] = lines[i]; - } - linesToRun[lines.length] = "print(\"\")"; - - System.setOut(new PrintStream(out)); - out.reset(); - Code r = null; - - String incomplete = ""; - boolean inComment = false; - - for (int l = 0; l < linesToRun.length; l++) { - final String s = linesToRun[l]; - // check if next line starts with "." (but not ".." 
or "./") it is treated as an invocation - if (l + 1 < linesToRun.length) { - String nextLine = linesToRun[l + 1].trim(); - boolean continuation = false; - if (nextLine.isEmpty() - || nextLine.startsWith("//") // skip empty line or comment - || nextLine.startsWith("}") - || nextLine.startsWith("object")) { // include "} object" for Scala companion object - continuation = true; - } else if (!inComment && nextLine.startsWith("/*")) { - inComment = true; - continuation = true; - } else if (inComment && nextLine.lastIndexOf("*/") >= 0) { - inComment = false; - continuation = true; - } else if (nextLine.length() > 1 - && nextLine.charAt(0) == '.' - && nextLine.charAt(1) != '.' // ".." - && nextLine.charAt(1) != '/') { // "./" - continuation = true; - } else if (inComment) { - continuation = true; - } - if (continuation) { - incomplete += s + "\n"; - continue; - } - } - - final String currentCommand = incomplete; - - scala.tools.nsc.interpreter.Results.Result res = null; - try { - res = Console.withOut( - System.out, - new AbstractFunction0<Results.Result>() { - @Override - public Results.Result apply() { - return imain.interpret(currentCommand + s); - } - }); - } catch (Exception e) { - logger.info("Interpreter exception", e); - return new InterpreterResult(Code.ERROR, InterpreterUtils.getMostRelevantMessage(e)); - } - - r = getResultCode(res); - - if (r == Code.ERROR) { - return new InterpreterResult(r, out.toString()); - } else if (r == Code.INCOMPLETE) { - incomplete += s + "\n"; - } else { - incomplete = ""; - } - } - - if (r == Code.INCOMPLETE) { - return new InterpreterResult(r, "Incomplete expression"); - } else { - return new InterpreterResult(r, out.toString()); - } - } - - private Code getResultCode(scala.tools.nsc.interpreter.Results.Result r) { - if (r instanceof scala.tools.nsc.interpreter.Results.Success$) { - return Code.SUCCESS; - } else if (r instanceof scala.tools.nsc.interpreter.Results.Incomplete$) { - return Code.INCOMPLETE; - } else { - return Code.ERROR; - } + @Override + public InterpreterResult interpret(String st, InterpreterContext context) + throws InterpreterException { + this.z.setInterpreterContext(context); + this.z.setGui(context.getGui()); + this.z.setNoteGui(context.getNoteGui()); + return innerIntp.interpret(st, context); } @Override - public void cancel(InterpreterContext context) { - if (localMode()) { - // In localMode we can cancel all running jobs, - // because the local cluster can only run one job at the time. 
- for (JobID job : this.localFlinkCluster.getCurrentlyRunningJobsJava()) { - logger.info("Stop job: " + job); - cancelJobLocalMode(job); - } - } - } + public void cancel(InterpreterContext context) throws InterpreterException { - private void cancelJobLocalMode(JobID jobID){ - FiniteDuration timeout = AkkaUtils.getTimeout(this.localFlinkCluster.configuration()); - ActorGateway leader = this.localFlinkCluster.getLeaderGateway(timeout); - leader.ask(new JobManagerMessages.CancelJob(jobID), timeout); } @Override - public FormType getFormType() { + public FormType getFormType() throws InterpreterException { return FormType.NATIVE; } @Override - public int getProgress(InterpreterContext context) { + public int getProgress(InterpreterContext context) throws InterpreterException { return 0; } @Override - public List<InterpreterCompletion> completion(String buf, int cursor, - InterpreterContext interpreterContext) { - return new LinkedList<>(); + public List<InterpreterCompletion> completion(String buf, + int cursor, + InterpreterContext interpreterContext) + throws InterpreterException { + return innerIntp.completion(buf, cursor, interpreterContext); } - private void startFlinkMiniCluster() { - localFlinkCluster = new LocalFlinkMiniCluster(flinkConf, false); - - try { - localFlinkCluster.start(true); - } catch (Exception e){ - throw new RuntimeException("Could not start Flink mini cluster.", e); - } + FlinkScalaInterpreter getInnerScalaInterpreter() { + return this.innerIntp; } - private void stopFlinkMiniCluster() { - if (localFlinkCluster != null) { - localFlinkCluster.stop(); - localFlinkCluster = null; - } + ExecutionEnvironment getExecutionEnviroment() { + return this.innerIntp.getExecutionEnviroment(); } - static final String toString(Object o) { - return (o instanceof String) ? (String) o : ""; + FlinkZeppelinContext getZeppelinContext() { + return this.z; } - + } http://git-wip-us.apache.org/repos/asf/zeppelin/blob/1c5b38a9/flink/src/main/java/org/apache/zeppelin/flink/FlinkSQLInterpreter.java ---------------------------------------------------------------------- diff --git a/flink/src/main/java/org/apache/zeppelin/flink/FlinkSQLInterpreter.java b/flink/src/main/java/org/apache/zeppelin/flink/FlinkSQLInterpreter.java new file mode 100644 index 0000000..1ac3547 --- /dev/null +++ b/flink/src/main/java/org/apache/zeppelin/flink/FlinkSQLInterpreter.java @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.zeppelin.flink; + + +import org.apache.zeppelin.interpreter.Interpreter; +import org.apache.zeppelin.interpreter.InterpreterContext; +import org.apache.zeppelin.interpreter.InterpreterException; +import org.apache.zeppelin.interpreter.InterpreterResult; + +import java.util.Properties; + +public class FlinkSQLInterpreter extends Interpreter { + + private FlinkSQLScalaInterpreter sqlScalaInterpreter; + + public FlinkSQLInterpreter(Properties properties) { + super(properties); + } + + + @Override + public void open() throws InterpreterException { + FlinkInterpreter flinkInterpreter = + getInterpreterInTheSameSessionByClassName(FlinkInterpreter.class); + FlinkZeppelinContext z = flinkInterpreter.getZeppelinContext(); + int maxRow = Integer.parseInt(getProperty("zeppelin.flink.maxResult", "1000")); + this.sqlScalaInterpreter = new FlinkSQLScalaInterpreter( + flinkInterpreter.getInnerScalaInterpreter(), z, maxRow); + } + + @Override + public void close() throws InterpreterException { + + } + + @Override + public InterpreterResult interpret(String st, InterpreterContext context) + throws InterpreterException { + return sqlScalaInterpreter.interpret(st, context); + } + + @Override + public void cancel(InterpreterContext context) throws InterpreterException { + + } + + @Override + public FormType getFormType() throws InterpreterException { + return FormType.SIMPLE; + } + + @Override + public int getProgress(InterpreterContext context) throws InterpreterException { + return 0; + } +} http://git-wip-us.apache.org/repos/asf/zeppelin/blob/1c5b38a9/flink/src/main/resources/interpreter-setting.json ---------------------------------------------------------------------- diff --git a/flink/src/main/resources/interpreter-setting.json b/flink/src/main/resources/interpreter-setting.json index f1a04bf..1463e3d 100644 --- a/flink/src/main/resources/interpreter-setting.json +++ b/flink/src/main/resources/interpreter-setting.json @@ -23,5 +23,24 @@ "language": "scala", "editOnDblClick": false } + }, + + { + "group": "flink", + "name": "sql", + "className": "org.apache.zeppelin.flink.FlinkSQLInterpreter", + "properties": { + "zeppelin.flink.maxResult": { + "envName": "zeppelin.flink.maxResult", + "propertyName": "zeppelin.flink.maxResult", + "defaultValue": "1000", + "description": "max number of row returned by sql interpreter.", + "type": "number" + } + }, + "editor": { + "language": "sql", + "editOnDblClick": false + } } ] http://git-wip-us.apache.org/repos/asf/zeppelin/blob/1c5b38a9/flink/src/main/scala/org/apache/zeppelin/flink/FlinkSQLScalaInterpreter.scala ---------------------------------------------------------------------- diff --git a/flink/src/main/scala/org/apache/zeppelin/flink/FlinkSQLScalaInterpreter.scala b/flink/src/main/scala/org/apache/zeppelin/flink/FlinkSQLScalaInterpreter.scala new file mode 100644 index 0000000..1694a44 --- /dev/null +++ b/flink/src/main/scala/org/apache/zeppelin/flink/FlinkSQLScalaInterpreter.scala @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.zeppelin.flink + +import org.apache.flink.table.api.Table +import org.apache.flink.table.api.scala.BatchTableEnvironment +import org.apache.zeppelin.interpreter.{InterpreterContext, InterpreterResult} + +class FlinkSQLScalaInterpreter(scalaInterpreter: FlinkScalaInterpreter, + z: FlinkZeppelinContext, + maxRow: Int) { + + private var btenv: BatchTableEnvironment = scalaInterpreter.getBatchTableEnviroment() + + def interpret(code: String, context: InterpreterContext): InterpreterResult = { + try { + val table: Table = this.btenv.sql(code) + val result = z.showData(table) + return new InterpreterResult(InterpreterResult.Code.SUCCESS, result) + } catch { + case e: Exception => + return new InterpreterResult(InterpreterResult.Code.ERROR, + "Fail to fetch result: " + e.getMessage) + } + } +} http://git-wip-us.apache.org/repos/asf/zeppelin/blob/1c5b38a9/flink/src/main/scala/org/apache/zeppelin/flink/FlinkScalaInterpreter.scala ---------------------------------------------------------------------- diff --git a/flink/src/main/scala/org/apache/zeppelin/flink/FlinkScalaInterpreter.scala b/flink/src/main/scala/org/apache/zeppelin/flink/FlinkScalaInterpreter.scala new file mode 100644 index 0000000..0653c2a --- /dev/null +++ b/flink/src/main/scala/org/apache/zeppelin/flink/FlinkScalaInterpreter.scala @@ -0,0 +1,230 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.zeppelin.flink + +import java.io.BufferedReader +import java.nio.file.Files +import java.util.Properties + +import org.apache.flink.api.scala.FlinkShell._ +import org.apache.flink.api.scala.{ExecutionEnvironment, FlinkILoop} +import org.apache.flink.client.program.ClusterClient +import org.apache.flink.configuration.GlobalConfiguration +import org.apache.flink.runtime.minicluster.{MiniCluster, StandaloneMiniCluster} +import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment +import org.apache.flink.table.api.TableEnvironment +import org.apache.flink.table.api.scala.{BatchTableEnvironment, StreamTableEnvironment} +import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion +import org.apache.zeppelin.interpreter.util.InterpreterOutputStream +import org.apache.zeppelin.interpreter.{InterpreterContext, InterpreterResult} +import org.slf4j.{Logger, LoggerFactory} + +import scala.collection.JavaConverters._ +import scala.tools.nsc.Settings +import scala.tools.nsc.interpreter.Completion.ScalaCompleter +import scala.tools.nsc.interpreter.{JPrintWriter, SimpleReader} + +class FlinkScalaInterpreter(val properties: Properties) { + + lazy val LOGGER: Logger = LoggerFactory.getLogger(getClass) + + private var flinkILoop: FlinkILoop = _ + private var cluster: Option[Either[Either[StandaloneMiniCluster, MiniCluster], + ClusterClient[_]]] = _ + private var scalaCompleter: ScalaCompleter = _ + private val interpreterOutput = new InterpreterOutputStream(LOGGER) + + private var benv: ExecutionEnvironment = _ + private var senv: StreamExecutionEnvironment = _ + private var btenv: BatchTableEnvironment = _ + private var stenv: StreamTableEnvironment = _ + private var z: FlinkZeppelinContext = _ + + def open(): Unit = { + var config = Config(executionMode = ExecutionMode.withName( + properties.getProperty("flink.execution.mode", "LOCAL").toUpperCase)) + val containerNum = Integer.parseInt(properties.getProperty("flink.yarn.num_container", "1")) + config = config.copy(yarnConfig = + Some(ensureYarnConfig(config).copy(containers = Some(containerNum)))) + val configuration = GlobalConfiguration.loadConfiguration(System.getenv("FLINK_CONF_DIR")) + val replOut = new JPrintWriter(interpreterOutput, true) + + val (iLoop, cluster) = try { + val (host, port, cluster) = fetchConnectionInfo(configuration, config) + val conf = cluster match { + case Some(Left(Left(miniCluster))) => miniCluster.getConfiguration + case Some(Left(Right(_))) => configuration + case Some(Right(yarnCluster)) => yarnCluster.getFlinkConfiguration + case None => configuration + } + LOGGER.info(s"\nConnecting to Flink cluster (host: $host, port: $port).\n") + val repl = new FlinkILoop(host, port, conf, config.externalJars, None, replOut) + + (repl, cluster) + } catch { + case e: IllegalArgumentException => + println(s"Error: ${e.getMessage}") + sys.exit() + } + + this.flinkILoop = iLoop + this.cluster = cluster + val settings = new Settings() + settings.usejavacp.value = true + settings.Yreplsync.value = true + + val outputDir = Files.createTempDirectory("flink-repl"); + val interpArguments = List( + "-Yrepl-class-based", + "-Yrepl-outdir", s"${outputDir.toFile.getAbsolutePath}" + ) + settings.processArguments(interpArguments, true) + + flinkILoop.settings = settings + flinkILoop.createInterpreter() + + val in0 = getField(flinkILoop, "scala$tools$nsc$interpreter$ILoop$$in0") + .asInstanceOf[Option[BufferedReader]] + val reader = in0.fold(flinkILoop.chooseReader(settings))(r => + SimpleReader(r, 
replOut, interactive = true)) + + flinkILoop.in = reader + flinkILoop.initializeSynchronous() + callMethod(flinkILoop, "scala$tools$nsc$interpreter$ILoop$$loopPostInit") + this.scalaCompleter = reader.completion.completer() + + this.benv = flinkILoop.scalaBenv + this.senv = flinkILoop.scalaSenv + this.btenv = TableEnvironment.getTableEnvironment(this.benv) + this.stenv = TableEnvironment.getTableEnvironment(this.senv) + bind("btenv", btenv.getClass.getCanonicalName, btenv, List("@transient")) + bind("stenv", stenv.getClass.getCanonicalName, stenv, List("@transient")) + + if (java.lang.Boolean.parseBoolean( + properties.getProperty("zeppelin.flink.disableSysoutLogging", "true"))) { + this.benv.getConfig.disableSysoutLogging() + this.senv.getConfig.disableSysoutLogging() + } + } + + // for use in java side + protected def bind(name: String, + tpe: String, + value: Object, + modifier: java.util.List[String]): Unit = { + flinkILoop.beQuietDuring { + flinkILoop.bind(name, tpe, value, modifier.asScala.toList) + } + } + + protected def bind(name: String, + tpe: String, + value: Object, + modifier: List[String]): Unit = { + flinkILoop.beQuietDuring { + flinkILoop.bind(name, tpe, value, modifier) + } + } + + protected def completion(buf: String, + cursor: Int, + context: InterpreterContext): java.util.List[InterpreterCompletion] = { + val completions = scalaCompleter.complete(buf, cursor).candidates + .map(e => new InterpreterCompletion(e, e, null)) + scala.collection.JavaConversions.seqAsJavaList(completions) + } + + protected def callMethod(obj: Object, name: String): Object = { + callMethod(obj, name, Array.empty[Class[_]], Array.empty[Object]) + } + + protected def callMethod(obj: Object, name: String, + parameterTypes: Array[Class[_]], + parameters: Array[Object]): Object = { + val method = obj.getClass.getMethod(name, parameterTypes: _ *) + method.setAccessible(true) + method.invoke(obj, parameters: _ *) + } + + + protected def getField(obj: Object, name: String): Object = { + val field = obj.getClass.getField(name) + field.setAccessible(true) + field.get(obj) + } + + def interpret(code: String, context: InterpreterContext): InterpreterResult = { + + val originalOut = System.out + + def _interpret(code: String): scala.tools.nsc.interpreter.Results.Result = { + Console.withOut(interpreterOutput) { + System.setOut(Console.out) + interpreterOutput.setInterpreterOutput(context.out) + interpreterOutput.ignoreLeadingNewLinesFromScalaReporter() + context.out.clear() + + val status = flinkILoop.interpret(code) match { + case scala.tools.nsc.interpreter.IR.Success => + scala.tools.nsc.interpreter.IR.Success + case scala.tools.nsc.interpreter.IR.Error => + scala.tools.nsc.interpreter.IR.Error + case scala.tools.nsc.interpreter.IR.Incomplete => + // add print("") at the end in case the last line is comment which lead to INCOMPLETE + flinkILoop.interpret(code + "\nprint(\"\")") + } + context.out.flush() + status + } + } + // reset the java stdout + System.setOut(originalOut) + + val lastStatus = _interpret(code) match { + case scala.tools.nsc.interpreter.IR.Success => + InterpreterResult.Code.SUCCESS + case scala.tools.nsc.interpreter.IR.Error => + InterpreterResult.Code.ERROR + case scala.tools.nsc.interpreter.IR.Incomplete => + InterpreterResult.Code.INCOMPLETE + } + new InterpreterResult(lastStatus) + } + + def close(): Unit = { + if (flinkILoop != null) { + flinkILoop.close() + } + if (cluster != null) { + cluster match { + case Some(Left(Left(legacyMiniCluster))) => legacyMiniCluster.close() + case 
Some(Left(Right(newMiniCluster))) => newMiniCluster.close() + case Some(Right(yarnCluster)) => yarnCluster.shutdown() + case _ => + } + } + } + + def getExecutionEnviroment(): ExecutionEnvironment = this.benv + + def getStreamingExecutionEnviroment(): StreamExecutionEnvironment = this.senv + + def getBatchTableEnviroment(): BatchTableEnvironment = this.btenv + +} http://git-wip-us.apache.org/repos/asf/zeppelin/blob/1c5b38a9/flink/src/main/scala/org/apache/zeppelin/flink/FlinkZeppelinContext.scala ---------------------------------------------------------------------- diff --git a/flink/src/main/scala/org/apache/zeppelin/flink/FlinkZeppelinContext.scala b/flink/src/main/scala/org/apache/zeppelin/flink/FlinkZeppelinContext.scala new file mode 100644 index 0000000..5246445 --- /dev/null +++ b/flink/src/main/scala/org/apache/zeppelin/flink/FlinkZeppelinContext.scala @@ -0,0 +1,177 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.zeppelin.flink + +import java.util + +import org.apache.flink.api.scala.DataSet +import org.apache.flink.streaming.api.scala._ +import org.apache.flink.table.api.Table +import org.apache.flink.table.api.scala.BatchTableEnvironment +import org.apache.flink.types.Row +import org.apache.zeppelin.annotation.ZeppelinApi +import org.apache.zeppelin.display.AngularObjectWatcher +import org.apache.zeppelin.display.ui.OptionInput.ParamOption +import org.apache.zeppelin.interpreter.{BaseZeppelinContext, InterpreterContext, + InterpreterHookRegistry} + +import scala.collection.{JavaConversions, Seq} + + +/** + * ZeppelinContext for Flink + */ +class FlinkZeppelinContext(val btenv: BatchTableEnvironment, + val hooks2: InterpreterHookRegistry, + val maxResult2: Int) extends BaseZeppelinContext(hooks2, maxResult2) { + + private val interpreterClassMap = Map( + "flink" -> "org.apache.zeppelin.flink.FlinkInterpreter", + "sql" -> "org.apache.zeppelin.flink.FlinkSqlInterpreter" + ) + + private val supportedClasses = Seq(classOf[DataSet[_]]) + + override def getSupportedClasses: util.List[Class[_]] = + JavaConversions.seqAsJavaList(supportedClasses) + + override def getInterpreterClassMap: util.Map[String, String] = + JavaConversions.mapAsJavaMap(interpreterClassMap) + + override def showData(obj: Any): String = { + def showTable(table: Table): String = { + val columnNames: Array[String] = table.getSchema.getColumnNames + val dsRow: DataSet[Row] = btenv.toDataSet[Row](table) + val builder: StringBuilder = new StringBuilder("%table ") + builder.append(columnNames.mkString("\t")) + builder.append("\n") + val rows = dsRow.first(maxResult).collect() + for (row <- rows) { + var i = 0; + while (i < row.getArity) { + builder.append(row.getField(i)) + i += 1 + if (i != row.getArity) { + builder.append("\t"); + } + } + 
builder.append("\n") + } + // append %text at the end, otherwise the following output will be put in table as well. + builder.append("\n%text ") + builder.toString() + } + + if (obj.isInstanceOf[DataSet[_]]) { + val ds = obj.asInstanceOf[DataSet[_]] + val table = btenv.fromDataSet(ds) + showTable(table) + } else if (obj.isInstanceOf[Table]) { + showTable(obj.asInstanceOf[Table]) + } else { + obj.toString + } + } + + + @ZeppelinApi + def select(name: String, options: Seq[(Any, String)]): Any = select(name, null, options) + + @ZeppelinApi + def select(name: String, defaultValue: Any, options: Seq[(Any, String)]): Any = + select(name, defaultValue, options.map(e => new ParamOption(e._1, e._2)).toArray) + + @ZeppelinApi + def checkbox(name: String, options: Seq[(AnyRef, String)]): Seq[Any] = { + val javaResult = checkbox(name, JavaConversions.seqAsJavaList(options.map(e => e._1)), + options.map(e => new ParamOption(e._1, e._2)).toArray) + JavaConversions.asScalaBuffer(javaResult) + } + + @ZeppelinApi + def checkbox(name: String, defaultChecked: Seq[AnyRef], options: Seq[(Any, String)]): Seq[Any] = { + val defaultCheckedList = JavaConversions.seqAsJavaList(defaultChecked) + val javaResult = checkbox(name, defaultCheckedList, + options.map(e => new ParamOption(e._1, e._2)).toArray) + JavaConversions.asScalaBuffer(javaResult) + } + + @ZeppelinApi + def noteSelect(name: String, options: Seq[(Any, String)]): Any = noteSelect(name, "", options) + + @ZeppelinApi + def noteSelect(name: String, defaultValue: Any, options: Seq[(Any, String)]): AnyRef = + noteSelect(name, defaultValue, options.map(e => new ParamOption(e._1, e._2)).toArray) + + @ZeppelinApi + def noteCheckbox(name: String, options: Seq[(AnyRef, String)]): Seq[AnyRef] = { + val javaResulst = noteCheckbox(name, JavaConversions.seqAsJavaList(options.map(e => e._1)), + options.map(e => new ParamOption(e._1, e._2)).toArray) + JavaConversions.asScalaBuffer(javaResulst) + } + + @ZeppelinApi + def noteCheckbox(name: String, + defaultChecked: Seq[AnyRef], + options: Seq[(AnyRef, String)]): Seq[AnyRef] = { + val defaultCheckedList = JavaConversions.seqAsJavaList(defaultChecked) + val javaResult = noteCheckbox(name, defaultCheckedList, + options.map(e => new ParamOption(e._1, e._2)).toArray) + JavaConversions.asScalaBuffer(javaResult) + } + + @ZeppelinApi def angularWatch(name: String, func: (AnyRef, AnyRef) => Unit): Unit = { + angularWatch(name, interpreterContext.getNoteId, func) + } + + @deprecated def angularWatchGlobal(name: String, func: (AnyRef, AnyRef) => Unit): Unit = { + angularWatch(name, null, func) + } + + @ZeppelinApi def angularWatch(name: String, + func: (AnyRef, AnyRef, InterpreterContext) => Unit): Unit = { + angularWatch(name, interpreterContext.getNoteId, func) + } + + @deprecated def angularWatchGlobal(name: String, + func: (AnyRef, AnyRef, InterpreterContext) => Unit): Unit = { + angularWatch(name, null, func) + } + + private def angularWatch(name: String, noteId: String, func: (AnyRef, AnyRef) => Unit): Unit = { + val w = new AngularObjectWatcher(getInterpreterContext) { + override def watch(oldObject: Any, newObject: AnyRef, context: InterpreterContext): Unit = { + func(newObject, newObject) + } + } + angularWatch(name, noteId, w) + } + + private def angularWatch(name: String, noteId: String, + func: (AnyRef, AnyRef, InterpreterContext) => Unit): Unit = { + val w = new AngularObjectWatcher(getInterpreterContext) { + override def watch(oldObject: AnyRef, + newObject: AnyRef, + context: InterpreterContext): Unit = { + 
func(oldObject, newObject, context) + } + } + angularWatch(name, noteId, w) + } +} http://git-wip-us.apache.org/repos/asf/zeppelin/blob/1c5b38a9/flink/src/test/java/org/apache/zeppelin/flink/FlinkInterpreterTest.java ---------------------------------------------------------------------- diff --git a/flink/src/test/java/org/apache/zeppelin/flink/FlinkInterpreterTest.java b/flink/src/test/java/org/apache/zeppelin/flink/FlinkInterpreterTest.java index 128f567..0c42139 100644 --- a/flink/src/test/java/org/apache/zeppelin/flink/FlinkInterpreterTest.java +++ b/flink/src/test/java/org/apache/zeppelin/flink/FlinkInterpreterTest.java @@ -1,13 +1,12 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -17,84 +16,240 @@ */ package org.apache.zeppelin.flink; + +import org.apache.zeppelin.display.AngularObjectRegistry; +import org.apache.zeppelin.display.ui.CheckBox; +import org.apache.zeppelin.display.ui.Select; +import org.apache.zeppelin.display.ui.TextBox; import org.apache.zeppelin.interpreter.InterpreterContext; +import org.apache.zeppelin.interpreter.InterpreterException; +import org.apache.zeppelin.interpreter.InterpreterGroup; +import org.apache.zeppelin.interpreter.InterpreterOutput; +import org.apache.zeppelin.interpreter.InterpreterOutputListener; import org.apache.zeppelin.interpreter.InterpreterResult; -import org.apache.zeppelin.interpreter.InterpreterResult.Code; -import org.junit.AfterClass; -import org.junit.BeforeClass; +import org.apache.zeppelin.interpreter.InterpreterResultMessageOutput; +import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion; +import org.junit.After; +import org.junit.Before; import org.junit.Test; +import java.io.IOException; +import java.util.ArrayList; import java.util.Arrays; +import java.util.List; import java.util.Properties; import static org.junit.Assert.assertArrayEquals; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; public class FlinkInterpreterTest { - private static FlinkInterpreter flink; - private static InterpreterContext context; + private FlinkInterpreter interpreter; + private InterpreterContext context; + + // catch the streaming output in onAppend + private volatile String output = ""; + // catch the interpreter output in onUpdate + private List<InterpreterResultMessageOutput> messageOutput; - @BeforeClass - public static void setUp() { + @Before + public void setUp() throws InterpreterException { Properties p = new Properties(); - flink = new 
FlinkInterpreter(p); - flink.open(); + interpreter = new FlinkInterpreter(p); + InterpreterGroup intpGroup = new InterpreterGroup(); + interpreter.setInterpreterGroup(intpGroup); + interpreter.open(); context = InterpreterContext.builder().build(); + InterpreterContext.set(context); } - @AfterClass - public static void tearDown() { - flink.close(); - } - - @Test - public void testNextLineInvocation() { - assertEquals(InterpreterResult.Code.SUCCESS, flink.interpret("\"123\"\n.toInt", context) - .code()); - } - - @Test - public void testNextLineComments() { - assertEquals(InterpreterResult.Code.SUCCESS, - flink.interpret("\"123\"\n/*comment here\n*/.toInt", context).code()); + @After + public void tearDown() throws InterpreterException { + interpreter.close(); } @Test - public void testNextLineCompanionObject() { - String code = "class Counter {\nvar value: Long = 0\n}\n // comment\n\n object Counter " + - "{\n def apply(x: Long) = new Counter()\n}"; - assertEquals(InterpreterResult.Code.SUCCESS, flink.interpret(code, context).code()); + public void testBasicScala() throws InterpreterException, IOException { + InterpreterResult result = interpreter.interpret("val a=\"hello world\"", + getInterpreterContext()); + assertEquals(InterpreterResult.Code.SUCCESS, result.code()); + assertEquals("a: String = hello world\n", output); + + result = interpreter.interpret("print(a)", getInterpreterContext()); + assertEquals(InterpreterResult.Code.SUCCESS, result.code()); + assertEquals("hello world", output); + + // java stdout + result = interpreter.interpret("System.out.print(a)", getInterpreterContext()); + assertEquals(InterpreterResult.Code.SUCCESS, result.code()); + assertEquals("hello world", output); + + // incomplete + result = interpreter.interpret("println(a", getInterpreterContext()); + assertEquals(InterpreterResult.Code.INCOMPLETE, result.code()); + + // syntax error + result = interpreter.interpret("println(b)", getInterpreterContext()); + assertEquals(InterpreterResult.Code.ERROR, result.code()); + assertTrue(output.contains("not found: value b")); + + // multiple line + result = interpreter.interpret("\"123\".\ntoInt", getInterpreterContext()); + assertEquals(InterpreterResult.Code.SUCCESS, result.code()); + + // single line comment + result = interpreter.interpret("/*comment here*/", getInterpreterContext()); + assertEquals(InterpreterResult.Code.SUCCESS, result.code()); + + result = interpreter.interpret("/*comment here*/\nprint(\"hello world\")", + getInterpreterContext()); + assertEquals(InterpreterResult.Code.SUCCESS, result.code()); + + // multiple line comment + result = interpreter.interpret("/*line 1 \n line 2*/", + getInterpreterContext()); + assertEquals(InterpreterResult.Code.SUCCESS, result.code()); + + // test function + result = interpreter.interpret("def add(x:Int, y:Int)\n{ return x+y }", + getInterpreterContext()); + assertEquals(InterpreterResult.Code.SUCCESS, result.code()); + + result = interpreter.interpret("print(add(1,2))", getInterpreterContext()); + assertEquals(InterpreterResult.Code.SUCCESS, result.code()); + + result = interpreter.interpret("/*line 1 \n line 2*/print(\"hello world\")", + getInterpreterContext()); + assertEquals(InterpreterResult.Code.SUCCESS, result.code()); + + // companion object + result = interpreter.interpret("class Counter {\n " + + "var value: Long = 0} \n" + + "object Counter {\n def apply(x: Long) = new Counter()\n}", getInterpreterContext()); + assertEquals(InterpreterResult.Code.SUCCESS, result.code()); + + // case class + result = 
interpreter.interpret( + "case class Bank(age:Integer, job:String, marital : String, education : String," + + " balance : Integer)\n", + getInterpreterContext()); + assertEquals(InterpreterResult.Code.SUCCESS, result.code()); + + // ZeppelinContext + context = getInterpreterContext(); + result = interpreter.interpret("val ds = benv.fromElements(1,2,3)\nz.show(ds)", context); + assertEquals(InterpreterResult.Code.SUCCESS, result.code()); + assertEquals(InterpreterResult.Type.TABLE, messageOutput.get(0).getType()); + assertEquals("f0\n" + + "1\n" + + "2\n" + + "3\n", messageOutput.get(0).toInterpreterResultMessage().getData()); + + context = getInterpreterContext(); + result = interpreter.interpret("z.input(\"name\", \"default_name\")", + context); + assertEquals(InterpreterResult.Code.SUCCESS, result.code()); + assertEquals(1, context.getGui().getForms().size()); + assertTrue(context.getGui().getForms().get("name") instanceof TextBox); + TextBox textBox = (TextBox) context.getGui().getForms().get("name"); + assertEquals("name", textBox.getName()); + assertEquals("default_name", textBox.getDefaultValue()); + + context = getInterpreterContext(); + result = interpreter.interpret("z.checkbox(\"checkbox_1\", " + + "Seq(\"value_2\"), Seq((\"value_1\", \"name_1\"), (\"value_2\", \"name_2\")))", context); + assertEquals(InterpreterResult.Code.SUCCESS, result.code()); + assertEquals(1, context.getGui().getForms().size()); + assertTrue(context.getGui().getForms().get("checkbox_1") instanceof CheckBox); + CheckBox checkBox = (CheckBox) context.getGui().getForms().get("checkbox_1"); + assertEquals("checkbox_1", checkBox.getName()); + assertEquals(1, checkBox.getDefaultValue().length); + assertEquals("value_2", checkBox.getDefaultValue()[0]); + assertEquals(2, checkBox.getOptions().length); + assertEquals("value_1", checkBox.getOptions()[0].getValue()); + assertEquals("name_1", checkBox.getOptions()[0].getDisplayName()); + assertEquals("value_2", checkBox.getOptions()[1].getValue()); + assertEquals("name_2", checkBox.getOptions()[1].getDisplayName()); + + context = getInterpreterContext(); + result = interpreter.interpret("z.select(\"select_1\", Seq(\"value_2\"), " + + "Seq((\"value_1\", \"name_1\"), (\"value_2\", \"name_2\")))", context); + assertEquals(InterpreterResult.Code.SUCCESS, result.code()); + assertEquals(1, context.getGui().getForms().size()); + assertTrue(context.getGui().getForms().get("select_1") instanceof Select); + Select select = (Select) context.getGui().getForms().get("select_1"); + assertEquals("select_1", select.getName()); + // TODO(zjffdu) it seems a bug of GUI, the default value should be 'value_2', + // but it is List(value_2) + // assertEquals("value_2", select.getDefaultValue()); + assertEquals(2, select.getOptions().length); + assertEquals("value_1", select.getOptions()[0].getValue()); + assertEquals("name_1", select.getOptions()[0].getDisplayName()); + assertEquals("value_2", select.getOptions()[1].getValue()); + assertEquals("name_2", select.getOptions()[1].getDisplayName()); } @Test - public void testSimpleStatement() { - InterpreterResult result = flink.interpret("val a=1", context); - result = flink.interpret("print(a)", context); - assertEquals("1", result.message().get(0).getData()); + public void testCompletion() throws InterpreterException { + InterpreterResult result = interpreter.interpret("val a=\"hello world\"", + getInterpreterContext()); + assertEquals(InterpreterResult.Code.SUCCESS, result.code()); + assertEquals("a: String = hello world\n", output); + + 
List<InterpreterCompletion> completions = interpreter.completion("a.", 2, + getInterpreterContext()); + assertTrue(completions.size() > 0); } - @Test - public void testSimpleStatementWithSystemOutput() { - InterpreterResult result = flink.interpret("val a=1", context); - result = flink.interpret("System.out.print(a)", context); - assertEquals("1", result.message().get(0).getData()); - } + // Disable it for now as there's extra std output from flink shell. @Test - public void testWordCount() { - flink.interpret("val text = benv.fromElements(\"To be or not to be\")", context); - flink.interpret("val counts = text.flatMap { _.toLowerCase.split(\" \") }" + - ".map { (_, 1) }.groupBy(0).sum(1)", context); - InterpreterResult result = flink.interpret("counts.print()", context); - assertEquals(Code.SUCCESS, result.code()); + public void testWordCount() throws InterpreterException, IOException { + interpreter.interpret("val text = benv.fromElements(\"To be or not to be\")", + getInterpreterContext()); + interpreter.interpret("val counts = text.flatMap { _.toLowerCase.split(\" \") }" + + ".map { (_, 1) }.groupBy(0).sum(1)", getInterpreterContext()); + InterpreterResult result = interpreter.interpret("counts.print()", getInterpreterContext()); + assertEquals(InterpreterResult.Code.SUCCESS, result.code()); String[] expectedCounts = {"(to,2)", "(be,2)", "(or,1)", "(not,1)"}; Arrays.sort(expectedCounts); - String[] counts = result.message().get(0).getData().split("\n"); + String[] counts = output.split("\n"); Arrays.sort(counts); assertArrayEquals(expectedCounts, counts); } + + private InterpreterContext getInterpreterContext() { + output = ""; + messageOutput = new ArrayList<>(); + InterpreterContext context = InterpreterContext.builder() + .setInterpreterOut(new InterpreterOutput(null)) + .setAngularObjectRegistry(new AngularObjectRegistry("flink", null)) + .build(); + context.out = new InterpreterOutput( + new InterpreterOutputListener() { + @Override + public void onUpdateAll(InterpreterOutput out) { + + } + + @Override + public void onAppend(int index, InterpreterResultMessageOutput out, byte[] line) { + try { + output = out.toInterpreterResultMessage().getData(); + } catch (IOException e) { + e.printStackTrace(); + } + } + + @Override + public void onUpdate(int index, InterpreterResultMessageOutput out) { + messageOutput.add(out); + } + }); + return context; + } } http://git-wip-us.apache.org/repos/asf/zeppelin/blob/1c5b38a9/flink/src/test/java/org/apache/zeppelin/flink/FlinkSQLInterpreterTest.java ---------------------------------------------------------------------- diff --git a/flink/src/test/java/org/apache/zeppelin/flink/FlinkSQLInterpreterTest.java b/flink/src/test/java/org/apache/zeppelin/flink/FlinkSQLInterpreterTest.java new file mode 100644 index 0000000..6993540 --- /dev/null +++ b/flink/src/test/java/org/apache/zeppelin/flink/FlinkSQLInterpreterTest.java @@ -0,0 +1,110 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.zeppelin.flink; + +import org.apache.zeppelin.display.AngularObjectRegistry; +import org.apache.zeppelin.interpreter.InterpreterContext; +import org.apache.zeppelin.interpreter.InterpreterException; +import org.apache.zeppelin.interpreter.InterpreterGroup; +import org.apache.zeppelin.interpreter.InterpreterOutput; +import org.apache.zeppelin.interpreter.InterpreterOutputListener; +import org.apache.zeppelin.interpreter.InterpreterResult; +import org.apache.zeppelin.interpreter.InterpreterResultMessageOutput; +import org.junit.Before; +import org.junit.Test; + +import java.io.IOException; +import java.util.Properties; + +import static org.junit.Assert.assertEquals; + +public class FlinkSQLInterpreterTest { + + private FlinkInterpreter interpreter; + private FlinkSQLInterpreter sqlInterpreter; + private InterpreterContext context; + + // catch the streaming output in onAppend + private volatile String output = ""; + // catch the interpreter output in onUpdate + private InterpreterResultMessageOutput messageOutput; + + @Before + public void setUp() throws InterpreterException { + Properties p = new Properties(); + interpreter = new FlinkInterpreter(p); + sqlInterpreter = new FlinkSQLInterpreter(p); + InterpreterGroup intpGroup = new InterpreterGroup(); + interpreter.setInterpreterGroup(intpGroup); + sqlInterpreter.setInterpreterGroup(intpGroup); + intpGroup.addInterpreterToSession(interpreter, "session_1"); + intpGroup.addInterpreterToSession(sqlInterpreter, "session_1"); + + interpreter.open(); + sqlInterpreter.open(); + context = InterpreterContext.builder().build(); + } + + @Test + public void testSQLInterpreter() throws InterpreterException { + InterpreterResult result = interpreter.interpret( + "val ds = benv.fromElements((1, \"jeff\"), (2, \"andy\"))", getInterpreterContext()); + assertEquals(InterpreterResult.Code.SUCCESS, result.code()); + + result = interpreter.interpret("btenv.registerDataSet(\"table_1\", ds)", + getInterpreterContext()); + assertEquals(InterpreterResult.Code.SUCCESS, result.code()); + + result = sqlInterpreter.interpret("select * from table_1", getInterpreterContext()); + assertEquals(InterpreterResult.Code.SUCCESS, result.code()); + assertEquals(InterpreterResult.Type.TABLE, result.message().get(0).getType()); + assertEquals("_1\t_2\n" + + "1\tjeff\n" + + "2\tandy\n", result.message().get(0).getData()); + } + + private InterpreterContext getInterpreterContext() { + output = ""; + InterpreterContext context = InterpreterContext.builder() + .setInterpreterOut(new InterpreterOutput(null)) + .setAngularObjectRegistry(new AngularObjectRegistry("flink", null)) + .build(); + context.out = new InterpreterOutput( + new InterpreterOutputListener() { + @Override + public void onUpdateAll(InterpreterOutput out) { + + } + + @Override + public void onAppend(int index, InterpreterResultMessageOutput out, byte[] line) { + try { + output = out.toInterpreterResultMessage().getData(); + } catch (IOException e) { + e.printStackTrace(); + } + } + + @Override + public void onUpdate(int index, InterpreterResultMessageOutput out) { + messageOutput 
= out; + } + }); + return context; + } +} http://git-wip-us.apache.org/repos/asf/zeppelin/blob/1c5b38a9/flink/src/test/resources/flink-conf.yaml ---------------------------------------------------------------------- diff --git a/flink/src/test/resources/flink-conf.yaml b/flink/src/test/resources/flink-conf.yaml new file mode 100644 index 0000000..1041d0f --- /dev/null +++ b/flink/src/test/resources/flink-conf.yaml @@ -0,0 +1,247 @@ +################################################################################ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +################################################################################ + + +#============================================================================== +# Common +#============================================================================== + +# The external address of the host on which the JobManager runs and can be +# reached by the TaskManagers and any clients which want to connect. This setting +# is only used in Standalone mode and may be overwritten on the JobManager side +# by specifying the --host <hostname> parameter of the bin/jobmanager.sh executable. +# In high availability mode, if you use the bin/start-cluster.sh script and setup +# the conf/masters file, this will be taken care of automatically. Yarn/Mesos +# automatically configure the host name based on the hostname of the node where the +# JobManager runs. + +#mode: legacy + +jobmanager.rpc.address: localhost + +# The RPC port where the JobManager is reachable. + +jobmanager.rpc.port: 6123 + + +# The heap size for the JobManager JVM + +jobmanager.heap.mb: 1024 + + +# The heap size for the TaskManager JVM + +taskmanager.heap.mb: 1024 + + +# The number of task slots that each TaskManager offers. Each slot runs one parallel pipeline. + +taskmanager.numberOfTaskSlots: 1 + +# The parallelism used for programs that did not specify and other parallelism. + +parallelism.default: 1 + +# The default file system scheme and authority. +# +# By default file paths without scheme are interpreted relative to the local +# root file system 'file:///'. Use this to override the default and interpret +# relative paths relative to a different file system, +# for example 'hdfs://mynamenode:12345' +# +# fs.default-scheme + +#============================================================================== +# High Availability +#============================================================================== + +# The high-availability mode. Possible options are 'NONE' or 'zookeeper'. +# +# high-availability: zookeeper + +# The path where metadata for master recovery is persisted. While ZooKeeper stores +# the small ground truth for checkpoint and leader election, this location stores +# the larger objects, like persisted dataflow graphs. 
+# +# Must be a durable file system that is accessible from all nodes +# (like HDFS, S3, Ceph, nfs, ...) +# +# high-availability.storageDir: hdfs:///flink/ha/ + +# The list of ZooKeeper quorum peers that coordinate the high-availability +# setup. This must be a list of the form: +# "host1:clientPort,host2:clientPort,..." (default clientPort: 2181) +# +# high-availability.zookeeper.quorum: localhost:2181 + + +# ACL options are based on https://zookeeper.apache.org/doc/r3.1.2/zookeeperProgrammers.html#sc_BuiltinACLSchemes +# It can be either "creator" (ZOO_CREATE_ALL_ACL) or "open" (ZOO_OPEN_ACL_UNSAFE) +# The default value is "open" and it can be changed to "creator" if ZK security is enabled +# +# high-availability.zookeeper.client.acl: open + +#============================================================================== +# Fault tolerance and checkpointing +#============================================================================== + +# The backend that will be used to store operator state checkpoints if +# checkpointing is enabled. +# +# Supported backends are 'jobmanager', 'filesystem', 'rocksdb', or the +# <class-name-of-factory>. +# +# state.backend: filesystem + +# Directory for checkpoints filesystem, when using any of the default bundled +# state backends. +# +# state.checkpoints.dir: hdfs://namenode-host:port/flink-checkpoints + +# Default target directory for savepoints, optional. +# +# state.savepoints.dir: hdfs://namenode-host:port/flink-checkpoints + +# Flag to enable/disable incremental checkpoints for backends that +# support incremental checkpoints (like the RocksDB state backend). +# +# state.backend.incremental: false + +#============================================================================== +# Web Frontend +#============================================================================== + +# The address under which the web-based runtime monitor listens. +# +#jobmanager.web.address: 0.0.0.0 + +# The port under which the web-based runtime monitor listens. +# A value of -1 deactivates the web server. + +rest.port: 8081 + +# Flag to specify whether job submission is enabled from the web-based +# runtime monitor. Uncomment to disable. + +#jobmanager.web.submit.enable: false + +#============================================================================== +# Advanced +#============================================================================== + +# Override the directories for temporary files. If not specified, the +# system-specific Java temporary directory (java.io.tmpdir property) is taken. +# +# For framework setups on Yarn or Mesos, Flink will automatically pick up the +# containers' temp directories without any need for configuration. +# +# Add a delimited list for multiple directories, using the system directory +# delimiter (colon ':' on unix) or a comma, e.g.: +# /data1/tmp:/data2/tmp:/data3/tmp +# +# Note: Each directory entry is read from and written to by a different I/O +# thread. You can include the same directory multiple times in order to create +# multiple I/O threads against that directory. This is for example relevant for +# high-throughput RAIDs. +# +# io.tmp.dirs: /tmp + +# Specify whether TaskManager's managed memory should be allocated when starting +# up (true) or when memory is requested. +# +# We recommend to set this value to 'true' only in setups for pure batch +# processing (DataSet API). 
Streaming setups currently do not use the TaskManager's +# managed memory: The 'rocksdb' state backend uses RocksDB's own memory management, +# while the 'memory' and 'filesystem' backends explicitly keep data as objects +# to save on serialization cost. +# +# taskmanager.memory.preallocate: false + +# The classloading resolve order. Possible values are 'child-first' (Flink's default) +# and 'parent-first' (Java's default). +# +# Child first classloading allows users to use different dependency/library +# versions in their application than those in the classpath. Switching back +# to 'parent-first' may help with debugging dependency issues. +# +# classloader.resolve-order: child-first + +# The amount of memory going to the network stack. These numbers usually need +# no tuning. Adjusting them may be necessary in case of an "Insufficient number +# of network buffers" error. The default min is 64MB, the default max is 1GB. +# +# taskmanager.network.memory.fraction: 0.1 +# taskmanager.network.memory.min: 67108864 +# taskmanager.network.memory.max: 1073741824 + +#============================================================================== +# Flink Cluster Security Configuration +#============================================================================== + +# Kerberos authentication for various components - Hadoop, ZooKeeper, and connectors - +# may be enabled in four steps: +# 1. configure the local krb5.conf file +# 2. provide Kerberos credentials (either a keytab or a ticket cache w/ kinit) +# 3. make the credentials available to various JAAS login contexts +# 4. configure the connector to use JAAS/SASL + +# The below configures how Kerberos credentials are provided. A keytab will be used instead of +# a ticket cache if the keytab path and principal are set. + +# security.kerberos.login.use-ticket-cache: true +# security.kerberos.login.keytab: /path/to/kerberos/keytab +# security.kerberos.login.principal: flink-user + +# The configuration below defines which JAAS login contexts + +# security.kerberos.login.contexts: Client,KafkaClient + +#============================================================================== +# ZK Security Configuration +#============================================================================== + +# Below configurations are applicable if ZK ensemble is configured for security + +# Override below configuration to provide custom ZK service name if configured +# zookeeper.sasl.service-name: zookeeper + +# The configuration below must match one of the values set in "security.kerberos.login.contexts" +# zookeeper.sasl.login-context-name: Client + +#============================================================================== +# HistoryServer +#============================================================================== + +# The HistoryServer is started and stopped via bin/historyserver.sh (start|stop) + +# Directory to upload completed jobs to. Add this directory to the list of +# monitored directories of the HistoryServer as well (see below). +#jobmanager.archive.fs.dir: hdfs:///completed-jobs/ + +# The address under which the web-based HistoryServer listens. +#historyserver.web.address: 0.0.0.0 + +# The port under which the web-based HistoryServer listens. +#historyserver.web.port: 8082 + +# Comma separated list of directories to monitor for completed jobs. +#historyserver.archive.fs.dir: hdfs:///completed-jobs/ + +# Interval in milliseconds for refreshing the monitored directories. 
+#historyserver.archive.fs.refresh-interval: 10000 + http://git-wip-us.apache.org/repos/asf/zeppelin/blob/1c5b38a9/flink/src/test/resources/log4j.properties ---------------------------------------------------------------------- diff --git a/flink/src/test/resources/log4j.properties b/flink/src/test/resources/log4j.properties new file mode 100644 index 0000000..65b6d36 --- /dev/null +++ b/flink/src/test/resources/log4j.properties @@ -0,0 +1,24 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +log4j.rootLogger = INFO, stdout + +log4j.appender.stdout = org.apache.log4j.ConsoleAppender +log4j.appender.stdout.layout = org.apache.log4j.PatternLayout +log4j.appender.stdout.layout.ConversionPattern=%5p [%d] ({%t} %F[%M]:%L) - %m%n + +log4j.logger.org.apache.zeppelin.flink=WARN http://git-wip-us.apache.org/repos/asf/zeppelin/blob/1c5b38a9/zeppelin-plugins/launcher/standard/src/main/java/org/apache/zeppelin/interpreter/launcher/StandardInterpreterLauncher.java ---------------------------------------------------------------------- diff --git a/zeppelin-plugins/launcher/standard/src/main/java/org/apache/zeppelin/interpreter/launcher/StandardInterpreterLauncher.java b/zeppelin-plugins/launcher/standard/src/main/java/org/apache/zeppelin/interpreter/launcher/StandardInterpreterLauncher.java index 10ab354..9c7a0b2 100644 --- a/zeppelin-plugins/launcher/standard/src/main/java/org/apache/zeppelin/interpreter/launcher/StandardInterpreterLauncher.java +++ b/zeppelin-plugins/launcher/standard/src/main/java/org/apache/zeppelin/interpreter/launcher/StandardInterpreterLauncher.java @@ -94,6 +94,12 @@ public class StandardInterpreterLauncher extends InterpreterLauncher { if (RemoteInterpreterUtils.isEnvString((String) key)) { env.put((String) key, context.getProperties().getProperty((String) key)); } + // TODO(zjffdu) move this to FlinkInterpreterLauncher + if (key.toString().equals("FLINK_HOME")) { + String flinkHome = context.getProperties().get(key).toString(); + env.put("FLINK_CONF_DIR", flinkHome + "/conf"); + env.put("FLINK_LIB_DIR", flinkHome + "/lib"); + } } return env; } http://git-wip-us.apache.org/repos/asf/zeppelin/blob/1c5b38a9/zeppelin-server/pom.xml ---------------------------------------------------------------------- diff --git a/zeppelin-server/pom.xml b/zeppelin-server/pom.xml index 9bc2a4c..4eaedb2 100644 --- a/zeppelin-server/pom.xml +++ b/zeppelin-server/pom.xml @@ -368,14 +368,6 @@ </dependency> <dependency> - <groupId>org.apache.zeppelin</groupId> - <artifactId>zeppelin-zengine</artifactId> - <version>${project.version}</version> - <classifier>tests</classifier> - <scope>test</scope> - </dependency> - - <dependency> <groupId>org.bitbucket.cowwoc</groupId> <artifactId>diff-match-patch</artifactId> <version>1.1</version> 
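Taken together, the FLINK_HOME handling in StandardInterpreterLauncher and the new interpreter tests map onto note-level usage roughly as in the following sketch (a minimal sketch only: benv, btenv and z are the variables the Flink interpreter pre-binds, and every statement mirrors one interpreted in the tests above):

```scala
// %flink paragraph (Scala): ZeppelinContext renders a DataSet as a TABLE result.
val nums = benv.fromElements(1, 2, 3)
z.show(nums)

// %flink paragraph (Scala): register a DataSet so the SQL interpreter can query it.
val ds = benv.fromElements((1, "jeff"), (2, "andy"))
btenv.registerDataSet("table_1", ds)

// %flink.sql paragraph (written as a comment because this block is Scala):
// select * from table_1
```

The select paragraph comes back as a TABLE result with columns _1 and _2, matching the assertion in FlinkSQLInterpreterTest below.
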
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/1c5b38a9/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/SparkDownloadUtils.java ---------------------------------------------------------------------- diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/SparkDownloadUtils.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/SparkDownloadUtils.java new file mode 100644 index 0000000..157b989 --- /dev/null +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/SparkDownloadUtils.java @@ -0,0 +1,136 @@ +package org.apache.zeppelin.interpreter; + +import java.io.BufferedReader; +import java.io.File; +import java.io.IOException; +import java.io.InputStream; +import java.io.InputStreamReader; +import java.net.URL; +import org.apache.commons.io.FileUtils; +import org.apache.commons.io.IOUtils; +import org.apache.commons.lang3.StringUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Utility class for downloading spark. This is used for spark integration test. + * + */ +public class SparkDownloadUtils { + private static Logger LOGGER = LoggerFactory.getLogger(SparkDownloadUtils.class); + + private static String downloadFolder = System.getProperty("user.home") + "/.cache/spark"; + + static { + try { + FileUtils.forceMkdir(new File(downloadFolder)); + } catch (IOException e) { + throw new RuntimeException("Fail to create downloadFolder: " + downloadFolder, e); + } + } + + + public static String downloadSpark(String version) { + File targetSparkHomeFolder = new File(downloadFolder + "/spark-" + version + "-bin-hadoop2.6"); + if (targetSparkHomeFolder.exists()) { + LOGGER.info("Skip to download spark as it is already downloaded."); + return targetSparkHomeFolder.getAbsolutePath(); + } + // Try mirrors a few times until one succeeds + boolean downloaded = false; + for (int i = 0; i < 3; i++) { + try { + String preferredMirror = IOUtils.toString(new URL("https://www.apache.org/dyn/closer.lua?preferred=true")); + File downloadFile = new File(downloadFolder + "/spark-" + version + "-bin-hadoop2.6.tgz"); + String downloadURL = preferredMirror + "/spark/spark-" + version + "/spark-" + version + "-bin-hadoop2.6.tgz"; + runShellCommand(new String[] {"wget", downloadURL, "-P", downloadFolder}); + runShellCommand(new String[]{"tar", "-xvf", downloadFile.getAbsolutePath(), "-C", downloadFolder}); + downloaded = true; + break; + } catch (Exception e) { + LOGGER.warn("Failed to download Spark", e); + } + } + // fallback to use apache archive + // https://archive.apache.org/dist/spark/spark-1.6.3/spark-1.6.3-bin-hadoop2.6.tgz + if (!downloaded) { + File downloadFile = new File(downloadFolder + "/spark-" + version + "-bin-hadoop2.6.tgz"); + String downloadURL = + "https://archive.apache.org/dist/spark/spark-" + + version + + "/spark-" + + version + + "-bin-hadoop2.6.tgz"; + try { + runShellCommand(new String[] {"wget", downloadURL, "-P", downloadFolder}); + runShellCommand( + new String[] {"tar", "-xvf", downloadFile.getAbsolutePath(), "-C", downloadFolder}); + } catch (Exception e) { + throw new RuntimeException("Fail to download spark " + version, e); + } + } + return targetSparkHomeFolder.getAbsolutePath(); + } + + public static String downloadFlink(String version) { + File targetFlinkHomeFolder = new File(downloadFolder + "/flink-" + version); + if (targetFlinkHomeFolder.exists()) { + LOGGER.info("Skip to download flink as it is already downloaded."); + return targetFlinkHomeFolder.getAbsolutePath(); + } + // Try 
mirrors a few times until one succeeds + for (int i = 0; i < 3; i++) { + try { + String preferredMirror = IOUtils.toString(new URL("https://www.apache.org/dyn/closer.lua?preferred=true")); + File downloadFile = new File(downloadFolder + "/flink-" + version + "-bin-hadoop27-scala_2.11.tgz"); + String downloadURL = preferredMirror + "/flink/flink-" + version + "/flink-" + version + "-bin-hadoop27-scala_2.11.tgz"; + runShellCommand(new String[] {"wget", downloadURL, "-P", downloadFolder}); + runShellCommand(new String[]{"tar", "-xvf", downloadFile.getAbsolutePath(), "-C", downloadFolder}); + break; + } catch (Exception e) { + LOGGER.warn("Failed to download Flink", e); + } + } + return targetFlinkHomeFolder.getAbsolutePath(); + } + + private static void runShellCommand(String[] commands) throws IOException, InterruptedException { + LOGGER.info("Starting shell commands: " + StringUtils.join(commands, " ")); + Process process = Runtime.getRuntime().exec(commands); + StreamGobbler errorGobbler = new StreamGobbler(process.getErrorStream()); + StreamGobbler outputGobbler = new StreamGobbler(process.getInputStream()); + errorGobbler.start(); + outputGobbler.start(); + if (process.waitFor() != 0) { + throw new IOException("Fail to run shell commands: " + StringUtils.join(commands, " ")); + } + LOGGER.info("Complete shell commands: " + StringUtils.join(commands, " ")); + } + + private static class StreamGobbler extends Thread { + InputStream is; + + // reads everything from is until empty. + StreamGobbler(InputStream is) { + this.is = is; + } + + public void run() { + try { + InputStreamReader isr = new InputStreamReader(is); + BufferedReader br = new BufferedReader(isr); + String line = null; + long startTime = System.currentTimeMillis(); + while ( (line = br.readLine()) != null) { + // logging per 5 seconds + if ((System.currentTimeMillis() - startTime) > 5000) { + LOGGER.info(line); + startTime = System.currentTimeMillis(); + } + } + } catch (IOException ioe) { + ioe.printStackTrace(); + } + } + } +} http://git-wip-us.apache.org/repos/asf/zeppelin/blob/1c5b38a9/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreter.java ---------------------------------------------------------------------- diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreter.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreter.java index b64b15b..6f9f81f 100644 --- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreter.java +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreter.java @@ -225,7 +225,9 @@ public class RemoteInterpreter extends Interpreter { remoteResult.getConfig(), new TypeToken<Map<String, Object>>() { }.getType()); context.getConfig().clear(); - context.getConfig().putAll(remoteConfig); + if (remoteConfig != null) { + context.getConfig().putAll(remoteConfig); + } GUI currentGUI = context.getGui(); GUI currentNoteGUI = context.getNoteGui(); if (form == FormType.NATIVE) {
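
For the FlinkIntegrationTest listed in the diffstat, the download helper above would be driven along these lines (a sketch only; the version string and the property wiring are illustrative assumptions, not taken from this diff):

```scala
import org.apache.zeppelin.interpreter.SparkDownloadUtils

// Illustrative setup for an integration test (not part of the patch):
// downloadFlink(...) is the method added above; "1.5.2" is an assumed version.
val flinkHome: String = SparkDownloadUtils.downloadFlink("1.5.2")

// Passing FLINK_HOME as an interpreter property lets StandardInterpreterLauncher
// derive FLINK_CONF_DIR and FLINK_LIB_DIR for the interpreter process
// (see the launcher change above).
val properties = new java.util.Properties()
properties.setProperty("FLINK_HOME", flinkHome)
```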