Revert "FLUME-1482 Flume should support exposing metrics via HTTP in JSON/some other web service format."
(Hari Shreedharan via Mubarak Seyed) This reverts commit d656fae3f492b40281b875ba36793a8e488b5e8d. Project: http://git-wip-us.apache.org/repos/asf/flume/repo Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/8d2755a5 Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/8d2755a5 Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/8d2755a5 Branch: refs/heads/cdh-1.2.0+24_intuit Commit: 8d2755a5c271f835c82c17136f4dad637ad35878 Parents: 68def98 Author: Mubarak Seyed <[email protected]> Authored: Tue Aug 14 01:08:46 2012 -0700 Committer: Mike Percy <[email protected]> Committed: Fri Sep 7 14:03:04 2012 -0700 ---------------------------------------------------------------------- .../org/apache/flume/channel/file/FileChannel.java | 1 - flume-ng-core/pom.xml | 5 - .../org/apache/flume/channel/MemoryChannel.java | 2 - .../flume/channel/PseudoTxnMemoryChannel.java | 32 +------ .../flume/instrumentation/ChannelCounter.java | 14 +--- .../flume/instrumentation/ChannelCounterMBean.java | 11 --- .../flume/instrumentation/GangliaServer.java | 70 ++++++++++----- .../instrumentation/MonitoredCounterGroup.java | 4 - .../flume/instrumentation/MonitoringType.java | 3 +- .../flume/instrumentation/SinkCounterMBean.java | 9 -- .../flume/instrumentation/SourceCounterMBean.java | 10 +-- flume-ng-doc/sphinx/FlumeUserGuide.rst | 50 ---------- pom.xml | 7 -- 13 files changed, 53 insertions(+), 165 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flume/blob/8d2755a5/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FileChannel.java ---------------------------------------------------------------------- diff --git a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FileChannel.java b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FileChannel.java index e7735e8..bd2558a 100644 --- a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FileChannel.java +++ b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FileChannel.java @@ -239,7 +239,6 @@ public class FileChannel extends BasicChannelSemantics { if (open) { channelCounter.start(); channelCounter.setChannelSize(getDepth()); - channelCounter.setChannelCapacity(capacity); } super.start(); } http://git-wip-us.apache.org/repos/asf/flume/blob/8d2755a5/flume-ng-core/pom.xml ---------------------------------------------------------------------- diff --git a/flume-ng-core/pom.xml b/flume-ng-core/pom.xml index f3b3240..5ed5461 100644 --- a/flume-ng-core/pom.xml +++ b/flume-ng-core/pom.xml @@ -165,11 +165,6 @@ limitations under the License. </dependency> <dependency> - <groupId>com.google.code.gson</groupId> - <artifactId>gson</artifactId> - </dependency> - - <dependency> <groupId>org.mockito</groupId> <artifactId>mockito-all</artifactId> <scope>test</scope> http://git-wip-us.apache.org/repos/asf/flume/blob/8d2755a5/flume-ng-core/src/main/java/org/apache/flume/channel/MemoryChannel.java ---------------------------------------------------------------------- diff --git a/flume-ng-core/src/main/java/org/apache/flume/channel/MemoryChannel.java b/flume-ng-core/src/main/java/org/apache/flume/channel/MemoryChannel.java index c72e97c..65b0166 100644 --- a/flume-ng-core/src/main/java/org/apache/flume/channel/MemoryChannel.java +++ b/flume-ng-core/src/main/java/org/apache/flume/channel/MemoryChannel.java @@ -247,8 +247,6 @@ public class MemoryChannel extends BasicChannelSemantics { public synchronized void start() { channelCounter.start(); channelCounter.setChannelSize(queue.size()); - channelCounter.setChannelCapacity(Long.valueOf( - queue.size() + queue.remainingCapacity())); super.start(); } http://git-wip-us.apache.org/repos/asf/flume/blob/8d2755a5/flume-ng-core/src/main/java/org/apache/flume/channel/PseudoTxnMemoryChannel.java ---------------------------------------------------------------------- diff --git a/flume-ng-core/src/main/java/org/apache/flume/channel/PseudoTxnMemoryChannel.java b/flume-ng-core/src/main/java/org/apache/flume/channel/PseudoTxnMemoryChannel.java index cc391c4..489d3e5 100644 --- a/flume-ng-core/src/main/java/org/apache/flume/channel/PseudoTxnMemoryChannel.java +++ b/flume-ng-core/src/main/java/org/apache/flume/channel/PseudoTxnMemoryChannel.java @@ -28,7 +28,6 @@ import org.apache.flume.Event; import org.apache.flume.Transaction; import com.google.common.base.Preconditions; -import org.apache.flume.instrumentation.ChannelCounter; /** * <p> @@ -84,7 +83,6 @@ public class PseudoTxnMemoryChannel extends AbstractChannel { private BlockingQueue<Event> queue; private Integer keepAlive; - private ChannelCounter channelCounter; @Override public void configure(Context context) { @@ -100,51 +98,27 @@ public class PseudoTxnMemoryChannel extends AbstractChannel { } queue = new ArrayBlockingQueue<Event>(capacity); - if(channelCounter == null) { - channelCounter = new ChannelCounter(getName()); - } - } - - @Override - public void start(){ - channelCounter.start(); - channelCounter.setChannelSize(queue.size()); - channelCounter.setChannelSize( - Long.valueOf(queue.size() + queue.remainingCapacity())); - super.start(); - } - - @Override - public void stop(){ - channelCounter.setChannelSize(queue.size()); - channelCounter.stop(); - super.stop(); } @Override public void put(Event event) { Preconditions.checkState(queue != null, "No queue defined (Did you forget to configure me?"); - channelCounter.incrementEventPutAttemptCount(); + try { queue.put(event); } catch (InterruptedException ex) { throw new ChannelException("Failed to put(" + event + ")", ex); } - channelCounter.addToEventPutSuccessCount(1); - channelCounter.setChannelSize(queue.size()); } @Override public Event take() { Preconditions.checkState(queue != null, "No queue defined (Did you forget to configure me?"); - channelCounter.incrementEventTakeAttemptCount(); + try { - Event e = queue.poll(keepAlive, TimeUnit.SECONDS); - channelCounter.addToEventTakeSuccessCount(1); - channelCounter.setChannelSize(queue.size()); - return e; + return queue.poll(keepAlive, TimeUnit.SECONDS); } catch (InterruptedException ex) { throw new ChannelException("Failed to take()", ex); } http://git-wip-us.apache.org/repos/asf/flume/blob/8d2755a5/flume-ng-core/src/main/java/org/apache/flume/instrumentation/ChannelCounter.java ---------------------------------------------------------------------- diff --git a/flume-ng-core/src/main/java/org/apache/flume/instrumentation/ChannelCounter.java b/flume-ng-core/src/main/java/org/apache/flume/instrumentation/ChannelCounter.java index 602481e..316384a 100644 --- a/flume-ng-core/src/main/java/org/apache/flume/instrumentation/ChannelCounter.java +++ b/flume-ng-core/src/main/java/org/apache/flume/instrumentation/ChannelCounter.java @@ -35,13 +35,10 @@ public class ChannelCounter extends MonitoredCounterGroup implements private static final String COUNTER_EVENT_TAKE_SUCCESS = "channel.event.take.success"; - private static final String COUNTER_CHANNEL_CAPACITY = - "channel.capacity"; - private static final String[] ATTRIBUTES = { COUNTER_CHANNEL_SIZE, COUNTER_EVENT_PUT_ATTEMPT, COUNTER_EVENT_TAKE_ATTEMPT, COUNTER_EVENT_PUT_SUCCESS, - COUNTER_EVENT_TAKE_SUCCESS, COUNTER_CHANNEL_CAPACITY + COUNTER_EVENT_TAKE_SUCCESS }; public ChannelCounter(String name) { @@ -92,13 +89,4 @@ public class ChannelCounter extends MonitoredCounterGroup implements public long addToEventTakeSuccessCount(long delta) { return addAndGet(COUNTER_EVENT_TAKE_SUCCESS, delta); } - - public void setChannelCapacity(long capacity){ - set(COUNTER_CHANNEL_CAPACITY, capacity); - } - - public long getChannelCapacity(){ - return get(COUNTER_CHANNEL_CAPACITY); - } - } http://git-wip-us.apache.org/repos/asf/flume/blob/8d2755a5/flume-ng-core/src/main/java/org/apache/flume/instrumentation/ChannelCounterMBean.java ---------------------------------------------------------------------- diff --git a/flume-ng-core/src/main/java/org/apache/flume/instrumentation/ChannelCounterMBean.java b/flume-ng-core/src/main/java/org/apache/flume/instrumentation/ChannelCounterMBean.java index f0c3ef3..799dd5d 100644 --- a/flume-ng-core/src/main/java/org/apache/flume/instrumentation/ChannelCounterMBean.java +++ b/flume-ng-core/src/main/java/org/apache/flume/instrumentation/ChannelCounterMBean.java @@ -18,13 +18,6 @@ */ package org.apache.flume.instrumentation; -/** - * This interface represents a channel counter mbean. Any class implementing - * this interface must sub-class - * {@linkplain org.apache.flume.instrumentation.MonitoredCounterGroup}. This - * interface might change between minor releases. Please see - * {@linkplain org.apache.flume.instrumentation.ChannelCounter} class. - */ public interface ChannelCounterMBean { long getChannelSize(); @@ -40,8 +33,4 @@ public interface ChannelCounterMBean { long getStartTime(); long getStopTime(); - - long getChannelCapacity(); - - String getType(); } http://git-wip-us.apache.org/repos/asf/flume/blob/8d2755a5/flume-ng-core/src/main/java/org/apache/flume/instrumentation/GangliaServer.java ---------------------------------------------------------------------- diff --git a/flume-ng-core/src/main/java/org/apache/flume/instrumentation/GangliaServer.java b/flume-ng-core/src/main/java/org/apache/flume/instrumentation/GangliaServer.java index 8d34fee..d93cd33 100644 --- a/flume-ng-core/src/main/java/org/apache/flume/instrumentation/GangliaServer.java +++ b/flume-ng-core/src/main/java/org/apache/flume/instrumentation/GangliaServer.java @@ -28,7 +28,6 @@ import java.net.SocketAddress; import java.net.SocketException; import java.util.ArrayList; import java.util.List; -import java.util.Map; import java.util.Set; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; @@ -42,7 +41,6 @@ import org.apache.flume.Context; import org.apache.flume.FlumeException; import org.apache.flume.api.HostInfo; import org.apache.flume.conf.ConfigurationException; -import org.apache.flume.instrumentation.util.JMXPollUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -51,16 +49,19 @@ import org.slf4j.LoggerFactory; * once every 60 seconds). This implementation can send data to ganglia 3 and * ganglia 3.1. <p> * - * <b>Mandatory Parameters:</b><p> <tt>hosts: </tt> List of comma separated - * hostname:ports of ganglia servers to report metrics to. <p> <b>Optional - * Parameters: </b><p> <tt>pollFrequency:</tt>Interval in seconds between - * consecutive reports to ganglia servers. Default = 60 seconds.<p> + * <b>Mandatory Parameters:</b><p> + * <tt>hosts: </tt> List of comma separated hostname:ports of ganglia + * servers to report metrics to. <p> + * <b>Optional Parameters: </b><p> + * <tt>pollFrequency:</tt>Interval in seconds between consecutive reports to + * ganglia servers. Default = 60 seconds.<p> * <tt>isGanglia3:</tt> Report to ganglia 3 ? Default = false - reports to * ganglia 3.1. * * * */ + public class GangliaServer implements MonitorService { /* * The Ganglia protocol specific stuff: the xdr_* methods @@ -283,13 +284,14 @@ public class GangliaServer implements MonitorService { public void configure(Context context) { this.pollFrequency = context.getInteger(this.CONF_POLL_FREQUENCY, 60); String localHosts = context.getString(this.CONF_HOSTS); - if (localHosts == null || localHosts.isEmpty()) { + if(localHosts == null || localHosts.isEmpty()){ throw new ConfigurationException("Hosts list cannot be empty."); } this.hosts = this.getHostsFromString(localHosts); this.isGanglia3 = context.getBoolean(this.CONF_ISGANGLIA3, false); } + private List<HostInfo> getHostsFromString(String hosts) throws FlumeException { List<HostInfo> hostInfoList = new ArrayList<HostInfo>(); @@ -314,7 +316,6 @@ public class GangliaServer implements MonitorService { } return hostInfoList; } - /** * Worker which polls JMX for all mbeans with * {@link javax.management.ObjectName} within the flume namespace: @@ -331,24 +332,47 @@ public class GangliaServer implements MonitorService { @Override public void run() { try { - Map<String, Map<String, String>> metricsMap = - JMXPollUtil.getAllMBeans(); - for (String component : metricsMap.keySet()) { - Map<String, String> attributeMap = metricsMap.get(component); - for (String attribute : attributeMap.keySet()) { - if (isGanglia3) { - server.createGangliaMessage(GANGLIA_CONTEXT + component + "." - + attribute, - attributeMap.get(attribute)); - } else { - server.createGangliaMessage31(GANGLIA_CONTEXT + component + "." - + attribute, - attributeMap.get(attribute)); + Set<ObjectInstance> queryMBeans = null; + try { + queryMBeans = mbeanServer.queryMBeans( + null, null); + } catch (Exception ex) { + logger.error("Could not get Mbeans for monitoring", ex); + Throwables.propagate(ex); + } + for (ObjectInstance obj : queryMBeans) { + try { + if (!obj.getObjectName().toString().startsWith("org.apache.flume")) { + continue; + } + MBeanAttributeInfo[] attrs = mbeanServer. + getMBeanInfo(obj.getObjectName()).getAttributes(); + String strAtts[] = new String[attrs.length]; + for (int i = 0; i < strAtts.length; i++) { + strAtts[i] = attrs[i].getName(); + } + AttributeList attrList = mbeanServer.getAttributes( + obj.getObjectName(), strAtts); + String component = obj.getObjectName().toString().substring( + obj.getObjectName().toString().indexOf('=') + 1); + for (Object attr : attrList) { + Attribute localAttr = (Attribute) attr; + if (isGanglia3) { + server.createGangliaMessage(GANGLIA_CONTEXT + component + "." + + localAttr.getName(), + localAttr.getValue().toString()); + } else { + server.createGangliaMessage31(GANGLIA_CONTEXT + component + "." + + localAttr.getName(), + localAttr.getValue().toString()); + } + server.sendToGangliaNodes(); } - server.sendToGangliaNodes(); + } catch (Exception ex) { + logger.error("Error getting mbean attributes", ex); } } - } catch (Throwable t) { + } catch(Throwable t) { logger.error("Unexpected error", t); } } http://git-wip-us.apache.org/repos/asf/flume/blob/8d2755a5/flume-ng-core/src/main/java/org/apache/flume/instrumentation/MonitoredCounterGroup.java ---------------------------------------------------------------------- diff --git a/flume-ng-core/src/main/java/org/apache/flume/instrumentation/MonitoredCounterGroup.java b/flume-ng-core/src/main/java/org/apache/flume/instrumentation/MonitoredCounterGroup.java index 6bc31ef..a03d004 100644 --- a/flume-ng-core/src/main/java/org/apache/flume/instrumentation/MonitoredCounterGroup.java +++ b/flume-ng-core/src/main/java/org/apache/flume/instrumentation/MonitoredCounterGroup.java @@ -139,8 +139,4 @@ public abstract class MonitoredCounterGroup { SINK_PROCESSOR, SINK }; - - public String getType(){ - return type.name(); - } } http://git-wip-us.apache.org/repos/asf/flume/blob/8d2755a5/flume-ng-core/src/main/java/org/apache/flume/instrumentation/MonitoringType.java ---------------------------------------------------------------------- diff --git a/flume-ng-core/src/main/java/org/apache/flume/instrumentation/MonitoringType.java b/flume-ng-core/src/main/java/org/apache/flume/instrumentation/MonitoringType.java index 443335c..d132995 100644 --- a/flume-ng-core/src/main/java/org/apache/flume/instrumentation/MonitoringType.java +++ b/flume-ng-core/src/main/java/org/apache/flume/instrumentation/MonitoringType.java @@ -24,8 +24,7 @@ package org.apache.flume.instrumentation; */ public enum MonitoringType { OTHER(null), - GANGLIA(org.apache.flume.instrumentation.GangliaServer.class), - HTTP(org.apache.flume.instrumentation.http.HTTPMetricsServer.class); + GANGLIA(org.apache.flume.instrumentation.GangliaServer.class); private Class<? extends MonitorService> monitoringClass; http://git-wip-us.apache.org/repos/asf/flume/blob/8d2755a5/flume-ng-core/src/main/java/org/apache/flume/instrumentation/SinkCounterMBean.java ---------------------------------------------------------------------- diff --git a/flume-ng-core/src/main/java/org/apache/flume/instrumentation/SinkCounterMBean.java b/flume-ng-core/src/main/java/org/apache/flume/instrumentation/SinkCounterMBean.java index 472a4dd..6905d49 100644 --- a/flume-ng-core/src/main/java/org/apache/flume/instrumentation/SinkCounterMBean.java +++ b/flume-ng-core/src/main/java/org/apache/flume/instrumentation/SinkCounterMBean.java @@ -17,13 +17,6 @@ * under the License. */ package org.apache.flume.instrumentation; -/** - * This interface represents a sink counter mbean. Any class implementing - * this interface must sub-class - * {@linkplain org.apache.flume.instrumentation.MonitoredCounterGroup}. This - * interface might change between minor releases. Please see - * {@linkplain org.apache.flume.instrumentation.SinkCounter} class. - */ public interface SinkCounterMBean { @@ -46,6 +39,4 @@ public interface SinkCounterMBean { long getStartTime(); long getStopTime(); - - String getType(); } http://git-wip-us.apache.org/repos/asf/flume/blob/8d2755a5/flume-ng-core/src/main/java/org/apache/flume/instrumentation/SourceCounterMBean.java ---------------------------------------------------------------------- diff --git a/flume-ng-core/src/main/java/org/apache/flume/instrumentation/SourceCounterMBean.java b/flume-ng-core/src/main/java/org/apache/flume/instrumentation/SourceCounterMBean.java index 792e689..e6612d5 100644 --- a/flume-ng-core/src/main/java/org/apache/flume/instrumentation/SourceCounterMBean.java +++ b/flume-ng-core/src/main/java/org/apache/flume/instrumentation/SourceCounterMBean.java @@ -17,13 +17,7 @@ * under the License. */ package org.apache.flume.instrumentation; -/** - * This interface represents a source counter mbean. Any class implementing - * this interface must sub-class - * {@linkplain org.apache.flume.instrumentation.MonitoredCounterGroup}. This - * interface might change between minor releases. Please see - * {@linkplain org.apache.flume.instrumentation.SourceCounter} class. - */ + public interface SourceCounterMBean { long getEventReceivedCount(); @@ -41,6 +35,4 @@ public interface SourceCounterMBean { long getStartTime(); long getStopTime(); - - String getType(); } http://git-wip-us.apache.org/repos/asf/flume/blob/8d2755a5/flume-ng-doc/sphinx/FlumeUserGuide.rst ---------------------------------------------------------------------- diff --git a/flume-ng-doc/sphinx/FlumeUserGuide.rst b/flume-ng-doc/sphinx/FlumeUserGuide.rst index 94212ec..40e385f 100644 --- a/flume-ng-doc/sphinx/FlumeUserGuide.rst +++ b/flume-ng-doc/sphinx/FlumeUserGuide.rst @@ -1842,56 +1842,6 @@ starts with ``org.apache.flume``): ManagementFactory.getPlatformMBeanServer().registerMBean(this, objName); -JSON Reporting --------------- -Flume can also report metrics in a JSON format. To enable reporting in JSON format, Flume hosts -a Web server on a configurable port. Flume reports metrics in the following JSON format: - -.. code-block:: java - - { - "typeName1.componentName1" : {"metric1" : "metricValue1", "metric2" : "metricValue2"}, - "typeName2.componentName2" : {"metric3" : "metricValue3", "metric4" : "metricValue4"} - } - -Here is an example: - -.. code-block:: java - - { - "CHANNEL.fileChannel":{"EventPutSuccessCount":"468085", - "Type":"CHANNEL", - "StopTime":"0", - "EventPutAttemptCount":"468086", - "ChannelSize":"233428", - "StartTime":"1344882233070", - "EventTakeSuccessCount":"458200", - "ChannelCapacity":"600000", - "EventTakeAttemptCount":"458288"}, - "CHANNEL.memChannel":{"EventPutSuccessCount":"22948908", - "Type":"CHANNEL", - "StopTime":"0", - "EventPutAttemptCount":"22948908", - "ChannelSize":"5", - "StartTime":"1344882209413", - "EventTakeSuccessCount":"22948900", - "ChannelCapacity":"100", - "EventTakeAttemptCount":"22948908"} - } - -======================= ======= ===================================================================================== -Property Name Default Description -======================= ======= ===================================================================================== -**type** -- The component type name, has to be ``HTTP`` -port 41414 The port to start the server on. -======================= ======= ===================================================================================== - -We can start Flume with Ganglia support as follows:: - - $ bin/flume-ng agent --conf-file example.conf --name agent1 -Dflume.monitoring.type=HTTP -Dflume.monitoring.port=34545 - -Metrics will then be available at **http://<hostname>:<port>/metrics** webpage. -Custom components can report metrics as mentioned in the Ganglia section above. Custom Reporting ---------------- http://git-wip-us.apache.org/repos/asf/flume/blob/8d2755a5/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index d6c813a..7ac32d8 100644 --- a/pom.xml +++ b/pom.xml @@ -704,13 +704,6 @@ limitations under the License. <version>2.5-20110124</version> </dependency> - <!-- Gson: Java to Json conversion --> - <dependency> - <groupId>com.google.code.gson</groupId> - <artifactId>gson</artifactId> - <version>2.2.2</version> - </dependency> - <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-common</artifactId>
