This is an automated email from the ASF dual-hosted git repository. zjffdu pushed a commit to branch branch-0.10 in repository https://gitbox.apache.org/repos/asf/zeppelin.git
The following commit(s) were added to refs/heads/branch-0.10 by this push: new 76fe06b [ZEPPELIN-5352] Support flink in k8s mode 76fe06b is described below commit 76fe06b5ce09ffbf934b5849fa5f771813ee9967 Author: Jeff Zhang <zjf...@apache.org> AuthorDate: Fri Oct 8 20:57:29 2021 +0800 [ZEPPELIN-5352] Support flink in k8s mode ### What is this PR for? Flink supports k8s native mode since Flink 1.11; this PR is to support k8s-application mode. The implementation is very similar to yarn-application mode. ### What type of PR is it? [Feature] ### Todos * [ ] - Task ### What is the Jira issue? * https://issues.apache.org/jira/browse/ZEPPELIN-5352 ### How should this be tested? * CI passes and manually tested ### Screenshots (if appropriate) ### Questions: * Do the license files need an update? No * Are there breaking changes for older versions? No * Does this need documentation? No Author: Jeff Zhang <zjf...@apache.org> Closes #4116 from zjffdu/ZEPPELIN-5352 and squashes the following commits: 983a3793da [Jeff Zhang] address comment 11d0c60af1 [Jeff Zhang] [ZEPPELIN-5352]. 
Support flink in k8s mode (cherry picked from commit 28bc9722700905777f2304a843a126c302d2023e) Signed-off-by: Jeff Zhang <zjf...@apache.org> --- bin/interpreter.sh | 6 +- ...va => ApplicationModeExecutionEnvironment.java} | 6 +- ....java => ApplicationModeStreamEnvironment.java} | 8 +-- .../zeppelin/flink/FlinkScalaInterpreter.scala | 32 ++++++++- .../zeppelin/flink/internal/FlinkILoop.scala | 9 ++- .../zeppelin/flink/internal/FlinkShell.scala | 20 +++++- .../zeppelin/integration/FlinkIntegrationTest.java | 2 +- .../remote/RemoteInterpreterServer.java | 19 +++-- .../launcher/FlinkInterpreterLauncher.java | 83 +++++++++++++++++----- .../zeppelin/interpreter/InterpreterSetting.java | 4 ++ .../remote/ExecRemoteInterpreterProcess.java | 9 ++- 11 files changed, 148 insertions(+), 50 deletions(-) diff --git a/bin/interpreter.sh b/bin/interpreter.sh index c75a299..6d9c048 100755 --- a/bin/interpreter.sh +++ b/bin/interpreter.sh @@ -304,9 +304,9 @@ if [[ -n "${SPARK_SUBMIT}" ]]; then else INTERPRETER_RUN_COMMAND+=("${SPARK_SUBMIT}" "--class" "${ZEPPELIN_SERVER}" "--driver-class-path" "${ZEPPELIN_INTP_CLASSPATH_OVERRIDES}:${ZEPPELIN_INTP_CLASSPATH}" "--driver-java-options" "${JAVA_INTP_OPTS}" "${SPARK_SUBMIT_OPTIONS_ARRAY[@]}" "${ZEPPELIN_SPARK_CONF_ARRAY[@]}" "${SPARK_APP_JAR}" "${CALLBACK_HOST}" "${PORT}" "${INTP_GROUP_ID}" "${INTP_PORT}") fi -elif [[ "${ZEPPELIN_FLINK_YARN_APPLICATION}" == "true" ]]; then - IFS='|' read -r -a ZEPPELIN_FLINK_YARN_APPLICATION_CONF_ARRAY <<< "${ZEPPELIN_FLINK_YARN_APPLICATION_CONF}" - INTERPRETER_RUN_COMMAND+=("${FLINK_HOME}/bin/flink" "run-application" "-c" "${ZEPPELIN_SERVER}" "-t" "yarn-application" "${ZEPPELIN_FLINK_YARN_APPLICATION_CONF_ARRAY[@]}" "${FLINK_APP_JAR}" "${CALLBACK_HOST}" "${PORT}" "${INTP_GROUP_ID}" "${INTP_PORT}") +elif [[ -n "${ZEPPELIN_FLINK_APPLICATION_MODE}" ]]; then + IFS='|' read -r -a ZEPPELIN_FLINK_APPLICATION_MODE_CONF_ARRAY <<< "${ZEPPELIN_FLINK_APPLICATION_MODE_CONF}" + 
INTERPRETER_RUN_COMMAND+=("${FLINK_HOME}/bin/flink" "run-application" "-c" "${ZEPPELIN_SERVER}" "-t" "${ZEPPELIN_FLINK_APPLICATION_MODE}" "${ZEPPELIN_FLINK_APPLICATION_MODE_CONF_ARRAY[@]}" "${FLINK_APP_JAR}" "${CALLBACK_HOST}" "${PORT}" "${INTP_GROUP_ID}" "${INTP_PORT}") else IFS=' ' read -r -a JAVA_INTP_OPTS_ARRAY <<< "${JAVA_INTP_OPTS}" IFS=' ' read -r -a ZEPPELIN_INTP_MEM_ARRAY <<< "${ZEPPELIN_INTP_MEM}" diff --git a/flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/YarnApplicationExecutionEnvironment.java b/flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/ApplicationModeExecutionEnvironment.java similarity index 94% rename from flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/YarnApplicationExecutionEnvironment.java rename to flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/ApplicationModeExecutionEnvironment.java index 2cfd0e3..33afa03 100644 --- a/flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/YarnApplicationExecutionEnvironment.java +++ b/flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/ApplicationModeExecutionEnvironment.java @@ -37,15 +37,15 @@ import static org.apache.flink.util.Preconditions.checkState; /** - * ExecutionEnvironment used for yarn application mode. + * ExecutionEnvironment used for application mode. * Need to add jars of scala shell before submitting jobs. 
*/ -public class YarnApplicationExecutionEnvironment extends ExecutionEnvironment { +public class ApplicationModeExecutionEnvironment extends ExecutionEnvironment { private FlinkILoop flinkILoop; private FlinkScalaInterpreter flinkScalaInterpreter; - public YarnApplicationExecutionEnvironment(PipelineExecutorServiceLoader executorServiceLoader, + public ApplicationModeExecutionEnvironment(PipelineExecutorServiceLoader executorServiceLoader, Configuration configuration, ClassLoader userClassloader, FlinkILoop flinkILoop, diff --git a/flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/YarnApplicationStreamEnvironment.java b/flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/ApplicationModeStreamEnvironment.java similarity index 93% rename from flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/YarnApplicationStreamEnvironment.java rename to flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/ApplicationModeStreamEnvironment.java index 7f2fc92..b86e556 100644 --- a/flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/YarnApplicationStreamEnvironment.java +++ b/flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/ApplicationModeStreamEnvironment.java @@ -40,17 +40,17 @@ import static org.apache.flink.util.Preconditions.checkState; /** - * StreamExecutionEnvironment used for yarn application mode. + * StreamExecutionEnvironment used for application mode. * Need to add jars of scala shell before submitting jobs. 
*/ -public class YarnApplicationStreamEnvironment extends StreamExecutionEnvironment { +public class ApplicationModeStreamEnvironment extends StreamExecutionEnvironment { - private static final Logger LOGGER = LoggerFactory.getLogger(YarnApplicationStreamEnvironment.class); + private static final Logger LOGGER = LoggerFactory.getLogger(ApplicationModeStreamEnvironment.class); private FlinkILoop flinkILoop; private FlinkScalaInterpreter flinkScalaInterpreter; - public YarnApplicationStreamEnvironment(PipelineExecutorServiceLoader executorServiceLoader, + public ApplicationModeStreamEnvironment(PipelineExecutorServiceLoader executorServiceLoader, Configuration configuration, ClassLoader userClassloader, FlinkILoop flinkILoop, diff --git a/flink/flink-scala-parent/src/main/scala/org/apache/zeppelin/flink/FlinkScalaInterpreter.scala b/flink/flink-scala-parent/src/main/scala/org/apache/zeppelin/flink/FlinkScalaInterpreter.scala index 0918084..12051dd 100644 --- a/flink/flink-scala-parent/src/main/scala/org/apache/zeppelin/flink/FlinkScalaInterpreter.scala +++ b/flink/flink-scala-parent/src/main/scala/org/apache/zeppelin/flink/FlinkScalaInterpreter.scala @@ -18,7 +18,7 @@ package org.apache.zeppelin.flink -import java.io.File +import java.io.{File, IOException} import java.net.{URL, URLClassLoader} import java.nio.file.Files import java.util.Properties @@ -45,6 +45,7 @@ import org.apache.flink.table.functions.{AggregateFunction, ScalarFunction, Tabl import org.apache.flink.table.module.hive.HiveModule import org.apache.flink.yarn.cli.FlinkYarnSessionCli import org.apache.zeppelin.dep.DependencyResolver +import org.apache.zeppelin.flink.internal.FlinkShell import org.apache.zeppelin.flink.internal.FlinkShell._ import org.apache.zeppelin.flink.internal.FlinkILoop import org.apache.zeppelin.interpreter.Interpreter.FormType @@ -145,7 +146,7 @@ abstract class FlinkScalaInterpreter(val properties: Properties, // load udf jar this.userUdfJars.foreach(jar => loadUDFJar(jar)) - 
if (mode == ExecutionMode.YARN_APPLICATION) { + if (ExecutionMode.isApplicationMode(mode)) { // have to call senv.execute method before running any user code, otherwise yarn application mode // will cause ClassNotFound issue. Needs to do more investigation. TODO(zjffdu) val initCode = @@ -185,7 +186,7 @@ abstract class FlinkScalaInterpreter(val properties: Properties, properties.getProperty("flink.execution.mode", "LOCAL") .replace("-", "_") .toUpperCase) - if (mode == ExecutionMode.YARN_APPLICATION) { + if (ExecutionMode.isYarnAppicationMode(mode)) { if (flinkVersion.isFlink110) { throw new Exception("yarn-application mode is only supported after Flink 1.11") } @@ -195,6 +196,17 @@ abstract class FlinkScalaInterpreter(val properties: Properties, flinkConfDir = workingDirectory hiveConfDir = workingDirectory } + if (ExecutionMode.isK8sApplicationMode(mode)) { + if (flinkVersion.isFlink110) { + throw new Exception("application mode is only supported after Flink 1.11") + } + // use current pod working directory as FLINK_HOME + val workingDirectory = new File(".").getAbsolutePath + flinkHome = workingDirectory + flinkConfDir = workingDirectory + "/conf" + hiveConfDir = workingDirectory + "/conf" + } + LOGGER.info("FLINK_HOME: " + flinkHome) LOGGER.info("FLINK_CONF_DIR: " + flinkConfDir) LOGGER.info("HADOOP_CONF_DIR: " + hadoopConfDir) @@ -234,7 +246,17 @@ abstract class FlinkScalaInterpreter(val properties: Properties, .copy(queue = Some(queue)))) this.userUdfJars = getUserUdfJars() + this.userJars = getUserJarsExceptUdfJars ++ this.userUdfJars + if (ExecutionMode.isK8sApplicationMode(mode)) { + var flinkAppJar = properties.getProperty("flink.app.jar") + if (flinkAppJar != null && flinkAppJar.startsWith("local://")) { + flinkAppJar = flinkAppJar.substring(8) + this.userJars = this.userJars :+ flinkAppJar + } else { + throw new IOException("flink.app.jar is not set or invalid, flink.app.jar: " + flinkAppJar) + } + } LOGGER.info("UserJars: " + userJars.mkString(",")) 
config = config.copy(externalJars = Some(userJars.toArray)) LOGGER.info("Config: " + config) @@ -317,6 +339,10 @@ abstract class FlinkScalaInterpreter(val properties: Properties, LOGGER.info("Use FlinkCluster in yarn application mode, appId: {}", yarnAppId) this.jmWebUrl = "http://localhost:" + HadoopUtils.getFlinkRestPort(yarnAppId) this.displayedJMWebUrl = getDisplayedJMWebUrl(yarnAppId) + } else if (ExecutionMode.isK8sApplicationMode(mode)) { + LOGGER.info("Use FlinkCluster in kubernetes-application mode") + this.jmWebUrl = "http://localhost:" + configuration.getInteger("rest.port", 8081) + this.displayedJMWebUrl = this.jmWebUrl } else { LOGGER.info("Use FlinkCluster in remote mode") this.jmWebUrl = "http://" + config.host.get + ":" + config.port.get diff --git a/flink/flink-scala-parent/src/main/scala/org/apache/zeppelin/flink/internal/FlinkILoop.scala b/flink/flink-scala-parent/src/main/scala/org/apache/zeppelin/flink/internal/FlinkILoop.scala index 86abeb5..1be64ab 100644 --- a/flink/flink-scala-parent/src/main/scala/org/apache/zeppelin/flink/internal/FlinkILoop.scala +++ b/flink/flink-scala-parent/src/main/scala/org/apache/zeppelin/flink/internal/FlinkILoop.scala @@ -23,13 +23,12 @@ import org.apache.flink.configuration.Configuration import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment import org.apache.flink.util.AbstractID import java.io.{BufferedReader, File, FileOutputStream, IOException} -import java.net.URL import org.apache.flink.streaming.api.environment.{StreamExecutionEnvironment => JStreamExecutionEnvironment} import org.apache.flink.api.java.{ExecutionEnvironment => JExecutionEnvironment} import org.apache.flink.api.scala.ExecutionEnvironment import org.apache.flink.core.execution.PipelineExecutorServiceLoader -import org.apache.zeppelin.flink.{FlinkScalaInterpreter, YarnApplicationExecutionEnvironment, YarnApplicationStreamEnvironment} +import org.apache.zeppelin.flink.{ApplicationModeExecutionEnvironment, 
ApplicationModeStreamEnvironment, FlinkScalaInterpreter} import FlinkShell.ExecutionMode import scala.tools.nsc.interpreter._ @@ -71,17 +70,17 @@ class FlinkILoop( scalaBenv: ExecutionEnvironment, scalaSenv: StreamExecutionEnvironment ) = { - if (mode == ExecutionMode.YARN_APPLICATION) { + if (ExecutionMode.isApplicationMode(mode)) { // For yarn application mode, ExecutionEnvironment & StreamExecutionEnvironment has already been created // by flink itself, we here just try get them via reflection and reconstruct them. - val scalaBenv = new ExecutionEnvironment(new YarnApplicationExecutionEnvironment( + val scalaBenv = new ExecutionEnvironment(new ApplicationModeExecutionEnvironment( getExecutionEnvironmentField(jenv, "executorServiceLoader").asInstanceOf[PipelineExecutorServiceLoader], getExecutionEnvironmentField(jenv, "configuration").asInstanceOf[Configuration], getExecutionEnvironmentField(jenv, "userClassloader").asInstanceOf[ClassLoader], this, flinkScalaInterpreter )) - val scalaSenv = new StreamExecutionEnvironment(new YarnApplicationStreamEnvironment( + val scalaSenv = new StreamExecutionEnvironment(new ApplicationModeStreamEnvironment( getStreamExecutionEnvironmentField(jsenv, "executorServiceLoader").asInstanceOf[PipelineExecutorServiceLoader], getStreamExecutionEnvironmentField(jsenv, "configuration").asInstanceOf[Configuration], getStreamExecutionEnvironmentField(jsenv, "userClassloader").asInstanceOf[ClassLoader], diff --git a/flink/flink-scala-parent/src/main/scala/org/apache/zeppelin/flink/internal/FlinkShell.scala b/flink/flink-scala-parent/src/main/scala/org/apache/zeppelin/flink/internal/FlinkShell.scala index 437d188..ab0f299 100644 --- a/flink/flink-scala-parent/src/main/scala/org/apache/zeppelin/flink/internal/FlinkShell.scala +++ b/flink/flink-scala-parent/src/main/scala/org/apache/zeppelin/flink/internal/FlinkShell.scala @@ -18,7 +18,7 @@ package org.apache.zeppelin.flink.internal -import java.io.BufferedReader +import java.io._ import 
org.apache.flink.annotation.Internal import org.apache.flink.client.cli.{CliFrontend, CliFrontendParser} @@ -30,6 +30,7 @@ import org.apache.flink.runtime.minicluster.{MiniCluster, MiniClusterConfigurati import org.apache.flink.yarn.executors.YarnSessionClusterExecutor import org.apache.zeppelin.flink.FlinkShims + import scala.collection.mutable.ArrayBuffer /** @@ -39,7 +40,19 @@ import scala.collection.mutable.ArrayBuffer object FlinkShell { object ExecutionMode extends Enumeration { - val UNDEFINED, LOCAL, REMOTE, YARN, YARN_APPLICATION = Value + val UNDEFINED, LOCAL, REMOTE, YARN, YARN_APPLICATION, KUBERNETES_APPLICATION = Value + + def isYarnAppicationMode(mode: ExecutionMode.Value): Boolean = { + mode == ExecutionMode.YARN_APPLICATION + } + + def isK8sApplicationMode(mode: ExecutionMode.Value): Boolean = { + mode == ExecutionMode.KUBERNETES_APPLICATION + } + + def isApplicationMode(mode: ExecutionMode.Value): Boolean = { + isYarnAppicationMode(mode) || isK8sApplicationMode(mode) + } } /** Configuration object */ @@ -84,9 +97,10 @@ object FlinkShell { case ExecutionMode.REMOTE => createRemoteConfig(config, flinkConfig) case ExecutionMode.YARN => createYarnClusterIfNeededAndGetConfig(config, flinkConfig, flinkShims) case ExecutionMode.YARN_APPLICATION => (flinkConfig, None) + case ExecutionMode.KUBERNETES_APPLICATION => (flinkConfig, None) case ExecutionMode.UNDEFINED => // Wrong input throw new IllegalArgumentException("please specify execution mode:\n" + - "[local | remote <host> <port> | yarn | yarn-application ]") + "[local | remote <host> <port> | yarn | yarn-application | kubernetes-application]") } } diff --git a/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/FlinkIntegrationTest.java b/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/FlinkIntegrationTest.java index f27acbd..2dc14f8 100644 --- 
a/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/FlinkIntegrationTest.java +++ b/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/FlinkIntegrationTest.java @@ -153,7 +153,7 @@ public abstract class FlinkIntegrationTest { flinkInterpreterSetting.setProperty("FLINK_HOME", flinkHome); flinkInterpreterSetting.setProperty("PATH", hadoopHome + "/bin:" + System.getenv("PATH")); flinkInterpreterSetting.setProperty("ZEPPELIN_CONF_DIR", zeppelin.getZeppelinConfDir().getAbsolutePath()); - flinkInterpreterSetting.setProperty("flink.execution.mode", "YARN"); + flinkInterpreterSetting.setProperty("flink.execution.mode", "yarn"); flinkInterpreterSetting.setProperty("zeppelin.flink.run.asLoginUser", "false"); testInterpreterBasics(); diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java index c058dd6..aff8139 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java @@ -77,7 +77,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; import java.lang.reflect.Constructor; -import java.lang.reflect.InvocationTargetException; import java.lang.reflect.Method; import java.net.URL; import java.nio.ByteBuffer; @@ -142,6 +141,12 @@ public class RemoteInterpreterServer extends Thread private ScheduledExecutorService resultCleanService = Executors.newSingleThreadScheduledExecutor(); private boolean isTest; + // Whether calling System.exit to force shutdown interpreter process. + // In Flink K8s application mode, RemoteInterpreterServer#main is called via reflection by flink framework. 
+ // We should not call System.exit in this scenario when RemoteInterpreterServer is stopped, + // Otherwise flink will think flink job is exited abnormally and will try to restart this + // pod (RemoteInterpreterServer) + private boolean isForceShutdown = true; private ZeppelinConfiguration zConf; // cluster manager client @@ -321,7 +326,10 @@ public class RemoteInterpreterServer extends Thread * should be part of the next release and solve the problem. * We may have other threads that are not terminated successfully. */ - System.exit(0); + if (remoteInterpreterServer.isForceShutdown) { + LOGGER.info("Force shutting down"); + System.exit(0); + } } // Submit interpreter process metadata information to cluster metadata @@ -380,11 +388,14 @@ public class RemoteInterpreterServer extends Thread replClass.getConstructor(new Class[]{Properties.class}); Interpreter interpreter = constructor.newInstance(p); interpreter.setClassloaderUrls(new URL[]{}); - LOGGER.info("Instantiate interpreter {}", className); + interpreter.setInterpreterGroup(interpreterGroup); interpreter.setUserName(userName); interpreterGroup.addInterpreterToSession(new LazyOpenInterpreter(interpreter), sessionId); + + this.isForceShutdown = Boolean.parseBoolean(properties.getOrDefault("zeppelin.interpreter.forceShutdown", "true")); + LOGGER.info("Instantiate interpreter {}, isForceShutdown: {}", className, isForceShutdown); } catch (Exception e) { LOGGER.error(e.getMessage(), e); throw new InterpreterRPCException("Fail to create interpreter, cause: " + e.toString()); @@ -700,7 +711,7 @@ public class RemoteInterpreterServer extends Thread } } - if (server.isServing()) { + if (server.isServing() && isForceShutdown) { LOGGER.info("Force shutting down"); System.exit(1); } diff --git a/zeppelin-plugins/launcher/flink/src/main/java/org/apache/zeppelin/interpreter/launcher/FlinkInterpreterLauncher.java 
b/zeppelin-plugins/launcher/flink/src/main/java/org/apache/zeppelin/interpreter/launcher/FlinkInterpreterLauncher.java index 3ff4f56..83a1f24 100644 --- a/zeppelin-plugins/launcher/flink/src/main/java/org/apache/zeppelin/interpreter/launcher/FlinkInterpreterLauncher.java +++ b/zeppelin-plugins/launcher/flink/src/main/java/org/apache/zeppelin/interpreter/launcher/FlinkInterpreterLauncher.java @@ -18,6 +18,7 @@ package org.apache.zeppelin.interpreter.launcher; import com.google.common.base.CharMatcher; +import com.google.common.collect.Sets; import org.apache.commons.lang3.StringUtils; import org.apache.zeppelin.conf.ZeppelinConfiguration; import org.apache.zeppelin.interpreter.recovery.RecoveryStorage; @@ -26,18 +27,20 @@ import org.slf4j.LoggerFactory; import java.io.File; import java.io.IOException; -import java.util.ArrayList; import java.util.Arrays; +import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.Properties; import java.util.StringJoiner; +import java.util.Set; import java.util.stream.Collectors; public class FlinkInterpreterLauncher extends StandardInterpreterLauncher { private static final Logger LOGGER = LoggerFactory.getLogger(FlinkInterpreterLauncher.class); - + private static final Set<String> FLINK_EXECUTION_MODES = Sets.newHashSet( + "local", "remote", "yarn", "yarn-application", "kubernetes-application"); public FlinkInterpreterLauncher(ZeppelinConfiguration zConf, RecoveryStorage recoveryStorage) { super(zConf, recoveryStorage); @@ -48,8 +51,8 @@ public class FlinkInterpreterLauncher extends StandardInterpreterLauncher { throws IOException { Map<String, String> envs = super.buildEnvFromProperties(context); - String flinkHome = updateEnvsForFlinkHome(envs, context); - + // update FLINK related environment variables + String flinkHome = getFlinkHome(context); if (!envs.containsKey("FLINK_CONF_DIR")) { envs.put("FLINK_CONF_DIR", flinkHome + "/conf"); } @@ -59,14 +62,29 @@ public class FlinkInterpreterLauncher 
extends StandardInterpreterLauncher { normalizeConfiguration(context); String flinkExecutionMode = context.getProperties().getProperty("flink.execution.mode"); - // yarn application mode specific logic - if ("yarn-application".equalsIgnoreCase(flinkExecutionMode)) { - updateEnvsForYarnApplicationMode(envs, context); + if (!FLINK_EXECUTION_MODES.contains(flinkExecutionMode)) { + throw new IOException("Not valid flink.execution.mode: " + + flinkExecutionMode + ", valid modes ares: " + + FLINK_EXECUTION_MODES.stream().collect(Collectors.joining(", "))); + } + // application mode specific logic + if (isApplicationMode(flinkExecutionMode)) { + updateEnvsForApplicationMode(flinkExecutionMode, envs, context); } - String flinkAppJar = chooseFlinkAppJar(flinkHome); - LOGGER.info("Choose FLINK_APP_JAR: {}", flinkAppJar); - envs.put("FLINK_APP_JAR", flinkAppJar); + if (isK8sApplicationMode(flinkExecutionMode)) { + String flinkAppJar = context.getProperties().getProperty("flink.app.jar"); + if (StringUtils.isBlank(flinkAppJar)) { + throw new IOException("flink.app.jar is not specified for kubernetes-application mode"); + } + envs.put("FLINK_APP_JAR", flinkAppJar); + LOGGER.info("K8s application's FLINK_APP_JAR : " + flinkAppJar); + context.getProperties().put("zeppelin.interpreter.forceShutdown", "false"); + } else { + String flinkAppJar = chooseFlinkAppJar(flinkHome); + LOGGER.info("Choose FLINK_APP_JAR for non k8s-application mode: {}", flinkAppJar); + envs.put("FLINK_APP_JAR", flinkAppJar); + } if ("yarn".equalsIgnoreCase(flinkExecutionMode) || "yarn-application".equalsIgnoreCase(flinkExecutionMode)) { @@ -148,8 +166,28 @@ public class FlinkInterpreterLauncher extends StandardInterpreterLauncher { return flinkScalaJars.get(0).getAbsolutePath(); } - private String updateEnvsForFlinkHome(Map<String, String> envs, - InterpreterLaunchContext context) throws IOException { + private boolean isApplicationMode(String mode) { + return isYarnApplicationMode(mode) || 
isK8sApplicationMode(mode); + } + + private boolean isYarnApplicationMode(String mode) { + return "yarn-application".equals(mode); + } + + private boolean isK8sApplicationMode(String mode) { + return "kubernetes-application".equals(mode); + } + + /** + * Get FLINK_HOME in the following orders: + * 1. FLINK_HOME in interpreter setting + * 2. FLINK_HOME in system environment variables. + * + * @param context + * @return + * @throws IOException + */ + private String getFlinkHome(InterpreterLaunchContext context) throws IOException { String flinkHome = context.getProperties().getProperty("FLINK_HOME"); if (StringUtils.isBlank(flinkHome)) { flinkHome = System.getenv("FLINK_HOME"); @@ -168,11 +206,11 @@ public class FlinkInterpreterLauncher extends StandardInterpreterLauncher { return flinkHome; } - private void updateEnvsForYarnApplicationMode(Map<String, String> envs, - InterpreterLaunchContext context) - throws IOException { - - envs.put("ZEPPELIN_FLINK_YARN_APPLICATION", "true"); + private void updateEnvsForApplicationMode(String mode, + Map<String, String> envs, + InterpreterLaunchContext context) throws IOException { + // ZEPPELIN_FLINK_APPLICATION_MODE is used in interpreter.sh + envs.put("ZEPPELIN_FLINK_APPLICATION_MODE", mode); StringJoiner flinkConfStringJoiner = new StringJoiner("|"); // set yarn.ship-files @@ -190,7 +228,7 @@ public class FlinkInterpreterLauncher extends StandardInterpreterLauncher { flinkConfStringJoiner.add("yarn.application.name=" + yarnAppName); } - // add other yarn and python configuration. 
+ // add other configuration for both k8s and yarn for (Map.Entry<Object, Object> entry : context.getProperties().entrySet()) { String key = entry.getKey().toString(); String value = entry.getValue().toString(); @@ -205,9 +243,16 @@ public class FlinkInterpreterLauncher extends StandardInterpreterLauncher { } } } - envs.put("ZEPPELIN_FLINK_YARN_APPLICATION_CONF", flinkConfStringJoiner.toString()); + envs.put("ZEPPELIN_FLINK_APPLICATION_MODE_CONF", flinkConfStringJoiner.toString()); } + /** + * Used in yarn-application mode. + * + * @param context + * @return + * @throws IOException + */ private List<String> getYarnShipFiles(InterpreterLaunchContext context) throws IOException { // Extract yarn.ship-files, add hive-site.xml automatically if hive is enabled // and HIVE_CONF_DIR is specified diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterSetting.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterSetting.java index 3b0728e..bff9273 100644 --- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterSetting.java +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterSetting.java @@ -762,6 +762,10 @@ public class InterpreterSetting { public String getLauncherPlugin(Properties properties) { if (isRunningOnKubernetes()) { + if (group.equals("flink")) { + // Flink has its own implementation of k8s mode. 
+ return "FlinkInterpreterLauncher"; + } return "K8sStandardInterpreterLauncher"; } else if (isRunningOnCluster()) { return InterpreterSetting.CLUSTER_INTERPRETER_LAUNCHER_NAME; diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/ExecRemoteInterpreterProcess.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/ExecRemoteInterpreterProcess.java index f7b85a2..3b2142d 100644 --- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/ExecRemoteInterpreterProcess.java +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/ExecRemoteInterpreterProcess.java @@ -229,9 +229,9 @@ public class ExecRemoteInterpreterProcess extends RemoteInterpreterManagedProces synchronized (this) { notifyAll(); } - } else if (isFlinkYarnApplicationMode() && exitValue == 0) { + } else if (isFlinkApplicationMode() && exitValue == 0) { // Don't update transition state when flink launcher process exist - // in yarn application mode. + // in flink application mode. synchronized (this) { notifyAll(); } @@ -252,9 +252,8 @@ public class ExecRemoteInterpreterProcess extends RemoteInterpreterManagedProces getEnv().getOrDefault("ZEPPELIN_SPARK_YARN_CLUSTER", "false")); } - private boolean isFlinkYarnApplicationMode() { - return Boolean.parseBoolean( - getEnv().getOrDefault("ZEPPELIN_FLINK_YARN_APPLICATION", "false")); + private boolean isFlinkApplicationMode() { + return getEnv().containsKey("ZEPPELIN_FLINK_APPLICATION_MODE"); } @Override