Repository: incubator-zeppelin Updated Branches: refs/heads/master 182908792 -> 70a0f5c66
Fixes RemoteInterpreterProcessTest. The assertions in testClientFactory() depend on coincidence and on the performance of the particular machine. The fix is to provide the ability to turn off the poller, which interferes with the process during testing. Author: Eugene Morozov <[email protected]> Closes #40 from fathersson/RemoteInterpreterProcessTest and squashes the following commits: 0e5a2f4 [Eugene Morozov] Fixes RemoteInterpreterProcessTest. Project: http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/commit/70a0f5c6 Tree: http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/tree/70a0f5c6 Diff: http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/diff/70a0f5c6 Branch: refs/heads/master Commit: 70a0f5c66e2021fbf7f8d7aef281e3b956a2bad5 Parents: 1829087 Author: Eugene Morozov <[email protected]> Authored: Thu Apr 16 03:06:15 2015 -0700 Committer: Lee moon soo <[email protected]> Committed: Tue Apr 21 08:25:11 2015 +0900 ---------------------------------------------------------------------- zeppelin-interpreter/pom.xml | 13 ++++-- .../remote/RemoteInterpreterEventPoller.java | 48 +++++++++++--------- .../remote/RemoteInterpreterProcess.java | 44 ++++++++++-------- .../remote/RemoteInterpreterProcessTest.java | 4 +- 4 files changed, 64 insertions(+), 45 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/70a0f5c6/zeppelin-interpreter/pom.xml ---------------------------------------------------------------------- diff --git a/zeppelin-interpreter/pom.xml b/zeppelin-interpreter/pom.xml index d824dfe..32d6765 100644 --- a/zeppelin-interpreter/pom.xml +++ b/zeppelin-interpreter/pom.xml @@ -16,7 +16,8 @@ ~ limitations under the License. 
--> -<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> @@ -40,7 +41,7 @@ <artifactId>libthrift</artifactId> <version>0.9.0</version> </dependency> - + <dependency> <groupId>com.google.code.gson</groupId> <artifactId>gson</artifactId> @@ -57,7 +58,7 @@ <artifactId>commons-pool2</artifactId> <version>2.3</version> </dependency> - + <dependency> <groupId>org.slf4j</groupId> <artifactId>slf4j-api</artifactId> @@ -73,5 +74,11 @@ <artifactId>junit</artifactId> <scope>test</scope> </dependency> + + <dependency> + <groupId>org.mockito</groupId> + <artifactId>mockito-all</artifactId> + <scope>test</scope> + </dependency> </dependencies> </project> http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/70a0f5c6/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterEventPoller.java ---------------------------------------------------------------------- diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterEventPoller.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterEventPoller.java index dc9ef0b..4997b0e 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterEventPoller.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterEventPoller.java @@ -17,6 +17,7 @@ package org.apache.zeppelin.interpreter.remote; +import com.google.gson.Gson; import org.apache.thrift.TException; import org.apache.zeppelin.display.AngularObject; import 
org.apache.zeppelin.display.AngularObjectRegistry; @@ -28,40 +29,39 @@ import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterService.Client; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.google.gson.Gson; - /** * */ public class RemoteInterpreterEventPoller extends Thread { - Logger logger = LoggerFactory.getLogger(RemoteInterpreterEventPoller.class); + private static final Logger logger = LoggerFactory.getLogger(RemoteInterpreterEventPoller.class); + + private volatile boolean shutdown; + private RemoteInterpreterProcess interpreterProcess; - boolean shutdown; private InterpreterGroup interpreterGroup; - public RemoteInterpreterEventPoller( - InterpreterGroup interpreterGroup, - RemoteInterpreterProcess interpreterProcess) { - this.interpreterGroup = interpreterGroup; - this.interpreterProcess = interpreterProcess; + public RemoteInterpreterEventPoller() { shutdown = false; } + public void setInterpreterProcess(RemoteInterpreterProcess interpreterProcess) { + this.interpreterProcess = interpreterProcess; + } + + public void setInterpreterGroup(InterpreterGroup interpreterGroup) { + this.interpreterGroup = interpreterGroup; + } + @Override public void run() { Client client = null; - while (shutdown == false) { + while (!shutdown) { try { client = interpreterProcess.getClient(); } catch (Exception e1) { logger.error("Can't get RemoteInterpreterEvent", e1); - try { - synchronized (this) { - wait(1000); - } - } catch (InterruptedException e) { - } + waitQuietly(); continue; } @@ -70,12 +70,7 @@ public class RemoteInterpreterEventPoller extends Thread { event = client.getEvent(); } catch (TException e) { logger.error("Can't get RemoteInterpreterEvent", e); - try { - synchronized (this) { - wait(1000); - } - } catch (InterruptedException e1) { - } + waitQuietly(); continue; } @@ -117,6 +112,15 @@ public class RemoteInterpreterEventPoller extends Thread { } } + private void waitQuietly() { + try { + synchronized (this) { + wait(1000); + } 
+ } catch (InterruptedException ignored) { + } + } + public void shutdown() { shutdown = true; synchronized (this) { http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/70a0f5c6/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterProcess.java ---------------------------------------------------------------------- diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterProcess.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterProcess.java index dbfaa35..61fcb70 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterProcess.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterProcess.java @@ -17,15 +17,8 @@ package org.apache.zeppelin.interpreter.remote; -import java.io.IOException; -import java.util.Map; -import java.util.concurrent.atomic.AtomicInteger; - -import org.apache.commons.exec.CommandLine; -import org.apache.commons.exec.DefaultExecutor; -import org.apache.commons.exec.ExecuteException; -import org.apache.commons.exec.ExecuteResultHandler; -import org.apache.commons.exec.ExecuteWatchdog; +import com.google.gson.Gson; +import org.apache.commons.exec.*; import org.apache.commons.exec.environment.EnvironmentUtils; import org.apache.commons.pool2.impl.GenericObjectPool; import org.apache.thrift.TException; @@ -35,37 +28,51 @@ import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterService.Client; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.google.gson.Gson; +import java.io.IOException; +import java.util.Map; +import java.util.concurrent.atomic.AtomicInteger; /** * */ public class RemoteInterpreterProcess implements ExecuteResultHandler { - Logger logger = LoggerFactory.getLogger(RemoteInterpreterProcess.class); - AtomicInteger referenceCount; + private static final Logger logger = 
LoggerFactory.getLogger(RemoteInterpreterProcess.class); + + private final AtomicInteger referenceCount; private DefaultExecutor executor; private ExecuteWatchdog watchdog; boolean running = false; - int port = -1; - private String interpreterRunner; - private String interpreterDir; + private int port = -1; + private final String interpreterRunner; + private final String interpreterDir; private GenericObjectPool<Client> clientPool; private Map<String, String> env; - private RemoteInterpreterEventPoller remoteInterpreterEventPoller; - private InterpreterContextRunnerPool interpreterContextRunnerPool; + private final RemoteInterpreterEventPoller remoteInterpreterEventPoller; + private final InterpreterContextRunnerPool interpreterContextRunnerPool; public RemoteInterpreterProcess(String intpRunner, String intpDir, Map<String, String> env, InterpreterContextRunnerPool interpreterContextRunnerPool) { + this(intpRunner, intpDir, env, interpreterContextRunnerPool, + new RemoteInterpreterEventPoller()); + } + + RemoteInterpreterProcess(String intpRunner, + String intpDir, + Map<String, String> env, + InterpreterContextRunnerPool interpreterContextRunnerPool, + RemoteInterpreterEventPoller remoteInterpreterEventPoller) { this.interpreterRunner = intpRunner; this.interpreterDir = intpDir; this.env = env; this.interpreterContextRunnerPool = interpreterContextRunnerPool; referenceCount = new AtomicInteger(0); + this.remoteInterpreterEventPoller = remoteInterpreterEventPoller; } + public int getPort() { return port; } @@ -119,7 +126,8 @@ public class RemoteInterpreterProcess implements ExecuteResultHandler { clientPool = new GenericObjectPool<Client>(new ClientFactory("localhost", port)); - remoteInterpreterEventPoller = new RemoteInterpreterEventPoller(interpreterGroup, this); + remoteInterpreterEventPoller.setInterpreterGroup(interpreterGroup); + remoteInterpreterEventPoller.setInterpreterProcess(this); remoteInterpreterEventPoller.start(); } return 
referenceCount.incrementAndGet(); http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/70a0f5c6/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterProcessTest.java ---------------------------------------------------------------------- diff --git a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterProcessTest.java b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterProcessTest.java index fcd6847..4ea9a30 100644 --- a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterProcessTest.java +++ b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterProcessTest.java @@ -19,6 +19,7 @@ package org.apache.zeppelin.interpreter.remote; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; +import static org.mockito.Mockito.*; import java.util.HashMap; @@ -48,7 +49,7 @@ public class RemoteInterpreterProcessTest { public void testClientFactory() throws Exception { InterpreterGroup intpGroup = new InterpreterGroup(); RemoteInterpreterProcess rip = new RemoteInterpreterProcess("../bin/interpreter.sh", "nonexists", new HashMap<String, String>(), - new InterpreterContextRunnerPool()); + new InterpreterContextRunnerPool(), mock(RemoteInterpreterEventPoller.class)); rip.reference(intpGroup); assertEquals(0, rip.getNumActiveClient()); assertEquals(0, rip.getNumIdleClient()); @@ -63,5 +64,4 @@ public class RemoteInterpreterProcessTest { rip.dereference(); } - }
