This is an automated email from the ASF dual-hosted git repository.
pdallig pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/zeppelin.git
The following commit(s) were added to refs/heads/master by this push:
new 59bdb47 [ZEPPELIN-5225] Remote interpreter soft shutdown
59bdb47 is described below
commit 59bdb47817840e1ede35eb7292c3133174eba9ce
Author: Philipp Dallig <[email protected]>
AuthorDate: Fri Feb 5 14:51:51 2021 +0100
[ZEPPELIN-5225] Remote interpreter soft shutdown
### What is this PR for?
This PR moves the exec code to a new class called
`ExecRemoteInterpreterProcess`. This allows other `RemoteInterpreterProcess`
classes to use the better code of `RemoteInterpreterManagedProcess`.
A soft shutdown has been implemented in the new
`ExecRemoteInterpreterProcess` class.
### What type of PR is it?
- Improvement
### What is the Jira issue?
* https://issues.apache.org/jira/browse/ZEPPELIN-5225
### How should this be tested?
* CI
### Questions:
* Does the licenses files need update? No
* Is there breaking changes for older versions? No
* Does this needs documentation? No
Author: Philipp Dallig <[email protected]>
Closes #4035 from Reamer/remote_interpreter_soft_shutdown and squashes the
following commits:
c22df382a [Philipp Dallig] Correct LOGGER messages
8bebe6ac0 [Philipp Dallig] RemoteInterpreterManagedProcess soft shutdown
and abstraction
---
.../remote/RemoteInterpreterServer.java | 2 +-
.../zeppelin/interpreter/util/ProcessLauncher.java | 2 +-
.../launcher/ClusterInterpreterProcess.java | 10 +-
.../launcher/StandardInterpreterLauncher.java | 11 +-
...cess.java => ExecRemoteInterpreterProcess.java} | 219 ++++++++-------------
.../remote/RemoteInterpreterManagedProcess.java | 176 ++---------------
.../launcher/SparkInterpreterLauncherTest.java | 30 +--
.../launcher/StandardInterpreterLauncherTest.java | 10 +-
8 files changed, 140 insertions(+), 320 deletions(-)
diff --git
a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java
b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java
index 9645843..e9ebdd1 100644
---
a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java
+++
b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java
@@ -107,7 +107,7 @@ public class RemoteInterpreterServer extends Thread
private static final Logger LOGGER =
LoggerFactory.getLogger(RemoteInterpreterServer.class);
- private static final int DEFAULT_SHUTDOWN_TIMEOUT = 2000;
+ public static final int DEFAULT_SHUTDOWN_TIMEOUT = 2000;
private String interpreterGroupId;
private InterpreterGroup interpreterGroup;
diff --git
a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/util/ProcessLauncher.java
b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/util/ProcessLauncher.java
index abe6d0a..eb0b65b 100644
---
a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/util/ProcessLauncher.java
+++
b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/util/ProcessLauncher.java
@@ -197,7 +197,7 @@ public abstract class ProcessLauncher implements
ExecuteResultHandler {
try {
redirectedContext.out.write(s + "\n");
} catch (IOException e) {
- e.printStackTrace();
+ LOGGER.error("unable to write to redirectedContext", e);
}
}
}
diff --git
a/zeppelin-plugins/launcher/cluster/src/main/java/org/apache/zeppelin/interpreter/launcher/ClusterInterpreterProcess.java
b/zeppelin-plugins/launcher/cluster/src/main/java/org/apache/zeppelin/interpreter/launcher/ClusterInterpreterProcess.java
index c9ae7f4..6feeec6 100644
---
a/zeppelin-plugins/launcher/cluster/src/main/java/org/apache/zeppelin/interpreter/launcher/ClusterInterpreterProcess.java
+++
b/zeppelin-plugins/launcher/cluster/src/main/java/org/apache/zeppelin/interpreter/launcher/ClusterInterpreterProcess.java
@@ -3,10 +3,10 @@ package org.apache.zeppelin.interpreter.launcher;
import java.io.IOException;
import java.util.Map;
-import org.apache.zeppelin.interpreter.remote.RemoteInterpreterManagedProcess;
+import org.apache.zeppelin.interpreter.remote.ExecRemoteInterpreterProcess;
import org.apache.zeppelin.interpreter.remote.RemoteInterpreterUtils;
-public class ClusterInterpreterProcess extends RemoteInterpreterManagedProcess
{
+public class ClusterInterpreterProcess extends ExecRemoteInterpreterProcess {
public ClusterInterpreterProcess(
String intpRunner,
@@ -22,8 +22,7 @@ public class ClusterInterpreterProcess extends
RemoteInterpreterManagedProcess {
String interpreterGroupId,
boolean isUserImpersonated) {
- super(intpRunner,
- intpEventServerPort,
+ super(intpEventServerPort,
intpEventServerHost,
interpreterPortRange,
intpDir,
@@ -33,7 +32,8 @@ public class ClusterInterpreterProcess extends
RemoteInterpreterManagedProcess {
connectionPoolSize,
interpreterSettingName,
interpreterGroupId,
- isUserImpersonated);
+ isUserImpersonated,
+ intpRunner);
}
@Override
diff --git
a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/launcher/StandardInterpreterLauncher.java
b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/launcher/StandardInterpreterLauncher.java
index d52276d..46caee9 100644
---
a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/launcher/StandardInterpreterLauncher.java
+++
b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/launcher/StandardInterpreterLauncher.java
@@ -24,12 +24,13 @@ import org.apache.zeppelin.conf.ZeppelinConfiguration;
import org.apache.zeppelin.interpreter.InterpreterOption;
import org.apache.zeppelin.interpreter.InterpreterRunner;
import org.apache.zeppelin.interpreter.recovery.RecoveryStorage;
-import org.apache.zeppelin.interpreter.remote.RemoteInterpreterManagedProcess;
+import org.apache.zeppelin.interpreter.remote.ExecRemoteInterpreterProcess;
import org.apache.zeppelin.interpreter.remote.RemoteInterpreterRunningProcess;
import org.apache.zeppelin.interpreter.remote.RemoteInterpreterUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.io.File;
import java.io.IOException;
import java.util.Map;
@@ -68,14 +69,14 @@ public class StandardInterpreterLauncher extends
InterpreterLauncher {
false);
} else {
// create new remote process
- String localRepoPath = zConf.getInterpreterLocalRepoPath() + "/"
+ String localRepoPath = zConf.getInterpreterLocalRepoPath() +
File.separator
+ context.getInterpreterSettingId();
- return new RemoteInterpreterManagedProcess(
- runner != null ? runner.getPath() :
zConf.getInterpreterRemoteRunnerPath(),
+ return new ExecRemoteInterpreterProcess(
context.getIntpEventServerPort(), context.getIntpEventServerHost(),
zConf.getInterpreterPortRange(),
zConf.getInterpreterDir() + "/" + groupName, localRepoPath,
buildEnvFromProperties(context), connectTimeout, connectionPoolSize,
name,
- context.getInterpreterGroupId(), option.isUserImpersonate());
+ context.getInterpreterGroupId(), option.isUserImpersonate(),
+ runner != null ? runner.getPath() :
zConf.getInterpreterRemoteRunnerPath());
}
}
diff --git
a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterManagedProcess.java
b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/ExecRemoteInterpreterProcess.java
similarity index 53%
copy from
zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterManagedProcess.java
copy to
zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/ExecRemoteInterpreterProcess.java
index 2436000..1141513 100644
---
a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterManagedProcess.java
+++
b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/ExecRemoteInterpreterProcess.java
@@ -17,7 +17,11 @@
package org.apache.zeppelin.interpreter.remote;
-import com.google.common.annotations.VisibleForTesting;
+import java.io.IOException;
+import java.util.Map;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
import org.apache.commons.exec.CommandLine;
import org.apache.commons.exec.ExecuteException;
import org.apache.hadoop.yarn.util.ConverterUtils;
@@ -26,36 +30,18 @@ import org.apache.zeppelin.interpreter.util.ProcessLauncher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.io.IOException;
-import java.util.Map;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
+import com.google.common.annotations.VisibleForTesting;
-/**
- * This class manages start / stop of remote interpreter process
- */
-public class RemoteInterpreterManagedProcess extends RemoteInterpreterProcess {
- private static final Logger LOGGER = LoggerFactory.getLogger(
- RemoteInterpreterManagedProcess.class);
- private static final Pattern YARN_APP_PATTER =
- Pattern.compile("Submitted application (\\w+)");
+public class ExecRemoteInterpreterProcess extends
RemoteInterpreterManagedProcess {
+
+ private static final Logger LOGGER =
LoggerFactory.getLogger(ExecRemoteInterpreterProcess.class);
+
+ private static final Pattern YARN_APP_PATTER = Pattern.compile("Submitted
application (\\w+)");
private final String interpreterRunner;
- private final String interpreterPortRange;
private InterpreterProcessLauncher interpreterProcessLauncher;
- private String host = null;
- private int port = -1;
- private final String interpreterDir;
- private final String localRepoDir;
- private final String interpreterSettingName;
- private final String interpreterGroupId;
- private final boolean isUserImpersonated;
- private String errorMessage;
-
- private Map<String, String> env;
- public RemoteInterpreterManagedProcess(
- String intpRunner,
+ public ExecRemoteInterpreterProcess(
int intpEventServerPort,
String intpEventServerHost,
String interpreterPortRange,
@@ -66,26 +52,11 @@ public class RemoteInterpreterManagedProcess extends
RemoteInterpreterProcess {
int connectionPoolSize,
String interpreterSettingName,
String interpreterGroupId,
- boolean isUserImpersonated) {
- super(connectTimeout, connectionPoolSize, intpEventServerHost,
intpEventServerPort);
+ boolean isUserImpersonated,
+ String intpRunner) {
+ super(intpEventServerPort, intpEventServerHost, interpreterPortRange,
intpDir, localRepoDir, env, connectTimeout,
+ connectionPoolSize, interpreterSettingName, interpreterGroupId,
isUserImpersonated);
this.interpreterRunner = intpRunner;
- this.interpreterPortRange = interpreterPortRange;
- this.env = env;
- this.interpreterDir = intpDir;
- this.localRepoDir = localRepoDir;
- this.interpreterSettingName = interpreterSettingName;
- this.interpreterGroupId = interpreterGroupId;
- this.isUserImpersonated = isUserImpersonated;
- }
-
- @Override
- public String getHost() {
- return host;
- }
-
- @Override
- public int getPort() {
- return port;
}
@Override
@@ -93,37 +64,37 @@ public class RemoteInterpreterManagedProcess extends
RemoteInterpreterProcess {
// start server process
CommandLine cmdLine = CommandLine.parse(interpreterRunner);
cmdLine.addArgument("-d", false);
- cmdLine.addArgument(interpreterDir, false);
+ cmdLine.addArgument(getInterpreterDir(), false);
cmdLine.addArgument("-c", false);
- cmdLine.addArgument(intpEventServerHost, false);
+ cmdLine.addArgument(getIntpEventServerHost(), false);
cmdLine.addArgument("-p", false);
cmdLine.addArgument(String.valueOf(intpEventServerPort), false);
cmdLine.addArgument("-r", false);
- cmdLine.addArgument(interpreterPortRange, false);
+ cmdLine.addArgument(getInterpreterPortRange(), false);
cmdLine.addArgument("-i", false);
- cmdLine.addArgument(interpreterGroupId, false);
- if (isUserImpersonated && !userName.equals("anonymous")) {
+ cmdLine.addArgument(getInterpreterGroupId(), false);
+ if (isUserImpersonated() && !userName.equals("anonymous")) {
cmdLine.addArgument("-u", false);
cmdLine.addArgument(userName, false);
}
cmdLine.addArgument("-l", false);
- cmdLine.addArgument(localRepoDir, false);
+ cmdLine.addArgument(getLocalRepoDir(), false);
cmdLine.addArgument("-g", false);
- cmdLine.addArgument(interpreterSettingName, false);
+ cmdLine.addArgument(getInterpreterSettingName(), false);
- interpreterProcessLauncher = new InterpreterProcessLauncher(cmdLine, env);
+ interpreterProcessLauncher = new InterpreterProcessLauncher(cmdLine,
getEnv());
interpreterProcessLauncher.launch();
interpreterProcessLauncher.waitForReady(getConnectTimeout());
if (interpreterProcessLauncher.isLaunchTimeout()) {
- throw new IOException(String.format("Interpreter Process creation is
time out in %d seconds",
- getConnectTimeout()/1000) + "\n" + "You can increase timeout
threshold via " +
- "setting zeppelin.interpreter.connect.timeout of this
interpreter.\n" +
- interpreterProcessLauncher.getErrorMessage());
+ throw new IOException(
+ String.format("Interpreter Process creation is time out in %d
seconds", getConnectTimeout() / 1000) + "\n"
+ + "You can increase timeout threshold via "
+ + "setting zeppelin.interpreter.connect.timeout of this
interpreter.\n"
+ + interpreterProcessLauncher.getErrorMessage());
}
if (!interpreterProcessLauncher.isRunning()) {
- throw new IOException("Fail to launch interpreter process:\n" +
- interpreterProcessLauncher.getErrorMessage());
+ throw new IOException("Fail to launch interpreter process:\n" +
interpreterProcessLauncher.getErrorMessage());
} else {
String launchOutput =
interpreterProcessLauncher.getProcessLaunchOutput();
Matcher m = YARN_APP_PATTER.matcher(launchOutput);
@@ -136,63 +107,25 @@ public class RemoteInterpreterManagedProcess extends
RemoteInterpreterProcess {
}
@Override
- public void stop() {
- if (isRunning()) {
- LOGGER.info("Kill interpreter process for interpreter group: {}",
getInterpreterGroupId());
- try {
- callRemoteFunction(client -> {
- client.shutdown();
- return null;
- });
- } catch (Exception e) {
- LOGGER.warn("ignore the exception when shutting down", e);
- }
-
- // Shutdown connection
- shutdown();
- this.interpreterProcessLauncher.stop();
- this.interpreterProcessLauncher = null;
- LOGGER.info("Remote process of interpreter group: {} is terminated",
getInterpreterGroupId());
- }
- }
-
- @Override
public void processStarted(int port, String host) {
- this.port = port;
- this.host = host;
+ super.processStarted(port, host);
// for yarn cluster it may be transitioned from COMPLETED to RUNNING.
interpreterProcessLauncher.onProcessRunning();
}
- // called when remote interpreter process is stopped, e.g. YarnAppsMonitor
will call this
- // after detecting yarn app is killed/failed.
- public void processStopped(String errorMessage) {
- this.errorMessage = errorMessage;
- }
-
- @VisibleForTesting
- public Map<String, String> getEnv() {
- return env;
- }
-
- @VisibleForTesting
- public String getLocalRepoDir() {
- return localRepoDir;
- }
-
- @VisibleForTesting
- public String getInterpreterDir() {
- return interpreterDir;
- }
-
- @Override
- public String getInterpreterSettingName() {
- return interpreterSettingName;
- }
-
@Override
- public String getInterpreterGroupId() {
- return interpreterGroupId;
+ public void stop() {
+ if (isRunning()) {
+ super.stop();
+ // wait for a clean shutdown
+
this.interpreterProcessLauncher.waitForShutdown(RemoteInterpreterServer.DEFAULT_SHUTDOWN_TIMEOUT
+ 500);
+ // kill process
+ this.interpreterProcessLauncher.stop();
+ this.interpreterProcessLauncher = null;
+ LOGGER.info("Remote exec process of interpreter group: {} is
terminated", getInterpreterGroupId());
+ } else {
+ LOGGER.warn("Try to stop a not running interpreter process of
interpreter group: {}", getInterpreterGroupId());
+ }
}
@VisibleForTesting
@@ -200,39 +133,60 @@ public class RemoteInterpreterManagedProcess extends
RemoteInterpreterProcess {
return interpreterRunner;
}
- @VisibleForTesting
- public boolean isUserImpersonated() {
- return isUserImpersonated;
- }
-
@Override
public boolean isRunning() {
- return interpreterProcessLauncher != null &&
interpreterProcessLauncher.isRunning()
- && errorMessage == null;
+ return interpreterProcessLauncher != null &&
interpreterProcessLauncher.isRunning();
}
@Override
public String getErrorMessage() {
- String interpreterProcessError = this.interpreterProcessLauncher != null
- ? this.interpreterProcessLauncher.getErrorMessage() : "";
- return errorMessage != null ? errorMessage : interpreterProcessError;
+ return this.interpreterProcessLauncher != null
+ ? this.interpreterProcessLauncher.getErrorMessage()
+ : "";
}
private class InterpreterProcessLauncher extends ProcessLauncher {
- public InterpreterProcessLauncher(CommandLine commandLine,
- Map<String, String> envs) {
+ public InterpreterProcessLauncher(CommandLine commandLine, Map<String,
String> envs) {
super(commandLine, envs);
}
+ public void waitForShutdown(int timeout) {
+ synchronized (this) {
+ long startTime = System.currentTimeMillis();
+ long timeoutTime = startTime + timeout;
+ while (state == State.RUNNING &&
!Thread.currentThread().isInterrupted()) {
+ long timetoTimeout = timeoutTime - System.currentTimeMillis();
+ if (timetoTimeout <= 0) {
+ LOGGER.warn("Shutdown timeout reached");
+ break;
+ }
+ try {
+ wait(timetoTimeout);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ LOGGER.error("waitForShutdown interrupted", e);
+ }
+ }
+ }
+ }
+
@Override
public void waitForReady(int timeout) {
synchronized (this) {
- if (state != State.RUNNING) {
+ long startTime = System.currentTimeMillis();
+ long timeoutTime = startTime + timeout;
+ while (state != State.RUNNING &&
!Thread.currentThread().isInterrupted()) {
+ long timetoTimeout = timeoutTime - System.currentTimeMillis();
+ if (timetoTimeout <= 0) {
+ LOGGER.warn("Ready timeout reached");
+ break;
+ }
try {
- wait(timeout);
+ wait(timetoTimeout);
} catch (InterruptedException e) {
- LOGGER.error("Remote interpreter is not accessible", e);
+ Thread.currentThread().interrupt();
+ LOGGER.error("waitForReady interrupted", e);
}
}
}
@@ -245,22 +199,23 @@ public class RemoteInterpreterManagedProcess extends
RemoteInterpreterProcess {
@Override
public void onProcessRunning() {
super.onProcessRunning();
- synchronized(this) {
- notify();
+ synchronized (this) {
+ notifyAll();
}
}
@Override
public void onProcessComplete(int exitValue) {
- LOGGER.warn("Process is exited with exit value " + exitValue);
- if (env.getOrDefault("ZEPPELIN_SPARK_YARN_CLUSTER",
"false").equals("false")) {
+ LOGGER.warn("Process is exited with exit value {}", exitValue);
+ if (getEnv().getOrDefault("ZEPPELIN_SPARK_YARN_CLUSTER",
"false").equals("false")) {
// don't call notify in yarn-cluster mode
synchronized (this) {
- notify();
+ notifyAll();
}
}
// For yarn-cluster mode, client process will exit with exit value 0
- // after submitting spark app. So don't move to TERMINATED state when
exitValue is 0.
+ // after submitting spark app. So don't move to TERMINATED state when
exitValue
+ // is 0.
if (exitValue != 0) {
transition(State.TERMINATED);
} else {
@@ -272,7 +227,7 @@ public class RemoteInterpreterManagedProcess extends
RemoteInterpreterProcess {
public void onProcessFailed(ExecuteException e) {
super.onProcessFailed(e);
synchronized (this) {
- notify();
+ notifyAll();
}
}
}
diff --git
a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterManagedProcess.java
b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterManagedProcess.java
index 2436000..c2aca53 100644
---
a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterManagedProcess.java
+++
b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterManagedProcess.java
@@ -17,32 +17,21 @@
package org.apache.zeppelin.interpreter.remote;
-import com.google.common.annotations.VisibleForTesting;
-import org.apache.commons.exec.CommandLine;
-import org.apache.commons.exec.ExecuteException;
-import org.apache.hadoop.yarn.util.ConverterUtils;
-import org.apache.zeppelin.interpreter.YarnAppMonitor;
-import org.apache.zeppelin.interpreter.util.ProcessLauncher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.io.IOException;
import java.util.Map;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
/**
* This class manages start / stop of remote interpreter process
*/
-public class RemoteInterpreterManagedProcess extends RemoteInterpreterProcess {
+public abstract class RemoteInterpreterManagedProcess extends
RemoteInterpreterProcess {
private static final Logger LOGGER = LoggerFactory.getLogger(
RemoteInterpreterManagedProcess.class);
- private static final Pattern YARN_APP_PATTER =
- Pattern.compile("Submitted application (\\w+)");
- private final String interpreterRunner;
+
private final String interpreterPortRange;
- private InterpreterProcessLauncher interpreterProcessLauncher;
+
private String host = null;
private int port = -1;
private final String interpreterDir;
@@ -55,7 +44,6 @@ public class RemoteInterpreterManagedProcess extends
RemoteInterpreterProcess {
private Map<String, String> env;
public RemoteInterpreterManagedProcess(
- String intpRunner,
int intpEventServerPort,
String intpEventServerHost,
String interpreterPortRange,
@@ -68,7 +56,6 @@ public class RemoteInterpreterManagedProcess extends
RemoteInterpreterProcess {
String interpreterGroupId,
boolean isUserImpersonated) {
super(connectTimeout, connectionPoolSize, intpEventServerHost,
intpEventServerPort);
- this.interpreterRunner = intpRunner;
this.interpreterPortRange = interpreterPortRange;
this.env = env;
this.interpreterDir = intpDir;
@@ -89,70 +76,17 @@ public class RemoteInterpreterManagedProcess extends
RemoteInterpreterProcess {
}
@Override
- public void start(String userName) throws IOException {
- // start server process
- CommandLine cmdLine = CommandLine.parse(interpreterRunner);
- cmdLine.addArgument("-d", false);
- cmdLine.addArgument(interpreterDir, false);
- cmdLine.addArgument("-c", false);
- cmdLine.addArgument(intpEventServerHost, false);
- cmdLine.addArgument("-p", false);
- cmdLine.addArgument(String.valueOf(intpEventServerPort), false);
- cmdLine.addArgument("-r", false);
- cmdLine.addArgument(interpreterPortRange, false);
- cmdLine.addArgument("-i", false);
- cmdLine.addArgument(interpreterGroupId, false);
- if (isUserImpersonated && !userName.equals("anonymous")) {
- cmdLine.addArgument("-u", false);
- cmdLine.addArgument(userName, false);
- }
- cmdLine.addArgument("-l", false);
- cmdLine.addArgument(localRepoDir, false);
- cmdLine.addArgument("-g", false);
- cmdLine.addArgument(interpreterSettingName, false);
-
- interpreterProcessLauncher = new InterpreterProcessLauncher(cmdLine, env);
- interpreterProcessLauncher.launch();
- interpreterProcessLauncher.waitForReady(getConnectTimeout());
- if (interpreterProcessLauncher.isLaunchTimeout()) {
- throw new IOException(String.format("Interpreter Process creation is
time out in %d seconds",
- getConnectTimeout()/1000) + "\n" + "You can increase timeout
threshold via " +
- "setting zeppelin.interpreter.connect.timeout of this
interpreter.\n" +
- interpreterProcessLauncher.getErrorMessage());
- }
-
- if (!interpreterProcessLauncher.isRunning()) {
- throw new IOException("Fail to launch interpreter process:\n" +
- interpreterProcessLauncher.getErrorMessage());
- } else {
- String launchOutput =
interpreterProcessLauncher.getProcessLaunchOutput();
- Matcher m = YARN_APP_PATTER.matcher(launchOutput);
- if (m.find()) {
- String appId = m.group(1);
- LOGGER.info("Detected yarn app: {}, add it to YarnAppMonitor", appId);
- YarnAppMonitor.get().addYarnApp(ConverterUtils.toApplicationId(appId),
this);
- }
- }
- }
-
- @Override
public void stop() {
- if (isRunning()) {
- LOGGER.info("Kill interpreter process for interpreter group: {}",
getInterpreterGroupId());
- try {
- callRemoteFunction(client -> {
- client.shutdown();
- return null;
- });
- } catch (Exception e) {
- LOGGER.warn("ignore the exception when shutting down", e);
- }
-
+ LOGGER.info("Stop interpreter process for interpreter group: {}",
getInterpreterGroupId());
+ try {
+ callRemoteFunction(client -> {
+ client.shutdown();
+ return null;
+ });
// Shutdown connection
shutdown();
- this.interpreterProcessLauncher.stop();
- this.interpreterProcessLauncher = null;
- LOGGER.info("Remote process of interpreter group: {} is terminated",
getInterpreterGroupId());
+ } catch (Exception e) {
+ LOGGER.warn("ignore the exception when shutting down", e);
}
}
@@ -160,8 +94,6 @@ public class RemoteInterpreterManagedProcess extends
RemoteInterpreterProcess {
public void processStarted(int port, String host) {
this.port = port;
this.host = host;
- // for yarn cluster it may be transitioned from COMPLETED to RUNNING.
- interpreterProcessLauncher.onProcessRunning();
}
// called when remote interpreter process is stopped, e.g. YarnAppsMonitor
will call this
@@ -170,21 +102,26 @@ public class RemoteInterpreterManagedProcess extends
RemoteInterpreterProcess {
this.errorMessage = errorMessage;
}
- @VisibleForTesting
public Map<String, String> getEnv() {
return env;
}
- @VisibleForTesting
public String getLocalRepoDir() {
return localRepoDir;
}
- @VisibleForTesting
public String getInterpreterDir() {
return interpreterDir;
}
+ public String getIntpEventServerHost() {
+ return intpEventServerHost;
+ }
+
+ public String getInterpreterPortRange() {
+ return interpreterPortRange;
+ }
+
@Override
public String getInterpreterSettingName() {
return interpreterSettingName;
@@ -195,85 +132,12 @@ public class RemoteInterpreterManagedProcess extends
RemoteInterpreterProcess {
return interpreterGroupId;
}
- @VisibleForTesting
- public String getInterpreterRunner() {
- return interpreterRunner;
- }
-
- @VisibleForTesting
public boolean isUserImpersonated() {
return isUserImpersonated;
}
@Override
- public boolean isRunning() {
- return interpreterProcessLauncher != null &&
interpreterProcessLauncher.isRunning()
- && errorMessage == null;
- }
-
- @Override
public String getErrorMessage() {
- String interpreterProcessError = this.interpreterProcessLauncher != null
- ? this.interpreterProcessLauncher.getErrorMessage() : "";
- return errorMessage != null ? errorMessage : interpreterProcessError;
- }
-
- private class InterpreterProcessLauncher extends ProcessLauncher {
-
- public InterpreterProcessLauncher(CommandLine commandLine,
- Map<String, String> envs) {
- super(commandLine, envs);
- }
-
- @Override
- public void waitForReady(int timeout) {
- synchronized (this) {
- if (state != State.RUNNING) {
- try {
- wait(timeout);
- } catch (InterruptedException e) {
- LOGGER.error("Remote interpreter is not accessible", e);
- }
- }
- }
- this.stopCatchLaunchOutput();
- if (state == State.LAUNCHED) {
- onTimeout();
- }
- }
-
- @Override
- public void onProcessRunning() {
- super.onProcessRunning();
- synchronized(this) {
- notify();
- }
- }
-
- @Override
- public void onProcessComplete(int exitValue) {
- LOGGER.warn("Process is exited with exit value " + exitValue);
- if (env.getOrDefault("ZEPPELIN_SPARK_YARN_CLUSTER",
"false").equals("false")) {
- // don't call notify in yarn-cluster mode
- synchronized (this) {
- notify();
- }
- }
- // For yarn-cluster mode, client process will exit with exit value 0
- // after submitting spark app. So don't move to TERMINATED state when
exitValue is 0.
- if (exitValue != 0) {
- transition(State.TERMINATED);
- } else {
- transition(State.COMPLETED);
- }
- }
-
- @Override
- public void onProcessFailed(ExecuteException e) {
- super.onProcessFailed(e);
- synchronized (this) {
- notify();
- }
- }
+ return errorMessage;
}
}
diff --git
a/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/launcher/SparkInterpreterLauncherTest.java
b/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/launcher/SparkInterpreterLauncherTest.java
index 6aff86a..5c997e1 100644
---
a/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/launcher/SparkInterpreterLauncherTest.java
+++
b/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/launcher/SparkInterpreterLauncherTest.java
@@ -21,7 +21,7 @@ import org.apache.commons.io.FileUtils;
import org.apache.zeppelin.conf.ZeppelinConfiguration;
import org.apache.zeppelin.interpreter.InterpreterOption;
import org.apache.zeppelin.interpreter.integration.DownloadUtils;
-import org.apache.zeppelin.interpreter.remote.RemoteInterpreterManagedProcess;
+import org.apache.zeppelin.interpreter.remote.ExecRemoteInterpreterProcess;
import org.apache.zeppelin.util.Util;
import org.junit.Before;
import org.junit.Test;
@@ -72,8 +72,8 @@ public class SparkInterpreterLauncherTest {
option.setUserImpersonate(true);
InterpreterLaunchContext context = new
InterpreterLaunchContext(properties, option, null, "user1", "intpGroupId",
"groupId", "groupName", "name", 0, "host");
InterpreterClient client = launcher.launch(context);
- assertTrue( client instanceof RemoteInterpreterManagedProcess);
- RemoteInterpreterManagedProcess interpreterProcess =
(RemoteInterpreterManagedProcess) client;
+ assertTrue(client instanceof ExecRemoteInterpreterProcess);
+ ExecRemoteInterpreterProcess interpreterProcess =
(ExecRemoteInterpreterProcess) client;
assertEquals("name", interpreterProcess.getInterpreterSettingName());
assertEquals(zeppelinHome + "/interpreter/groupName",
interpreterProcess.getInterpreterDir());
assertEquals(zeppelinHome + "/local-repo/groupId",
interpreterProcess.getLocalRepoDir());
@@ -98,8 +98,8 @@ public class SparkInterpreterLauncherTest {
InterpreterOption option = new InterpreterOption();
InterpreterLaunchContext context = new
InterpreterLaunchContext(properties, option, null, "user1", "intpGroupId",
"groupId", "spark", "spark", 0, "host");
InterpreterClient client = launcher.launch(context);
- assertTrue( client instanceof RemoteInterpreterManagedProcess);
- RemoteInterpreterManagedProcess interpreterProcess =
(RemoteInterpreterManagedProcess) client;
+ assertTrue( client instanceof ExecRemoteInterpreterProcess);
+ ExecRemoteInterpreterProcess interpreterProcess =
(ExecRemoteInterpreterProcess) client;
assertEquals("spark", interpreterProcess.getInterpreterSettingName());
assertTrue(interpreterProcess.getInterpreterDir().endsWith("/interpreter/spark"));
assertTrue(interpreterProcess.getLocalRepoDir().endsWith("/local-repo/groupId"));
@@ -126,8 +126,8 @@ public class SparkInterpreterLauncherTest {
InterpreterOption option = new InterpreterOption();
InterpreterLaunchContext context = new
InterpreterLaunchContext(properties, option, null, "user1", "intpGroupId",
"groupId", "spark", "spark", 0, "host");
InterpreterClient client = launcher.launch(context);
- assertTrue( client instanceof RemoteInterpreterManagedProcess);
- RemoteInterpreterManagedProcess interpreterProcess =
(RemoteInterpreterManagedProcess) client;
+ assertTrue( client instanceof ExecRemoteInterpreterProcess);
+ ExecRemoteInterpreterProcess interpreterProcess =
(ExecRemoteInterpreterProcess) client;
assertEquals("spark", interpreterProcess.getInterpreterSettingName());
assertTrue(interpreterProcess.getInterpreterDir().endsWith("/interpreter/spark"));
assertTrue(interpreterProcess.getLocalRepoDir().endsWith("/local-repo/groupId"));
@@ -159,8 +159,8 @@ public class SparkInterpreterLauncherTest {
InterpreterOption option = new InterpreterOption();
InterpreterLaunchContext context = new
InterpreterLaunchContext(properties, option, null, "user1", "intpGroupId",
"groupId", "spark", "spark", 0, "host");
InterpreterClient client = launcher.launch(context);
- assertTrue( client instanceof RemoteInterpreterManagedProcess);
- RemoteInterpreterManagedProcess interpreterProcess =
(RemoteInterpreterManagedProcess) client;
+ assertTrue( client instanceof ExecRemoteInterpreterProcess);
+ ExecRemoteInterpreterProcess interpreterProcess =
(ExecRemoteInterpreterProcess) client;
assertEquals("spark", interpreterProcess.getInterpreterSettingName());
assertTrue(interpreterProcess.getInterpreterDir().endsWith("/interpreter/spark"));
assertTrue(interpreterProcess.getLocalRepoDir().endsWith("/local-repo/groupId"));
@@ -192,8 +192,8 @@ public class SparkInterpreterLauncherTest {
InterpreterOption option = new InterpreterOption();
InterpreterLaunchContext context = new
InterpreterLaunchContext(properties, option, null, "user1", "intpGroupId",
"groupId", "spark", "spark", 0, "host");
InterpreterClient client = launcher.launch(context);
- assertTrue( client instanceof RemoteInterpreterManagedProcess);
- RemoteInterpreterManagedProcess interpreterProcess =
(RemoteInterpreterManagedProcess) client;
+ assertTrue( client instanceof ExecRemoteInterpreterProcess);
+ ExecRemoteInterpreterProcess interpreterProcess =
(ExecRemoteInterpreterProcess) client;
assertEquals("spark", interpreterProcess.getInterpreterSettingName());
assertTrue(interpreterProcess.getInterpreterDir().endsWith("/interpreter/spark"));
assertTrue(interpreterProcess.getLocalRepoDir().endsWith("/local-repo/groupId"));
@@ -238,8 +238,8 @@ public class SparkInterpreterLauncherTest {
Files.createFile(Paths.get(localRepoPath.toAbsolutePath().toString(),
"test.jar"));
InterpreterClient client = launcher.launch(context);
- assertTrue( client instanceof RemoteInterpreterManagedProcess);
- RemoteInterpreterManagedProcess interpreterProcess =
(RemoteInterpreterManagedProcess) client;
+ assertTrue(client instanceof ExecRemoteInterpreterProcess);
+ ExecRemoteInterpreterProcess interpreterProcess =
(ExecRemoteInterpreterProcess) client;
assertEquals("spark", interpreterProcess.getInterpreterSettingName());
assertTrue(interpreterProcess.getInterpreterDir().endsWith("/interpreter/spark"));
assertTrue(interpreterProcess.getLocalRepoDir().endsWith("/local-repo/groupId"));
@@ -285,8 +285,8 @@ public class SparkInterpreterLauncherTest {
Files.createDirectories(localRepoPath);
InterpreterClient client = launcher.launch(context);
- assertTrue( client instanceof RemoteInterpreterManagedProcess);
- RemoteInterpreterManagedProcess interpreterProcess =
(RemoteInterpreterManagedProcess) client;
+ assertTrue(client instanceof ExecRemoteInterpreterProcess);
+ ExecRemoteInterpreterProcess interpreterProcess =
(ExecRemoteInterpreterProcess) client;
assertEquals("spark", interpreterProcess.getInterpreterSettingName());
assertTrue(interpreterProcess.getInterpreterDir().endsWith("/interpreter/spark"));
assertTrue(interpreterProcess.getLocalRepoDir().endsWith("/local-repo/groupId"));
diff --git
a/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/launcher/StandardInterpreterLauncherTest.java
b/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/launcher/StandardInterpreterLauncherTest.java
index f0f60d9..8e695f3 100644
---
a/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/launcher/StandardInterpreterLauncherTest.java
+++
b/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/launcher/StandardInterpreterLauncherTest.java
@@ -19,7 +19,7 @@ package org.apache.zeppelin.interpreter.launcher;
import org.apache.zeppelin.conf.ZeppelinConfiguration;
import org.apache.zeppelin.interpreter.InterpreterOption;
-import org.apache.zeppelin.interpreter.remote.RemoteInterpreterManagedProcess;
+import org.apache.zeppelin.interpreter.remote.ExecRemoteInterpreterProcess;
import org.junit.Before;
import org.junit.Test;
@@ -48,8 +48,8 @@ public class StandardInterpreterLauncherTest {
option.setUserImpersonate(true);
InterpreterLaunchContext context = new
InterpreterLaunchContext(properties, option, null, "user1", "intpGroupId",
"groupId", "groupName", "name", 0, "host");
InterpreterClient client = launcher.launch(context);
- assertTrue( client instanceof RemoteInterpreterManagedProcess);
- RemoteInterpreterManagedProcess interpreterProcess =
(RemoteInterpreterManagedProcess) client;
+ assertTrue(client instanceof ExecRemoteInterpreterProcess);
+ ExecRemoteInterpreterProcess interpreterProcess =
(ExecRemoteInterpreterProcess) client;
assertEquals("name", interpreterProcess.getInterpreterSettingName());
assertEquals(".//interpreter/groupName",
interpreterProcess.getInterpreterDir());
assertEquals(".//local-repo/groupId",
interpreterProcess.getLocalRepoDir());
@@ -73,8 +73,8 @@ public class StandardInterpreterLauncherTest {
option.setUserImpersonate(true);
InterpreterLaunchContext context = new
InterpreterLaunchContext(properties, option, null, "user1", "intpGroupId",
"groupId", "groupName", "name", 0, "host");
InterpreterClient client = launcher.launch(context);
- assertTrue( client instanceof RemoteInterpreterManagedProcess);
- RemoteInterpreterManagedProcess interpreterProcess =
(RemoteInterpreterManagedProcess) client;
+ assertTrue(client instanceof ExecRemoteInterpreterProcess);
+ ExecRemoteInterpreterProcess interpreterProcess =
(ExecRemoteInterpreterProcess) client;
assertEquals("name", interpreterProcess.getInterpreterSettingName());
assertEquals(".//interpreter/groupName",
interpreterProcess.getInterpreterDir());
assertEquals(".//local-repo/groupId",
interpreterProcess.getLocalRepoDir());