Author: hshreedharan
Date: Thu Jul 12 05:37:52 2012
New Revision: 1360531
URL: http://svn.apache.org/viewvc?rev=1360531&view=rev
Log:
FLUME-1363. TestNetcatSource should try multiple ports before failing.
(Juhani Connolly via Hari Shreedharan)
Modified:
flume/trunk/flume-ng-core/src/main/java/org/apache/flume/source/NetcatSource.java
flume/trunk/flume-ng-node/src/test/java/org/apache/flume/source/TestNetcatSource.java
Modified:
flume/trunk/flume-ng-core/src/main/java/org/apache/flume/source/NetcatSource.java
URL: http://svn.apache.org/viewvc/flume/trunk/flume-ng-core/src/main/java/org/apache/flume/source/NetcatSource.java?rev=1360531&r1=1360530&r2=1360531&view=diff
==============================================================================
--- flume/trunk/flume-ng-core/src/main/java/org/apache/flume/source/NetcatSource.java (original)
+++ flume/trunk/flume-ng-core/src/main/java/org/apache/flume/source/NetcatSource.java Thu Jul 12 05:37:52 2012
@@ -40,6 +40,7 @@ import org.apache.flume.Context;
import org.apache.flume.CounterGroup;
import org.apache.flume.Event;
import org.apache.flume.EventDrivenSource;
+import org.apache.flume.FlumeException;
import org.apache.flume.Source;
import org.apache.flume.conf.Configurable;
import org.apache.flume.conf.Configurables;
@@ -145,8 +146,6 @@ public class NetcatSource extends Abstra
logger.info("Source starting");
- super.start();
-
counterGroup.incrementAndGet("open.attempts");
handlerService = Executors.newCachedThreadPool(new ThreadFactoryBuilder()
@@ -163,7 +162,7 @@ public class NetcatSource extends Abstra
} catch (IOException e) {
counterGroup.incrementAndGet("open.errors");
logger.error("Unable to bind to socket. Exception follows.", e);
- return;
+ throw new FlumeException(e);
}
AcceptHandler acceptRunnable = new AcceptHandler(maxLineLength);
@@ -179,14 +178,13 @@ public class NetcatSource extends Abstra
acceptThread.start();
logger.debug("Source started");
+ super.start();
}
@Override
public void stop() {
logger.info("Source stopping");
- super.stop();
-
acceptThreadShouldStop.set(true);
if (acceptThread != null) {
@@ -238,6 +236,7 @@ public class NetcatSource extends Abstra
}
logger.debug("Source stopped. Event metrics:{}", counterGroup);
+ super.stop();
}
private static class AcceptHandler implements Runnable {
Modified:
flume/trunk/flume-ng-node/src/test/java/org/apache/flume/source/TestNetcatSource.java
URL: http://svn.apache.org/viewvc/flume/trunk/flume-ng-node/src/test/java/org/apache/flume/source/TestNetcatSource.java?rev=1360531&r1=1360530&r2=1360531&view=diff
==============================================================================
--- flume/trunk/flume-ng-node/src/test/java/org/apache/flume/source/TestNetcatSource.java (original)
+++ flume/trunk/flume-ng-node/src/test/java/org/apache/flume/source/TestNetcatSource.java Thu Jul 12 05:37:52 2012
@@ -37,6 +37,7 @@ import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.EventDrivenSource;
+import org.apache.flume.FlumeException;
import org.apache.flume.Transaction;
import org.apache.flume.channel.ChannelProcessor;
import org.apache.flume.channel.MemoryChannel;
@@ -79,15 +80,22 @@ public class TestNetcatSource {
EventDeliveryException {
ExecutorService executor = Executors.newFixedThreadPool(3);
- Context context = new Context();
-
- /* FIXME: Use a random port for testing. */
- context.put("bind", "0.0.0.0");
- context.put("port", "41414");
-
- Configurables.configure(source, context);
+ boolean bound = false;
- source.start();
+ for(int i = 0; i < 100 && !bound; i++) {
+ try {
+ Context context = new Context();
+ context.put("bind", "0.0.0.0");
+ context.put("port", "41414");
+
+ Configurables.configure(source, context);
+
+ source.start();
+ bound = true;
+ } catch (FlumeException e) {
+ // assume port in use, try another one
+ }
+ }
Runnable clientRequestRunnable = new Runnable() {