This is an automated email from the ASF dual-hosted git repository.

zjffdu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/zeppelin.git


The following commit(s) were added to refs/heads/master by this push:
     new b0e45ef  [ZEPPELIN-4443]. Unclear message when R is not installed
b0e45ef is described below

commit b0e45ef6ab744e3c82728b2240fc0106dad3e0e4
Author: Jeff Zhang <zjf...@apache.org>
AuthorDate: Wed Nov 13 13:01:41 2019 +0800

    [ZEPPELIN-4443]. Unclear message when R is not installed
    
    ### What is this PR for?
    
    This PR uses ProcessLauncher to launch the R process and to capture its error output when the launch fails.
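
    A condensed sketch of the new launch flow, drawn from the ZeppelinR.java hunk
    in this patch (class and method names come from the patch itself; the variables
    cmd and env stand for the R command line and process environment that ZeppelinR
    assembles before launching):

        // Output stream that forwards R process output to the interpreter output
        RProcessLogOutputStream processOutputStream = new RProcessLogOutputStream(sparkRInterpreter);
        // Launch R via ProcessLauncher instead of driving DefaultExecutor directly
        RProcessLauncher rProcessLauncher = new RProcessLauncher(cmd, env, processOutputStream);
        rProcessLauncher.launch();
        // Block until zeppelin_sparkr.R calls back onScriptInitialized(), or give up after 30s
        rProcessLauncher.waitForReady(30 * 1000);
        if (!rProcessLauncher.isRunning()) {
          // Surface the captured launch output to the user instead of hanging silently
          String reason = rProcessLauncher.isLaunchTimeout()
              ? "Launch r process is time out.\n" : "Fail to launch r process.\n";
          throw new IOException(reason + rProcessLauncher.getErrorMessage());
        }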
    
    ### What type of PR is it?
    [Improvement]
    
    ### Todos
    * [ ] - Task
    
    ### What is the Jira issue?
    * https://issues.apache.org/jira/browse/ZEPPELIN-4443
    
    ### How should this be tested?
    * A unit test is added
    
    ### Screenshots (if appropriate)
    
    ### Questions:
    * Do the license files need updating? No
    * Are there breaking changes for older versions? No
    * Does this need documentation? No
    
    Author: Jeff Zhang <zjf...@apache.org>
    
    Closes #3520 from zjffdu/ZEPPELIN-4443 and squashes the following commits:
    
    79e51cfee [Jeff Zhang] [ZEPPELIN-4443]. Unclear message when R is not installed
---
 .../java/org/apache/zeppelin/spark/ZeppelinR.java  | 203 ++++++++++-----------
 .../zeppelin/spark/SparkRInterpreterTest.java      |  35 +++-
 .../zeppelin/spark/BaseSparkScalaInterpreter.scala |   2 +-
 .../zeppelin/interpreter/util/ProcessLauncher.java |  12 +-
 4 files changed, 140 insertions(+), 112 deletions(-)

diff --git a/spark/interpreter/src/main/java/org/apache/zeppelin/spark/ZeppelinR.java b/spark/interpreter/src/main/java/org/apache/zeppelin/spark/ZeppelinR.java
index 60c5b17..00226e9 100644
--- a/spark/interpreter/src/main/java/org/apache/zeppelin/spark/ZeppelinR.java
+++ b/spark/interpreter/src/main/java/org/apache/zeppelin/spark/ZeppelinR.java
@@ -22,9 +22,7 @@ import org.apache.commons.io.IOUtils;
 import org.apache.spark.SparkRBackend;
 import org.apache.zeppelin.interpreter.InterpreterException;
 import org.apache.zeppelin.interpreter.InterpreterOutput;
-import org.apache.zeppelin.interpreter.InterpreterOutputListener;
-import org.apache.zeppelin.interpreter.InterpreterResultMessageOutput;
-import org.apache.zeppelin.interpreter.util.InterpreterOutputStream;
+import org.apache.zeppelin.interpreter.util.ProcessLauncher;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -36,30 +34,19 @@ import java.util.Map;
 /**
  * R repl interaction
  */
-public class ZeppelinR implements ExecuteResultHandler {
-  private static Logger logger = LoggerFactory.getLogger(ZeppelinR.class);
+public class ZeppelinR {
+  private static Logger LOGGER = LoggerFactory.getLogger(ZeppelinR.class);
 
   private final SparkRInterpreter sparkRInterpreter;
   private final String rCmdPath;
   private final SparkVersion sparkVersion;
   private final int timeout;
-  private DefaultExecutor executor;
-  private InterpreterOutputStream outputStream;
-  private PipedOutputStream input;
+  private RProcessLogOutputStream processOutputStream;
   private final String scriptPath;
   private final String libPath;
-  static Map<Integer, ZeppelinR> zeppelinR = Collections.synchronizedMap(
-      new HashMap<Integer, ZeppelinR>());
-
-  private InterpreterOutput initialOutput;
+  static Map<Integer, ZeppelinR> zeppelinR = Collections.synchronizedMap(new HashMap());
   private final int port;
-  private boolean rScriptRunning;
-
-  /**
-   * To be notified R repl initialization
-   */
-  boolean rScriptInitialized = false;
-  Integer rScriptInitializeNotifier = new Integer(0);
+  private RProcessLauncher rProcessLauncher;
 
   /**
    * Request to R repl
@@ -67,6 +54,10 @@ public class ZeppelinR implements ExecuteResultHandler {
   Request rRequestObject = null;
   Integer rRequestNotifier = new Integer(0);
 
+  public void setInterpreterOutput(InterpreterOutput out) {
+    processOutputStream.setInterpreterOutput(out);
+  }
+
   /**
    * Request object
    *
@@ -151,25 +142,22 @@ public class ZeppelinR implements ExecuteResultHandler {
       cmd.addArgument(SparkRBackend.socketSecret());
     }
    // dump out the R command to facilitate manually running it, e.g. for fault diagnosis purposes
-    logger.debug("R Command: " + cmd.toString());
-
-    executor = new DefaultExecutor();
-    outputStream = new SparkRInterpreterOutputStream(logger, sparkRInterpreter);
-
-    input = new PipedOutputStream();
-    PipedInputStream in = new PipedInputStream(input);
-
-    PumpStreamHandler streamHandler = new PumpStreamHandler(outputStream, outputStream, in);
-    executor.setWatchdog(new ExecuteWatchdog(ExecuteWatchdog.INFINITE_TIMEOUT));
-    executor.setStreamHandler(streamHandler);
+    LOGGER.info("R Command: " + cmd.toString());
+    processOutputStream = new RProcessLogOutputStream(sparkRInterpreter);
     Map env = EnvironmentUtils.getProcEnvironment();
-
-
-    initialOutput = new InterpreterOutput(null);
-    outputStream.setInterpreterOutput(initialOutput);
-    executor.execute(cmd, env, this);
-    rScriptRunning = true;
-
+    rProcessLauncher = new RProcessLauncher(cmd, env, processOutputStream);
+    rProcessLauncher.launch();
+    rProcessLauncher.waitForReady(30 * 1000);
+
+    if (!rProcessLauncher.isRunning()) {
+      if (rProcessLauncher.isLaunchTimeout()) {
+        throw new IOException("Launch r process is time out.\n" +
+                rProcessLauncher.getErrorMessage());
+      } else {
+        throw new IOException("Fail to launch r process.\n" +
+                rProcessLauncher.getErrorMessage());
+      }
+    }
     // flush output
     eval("cat('')");
   }
@@ -222,33 +210,31 @@ public class ZeppelinR implements ExecuteResultHandler {
     }
   }
 
+  private boolean isRProcessInitialized() {
+    return rProcessLauncher != null && rProcessLauncher.isRunning();
+  }
+
   /**
    * Send request to r repl and return response
    * @return responseValue
    */
-  private Object request() throws RuntimeException, InterpreterException {
-    if (!rScriptRunning) {
+  private Object request() throws RuntimeException {
+    if (!isRProcessInitialized()) {
       throw new RuntimeException("r repl is not running");
     }
 
-    // wait for rscript initialized
-    if (!rScriptInitialized) {
-      waitForRScriptInitialized();
-    }
-
     rResponseValue = null;
-
     synchronized (rRequestNotifier) {
       rRequestNotifier.notify();
     }
 
     Object respValue = null;
     synchronized (rResponseNotifier) {
-      while (rResponseValue == null && rScriptRunning) {
+      while (rResponseValue == null && isRProcessInitialized()) {
         try {
           rResponseNotifier.wait(1000);
         } catch (InterruptedException e) {
-          logger.error(e.getMessage(), e);
+          LOGGER.error(e.getMessage(), e);
         }
       }
       respValue = rResponseValue;
@@ -263,39 +249,6 @@ public class ZeppelinR implements ExecuteResultHandler {
   }
 
   /**
-   * Wait until src/main/resources/R/zeppelin_sparkr.R is initialized
-   * and call onScriptInitialized()
-   *
-   * @throws InterpreterException
-   */
-  private void waitForRScriptInitialized() throws InterpreterException {
-    synchronized (rScriptInitializeNotifier) {
-      long startTime = System.nanoTime();
-      while (rScriptInitialized == false &&
-          rScriptRunning &&
-          System.nanoTime() - startTime < 10L * 1000 * 1000000) {
-        try {
-          rScriptInitializeNotifier.wait(1000);
-        } catch (InterruptedException e) {
-          logger.error(e.getMessage(), e);
-        }
-      }
-    }
-
-    String errorMessage = "";
-    try {
-      initialOutput.flush();
-      errorMessage = new String(initialOutput.toByteArray());
-    } catch (IOException e) {
-      e.printStackTrace();
-    }
-
-    if (rScriptInitialized == false) {
-      throw new InterpreterException("sparkr is not responding " + errorMessage);
-    }
-  }
-
-  /**
    * invoked by src/main/resources/R/zeppelin_sparkr.R
    * @return
    */
@@ -305,7 +258,7 @@ public class ZeppelinR implements ExecuteResultHandler {
         try {
           rRequestNotifier.wait(1000);
         } catch (InterruptedException e) {
-          logger.error(e.getMessage(), e);
+          LOGGER.error(e.getMessage(), e);
         }
       }
 
@@ -332,10 +285,7 @@ public class ZeppelinR implements ExecuteResultHandler {
    * invoked by src/main/resources/R/zeppelin_sparkr.R
    */
   public void onScriptInitialized() {
-    synchronized (rScriptInitializeNotifier) {
-      rScriptInitialized = true;
-      rScriptInitializeNotifier.notifyAll();
-    }
+    rProcessLauncher.initialized();
   }
 
   /**
@@ -359,14 +309,16 @@ public class ZeppelinR implements ExecuteResultHandler {
       throw new InterpreterException(e);
     }
 
-    logger.info("File {} created", scriptPath);
+    LOGGER.info("File {} created", scriptPath);
   }
 
   /**
    * Terminate this R repl
    */
   public void close() {
-    executor.getWatchdog().destroyProcess();
+    if (rProcessLauncher != null) {
+      rProcessLauncher.stop();
+    }
     new File(scriptPath).delete();
     zeppelinR.remove(hashCode());
   }
@@ -381,36 +333,58 @@ public class ZeppelinR implements ExecuteResultHandler {
     return zeppelinR.get(hashcode);
   }
 
-  /**
-   * Pass InterpreterOutput to capture the repl output
-   * @param out
-   */
-  public void setInterpreterOutput(InterpreterOutput out) {
-    outputStream.setInterpreterOutput(out);
-  }
+  class RProcessLauncher extends ProcessLauncher {
 
-  @Override
-  public void onProcessComplete(int i) {
-    logger.info("process complete {}", i);
-    rScriptRunning = false;
-  }
+    public RProcessLauncher(CommandLine commandLine,
+                           Map<String, String> envs,
+                           ProcessLogOutputStream processLogOutput) {
+      super(commandLine, envs, processLogOutput);
+    }
 
-  @Override
-  public void onProcessFailed(ExecuteException e) {
-    logger.error(e.getMessage(), e);
-    rScriptRunning = false;
-  }
+    @Override
+    public void waitForReady(int timeout) {
+      long startTime = System.currentTimeMillis();
+      synchronized (this) {
+        while (state == State.LAUNCHED) {
+          LOGGER.info("Waiting for R process initialized");
+          try {
+            wait(100);
+          } catch (InterruptedException e) {
+            throw new RuntimeException(e);
+          }
+          if ((System.currentTimeMillis() - startTime) > timeout) {
+            onTimeout();
+            break;
+          }
+        }
+      }
+    }
 
+    public void initialized() {
+      synchronized (this) {
+        this.state = State.RUNNING;
+        notify();
+      }
+    }
+  }
 
-  public static class SparkRInterpreterOutputStream extends InterpreterOutputStream {
+  public static class RProcessLogOutputStream extends ProcessLauncher.ProcessLogOutputStream {
 
+    private InterpreterOutput interpreterOutput;
     private SparkRInterpreter sparkRInterpreter;
 
-    public SparkRInterpreterOutputStream(Logger logger, SparkRInterpreter sparkRInterpreter) {
-      super(logger);
+    public RProcessLogOutputStream(SparkRInterpreter sparkRInterpreter) {
       this.sparkRInterpreter = sparkRInterpreter;
     }
 
+    /**
+     * Redirect r process output to interpreter output.
+     * @param interpreterOutput
+     */
+    public void setInterpreterOutput(InterpreterOutput interpreterOutput) {
+      this.interpreterOutput = interpreterOutput;
+    }
+
     @Override
     protected void processLine(String s, int i) {
       super.processLine(s, i);
@@ -418,6 +392,21 @@ public class ZeppelinR implements ExecuteResultHandler {
           || s.contains("Execution halted")) { // spark 1.x
         sparkRInterpreter.getRbackendDead().set(true);
       }
+      if (interpreterOutput != null) {
+        try {
+          interpreterOutput.write(s);
+        } catch (IOException e) {
+          throw new RuntimeException(e);
+        }
+      }
+    }
+
+    @Override
+    public void close() throws IOException {
+      super.close();
+      if (interpreterOutput != null) {
+        interpreterOutput.close();
+      }
     }
   }
 }
diff --git a/spark/interpreter/src/test/java/org/apache/zeppelin/spark/SparkRInterpreterTest.java b/spark/interpreter/src/test/java/org/apache/zeppelin/spark/SparkRInterpreterTest.java
index 3838053..1aaef8a 100644
--- a/spark/interpreter/src/test/java/org/apache/zeppelin/spark/SparkRInterpreterTest.java
+++ b/spark/interpreter/src/test/java/org/apache/zeppelin/spark/SparkRInterpreterTest.java
@@ -17,6 +17,8 @@
 
 package org.apache.zeppelin.spark;
 
+import org.apache.commons.lang3.exception.ExceptionUtils;
+import org.apache.zeppelin.interpreter.Interpreter;
 import org.apache.zeppelin.interpreter.InterpreterContext;
 import org.apache.zeppelin.interpreter.InterpreterException;
 import org.apache.zeppelin.interpreter.InterpreterGroup;
@@ -24,11 +26,13 @@ import org.apache.zeppelin.interpreter.InterpreterOutput;
 import org.apache.zeppelin.interpreter.InterpreterResult;
 import org.apache.zeppelin.interpreter.LazyOpenInterpreter;
 import org.apache.zeppelin.interpreter.remote.RemoteInterpreterEventClient;
+import org.apache.zeppelin.python.PythonInterpreter;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
 
 import java.util.HashMap;
+import java.util.LinkedList;
 import java.util.Map;
 import java.util.Properties;
 
@@ -78,8 +82,6 @@ public class SparkRInterpreterTest {
 
   @Test
  public void testSparkRInterpreter() throws InterpreterException, InterruptedException {
-
-
    InterpreterResult result = sparkRInterpreter.interpret("1+1", getInterpreterContext());
     assertEquals(InterpreterResult.Code.SUCCESS, result.code());
     assertTrue(result.message().get(0).getData().contains("2"));
@@ -150,6 +152,35 @@ public class SparkRInterpreterTest {
    assertTrue(result.message().get(0).getData().contains("sparkR backend is dead"));
   }
 
+  @Test
+  public void testInvalidR() throws InterpreterException {
+    tearDown();
+
+    Properties properties = new Properties();
+    properties.setProperty("zeppelin.R.cmd", "invalid_r");
+    properties.setProperty("spark.master", "local");
+    properties.setProperty("spark.app.name", "test");
+    
+    InterpreterGroup interpreterGroup = new InterpreterGroup();
+    Interpreter sparkRInterpreter = new LazyOpenInterpreter(new SparkRInterpreter(properties));
+    Interpreter sparkInterpreter = new LazyOpenInterpreter(new SparkInterpreter(properties));
+    interpreterGroup.addInterpreterToSession(sparkRInterpreter, "session_1");
+    interpreterGroup.addInterpreterToSession(sparkInterpreter, "session_1");
+    sparkRInterpreter.setInterpreterGroup(interpreterGroup);
+    sparkInterpreter.setInterpreterGroup(interpreterGroup);
+
+    InterpreterContext context = getInterpreterContext();
+    InterpreterContext.set(context);
+
+    try {
+      sparkRInterpreter.interpret("1+1", getInterpreterContext());
+      fail("Should fail to open SparkRInterpreter");
+    } catch (InterpreterException e) {
+      String stacktrace = ExceptionUtils.getStackTrace(e);
+      assertTrue(stacktrace, stacktrace.contains("No such file or directory"));
+    }
+  }
+
   private InterpreterContext getInterpreterContext() {
     InterpreterContext context = InterpreterContext.builder()
         .setNoteId("note_1")
diff --git a/spark/spark-scala-parent/src/main/scala/org/apache/zeppelin/spark/BaseSparkScalaInterpreter.scala b/spark/spark-scala-parent/src/main/scala/org/apache/zeppelin/spark/BaseSparkScalaInterpreter.scala
index 11ae461..8750011 100644
--- a/spark/spark-scala-parent/src/main/scala/org/apache/zeppelin/spark/BaseSparkScalaInterpreter.scala
+++ b/spark/spark-scala-parent/src/main/scala/org/apache/zeppelin/spark/BaseSparkScalaInterpreter.scala
@@ -321,7 +321,7 @@ abstract class BaseSparkScalaInterpreter(val conf: SparkConf,
 
     z = new SparkZeppelinContext(sc, sparkShims,
       interpreterGroup.getInterpreterHookRegistry,
-      properties.getProperty("zeppelin.spark.maxResult").toInt)
+      properties.getProperty("zeppelin.spark.maxResult", "1000").toInt)
     bind("z", z.getClass.getCanonicalName, z, List("""@transient"""))
   }
 
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/util/ProcessLauncher.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/util/ProcessLauncher.java
index 5e6c8c9..d544211 100644
--- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/util/ProcessLauncher.java
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/util/ProcessLauncher.java
@@ -59,11 +59,19 @@ public abstract class ProcessLauncher implements ExecuteResultHandler {
                          Map<String, String> envs) {
     this.commandLine = commandLine;
     this.envs = envs;
+    this.processOutput = new ProcessLogOutputStream();
+  }
+
+  public ProcessLauncher(CommandLine commandLine,
+                         Map<String, String> envs,
+                         ProcessLogOutputStream processLogOutput) {
+    this.commandLine = commandLine;
+    this.envs = envs;
+    this.processOutput = processLogOutput;
   }
 
   public void launch() {
     DefaultExecutor executor = new DefaultExecutor();
-    this.processOutput = new ProcessLogOutputStream();
     executor.setStreamHandler(new PumpStreamHandler(processOutput));
     this.watchdog = new ExecuteWatchdog(ExecuteWatchdog.INFINITE_TIMEOUT);
     executor.setWatchdog(watchdog);
@@ -140,7 +148,7 @@ public abstract class ProcessLauncher implements ExecuteResultHandler {
     processOutput.stopCatchLaunchOutput();
   }
 
-  class ProcessLogOutputStream extends LogOutputStream {
+  public static class ProcessLogOutputStream extends LogOutputStream {
 
     private boolean catchLaunchOutput = true;
     private StringBuilder launchOutput = new StringBuilder();
