This is an automated email from the ASF dual-hosted git repository. zjffdu pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/zeppelin.git
The following commit(s) were added to refs/heads/master by this push: new 7ccf608 [ZEPPELIN-4091] Fix concurrent autocomplete and execute for Ipython 7ccf608 is described below commit 7ccf608ee72470a38cf07ea16cba2cd292b6f39c Author: marc hurabielle <marc.hurabie...@gmail.com> AuthorDate: Sat Sep 7 15:22:32 2019 +0900 [ZEPPELIN-4091] Fix concurrent autocomplete and execute for Ipython ### What is this PR for? This PR fixes a bug that makes the **ipython** `execute_interactive` call hang forever if an auto-`complete` call is made at the same time (see the unit test for an example that fails on master). For now, the fix is to synchronize these two methods: `execute` / `complete`. This will not introduce a regression, because the kernel does not support concurrent execution and auto-completion anyway (see https://github.com/jupyter/notebook/issues/3763). ### What type of PR is it? Bug Fix ### Todos * [x] - unit test that fails on master / succeeds on this branch * [x] - fix with lock ### What is the Jira issue? This PR addresses one part of the Jira issue; other fixes will come soon: https://issues.apache.org/jira/browse/ZEPPELIN-4091 ### How should this be tested? * First time? Set up Travis CI as described on https://zeppelin.apache.org/contribution/contributions.html#continuous-integration * Strongly recommended: add automated unit tests for any new or changed behavior * Outline any manual steps to test the PR here. ### Screenshots (if appropriate) ### Questions: * Do the license files need an update? no * Are there breaking changes for older versions? no * Does this need documentation? 
no Author: marc hurabielle <marc.hurabie...@gmail.com> Closes #3336 from AyWa/fix/concurrent-auto-complete and squashes the following commits: 86dab7345 [marc hurabielle] fix rebase 5bed19496 [marc hurabielle] fix lint 6e48c1380 [marc hurabielle] try single threaded f14d8b242 [marc hurabielle] Revert "just test ci behavior" be6663f89 [marc hurabielle] just test ci behavior bc2b4f6e6 [marc hurabielle] bring back test d43f03da9 [marc hurabielle] use initIntpProperties instead of empty one c37414cc2 [marc hurabielle] increase timeout f7cae9538 [marc hurabielle] move synchronize near the thread check 616f0122f [marc hurabielle] add test to ensure that autocomplete and interpret can be call concurrently 409b75f0f [marc hurabielle] add lock to ensure ipython execute will not be stuck forever when complete is call --- .../main/resources/grpc/python/ipython_server.py | 74 ++++++++++++---------- .../zeppelin/python/IPythonInterpreterTest.java | 50 +++++++++++++++ 2 files changed, 89 insertions(+), 35 deletions(-) diff --git a/python/src/main/resources/grpc/python/ipython_server.py b/python/src/main/resources/grpc/python/ipython_server.py index 3fd0a8c..47f67b7 100644 --- a/python/src/main/resources/grpc/python/ipython_server.py +++ b/python/src/main/resources/grpc/python/ipython_server.py @@ -38,6 +38,10 @@ class IPython(ipython_pb2_grpc.IPythonServicer): def __init__(self, server): self._status = ipython_pb2.STARTING self._server = server + # issue with execute_interactive and auto completion: https://github.com/jupyter/jupyter_client/issues/429 + # in all case because ipython does not support run and auto completion at the same time: https://github.com/jupyter/notebook/issues/3763 + # For now we will lock to ensure that there is no concurrent bug that can "hang" the kernel + self._lock = threading.Lock() def start(self): print("starting...") @@ -83,43 +87,42 @@ class IPython(ipython_pb2_grpc.IPythonServicer): payload_reply = [] def execute_worker(): reply = 
self._kc.execute_interactive(request.code, - output_hook=_output_hook, - timeout=None) + output_hook=_output_hook, + timeout=None) payload_reply.append(reply) t = threading.Thread(name="ConsumerThread", target=execute_worker) - t.start() - - # We want to ensure that the kernel is alive because in case of OOM or other errors - # Execution might be stuck there: - # https://github.com/jupyter/jupyter_client/blob/master/jupyter_client/blocking/client.py#L32 - while t.is_alive() and self.isKernelAlive(): - while not text_queue.empty(): - output = text_queue.get() - yield ipython_pb2.ExecuteResponse(status=ipython_pb2.SUCCESS, - type=ipython_pb2.TEXT, - output=output) - while not html_queue.empty(): - output = html_queue.get() - yield ipython_pb2.ExecuteResponse(status=ipython_pb2.SUCCESS, - type=ipython_pb2.HTML, - output=output) - while not stderr_queue.empty(): - output = stderr_queue.get() - yield ipython_pb2.ExecuteResponse(status=ipython_pb2.ERROR, - type=ipython_pb2.TEXT, - output=output) - while not png_queue.empty(): - output = png_queue.get() - yield ipython_pb2.ExecuteResponse(status=ipython_pb2.SUCCESS, - type=ipython_pb2.PNG, - output=output) - while not jpeg_queue.empty(): - output = jpeg_queue.get() - yield ipython_pb2.ExecuteResponse(status=ipython_pb2.SUCCESS, - type=ipython_pb2.JPEG, - output=output) - + with self._lock: + t.start() + # We want to ensure that the kernel is alive because in case of OOM or other errors + # Execution might be stuck there: + # https://github.com/jupyter/jupyter_client/blob/master/jupyter_client/blocking/client.py#L32 + while t.is_alive() and self.isKernelAlive(): + while not text_queue.empty(): + output = text_queue.get() + yield ipython_pb2.ExecuteResponse(status=ipython_pb2.SUCCESS, + type=ipython_pb2.TEXT, + output=output) + while not html_queue.empty(): + output = html_queue.get() + yield ipython_pb2.ExecuteResponse(status=ipython_pb2.SUCCESS, + type=ipython_pb2.HTML, + output=output) + while not stderr_queue.empty(): + 
output = stderr_queue.get() + yield ipython_pb2.ExecuteResponse(status=ipython_pb2.ERROR, + type=ipython_pb2.TEXT, + output=output) + while not png_queue.empty(): + output = png_queue.get() + yield ipython_pb2.ExecuteResponse(status=ipython_pb2.SUCCESS, + type=ipython_pb2.PNG, + output=output) + while not jpeg_queue.empty(): + output = jpeg_queue.get() + yield ipython_pb2.ExecuteResponse(status=ipython_pb2.SUCCESS, + type=ipython_pb2.JPEG, + output=output) # if kernel is not alive (should be same as thread is still alive), means that we face # an unexpected issue. @@ -169,7 +172,8 @@ class IPython(ipython_pb2_grpc.IPythonServicer): return ipython_pb2.CancelResponse() def complete(self, request, context): - reply = self._kc.complete(request.code, request.cursor, reply=True, timeout=None) + with self._lock: + reply = self._kc.complete(request.code, request.cursor, reply=True, timeout=None) return ipython_pb2.CompletionResponse(matches=reply['content']['matches']) def status(self, request, context): diff --git a/python/src/test/java/org/apache/zeppelin/python/IPythonInterpreterTest.java b/python/src/test/java/org/apache/zeppelin/python/IPythonInterpreterTest.java index e084bfe..87e5071 100644 --- a/python/src/test/java/org/apache/zeppelin/python/IPythonInterpreterTest.java +++ b/python/src/test/java/org/apache/zeppelin/python/IPythonInterpreterTest.java @@ -27,12 +27,19 @@ import org.apache.zeppelin.interpreter.InterpreterResult; import org.apache.zeppelin.interpreter.InterpreterResult.Code; import org.apache.zeppelin.interpreter.InterpreterResultMessage; import org.apache.zeppelin.interpreter.LazyOpenInterpreter; +import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion; import org.junit.Test; import java.io.IOException; import java.util.ArrayList; import java.util.List; import java.util.Properties; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import 
java.util.concurrent.Executors; +import java.util.concurrent.FutureTask; +import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import static junit.framework.TestCase.assertTrue; @@ -280,6 +287,49 @@ public class IPythonInterpreterTest extends BasePythonInterpreterTest { } @Test + public void testIpython_shouldNotHang_whenCallingAutoCompleteAndInterpretConcurrently() + throws InterpreterException, + InterruptedException, TimeoutException, ExecutionException { + tearDown(); + Properties properties = initIntpProperties(); + startInterpreter(properties); + final String code = "import time\n" + + "print(1)\n" + + "time.sleep(10)\n" + + "print(2)"; + final String base = "time."; + + // The goal of this test is to ensure that concurrent interpret and complete + // will not make execute hang forever. + ExecutorService pool = Executors.newFixedThreadPool(2); + FutureTask<InterpreterResult> interpretFuture = + new FutureTask(new Callable() { + @Override + public Object call() throws Exception { + return interpreter.interpret(code, getInterpreterContext()); + } + }); + FutureTask<List<InterpreterCompletion>> completionFuture = + new FutureTask(new Callable() { + @Override + public Object call() throws Exception { + return interpreter.completion(base, base.length(), getInterpreterContext()); + } + }); + + pool.execute(interpretFuture); + // we sleep to ensure that the paragraph is running + Thread.sleep(3000); + pool.execute(completionFuture); + + // We ensure that running and auto completion are not hanging. + InterpreterResult res = interpretFuture.get(20000, TimeUnit.MILLISECONDS); + List<InterpreterCompletion> autoRes = completionFuture.get(1000, TimeUnit.MILLISECONDS); + assertTrue(res.code().name().equals("SUCCESS")); + assertTrue(autoRes.size() > 0); + } + + @Test public void testGrpcFrameSize() throws InterpreterException, IOException { tearDown();