This is an automated email from the ASF dual-hosted git repository. pdallig pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/zeppelin.git
The following commit(s) were added to refs/heads/master by this push: new f2eb3378e4 [ZEPPELIN-5897] Remove launcher properties and only use the current context properties (#4602) f2eb3378e4 is described below commit f2eb3378e4ca17f0834ea3e0b092f9fc57cd8966 Author: Philipp Dallig <philipp.dal...@gmail.com> AuthorDate: Fri May 26 13:36:25 2023 +0200 [ZEPPELIN-5897] Remove launcher properties and only use the current context properties (#4602) --- .../interpreter/launcher/InterpreterLauncher.java | 18 +++--- .../launcher/ClusterInterpreterLauncher.java | 15 ++--- .../launcher/DockerInterpreterLauncher.java | 6 +- .../launcher/FlinkInterpreterLauncher.java | 2 +- .../launcher/K8sStandardInterpreterLauncher.java | 9 ++- .../launcher/YarnInterpreterLauncher.java | 7 +-- .../launcher/SparkInterpreterLauncher.java | 68 ++++++++++++---------- .../launcher/StandardInterpreterLauncher.java | 5 +- 8 files changed, 61 insertions(+), 69 deletions(-) diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/launcher/InterpreterLauncher.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/launcher/InterpreterLauncher.java index 2e7b0d6565..7190bea489 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/launcher/InterpreterLauncher.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/launcher/InterpreterLauncher.java @@ -35,27 +35,23 @@ public abstract class InterpreterLauncher { private static final Logger LOGGER = LoggerFactory.getLogger(InterpreterLauncher.class); private static final String SPECIAL_CHARACTER="{}()<>&*‘|=?;[]$–#~!.\"%/\\:+,`"; - protected ZeppelinConfiguration zConf; - protected Properties properties; - protected RecoveryStorage recoveryStorage; + protected final ZeppelinConfiguration zConf; + protected final RecoveryStorage recoveryStorage; - public InterpreterLauncher(ZeppelinConfiguration zConf, RecoveryStorage recoveryStorage) { + protected InterpreterLauncher(ZeppelinConfiguration zConf, RecoveryStorage recoveryStorage) { this.zConf = zConf; this.recoveryStorage = recoveryStorage; } - public void setProperties(Properties props) { - this.properties = props; - } - /** * The timeout setting in interpreter setting take precedence over * that in zeppelin-site.xml * @return */ - protected int getConnectTimeout() { + protected int getConnectTimeout(InterpreterLaunchContext context) { int connectTimeout = (int) zConf.getTime(ZeppelinConfiguration.ConfVars.ZEPPELIN_INTERPRETER_CONNECT_TIMEOUT); + Properties properties = context.getProperties(); if (properties != null && properties.containsKey( ZeppelinConfiguration.ConfVars.ZEPPELIN_INTERPRETER_CONNECT_TIMEOUT.getVarName())) { connectTimeout = Integer.parseInt(properties.getProperty( @@ -64,8 +60,8 @@ public abstract class InterpreterLauncher { return connectTimeout; } - protected int getConnectPoolSize() { - return Integer.parseInt(properties.getProperty( + protected int getConnectPoolSize(InterpreterLaunchContext context) { + return Integer.parseInt(context.getProperties().getProperty( ZEPPELIN_INTERPRETER_CONNECTION_POOL_SIZE.getVarName(), ZEPPELIN_INTERPRETER_CONNECTION_POOL_SIZE.getIntValue() + "")); } diff --git a/zeppelin-plugins/launcher/cluster/src/main/java/org/apache/zeppelin/interpreter/launcher/ClusterInterpreterLauncher.java b/zeppelin-plugins/launcher/cluster/src/main/java/org/apache/zeppelin/interpreter/launcher/ClusterInterpreterLauncher.java index b34be06230..5681e42eb4 100644 --- a/zeppelin-plugins/launcher/cluster/src/main/java/org/apache/zeppelin/interpreter/launcher/ClusterInterpreterLauncher.java +++ b/zeppelin-plugins/launcher/cluster/src/main/java/org/apache/zeppelin/interpreter/launcher/ClusterInterpreterLauncher.java @@ -64,8 +64,7 @@ public class ClusterInterpreterLauncher extends StandardInterpreterLauncher LOGGER.info("Launching Interpreter: {}", context.getInterpreterSettingGroup()); this.context = context; - this.properties = context.getProperties(); - int connectTimeout = getConnectTimeout(); + int connectTimeout = getConnectTimeout(context); String intpGroupId = context.getInterpreterGroupId(); // connect exist Interpreter Process @@ -80,7 +79,7 @@ public class ClusterInterpreterLauncher extends StandardInterpreterLauncher context.getInterpreterSettingName(), context.getInterpreterGroupId(), connectTimeout, - getConnectPoolSize(), + getConnectPoolSize(context), context.getIntpEventServerHost(), context.getIntpEventServerPort(), intpTserverHost, @@ -156,7 +155,7 @@ public class ClusterInterpreterLauncher extends StandardInterpreterLauncher context.getInterpreterSettingName(), context.getInterpreterGroupId(), connectTimeout, - getConnectPoolSize(), + getConnectPoolSize(context), context.getIntpEventServerHost(), context.getIntpEventServerPort(), intpTserverHost, @@ -220,12 +219,10 @@ public class ClusterInterpreterLauncher extends StandardInterpreterLauncher private InterpreterClient createInterpreterProcess(InterpreterLaunchContext context) throws IOException { this.context = context; - this.properties = context.getProperties(); InterpreterClient intpProcess = null; if (isRunningOnDocker(zConf)) { DockerInterpreterLauncher dockerIntpLauncher = new DockerInterpreterLauncher(zConf, null); - dockerIntpLauncher.setProperties(context.getProperties()); intpProcess = dockerIntpLauncher.launch(context); } else { intpProcess = createClusterIntpProcess(); @@ -233,7 +230,7 @@ public class ClusterInterpreterLauncher extends StandardInterpreterLauncher // must first step start check interpreter thread ClusterInterpreterCheckThread intpCheckThread = new ClusterInterpreterCheckThread( - intpProcess, context.getInterpreterGroupId(), getConnectTimeout()); + intpProcess, context.getInterpreterGroupId(), getConnectTimeout(context)); intpCheckThread.start(); return intpProcess; @@ -246,8 +243,8 @@ public class ClusterInterpreterLauncher extends StandardInterpreterLauncher InterpreterRunner runner = context.getRunner(); String intpSetGroupName = context.getInterpreterSettingGroup(); String intpSetName = context.getInterpreterSettingName(); - int connectTimeout = getConnectTimeout(); - int connectionPoolSize = getConnectPoolSize(); + int connectTimeout = getConnectTimeout(context); + int connectionPoolSize = getConnectPoolSize(context); String localRepoPath = zConf.getInterpreterLocalRepoPath() + "/" + context.getInterpreterSettingId(); diff --git a/zeppelin-plugins/launcher/docker/src/main/java/org/apache/zeppelin/interpreter/launcher/DockerInterpreterLauncher.java b/zeppelin-plugins/launcher/docker/src/main/java/org/apache/zeppelin/interpreter/launcher/DockerInterpreterLauncher.java index 4865aedd35..91cc7a3a5c 100644 --- a/zeppelin-plugins/launcher/docker/src/main/java/org/apache/zeppelin/interpreter/launcher/DockerInterpreterLauncher.java +++ b/zeppelin-plugins/launcher/docker/src/main/java/org/apache/zeppelin/interpreter/launcher/DockerInterpreterLauncher.java @@ -41,8 +41,7 @@ public class DockerInterpreterLauncher extends InterpreterLauncher { public InterpreterClient launchDirectly(InterpreterLaunchContext context) throws IOException { LOGGER.info("Launching Interpreter: {}", context.getInterpreterSettingGroup()); this.context = context; - this.properties = context.getProperties(); - int connectTimeout = getConnectTimeout(); + int connectTimeout = getConnectTimeout(context); if (connectTimeout < 200000) { // DockerInterpreterLauncher needs to pull the image and create the container, // it takes a long time, so the force is set to 200 seconds. @@ -60,7 +59,6 @@ public class DockerInterpreterLauncher extends InterpreterLauncher { } else { interpreterLauncher = new StandardInterpreterLauncher(zConf, recoveryStorage); } - interpreterLauncher.setProperties(context.getProperties()); Map<String, String> env = interpreterLauncher.buildEnvFromProperties(context); return new DockerInterpreterProcess( @@ -69,7 +67,7 @@ public class DockerInterpreterLauncher extends InterpreterLauncher { context.getInterpreterGroupId(), context.getInterpreterSettingGroup(), context.getInterpreterSettingName(), - properties, + context.getProperties(), env, context.getIntpEventServerHost(), context.getIntpEventServerPort(), diff --git a/zeppelin-plugins/launcher/flink/src/main/java/org/apache/zeppelin/interpreter/launcher/FlinkInterpreterLauncher.java b/zeppelin-plugins/launcher/flink/src/main/java/org/apache/zeppelin/interpreter/launcher/FlinkInterpreterLauncher.java index b7e5f83a72..aa22ab6460 100644 --- a/zeppelin-plugins/launcher/flink/src/main/java/org/apache/zeppelin/interpreter/launcher/FlinkInterpreterLauncher.java +++ b/zeppelin-plugins/launcher/flink/src/main/java/org/apache/zeppelin/interpreter/launcher/FlinkInterpreterLauncher.java @@ -78,7 +78,7 @@ public class FlinkInterpreterLauncher extends StandardInterpreterLauncher { throw new IOException("flink.app.jar is not specified for kubernetes-application mode"); } envs.put("FLINK_APP_JAR", flinkAppJar); - LOGGER.info("K8s application's FLINK_APP_JAR : " + flinkAppJar); + LOGGER.info("K8s application's FLINK_APP_JAR : {}", flinkAppJar); context.getProperties().put("zeppelin.interpreter.forceShutdown", "false"); } else { String flinkAppJar = chooseFlinkAppJar(flinkHome); diff --git a/zeppelin-plugins/launcher/k8s-standard/src/main/java/org/apache/zeppelin/interpreter/launcher/K8sStandardInterpreterLauncher.java b/zeppelin-plugins/launcher/k8s-standard/src/main/java/org/apache/zeppelin/interpreter/launcher/K8sStandardInterpreterLauncher.java index d8f2b728e2..7a1ce48275 100644 --- a/zeppelin-plugins/launcher/k8s-standard/src/main/java/org/apache/zeppelin/interpreter/launcher/K8sStandardInterpreterLauncher.java +++ b/zeppelin-plugins/launcher/k8s-standard/src/main/java/org/apache/zeppelin/interpreter/launcher/K8sStandardInterpreterLauncher.java @@ -104,24 +104,23 @@ public class K8sStandardInterpreterLauncher extends InterpreterLauncher { @Override public InterpreterClient launchDirectly(InterpreterLaunchContext context) throws IOException { LOGGER.info("Launching Interpreter: {}", context.getInterpreterSettingGroup()); - this.properties = context.getProperties(); return new K8sRemoteInterpreterProcess( client, - K8sUtils.getInterpreterNamespace(properties, zConf), + K8sUtils.getInterpreterNamespace(context.getProperties(), zConf), new File(zConf.getK8sTemplatesDir(), "interpreter"), zConf.getK8sContainerImage(), context.getInterpreterGroupId(), context.getInterpreterSettingGroup(), context.getInterpreterSettingName(), - properties, + context.getProperties(), buildEnvFromProperties(context), getZeppelinService(context), getZeppelinServiceRpcPort(context), zConf.getK8sPortForward(), zConf.getK8sSparkContainerImage(), - getConnectTimeout(), - getConnectPoolSize(), + getConnectTimeout(context), + getConnectPoolSize(context), isUserImpersonateForSparkInterpreter(context), zConf.getK8sTimeoutDuringPending()); } diff --git a/zeppelin-plugins/launcher/yarn/src/main/java/org/apache/zeppelin/interpreter/launcher/YarnInterpreterLauncher.java b/zeppelin-plugins/launcher/yarn/src/main/java/org/apache/zeppelin/interpreter/launcher/YarnInterpreterLauncher.java index f07a1eef48..f0ff5ab73d 100644 --- a/zeppelin-plugins/launcher/yarn/src/main/java/org/apache/zeppelin/interpreter/launcher/YarnInterpreterLauncher.java +++ b/zeppelin-plugins/launcher/yarn/src/main/java/org/apache/zeppelin/interpreter/launcher/YarnInterpreterLauncher.java @@ -41,14 +41,13 @@ public class YarnInterpreterLauncher extends InterpreterLauncher { @Override public InterpreterClient launchDirectly(InterpreterLaunchContext context) throws IOException { LOGGER.info("Launching Interpreter: {}", context.getInterpreterSettingGroup()); - this.properties = context.getProperties(); return new YarnRemoteInterpreterProcess( context, - properties, + context.getProperties(), buildEnvFromProperties(context), - getConnectTimeout(), - getConnectPoolSize()); + getConnectTimeout(context), + getConnectPoolSize(context)); } protected Map<String, String> buildEnvFromProperties(InterpreterLaunchContext context) { diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/launcher/SparkInterpreterLauncher.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/launcher/SparkInterpreterLauncher.java index 85b5fc6f2a..cc68512bf2 100644 --- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/launcher/SparkInterpreterLauncher.java +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/launcher/SparkInterpreterLauncher.java @@ -62,10 +62,11 @@ public class SparkInterpreterLauncher extends StandardInterpreterLauncher { public Map<String, String> buildEnvFromProperties(InterpreterLaunchContext context) throws IOException { Map<String, String> env = super.buildEnvFromProperties(context); Properties sparkProperties = new Properties(); - String spMaster = getSparkMaster(); + String spMaster = getSparkMaster(context); if (spMaster != null) { sparkProperties.put(SPARK_MASTER_KEY, spMaster); } + Properties properties = context.getProperties(); for (String key : properties.stringPropertyNames()) { String propValue = properties.getProperty(key); if (RemoteInterpreterUtils.isEnvString(key) && !StringUtils.isBlank(propValue)) { @@ -91,18 +92,18 @@ public class SparkInterpreterLauncher extends StandardInterpreterLauncher { sparkProperties.setProperty("spark.app.name", context.getInterpreterGroupId()); } - setupPropertiesForPySpark(sparkProperties); - setupPropertiesForSparkR(sparkProperties); + setupPropertiesForPySpark(sparkProperties, context); + setupPropertiesForSparkR(sparkProperties, context); String condaEnvName = context.getProperties().getProperty("zeppelin.interpreter.conda.env.name"); if (StringUtils.isNotBlank(condaEnvName)) { - if (!isYarnCluster()) { + if (!isYarnCluster(context)) { throw new IOException("zeppelin.interpreter.conda.env.name only works for yarn-cluster mode"); } sparkProperties.setProperty("spark.pyspark.python", condaEnvName + "/bin/python"); } - if (isYarnCluster()) { + if (isYarnCluster(context)) { env.put("ZEPPELIN_SPARK_YARN_CLUSTER", "true"); sparkProperties.setProperty("spark.yarn.submit.waitAppCompletion", "false"); // Need to set `zeppelin.interpreter.forceShutdown` in interpreter properties directly @@ -116,7 +117,7 @@ public class SparkInterpreterLauncher extends StandardInterpreterLauncher { " to false if you want to use other modes."); } - if (isYarnMode() && getDeployMode().equals("cluster")) { + if (isYarnMode(context) && getDeployMode(context).equals("cluster")) { if (sparkProperties.containsKey("spark.files")) { sparkProperties.put("spark.files", sparkProperties.getProperty("spark.files") + "," + zConf.getConfDir() + "/log4j_yarn_cluster.properties"); @@ -129,7 +130,7 @@ public class SparkInterpreterLauncher extends StandardInterpreterLauncher { String scalaVersion = null; try { - String sparkHome = getEnv("SPARK_HOME"); + String sparkHome = getEnv("SPARK_HOME", context); LOGGER.info("SPARK_HOME: {}", sparkHome); scalaVersion = detectSparkScalaVersion(sparkHome, env); LOGGER.info("Scala version for Spark: {}", scalaVersion); @@ -138,8 +139,8 @@ public class SparkInterpreterLauncher extends StandardInterpreterLauncher { throw new IOException("Fail to detect scala version, the reason is:"+ e.getMessage()); } - if (isYarnMode() - && getDeployMode().equals("cluster")) { + if (isYarnMode(context) + && getDeployMode(context).equals("cluster")) { try { List<String> additionalJars = new ArrayList<>(); Path localRepoPath = @@ -213,7 +214,7 @@ public class SparkInterpreterLauncher extends StandardInterpreterLauncher { // It is encouraged to set env in interpreter setting, but just for backward compatibility, // we also fallback to zeppelin-env.sh if it is not specified in interpreter setting. for (String envName : new String[]{"SPARK_HOME", "SPARK_CONF_DIR", "HADOOP_CONF_DIR"}) { - String envValue = getEnv(envName); + String envValue = getEnv(envName, context); if (!StringUtils.isBlank(envValue)) { env.put(envName, envValue); } @@ -235,9 +236,9 @@ public class SparkInterpreterLauncher extends StandardInterpreterLauncher { env.put("PYSPARK_PIN_THREAD", "true"); // ZEPPELIN_INTP_CLASSPATH - String sparkConfDir = getEnv("SPARK_CONF_DIR"); + String sparkConfDir = getEnv("SPARK_CONF_DIR", context); if (StringUtils.isBlank(sparkConfDir)) { - String sparkHome = getEnv("SPARK_HOME"); + String sparkHome = getEnv("SPARK_HOME", context); sparkConfDir = sparkHome + "/conf"; } Properties sparkDefaultProperties = new Properties(); @@ -252,7 +253,7 @@ public class SparkInterpreterLauncher extends StandardInterpreterLauncher { LOGGER.warn("spark-defaults.conf doesn't exist: {}", sparkDefaultFile.getAbsolutePath()); } - if (isYarnMode()) { + if (isYarnMode(context)) { boolean runAsLoginUser = Boolean.parseBoolean(context .getProperties() .getProperty("zeppelin.spark.run.asLoginUser", "true")); @@ -334,8 +335,8 @@ public class SparkInterpreterLauncher extends StandardInterpreterLauncher { * 2. zeppelin-env.sh * */ - private String getEnv(String envName) { - String env = properties.getProperty(envName); + private String getEnv(String envName, InterpreterLaunchContext context) { + String env = context.getProperties().getProperty(envName); if (StringUtils.isBlank(env)) { env = System.getenv(envName); } @@ -350,8 +351,9 @@ public class SparkInterpreterLauncher extends StandardInterpreterLauncher { return !StringUtils.isEmpty(key) && key.startsWith("spark.") && !StringUtils.isEmpty(value); } - private void setupPropertiesForPySpark(Properties sparkProperties) { - if (isYarnMode()) { + private void setupPropertiesForPySpark(Properties sparkProperties, + InterpreterLaunchContext context) { + if (isYarnMode(context)) { sparkProperties.setProperty("spark.yarn.isPython", "true"); } } @@ -366,12 +368,13 @@ public class SparkInterpreterLauncher extends StandardInterpreterLauncher { } } - private void setupPropertiesForSparkR(Properties sparkProperties) { - if (isYarnMode()) { - String sparkHome = getEnv("SPARK_HOME"); + private void setupPropertiesForSparkR(Properties sparkProperties, + InterpreterLaunchContext context) { + if (isYarnMode(context)) { + String sparkHome = getEnv("SPARK_HOME", context); File sparkRBasePath = null; if (sparkHome == null) { - if (!getSparkMaster().startsWith("local")) { + if (!getSparkMaster(context).startsWith("local")) { throw new RuntimeException("SPARK_HOME is not specified in interpreter-setting" + " for non-local mode, if you specify it in zeppelin-env.sh, please move that into " + " interpreter setting"); @@ -402,9 +405,10 @@ public class SparkInterpreterLauncher extends StandardInterpreterLauncher { * 3. use local[*] * @return Spark Master string */ - private String getSparkMaster() { + private String getSparkMaster(InterpreterLaunchContext context) { if (!sparkMaster.isPresent()) { - String master = properties.getProperty(SPARK_MASTER_KEY); + Properties properties = context.getProperties(); + String master = context.getProperties().getProperty(SPARK_MASTER_KEY); if (master == null) { master = properties.getProperty("master"); if (master == null) { @@ -418,15 +422,15 @@ public class SparkInterpreterLauncher extends StandardInterpreterLauncher { return sparkMaster.get(); } - private String getDeployMode() { - if (getSparkMaster().equals("yarn-client")) { + private String getDeployMode(InterpreterLaunchContext context) { + if (getSparkMaster(context).equals("yarn-client")) { return "client"; - } else if (getSparkMaster().equals("yarn-cluster")) { + } else if (getSparkMaster(context).equals("yarn-cluster")) { return "cluster"; - } else if (getSparkMaster().startsWith("local")) { + } else if (getSparkMaster(context).startsWith("local")) { return "client"; } else { - String deployMode = properties.getProperty("spark.submit.deployMode"); + String deployMode = context.getProperties().getProperty("spark.submit.deployMode"); if (deployMode == null) { throw new RuntimeException("master is set as yarn, but spark.submit.deployMode " + "is not specified"); @@ -438,11 +442,11 @@ public class SparkInterpreterLauncher extends StandardInterpreterLauncher { } } - private boolean isYarnMode() { - return getSparkMaster().startsWith("yarn"); + private boolean isYarnMode(InterpreterLaunchContext context) { + return getSparkMaster(context).startsWith("yarn"); } - private boolean isYarnCluster() { - return isYarnMode() && "cluster".equalsIgnoreCase(getDeployMode()); + private boolean isYarnCluster(InterpreterLaunchContext context) { + return isYarnMode(context) && "cluster".equalsIgnoreCase(getDeployMode(context)); } } diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/launcher/StandardInterpreterLauncher.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/launcher/StandardInterpreterLauncher.java index 46caee95f0..77e6e7bddc 100644 --- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/launcher/StandardInterpreterLauncher.java +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/launcher/StandardInterpreterLauncher.java @@ -48,13 +48,12 @@ public class StandardInterpreterLauncher extends InterpreterLauncher { @Override public InterpreterClient launchDirectly(InterpreterLaunchContext context) throws IOException { LOGGER.info("Launching new interpreter process of {}", context.getInterpreterSettingGroup()); - this.properties = context.getProperties(); InterpreterOption option = context.getOption(); InterpreterRunner runner = context.getRunner(); String groupName = context.getInterpreterSettingGroup(); String name = context.getInterpreterSettingName(); - int connectTimeout = getConnectTimeout(); - int connectionPoolSize = getConnectPoolSize(); + int connectTimeout = getConnectTimeout(context); + int connectionPoolSize = getConnectPoolSize(context); if (option.isExistingProcess()) { return new RemoteInterpreterRunningProcess(