Repository: zeppelin
Updated Branches:
  refs/heads/master a791fad59 -> de03a21ba


ZEPPELIN-3236. Make grpc framesize configurable

### What is this PR for?

Add a new advanced configuration property, `zeppelin.ipython.grpc.framesize`, 
which sets the maximum inbound gRPC message size for the IPython client. The 
default is 32M, which should be sufficient for most scenarios.

### What type of PR is it?
[ Improvement ]

### Todos
* [ ] - Task

### What is the Jira issue?
* https://issues.apache.org/jira/browse/ZEPPELIN-3236

### How should this be tested?
* A unit test (`testGrpcFrameSize`) is added.

### Screenshots (if appropriate)

### Questions:
* Do the license files need an update? No
* Are there breaking changes for older versions? No
* Does this need documentation? No

Author: Jeff Zhang <zjf...@apache.org>

Closes #2802 from zjffdu/ZEPPELIN-3236 and squashes the following commits:

ffce774 [Jeff Zhang] ZEPPELIN-3236. Make grpc framesize configurable


Project: http://git-wip-us.apache.org/repos/asf/zeppelin/repo
Commit: http://git-wip-us.apache.org/repos/asf/zeppelin/commit/de03a21b
Tree: http://git-wip-us.apache.org/repos/asf/zeppelin/tree/de03a21b
Diff: http://git-wip-us.apache.org/repos/asf/zeppelin/diff/de03a21b

Branch: refs/heads/master
Commit: de03a21ba62084a37fd4ef9bba8c00e33b6644cb
Parents: a791fad
Author: Jeff Zhang <zjf...@apache.org>
Authored: Thu Feb 15 10:30:10 2018 +0800
Committer: Jeff Zhang <zjf...@apache.org>
Committed: Wed Feb 21 13:27:27 2018 +0800

----------------------------------------------------------------------
 .../apache/zeppelin/python/IPythonClient.java   |  8 ++++
 .../zeppelin/python/IPythonInterpreter.java     |  6 ++-
 .../src/main/resources/interpreter-setting.json |  6 +++
 .../zeppelin/python/IPythonInterpreterTest.java | 40 ++++++++++++++++++--
 4 files changed, 56 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/zeppelin/blob/de03a21b/python/src/main/java/org/apache/zeppelin/python/IPythonClient.java
----------------------------------------------------------------------
diff --git a/python/src/main/java/org/apache/zeppelin/python/IPythonClient.java 
b/python/src/main/java/org/apache/zeppelin/python/IPythonClient.java
index ac10204..b3bc7fd 100644
--- a/python/src/main/java/org/apache/zeppelin/python/IPythonClient.java
+++ b/python/src/main/java/org/apache/zeppelin/python/IPythonClient.java
@@ -20,6 +20,7 @@ package org.apache.zeppelin.python;
 import io.grpc.ManagedChannel;
 import io.grpc.ManagedChannelBuilder;
 import io.grpc.stub.StreamObserver;
+import org.apache.commons.lang.exception.ExceptionUtils;
 import org.apache.zeppelin.interpreter.util.InterpreterOutputStream;
 import org.apache.zeppelin.python.proto.CancelRequest;
 import org.apache.zeppelin.python.proto.CancelResponse;
