This is an automated email from the ASF dual-hosted git repository.

zjffdu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/zeppelin.git


The following commit(s) were added to refs/heads/master by this push:
     new 8eaf864  [ZEPPELIN-4734]. Sometimes it is unable to restart interpreter
8eaf864 is described below

commit 8eaf864d43a46f10fc75f82464ac0f49d6914b75
Author: Jeff Zhang <zjf...@apache.org>
AuthorDate: Wed Apr 8 11:47:51 2020 +0800

    [ZEPPELIN-4734]. Sometimes it is unable to restart interpreter
    
    ### What is this PR for?
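    The root cause is that when restarting an interpreter, Zeppelin first 
cancels all running jobs, and the thread that cancels a paragraph in the 
interpreter process may make a Thrift call back to the Zeppelin server. Because 
the cancellation previously ran synchronously inside the cancel RPC, such a 
callback could block and leave the restart stuck. The stack trace in the JIRA 
description shows one such example with the Flink interpreter.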
    
    This PR fixes this issue by cancelling the paragraph in a separate thread, 
so the cancel RPC returns immediately. Cancelling the paragraph asynchronously 
is acceptable.
    
    ### What type of PR is it?
    [Bug Fix]
    
    ### Todos
    * [ ] - Task
    
    ### What is the Jira issue?
    * https://issues.apache.org/jira/browse/ZEPPELIN-4734
    
    ### How should this be tested?
    * CI pass
    
    ### Screenshots (if appropriate)
    
    ### Questions:
    * Do the license files need to be updated? No
    * Is there breaking changes for older versions? No
    * Does this need documentation? No
    
    Author: Jeff Zhang <zjf...@apache.org>
    
    Closes #3723 from zjffdu/ZEPPELIN-4734 and squashes the following commits:
    
    c0e8c80c8 [Jeff Zhang] [ZEPPELIN-4734]. Sometimes it is unable to restart 
interpreter
---
 .../java/org/apache/zeppelin/flink/PyFlinkInterpreter.java  |  2 +-
 .../interpreter/remote/RemoteInterpreterServer.java         | 13 ++++++++-----
 .../interpreter/remote/RemoteInterpreterServerTest.java     |  2 ++
 3 files changed, 11 insertions(+), 6 deletions(-)

diff --git 
a/flink/src/main/java/org/apache/zeppelin/flink/PyFlinkInterpreter.java 
b/flink/src/main/java/org/apache/zeppelin/flink/PyFlinkInterpreter.java
index 91ec0fe..eaebd64 100644
--- a/flink/src/main/java/org/apache/zeppelin/flink/PyFlinkInterpreter.java
+++ b/flink/src/main/java/org/apache/zeppelin/flink/PyFlinkInterpreter.java
@@ -119,7 +119,7 @@ public class PyFlinkInterpreter extends PythonInterpreter {
       flinkInterpreter.createPlannerAgain();
       return super.interpret(st, context);
     } finally {
-      if (getPythonProcessLauncher().isRunning()) {
+      if (useIPython() || (!useIPython() && 
getPythonProcessLauncher().isRunning())) {
         InterpreterResult result = 
super.interpret("intp.resetClassLoaderInPythonThread()", context);
         if (result.code() != InterpreterResult.Code.SUCCESS) {
           LOGGER.warn("Fail to resetClassLoaderInPythonThread: " + 
result.toString());
diff --git 
a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java
 
b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java
index d6bd116..9b9e063 100644
--- 
a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java
+++ 
b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java
@@ -751,11 +751,14 @@ public class RemoteInterpreterServer extends Thread
     if (job != null && job.getStatus() == Status.PENDING) {
       job.setStatus(Status.ABORT);
     } else {
-      try {
-        intp.cancel(convert(interpreterContext, null));
-      } catch (InterpreterException e) {
-        throw new TException("Fail to cancel", e);
-      }
+      Thread thread = new Thread( ()-> {
+        try {
+          intp.cancel(convert(interpreterContext, null));
+        } catch (InterpreterException e) {
+          logger.error("Fail to cancel paragraph: " + 
interpreterContext.getParagraphId());
+        }
+      });
+      thread.start();
     }
   }
 
diff --git 
a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServerTest.java
 
b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServerTest.java
index 7beeee8..ebbeea1 100644
--- 
a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServerTest.java
+++ 
b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServerTest.java
@@ -174,6 +174,8 @@ public class RemoteInterpreterServerTest {
     Thread.sleep(1000);
     assertFalse(interpreter1.cancelled.get());
     server.cancel("session_1", Test1Interpreter.class.getName(), intpContext);
+    // Sleep 1 second, because cancel is async.
+    Thread.sleep(1000);
     assertTrue(interpreter1.cancelled.get());
 
     // getProgress

Reply via email to