FLUME-1482 Flume should support exposing metrics via HTTP in JSON/some other 
web service format.

(Hari Shreedharan via Mubarak Seyed)


Project: http://git-wip-us.apache.org/repos/asf/flume/repo
Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/ce01ec1c
Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/ce01ec1c
Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/ce01ec1c

Branch: refs/heads/cdh-1.2.0+24_intuit
Commit: ce01ec1cf63eb0f29a4e53498a144104a08f5d3f
Parents: 8d2755a
Author: Mubarak Seyed <[email protected]>
Authored: Tue Aug 14 01:25:11 2012 -0700
Committer: Mike Percy <[email protected]>
Committed: Fri Sep 7 14:03:04 2012 -0700

----------------------------------------------------------------------
 .../org/apache/flume/channel/file/FileChannel.java |    1 +
 flume-ng-core/pom.xml                              |    5 +
 .../org/apache/flume/channel/MemoryChannel.java    |    2 +
 .../flume/channel/PseudoTxnMemoryChannel.java      |   32 ++++-
 .../flume/instrumentation/ChannelCounter.java      |   14 ++-
 .../flume/instrumentation/ChannelCounterMBean.java |   11 ++
 .../flume/instrumentation/GangliaServer.java       |   70 +++-----
 .../instrumentation/MonitoredCounterGroup.java     |    4 +
 .../flume/instrumentation/MonitoringType.java      |    3 +-
 .../flume/instrumentation/SinkCounterMBean.java    |    9 +
 .../flume/instrumentation/SourceCounterMBean.java  |   10 +-
 .../instrumentation/http/HTTPMetricsServer.java    |  128 ++++++++++++++
 .../flume/instrumentation/util/JMXPollUtil.java    |   84 ++++++++++
 .../http/TestHTTPMetricsServer.java                |  130 +++++++++++++++
 .../flume/instrumentation/util/JMXTestUtils.java   |   38 +++++
 .../instrumentation/util/TestJMXPollUtil.java      |   87 ++++++++++
 flume-ng-doc/sphinx/FlumeUserGuide.rst             |   50 ++++++
 pom.xml                                            |    7 +
 18 files changed, 632 insertions(+), 53 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flume/blob/ce01ec1c/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FileChannel.java
----------------------------------------------------------------------
diff --git 
a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FileChannel.java
 
b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FileChannel.java
index bd2558a..e7735e8 100644
--- 
a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FileChannel.java
+++ 
b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FileChannel.java
@@ -239,6 +239,7 @@ public class FileChannel extends BasicChannelSemantics {
     if (open) {
       channelCounter.start();
       channelCounter.setChannelSize(getDepth());
+      channelCounter.setChannelCapacity(capacity);
     }
     super.start();
   }

http://git-wip-us.apache.org/repos/asf/flume/blob/ce01ec1c/flume-ng-core/pom.xml
----------------------------------------------------------------------
diff --git a/flume-ng-core/pom.xml b/flume-ng-core/pom.xml
index 5ed5461..f3b3240 100644
--- a/flume-ng-core/pom.xml
+++ b/flume-ng-core/pom.xml
@@ -165,6 +165,11 @@ limitations under the License.
     </dependency>
 
     <dependency>
+      <groupId>com.google.code.gson</groupId>
+      <artifactId>gson</artifactId>
+    </dependency>
+
+    <dependency>
       <groupId>org.mockito</groupId>
       <artifactId>mockito-all</artifactId>
       <scope>test</scope>