@@ -131,11 +132,18 @@ public class IPythonClient {
       @Override
       public void onError(Throwable throwable) {
         try {
+          
interpreterOutput.getInterpreterOutput().write(ExceptionUtils.getStackTrace(throwable));
           interpreterOutput.getInterpreterOutput().flush();
         } catch (IOException e) {
           LOGGER.error("Unexpected IOException", e);
         }
         LOGGER.error("Fail to call IPython grpc", throwable);
+        finalResponseBuilder.setStatus(ExecuteStatus.ERROR);
+
+        completedFlag.set(true);
+        synchronized (completedFlag) {
+          completedFlag.notify();
+        }
       }
 
       @Override

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/de03a21b/python/src/main/java/org/apache/zeppelin/python/IPythonInterpreter.java
----------------------------------------------------------------------
diff --git 
a/python/src/main/java/org/apache/zeppelin/python/IPythonInterpreter.java 
b/python/src/main/java/org/apache/zeppelin/python/IPythonInterpreter.java
index 8078670..10bf530 100644
--- a/python/src/main/java/org/apache/zeppelin/python/IPythonInterpreter.java
+++ b/python/src/main/java/org/apache/zeppelin/python/IPythonInterpreter.java
@@ -17,6 +17,7 @@
 
 package org.apache.zeppelin.python;
 
+import io.grpc.ManagedChannelBuilder;
 import org.apache.commons.exec.CommandLine;
 import org.apache.commons.exec.DefaultExecutor;
 import org.apache.commons.exec.ExecuteException;
@@ -142,7 +143,10 @@ public class IPythonInterpreter extends Interpreter 
implements ExecuteResultHand
       int jvmGatewayPort = 
RemoteInterpreterUtils.findRandomAvailablePortOnAllLocalInterfaces();
       LOGGER.info("Launching IPython Kernel at port: " + ipythonPort);
       LOGGER.info("Launching JVM Gateway at port: " + jvmGatewayPort);
-      ipythonClient = new IPythonClient("127.0.0.1", ipythonPort);
+      int framesize = 
Integer.parseInt(getProperty("zeppelin.ipython.grpc.framesize",
+          32 * 1024 * 1024 + ""));
+      ipythonClient = new 
IPythonClient(ManagedChannelBuilder.forAddress("127.0.0.1", ipythonPort)
+          .usePlaintext(true).maxInboundMessageSize(framesize));
       launchIPythonKernel(ipythonPort);
       setupJVMGateway(jvmGatewayPort);
     } catch (Exception e) {

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/de03a21b/python/src/main/resources/interpreter-setting.json
----------------------------------------------------------------------
diff --git a/python/src/main/resources/interpreter-setting.json 
b/python/src/main/resources/interpreter-setting.json
index d6b3538..3257e58 100644
--- a/python/src/main/resources/interpreter-setting.json
+++ b/python/src/main/resources/interpreter-setting.json
@@ -40,6 +40,12 @@
         "defaultValue": "30000",
         "description": "time out for ipython launch",
         "type": "number"
+      },
+      "zeppelin.ipython.grpc.framesize": {
+        "propertyName": "zeppelin.ipython.grpc.framesize",
+        "defaultValue": "33554432",
+        "description": "grpc framesize, default is 32M",
+        "type": "number"
       }
     },
     "editor": {

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/de03a21b/python/src/test/java/org/apache/zeppelin/python/IPythonInterpreterTest.java
----------------------------------------------------------------------
diff --git 
a/python/src/test/java/org/apache/zeppelin/python/IPythonInterpreterTest.java 
b/python/src/test/java/org/apache/zeppelin/python/IPythonInterpreterTest.java
index ec59482..dfc8c36 100644
--- 
a/python/src/test/java/org/apache/zeppelin/python/IPythonInterpreterTest.java
+++ 
b/python/src/test/java/org/apache/zeppelin/python/IPythonInterpreterTest.java
@@ -56,9 +56,7 @@ public class IPythonInterpreterTest {
   private static final Logger LOGGER = 
LoggerFactory.getLogger(IPythonInterpreterTest.class);
   private IPythonInterpreter interpreter;
 
-  @Before
-  public void setUp() throws InterpreterException {
-    Properties properties = new Properties();
+  public void startInterpreter(Properties properties) throws 
InterpreterException {
     interpreter = new IPythonInterpreter(properties);
     InterpreterGroup mockInterpreterGroup = mock(InterpreterGroup.class);
     interpreter.setInterpreterGroup(mockInterpreterGroup);
@@ -73,9 +71,45 @@ public class IPythonInterpreterTest {
 
   @Test
   public void testIPython() throws IOException, InterruptedException, 
InterpreterException {
+    startInterpreter(new Properties());
     testInterpreter(interpreter);
   }
 
+  @Test
+  public void testGrpcFrameSize() throws InterpreterException, IOException {
+    Properties properties = new Properties();
+    properties.setProperty("zeppelin.ipython.grpc.framesize", "4");
+    startInterpreter(properties);
+
+    // to make this test can run under both python2 and python3
+    InterpreterResult result = interpreter.interpret("from __future__ import 
print_function", getInterpreterContext());
+    assertEquals(InterpreterResult.Code.SUCCESS, result.code());
+
+    InterpreterContext context = getInterpreterContext();
+    result = interpreter.interpret("print(11111111111111111111111111111)", 
context);
+    assertEquals(InterpreterResult.Code.ERROR, result.code());
+    List<InterpreterResultMessage> interpreterResultMessages = 
context.out.getInterpreterResultMessages();
+    assertEquals(1, interpreterResultMessages.size());
+    assertTrue(interpreterResultMessages.get(0).getData().contains("Frame size 
32 exceeds maximum: 4"));
+
+    // next call continue work
+    result = interpreter.interpret("print(1)", context);
+    assertEquals(InterpreterResult.Code.SUCCESS, result.code());
+
+    close();
+
+    // increase framesize to make it work
+    properties.setProperty("zeppelin.ipython.grpc.framesize", "40");
+    startInterpreter(properties);
+    // to make this test can run under both python2 and python3
+    result = interpreter.interpret("from __future__ import print_function", 
getInterpreterContext());
+    assertEquals(InterpreterResult.Code.SUCCESS, result.code());
+
+    context = getInterpreterContext();
+    result = interpreter.interpret("print(11111111111111111111111111111)", 
context);
+    assertEquals(InterpreterResult.Code.SUCCESS, result.code());
+  }
+
   public static void testInterpreter(final Interpreter interpreter) throws 
IOException, InterruptedException, InterpreterException {
     // to make this test can run under both python2 and python3
     InterpreterResult result = interpreter.interpret("from __future__ import 
print_function", getInterpreterContext());

Reply via email to