NetcatSource and test fixes
Project: http://git-wip-us.apache.org/repos/asf/flume/repo
Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/bd098cee
Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/bd098cee
Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/bd098cee

Branch: refs/heads/trunk
Commit: bd098cee15e867f87f83656fe76a78f2fe61deb3
Parents: 9f52b5f
Author: Juhani Connolly <[email protected]>
Authored: Wed Jul 11 18:22:42 2012 +0900
Committer: Juhani Connolly <[email protected]>
Committed: Wed Jul 11 18:22:42 2012 +0900

----------------------------------------------------------------------
 .../java/org/apache/flume/source/NetcatSource.java |  9 +++---
 .../org/apache/flume/source/TestNetcatSource.java  | 20 ++++++++++----
 2 files changed, 18 insertions(+), 11 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flume/blob/bd098cee/flume-ng-core/src/main/java/org/apache/flume/source/NetcatSource.java
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/main/java/org/apache/flume/source/NetcatSource.java b/flume-ng-core/src/main/java/org/apache/flume/source/NetcatSource.java
index 9d28cda..37c09fe 100644
--- a/flume-ng-core/src/main/java/org/apache/flume/source/NetcatSource.java
+++ b/flume-ng-core/src/main/java/org/apache/flume/source/NetcatSource.java
@@ -40,6 +40,7 @@ import org.apache.flume.Context;
 import org.apache.flume.CounterGroup;
 import org.apache.flume.Event;
 import org.apache.flume.EventDrivenSource;
+import org.apache.flume.FlumeException;
 import org.apache.flume.Source;
 import org.apache.flume.conf.Configurable;
 import org.apache.flume.conf.Configurables;
@@ -145,8 +146,6 @@ public class NetcatSource extends AbstractSource implements Configurable,
 
     logger.info("Source starting");
 
-    super.start();
-
     counterGroup.incrementAndGet("open.attempts");
 
     handlerService = Executors.newCachedThreadPool(new ThreadFactoryBuilder()
@@ -163,7 +162,7 @@ public class NetcatSource extends AbstractSource implements Configurable,
     } catch (IOException e) {
       counterGroup.incrementAndGet("open.errors");
       logger.error("Unable to bind to socket. Exception follows.", e);
-      return;
+      throw new FlumeException(e);
     }
 
     AcceptHandler acceptRunnable = new AcceptHandler(maxLineLength);
@@ -179,14 +178,13 @@ public class NetcatSource extends AbstractSource implements Configurable,
     acceptThread.start();
 
     logger.debug("Source started");
+    super.start();
   }
 
   @Override
   public void stop() {
     logger.info("Source stopping");
 
-    super.stop();
-
     acceptThreadShouldStop.set(true);
 
     if (acceptThread != null) {
@@ -238,6 +236,7 @@ public class NetcatSource extends AbstractSource implements Configurable,
     }
 
     logger.debug("Source stopped. Event metrics:{}", counterGroup);
+    super.stop();
   }
 
   private static class AcceptHandler implements Runnable {

http://git-wip-us.apache.org/repos/asf/flume/blob/bd098cee/flume-ng-node/src/test/java/org/apache/flume/source/TestNetcatSource.java
----------------------------------------------------------------------
diff --git a/flume-ng-node/src/test/java/org/apache/flume/source/TestNetcatSource.java b/flume-ng-node/src/test/java/org/apache/flume/source/TestNetcatSource.java
index c195db7..3c17d3d 100644
--- a/flume-ng-node/src/test/java/org/apache/flume/source/TestNetcatSource.java
+++ b/flume-ng-node/src/test/java/org/apache/flume/source/TestNetcatSource.java
@@ -37,6 +37,7 @@ import org.apache.flume.Context;
 import org.apache.flume.Event;
 import org.apache.flume.EventDeliveryException;
 import org.apache.flume.EventDrivenSource;
+import org.apache.flume.FlumeException;
 import org.apache.flume.Transaction;
 import org.apache.flume.channel.ChannelProcessor;
 import org.apache.flume.channel.MemoryChannel;
@@ -79,15 +80,22 @@ public class TestNetcatSource {
       EventDeliveryException {
     ExecutorService executor = Executors.newFixedThreadPool(3);
 
-    Context context = new Context();
+    boolean bound = false;
 
-    /* FIXME: Use a random port for testing. */
-    context.put("bind", "0.0.0.0");
-    context.put("port", "41414");
+    for(int i = 0; i < 100 && !bound; i++) {
+      try {
+        Context context = new Context();
+        context.put("bind", "0.0.0.0");
+        context.put("port", "41414");
 
-    Configurables.configure(source, context);
+        Configurables.configure(source, context);
 
-    source.start();
+        source.start();
+        bound = true;
+      } catch (FlumeException e) {
+        // assume port in use, try another one
+      }
+    }
 
     Runnable clientRequestRunnable = new Runnable() {
 