http://git-wip-us.apache.org/repos/asf/flume/blob/ce01ec1c/flume-ng-core/src/main/java/org/apache/flume/channel/MemoryChannel.java
----------------------------------------------------------------------
diff --git 
a/flume-ng-core/src/main/java/org/apache/flume/channel/MemoryChannel.java 
b/flume-ng-core/src/main/java/org/apache/flume/channel/MemoryChannel.java
index 65b0166..c72e97c 100644
--- a/flume-ng-core/src/main/java/org/apache/flume/channel/MemoryChannel.java
+++ b/flume-ng-core/src/main/java/org/apache/flume/channel/MemoryChannel.java
@@ -247,6 +247,8 @@ public class MemoryChannel extends BasicChannelSemantics {
   public synchronized void start() {
     channelCounter.start();
     channelCounter.setChannelSize(queue.size());
+    channelCounter.setChannelCapacity(Long.valueOf(
+            queue.size() + queue.remainingCapacity()));
     super.start();
   }
 

http://git-wip-us.apache.org/repos/asf/flume/blob/ce01ec1c/flume-ng-core/src/main/java/org/apache/flume/channel/PseudoTxnMemoryChannel.java
----------------------------------------------------------------------
diff --git 
a/flume-ng-core/src/main/java/org/apache/flume/channel/PseudoTxnMemoryChannel.java
 
b/flume-ng-core/src/main/java/org/apache/flume/channel/PseudoTxnMemoryChannel.java
index 489d3e5..cc391c4 100644
--- 
a/flume-ng-core/src/main/java/org/apache/flume/channel/PseudoTxnMemoryChannel.java
+++ 
b/flume-ng-core/src/main/java/org/apache/flume/channel/PseudoTxnMemoryChannel.java
@@ -28,6 +28,7 @@ import org.apache.flume.Event;
 import org.apache.flume.Transaction;
 
 import com.google.common.base.Preconditions;
+import org.apache.flume.instrumentation.ChannelCounter;
 
 /**
  * <p>
@@ -83,6 +84,7 @@ public class PseudoTxnMemoryChannel extends AbstractChannel {
 
   private BlockingQueue<Event> queue;
   private Integer keepAlive;
+  private ChannelCounter channelCounter;
 
   @Override
   public void configure(Context context) {
@@ -98,27 +100,51 @@ public class PseudoTxnMemoryChannel extends 
AbstractChannel {
     }
 
     queue = new ArrayBlockingQueue<Event>(capacity);
+    if(channelCounter == null) {
+      channelCounter = new ChannelCounter(getName());
+    }
+  }
+
+  @Override
+  public void start(){  // start the counter, then publish size and capacity
+    channelCounter.start();
+    channelCounter.setChannelSize(queue.size());
+    channelCounter.setChannelCapacity(  // capacity = queued + free slots
+            Long.valueOf(queue.size() + queue.remainingCapacity()));
+    super.start();
+  }
+
+  @Override
+  public void stop(){
+    channelCounter.setChannelSize(queue.size());
+    channelCounter.stop();
+    super.stop();
   }
 
   @Override
   public void put(Event event) {
     Preconditions.checkState(queue != null,
         "No queue defined (Did you forget to configure me?");
-
+    channelCounter.incrementEventPutAttemptCount();
     try {
       queue.put(event);
     } catch (InterruptedException ex) {
       throw new ChannelException("Failed to put(" + event + ")", ex);
     }
+    channelCounter.addToEventPutSuccessCount(1);
+    channelCounter.setChannelSize(queue.size());
   }
 
   @Override
   public Event take() {
     Preconditions.checkState(queue != null,
         "No queue defined (Did you forget to configure me?");
-
+    channelCounter.incrementEventTakeAttemptCount();
     try {
-      return queue.poll(keepAlive, TimeUnit.SECONDS);
+      Event e = queue.poll(keepAlive, TimeUnit.SECONDS);
+      channelCounter.addToEventTakeSuccessCount(1);
+      channelCounter.setChannelSize(queue.size());
+      return e;
     } catch (InterruptedException ex) {
       throw new ChannelException("Failed to take()", ex);
     }

http://git-wip-us.apache.org/repos/asf/flume/blob/ce01ec1c/flume-ng-core/src/main/java/org/apache/flume/instrumentation/ChannelCounter.java
----------------------------------------------------------------------
diff --git 
a/flume-ng-core/src/main/java/org/apache/flume/instrumentation/ChannelCounter.java
 
b/flume-ng-core/src/main/java/org/apache/flume/instrumentation/ChannelCounter.java
index 316384a..602481e 100644
--- 
a/flume-ng-core/src/main/java/org/apache/flume/instrumentation/ChannelCounter.java
+++ 
b/flume-ng-core/src/main/java/org/apache/flume/instrumentation/ChannelCounter.java
@@ -35,10 +35,13 @@ public class ChannelCounter extends MonitoredCounterGroup 
implements
   private static final String COUNTER_EVENT_TAKE_SUCCESS =
       "channel.event.take.success";
 
+  private static final String COUNTER_CHANNEL_CAPACITY =
+          "channel.capacity";
+
   private static final String[] ATTRIBUTES = {
     COUNTER_CHANNEL_SIZE, COUNTER_EVENT_PUT_ATTEMPT,
     COUNTER_EVENT_TAKE_ATTEMPT, COUNTER_EVENT_PUT_SUCCESS,
-    COUNTER_EVENT_TAKE_SUCCESS
+    COUNTER_EVENT_TAKE_SUCCESS, COUNTER_CHANNEL_CAPACITY
   };
 
   public ChannelCounter(String name) {
@@ -89,4 +92,13 @@ public class ChannelCounter extends MonitoredCounterGroup 
implements
   public long addToEventTakeSuccessCount(long delta) {
     return addAndGet(COUNTER_EVENT_TAKE_SUCCESS, delta);
   }
+
+  public void setChannelCapacity(long capacity){
+    set(COUNTER_CHANNEL_CAPACITY, capacity);
+  }
+
+  public long getChannelCapacity(){
+    return get(COUNTER_CHANNEL_CAPACITY);
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/flume/blob/ce01ec1c/flume-ng-core/src/main/java/org/apache/flume/instrumentation/ChannelCounterMBean.java
----------------------------------------------------------------------
diff --git 
a/flume-ng-core/src/main/java/org/apache/flume/instrumentation/ChannelCounterMBean.java
 
b/flume-ng-core/src/main/java/org/apache/flume/instrumentation/ChannelCounterMBean.java
index 799dd5d..f0c3ef3 100644
--- 
a/flume-ng-core/src/main/java/org/apache/flume/instrumentation/ChannelCounterMBean.java
+++ 
b/flume-ng-core/src/main/java/org/apache/flume/instrumentation/ChannelCounterMBean.java
@@ -18,6 +18,13 @@
  */
 package org.apache.flume.instrumentation;
 
+/**
+ * This interface represents a channel counter mbean. Any class implementing
+ * this interface must sub-class
+ * {@linkplain org.apache.flume.instrumentation.MonitoredCounterGroup}. This
+ * interface might change between minor releases. Please see
+ * {@linkplain org.apache.flume.instrumentation.ChannelCounter} class.
+ */
 public interface ChannelCounterMBean {
 
   long getChannelSize();
@@ -33,4 +40,8 @@ public interface ChannelCounterMBean {
   long getStartTime();
 
   long getStopTime();
+
+  long getChannelCapacity();
+
+  String getType();
 }

http://git-wip-us.apache.org/repos/asf/flume/blob/ce01ec1c/flume-ng-core/src/main/java/org/apache/flume/instrumentation/GangliaServer.java
----------------------------------------------------------------------
diff --git 
a/flume-ng-core/src/main/java/org/apache/flume/instrumentation/GangliaServer.java
 
b/flume-ng-core/src/main/java/org/apache/flume/instrumentation/GangliaServer.java
index d93cd33..8d34fee 100644
--- 
a/flume-ng-core/src/main/java/org/apache/flume/instrumentation/GangliaServer.java
+++ 
b/flume-ng-core/src/main/java/org/apache/flume/instrumentation/GangliaServer.java
@@ -28,6 +28,7 @@ import java.net.SocketAddress;
 import java.net.SocketException;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
@@ -41,6 +42,7 @@ import org.apache.flume.Context;
 import org.apache.flume.FlumeException;
 import org.apache.flume.api.HostInfo;
 import org.apache.flume.conf.ConfigurationException;
+import org.apache.flume.instrumentation.util.JMXPollUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -49,19 +51,16 @@ import org.slf4j.LoggerFactory;
  * once every 60 seconds). This implementation can send data to ganglia 3 and
  * ganglia 3.1. <p>
  *
- * <b>Mandatory Parameters:</b><p>
- * <tt>hosts: </tt> List of comma separated hostname:ports of ganglia
- * servers to report metrics to. <p>
- * <b>Optional Parameters: </b><p>
- * <tt>pollFrequency:</tt>Interval in seconds between consecutive reports to
- * ganglia servers. Default = 60 seconds.<p>
+ * <b>Mandatory Parameters:</b><p> <tt>hosts: </tt> List of comma separated
+ * hostname:ports of ganglia servers to report metrics to. <p> <b>Optional
+ * Parameters: </b><p> <tt>pollFrequency:</tt>Interval in seconds between
+ * consecutive reports to ganglia servers. Default = 60 seconds.<p>
  * <tt>isGanglia3:</tt> Report to ganglia 3 ? Default = false - reports to
  * ganglia 3.1.
  *
  *
  *
  */
-
 public class GangliaServer implements MonitorService {
   /*
    * The Ganglia protocol specific stuff: the xdr_* methods
@@ -284,14 +283,13 @@ public class GangliaServer implements MonitorService {
   public void configure(Context context) {
     this.pollFrequency = context.getInteger(this.CONF_POLL_FREQUENCY, 60);
     String localHosts = context.getString(this.CONF_HOSTS);
-    if(localHosts == null || localHosts.isEmpty()){
+    if (localHosts == null || localHosts.isEmpty()) {
       throw new ConfigurationException("Hosts list cannot be empty.");
     }
     this.hosts = this.getHostsFromString(localHosts);
     this.isGanglia3 = context.getBoolean(this.CONF_ISGANGLIA3, false);
   }
 
-
   private List<HostInfo> getHostsFromString(String hosts)
           throws FlumeException {
     List<HostInfo> hostInfoList = new ArrayList<HostInfo>();
@@ -316,6 +314,7 @@ public class GangliaServer implements MonitorService {
     }
     return hostInfoList;
   }
+
   /**
    * Worker which polls JMX for all mbeans with
    * {@link javax.management.ObjectName} within the flume namespace:
@@ -332,47 +331,24 @@ public class GangliaServer implements MonitorService {
     @Override
     public void run() {
       try {
-        Set<ObjectInstance> queryMBeans = null;
-        try {
-          queryMBeans = mbeanServer.queryMBeans(
-                  null, null);
-        } catch (Exception ex) {
-          logger.error("Could not get Mbeans for monitoring", ex);
-          Throwables.propagate(ex);
-        }
-        for (ObjectInstance obj : queryMBeans) {
-          try {
-            if 
(!obj.getObjectName().toString().startsWith("org.apache.flume")) {
-              continue;
-            }
-            MBeanAttributeInfo[] attrs = mbeanServer.
-                    getMBeanInfo(obj.getObjectName()).getAttributes();
-            String strAtts[] = new String[attrs.length];
-            for (int i = 0; i < strAtts.length; i++) {
-              strAtts[i] = attrs[i].getName();
-            }
-            AttributeList attrList = mbeanServer.getAttributes(
-                    obj.getObjectName(), strAtts);
-            String component = obj.getObjectName().toString().substring(
-                obj.getObjectName().toString().indexOf('=') + 1);
-            for (Object attr : attrList) {
-              Attribute localAttr = (Attribute) attr;
-              if (isGanglia3) {
-                server.createGangliaMessage(GANGLIA_CONTEXT + component + "."
-                        + localAttr.getName(),
-                        localAttr.getValue().toString());
-              } else {
-                server.createGangliaMessage31(GANGLIA_CONTEXT + component + "."
-                        + localAttr.getName(),
-                        localAttr.getValue().toString());
-              }
-              server.sendToGangliaNodes();
+        Map<String, Map<String, String>> metricsMap =
+                JMXPollUtil.getAllMBeans();
+        for (String component : metricsMap.keySet()) {
+          Map<String, String> attributeMap = metricsMap.get(component);
+          for (String attribute : attributeMap.keySet()) {
+            if (isGanglia3) {
+              server.createGangliaMessage(GANGLIA_CONTEXT + component + "."
+                      + attribute,
+                      attributeMap.get(attribute));
+            } else {
+              server.createGangliaMessage31(GANGLIA_CONTEXT + component + "."
+                      + attribute,
+                      attributeMap.get(attribute));
             }
-          } catch (Exception ex) {
-            logger.error("Error getting mbean attributes", ex);
+            server.sendToGangliaNodes();
           }
         }
-      } catch(Throwable t) {
+      } catch (Throwable t) {
         logger.error("Unexpected error", t);
       }
     }

http://git-wip-us.apache.org/repos/asf/flume/blob/ce01ec1c/flume-ng-core/src/main/java/org/apache/flume/instrumentation/MonitoredCounterGroup.java
----------------------------------------------------------------------
diff --git 
a/flume-ng-core/src/main/java/org/apache/flume/instrumentation/MonitoredCounterGroup.java
 
b/flume-ng-core/src/main/java/org/apache/flume/instrumentation/MonitoredCounterGroup.java
index a03d004..6bc31ef 100644
--- 
a/flume-ng-core/src/main/java/org/apache/flume/instrumentation/MonitoredCounterGroup.java
+++ 
b/flume-ng-core/src/main/java/org/apache/flume/instrumentation/MonitoredCounterGroup.java
@@ -139,4 +139,8 @@ public abstract class MonitoredCounterGroup {
     SINK_PROCESSOR,
     SINK
   };
+
+  public String getType(){
+    return type.name();
+  }
 }

http://git-wip-us.apache.org/repos/asf/flume/blob/ce01ec1c/flume-ng-core/src/main/java/org/apache/flume/instrumentation/MonitoringType.java
----------------------------------------------------------------------
diff --git 
a/flume-ng-core/src/main/java/org/apache/flume/instrumentation/MonitoringType.java
 
b/flume-ng-core/src/main/java/org/apache/flume/instrumentation/MonitoringType.java
index d132995..443335c 100644
--- 
a/flume-ng-core/src/main/java/org/apache/flume/instrumentation/MonitoringType.java
+++ 
b/flume-ng-core/src/main/java/org/apache/flume/instrumentation/MonitoringType.java
@@ -24,7 +24,8 @@ package org.apache.flume.instrumentation;
  */
 public enum MonitoringType {
   OTHER(null),
-  GANGLIA(org.apache.flume.instrumentation.GangliaServer.class);
+  GANGLIA(org.apache.flume.instrumentation.GangliaServer.class),
+  HTTP(org.apache.flume.instrumentation.http.HTTPMetricsServer.class);
 
   private Class<? extends MonitorService> monitoringClass;
 

http://git-wip-us.apache.org/repos/asf/flume/blob/ce01ec1c/flume-ng-core/src/main/java/org/apache/flume/instrumentation/SinkCounterMBean.java
----------------------------------------------------------------------
diff --git 
a/flume-ng-core/src/main/java/org/apache/flume/instrumentation/SinkCounterMBean.java
 
b/flume-ng-core/src/main/java/org/apache/flume/instrumentation/SinkCounterMBean.java
index 6905d49..472a4dd 100644
--- 
a/flume-ng-core/src/main/java/org/apache/flume/instrumentation/SinkCounterMBean.java
+++ 
b/flume-ng-core/src/main/java/org/apache/flume/instrumentation/SinkCounterMBean.java
@@ -17,6 +17,13 @@
  * under the License.
  */
 package org.apache.flume.instrumentation;
+/**
+ * This interface represents a sink counter mbean. Any class implementing
+ * this interface must sub-class
+ * {@linkplain org.apache.flume.instrumentation.MonitoredCounterGroup}. This
+ * interface might change between minor releases. Please see
+ * {@linkplain org.apache.flume.instrumentation.SinkCounter} class.
+ */
 
 public interface SinkCounterMBean {
 
@@ -39,4 +46,6 @@ public interface SinkCounterMBean {
   long getStartTime();
 
   long getStopTime();
+
+  String getType();
 }

http://git-wip-us.apache.org/repos/asf/flume/blob/ce01ec1c/flume-ng-core/src/main/java/org/apache/flume/instrumentation/SourceCounterMBean.java
----------------------------------------------------------------------
diff --git 
a/flume-ng-core/src/main/java/org/apache/flume/instrumentation/SourceCounterMBean.java
 
b/flume-ng-core/src/main/java/org/apache/flume/instrumentation/SourceCounterMBean.java
index e6612d5..792e689 100644
--- 
a/flume-ng-core/src/main/java/org/apache/flume/instrumentation/SourceCounterMBean.java
+++ 
b/flume-ng-core/src/main/java/org/apache/flume/instrumentation/SourceCounterMBean.java
@@ -17,7 +17,13 @@
  * under the License.
  */
 package org.apache.flume.instrumentation;
-
+/**
+ * This interface represents a source counter mbean. Any class implementing
+ * this interface must sub-class
+ * {@linkplain org.apache.flume.instrumentation.MonitoredCounterGroup}. This
+ * interface might change between minor releases. Please see
+ * {@linkplain org.apache.flume.instrumentation.SourceCounter} class.
+ */
 public interface SourceCounterMBean {
 
   long getEventReceivedCount();
@@ -35,4 +41,6 @@ public interface SourceCounterMBean {
   long getStartTime();
 
   long getStopTime();
+
+  String getType();
 }

http://git-wip-us.apache.org/repos/asf/flume/blob/ce01ec1c/flume-ng-core/src/main/java/org/apache/flume/instrumentation/http/HTTPMetricsServer.java
----------------------------------------------------------------------
diff --git 
a/flume-ng-core/src/main/java/org/apache/flume/instrumentation/http/HTTPMetricsServer.java
 
b/flume-ng-core/src/main/java/org/apache/flume/instrumentation/http/HTTPMetricsServer.java
new file mode 100644
index 0000000..373e344
--- /dev/null
+++ 
b/flume-ng-core/src/main/java/org/apache/flume/instrumentation/http/HTTPMetricsServer.java
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.flume.instrumentation.http;
+
+import com.google.gson.Gson;
+import com.google.gson.reflect.TypeToken;
+import java.io.IOException;
+import java.lang.reflect.Type;
+import java.util.Map;
+import javax.servlet.ServletException;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+import org.apache.flume.Context;
+import org.apache.flume.instrumentation.MonitorService;
+import org.apache.flume.instrumentation.util.JMXPollUtil;
+import org.mortbay.jetty.Request;
+import org.mortbay.jetty.Server;
+import org.mortbay.jetty.handler.AbstractHandler;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A Monitor service implementation that runs a web server on a configurable
+ * port and returns the metrics for components in JSON format. <p> Optional
+ * parameters: <p> <tt>port</tt> : The port on which the server should listen
+ * (default 41414).<p> Returns metrics in the following format: <p>
+ *
+ * {<p> "componentName1":{"metric1" : "metricValue1","metric2":"metricValue2"},
+ * <p> "componentName2":{"metric3" : "metricValue3","metric4":"metricValue4"}
+ * <p> }
+ */
+public class HTTPMetricsServer implements MonitorService {
+
+  private Server jettyServer;
+  private int port;
+  private static Logger LOG = LoggerFactory.getLogger(HTTPMetricsServer.class);
+  public static int DEFAULT_PORT = 41414;
+  public static String CONFIG_PORT = "port";
+
+  @Override
+  public void start() {
+    jettyServer = new Server(port);
+    //We can use Contexts etc if we have many urls to handle. For one url,
+    //specifying a handler directly is the most efficient.
+    jettyServer.setHandler(new HTTPMetricsHandler());
+    try {
+      jettyServer.start();
+      while (!jettyServer.isStarted()) {
+        Thread.sleep(500);
+      }
+    } catch (Exception ex) {
+      LOG.error("Error starting Jetty. JSON Metrics may not be available.", 
ex);
+    }
+
+  }
+
+  @Override
+  public void stop() {
+    try {
+      jettyServer.stop();
+      jettyServer.join();
+    } catch (Exception ex) {
+      LOG.error("Error stopping Jetty. JSON Metrics may not be available.", 
ex);
+    }
+
+  }
+
+  @Override
+  public void configure(Context context) {
+    port = context.getInteger(CONFIG_PORT, DEFAULT_PORT);
+  }
+
+  private class HTTPMetricsHandler extends AbstractHandler {
+
+    Type mapType =
+            new TypeToken<Map<String, Map<String, String>>>() {
+            }.getType();
+    Gson gson = new Gson();
+
+    @Override
+    public void handle(String target,
+            HttpServletRequest request,
+            HttpServletResponse response,
+            int dispatch) throws IOException, ServletException {
+      // /metrics is the only place to pull metrics.
+      //If we want to use any other url for something else, we should make sure
+      //that for metrics only /metrics is used to prevent backward
+      //compatibility issues.
+      if (target.equals("/")) {
+        response.setContentType("text/html;charset=utf-8");
+        response.setStatus(HttpServletResponse.SC_OK);
+        response.getWriter().write("For Flume metrics please click"
+                + " <a href = \"./metrics\"> here</a>.");
+        response.flushBuffer();
+        ((Request) request).setHandled(true);
+        return;
+      } else if (target.equalsIgnoreCase("/metrics")) {
+        response.setContentType("application/json;charset=utf-8");
+        response.setStatus(HttpServletResponse.SC_OK);
+        Map<String, Map<String, String>> metricsMap = 
JMXPollUtil.getAllMBeans();
+        String json = gson.toJson(metricsMap, mapType);
+        response.getWriter().write(json);
+        response.flushBuffer();
+        ((Request) request).setHandled(true);
+        return;
+      }
+      response.sendError(HttpServletResponse.SC_NOT_FOUND);
+      response.flushBuffer();
+      //Not handling the request returns a Not found error page.
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/flume/blob/ce01ec1c/flume-ng-core/src/main/java/org/apache/flume/instrumentation/util/JMXPollUtil.java
----------------------------------------------------------------------
diff --git 
a/flume-ng-core/src/main/java/org/apache/flume/instrumentation/util/JMXPollUtil.java
 
b/flume-ng-core/src/main/java/org/apache/flume/instrumentation/util/JMXPollUtil.java
new file mode 100644
index 0000000..cbd6c35
--- /dev/null
+++ 
b/flume-ng-core/src/main/java/org/apache/flume/instrumentation/util/JMXPollUtil.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.flume.instrumentation.util;
+
+import com.google.common.base.Throwables;
+import com.google.common.collect.Maps;
+import java.lang.management.ManagementFactory;
+import java.util.Map;
+import java.util.Set;
+import javax.management.Attribute;
+import javax.management.AttributeList;
+import javax.management.MBeanAttributeInfo;
+import javax.management.MBeanServer;
+import javax.management.ObjectInstance;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Polls the platform MBean server for Flume (org.apache.flume) MBean metrics.
+ */
+public class JMXPollUtil {
+
+  private static Logger LOG = LoggerFactory.getLogger(JMXPollUtil.class);
+  private static MBeanServer mbeanServer = ManagementFactory.
+          getPlatformMBeanServer();
+
+  public static Map<String, Map<String, String>> getAllMBeans() {
+    Map<String, Map<String, String>> mbeanMap = Maps.newHashMap();
+    Set<ObjectInstance> queryMBeans = null;
+    try {
+      queryMBeans = mbeanServer.queryMBeans(null, null);
+    } catch (Exception ex) {
+      LOG.error("Could not get Mbeans for monitoring", ex);
+      Throwables.propagate(ex);
+    }
+    for (ObjectInstance obj : queryMBeans) {
+      try {
+        if (!obj.getObjectName().toString().startsWith("org.apache.flume")) {
+          continue;
+        }
+        MBeanAttributeInfo[] attrs = mbeanServer.
+                getMBeanInfo(obj.getObjectName()).getAttributes();
+        String strAtts[] = new String[attrs.length];
+        for (int i = 0; i < strAtts.length; i++) {
+          strAtts[i] = attrs[i].getName();
+        }
+        AttributeList attrList = mbeanServer.getAttributes(
+                obj.getObjectName(), strAtts);
+        String component = obj.getObjectName().toString().substring(
+                obj.getObjectName().toString().indexOf('=') + 1);
+        Map<String, String> attrMap = Maps.newHashMap();
+
+
+        for (Object attr : attrList) {
+          Attribute localAttr = (Attribute) attr;
+          if(localAttr.getName().equalsIgnoreCase("type")){
+            component = localAttr.getValue()+ "." + component;
+          }
+          attrMap.put(localAttr.getName(), localAttr.getValue().toString());
+        }
+        mbeanMap.put(component, attrMap);
+      } catch (Exception e) {
+        LOG.error("Unable to poll JMX for metrics.", e);
+      }
+    }
+    return mbeanMap;
+  }
+}

http://git-wip-us.apache.org/repos/asf/flume/blob/ce01ec1c/flume-ng-core/src/test/java/org/apache/flume/instrumentation/http/TestHTTPMetricsServer.java
----------------------------------------------------------------------
diff --git 
a/flume-ng-core/src/test/java/org/apache/flume/instrumentation/http/TestHTTPMetricsServer.java
 
b/flume-ng-core/src/test/java/org/apache/flume/instrumentation/http/TestHTTPMetricsServer.java
new file mode 100644
index 0000000..a2a1c30
--- /dev/null
+++ 
b/flume-ng-core/src/test/java/org/apache/flume/instrumentation/http/TestHTTPMetricsServer.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.flume.instrumentation.http;
+
+import com.google.gson.Gson;
+import com.google.gson.reflect.TypeToken;
+import java.io.BufferedReader;
+import java.io.InputStreamReader;
+import java.lang.reflect.Type;
+import java.net.HttpURLConnection;
+import java.net.URL;
+import java.util.Map;
+import org.apache.flume.Channel;
+import org.apache.flume.Context;
+import org.apache.flume.Transaction;
+import org.apache.flume.channel.MemoryChannel;
+import org.apache.flume.channel.PseudoTxnMemoryChannel;
+import org.apache.flume.conf.Configurables;
+import org.apache.flume.event.EventBuilder;
+import org.apache.flume.instrumentation.MonitorService;
+import org.apache.flume.instrumentation.util.JMXTestUtils;
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * Verifies that {@link HTTPMetricsServer} serves the counters of running
+ * channels as a JSON document under the <tt>/metrics</tt> path.
+ */
+public class TestHTTPMetricsServer {
+
+  Channel memChannel = new MemoryChannel();
+  Channel pmemChannel = new PseudoTxnMemoryChannel();
+  // Shape the JSON response is parsed into:
+  // component name -> (attribute name -> attribute value).
+  Type mapType =
+          new TypeToken<Map<String, Map<String, String>>>() {
+          }.getType();
+  Gson gson = new Gson();
+
+  /**
+   * Puts two events into and takes one event from each channel, then checks
+   * the counters reported over HTTP with several port settings.
+   *
+   * @throws Exception if a channel operation or an HTTP request fails
+   */
+  @Test
+  public void testJSON() throws Exception {
+    memChannel.setName("memChannel");
+    pmemChannel.setName("pmemChannel");
+    Context c = new Context();
+    Configurables.configure(memChannel, c);
+    Configurables.configure(pmemChannel, c);
+    memChannel.start();
+    pmemChannel.start();
+    try {
+      Transaction txn = memChannel.getTransaction();
+      txn.begin();
+      memChannel.put(EventBuilder.withBody("blah".getBytes()));
+      memChannel.put(EventBuilder.withBody("blah".getBytes()));
+      txn.commit();
+      txn.close();
+
+      txn = memChannel.getTransaction();
+      txn.begin();
+      memChannel.take();
+      txn.commit();
+      txn.close();
+
+
+      Transaction txn2 = pmemChannel.getTransaction();
+      txn2.begin();
+      pmemChannel.put(EventBuilder.withBody("blah".getBytes()));
+      pmemChannel.put(EventBuilder.withBody("blah".getBytes()));
+      txn2.commit();
+      txn2.close();
+
+      txn2 = pmemChannel.getTransaction();
+      txn2.begin();
+      pmemChannel.take();
+      txn2.commit();
+      txn2.close();
+
+      testWithPort(5467);
+      testWithPort(33434);
+      testWithPort(44343);
+      // 0 is not above 1024, so this run leaves the port unconfigured.
+      testWithPort(0);
+    } finally {
+      // Stop the channels even when an assertion or request above fails.
+      memChannel.stop();
+      pmemChannel.stop();
+    }
+  }
+
+  /**
+   * Starts an {@link HTTPMetricsServer}, fetches <tt>/metrics</tt> and checks
+   * the counters reported for both channels.
+   *
+   * @param port port to configure the server with; a value of 1024 or lower
+   *        is not passed to the server, and the request is then sent to
+   *        {@link HTTPMetricsServer#DEFAULT_PORT}
+   * @throws Exception if the request fails or the response cannot be read
+   */
+  private void testWithPort(int port) throws Exception {
+    MonitorService srv = new HTTPMetricsServer();
+    Context context = new Context();
+    if (port > 1024) {
+      context.put(HTTPMetricsServer.CONFIG_PORT, String.valueOf(port));
+    } else {
+      port = HTTPMetricsServer.DEFAULT_PORT;
+    }
+    srv.configure(context);
+    srv.start();
+    try {
+      // Presumably gives the server time to come up before the request.
+      Thread.sleep(1000);
+      URL url = new URL("http://0.0.0.0:" + String.valueOf(port) + "/metrics");
+      HttpURLConnection conn = (HttpURLConnection) url.openConnection();
+      conn.setRequestMethod("GET");
+      StringBuilder result = new StringBuilder();
+      BufferedReader reader = new BufferedReader(
+              new InputStreamReader(conn.getInputStream()));
+      try {
+        String line;
+        while ((line = reader.readLine()) != null) {
+          result.append(line);
+        }
+      } finally {
+        reader.close();
+      }
+      Map<String, Map<String, String>> mbeans =
+              gson.fromJson(result.toString(), mapType);
+      Assert.assertNotNull(mbeans);
+      Map<String, String> memBean = mbeans.get("CHANNEL.memChannel");
+      Assert.assertNotNull(memBean);
+      JMXTestUtils.checkChannelCounterParams(memBean);
+      Map<String, String> pmemBean = mbeans.get("CHANNEL.pmemChannel");
+      Assert.assertNotNull(pmemBean);
+      JMXTestUtils.checkChannelCounterParams(pmemBean);
+    } finally {
+      // Always stop the server so a failed check does not leave it running
+      // for the next invocation.
+      srv.stop();
+    }
+    System.out.println(String.valueOf(port) + "test success!");
+  }
+}

http://git-wip-us.apache.org/repos/asf/flume/blob/ce01ec1c/flume-ng-core/src/test/java/org/apache/flume/instrumentation/util/JMXTestUtils.java
----------------------------------------------------------------------
diff --git 
a/flume-ng-core/src/test/java/org/apache/flume/instrumentation/util/JMXTestUtils.java
 
b/flume-ng-core/src/test/java/org/apache/flume/instrumentation/util/JMXTestUtils.java
new file mode 100644
index 0000000..a392e0c
--- /dev/null
+++ 
b/flume-ng-core/src/test/java/org/apache/flume/instrumentation/util/JMXTestUtils.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.flume.instrumentation.util;
+
+import java.util.Map;
+import org.junit.Assert;
+
+/**
+ * Shared assertions for tests that inspect channel counter attributes which
+ * were read as strings.
+ */
+public class JMXTestUtils {
+
+  /**
+   * Asserts that the attribute map of a channel counter has start and stop
+   * times, a non-zero channel size, two put attempts/successes and one take
+   * attempt/success.
+   *
+   * @param attrs attribute name to attribute value, both as strings
+   * @throws NumberFormatException if a numeric attribute is missing or is
+   *         not a valid long
+   */
+  public static void checkChannelCounterParams(Map<String, String> attrs) {
+    Assert.assertNotNull("StartTime is missing", attrs.get("StartTime"));
+    Assert.assertNotNull("StopTime is missing", attrs.get("StopTime"));
+    Assert.assertTrue("ChannelSize must be non-zero",
+            Long.parseLong(attrs.get("ChannelSize")) != 0);
+    // assertEquals reports the expected and actual values on failure.
+    Assert.assertEquals("EventPutAttemptCount", 2L,
+            Long.parseLong(attrs.get("EventPutAttemptCount")));
+    Assert.assertEquals("EventTakeAttemptCount", 1L,
+            Long.parseLong(attrs.get("EventTakeAttemptCount")));
+    Assert.assertEquals("EventPutSuccessCount", 2L,
+            Long.parseLong(attrs.get("EventPutSuccessCount")));
+    Assert.assertEquals("EventTakeSuccessCount", 1L,
+            Long.parseLong(attrs.get("EventTakeSuccessCount")));
+  }
+}

http://git-wip-us.apache.org/repos/asf/flume/blob/ce01ec1c/flume-ng-core/src/test/java/org/apache/flume/instrumentation/util/TestJMXPollUtil.java
----------------------------------------------------------------------
diff --git 
a/flume-ng-core/src/test/java/org/apache/flume/instrumentation/util/TestJMXPollUtil.java
 
b/flume-ng-core/src/test/java/org/apache/flume/instrumentation/util/TestJMXPollUtil.java
new file mode 100644
index 0000000..71340f5
--- /dev/null
+++ 
b/flume-ng-core/src/test/java/org/apache/flume/instrumentation/util/TestJMXPollUtil.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.flume.instrumentation.util;
+
+import java.util.Map;
+import org.apache.flume.Channel;
+import org.apache.flume.Context;
+import org.apache.flume.Transaction;
+import org.apache.flume.channel.MemoryChannel;
+import org.apache.flume.channel.PseudoTxnMemoryChannel;
+import org.apache.flume.conf.Configurables;
+import org.apache.flume.event.EventBuilder;
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ *
+ */
+public class TestJMXPollUtil {
+
+  Channel memChannel = new MemoryChannel();
+  Channel pmemChannel = new PseudoTxnMemoryChannel();
+
+  @Test
+  public void testJMXPoll() {
+    memChannel.setName("memChannel");
+    pmemChannel.setName("pmemChannel");
+    Context c = new Context();
+    Configurables.configure(memChannel, c);
+    Configurables.configure(pmemChannel, c);
+    memChannel.start();
+    pmemChannel.start();
+    Transaction txn = memChannel.getTransaction();
+    txn.begin();
+    memChannel.put(EventBuilder.withBody("blah".getBytes()));
+    memChannel.put(EventBuilder.withBody("blah".getBytes()));
+    txn.commit();
+    txn.close();
+
+    txn = memChannel.getTransaction();
+    txn.begin();
+    memChannel.take();
+    txn.commit();
+    txn.close();
+
+
+    Transaction txn2 = pmemChannel.getTransaction();
+    txn2.begin();
+    pmemChannel.put(EventBuilder.withBody("blah".getBytes()));
+    pmemChannel.put(EventBuilder.withBody("blah".getBytes()));
+    txn2.commit();
+    txn2.close();
+
+    txn2 = pmemChannel.getTransaction();
+    txn2.begin();
+    pmemChannel.take();
+    txn2.commit();
+    txn2.close();
+
+    Map<String, Map<String, String>> mbeans = JMXPollUtil.getAllMBeans();
+    Assert.assertNotNull(mbeans);
+    Map<String, String> memBean = mbeans.get("CHANNEL.memChannel");
+    Assert.assertNotNull(memBean);
+    JMXTestUtils.checkChannelCounterParams(memBean);
+    Map<String, String> pmemBean = mbeans.get("CHANNEL.pmemChannel");
+    Assert.assertNotNull(pmemBean);
+    JMXTestUtils.checkChannelCounterParams(pmemBean);
+    memChannel.stop();
+    pmemChannel.stop();
+  }
+}

http://git-wip-us.apache.org/repos/asf/flume/blob/ce01ec1c/flume-ng-doc/sphinx/FlumeUserGuide.rst
----------------------------------------------------------------------
diff --git a/flume-ng-doc/sphinx/FlumeUserGuide.rst 
b/flume-ng-doc/sphinx/FlumeUserGuide.rst
index 40e385f..94212ec 100644
--- a/flume-ng-doc/sphinx/FlumeUserGuide.rst
+++ b/flume-ng-doc/sphinx/FlumeUserGuide.rst
@@ -1842,6 +1842,56 @@ starts with ``org.apache.flume``):
 
   ManagementFactory.getPlatformMBeanServer().registerMBean(this, objName);
 
+JSON Reporting
+--------------
+Flume can also report metrics in a JSON format. To enable reporting in JSON format, Flume hosts
+a Web server on a configurable port. Flume reports metrics in the following JSON format:
+
+.. code-block:: java
+
+  {
+  "typeName1.componentName1" : {"metric1" : "metricValue1", "metric2" : "metricValue2"},
+  "typeName2.componentName2" : {"metric3" : "metricValue3", "metric4" : "metricValue4"}
+  }
+
+Here is an example:
+
+.. code-block:: java
+
+  {
+  "CHANNEL.fileChannel":{"EventPutSuccessCount":"468085",
+                        "Type":"CHANNEL",
+                        "StopTime":"0",
+                        "EventPutAttemptCount":"468086",
+                        "ChannelSize":"233428",
+                        "StartTime":"1344882233070",
+                        "EventTakeSuccessCount":"458200",
+                        "ChannelCapacity":"600000",
+                        "EventTakeAttemptCount":"458288"},
+  "CHANNEL.memChannel":{"EventPutSuccessCount":"22948908",
+                     "Type":"CHANNEL",
+                     "StopTime":"0",
+                     "EventPutAttemptCount":"22948908",
+                     "ChannelSize":"5",
+                     "StartTime":"1344882209413",
+                     "EventTakeSuccessCount":"22948900",
+                     "ChannelCapacity":"100",
+                     "EventTakeAttemptCount":"22948908"}
+  }
+
+=======================  =======  =====================================================================================
+Property Name            Default  Description
+=======================  =======  =====================================================================================
+**type**                 --       The component type name, has to be ``HTTP``
+port                     41414    The port to start the server on.
+=======================  =======  =====================================================================================
+
+We can start Flume with HTTP support as follows::
+
+  $ bin/flume-ng agent --conf-file example.conf --name agent1 -Dflume.monitoring.type=HTTP -Dflume.monitoring.port=34545
+
+Metrics will then be available at the **http://<hostname>:<port>/metrics** webpage.
+Custom components can report metrics as mentioned in the Ganglia section above.
 
 Custom Reporting
 ----------------

http://git-wip-us.apache.org/repos/asf/flume/blob/ce01ec1c/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 7ac32d8..d6c813a 100644
--- a/pom.xml
+++ b/pom.xml
@@ -704,6 +704,13 @@ limitations under the License.
         <version>2.5-20110124</version>
       </dependency>
 
+      <!--  Gson: Java to Json conversion -->
+      <dependency>
+        <groupId>com.google.code.gson</groupId>
+        <artifactId>gson</artifactId>
+        <version>2.2.2</version>
+      </dependency>
+
       <dependency>
         <groupId>org.apache.hadoop</groupId>
         <artifactId>hadoop-common</artifactId>

Reply via email to