This is an automated email from the ASF dual-hosted git repository.

zjffdu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/zeppelin.git


The following commit(s) were added to refs/heads/master by this push:
     new eb8c4d7  [ZEPPELIN-4353] Move flink specific logic into 
FlinkInterpreterLauncher
eb8c4d7 is described below

commit eb8c4d7354ab29ac364a89d39970ebab862c9edd
Author: Jeff Zhang <zjf...@apache.org>
AuthorDate: Thu Sep 26 17:18:05 2019 +0800

    [ZEPPELIN-4353] Move flink specific logic into FlinkInterpreterLauncher
    
    ### What is this PR for?
    
    This PR is a refactoring PR which introduce a new plugin module 
`launcher/flink` for launching flink interpreter.
    
    ### What type of PR is it?
    [ Improvement  | Refactoring]
    
    ### Todos
    * [ ] - Task
    
    ### What is the Jira issue?
    * https://issues.apache.org/jira/browse/ZEPPELIN-4353
    
    ### How should this be tested?
    * CI pass
    
    ### Screenshots (if appropriate)
    
    ### Questions:
    * Does the licenses files need update? No
    * Is there breaking changes for older versions? No
    * Does this needs documentation? No
    
    Author: Jeff Zhang <zjf...@apache.org>
    
    Closes #3475 from zjffdu/ZEPPELIN-4353 and squashes the following commits:
    
    eb85cf0dc [Jeff Zhang] [ZEPPELIN-4353] Move flink specific logic into 
FlinkInterpreterLauncher
---
 .../zeppelin/flink/FlinkScalaInterpreter.scala     |  2 +
 zeppelin-plugins/launcher/flink/pom.xml            | 82 ++++++++++++++++++++++
 .../launcher/FlinkInterpreterLauncher.java         | 54 ++++++++++++++
 zeppelin-plugins/pom.xml                           |  1 +
 .../zeppelin/interpreter/InterpreterSetting.java   |  2 +
 .../launcher/StandardInterpreterLauncher.java      |  7 --
 6 files changed, 141 insertions(+), 7 deletions(-)

