Fix an easy to fail unit-test - The test has race condition. The mutual discovery doesnât works well as one runnable can be finished and not discoverable anymore before another one tries to discover. - Switch to use an Echo server / client runnable, which only the client needs to discover the server.
This closes #77 on GitHub. Signed-off-by: Terence Yim <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/incubator-twill/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-twill/commit/900e3829 Tree: http://git-wip-us.apache.org/repos/asf/incubator-twill/tree/900e3829 Diff: http://git-wip-us.apache.org/repos/asf/incubator-twill/diff/900e3829 Branch: refs/heads/site Commit: 900e382938090fb4585389cfa7d0acf020f088e8 Parents: 323e5af Author: Terence Yim <[email protected]> Authored: Tue Jan 5 23:46:06 2016 -0800 Committer: Terence Yim <[email protected]> Committed: Wed Jan 6 22:16:31 2016 -0800 ---------------------------------------------------------------------- .../twill/yarn/ServiceDiscoveryTestRun.java | 91 +++++++++++--------- .../org/apache/twill/yarn/SocketServer.java | 13 +-- 2 files changed, 56 insertions(+), 48 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/900e3829/twill-yarn/src/test/java/org/apache/twill/yarn/ServiceDiscoveryTestRun.java ---------------------------------------------------------------------- diff --git a/twill-yarn/src/test/java/org/apache/twill/yarn/ServiceDiscoveryTestRun.java b/twill-yarn/src/test/java/org/apache/twill/yarn/ServiceDiscoveryTestRun.java index 59fe835..77f53ad 100644 --- a/twill-yarn/src/test/java/org/apache/twill/yarn/ServiceDiscoveryTestRun.java +++ b/twill-yarn/src/test/java/org/apache/twill/yarn/ServiceDiscoveryTestRun.java @@ -17,8 +17,10 @@ */ package org.apache.twill.yarn; +import com.google.common.base.Preconditions; +import com.google.common.collect.Iterables; +import com.google.common.io.LineReader; import org.apache.twill.api.AbstractTwillRunnable; -import org.apache.twill.api.Command; import org.apache.twill.api.TwillApplication; import org.apache.twill.api.TwillContext; import org.apache.twill.api.TwillController; @@ -34,9 +36,17 @@ import org.junit.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.IOException; +import java.io.InputStreamReader; +import java.io.OutputStreamWriter; import java.io.PrintWriter; +import java.io.Reader; +import java.net.InetSocketAddress; +import java.net.Socket; +import java.util.concurrent.BlockingQueue; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; +import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; @@ -51,18 +61,16 @@ public final class ServiceDiscoveryTestRun extends BaseYarnTest { TwillController controller = twillRunner .prepare(new ServiceApplication()) .addLogHandler(new PrinterLogHandler(new PrintWriter(System.out, true))) - .withArguments("r1", "12345") - .withArguments("r2", "45678") + .withApplicationArguments("echo") .start(); - ServiceDiscovered completed = controller.discoverService("completed"); - Assert.assertTrue(waitForSize(completed, 2, 120)); - controller.sendCommand(Command.Builder.of("done").build()); - controller.awaitTerminated(120, TimeUnit.SECONDS); + ServiceDiscovered discovered = controller.discoverService("discovered"); + Assert.assertTrue(waitForSize(discovered, 1, 120)); + controller.terminate().get(); } /** - * An application that contains two {@link ServiceRunnable}. + * An application that contains an EchoServer and an EchoClient. */ public static final class ServiceApplication implements TwillApplication { @@ -71,66 +79,63 @@ public final class ServiceDiscoveryTestRun extends BaseYarnTest { return TwillSpecification.Builder.with() .setName("ServiceApp") .withRunnable() - .add("r1", new ServiceRunnable()).noLocalFiles() - .add("r2", new ServiceRunnable()).noLocalFiles() + .add("server", new EchoServer()).noLocalFiles() + .add("client", new EchoClient()).noLocalFiles() .anyOrder() .build(); } } /** - * A Runnable that will announce on service and wait for announcement from another instance in the same service. + * A runnable to discover the echo server and issue a call to it. */ - public static final class ServiceRunnable extends AbstractTwillRunnable { + public static final class EchoClient extends AbstractTwillRunnable { - private static final Logger LOG = LoggerFactory.getLogger(ServiceRunnable.class); - private static final String SERVICE_NAME = "service"; - private final CountDownLatch stopLatch = new CountDownLatch(1); + private static final Logger LOG = LoggerFactory.getLogger(EchoClient.class); + + private final CountDownLatch completion = new CountDownLatch(1); @Override public void run() { - final int port = Integer.parseInt(getContext().getArguments()[0]); - Cancellable cancelService = getContext().announce(SERVICE_NAME, port); - - final CountDownLatch discoveredLatch = new CountDownLatch(1); - - ServiceDiscovered serviceDiscovered = getContext().discover(SERVICE_NAME); + final BlockingQueue<Discoverable> discoverables = new LinkedBlockingQueue<>(); + ServiceDiscovered serviceDiscovered = getContext().discover(getContext().getApplicationArguments()[0]); serviceDiscovered.watchChanges(new ServiceDiscovered.ChangeListener() { @Override public void onChange(ServiceDiscovered serviceDiscovered) { - // Try to find a discoverable that is not this instance - for (Discoverable discoverable : serviceDiscovered) { - int discoveredPort = discoverable.getSocketAddress().getPort(); - if (SERVICE_NAME.equals(discoverable.getName()) && discoveredPort != port) { - LOG.info("{}: Service discovered at {}", getContext().getSpecification().getName(), discoveredPort); - discoveredLatch.countDown(); - } - } + Iterables.addAll(discoverables, serviceDiscovered); } }, Threads.SAME_THREAD_EXECUTOR); try { - discoveredLatch.await(); - } catch (InterruptedException e) { - LOG.warn("Interrupted.", e); - } + Discoverable discoverable = discoverables.poll(120, TimeUnit.SECONDS); + // Make a call to the echo server + InetSocketAddress address = discoverable.getSocketAddress(); + Socket socket = new Socket(address.getAddress(), address.getPort()); + String message = "Hello World"; + try (PrintWriter printer = new PrintWriter(new OutputStreamWriter(socket.getOutputStream(), "UTF-8"))) { + printer.println(message); + printer.flush(); - // Announce the "complete" service so that the driver knows this runnable has discovered the other - Cancellable cancelCompleted = getContext().announce("completed", port); - try { - stopLatch.await(); - cancelService.cancel(); - cancelCompleted.cancel(); + try (Reader reader = new InputStreamReader(socket.getInputStream(), "UTF-8")) { + LineReader lineReader = new LineReader(reader); + String line = lineReader.readLine(); + Preconditions.checkState(message.equals(line), "Expected %s, got %s", message, line); + } + } + + Cancellable cancellable = getContext().announce("discovered", 12345); + completion.await(); + cancellable.cancel(); } catch (InterruptedException e) { LOG.warn("Interrupted.", e); + } catch (IOException e) { + LOG.error("Failed to talk to server", e); } } @Override - public void handleCommand(Command command) throws Exception { - if ("done".equals(command.getCommand())) { - stopLatch.countDown(); - } + public void stop() { + completion.countDown(); } } } http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/900e3829/twill-yarn/src/test/java/org/apache/twill/yarn/SocketServer.java ---------------------------------------------------------------------- diff --git a/twill-yarn/src/test/java/org/apache/twill/yarn/SocketServer.java b/twill-yarn/src/test/java/org/apache/twill/yarn/SocketServer.java index fd38576..e9e6a99 100644 --- a/twill-yarn/src/test/java/org/apache/twill/yarn/SocketServer.java +++ b/twill-yarn/src/test/java/org/apache/twill/yarn/SocketServer.java @@ -19,7 +19,6 @@ package org.apache.twill.yarn; import com.google.common.base.Charsets; import com.google.common.base.Throwables; -import com.google.common.collect.ImmutableList; import org.apache.twill.api.AbstractTwillRunnable; import org.apache.twill.api.TwillContext; import org.apache.twill.common.Cancellable; @@ -34,6 +33,7 @@ import java.io.PrintWriter; import java.net.ServerSocket; import java.net.Socket; import java.net.SocketException; +import java.util.ArrayList; import java.util.List; /** @@ -57,10 +57,13 @@ public abstract class SocketServer extends AbstractTwillRunnable { ", id: " + context.getInstanceId() + ", count: " + context.getInstanceCount()); - final List<Cancellable> cancellables = ImmutableList.of( - context.announce(context.getApplicationArguments()[0], serverSocket.getLocalPort()), - context.announce(context.getArguments()[0], serverSocket.getLocalPort()) - ); + // Announce with service names as specified in app arguments and runnable arguments + final List<Cancellable> cancellables = new ArrayList<>(); + for (String[] args : new String[][] {context.getApplicationArguments(), context.getArguments()}) { + if (args.length > 0) { + cancellables.add(context.announce(args[0], serverSocket.getLocalPort())); + } + } canceller = new Cancellable() { @Override public void cancel() {
