Updated Branches: refs/heads/flume-1.3.0 11de7fefe -> c6312211e
FLUME-1567: Avro source should expose the number of active connections through JMX (Hari Shreedharan via Brock Noland) Project: http://git-wip-us.apache.org/repos/asf/flume/repo Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/c6312211 Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/c6312211 Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/c6312211 Branch: refs/heads/flume-1.3.0 Commit: c6312211e07501cd59e7e0df6e580f3775b0dde7 Parents: 11de7fe Author: Brock Noland <[email protected]> Authored: Wed Sep 26 13:29:15 2012 -0500 Committer: Brock Noland <[email protected]> Committed: Wed Sep 26 13:29:34 2012 -0500 ---------------------------------------------------------------------- .../flume/instrumentation/SourceCounter.java | 14 +++++++- .../flume/instrumentation/SourceCounterMBean.java | 2 + .../java/org/apache/flume/source/AvroSource.java | 26 +++++++++++++- pom.xml | 6 ++-- 4 files changed, 42 insertions(+), 6 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flume/blob/c6312211/flume-ng-core/src/main/java/org/apache/flume/instrumentation/SourceCounter.java ---------------------------------------------------------------------- diff --git a/flume-ng-core/src/main/java/org/apache/flume/instrumentation/SourceCounter.java b/flume-ng-core/src/main/java/org/apache/flume/instrumentation/SourceCounter.java index 7d69182..972d2c6 100644 --- a/flume-ng-core/src/main/java/org/apache/flume/instrumentation/SourceCounter.java +++ b/flume-ng-core/src/main/java/org/apache/flume/instrumentation/SourceCounter.java @@ -35,13 +35,17 @@ public class SourceCounter extends MonitoredCounterGroup implements "src.append-batch.received"; private static final String COUNTER_APPEND_BATCH_ACCEPTED = "src.append-batch.accepted"; + + private static final String COUNTER_OPEN_CONNECTION_COUNT = + "src.open-connection.count"; private static final String[] ATTRIBUTES = { COUNTER_EVENTS_RECEIVED, COUNTER_EVENTS_ACCEPTED, COUNTER_APPEND_RECEIVED, COUNTER_APPEND_ACCEPTED, - COUNTER_APPEND_BATCH_RECEIVED, COUNTER_APPEND_BATCH_ACCEPTED + COUNTER_APPEND_BATCH_RECEIVED, COUNTER_APPEND_BATCH_ACCEPTED, + COUNTER_OPEN_CONNECTION_COUNT }; @@ -110,4 +114,12 @@ public class SourceCounter extends MonitoredCounterGroup implements public long incrementAppendBatchAcceptedCount() { return increment(COUNTER_APPEND_BATCH_ACCEPTED); } + + public long getOpenConnectionCount() { + return get(COUNTER_OPEN_CONNECTION_COUNT); + } + + public void setOpenConnectionCount(long openConnectionCount){ + set(COUNTER_OPEN_CONNECTION_COUNT, openConnectionCount); + } } http://git-wip-us.apache.org/repos/asf/flume/blob/c6312211/flume-ng-core/src/main/java/org/apache/flume/instrumentation/SourceCounterMBean.java ---------------------------------------------------------------------- diff --git a/flume-ng-core/src/main/java/org/apache/flume/instrumentation/SourceCounterMBean.java b/flume-ng-core/src/main/java/org/apache/flume/instrumentation/SourceCounterMBean.java index 792e689..5ccbed4 100644 --- a/flume-ng-core/src/main/java/org/apache/flume/instrumentation/SourceCounterMBean.java +++ b/flume-ng-core/src/main/java/org/apache/flume/instrumentation/SourceCounterMBean.java @@ -43,4 +43,6 @@ public interface SourceCounterMBean { long getStopTime(); String getType(); + + long getOpenConnectionCount(); } http://git-wip-us.apache.org/repos/asf/flume/blob/c6312211/flume-ng-core/src/main/java/org/apache/flume/source/AvroSource.java ---------------------------------------------------------------------- diff --git a/flume-ng-core/src/main/java/org/apache/flume/source/AvroSource.java b/flume-ng-core/src/main/java/org/apache/flume/source/AvroSource.java index e91af9e..47ccf9f 100644 --- a/flume-ng-core/src/main/java/org/apache/flume/source/AvroSource.java +++ b/flume-ng-core/src/main/java/org/apache/flume/source/AvroSource.java @@ -19,12 +19,15 @@ package org.apache.flume.source; +import com.google.common.base.Throwables; import java.net.InetSocketAddress; import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; import org.apache.avro.ipc.NettyServer; import org.apache.avro.ipc.Responder; @@ -116,6 +119,7 @@ public class AvroSource extends AbstractSource implements EventDrivenSource, private SourceCounter sourceCounter; private int maxThreads; + private ScheduledExecutorService connectionCountUpdater; @Override public void configure(Context context) { @@ -147,10 +151,19 @@ public class AvroSource extends AbstractSource implements EventDrivenSource, Executors.newCachedThreadPool(), Executors.newFixedThreadPool(maxThreads))); } - + connectionCountUpdater = Executors.newSingleThreadScheduledExecutor(); server.start(); sourceCounter.start(); super.start(); + final NettyServer srv = (NettyServer)server; + connectionCountUpdater.scheduleWithFixedDelay(new Runnable(){ + + @Override + public void run() { + sourceCounter.setOpenConnectionCount( + Long.valueOf(srv.getNumActiveConnections())); + } + }, 0, 60, TimeUnit.SECONDS); logger.info("Avro source {} started.", getName()); } @@ -168,8 +181,17 @@ public class AvroSource extends AbstractSource implements EventDrivenSource, "for Avro server to stop. Exiting. Exception follows.", e); } sourceCounter.stop(); + connectionCountUpdater.shutdown(); + while(!connectionCountUpdater.isTerminated()){ + try { + Thread.sleep(100); + } catch (InterruptedException ex) { + logger.error("Interrupted while waiting for connection count executor " + + "to terminate", ex); + Throwables.propagate(ex); + } + } super.stop(); - logger.info("Avro source {} stopped. Metrics: {}", getName(), sourceCounter); } http://git-wip-us.apache.org/repos/asf/flume/blob/c6312211/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index 327562a..e0ad3b5 100644 --- a/pom.xml +++ b/pom.xml @@ -645,19 +645,19 @@ limitations under the License. <dependency> <groupId>org.apache.avro</groupId> <artifactId>avro</artifactId> - <version>1.7.1</version> + <version>1.7.2</version> </dependency> <dependency> <groupId>org.apache.avro</groupId> <artifactId>avro-compiler</artifactId> - <version>1.7.1</version> + <version>1.7.2</version> </dependency> <dependency> <groupId>org.apache.avro</groupId> <artifactId>avro-ipc</artifactId> - <version>1.7.1</version> + <version>1.7.2</version> <exclusions> <exclusion> <groupId>org.mortbay.jetty</groupId>