diff --git 
a/flink/src/main/scala/org/apache/zeppelin/flink/FlinkScalaInterpreter.scala 
b/flink/src/main/scala/org/apache/zeppelin/flink/FlinkScalaInterpreter.scala
index 8c3946e..443f71d 100644
--- a/flink/src/main/scala/org/apache/zeppelin/flink/FlinkScalaInterpreter.scala
+++ b/flink/src/main/scala/org/apache/zeppelin/flink/FlinkScalaInterpreter.scala
@@ -222,10 +222,12 @@ class FlinkScalaInterpreter(val properties: Properties) {
     setAsContext()
     if (getPlanner == "flink") {
       // flink planner
+      LOGGER.info("Use flink planner")
       this.btenv = flinkILoop.scalaBTEnv
       this.stenv = flinkILoop.scalaSTEnv
     } else {
       // blink planner
+      LOGGER.info("Use blink planner")
       this.btEnvSetting = 
EnvironmentSettings.newInstance().inBatchMode().useBlinkPlanner().build()
       this.btenv = TableEnvironment.create(this.btEnvSetting)
       flinkILoop.intp.bind("btenv", this.btenv)
diff --git a/zeppelin-plugins/launcher/flink/pom.xml 
b/zeppelin-plugins/launcher/flink/pom.xml
new file mode 100644
index 0000000..9aeabf2
--- /dev/null
+++ b/zeppelin-plugins/launcher/flink/pom.xml
@@ -0,0 +1,82 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one or more
+  ~ contributor license agreements.  See the NOTICE file distributed with
+  ~ this work for additional information regarding copyright ownership.
+  ~ The ASF licenses this file to You under the Apache License, Version 2.0
+  ~ (the "License"); you may not use this file except in compliance with
+  ~ the License.  You may obtain a copy of the License at
+  ~
+  ~    http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0";
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance";
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd";>
+
+  <modelVersion>4.0.0</modelVersion>
+
+  <parent>
+    <artifactId>zengine-plugins-parent</artifactId>
+    <groupId>org.apache.zeppelin</groupId>
+    <version>0.9.0-SNAPSHOT</version>
+    <relativePath>../../../zeppelin-plugins</relativePath>
+  </parent>
+
+  <groupId>org.apache.zeppelin</groupId>
+  <artifactId>launcher-flink</artifactId>
+  <packaging>jar</packaging>
+  <version>0.9.0-SNAPSHOT</version>
+  <name>Zeppelin: Plugin Flink Launcher</name>
+  <description>Launcher implementation to run flink</description>
+
+  <properties>
+    <plugin.name>Launcher/FlinkInterpreterLauncher</plugin.name>
+  </properties>
+
+  <build>
+    <testResources>
+      <testResource>
+        <directory>${project.basedir}/src/test/resources</directory>
+      </testResource>
+      <testResource>
+        <directory>${project.basedir}/src/main/resources</directory>
+      </testResource>
+    </testResources>
+    <plugins>
+      <plugin>
+        <artifactId>maven-enforcer-plugin</artifactId>
+        <executions>
+          <execution>
+            <id>enforce</id>
+            <phase>none</phase>
+          </execution>
+        </executions>
+      </plugin>
+      <plugin>
+        <artifactId>maven-dependency-plugin</artifactId>
+      </plugin>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-checkstyle-plugin</artifactId>
+        <configuration>
+          <skip>false</skip>
+        </configuration>
+      </plugin>
+    </plugins>
+    <resources>
+      <resource>
+        <directory>src/main/resources</directory>
+        <includes>
+          <include>**/*.*</include>
+        </includes>
+      </resource>
+    </resources>
+  </build>
+</project>
diff --git 
a/zeppelin-plugins/launcher/flink/src/main/java/org/apache/zeppelin/interpreter/launcher/FlinkInterpreterLauncher.java
 
b/zeppelin-plugins/launcher/flink/src/main/java/org/apache/zeppelin/interpreter/launcher/FlinkInterpreterLauncher.java
new file mode 100644
index 0000000..c714b04
--- /dev/null
+++ 
b/zeppelin-plugins/launcher/flink/src/main/java/org/apache/zeppelin/interpreter/launcher/FlinkInterpreterLauncher.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.interpreter.launcher;
+
+import org.apache.zeppelin.conf.ZeppelinConfiguration;
+import org.apache.zeppelin.interpreter.recovery.RecoveryStorage;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Map;
+
+public class FlinkInterpreterLauncher extends StandardInterpreterLauncher {
+
+  public FlinkInterpreterLauncher(ZeppelinConfiguration zConf, RecoveryStorage 
recoveryStorage) {
+    super(zConf, recoveryStorage);
+  }
+
+  @Override
+  public Map<String, String> buildEnvFromProperties(InterpreterLaunchContext 
context)
+          throws IOException {
+    Map<String, String> envs = super.buildEnvFromProperties(context);
+    String flinkHome = context.getProperties().getProperty("FLINK_HOME");
+    if (flinkHome == null) {
+      throw new IOException("FLINK_HOME is not specified");
+    }
+    File flinkHomeFile = new File(flinkHome);
+    if (!flinkHomeFile.exists()) {
+      throw new IOException(String.format("FLINK_HOME {} doesn't exist", 
flinkHome));
+    }
+    if (flinkHomeFile.isFile()) {
+      throw new IOException(String.format("FLINK_HOME {} is a file, but should 
be directory",
+              flinkHome));
+    }
+    envs.put("FLINK_CONF_DIR", flinkHome + "/conf");
+    envs.put("FLINK_LIB_DIR", flinkHome + "/lib");
+    envs.put("FLINK_PLUGINS_DIR", flinkHome + "/plugins");
+    return envs;
+  }
+}
diff --git a/zeppelin-plugins/pom.xml b/zeppelin-plugins/pom.xml
index 0586c6d..fcd3957 100644
--- a/zeppelin-plugins/pom.xml
+++ b/zeppelin-plugins/pom.xml
@@ -47,6 +47,7 @@
         <module>launcher/k8s-standard</module>
         <module>launcher/cluster</module>
         <module>launcher/docker</module>
+        <module>launcher/flink</module>
     </modules>
 
     <dependencies>
diff --git 
a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterSetting.java
 
b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterSetting.java
index 4bac54c..a51bb87 100644
--- 
a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterSetting.java
+++ 
b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterSetting.java
@@ -690,6 +690,8 @@ public class InterpreterSetting {
     } else {
       if (group.equals("spark")) {
         return "SparkInterpreterLauncher";
+      } else if (group.equals("flink")) {
+        return "FlinkInterpreterLauncher";
       } else {
         return "StandardInterpreterLauncher";
       }
diff --git 
a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/launcher/StandardInterpreterLauncher.java
 
b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/launcher/StandardInterpreterLauncher.java
index 782ec5b..78eca55 100644
--- 
a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/launcher/StandardInterpreterLauncher.java
+++ 
b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/launcher/StandardInterpreterLauncher.java
@@ -94,13 +94,6 @@ public class StandardInterpreterLauncher extends 
InterpreterLauncher {
       if (RemoteInterpreterUtils.isEnvString((String) key)) {
         env.put((String) key, context.getProperties().getProperty((String) 
key));
       }
-      // TODO(zjffdu) move this to FlinkInterpreterLauncher
-      if (key.toString().equals("FLINK_HOME")) {
-        String flinkHome = context.getProperties().get(key).toString();
-        env.put("FLINK_CONF_DIR", flinkHome + "/conf");
-        env.put("FLINK_LIB_DIR", flinkHome + "/lib");
-        env.put("FLINK_PLUGINS_DIR", flinkHome + "/plugins");
-      }
     }
     env.put("INTERPRETER_GROUP_ID", context.getInterpreterGroupId());
     return env;

Reply via email to