Updated Branches: refs/heads/flume-1.4 ff4cc5cb1 -> 7cd07a648
FLUME-2004. Capture JMX metrics for Exec source. (Venkatesh Sivasubramanian via Mike Percy) Project: http://git-wip-us.apache.org/repos/asf/flume/repo Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/7cd07a64 Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/7cd07a64 Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/7cd07a64 Branch: refs/heads/flume-1.4 Commit: 7cd07a648185852997496ee1c35381a04d4f2ac4 Parents: ff4cc5c Author: Mike Percy <[email protected]> Authored: Thu Apr 18 00:40:46 2013 -0700 Committer: Mike Percy <[email protected]> Committed: Thu Apr 18 00:42:17 2013 -0700 ---------------------------------------------------------------------- .../java/org/apache/flume/source/ExecSource.java | 25 ++++-- .../org/apache/flume/source/TestExecSource.java | 58 +++++++++++++++ 2 files changed, 74 insertions(+), 9 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flume/blob/7cd07a64/flume-ng-core/src/main/java/org/apache/flume/source/ExecSource.java ---------------------------------------------------------------------- diff --git a/flume-ng-core/src/main/java/org/apache/flume/source/ExecSource.java b/flume-ng-core/src/main/java/org/apache/flume/source/ExecSource.java index 8e687f2..3c9437d 100644 --- a/flume-ng-core/src/main/java/org/apache/flume/source/ExecSource.java +++ b/flume-ng-core/src/main/java/org/apache/flume/source/ExecSource.java @@ -31,13 +31,13 @@ import java.util.concurrent.TimeUnit; import org.apache.flume.Channel; import org.apache.flume.Context; -import org.apache.flume.CounterGroup; import org.apache.flume.Event; import org.apache.flume.EventDrivenSource; import org.apache.flume.Source; import org.apache.flume.channel.ChannelProcessor; import org.apache.flume.conf.Configurable; import org.apache.flume.event.EventBuilder; +import org.apache.flume.instrumentation.SourceCounter; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -142,7 +142,7 @@ Configurable { private String shell; private String command; - private CounterGroup counterGroup; + private SourceCounter sourceCounter; private ExecutorService executor; private Future<?> runnerFuture; private long restartThrottle; @@ -157,9 +157,8 @@ Configurable { logger.info("Exec source starting with command:{}", command); executor = Executors.newSingleThreadExecutor(); - counterGroup = new CounterGroup(); - runner = new ExecRunnable(shell, command, getChannelProcessor(), counterGroup, + runner = new ExecRunnable(shell, command, getChannelProcessor(), sourceCounter, restart, restartThrottle, logStderr, bufferCount, charset); // FIXME: Use a callback-like executor / future to signal us upon failure. @@ -170,6 +169,7 @@ Configurable { * it sets our state to running. We want to make sure the executor is alive * and well first. */ + sourceCounter.start(); super.start(); logger.debug("Exec source started"); @@ -202,10 +202,11 @@ Configurable { } } + sourceCounter.stop(); super.stop(); logger.debug("Exec source with command:{} stopped. Metrics:{}", command, - counterGroup); + sourceCounter); } @Override @@ -231,16 +232,20 @@ Configurable { ExecSourceConfigurationConstants.DEFAULT_CHARSET)); shell = context.getString(ExecSourceConfigurationConstants.CONFIG_SHELL, null); + + if (sourceCounter == null) { + sourceCounter = new SourceCounter(getName()); + } } private static class ExecRunnable implements Runnable { public ExecRunnable(String shell, String command, ChannelProcessor channelProcessor, - CounterGroup counterGroup, boolean restart, long restartThrottle, + SourceCounter sourceCounter, boolean restart, long restartThrottle, boolean logStderr, int bufferCount, Charset charset) { this.command = command; this.channelProcessor = channelProcessor; - this.counterGroup = counterGroup; + this.sourceCounter = sourceCounter; this.restartThrottle = restartThrottle; this.bufferCount = bufferCount; this.restart = restart; @@ -252,7 +257,7 @@ Configurable { private final String shell; private final String command; private final ChannelProcessor channelProcessor; - private final CounterGroup counterGroup; + private final SourceCounter sourceCounter; private volatile boolean restart; private final long restartThrottle; private final int bufferCount; @@ -286,15 +291,17 @@ Configurable { String line = null; List<Event> eventList = new ArrayList<Event>(); while ((line = reader.readLine()) != null) { - counterGroup.incrementAndGet("exec.lines.read"); + sourceCounter.incrementEventReceivedCount(); eventList.add(EventBuilder.withBody(line.getBytes(charset))); if(eventList.size() >= bufferCount) { channelProcessor.processEventBatch(eventList); + sourceCounter.addToEventAcceptedCount(eventList.size()); eventList.clear(); } } if(!eventList.isEmpty()) { channelProcessor.processEventBatch(eventList); + sourceCounter.addToEventAcceptedCount(eventList.size()); } } catch (Exception e) { logger.error("Failed while running command: " + command, e); http://git-wip-us.apache.org/repos/asf/flume/blob/7cd07a64/flume-ng-core/src/test/java/org/apache/flume/source/TestExecSource.java ---------------------------------------------------------------------- diff --git a/flume-ng-core/src/test/java/org/apache/flume/source/TestExecSource.java b/flume-ng-core/src/test/java/org/apache/flume/source/TestExecSource.java index 3d524f0..77e9a44 100644 --- a/flume-ng-core/src/test/java/org/apache/flume/source/TestExecSource.java +++ b/flume-ng-core/src/test/java/org/apache/flume/source/TestExecSource.java @@ -23,10 +23,16 @@ package org.apache.flume.source; import static org.junit.Assert.*; import java.io.*; +import java.lang.management.ManagementFactory; import java.nio.charset.Charset; import java.util.*; import java.util.regex.Pattern; +import javax.management.Attribute; +import javax.management.AttributeList; +import javax.management.MBeanServer; +import javax.management.ObjectName; + import org.apache.commons.io.FileUtils; import org.apache.flume.Channel; import org.apache.flume.ChannelSelector; @@ -70,6 +76,18 @@ public class TestExecSource { @After public void tearDown() { source.stop(); + + // Remove the MBean registered for Monitoring + ObjectName objName = null; + try { + objName = new ObjectName("org.apache.flume.source" + + ":type=" + source.getName()); + + ManagementFactory.getPlatformMBeanServer().unregisterMBean(objName); + } catch (Exception ex) { + System.out.println("Failed to unregister the monitored counter: " + + objName + ex.getMessage()); + } } @Test @@ -168,6 +186,46 @@ public class TestExecSource { } } + @Test + public void testMonitoredCounterGroup() throws InterruptedException, LifecycleException, + EventDeliveryException, IOException { + // mini script + runTestShellCmdHelper("/bin/bash -c", "for i in {1..5}; do echo $i;done" + , new String[]{"1","2","3","4","5" } ); + + ObjectName objName = null; + + try { + objName = new ObjectName("org.apache.flume.source" + + ":type=" + source.getName()); + + MBeanServer mbeanServer = ManagementFactory.getPlatformMBeanServer(); + String strAtts[] = {"Type", "EventReceivedCount", "EventAcceptedCount"}; + AttributeList attrList = mbeanServer.getAttributes(objName, strAtts); + + Assert.assertNotNull(attrList.get(0)); + Assert.assertEquals("Expected Value: Type", "Type", + ((Attribute) attrList.get(0)).getName()); + Assert.assertEquals("Expected Value: SOURCE", "SOURCE", + ((Attribute) attrList.get(0)).getValue()); + + Assert.assertNotNull(attrList.get(1)); + Assert.assertEquals("Expected Value: EventReceivedCount", "EventReceivedCount", + ((Attribute) attrList.get(1)).getName()); + Assert.assertEquals("Expected Value: 5", "5", + ((Attribute) attrList.get(1)).getValue().toString()); + + Assert.assertNotNull(attrList.get(2)); + Assert.assertEquals("Expected Value: EventAcceptedCount", "EventAcceptedCount", + ((Attribute) attrList.get(2)).getName()); + Assert.assertEquals("Expected Value: 5", "5", + ((Attribute) attrList.get(2)).getValue().toString()); + + } catch (Exception ex) { + System.out.println("Unable to retreive the monitored counter: " + + objName + ex.getMessage()); + } + } private void runTestShellCmdHelper(String shell, String command, String[] expectedOutput) throws InterruptedException, LifecycleException, EventDeliveryException, IOException {
