This is an automated email from the ASF dual-hosted git repository. zjffdu pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/zeppelin.git
The following commit(s) were added to refs/heads/master by this push: new e1ba413 [ZEPPELIN-4525]. Support Shiny in R Interpreter e1ba413 is described below commit e1ba4131ceb297d7399fdc5a6980cbeb619307e6 Author: Jeff Zhang <zjf...@apache.org> AuthorDate: Thu Jan 2 09:55:52 2020 +0800 [ZEPPELIN-4525]. Support Shiny in R Interpreter ### What is this PR for? This PR is to support shiny app in R interpreter. It is based on jupyter's irkernel, so that means you need to install irkernel first, and install shiny package as well. Writing shiny app in Zeppelin requires at 3 paragraphs. * UI paragraph e.g. %r.shiny(type=ui) * Server paragraph e.g. %r.shiny(type=server) * Run paragraph e.g. %r.shiny(type=run) * Normal R code paragraph(optional) e.g. %r.shiny See the screenshot below for more details. ### What type of PR is it? [Feature] ### Todos * [ ] - Task ### What is the Jira issue? * https://issues.apache.org/jira/browse/ZEPPELIN-4525 ### How should this be tested? * CI pass ### Screenshots (if appropriate) ![shiny_app](https://user-images.githubusercontent.com/164491/71650190-706bca00-2d4f-11ea-8706-9ca674799bae.gif) ### Questions: * Does the licenses files need update? No * Is there breaking changes for older versions? No * Does this needs documentation? No Author: Jeff Zhang <zjf...@apache.org> Closes #3582 from zjffdu/ZEPPELIN-4525 and squashes the following commits: 93edffc5f [Jeff Zhang] [ZEPPELIN-4525]. Support Shiny in R Interpreter --- rlang/pom.xml | 28 +++ .../java/org/apache/zeppelin/r/IRInterpreter.java | 80 +++++++ .../org/apache/zeppelin/r/ShinyInterpreter.java | 145 ++++++++++++ rlang/src/main/resources/interpreter-setting.json | 36 +++ .../apache/zeppelin/r/ShinyInterpreterTest.java | 246 +++++++++++++++++++++ rlang/src/test/resources/invalid_ui.R | 1 + rlang/src/test/resources/log4j.properties | 1 + rlang/src/test/resources/server.R | 23 ++ rlang/src/test/resources/ui.R | 35 +++ testing/install_external_dependencies.sh | 1 + .../zeppelin/interpreter/util/ProcessLauncher.java | 23 ++ .../zeppelin/jupyter/JupyterKernelClient.java | 58 ++++- .../zeppelin/jupyter/JupyterKernelInterpreter.java | 9 +- .../main/resources/grpc/jupyter/kernel_server.py | 7 +- .../apache/zeppelin/jupyter/IPythonKernelTest.java | 8 + .../org/apache/zeppelin/jupyter/IRKernelTest.java | 4 +- .../org/apache/zeppelin/notebook/Paragraph.java | 4 +- 17 files changed, 695 insertions(+), 14 deletions(-) diff --git a/rlang/pom.xml b/rlang/pom.xml index 5a7de91..a8021b4 100644 --- a/rlang/pom.xml +++ b/rlang/pom.xml @@ -59,6 +59,27 @@ </dependency> <dependency> + <groupId>io.grpc</groupId> + <artifactId>grpc-netty</artifactId> + <version>${grpc.version}</version> + <scope>test</scope> + </dependency> + + <dependency> + <groupId>io.grpc</groupId> + <artifactId>grpc-protobuf</artifactId> + <version>${grpc.version}</version> + <scope>test</scope> + </dependency> + + <dependency> + <groupId>io.grpc</groupId> + <artifactId>grpc-stub</artifactId> + <version>${grpc.version}</version> + <scope>test</scope> + </dependency> + + <dependency> <groupId>org.apache.zeppelin</groupId> <artifactId>zeppelin-jupyter-interpreter</artifactId> <scope>test</scope> @@ -95,6 +116,13 @@ <version>${jsoup.version}</version> </dependency> + <dependency> + <groupId>com.mashape.unirest</groupId> + <artifactId>unirest-java</artifactId> + <version>1.4.9</version> + <scope>test</scope> + </dependency> + </dependencies> <build> diff --git a/rlang/src/main/java/org/apache/zeppelin/r/IRInterpreter.java b/rlang/src/main/java/org/apache/zeppelin/r/IRInterpreter.java index f04b091..d9076b4 100644 --- a/rlang/src/main/java/org/apache/zeppelin/r/IRInterpreter.java +++ b/rlang/src/main/java/org/apache/zeppelin/r/IRInterpreter.java @@ -20,16 +20,23 @@ package org.apache.zeppelin.r; import org.apache.commons.io.IOUtils; import org.apache.commons.lang.exception.ExceptionUtils; import org.apache.zeppelin.interpreter.BaseZeppelinContext; +import org.apache.zeppelin.interpreter.InterpreterContext; import org.apache.zeppelin.interpreter.InterpreterException; +import org.apache.zeppelin.interpreter.InterpreterResult; import org.apache.zeppelin.interpreter.jupyter.proto.ExecuteRequest; import org.apache.zeppelin.interpreter.jupyter.proto.ExecuteResponse; import org.apache.zeppelin.interpreter.jupyter.proto.ExecuteStatus; +import org.apache.zeppelin.interpreter.remote.RemoteInterpreterUtils; import org.apache.zeppelin.jupyter.JupyterKernelInterpreter; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.File; +import java.io.FileWriter; import java.io.IOException; import java.io.InputStream; +import java.io.StringReader; +import java.nio.file.Files; import java.util.Properties; /** @@ -41,6 +48,9 @@ public class IRInterpreter extends JupyterKernelInterpreter { private static final Logger LOGGER = LoggerFactory.getLogger(IRInterpreter.class); + // It is used to store shiny related code (ui.R & server.R) + // only one shiny app can be hosted in one R session. + private File shinyAppFolder; private SparkRBackend sparkRBackend; public IRInterpreter(Properties properties) { @@ -98,6 +108,13 @@ public class IRInterpreter extends JupyterKernelInterpreter { throw new InterpreterException("Fail to init IR Kernel:\n" + ExceptionUtils.getStackTrace(e), e); } + + try { + this.shinyAppFolder = Files.createTempDirectory("zeppelin-shiny").toFile(); + this.shinyAppFolder.deleteOnExit(); + } catch (IOException e) { + throw new InterpreterException(e); + } } /** @@ -134,4 +151,67 @@ public class IRInterpreter extends JupyterKernelInterpreter { return new RZeppelinContext(getInterpreterGroup().getInterpreterHookRegistry(), Integer.parseInt(getProperty("zeppelin.r.maxResult", "1000"))); } + + public InterpreterResult shinyUI(String st, + InterpreterContext context) throws InterpreterException { + File uiFile = new File(shinyAppFolder, "ui.R"); + FileWriter writer = null; + try { + writer = new FileWriter(uiFile); + IOUtils.copy(new StringReader(st), writer); + return new InterpreterResult(InterpreterResult.Code.SUCCESS, "Write ui.R to " + + shinyAppFolder.getAbsolutePath() + " successfully."); + } catch (IOException e) { + throw new InterpreterException("Fail to write shiny file ui.R", e); + } finally { + if (writer != null) { + try { + writer.close(); + } catch (IOException e) { + throw new InterpreterException(e); + } + } + } + } + + public InterpreterResult shinyServer(String st, + InterpreterContext context) throws InterpreterException { + File serverFile = new File(shinyAppFolder, "server.R"); + FileWriter writer = null; + try { + writer = new FileWriter(serverFile); + IOUtils.copy(new StringReader(st), writer); + return new InterpreterResult(InterpreterResult.Code.SUCCESS, "Write server.R to " + + shinyAppFolder.getAbsolutePath() + " successfully."); + } catch (IOException e) { + throw new InterpreterException("Fail to write shiny file server.R", e); + } finally { + if (writer != null) { + try { + writer.close(); + } catch (IOException e) { + throw new InterpreterException(e); + } + } + } + } + + public InterpreterResult runShinyApp(InterpreterContext context) + throws IOException, InterpreterException { + // redirect R kernel process to InterpreterOutput of current paragraph + // because the error message after shiny app launched is printed in R kernel process + getKernelProcessLauncher().setRedirectedContext(context); + try { + StringBuilder builder = new StringBuilder("library(shiny)\n"); + String host = RemoteInterpreterUtils.findAvailableHostAddress(); + int port = RemoteInterpreterUtils.findRandomAvailablePortOnAllLocalInterfaces(); + builder.append("runApp(appDir='" + shinyAppFolder.getAbsolutePath() + "', " + + "port=" + port + ", host='" + host + "', launch.browser=FALSE)"); + // shiny app will launch and block there until user cancel the paragraph. + LOGGER.info("Run shiny app code: " + builder.toString()); + return internalInterpret(builder.toString(), context); + } finally { + getKernelProcessLauncher().setRedirectedContext(null); + } + } } diff --git a/rlang/src/main/java/org/apache/zeppelin/r/ShinyInterpreter.java b/rlang/src/main/java/org/apache/zeppelin/r/ShinyInterpreter.java new file mode 100644 index 0000000..b2dc5f3 --- /dev/null +++ b/rlang/src/main/java/org/apache/zeppelin/r/ShinyInterpreter.java @@ -0,0 +1,145 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.zeppelin.r; + +import org.apache.commons.lang.StringUtils; +import org.apache.zeppelin.interpreter.AbstractInterpreter; +import org.apache.zeppelin.interpreter.BaseZeppelinContext; +import org.apache.zeppelin.interpreter.InterpreterContext; +import org.apache.zeppelin.interpreter.InterpreterException; +import org.apache.zeppelin.interpreter.InterpreterResult; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; +import java.util.Properties; + +/** + * One shiny Interpreter can host more than one Shiny app. + * They are organized by app name which you specify by paragraph local properties. + * e.g. %shiny(app=app_1) + * + * If you don't specify 'app', then default app name 'default' will be used. + * + * One shiny app is composed of at least 3 paragraph (last one is optional) + * <p> + * <ul> + * <li>UI paragraph e.g. %r.shiny(type=ui) </li> + * <li>Server paragraph e.g. %r.shiny(type=server) </li> + * <li>Run paragraph e.g. %r.shiny(type=run) </li> + * <li>Normal R code paragraph(optional) e.g. %r.shiny </li> + * </ul> + * <p> + */ +public class ShinyInterpreter extends AbstractInterpreter { + + private static final Logger LOGGER = LoggerFactory.getLogger(ShinyInterpreter.class); + + private static final String DEFAULT_APP_NAME = "default"; + private Map<String, IRInterpreter> shinyIRInterpreters = new HashMap<>(); + private RZeppelinContext z; + + public ShinyInterpreter(Properties properties) { + super(properties); + } + + @Override + public void open() throws InterpreterException { + this.z = new RZeppelinContext(getInterpreterGroup().getInterpreterHookRegistry(), 1000); + } + + @Override + public void close() throws InterpreterException { + for (Map.Entry<String,IRInterpreter> entry : shinyIRInterpreters.entrySet()) { + LOGGER.info("Closing IRInterpreter: " + entry.getKey()); + // Stop shiny app first otherwise the R process can not be terminated. + entry.getValue().cancel(InterpreterContext.get()); + entry.getValue().close(); + } + } + + @Override + public void cancel(InterpreterContext context) throws InterpreterException { + String shinyApp = context.getStringLocalProperty("app", DEFAULT_APP_NAME); + IRInterpreter irInterpreter = getIRInterpreter(shinyApp); + irInterpreter.cancel(context); + } + + @Override + public FormType getFormType() throws InterpreterException { + return FormType.NATIVE; + } + + @Override + public int getProgress(InterpreterContext context) throws InterpreterException { + return 0; + } + + @Override + public BaseZeppelinContext getZeppelinContext() { + return this.z; + } + + @Override + public InterpreterResult internalInterpret(String st, InterpreterContext context) + throws InterpreterException { + String shinyApp = context.getStringLocalProperty("app", DEFAULT_APP_NAME); + String shinyType = context.getStringLocalProperty("type", ""); + IRInterpreter irInterpreter = getIRInterpreter(shinyApp); + if (StringUtils.isBlank(shinyType)) { + return irInterpreter.internalInterpret(st, context); + } else if (shinyType.equals("run")) { + try { + return irInterpreter.runShinyApp(context); + } catch (IOException e) { + throw new InterpreterException(e); + } + } else if (shinyType.equals("ui")) { + return irInterpreter.shinyUI(st, context); + } else if (shinyType.equals("server")) { + return irInterpreter.shinyServer(st, context); + } else { + throw new InterpreterException("Unknown shiny type: " + shinyType); + } + } + + /** + * Get the specific IRInterpreter for this shinyApp. + * One ShinyApp is owned by one IRInterpreter(R session). + * + * @param shinyApp + * @return + * @throws InterpreterException + */ + private IRInterpreter getIRInterpreter(String shinyApp) throws InterpreterException { + IRInterpreter irInterpreter = null; + synchronized (shinyIRInterpreters) { + irInterpreter = shinyIRInterpreters.get(shinyApp); + if (irInterpreter == null) { + irInterpreter = new IRInterpreter(properties); + irInterpreter.setInterpreterGroup(getInterpreterGroup()); + irInterpreter.open(); + shinyIRInterpreters.put(shinyApp, irInterpreter); + } + } + return irInterpreter; + } + +} diff --git a/rlang/src/main/resources/interpreter-setting.json b/rlang/src/main/resources/interpreter-setting.json index 47697bd..1026067 100644 --- a/rlang/src/main/resources/interpreter-setting.json +++ b/rlang/src/main/resources/interpreter-setting.json @@ -32,6 +32,11 @@ "description": "", "type": "textarea" } + }, + "editor": { + "language": "r", + "editOnDblClick": false, + "completionSupport": true } }, { @@ -39,6 +44,37 @@ "name": "ir", "className": "org.apache.zeppelin.r.IRInterpreter", "properties": { + }, + "editor": { + "language": "r", + "editOnDblClick": false, + "completionSupport": true + } + }, + { + "group": "r", + "name": "shiny", + "className": "org.apache.zeppelin.r.ShinyInterpreter", + "properties": { + "zeppelin.R.shiny.iframe_width": { + "envName": "", + "propertyName": "zeppelin.R.shiny.iframe_width", + "defaultValue": "100%", + "description": "", + "type": "text" + }, + "zeppelin.R.shiny.iframe_height": { + "envName": "", + "propertyName": "zeppelin.R.shiny.iframe_height", + "defaultValue": "500px", + "description": "", + "type": "text" + } + }, + "editor": { + "language": "r", + "editOnDblClick": false, + "completionSupport": true } } ] diff --git a/rlang/src/test/java/org/apache/zeppelin/r/ShinyInterpreterTest.java b/rlang/src/test/java/org/apache/zeppelin/r/ShinyInterpreterTest.java new file mode 100644 index 0000000..5939f99 --- /dev/null +++ b/rlang/src/test/java/org/apache/zeppelin/r/ShinyInterpreterTest.java @@ -0,0 +1,246 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.zeppelin.r; + +import com.mashape.unirest.http.HttpResponse; +import com.mashape.unirest.http.Unirest; +import com.mashape.unirest.http.exceptions.UnirestException; +import org.apache.commons.io.IOUtils; +import org.apache.zeppelin.interpreter.InterpreterContext; +import org.apache.zeppelin.interpreter.InterpreterException; +import org.apache.zeppelin.interpreter.InterpreterGroup; +import org.apache.zeppelin.interpreter.InterpreterOutput; +import org.apache.zeppelin.interpreter.InterpreterResult; +import org.apache.zeppelin.interpreter.InterpreterResultMessage; +import org.apache.zeppelin.interpreter.LazyOpenInterpreter; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import java.io.IOException; +import java.util.HashMap; +import java.util.List; +import java.util.Properties; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +import static junit.framework.TestCase.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +public class ShinyInterpreterTest { + + private ShinyInterpreter interpreter; + + @Before + public void setUp() throws InterpreterException { + Properties properties = new Properties(); + + InterpreterContext context = getInterpreterContext(); + InterpreterContext.set(context); + interpreter = new ShinyInterpreter(properties); + + InterpreterGroup interpreterGroup = new InterpreterGroup(); + interpreterGroup.addInterpreterToSession(new LazyOpenInterpreter(interpreter), "session_1"); + interpreter.setInterpreterGroup(interpreterGroup); + + interpreter.open(); + } + + @After + public void tearDown() throws InterpreterException { + if (interpreter != null) { + interpreter.close(); + } + } + + @Test + public void testShinyApp() throws + IOException, InterpreterException, InterruptedException, UnirestException { + /****************** Launch Shiny app with default app name *****************************/ + InterpreterContext context = getInterpreterContext(); + context.getLocalProperties().put("type", "ui"); + InterpreterResult result = + interpreter.interpret(IOUtils.toString(getClass().getResource("/ui.R")), context); + assertEquals(InterpreterResult.Code.SUCCESS, result.code()); + + context = getInterpreterContext(); + context.getLocalProperties().put("type", "server"); + result = interpreter.interpret(IOUtils.toString(getClass().getResource("/server.R")), context); + assertEquals(InterpreterResult.Code.SUCCESS, result.code()); + + final InterpreterContext context2 = getInterpreterContext(); + context2.getLocalProperties().put("type", "run"); + Thread thread = new Thread(() -> { + try { + interpreter.interpret("", context2); + } catch (Exception e) { + e.printStackTrace(); + } + }); + thread.start(); + // wait for the shiny app start + Thread.sleep(5 * 1000); + // extract shiny url + List<InterpreterResultMessage> resultMessages = context2.out.toInterpreterResultMessage(); + assertEquals(1, resultMessages.size()); + assertEquals(InterpreterResult.Type.HTML, resultMessages.get(0).getType()); + String resultMessageData = resultMessages.get(0).getData(); + assertTrue(resultMessageData, resultMessageData.contains("<iframe")); + Pattern urlPattern = Pattern.compile(".*src=\"(http\\S*)\".*", Pattern.DOTALL); + Matcher matcher = urlPattern.matcher(resultMessageData); + if (!matcher.matches()) { + fail("Unable to extract url: " + resultMessageData); + } + String shinyURL = matcher.group(1); + + // verify shiny app via calling its rest api + HttpResponse<String> response = Unirest.get(shinyURL).asString(); + assertEquals(200, response.getStatus()); + assertTrue(response.getBody(), response.getBody().contains("Shiny Text")); + + /************************ Launch another shiny app (app2) *****************************/ + context = getInterpreterContext(); + context.getLocalProperties().put("type", "ui"); + context.getLocalProperties().put("app", "app2"); + result = + interpreter.interpret(IOUtils.toString(getClass().getResource("/ui.R")), context); + assertEquals(InterpreterResult.Code.SUCCESS, result.code()); + + context = getInterpreterContext(); + context.getLocalProperties().put("type", "server"); + context.getLocalProperties().put("app", "app2"); + result = interpreter.interpret(IOUtils.toString(getClass().getResource("/server.R")), context); + assertEquals(InterpreterResult.Code.SUCCESS, result.code()); + + final InterpreterContext context3 = getInterpreterContext(); + context3.getLocalProperties().put("type", "run"); + context3.getLocalProperties().put("app", "app2"); + thread = new Thread(() -> { + try { + interpreter.interpret("", context3); + } catch (Exception e) { + e.printStackTrace(); + } + }); + thread.start(); + // wait for the shiny app start + Thread.sleep(5 * 1000); + // extract shiny url + resultMessages = context3.out.toInterpreterResultMessage(); + assertEquals(1, resultMessages.size()); + assertEquals(InterpreterResult.Type.HTML, resultMessages.get(0).getType()); + resultMessageData = resultMessages.get(0).getData(); + assertTrue(resultMessageData, resultMessageData.contains("<iframe")); + matcher = urlPattern.matcher(resultMessageData); + if (!matcher.matches()) { + fail("Unable to extract url: " + resultMessageData); + } + String shinyURL2 = matcher.group(1); + + // verify shiny app via calling its rest api + response = Unirest.get(shinyURL2).asString(); + assertEquals(200, response.getStatus()); + assertTrue(response.getBody(), response.getBody().contains("Shiny Text")); + + // cancel paragraph to stop the first shiny app + interpreter.cancel(getInterpreterContext()); + // wait for shiny app to be stopped + Thread.sleep(1000); + try { + Unirest.get(shinyURL).asString(); + fail("Should fail to connect to shiny app"); + } catch (Exception e) { + assertTrue(e.getMessage(), e.getMessage().contains("Connection refused")); + } + + // the second shiny app still works + response = Unirest.get(shinyURL2).asString(); + assertEquals(200, response.getStatus()); + assertTrue(response.getBody(), response.getBody().contains("Shiny Text")); + } + + @Test + public void testInvalidShinyApp() + throws IOException, InterpreterException, InterruptedException, UnirestException { + InterpreterContext context = getInterpreterContext(); + context.getLocalProperties().put("type", "ui"); + InterpreterResult result = + interpreter.interpret(IOUtils.toString(getClass().getResource("/invalid_ui.R")), context); + assertEquals(InterpreterResult.Code.SUCCESS, result.code()); + + context = getInterpreterContext(); + context.getLocalProperties().put("type", "server"); + result = interpreter.interpret(IOUtils.toString(getClass().getResource("/server.R")), context); + assertEquals(InterpreterResult.Code.SUCCESS, result.code()); + + final InterpreterContext context2 = getInterpreterContext(); + context2.getLocalProperties().put("type", "run"); + Thread thread = new Thread(() -> { + try { + interpreter.interpret("", context2); + } catch (Exception e) { + e.printStackTrace(); + } + }); + thread.start(); + // wait for the shiny app start + Thread.sleep(5 * 1000); + List<InterpreterResultMessage> resultMessages = context2.out.toInterpreterResultMessage(); + assertEquals(1, resultMessages.size()); + assertEquals(InterpreterResult.Type.HTML, resultMessages.get(0).getType()); + String resultMessageData = resultMessages.get(0).getData(); + assertTrue(resultMessageData, resultMessageData.contains("<iframe")); + Pattern urlPattern = Pattern.compile(".*src=\"(http\\S*)\".*", Pattern.DOTALL); + Matcher matcher = urlPattern.matcher(resultMessageData); + if (!matcher.matches()) { + fail("Unable to extract url: " + resultMessageData); + } + String shinyURL = matcher.group(1); + + // call shiny app via rest api + HttpResponse<String> response = Unirest.get(shinyURL).asString(); + assertEquals(500, response.getStatus()); + + resultMessages = context2.out.toInterpreterResultMessage(); + assertTrue(resultMessages.get(1).getData(), + resultMessages.get(1).getData().contains("object 'Invalid_code' not found")); + + // cancel paragraph to stop shiny app + interpreter.cancel(getInterpreterContext()); + // wait for shiny app to be stopped + Thread.sleep(1000); + try { + Unirest.get(shinyURL).asString(); + fail("Should fail to connect to shiny app"); + } catch (Exception e) { + assertTrue(e.getMessage(), e.getMessage().contains("Connection refused")); + } + } + + protected InterpreterContext getInterpreterContext() { + InterpreterContext context = InterpreterContext.builder() + .setNoteId("note_1") + .setParagraphId("paragraph_1") + .setInterpreterOut(new InterpreterOutput(null)) + .setLocalProperties(new HashMap<>()) + .setInterpreterClassName(ShinyInterpreter.class.getName()) + .build(); + return context; + } +} diff --git a/rlang/src/test/resources/invalid_ui.R b/rlang/src/test/resources/invalid_ui.R new file mode 100644 index 0000000..9f08258 --- /dev/null +++ b/rlang/src/test/resources/invalid_ui.R @@ -0,0 +1 @@ +Invalid_code \ No newline at end of file diff --git a/rlang/src/test/resources/log4j.properties b/rlang/src/test/resources/log4j.properties index 0d6d5f1..4836e66 100644 --- a/rlang/src/test/resources/log4j.properties +++ b/rlang/src/test/resources/log4j.properties @@ -24,3 +24,4 @@ log4j.appender.stdout.layout=org.apache.log4j.PatternLayout log4j.appender.stdout.layout.ConversionPattern=%5p [%d] ({%t} %F[%M]:%L) - %m%n log4j.logger.org.apache.zeppelin.interpreter.util=DEBUG +log4j.logger.org.apache.zeppelin.jupyter=DEBUG diff --git a/rlang/src/test/resources/server.R b/rlang/src/test/resources/server.R new file mode 100644 index 0000000..eb67ffb --- /dev/null +++ b/rlang/src/test/resources/server.R @@ -0,0 +1,23 @@ +# Define server logic to summarize and view selected dataset ---- +server <- function(input, output) { + + # Return the requested dataset ---- + datasetInput <- reactive({ + switch(input$dataset, + "rock" = rock, + "pressure" = pressure, + "cars" = cars) + }) + + # Generate a summary of the dataset ---- + output$summary <- renderPrint({ + dataset <- datasetInput() + summary(dataset) + }) + + # Show the first "n" observations ---- + output$view <- renderTable({ + head(datasetInput(), n = input$obs) + }) + +} \ No newline at end of file diff --git a/rlang/src/test/resources/ui.R b/rlang/src/test/resources/ui.R new file mode 100644 index 0000000..282a9d5 --- /dev/null +++ b/rlang/src/test/resources/ui.R @@ -0,0 +1,35 @@ +# Define UI for dataset viewer app ---- +ui <- fluidPage( + +# App title ---- +titlePanel("Shiny Text"), + +# Sidebar layout with a input and output definitions ---- +sidebarLayout( + +# Sidebar panel for inputs ---- +sidebarPanel( + +# Input: Selector for choosing dataset ---- +selectInput(inputId = "dataset", +label = "Choose a dataset:", +choices = c("rock", "pressure", "cars")), + +# Input: Numeric entry for number of obs to view ---- +numericInput(inputId = "obs", +label = "Number of observations to view:", +value = 10) +), + +# Main panel for displaying outputs ---- +mainPanel( + +# Output: Verbatim text for data summary ---- +verbatimTextOutput("summary"), + +# Output: HTML table with requested number of observations ---- +tableOutput("view") + +) +) +) \ No newline at end of file diff --git a/testing/install_external_dependencies.sh b/testing/install_external_dependencies.sh index 8102edc..492f37e 100755 --- a/testing/install_external_dependencies.sh +++ b/testing/install_external_dependencies.sh @@ -67,5 +67,6 @@ if [[ "$R" == "true" ]] ; then R -e "install.packages('knitr', repos = 'http://cran.us.r-project.org', lib='~/R')" > /dev/null 2>&1 R -e "install.packages('ggplot2', repos = 'http://cran.us.r-project.org', lib='~/R')" > /dev/null 2>&1 R -e "install.packages('IRkernel', repos = 'http://cran.us.r-project.org', lib='~/R');IRkernel::installspec()" > /dev/null 2>&1 + R -e "install.packages('shiny', repos = 'http://cran.us.r-project.org', lib='~/R');IRkernel::installspec()" > /dev/null 2>&1 fi diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/util/ProcessLauncher.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/util/ProcessLauncher.java index d544211..3d37c65 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/util/ProcessLauncher.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/util/ProcessLauncher.java @@ -26,6 +26,7 @@ import org.apache.commons.exec.LogOutputStream; import org.apache.commons.exec.PumpStreamHandler; import org.apache.commons.lang.StringUtils; import org.apache.commons.lang.exception.ExceptionUtils; +import org.apache.zeppelin.interpreter.InterpreterContext; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -70,6 +71,20 @@ public abstract class ProcessLauncher implements ExecuteResultHandler { this.processOutput = processLogOutput; } + /** + * In some cases we need to redirect process output to paragraph's InterpreterOutput. + * e.g. In %r.shiny for shiny app + * @param redirectedContext + */ + public void setRedirectedContext(InterpreterContext redirectedContext) { + if (redirectedContext != null) { + LOGGER.info("Start to redirect process output to interpreter output"); + } else { + LOGGER.info("Stop to redirect process output to interpreter output"); + } + this.processOutput.redirectedContext = redirectedContext; + } + public void launch() { DefaultExecutor executor = new DefaultExecutor(); executor.setStreamHandler(new PumpStreamHandler(processOutput)); @@ -152,6 +167,7 @@ public abstract class ProcessLauncher implements ExecuteResultHandler { private boolean catchLaunchOutput = true; private StringBuilder launchOutput = new StringBuilder(); + private InterpreterContext redirectedContext; public void stopCatchLaunchOutput() { this.catchLaunchOutput = false; @@ -172,6 +188,13 @@ public abstract class ProcessLauncher implements ExecuteResultHandler { if (catchLaunchOutput) { launchOutput.append(s + "\n"); } + if (redirectedContext != null) { + try { + redirectedContext.out.write(s + "\n"); + } catch (IOException e) { + e.printStackTrace(); + } + } } } } diff --git a/zeppelin-jupyter-interpreter/src/main/java/org/apache/zeppelin/jupyter/JupyterKernelClient.java b/zeppelin-jupyter-interpreter/src/main/java/org/apache/zeppelin/jupyter/JupyterKernelClient.java index 8b64a9b..54112d4 100644 --- a/zeppelin-jupyter-interpreter/src/main/java/org/apache/zeppelin/jupyter/JupyterKernelClient.java +++ b/zeppelin-jupyter-interpreter/src/main/java/org/apache/zeppelin/jupyter/JupyterKernelClient.java @@ -21,6 +21,7 @@ import io.grpc.ManagedChannel; import io.grpc.ManagedChannelBuilder; import io.grpc.stub.StreamObserver; import org.apache.commons.lang.exception.ExceptionUtils; +import org.apache.zeppelin.interpreter.InterpreterContext; import org.apache.zeppelin.interpreter.jupyter.proto.JupyterKernelGrpc; import org.apache.zeppelin.interpreter.util.InterpreterOutputStream; import org.apache.zeppelin.interpreter.jupyter.proto.CancelRequest; @@ -40,8 +41,11 @@ import org.slf4j.LoggerFactory; import java.io.IOException; import java.security.SecureRandom; import java.util.Iterator; +import java.util.Properties; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.regex.Matcher; +import java.util.regex.Pattern; /** * Grpc client for Jupyter kernel @@ -49,34 +53,78 @@ import java.util.concurrent.atomic.AtomicBoolean; public class JupyterKernelClient { private static final Logger LOGGER = LoggerFactory.getLogger(JupyterKernelClient.class.getName()); + // used for matching shiny url + private static Pattern ShinyListeningPattern = + Pattern.compile(".*Listening on (http:\\S*).*", Pattern.DOTALL); private final ManagedChannel channel; private final JupyterKernelGrpc.JupyterKernelBlockingStub blockingStub; private final JupyterKernelGrpc.JupyterKernelStub asyncStub; private volatile boolean maybeKernelFailed = false; + private Properties properties; + private InterpreterContext context; private SecureRandom random = new SecureRandom(); /** * Construct client for accessing RouteGuide server at {@code host:port}. */ - public JupyterKernelClient(String host, int port) { - this(ManagedChannelBuilder.forAddress(host, port).usePlaintext(true)); + public JupyterKernelClient(String host, + int port) { + this(ManagedChannelBuilder.forAddress(host, port).usePlaintext(true), new Properties()); } /** * Construct client for accessing RouteGuide server using the existing channel. */ - public JupyterKernelClient(ManagedChannelBuilder<?> channelBuilder) { + public JupyterKernelClient(ManagedChannelBuilder<?> channelBuilder, Properties properties) { channel = channelBuilder.build(); blockingStub = JupyterKernelGrpc.newBlockingStub(channel); asyncStub = JupyterKernelGrpc.newStub(channel); + this.properties = properties; } public void shutdown() throws InterruptedException { channel.shutdown().awaitTermination(5, TimeUnit.SECONDS); } + /** + * set current InterpreterContext. + * @param context + */ + public void setInterpreterContext(InterpreterContext context) { + this.context = context; + } + + /** + * This is for shiny interpreter. It's better not to put this in the general + * JupyterKernelClient, we may need to create a specififc JupyterKernelClient for R Kernel. + * @param response + * @return true if shiny url is matched + * @throws IOException + */ + private boolean checkForShinyApp(String response) throws IOException { + if (context.getInterpreterClassName() != null && + context.getInterpreterClassName().equals("org.apache.zeppelin.r.ShinyInterpreter")) { + Matcher matcher = ShinyListeningPattern.matcher(response); + if (matcher.matches()) { + String url = matcher.group(1); + LOGGER.info("Matching shiny app url: " + url); + context.out.clear(); + String defaultHeight = properties.getProperty("zeppelin.R.shiny.iframe_height", "500px"); + String height = context.getLocalProperties().getOrDefault("height", defaultHeight); + String defaultWidth = properties.getProperty("zeppelin.R.shiny.iframe_width", "100%"); + String width = context.getLocalProperties().getOrDefault("width", defaultWidth); + context.out.write("\n%html " + "<iframe src=\"" + url + "\" height =\"" + + height + "\" width=\"" + width + "\" frameBorder=\"0\"></iframe>"); + context.out.flush(); + context.out.write("\n%text "); + return true; + } + } + return false; + } + // execute the code and make the output as streaming by writing it to InterpreterOutputStream // one by one. public ExecuteResponse stream_execute(ExecuteRequest request, @@ -96,6 +144,9 @@ public class JupyterKernelClient { switch (executeResponse.getType()) { case TEXT: try { + if (checkForShinyApp(executeResponse.getOutput())) { + break; + } if (executeResponse.getOutput().startsWith("%")) { // the output from jupyter kernel maybe specify format already. interpreterOutput.write((executeResponse.getOutput()).getBytes()); @@ -239,6 +290,5 @@ public class JupyterKernelClient { ExecuteResponse response = client.block_execute(ExecuteRequest.newBuilder(). setCode("abcd=2").build()); System.out.println(response.getOutput()); - } } diff --git a/zeppelin-jupyter-interpreter/src/main/java/org/apache/zeppelin/jupyter/JupyterKernelInterpreter.java b/zeppelin-jupyter-interpreter/src/main/java/org/apache/zeppelin/jupyter/JupyterKernelInterpreter.java index 3a15337..d1dc351 100644 --- a/zeppelin-jupyter-interpreter/src/main/java/org/apache/zeppelin/jupyter/JupyterKernelInterpreter.java +++ b/zeppelin-jupyter-interpreter/src/main/java/org/apache/zeppelin/jupyter/JupyterKernelInterpreter.java @@ -123,10 +123,10 @@ public class JupyterKernelInterpreter extends AbstractInterpreter { int kernelPort = RemoteInterpreterUtils.findRandomAvailablePortOnAllLocalInterfaces(); int message_size = Integer.parseInt(getProperty("zeppelin.jupyter.kernel.grpc.message_size", 32 * 1024 * 1024 + "")); - jupyterKernelClient = new JupyterKernelClient(ManagedChannelBuilder.forAddress("127.0.0.1", - kernelPort) - .usePlaintext(true).maxInboundMessageSize(message_size)); + jupyterKernelClient = new JupyterKernelClient(ManagedChannelBuilder.forAddress("127.0.0.1", + kernelPort).usePlaintext(true).maxInboundMessageSize(message_size), + getProperties()); launchJupyterKernel(kernelPort); } catch (Exception e) { throw new InterpreterException("Fail to open JupyterKernelInterpreter:\n" + @@ -221,7 +221,7 @@ public class JupyterKernelInterpreter extends AbstractInterpreter { @Override public void close() throws InterpreterException { if (jupyterKernelProcessLauncher != null) { - LOGGER.info("Killing Jupyter Kernel Process"); + LOGGER.info("Shutdown Jupyter Kernel Process"); if (jupyterKernelProcessLauncher.isRunning()) { jupyterKernelClient.stop(StopRequest.newBuilder().build()); try { @@ -243,6 +243,7 @@ public class JupyterKernelInterpreter extends AbstractInterpreter { z.setNoteGui(context.getNoteGui()); z.setInterpreterContext(context); interpreterOutput.setInterpreterOutput(context.out); + jupyterKernelClient.setInterpreterContext(context); try { ExecuteResponse response = jupyterKernelClient.stream_execute(ExecuteRequest.newBuilder().setCode(st).build(), diff --git a/zeppelin-jupyter-interpreter/src/main/resources/grpc/jupyter/kernel_server.py b/zeppelin-jupyter-interpreter/src/main/resources/grpc/jupyter/kernel_server.py index 5708c71..ac11ea3 100644 --- a/zeppelin-jupyter-interpreter/src/main/resources/grpc/jupyter/kernel_server.py +++ b/zeppelin-jupyter-interpreter/src/main/resources/grpc/jupyter/kernel_server.py @@ -51,22 +51,21 @@ class KernelServer(kernel_pb2_grpc.JupyterKernelServicer): self._status = kernel_pb2.RUNNING def execute(self, request, context): - print("execute code:\n") - print(request.code.encode('utf-8')) + # print("execute code:\n") + # print(request.code.encode('utf-8')) sys.stdout.flush() stream_reply_queue = queue.Queue(maxsize = 30) payload_reply = [] def _output_hook(msg): msg_type = msg['header']['msg_type'] content = msg['content'] - print("******************* CONTENT ******************") outStatus, outType, output = kernel_pb2.SUCCESS, None, None # prepare the reply if msg_type == 'stream': outType = kernel_pb2.TEXT output = content['text'] elif msg_type in ('display_data', 'execute_result'): - print(content['data']) + # print(content['data']) # The if-else order matters, can not be changed. Because ipython may provide multiple output. # TEXT is the last resort type. if 'text/html' in content['data']: diff --git a/zeppelin-jupyter-interpreter/src/test/java/org/apache/zeppelin/jupyter/IPythonKernelTest.java b/zeppelin-jupyter-interpreter/src/test/java/org/apache/zeppelin/jupyter/IPythonKernelTest.java index 59b957f..ed53169 100644 --- a/zeppelin-jupyter-interpreter/src/test/java/org/apache/zeppelin/jupyter/IPythonKernelTest.java +++ b/zeppelin-jupyter-interpreter/src/test/java/org/apache/zeppelin/jupyter/IPythonKernelTest.java @@ -29,6 +29,7 @@ import org.apache.zeppelin.interpreter.remote.RemoteInterpreterEventClient; import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion; import org.apache.zeppelin.resource.LocalResourcePool; import org.apache.zeppelin.resource.ResourcePool; +import org.junit.After; import org.junit.Before; import org.junit.Test; @@ -68,6 +69,13 @@ public class IPythonKernelTest { interpreter.open(); } + @After + public void tearDown() throws InterpreterException { + if (interpreter != null) { + interpreter.close(); + } + } + @Test public void testPythonBasics() throws InterpreterException, InterruptedException, IOException { InterpreterContext context = getInterpreterContext(); diff --git a/zeppelin-jupyter-interpreter/src/test/java/org/apache/zeppelin/jupyter/IRKernelTest.java b/zeppelin-jupyter-interpreter/src/test/java/org/apache/zeppelin/jupyter/IRKernelTest.java index 246400f..002e97b 100644 --- a/zeppelin-jupyter-interpreter/src/test/java/org/apache/zeppelin/jupyter/IRKernelTest.java +++ b/zeppelin-jupyter-interpreter/src/test/java/org/apache/zeppelin/jupyter/IRKernelTest.java @@ -64,7 +64,9 @@ public class IRKernelTest { @After public void tearDown() throws InterpreterException { - interpreter.close(); + if (interpreter != null) { + interpreter.close(); + } } @Test diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Paragraph.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Paragraph.java index 09838f4..5405d9e 100644 --- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Paragraph.java +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Paragraph.java @@ -396,7 +396,9 @@ public class Paragraph extends JobWithProgressPoller<InterpreterResult> implemen } } - return Strings.isNullOrEmpty(scriptText); + // don't skip paragraph when local properties is not empty. + // local properties can customize the behavior of interpreter. e.g. %r.shiny(type=run) + return Strings.isNullOrEmpty(scriptText) && localProperties.isEmpty(); } public boolean execute(boolean blocking) {