This is an automated email from the ASF dual-hosted git repository.

zjffdu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/zeppelin.git


The following commit(s) were added to refs/heads/master by this push:
     new 7ccf608  [ZEPPELIN-4091] Fix concurrent autocomplete and execute for 
Ipython
7ccf608 is described below

commit 7ccf608ee72470a38cf07ea16cba2cd292b6f39c
Author: marc hurabielle <marc.hurabie...@gmail.com>
AuthorDate: Sat Sep 7 15:22:32 2019 +0900

    [ZEPPELIN-4091] Fix concurrent autocomplete and execute for Ipython
    
    ### What is this PR for?
    
    This PR fixes a bug that makes the **ipython** `execute_interactive` 
hang forever if an auto `complete` call is made at the same time (see the unit 
test for an example that is failing on master).
    
    For now the fix is to synchronize these methods: `execute` / `complete`. It 
will not introduce a regression because the kernel does not support 
concurrent execute and auto complete anyway (see 
https://github.com/jupyter/notebook/issues/3763).
    
    ### What type of PR is it?
    Bug Fix
    
    ### Todos
    * [x] - unit test failing in master / succeed on this branch
    * [x] - fix with lock
    
    ### What is the Jira issue?
    This is one part of the Jira issue. Other fixes will come soon.
    https://issues.apache.org/jira/browse/ZEPPELIN-4091
    
    ### How should this be tested?
    * First time? Setup Travis CI as described on 
https://zeppelin.apache.org/contribution/contributions.html#continuous-integration
    * Strongly recommended: add automated unit tests for any new or changed 
behavior
    * Outline any manual steps to test the PR here.
    
    ### Screenshots (if appropriate)
    
    ### Questions:
    * Do the license files need an update? no
    * Are there breaking changes for older versions? no
    * Does this need documentation? no
    
    Author: marc hurabielle <marc.hurabie...@gmail.com>
    
    Closes #3336 from AyWa/fix/concurrent-auto-complete and squashes the 
following commits:
    
    86dab7345 [marc hurabielle] fix rebase
    5bed19496 [marc hurabielle] fix lint
    6e48c1380 [marc hurabielle] try single threaded
    f14d8b242 [marc hurabielle] Revert "just test ci behavior"
    be6663f89 [marc hurabielle] just test ci behavior
    bc2b4f6e6 [marc hurabielle] bring back test
    d43f03da9 [marc hurabielle] use initIntpProperties instead of empty one
    c37414cc2 [marc hurabielle] increase timeout
    f7cae9538 [marc hurabielle] move synchronize near the thread check
    616f0122f [marc hurabielle] add test to ensure that autocomplete and 
interpret can be call concurrently
    409b75f0f [marc hurabielle] add lock to ensure ipython execute will not be 
stuck forever when complete is call
---
 .../main/resources/grpc/python/ipython_server.py   | 74 ++++++++++++----------
 .../zeppelin/python/IPythonInterpreterTest.java    | 50 +++++++++++++++
 2 files changed, 89 insertions(+), 35 deletions(-)

diff --git a/python/src/main/resources/grpc/python/ipython_server.py 
b/python/src/main/resources/grpc/python/ipython_server.py
index 3fd0a8c..47f67b7 100644
--- a/python/src/main/resources/grpc/python/ipython_server.py
+++ b/python/src/main/resources/grpc/python/ipython_server.py
@@ -38,6 +38,10 @@ class IPython(ipython_pb2_grpc.IPythonServicer):
     def __init__(self, server):
         self._status = ipython_pb2.STARTING
         self._server = server
+        # issue with execute_interactive and auto completion: 
https://github.com/jupyter/jupyter_client/issues/429
+        # in all case because ipython does not support run and auto completion 
at the same time: https://github.com/jupyter/notebook/issues/3763
+        # For now we will lock to ensure that there is no concurrent bug that 
can "hang" the kernel
+        self._lock = threading.Lock()
 
     def start(self):
         print("starting...")
@@ -83,43 +87,42 @@ class IPython(ipython_pb2_grpc.IPythonServicer):
         payload_reply = []
         def execute_worker():
             reply = self._kc.execute_interactive(request.code,
-                                            output_hook=_output_hook,
-                                            timeout=None)
+                                          output_hook=_output_hook,
+                                          timeout=None)
             payload_reply.append(reply)
 
         t = threading.Thread(name="ConsumerThread", target=execute_worker)
