Repository: incubator-eagle
Updated Branches:
  refs/heads/master 188ddf500 -> 8416f64da
[EAGLE-825] Improve jmx collector with built-in metric filter ## Changes * Support wildcard metric name filter * Add built-in important metric name filter for hadoop and hbase ## JIRA https://issues.apache.org/jira/browse/EAGLE-825 Author: Hao Chen <h...@apache.org> Closes #715 from haoch/ImproveJMXCollector. Project: http://git-wip-us.apache.org/repos/asf/incubator-eagle/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-eagle/commit/8416f64d Tree: http://git-wip-us.apache.org/repos/asf/incubator-eagle/tree/8416f64d Diff: http://git-wip-us.apache.org/repos/asf/incubator-eagle/diff/8416f64d Branch: refs/heads/master Commit: 8416f64da7dcba0c48527aa678a5b2ec9a37d4a8 Parents: 188ddf5 Author: Hao Chen <h...@apache.org> Authored: Tue Dec 6 19:47:50 2016 +0800 Committer: Hao Chen <h...@apache.org> Committed: Tue Dec 6 19:47:50 2016 +0800 ---------------------------------------------------------------------- .../hadoop_jmx_collector/config-sample.json | 46 --------- .../hadoop_jmx_config-sample.json | 70 +++++++++++++ .../hadoop_jmx_collector/hadoop_jmx_kafka.py | 15 ++- .../hbase_jmx_config-sample.json | 94 +++++++++++++++++ .../hadoop_jmx_collector/metric_collector.py | 101 ++++++++++++++++--- 5 files changed, 265 insertions(+), 61 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/8416f64d/eagle-external/hadoop_jmx_collector/config-sample.json ---------------------------------------------------------------------- diff --git a/eagle-external/hadoop_jmx_collector/config-sample.json b/eagle-external/hadoop_jmx_collector/config-sample.json deleted file mode 100644 index a0670e1..0000000 --- a/eagle-external/hadoop_jmx_collector/config-sample.json +++ /dev/null @@ -1,46 +0,0 @@ -{ - "env": { - "site": "sandbox" - }, - "input": [ - { - "component": "namenode", - "host": "sandbox.hortonworks.com", - "port": "50070", - "https": false - }, - { - "component": "namenode", - 
"host": "sandbox.hortonworks.com", - "port": "50070", - "https": false - }, - { - "component": "resourcemanager", - "host": "sandbox.hortonworks.com", - "port": "8088", - "https": false - }, - { - "component": "resourcemanager", - "host": "sandbox.hortonworks.com", - "port": "8088", - "https": false - } - ], - "filter": { - "monitoring.group.selected": ["hadoop", "java.lang"] - }, - "output": { - "kafka": { - "default_topic": "nn_jmx_metric_sandbox", - "component_topic_mapping": { - "namenode": "nn_jmx_metric_sandbox", - "resourcemanager": "rm_jmx_metric_sandbox" - }, - "broker_list": [ - "sandbox.hortonworks.com:6667" - ] - } - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/8416f64d/eagle-external/hadoop_jmx_collector/hadoop_jmx_config-sample.json ---------------------------------------------------------------------- diff --git a/eagle-external/hadoop_jmx_collector/hadoop_jmx_config-sample.json b/eagle-external/hadoop_jmx_collector/hadoop_jmx_config-sample.json new file mode 100644 index 0000000..07bdc84 --- /dev/null +++ b/eagle-external/hadoop_jmx_collector/hadoop_jmx_config-sample.json @@ -0,0 +1,70 @@ +{ + "env": { + "site": "sandbox" + }, + "input": [ + { + "component": "namenode", + "host": "sandbox.hortonworks.com", + "port": "50070", + "https": false + }, + { + "component": "resourcemanager", + "host": "sandbox.hortonworks.com", + "port": "19888", + "https": false + }, + { + "component": "datanode", + "host": "sandbox.hortonworks.com", + "port": "50075", + "https": false + } + ], + "filter": { + "beam_group_filter": ["hadoop","java.lang"], + "metric_name_filter": [ + "hadoop.memory.heapmemoryusage.used", + "hadoop.memory.nonheapmemoryusage.used", + "hadoop.namenode.fsnamesystemstate.capacitytotal", + "hadoop.namenode.dfs.capacityused", + "hadoop.namenode.dfs.capacityremaining", + "hadoop.namenode.dfs.blockstotal", + "hadoop.namenode.dfs.filestotal", + "hadoop.namenode.dfs.underreplicatedblocks", + 
"hadoop.namenode.dfs.missingblocks", + "hadoop.namenode.dfs.corruptblocks", + "hadoop.namenode.dfs.lastcheckpointtime", + "hadoop.namenode.dfs.transactionssincelastcheckpoint", + "hadoop.namenode.dfs.lastwrittentransactionid", + "hadoop.namenode.dfs.snapshottabledirectories", + "hadoop.namenode.dfs.snapshots", + "hadoop.namenode.rpc.rpcqueuetimeavgtime", + "hadoop.namenode.rpc.rpcprocessingtimeavgtime", + "hadoop.namenode.rpc.numopenconnections", + "hadoop.namenode.rpc.callqueuelength", + + "hadoop.datanode.fsdatasetstate.capacity", + "hadoop.datanode.fsdatasetstate.dfsused", + "hadoop.datanode.datanodeinfo.xceivercount", + "hadoop.datanode.rpc.rpcqueuetimeavgtime", + "hadoop.datanode.rpc.rpcprocessingtimeavgtime", + "hadoop.datanode.rpc.numopenconnections", + "hadoop.datanode.rpc.callqueuelength" + ] + }, + "output": { + "kafka": { + "debug": false, + "default_topic": "hadoop_jmx_metric_sandbox", + "component_topic_mapping": { + "namenode": "nn_jmx_metric_sandbox", + "resourcemanager": "rm_jmx_metric_sandbox" + }, + "broker_list": [ + "sandbox.hortonworks.com:6667" + ] + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/8416f64d/eagle-external/hadoop_jmx_collector/hadoop_jmx_kafka.py ---------------------------------------------------------------------- diff --git a/eagle-external/hadoop_jmx_collector/hadoop_jmx_kafka.py b/eagle-external/hadoop_jmx_collector/hadoop_jmx_kafka.py index b42bc81..799d351 100644 --- a/eagle-external/hadoop_jmx_collector/hadoop_jmx_kafka.py +++ b/eagle-external/hadoop_jmx_collector/hadoop_jmx_kafka.py @@ -17,7 +17,7 @@ # from metric_collector import JmxMetricCollector,JmxMetricListener,Runner -import json +import json, logging, fnmatch class NNSafeModeMetric(JmxMetricListener): def on_metric(self, metric): @@ -28,7 +28,6 @@ class NNSafeModeMetric(JmxMetricListener): metric["value"] = 0 self.collector.collect(metric) - class NNHAMetric(JmxMetricListener): PREFIX = 
"hadoop.namenode.fsnamesystem" @@ -74,6 +73,15 @@ class JournalTransactionInfoMetric(JmxMetricListener): self.collector.on_bean_kv(self.PREFIX, component, "LastAppliedOrWrittenTxId", LastAppliedOrWrittenTxId) self.collector.on_bean_kv(self.PREFIX, component, "MostRecentCheckpointTxId", MostRecentCheckpointTxId) +class DatanodeFSDatasetState(JmxMetricListener): + def on_metric(self, metric): + if fnmatch.fnmatch(metric["metric"], "hadoop.datanode.fsdatasetstate-*.capacity"): + metric["metric"] = "hadoop.datanode.fsdatasetstate.capacity" + self.collector.collect(metric) + elif fnmatch.fnmatch(metric["metric"], "hadoop.datanode.fsdatasetstate-*.dfsused"): + metric["metric"] = "hadoop.datanode.fsdatasetstate.dfsused" + self.collector.collect(metric) + if __name__ == '__main__': collector = JmxMetricCollector() collector.register( @@ -81,6 +89,7 @@ if __name__ == '__main__': NNHAMetric(), MemoryUsageMetric(), NNCapacityUsageMetric(), - JournalTransactionInfoMetric() + JournalTransactionInfoMetric(), + DatanodeFSDatasetState() ) Runner.run(collector) \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/8416f64d/eagle-external/hadoop_jmx_collector/hbase_jmx_config-sample.json ---------------------------------------------------------------------- diff --git a/eagle-external/hadoop_jmx_collector/hbase_jmx_config-sample.json b/eagle-external/hadoop_jmx_collector/hbase_jmx_config-sample.json new file mode 100644 index 0000000..906f134 --- /dev/null +++ b/eagle-external/hadoop_jmx_collector/hbase_jmx_config-sample.json @@ -0,0 +1,94 @@ +{ + "env": { + "site": "sandbox" + }, + "input": [ + { + "component": "hbasemaster", + "host": "sandbox.hortonworks.com", + "port": "60010", + "https": false + }, + { + "component": "regionserver", + "host": "sandbox.hortonworks.com", + "port": "60030", + "https": false + } + ], + "filter": { + "beam_group_filter": ["hadoop","java.lang","java.nio"], + "metric_name_filter": [ + 
"hadoop.memory.heapmemoryusage.used", + "hadoop.memory.nonheapmemoryusage.used", + "hadoop.bufferpool.direct.memoryused", + "hadoop.hbase.master.server.averageload", + "hadoop.hbase.master.assignmentmanger.ritcount", + "hadoop.hbase.master.assignmentmanger.ritcountoverthreshold", + "hadoop.hbase.master.assignmentmanger.assign_num_ops", + "hadoop.hbase.master.assignmentmanger.assign_min", + "hadoop.hbase.master.assignmentmanger.assign_max", + "hadoop.hbase.master.assignmentmanger.assign_75th_percentile", + "hadoop.hbase.master.assignmentmanger.assign_95th_percentile", + "hadoop.hbase.master.assignmentmanger.assign_99th_percentile", + "hadoop.hbase.master.assignmentmanger.bulkassign_num_ops", + "hadoop.hbase.master.assignmentmanger.bulkassign_min", + "hadoop.hbase.master.assignmentmanger.bulkassign_max", + "hadoop.hbase.master.assignmentmanger.bulkassign_75th_percentile", + "hadoop.hbase.master.assignmentmanger.bulkassign_95th_percentile", + "hadoop.hbase.master.assignmentmanger.bulkassign_99th_percentile", + "hadoop.hbase.master.balancer.balancercluster_num_ops", + "hadoop.hbase.master.balancer.balancercluster_min", + "hadoop.hbase.master.balancer.balancercluster_max", + "hadoop.hbase.master.balancer.balancercluster_75th_percentile", + "hadoop.hbase.master.balancer.balancercluster_95th_percentile", + "hadoop.hbase.master.balancer.balancercluster_99th_percentile", + "hadoop.hbase.master.filesystem.hlogsplittime_min", + "hadoop.hbase.master.filesystem.hlogsplittime_max", + "hadoop.hbase.master.filesystem.hlogsplittime_75th_percentile", + "hadoop.hbase.master.filesystem.hlogsplittime_95th_percentile", + "hadoop.hbase.master.filesystem.hlogsplittime_99th_percentile", + "hadoop.hbase.master.filesystem.hlogsplitsize_min", + "hadoop.hbase.master.filesystem.hlogsplitsize_max", + "hadoop.hbase.master.filesystem.metahlogsplittime_min", + "hadoop.hbase.master.filesystem.metahlogsplittime_max", + "hadoop.hbase.master.filesystem.metahlogsplittime_75th_percentile", + 
"hadoop.hbase.master.filesystem.metahlogsplittime_95th_percentile", + "hadoop.hbase.master.filesystem.metahlogsplittime_99th_percentile", + "hadoop.hbase.master.filesystem.metahlogsplitsize_min", + "hadoop.hbase.master.filesystem.metahlogsplitsize_max", + + "hadoop.hbase.jvm.gccount", + "hadoop.hbase.jvm.gctimemillis", + "hadoop.hbase.ipc.ipc.queuesize", + "hadoop.hbase.ipc.ipc.numcallsingeneralqueue", + "hadoop.hbase.ipc.ipc.numactivehandler", + "hadoop.hbase.ipc.ipc.queuecalltime_99th_percentile", + "hadoop.hbase.ipc.ipc.processcalltime_99th_percentile", + "hadoop.hbase.ipc.ipc.queuecalltime_num_ops", + "hadoop.hbase.ipc.ipc.processcalltime_num_ops", + "hadoop.hbase.regionserver.server.regioncount", + "hadoop.hbase.regionserver.server.storecount", + "hadoop.hbase.regionserver.server.memstoresize", + "hadoop.hbase.regionserver.server.storefilesize", + "hadoop.hbase.regionserver.server.totalrequestcount", + "hadoop.hbase.regionserver.server.readrequestcount", + "hadoop.hbase.regionserver.server.writerequestcount", + "hadoop.hbase.regionserver.server.splitqueuelength", + "hadoop.hbase.regionserver.server.compactionqueuelength", + "hadoop.hbase.regionserver.server.flushqueuelength", + "hadoop.hbase.regionserver.server.blockcachesize", + "hadoop.hbase.regionserver.server.blockcachehitcount", + "hadoop.hbase.regionserver.server.blockcounthitpercent" + ] + }, + "output": { + "kafka": { + "debug": false, + "default_topic": "hadoop_jmx_metric_sandbox", + "broker_list": [ + "sandbox.hortonworks.com:6667" + ] + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/8416f64d/eagle-external/hadoop_jmx_collector/metric_collector.py ---------------------------------------------------------------------- diff --git a/eagle-external/hadoop_jmx_collector/metric_collector.py b/eagle-external/hadoop_jmx_collector/metric_collector.py index c6eac35..4b7b209 100644 --- a/eagle-external/hadoop_jmx_collector/metric_collector.py +++ 
b/eagle-external/hadoop_jmx_collector/metric_collector.py @@ -27,6 +27,7 @@ import types import httplib import logging import threading +import fnmatch # load six sys.path.append(os.path.join(os.path.dirname(os.path.abspath(__file__)), '', 'lib/six')) @@ -40,6 +41,7 @@ logging.basicConfig(level=logging.INFO, format='%(asctime)s %(name)-12s %(levelname)-6s %(message)s', datefmt='%m-%d %H:%M') + class Helper: def __init__(self): pass @@ -213,6 +215,11 @@ class KafkaMetricSender(MetricSender): self.broker_list = kafka_config["broker_list"] self.kafka_client = None self.kafka_producer = None + self.debug_enabled = False + self.sent_count = 0 + if kafka_config.has_key("debug"): + self.debug_enabled = bool(kafka_config["debug"]) + logging.info("Overrode output.kafka.debug: " + str(self.debug_enabled)) def get_topic_id(self, msg): if msg.has_key("component"): @@ -232,16 +239,23 @@ class KafkaMetricSender(MetricSender): batch_send_every_t=30) def send(self, msg): + if self.debug_enabled: + logging.info("Send message: " + str(msg)) + self.sent_count += 1 self.kafka_producer.send_messages(self.get_topic_id(msg), json.dumps(msg)) def close(self): + logging.info("Totally sent " + str(self.sent_count) + " metric events") if self.kafka_producer is not None: self.kafka_producer.stop() if self.kafka_client is not None: self.kafka_client.close() class MetricCollector(threading.Thread): - def __init__(self,config = None): + filters = [] + config = None + + def __init__(self, config=None): threading.Thread.__init__(self) self.config = None self.sender = None @@ -251,7 +265,19 @@ class MetricCollector(threading.Thread): self.config = config self.sender = KafkaMetricSender(self.config) self.sender.open() - pass + self.filter(MetricNameFilter()) + + for filter in self.filters: + filter.init(self.config) + + def filter(self, *filters): + """ + :param filters: MetricFilters to register + :return: None + """ + logging.debug("Register filters: " + str(filters)) + for filter in filters: + 
self.filters.append(filter) def start(self): super(MetricCollector, self).start() @@ -265,7 +291,15 @@ class MetricCollector(threading.Thread): raise Exception("host is null: " + str(msg)) if not msg.has_key("site"): msg["site"] = self.config["env"]["site"] - self.sender.send(msg) + if len(self.filters) == 0: + self.sender.send(msg) + return + else: + for filter in self.filters: + if filter.filter_metric(msg): + self.sender.send(msg) + return + # logging.info("Drop metric: " + str(msg)) def close(self): self.sender.close() @@ -273,6 +307,7 @@ class MetricCollector(threading.Thread): def run(self): raise Exception("`run` method should be overrode by sub-class before being called") + class Runner(object): @staticmethod def run(*collectors): @@ -288,7 +323,7 @@ class Runner(object): elif len(argv) == 2: config = Helper.load_config(argv[1]) else: - raise Exception("Usage: "+argv[0]+" CONFIG_FILE_PATH, but given too many arguments: " + str(argv)) + raise Exception("Usage: " + argv[0] + " CONFIG_FILE_PATH, but given too many arguments: " + str(argv)) for collector in collectors: try: @@ -305,10 +340,12 @@ class Runner(object): finally: collector.close() + class JmxMetricCollector(MetricCollector): selected_domain = None listeners = [] input_components = [] + metric_prefix = "hadoop." 
def init(self, config): super(JmxMetricCollector, self).init(config) @@ -322,8 +359,10 @@ class JmxMetricCollector(MetricCollector): raise Exception("port not defined in " + str(input)) if not input.has_key("https"): input["https"] = False - - self.selected_domain = [s.encode('utf-8') for s in config[u'filter'].get('monitoring.group.selected')] + self.selected_domain = [s.encode('utf-8') for s in config[u'filter'].get('beam_group_filter')] + if config["env"].has_key("metric_prefix"): + self.metric_prefix = config["env"]["metric_prefix"] + logging.info("Override env.metric_prefix: " + self.metric_prefix + ", default: hadoop.") def register(self, *listeners): """ @@ -347,7 +386,7 @@ class JmxMetricCollector(MetricCollector): def on_beans(self, source, beans): for bean in beans: - self.on_bean(source,bean) + self.on_bean(source, bean) def on_bean(self, source, bean): # mbean is of the form "domain:key=value,...,foo=bar" @@ -357,7 +396,6 @@ class JmxMetricCollector(MetricCollector): if not self.filter_bean(bean, mbean_domain): return - context = bean.get("tag.Context", "") metric_prefix_name = self.__build_metric_prefix(mbean_attribute, context) @@ -367,7 +405,7 @@ class JmxMetricCollector(MetricCollector): for listener in self.listeners: listener.on_bean(source, bean.copy()) - def on_bean_kv(self, prefix,source, key, value): + def on_bean_kv(self, prefix, source, key, value): # Skip Tags if re.match(r'tag.*', key): return @@ -382,7 +420,9 @@ class JmxMetricCollector(MetricCollector): def on_metric(self, metric): if Helper.is_number(metric["value"]): self.collect(metric) - + elif isinstance(metric["value"], dict): + for key, value in metric["value"].iteritems(): + self.on_bean_kv(metric["metric"], metric, key, value) for listener in self.listeners: listener.on_metric(metric.copy()) @@ -394,14 +434,51 @@ class JmxMetricCollector(MetricCollector): name_index = [i[0] for i in mbean_list].index('name') mbean_list[name_index][1] = context metric_prefix_name = '.'.join([i[1] 
for i in mbean_list]) - return ("hadoop." + metric_prefix_name).replace(" ", "").lower() + return (self.metric_prefix + metric_prefix_name).replace(" ", "").lower() +# ======================== +# Metric Listeners +# ======================== class JmxMetricListener: def init(self, collector): self.collector = collector + self.metric_prefix = self.collector.metric_prefix def on_bean(self, component, bean): pass def on_metric(self, metric): - pass \ No newline at end of file + pass + + +# ======================== +# Metric Filters +# ======================== +class MetricFilter: + def init(self, config={}): + raise Exception("init() method is called before being overrode in sub-class") + pass + + def filter_metric(self, metric): + """ + Filter metric to keep by return True, otherwise throw metric by returning False. + """ + return True + +class MetricNameFilter(MetricFilter): + metric_name_filter = [] + + def init(self, config={}): + if config.has_key("filter") and config["filter"].has_key("metric_name_filter"): + self.metric_name_filter = config["filter"]["metric_name_filter"] + + logging.debug("Override filter.metric_name_filter: " + str(self.metric_name_filter)) + + def filter_metric(self, metric): + if len(self.metric_name_filter) == 0: + return True + else: + for name_filter in self.metric_name_filter: + if fnmatch.fnmatch(metric["metric"], name_filter): + return True + return False \ No newline at end of file