This is an automated email from the ASF dual-hosted git repository.
zjffdu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/zeppelin.git
The following commit(s) were added to refs/heads/master by this push:
new 05620fe [ZEPPELIN-5218]. Don't exit shell interpreter if there's more
logging produced
05620fe is described below
commit 05620fe689ac6e3412df605a1a231d5e4ef6012b
Author: Jeff Zhang <[email protected]>
AuthorDate: Sat Feb 6 13:01:48 2021 +0800
[ZEPPELIN-5218]. Don't exit shell interpreter if there's more logging
produced
### What is this PR for?
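This PR changes the timeout behavior of the shell interpreter. A shell command
now times out only after it has produced no output for the whole timeout period
(`shell.command.timeout.millisecs`). As long as it keeps producing output, the
command continues to execute. Idle time is checked periodically, every
`shell.command.timeout.check.interval` milliseconds.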
### What type of PR is it?
[ Improvement | Feature]
### Todos
* [ ] - Task
### What is the Jira issue?
* https://issues.apache.org/jira/browse/ZEPPELIN-5218
### How should this be tested?
* Test is added
### Screenshots (if appropriate)
### Questions:
* Do the license files need updating? No
* Are there breaking changes for older versions? No
* Does this need documentation? No
Author: Jeff Zhang <[email protected]>
Closes #4037 from zjffdu/ZEPPELIN-5218 and squashes the following commits:
45110a47b [Jeff Zhang] fix test
ba9618a46 [Jeff Zhang] soft shutdown executor
3fa41ba77 [Jeff Zhang] shutdown executor in close
ef5a08eee [Jeff Zhang] [ZEPPELIN-5218]. Don't exit shell interpreter if
there's more logging produced
---
.../apache/zeppelin/shell/ShellInterpreter.java | 68 ++++++++++++++++++----
shell/src/main/resources/interpreter-setting.json | 7 +++
.../zeppelin/shell/ShellInterpreterTest.java | 33 ++++++++---
.../zeppelin/interpreter/InterpreterOutput.java | 20 +++++--
4 files changed, 103 insertions(+), 25 deletions(-)
diff --git
a/shell/src/main/java/org/apache/zeppelin/shell/ShellInterpreter.java
b/shell/src/main/java/org/apache/zeppelin/shell/ShellInterpreter.java
index 7cfcd19..c43a8be 100644
--- a/shell/src/main/java/org/apache/zeppelin/shell/ShellInterpreter.java
+++ b/shell/src/main/java/org/apache/zeppelin/shell/ShellInterpreter.java
@@ -24,13 +24,18 @@ import org.apache.commons.exec.ExecuteWatchdog;
import org.apache.commons.exec.PumpStreamHandler;
import org.apache.commons.lang3.StringUtils;
import org.apache.zeppelin.interpreter.ZeppelinContext;
+import org.apache.zeppelin.util.ExecutorUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.IOException;
+import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
import org.apache.zeppelin.interpreter.InterpreterContext;
import org.apache.zeppelin.interpreter.InterpreterException;
@@ -47,12 +52,18 @@ public class ShellInterpreter extends KerberosInterpreter {
private static final Logger LOGGER =
LoggerFactory.getLogger(ShellInterpreter.class);
private static final String TIMEOUT_PROPERTY =
"shell.command.timeout.millisecs";
+ private static final String TIMEOUT_CHECK_INTERVAL_PROPERTY =
+ "shell.command.timeout.check.interval";
private static final String DEFAULT_TIMEOUT = "60000";
+ private static final String DEFAULT_CHECK_INTERVAL = "10000";
private static final String DIRECTORY_USER_HOME =
"shell.working.directory.user.home";
private final boolean isWindows =
System.getProperty("os.name").startsWith("Windows");
private final String shell = isWindows ? "cmd /c" : "bash -c";
- ConcurrentHashMap<String, DefaultExecutor> executors;
+ private ConcurrentHashMap<String, DefaultExecutor> executorMap;
+ private ConcurrentHashMap<String, InterpreterContext> contextMap;
+ private ScheduledExecutorService shellOutputCheckExecutor =
+ Executors.newSingleThreadScheduledExecutor();
public ShellInterpreter(Properties property) {
super(property);
@@ -61,15 +72,41 @@ public class ShellInterpreter extends KerberosInterpreter {
@Override
public void open() {
super.open();
- LOGGER.info("Command timeout property: {}", getProperty(TIMEOUT_PROPERTY));
- executors = new ConcurrentHashMap<>();
+ long timeoutThreshold = Long.parseLong(getProperty(TIMEOUT_PROPERTY,
DEFAULT_TIMEOUT));
+ long timeoutCheckInterval = Long.parseLong(
+ getProperty(TIMEOUT_CHECK_INTERVAL_PROPERTY,
DEFAULT_CHECK_INTERVAL));
+ LOGGER.info("Command timeout property: {}", timeoutThreshold);
+ executorMap = new ConcurrentHashMap<>();
+ contextMap = new ConcurrentHashMap<>();
+
+ shellOutputCheckExecutor.scheduleAtFixedRate(() -> {
+ try {
+ for (Map.Entry<String, DefaultExecutor> entry :
executorMap.entrySet()) {
+ String paragraphId = entry.getKey();
+ DefaultExecutor executor = entry.getValue();
+ InterpreterContext context = contextMap.get(paragraphId);
+ if (context == null) {
+ LOGGER.warn("No InterpreterContext associated with paragraph: {}",
paragraphId);
+ continue;
+ }
+ if ((System.currentTimeMillis() -
context.out.getLastWriteTimestamp()) >
+ timeoutThreshold) {
+ LOGGER.info("No output for paragraph {} for the last {}
milli-seconds, so kill it",
+ paragraphId, timeoutThreshold);
+ executor.getWatchdog().destroyProcess();
+ }
+ }
+ } catch (Exception e) {
+ LOGGER.error("Error when checking shell command timeout", e);
+ }
+ }, timeoutCheckInterval, timeoutCheckInterval, TimeUnit.MILLISECONDS);
}
@Override
public void close() {
super.close();
- for (String executorKey : executors.keySet()) {
- DefaultExecutor executor = executors.remove(executorKey);
+ for (String executorKey : executorMap.keySet()) {
+ DefaultExecutor executor = executorMap.remove(executorKey);
if (executor != null) {
try {
executor.getWatchdog().destroyProcess();
@@ -78,6 +115,11 @@ public class ShellInterpreter extends KerberosInterpreter {
}
}
}
+
+ if (shellOutputCheckExecutor != null) {
+ ExecutorUtil.softShutdown("ShellOutputCheckExecutor",
shellOutputCheckExecutor,
+ 5, TimeUnit.SECONDS);
+ }
}
@Override
@@ -105,13 +147,14 @@ public class ShellInterpreter extends KerberosInterpreter
{
cmdLine.addArgument(cmd, false);
try {
+ contextMap.put(context.getParagraphId(), context);
+
DefaultExecutor executor = new DefaultExecutor();
executor.setStreamHandler(new PumpStreamHandler(
context.out, context.out));
+ executor.setWatchdog(new ExecuteWatchdog(Long.MAX_VALUE));
+ executorMap.put(context.getParagraphId(), executor);
- executor.setWatchdog(new ExecuteWatchdog(
- Long.valueOf(getProperty(TIMEOUT_PROPERTY, DEFAULT_TIMEOUT))));
- executors.put(context.getParagraphId(), executor);
if (Boolean.valueOf(getProperty(DIRECTORY_USER_HOME))) {
executor.setWorkingDirectory(new
File(System.getProperty("user.home")));
}
@@ -140,13 +183,14 @@ public class ShellInterpreter extends KerberosInterpreter
{
LOGGER.error("Can not run command: " + cmd, e);
return new InterpreterResult(Code.ERROR, e.getMessage());
} finally {
- executors.remove(context.getParagraphId());
+ executorMap.remove(context.getParagraphId());
+ contextMap.remove(context.getParagraphId());
}
}
@Override
public void cancel(InterpreterContext context) {
- DefaultExecutor executor = executors.remove(context.getParagraphId());
+ DefaultExecutor executor = executorMap.remove(context.getParagraphId());
if (executor != null) {
try {
executor.getWatchdog().destroyProcess();
@@ -183,6 +227,10 @@ public class ShellInterpreter extends KerberosInterpreter {
return false;
}
+ public ConcurrentHashMap<String, DefaultExecutor> getExecutorMap() {
+ return executorMap;
+ }
+
public void createSecureConfiguration() throws InterpreterException {
Properties properties = getProperties();
CommandLine cmdLine = CommandLine.parse(shell);
diff --git a/shell/src/main/resources/interpreter-setting.json
b/shell/src/main/resources/interpreter-setting.json
index 57b6fa1..7bbed28 100644
--- a/shell/src/main/resources/interpreter-setting.json
+++ b/shell/src/main/resources/interpreter-setting.json
@@ -12,6 +12,13 @@
"description": "Shell command time out in millisecs. Default = 60000",
"type": "number"
},
+ "shell.command.timeout.check.interval": {
+ "envName": "",
+ "propertyName": "shell.command.timeout.check.interval",
+ "defaultValue": "60000",
+ "description": "Shell command output check interval in millisecs.
Default = 10000",
+ "type": "number"
+ },
"shell.working.directory.user.home": {
"envName": "SHELL_WORKING_DIRECTORY_USER_HOME",
"propertyName": "shell.working.directory.user.home",
diff --git
a/shell/src/test/java/org/apache/zeppelin/shell/ShellInterpreterTest.java
b/shell/src/test/java/org/apache/zeppelin/shell/ShellInterpreterTest.java
index 78efa1d..ddaf0db 100644
--- a/shell/src/test/java/org/apache/zeppelin/shell/ShellInterpreterTest.java
+++ b/shell/src/test/java/org/apache/zeppelin/shell/ShellInterpreterTest.java
@@ -21,6 +21,7 @@ import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import org.apache.zeppelin.interpreter.InterpreterException;
+import org.apache.zeppelin.interpreter.InterpreterOutput;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
@@ -40,10 +41,12 @@ public class ShellInterpreterTest {
@Before
public void setUp() throws Exception {
Properties p = new Properties();
- p.setProperty("shell.command.timeout.millisecs", "2000");
+ p.setProperty("shell.command.timeout.millisecs", "5000");
+ p.setProperty("shell.command.timeout.check.interval", "1000");
shell = new ShellInterpreter(p);
-
- context =
InterpreterContext.builder().setParagraphId("paragraphId").build();
+ context = InterpreterContext.builder()
+ .setInterpreterOut(new InterpreterOutput())
+ .setParagraphId("paragraphId").build();
shell.open();
}
@@ -59,10 +62,10 @@ public class ShellInterpreterTest {
result = shell.interpret("ls", context);
}
assertEquals(InterpreterResult.Code.SUCCESS, result.code());
- assertTrue(shell.executors.isEmpty());
+ assertTrue(shell.getExecutorMap().isEmpty());
// it should be fine to cancel a statement that has been completed.
shell.cancel(context);
- assertTrue(shell.executors.isEmpty());
+ assertTrue(shell.getExecutorMap().isEmpty());
}
@Test
@@ -73,18 +76,30 @@ public class ShellInterpreterTest {
result = shell.interpret("invalid_command\nls", context);
}
assertEquals(Code.SUCCESS, result.code());
- assertTrue(shell.executors.isEmpty());
+ assertTrue(shell.getExecutorMap().isEmpty());
}
@Test
public void testShellTimeout() throws InterpreterException {
if (System.getProperty("os.name").startsWith("Windows")) {
- result = shell.interpret("timeout 4", context);
+ result = shell.interpret("timeout 8", context);
} else {
- result = shell.interpret("sleep 4", context);
+ result = shell.interpret("sleep 8", context);
}
-
+ // exit shell process because no output is produced during the timeout
threshold
assertEquals(Code.INCOMPLETE, result.code());
assertTrue(result.message().get(0).getData().contains("Paragraph received
a SIGTERM"));
}
+
+ @Test
+ public void testShellTimeout2() throws InterpreterException {
+ context = InterpreterContext.builder()
+ .setParagraphId("paragraphId")
+ .setInterpreterOut(new InterpreterOutput())
+ .build();
+ result = shell.interpret("for i in {1..10}\ndo\n\tsleep 1\n\techo
$i\ndone", context);
+ // won't exit shell because the continues output is produced
+ assertEquals(Code.SUCCESS, result.code());
+ assertEquals("1\n2\n3\n4\n5\n6\n7\n8\n9\n10\n", context.out.toString());
+ }
}
diff --git
a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterOutput.java
b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterOutput.java
index 4462635..3c20c12 100644
---
a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterOutput.java
+++
b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterOutput.java
@@ -36,7 +36,13 @@ import java.util.List;
* in addition to InterpreterResult which used to return from
Interpreter.interpret().
*/
public class InterpreterOutput extends OutputStream {
- Logger logger = LoggerFactory.getLogger(InterpreterOutput.class);
+ private static final Logger LOGGER =
LoggerFactory.getLogger(InterpreterOutput.class);
+
+ // change static var to set interpreter output limit
+ // limit will be applied to all InterpreterOutput object.
+ // so we can expect the consistent behavior
+ public static int LIMIT = Constants.ZEPPELIN_INTERPRETER_OUTPUT_LIMIT;
+
private static final int NEW_LINE_CHAR = '\n';
private static final int LINE_FEED_CHAR = '\r';
@@ -56,10 +62,7 @@ public class InterpreterOutput extends OutputStream {
// so just refresh all output for streaming application, such as flink
streaming sql output.
private boolean enableTableAppend = false;
- // change static var to set interpreter output limit
- // limit will be applied to all InterpreterOutput object.
- // so we can expect the consistent behavior
- public static int LIMIT = Constants.ZEPPELIN_INTERPRETER_OUTPUT_LIMIT;
+ private long lastWriteTimestamp = System.currentTimeMillis();
public InterpreterOutput() {
changeListener = null;
@@ -177,7 +180,7 @@ public class InterpreterOutput extends OutputStream {
try {
out.close();
} catch (IOException e) {
- logger.error(e.getMessage(), e);
+ LOGGER.error(e.getMessage(), e);
}
}
@@ -212,6 +215,7 @@ public class InterpreterOutput extends OutputStream {
return;
}
+ this.lastWriteTimestamp = System.currentTimeMillis();
synchronized (resultMessageOutputs) {
currentOut = getCurrentOutput();
@@ -407,4 +411,8 @@ public class InterpreterOutput extends OutputStream {
}
}
}
+
+ public long getLastWriteTimestamp() {
+ return lastWriteTimestamp;
+ }
}