-        t.start()
-
-        # We want to ensure that the kernel is alive because in case of OOM or 
other errors
-        # Execution might be stuck there:
-        # 
https://github.com/jupyter/jupyter_client/blob/master/jupyter_client/blocking/client.py#L32
-        while t.is_alive() and self.isKernelAlive():
-            while not text_queue.empty():
-                output = text_queue.get()
-                yield ipython_pb2.ExecuteResponse(status=ipython_pb2.SUCCESS,
-                                                  type=ipython_pb2.TEXT,
-                                                  output=output)
-            while not html_queue.empty():
-                output = html_queue.get()
-                yield ipython_pb2.ExecuteResponse(status=ipython_pb2.SUCCESS,
-                                                  type=ipython_pb2.HTML,
-                                                  output=output)
-            while not stderr_queue.empty():
-                output = stderr_queue.get()
-                yield ipython_pb2.ExecuteResponse(status=ipython_pb2.ERROR,
-                                                  type=ipython_pb2.TEXT,
-                                                  output=output)
-            while not png_queue.empty():
-                output = png_queue.get()
-                yield ipython_pb2.ExecuteResponse(status=ipython_pb2.SUCCESS,
-                                                  type=ipython_pb2.PNG,
-                                                  output=output)
-            while not jpeg_queue.empty():
-                output = jpeg_queue.get()
-                yield ipython_pb2.ExecuteResponse(status=ipython_pb2.SUCCESS,
-                                                  type=ipython_pb2.JPEG,
-                                                  output=output)
-
+        with self._lock:
+            t.start()
+            # We want to ensure that the kernel is alive because in case of 
OOM or other errors
+            # Execution might be stuck there:
+            # 
https://github.com/jupyter/jupyter_client/blob/master/jupyter_client/blocking/client.py#L32
+            while t.is_alive() and self.isKernelAlive():
+                while not text_queue.empty():
+                    output = text_queue.get()
+                    yield 
ipython_pb2.ExecuteResponse(status=ipython_pb2.SUCCESS,
+                                                      type=ipython_pb2.TEXT,
+                                                      output=output)
+                while not html_queue.empty():
+                    output = html_queue.get()
+                    yield 
ipython_pb2.ExecuteResponse(status=ipython_pb2.SUCCESS,
+                                                      type=ipython_pb2.HTML,
+                                                      output=output)
+                while not stderr_queue.empty():
+                    output = stderr_queue.get()
+                    yield ipython_pb2.ExecuteResponse(status=ipython_pb2.ERROR,
+                                                      type=ipython_pb2.TEXT,
+                                                      output=output)
+                while not png_queue.empty():
+                    output = png_queue.get()
+                    yield 
ipython_pb2.ExecuteResponse(status=ipython_pb2.SUCCESS,
+                                                      type=ipython_pb2.PNG,
+                                                      output=output)
+                while not jpeg_queue.empty():
+                    output = jpeg_queue.get()
+                    yield 
ipython_pb2.ExecuteResponse(status=ipython_pb2.SUCCESS,
+                                                      type=ipython_pb2.JPEG,
+                                                      output=output)
 
         # if kernel is not alive (should be same as thread is still alive), 
means that we face
         # an unexpected issue.
@@ -169,7 +172,8 @@ class IPython(ipython_pb2_grpc.IPythonServicer):
         return ipython_pb2.CancelResponse()
 
     def complete(self, request, context):
-        reply = self._kc.complete(request.code, request.cursor, reply=True, 
timeout=None)
+        with self._lock:
+            reply = self._kc.complete(request.code, request.cursor, 
reply=True, timeout=None)
         return 
ipython_pb2.CompletionResponse(matches=reply['content']['matches'])
 
     def status(self, request, context):
diff --git 
a/python/src/test/java/org/apache/zeppelin/python/IPythonInterpreterTest.java 
b/python/src/test/java/org/apache/zeppelin/python/IPythonInterpreterTest.java
index e084bfe..87e5071 100644
--- 
a/python/src/test/java/org/apache/zeppelin/python/IPythonInterpreterTest.java
+++ 
b/python/src/test/java/org/apache/zeppelin/python/IPythonInterpreterTest.java
@@ -27,12 +27,19 @@ import org.apache.zeppelin.interpreter.InterpreterResult;
 import org.apache.zeppelin.interpreter.InterpreterResult.Code;
 import org.apache.zeppelin.interpreter.InterpreterResultMessage;
 import org.apache.zeppelin.interpreter.LazyOpenInterpreter;
+import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion;
 import org.junit.Test;
 
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Properties;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.FutureTask;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 
 import static junit.framework.TestCase.assertTrue;
@@ -280,6 +287,49 @@ public class IPythonInterpreterTest extends 
BasePythonInterpreterTest {
   }
 
   @Test
+  public void 
testIpython_shouldNotHang_whenCallingAutoCompleteAndInterpretConcurrently()
+      throws InterpreterException,
+      InterruptedException, TimeoutException, ExecutionException {
+    tearDown();
+    Properties properties = initIntpProperties();
+    startInterpreter(properties);
+    final String code = "import time\n"
+        + "print(1)\n"
+        + "time.sleep(10)\n"
+        + "print(2)";
+    final String base = "time.";
+
+    // The goal of this test is to ensure that concurrent interpret and 
complete
+    // will not make execute hang forever.
+    ExecutorService pool = Executors.newFixedThreadPool(2);
+    FutureTask<InterpreterResult> interpretFuture =
+        new FutureTask(new Callable() {
+          @Override
+          public Object call() throws Exception {
+            return interpreter.interpret(code, getInterpreterContext());
+          }
+        });
+    FutureTask<List<InterpreterCompletion>> completionFuture =
+        new FutureTask(new Callable() {
+          @Override
+          public Object call() throws Exception {
+            return interpreter.completion(base, base.length(), 
getInterpreterContext());
+          }
+        });
+
+    pool.execute(interpretFuture);
+    // we sleep to ensure that the paragraph is running
+    Thread.sleep(3000);
+    pool.execute(completionFuture);
+
+    // We ensure that running and auto completion are not hanging.
+    InterpreterResult res = interpretFuture.get(20000, TimeUnit.MILLISECONDS);
+    List<InterpreterCompletion> autoRes = completionFuture.get(1000, 
TimeUnit.MILLISECONDS);
+    assertTrue(res.code().name().equals("SUCCESS"));
+    assertTrue(autoRes.size() > 0);
+  }
+
+  @Test
   public void testGrpcFrameSize() throws InterpreterException, IOException {
     tearDown();
 

Reply via email to