CHUKWA-764. Clean up multiple flavors of JSON usage. (Eric Yang)
Project: http://git-wip-us.apache.org/repos/asf/chukwa/repo Commit: http://git-wip-us.apache.org/repos/asf/chukwa/commit/c914d340 Tree: http://git-wip-us.apache.org/repos/asf/chukwa/tree/c914d340 Diff: http://git-wip-us.apache.org/repos/asf/chukwa/diff/c914d340 Branch: refs/heads/master Commit: c914d3401026c8dbb8b93bba4da52841b704f0f5 Parents: 01a72e2 Author: Eric Yang <[email protected]> Authored: Mon Jun 22 17:40:40 2015 -0700 Committer: Eric Yang <[email protected]> Committed: Mon Jun 22 17:40:40 2015 -0700 ---------------------------------------------------------------------- CHANGES.txt | 2 + pom.xml | 13 +- .../agent/rest/AdaptorController.java | 3 +- .../processor/mapper/DatanodeProcessor.java | 233 +++--- .../processor/mapper/HBaseMasterProcessor.java | 125 ++- .../mapper/HBaseRegionServerProcessor.java | 82 +- .../processor/mapper/JobConfProcessor.java | 2 +- .../processor/mapper/JobTrackerProcessor.java | 241 +++--- .../mapper/Log4JMetricsContextProcessor.java | 17 +- .../processor/mapper/NamenodeProcessor.java | 298 ++++---- .../processor/mapper/ZookeeperProcessor.java | 119 ++- .../chukwa/extraction/hbase/SystemMetrics.java | 6 +- .../apache/hadoop/chukwa/hicc/JSONLoader.java | 21 +- .../org/apache/hadoop/chukwa/hicc/Views.java | 26 +- .../apache/hadoop/chukwa/hicc/Workspace.java | 759 ++++++++++--------- src/site/apt/Quick_Start_Guide.apt | 34 +- .../chukwa/database/TestDatabaseWebJson.java | 1 - 17 files changed, 995 insertions(+), 987 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/chukwa/blob/c914d340/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 3fb27b2..2e46c2a 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -44,6 +44,8 @@ Trunk (unreleased changes) BUGS + CHUKWA-764. Clean up multiple flavor of JSON usage. (Eric Yang) + CHUKWA-763. Removed unused old libraries. (Eric Yang) CHUKWA-760. 
Added error message for missing configuration. (Eric Yang) http://git-wip-us.apache.org/repos/asf/chukwa/blob/c914d340/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index 6ad2055..80b4013 100644 --- a/pom.xml +++ b/pom.xml @@ -145,9 +145,9 @@ <version>1.6.4</version> </dependency> <dependency> - <groupId>org.json</groupId> - <artifactId>json</artifactId> - <version>20090211</version> + <groupId>com.googlecode.json-simple</groupId> + <artifactId>json-simple</artifactId> + <version>1.1</version> </dependency> <dependency> <groupId>org.apache.activemq</groupId> @@ -281,14 +281,9 @@ </exclusions> </dependency> <dependency> - <groupId>com.googlecode.json-simple</groupId> - <artifactId>json-simple</artifactId> - <version>1.1</version> - </dependency> - <dependency> <groupId>org.apache.solr</groupId> <artifactId>solr-solrj</artifactId> - <version>4.7.2</version> + <version>4.9.0</version> <type>jar</type> </dependency> <dependency> http://git-wip-us.apache.org/repos/asf/chukwa/blob/c914d340/src/main/java/org/apache/hadoop/chukwa/datacollection/agent/rest/AdaptorController.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/hadoop/chukwa/datacollection/agent/rest/AdaptorController.java b/src/main/java/org/apache/hadoop/chukwa/datacollection/agent/rest/AdaptorController.java index 7e2fa7c..70edc2a 100644 --- a/src/main/java/org/apache/hadoop/chukwa/datacollection/agent/rest/AdaptorController.java +++ b/src/main/java/org/apache/hadoop/chukwa/datacollection/agent/rest/AdaptorController.java @@ -23,8 +23,7 @@ import org.apache.hadoop.chukwa.datacollection.adaptor.Adaptor; import org.apache.hadoop.chukwa.datacollection.OffsetStatsManager; import org.apache.hadoop.chukwa.util.ExceptionUtil; import org.apache.commons.lang.StringEscapeUtils; -import org.json.JSONObject; -import org.json.JSONException; +import org.json.simple.JSONObject; import 
org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; http://git-wip-us.apache.org/repos/asf/chukwa/blob/c914d340/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/DatanodeProcessor.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/DatanodeProcessor.java b/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/DatanodeProcessor.java index b5cd941..4e5765d 100644 --- a/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/DatanodeProcessor.java +++ b/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/DatanodeProcessor.java @@ -36,125 +36,122 @@ import org.apache.log4j.Logger; import org.json.simple.JSONObject; import org.json.simple.JSONValue; -@Tables(annotations={ -@Table(name="Datanode",columnFamily="dn"), -@Table(name="Datanode",columnFamily="jvm"), -@Table(name="Datanode",columnFamily="rpc") -}) -public class DatanodeProcessor extends AbstractProcessor{ +@Tables(annotations = { @Table(name = "Datanode", columnFamily = "dn"), + @Table(name = "Datanode", columnFamily = "jvm"), + @Table(name = "Datanode", columnFamily = "rpc") }) +public class DatanodeProcessor extends AbstractProcessor { - static Map<String, Long> rateMap = new ConcurrentHashMap<String,Long>(); - - static { - long zero = 0L; - rateMap.put("blocks_verified", zero); - rateMap.put("blocks_written", zero); - rateMap.put("blocks_read", zero); - rateMap.put("bytes_written", zero); - rateMap.put("bytes_read", zero); - rateMap.put("heartBeats_num_ops", zero); - rateMap.put("SentBytes", zero); - rateMap.put("ReceivedBytes", zero); - rateMap.put("rpcAuthorizationSuccesses", zero); - rateMap.put("rpcAuthorizationFailures", zero); - rateMap.put("RpcQueueTime_num_ops", zero); - rateMap.put("RpcProcessingTime_num_ops", zero); - rateMap.put("gcCount", zero); - } - - @Override - protected void 
parse(String recordEntry, - OutputCollector<ChukwaRecordKey, ChukwaRecord> output, - Reporter reporter) throws Throwable { - Logger log = Logger.getLogger(DatanodeProcessor.class); - long timeStamp = Calendar.getInstance(TimeZone.getTimeZone("UTC")).getTimeInMillis(); - - final ChukwaRecord hdfs_datanode = new ChukwaRecord(); - final ChukwaRecord datanode_jvm = new ChukwaRecord(); - final ChukwaRecord datanode_rpc = new ChukwaRecord(); - - Map<String, ChukwaRecord> metricsMap = new HashMap<String, ChukwaRecord>(){ - private static final long serialVersionUID = 1L; + static Map<String, Long> rateMap = new ConcurrentHashMap<String, Long>(); - { - put("blocks_verified", hdfs_datanode); - put("blocks_written", hdfs_datanode); - put("blocks_read", hdfs_datanode); - put("blocks_replicated", hdfs_datanode); - put("blocks_removed", hdfs_datanode); - put("bytes_written", hdfs_datanode); - put("bytes_read", hdfs_datanode); - put("heartBeats_avg_time", hdfs_datanode); - put("heartBeats_num_ops", hdfs_datanode); - - put("gcCount", datanode_jvm); - put("gcTimeMillis", datanode_jvm); - put("logError", datanode_jvm); - put("logFatal", datanode_jvm); - put("logInfo", datanode_jvm); - put("logWarn", datanode_jvm); - put("memHeapCommittedM", datanode_jvm); - put("memHeapUsedM", datanode_jvm); - put("threadsBlocked", datanode_jvm); - put("threadsNew", datanode_jvm); - put("threadsRunnable", datanode_jvm); - put("threadsTerminated", datanode_jvm); - put("threadsTimedWaiting", datanode_jvm); - put("threadsWaiting", datanode_jvm); + static { + long zero = 0L; + rateMap.put("blocks_verified", zero); + rateMap.put("blocks_written", zero); + rateMap.put("blocks_read", zero); + rateMap.put("bytes_written", zero); + rateMap.put("bytes_read", zero); + rateMap.put("heartBeats_num_ops", zero); + rateMap.put("SentBytes", zero); + rateMap.put("ReceivedBytes", zero); + rateMap.put("rpcAuthorizationSuccesses", zero); + rateMap.put("rpcAuthorizationFailures", zero); + 
rateMap.put("RpcQueueTime_num_ops", zero); + rateMap.put("RpcProcessingTime_num_ops", zero); + rateMap.put("gcCount", zero); + } - put("ReceivedBytes", datanode_rpc); - put("RpcProcessingTime_avg_time", datanode_rpc); - put("RpcProcessingTime_num_ops", datanode_rpc); - put("RpcQueueTime_avg_time", datanode_rpc); - put("RpcQueueTime_num_ops", datanode_rpc); - put("SentBytes", datanode_rpc); - put("rpcAuthorizationSuccesses", datanode_rpc); - } - }; - try{ - JSONObject obj = (JSONObject) JSONValue.parse(recordEntry); - String ttTag = chunk.getTag("timeStamp"); - if(ttTag == null){ - log.warn("timeStamp tag not set in JMX adaptor for datanode"); - } - else{ - timeStamp = Long.parseLong(ttTag); - } - Iterator<JSONObject> iter = obj.entrySet().iterator(); - - while(iter.hasNext()){ - Map.Entry entry = (Map.Entry)iter.next(); - String key = (String) entry.getKey(); - Object value = entry.getValue(); - String valueString = value == null?"":value.toString(); - - //Calculate rate for some of the metrics - if(rateMap.containsKey(key)){ - long oldValue = rateMap.get(key); - long curValue = Long.parseLong(valueString); - rateMap.put(key, curValue); - long newValue = curValue - oldValue; - if(newValue < 0){ - log.error("DatanodeProcessor's rateMap might be reset or corrupted for metric "+key); - newValue = 0L; - } - valueString = Long.toString(newValue); - } - - if(metricsMap.containsKey(key)){ - ChukwaRecord rec = metricsMap.get(key); - rec.add(key, valueString); - } - } - buildGenericRecord(hdfs_datanode, null, timeStamp, "dn"); - output.collect(key, hdfs_datanode); - buildGenericRecord(datanode_jvm, null, timeStamp, "jvm"); - output.collect(key, datanode_jvm); - buildGenericRecord(datanode_rpc, null, timeStamp, "rpc"); - output.collect(key, datanode_rpc); - } - catch(Exception e){ - log.error(ExceptionUtil.getStackTrace(e)); - } - } + @Override + protected void parse(String recordEntry, + OutputCollector<ChukwaRecordKey, ChukwaRecord> output, Reporter reporter) + throws 
Throwable { + Logger log = Logger.getLogger(DatanodeProcessor.class); + long timeStamp = Calendar.getInstance(TimeZone.getTimeZone("UTC")) + .getTimeInMillis(); + + final ChukwaRecord hdfs_datanode = new ChukwaRecord(); + final ChukwaRecord datanode_jvm = new ChukwaRecord(); + final ChukwaRecord datanode_rpc = new ChukwaRecord(); + + Map<String, ChukwaRecord> metricsMap = new HashMap<String, ChukwaRecord>() { + private static final long serialVersionUID = 1L; + + { + put("blocks_verified", hdfs_datanode); + put("blocks_written", hdfs_datanode); + put("blocks_read", hdfs_datanode); + put("blocks_replicated", hdfs_datanode); + put("blocks_removed", hdfs_datanode); + put("bytes_written", hdfs_datanode); + put("bytes_read", hdfs_datanode); + put("heartBeats_avg_time", hdfs_datanode); + put("heartBeats_num_ops", hdfs_datanode); + + put("gcCount", datanode_jvm); + put("gcTimeMillis", datanode_jvm); + put("logError", datanode_jvm); + put("logFatal", datanode_jvm); + put("logInfo", datanode_jvm); + put("logWarn", datanode_jvm); + put("memHeapCommittedM", datanode_jvm); + put("memHeapUsedM", datanode_jvm); + put("threadsBlocked", datanode_jvm); + put("threadsNew", datanode_jvm); + put("threadsRunnable", datanode_jvm); + put("threadsTerminated", datanode_jvm); + put("threadsTimedWaiting", datanode_jvm); + put("threadsWaiting", datanode_jvm); + + put("ReceivedBytes", datanode_rpc); + put("RpcProcessingTime_avg_time", datanode_rpc); + put("RpcProcessingTime_num_ops", datanode_rpc); + put("RpcQueueTime_avg_time", datanode_rpc); + put("RpcQueueTime_num_ops", datanode_rpc); + put("SentBytes", datanode_rpc); + put("rpcAuthorizationSuccesses", datanode_rpc); + } + }; + try { + JSONObject obj = (JSONObject) JSONValue.parse(recordEntry); + String ttTag = chunk.getTag("timeStamp"); + if (ttTag == null) { + log.warn("timeStamp tag not set in JMX adaptor for datanode"); + } else { + timeStamp = Long.parseLong(ttTag); + } + Iterator<String> keys = obj.keySet().iterator(); + + while 
(keys.hasNext()) { + String key = keys.next(); + Object value = obj.get(key); + String valueString = value == null ? "" : value.toString(); + + // Calculate rate for some of the metrics + if (rateMap.containsKey(key)) { + long oldValue = rateMap.get(key); + long curValue = Long.parseLong(valueString); + rateMap.put(key, curValue); + long newValue = curValue - oldValue; + if (newValue < 0) { + log.error("DatanodeProcessor's rateMap might be reset or corrupted for metric " + + key); + newValue = 0L; + } + valueString = Long.toString(newValue); + } + + if (metricsMap.containsKey(key)) { + ChukwaRecord rec = metricsMap.get(key); + rec.add(key, valueString); + } + } + buildGenericRecord(hdfs_datanode, null, timeStamp, "dn"); + output.collect(key, hdfs_datanode); + buildGenericRecord(datanode_jvm, null, timeStamp, "jvm"); + output.collect(key, datanode_jvm); + buildGenericRecord(datanode_rpc, null, timeStamp, "rpc"); + output.collect(key, datanode_rpc); + } catch (Exception e) { + log.error(ExceptionUtil.getStackTrace(e)); + } + } } http://git-wip-us.apache.org/repos/asf/chukwa/blob/c914d340/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/HBaseMasterProcessor.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/HBaseMasterProcessor.java b/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/HBaseMasterProcessor.java index 979103e..c04e752 100644 --- a/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/HBaseMasterProcessor.java +++ b/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/HBaseMasterProcessor.java @@ -38,69 +38,66 @@ import org.apache.log4j.Logger; import org.json.simple.JSONObject; import org.json.simple.JSONValue; -@Tables(annotations={ -@Table(name="HBase",columnFamily="master") -}) -public class HBaseMasterProcessor extends AbstractProcessor{ - static 
Map<String, Long> rateMap = new ConcurrentHashMap<String,Long>(); - static { - long zero = 0L; - rateMap.put("splitSizeNumOps", zero); - rateMap.put("splitTimeNumOps", zero); - } - - @Override - protected void parse(String recordEntry, - OutputCollector<ChukwaRecordKey, ChukwaRecord> output, - Reporter reporter) throws Throwable { - - Logger log = Logger.getLogger(HBaseMasterProcessor.class); - long timeStamp = Calendar.getInstance(TimeZone.getTimeZone("UTC")).getTimeInMillis(); - ChukwaRecord record = new ChukwaRecord(); - - Map<String, Buffer> metricsMap = new HashMap<String,Buffer>(); +@Tables(annotations = { @Table(name = "HBase", columnFamily = "master") }) +public class HBaseMasterProcessor extends AbstractProcessor { + static Map<String, Long> rateMap = new ConcurrentHashMap<String, Long>(); + static { + long zero = 0L; + rateMap.put("splitSizeNumOps", zero); + rateMap.put("splitTimeNumOps", zero); + } - try{ - JSONObject obj = (JSONObject) JSONValue.parse(recordEntry); - String ttTag = chunk.getTag("timeStamp"); - if(ttTag == null){ - log.warn("timeStamp tag not set in JMX adaptor for hbase master"); - } - else{ - timeStamp = Long.parseLong(ttTag); - } - Iterator<JSONObject> iter = obj.entrySet().iterator(); - - while(iter.hasNext()){ - Map.Entry entry = (Map.Entry)iter.next(); - String key = (String) entry.getKey(); - Object value = entry.getValue(); - String valueString = value == null?"":value.toString(); - - //Calculate rate for some of the metrics - if(rateMap.containsKey(key)){ - long oldValue = rateMap.get(key); - long curValue = Long.parseLong(valueString); - rateMap.put(key, curValue); - long newValue = curValue - oldValue; - if(newValue < 0){ - log.warn("HBaseMaster rateMap might be reset or corrupted for metric "+key); - newValue = 0L; - } - valueString = Long.toString(newValue); - } - - Buffer b = new Buffer(valueString.getBytes()); - metricsMap.put(key,b); - } - - TreeMap<String, Buffer> t = new TreeMap<String, Buffer>(metricsMap); - 
record.setMapFields(t); - buildGenericRecord(record, null, timeStamp, "master"); - output.collect(key, record); - } - catch(Exception e){ - log.error(ExceptionUtil.getStackTrace(e)); - } - } + @Override + protected void parse(String recordEntry, + OutputCollector<ChukwaRecordKey, ChukwaRecord> output, Reporter reporter) + throws Throwable { + + Logger log = Logger.getLogger(HBaseMasterProcessor.class); + long timeStamp = Calendar.getInstance(TimeZone.getTimeZone("UTC")) + .getTimeInMillis(); + ChukwaRecord record = new ChukwaRecord(); + + Map<String, Buffer> metricsMap = new HashMap<String, Buffer>(); + + try { + JSONObject obj = (JSONObject) JSONValue.parse(recordEntry); + String ttTag = chunk.getTag("timeStamp"); + if (ttTag == null) { + log.warn("timeStamp tag not set in JMX adaptor for hbase master"); + } else { + timeStamp = Long.parseLong(ttTag); + } + Iterator<String> keys = obj.keySet().iterator(); + + while (keys.hasNext()) { + String key = keys.next(); + Object value = obj.get(key); + String valueString = value == null ? 
"" : value.toString(); + + // Calculate rate for some of the metrics + if (rateMap.containsKey(key)) { + long oldValue = rateMap.get(key); + long curValue = Long.parseLong(valueString); + rateMap.put(key, curValue); + long newValue = curValue - oldValue; + if (newValue < 0) { + log.warn("HBaseMaster rateMap might be reset or corrupted for metric " + + key); + newValue = 0L; + } + valueString = Long.toString(newValue); + } + + Buffer b = new Buffer(valueString.getBytes()); + metricsMap.put(key, b); + } + + TreeMap<String, Buffer> t = new TreeMap<String, Buffer>(metricsMap); + record.setMapFields(t); + buildGenericRecord(record, null, timeStamp, "master"); + output.collect(key, record); + } catch (Exception e) { + log.error(ExceptionUtil.getStackTrace(e)); + } + } } http://git-wip-us.apache.org/repos/asf/chukwa/blob/c914d340/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/HBaseRegionServerProcessor.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/HBaseRegionServerProcessor.java b/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/HBaseRegionServerProcessor.java index be28df5..8fab057 100644 --- a/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/HBaseRegionServerProcessor.java +++ b/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/HBaseRegionServerProcessor.java @@ -37,49 +37,45 @@ import org.apache.log4j.Logger; import org.json.simple.JSONObject; import org.json.simple.JSONValue; -@Tables(annotations={ -@Table(name="HBase",columnFamily="regionserver") -}) -public class HBaseRegionServerProcessor extends AbstractProcessor{ +@Tables(annotations = { @Table(name = "HBase", columnFamily = "regionserver") }) +public class HBaseRegionServerProcessor extends AbstractProcessor { - @Override - protected void parse(String recordEntry, - OutputCollector<ChukwaRecordKey, 
ChukwaRecord> output, - Reporter reporter) throws Throwable { - - Logger log = Logger.getLogger(HBaseRegionServerProcessor.class); - long timeStamp = Calendar.getInstance(TimeZone.getTimeZone("UTC")).getTimeInMillis(); - ChukwaRecord record = new ChukwaRecord(); - - Map<String, Buffer> metricsMap = new HashMap<String,Buffer>(); + @Override + protected void parse(String recordEntry, + OutputCollector<ChukwaRecordKey, ChukwaRecord> output, Reporter reporter) + throws Throwable { - try{ - JSONObject obj = (JSONObject) JSONValue.parse(recordEntry); - String ttTag = chunk.getTag("timeStamp"); - if(ttTag == null){ - log.warn("timeStamp tag not set in JMX adaptor for hbase region server"); - } - else{ - timeStamp = Long.parseLong(ttTag); - } - Iterator<JSONObject> iter = obj.entrySet().iterator(); - - while(iter.hasNext()){ - Map.Entry entry = (Map.Entry)iter.next(); - String key = (String) entry.getKey(); - Object value = entry.getValue(); - String valueString = value == null?"":value.toString(); - Buffer b = new Buffer(valueString.getBytes()); - metricsMap.put(key,b); - } - - TreeMap<String, Buffer> t = new TreeMap<String, Buffer>(metricsMap); - record.setMapFields(t); - buildGenericRecord(record, null, timeStamp, "regionserver"); - output.collect(key, record); - } - catch(Exception e){ - log.error(ExceptionUtil.getStackTrace(e)); - } - } + Logger log = Logger.getLogger(HBaseRegionServerProcessor.class); + long timeStamp = Calendar.getInstance(TimeZone.getTimeZone("UTC")) + .getTimeInMillis(); + ChukwaRecord record = new ChukwaRecord(); + + Map<String, Buffer> metricsMap = new HashMap<String, Buffer>(); + + try { + JSONObject obj = (JSONObject) JSONValue.parse(recordEntry); + String ttTag = chunk.getTag("timeStamp"); + if (ttTag == null) { + log.warn("timeStamp tag not set in JMX adaptor for hbase region server"); + } else { + timeStamp = Long.parseLong(ttTag); + } + Iterator<String> keys = obj.keySet().iterator(); + + while (keys.hasNext()) { + String key = 
keys.next(); + Object value = obj.get(key); + String valueString = value == null ? "" : value.toString(); + Buffer b = new Buffer(valueString.getBytes()); + metricsMap.put(key, b); + } + + TreeMap<String, Buffer> t = new TreeMap<String, Buffer>(metricsMap); + record.setMapFields(t); + buildGenericRecord(record, null, timeStamp, "regionserver"); + output.collect(key, record); + } catch (Exception e) { + log.error(ExceptionUtil.getStackTrace(e)); + } + } } http://git-wip-us.apache.org/repos/asf/chukwa/blob/c914d340/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/JobConfProcessor.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/JobConfProcessor.java b/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/JobConfProcessor.java index 8a246e1..7e2e4e2 100644 --- a/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/JobConfProcessor.java +++ b/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/JobConfProcessor.java @@ -34,7 +34,7 @@ import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecordKey; import org.apache.hadoop.mapred.OutputCollector; import org.apache.hadoop.mapred.Reporter; import org.apache.log4j.Logger; -import org.json.JSONObject; +import org.json.simple.JSONObject; import org.w3c.dom.Document; import org.w3c.dom.Element; import org.w3c.dom.Node; http://git-wip-us.apache.org/repos/asf/chukwa/blob/c914d340/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/JobTrackerProcessor.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/JobTrackerProcessor.java b/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/JobTrackerProcessor.java index 7c63a26..c2f7b52 100644 --- 
a/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/JobTrackerProcessor.java +++ b/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/JobTrackerProcessor.java @@ -36,130 +36,125 @@ import org.apache.log4j.Logger; import org.json.simple.JSONObject; import org.json.simple.JSONValue; +@Tables(annotations = { @Table(name = "JobTracker", columnFamily = "jt"), + @Table(name = "JobTracker", columnFamily = "jvm"), + @Table(name = "JobTracker", columnFamily = "rpc") }) +public class JobTrackerProcessor extends AbstractProcessor { + static Map<String, Long> rateMap = new ConcurrentHashMap<String, Long>(); + static { + long zero = 0L; + rateMap.put("SentBytes", zero); + rateMap.put("ReceivedBytes", zero); + rateMap.put("rpcAuthorizationSuccesses", zero); + rateMap.put("rpcAuthorizationFailures", zero); + rateMap.put("RpcQueueTime_num_ops", zero); + rateMap.put("RpcProcessingTime_num_ops", zero); + rateMap.put("heartbeats", zero); + rateMap.put("jobs_submitted", zero); + rateMap.put("jobs_completed", zero); + rateMap.put("jobs_failed", zero); + rateMap.put("jobs_killed", zero); + rateMap.put("maps_launched", zero); + rateMap.put("maps_completed", zero); + rateMap.put("maps_failed", zero); + rateMap.put("maps_killed", zero); + rateMap.put("reduces_launched", zero); + rateMap.put("reduces_completed", zero); + rateMap.put("reduces_failed", zero); + rateMap.put("reduces_killed", zero); + rateMap.put("gcCount", zero); + } -@Tables(annotations={ -@Table(name="JobTracker",columnFamily="jt"), -@Table(name="JobTracker",columnFamily="jvm"), -@Table(name="JobTracker",columnFamily="rpc") -}) -public class JobTrackerProcessor extends AbstractProcessor{ - static Map<String, Long> rateMap = new ConcurrentHashMap<String,Long>(); - static { - long zero = 0L; - rateMap.put("SentBytes", zero); - rateMap.put("ReceivedBytes", zero); - rateMap.put("rpcAuthorizationSuccesses", zero); - rateMap.put("rpcAuthorizationFailures", zero); - 
rateMap.put("RpcQueueTime_num_ops", zero); - rateMap.put("RpcProcessingTime_num_ops", zero); - rateMap.put("heartbeats", zero); - rateMap.put("jobs_submitted", zero); - rateMap.put("jobs_completed", zero); - rateMap.put("jobs_failed", zero); - rateMap.put("jobs_killed", zero); - rateMap.put("maps_launched", zero); - rateMap.put("maps_completed", zero); - rateMap.put("maps_failed", zero); - rateMap.put("maps_killed", zero); - rateMap.put("reduces_launched", zero); - rateMap.put("reduces_completed", zero); - rateMap.put("reduces_failed", zero); - rateMap.put("reduces_killed", zero); - rateMap.put("gcCount", zero); - } + @Override + protected void parse(String recordEntry, + OutputCollector<ChukwaRecordKey, ChukwaRecord> output, Reporter reporter) + throws Throwable { + Logger log = Logger.getLogger(JobTrackerProcessor.class); + long timeStamp = Calendar.getInstance(TimeZone.getTimeZone("UTC")) + .getTimeInMillis(); - @Override - protected void parse(String recordEntry, - OutputCollector<ChukwaRecordKey, ChukwaRecord> output, - Reporter reporter) throws Throwable { - Logger log = Logger.getLogger(JobTrackerProcessor.class); - long timeStamp = Calendar.getInstance(TimeZone.getTimeZone("UTC")).getTimeInMillis(); - - final ChukwaRecord mapred_jt = new ChukwaRecord(); - final ChukwaRecord jt_jvm = new ChukwaRecord(); - final ChukwaRecord jt_rpc = new ChukwaRecord(); - - Map<String, ChukwaRecord> metricsMap = new HashMap<String, ChukwaRecord>(){ - private static final long serialVersionUID = 1L; - { - put("gcCount", jt_jvm); - put("gcTimeMillis", jt_jvm); - put("logError", jt_jvm); - put("logFatal", jt_jvm); - put("logInfo", jt_jvm); - put("logWarn", jt_jvm); - put("memHeapCommittedM", jt_jvm); - put("memHeapUsedM", jt_jvm); - put("threadsBlocked", jt_jvm); - put("threadsNew", jt_jvm); - put("threadsRunnable", jt_jvm); - put("threadsTerminated", jt_jvm); - put("threadsTimedWaiting", jt_jvm); - put("threadsWaiting", jt_jvm); + final ChukwaRecord mapred_jt = new 
ChukwaRecord(); + final ChukwaRecord jt_jvm = new ChukwaRecord(); + final ChukwaRecord jt_rpc = new ChukwaRecord(); - put("ReceivedBytes", jt_rpc); - put("RpcProcessingTime_avg_time", jt_rpc); - put("RpcProcessingTime_num_ops", jt_rpc); - put("RpcQueueTime_avg_time", jt_rpc); - put("RpcQueueTime_num_ops", jt_rpc); - put("SentBytes", jt_rpc); - put("rpcAuthorizationSuccesses", jt_rpc); - put("rpcAuthorizationnFailures", jt_rpc); - } - }; - try{ - JSONObject obj = (JSONObject) JSONValue.parse(recordEntry); - String ttTag = chunk.getTag("timeStamp"); - if(ttTag == null){ - log.warn("timeStamp tag not set in JMX adaptor for jobtracker"); - } - else{ - timeStamp = Long.parseLong(ttTag); - } - Iterator<JSONObject> iter = obj.entrySet().iterator(); - - while(iter.hasNext()){ - Map.Entry entry = (Map.Entry)iter.next(); - String key = (String) entry.getKey(); - Object value = entry.getValue(); - String valueString = value == null?"":value.toString(); - - //Calculate rate for some of the metrics - if(rateMap.containsKey(key)){ - long oldValue = rateMap.get(key); - long curValue = Long.parseLong(valueString); - rateMap.put(key, curValue); - long newValue = curValue - oldValue; - if(newValue < 0){ - log.warn("JobTrackerProcessor's rateMap might be reset or corrupted for metric "+key); - newValue = 0L; - } - valueString = Long.toString(newValue); - } - - //These metrics are string types with JSON structure. So we parse them and get the count - if(key.indexOf("Json") >= 0){ - //ignore these for now. Parsing of JSON array is throwing class cast exception. 
- } - else if(metricsMap.containsKey(key)){ - ChukwaRecord rec = metricsMap.get(key); - rec.add(key, valueString); - } - else { - mapred_jt.add(key, valueString); - } - } - - buildGenericRecord(mapred_jt, null, timeStamp, "jt"); - output.collect(key, mapred_jt); - buildGenericRecord(jt_jvm, null, timeStamp, "jvm"); - output.collect(key, jt_jvm); - buildGenericRecord(jt_rpc, null, timeStamp, "rpc"); - output.collect(key, jt_rpc); - } - catch(Exception e){ - log.error(ExceptionUtil.getStackTrace(e)); - } - } -} + Map<String, ChukwaRecord> metricsMap = new HashMap<String, ChukwaRecord>() { + private static final long serialVersionUID = 1L; + { + put("gcCount", jt_jvm); + put("gcTimeMillis", jt_jvm); + put("logError", jt_jvm); + put("logFatal", jt_jvm); + put("logInfo", jt_jvm); + put("logWarn", jt_jvm); + put("memHeapCommittedM", jt_jvm); + put("memHeapUsedM", jt_jvm); + put("threadsBlocked", jt_jvm); + put("threadsNew", jt_jvm); + put("threadsRunnable", jt_jvm); + put("threadsTerminated", jt_jvm); + put("threadsTimedWaiting", jt_jvm); + put("threadsWaiting", jt_jvm); + + put("ReceivedBytes", jt_rpc); + put("RpcProcessingTime_avg_time", jt_rpc); + put("RpcProcessingTime_num_ops", jt_rpc); + put("RpcQueueTime_avg_time", jt_rpc); + put("RpcQueueTime_num_ops", jt_rpc); + put("SentBytes", jt_rpc); + put("rpcAuthorizationSuccesses", jt_rpc); + put("rpcAuthorizationnFailures", jt_rpc); + } + }; + try { + JSONObject obj = (JSONObject) JSONValue.parse(recordEntry); + String ttTag = chunk.getTag("timeStamp"); + if (ttTag == null) { + log.warn("timeStamp tag not set in JMX adaptor for jobtracker"); + } else { + timeStamp = Long.parseLong(ttTag); + } + Iterator<String> keys = obj.keySet().iterator(); + + while (keys.hasNext()) { + String key = keys.next(); + Object value = obj.get(key); + String valueString = value == null ? 
"" : value.toString(); + // Calculate rate for some of the metrics + if (rateMap.containsKey(key)) { + long oldValue = rateMap.get(key); + long curValue = Long.parseLong(valueString); + rateMap.put(key, curValue); + long newValue = curValue - oldValue; + if (newValue < 0) { + log.warn("JobTrackerProcessor's rateMap might be reset or corrupted for metric " + + key); + newValue = 0L; + } + valueString = Long.toString(newValue); + } + + // These metrics are string types with JSON structure. So we parse them + // and get the count + if (key.indexOf("Json") >= 0) { + // ignore these for now. Parsing of JSON array is throwing class cast + // exception. + } else if (metricsMap.containsKey(key)) { + ChukwaRecord rec = metricsMap.get(key); + rec.add(key, valueString); + } else { + mapred_jt.add(key, valueString); + } + } + + buildGenericRecord(mapred_jt, null, timeStamp, "jt"); + output.collect(key, mapred_jt); + buildGenericRecord(jt_jvm, null, timeStamp, "jvm"); + output.collect(key, jt_jvm); + buildGenericRecord(jt_rpc, null, timeStamp, "rpc"); + output.collect(key, jt_rpc); + } catch (Exception e) { + log.error(ExceptionUtil.getStackTrace(e)); + } + } +} http://git-wip-us.apache.org/repos/asf/chukwa/blob/c914d340/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/Log4JMetricsContextProcessor.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/Log4JMetricsContextProcessor.java b/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/Log4JMetricsContextProcessor.java index 16b12f3..79291a1 100644 --- a/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/Log4JMetricsContextProcessor.java +++ b/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/Log4JMetricsContextProcessor.java @@ -19,12 +19,15 @@ package org.apache.hadoop.chukwa.extraction.demux.processor.mapper; import 
java.util.Iterator; + import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecord; import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecordKey; import org.apache.hadoop.mapred.OutputCollector; import org.apache.hadoop.mapred.Reporter; import org.apache.log4j.Logger; -import org.json.JSONObject; + +import org.json.simple.JSONObject; +import org.json.simple.JSONValue; public class Log4JMetricsContextProcessor extends AbstractProcessor { @@ -50,24 +53,24 @@ public class Log4JMetricsContextProcessor extends AbstractProcessor { @SuppressWarnings("unchecked") public Log4JMetricsContextChukwaRecord(String recordEntry) throws Throwable { LogEntry log = new LogEntry(recordEntry); - JSONObject json = new JSONObject(log.getBody()); + JSONObject json = (JSONObject) JSONValue.parse(log.getBody()); // round timestamp - timestamp = json.getLong("timestamp"); + timestamp = (Long) json.get("timestamp"); timestamp = (timestamp / 60000) * 60000; // get record type - String contextName = json.getString("contextName"); - String recordName = json.getString("recordName"); + String contextName = (String) json.get("contextName"); + String recordName = (String) json.get("recordName"); recordType = contextName; if (!contextName.equals(recordName)) { recordType += "_" + recordName; } - Iterator<String> ki = json.keys(); + Iterator<String> ki = json.keySet().iterator(); while (ki.hasNext()) { String key = ki.next(); - String value = json.getString(key); + String value = String.valueOf(json.get(key)); if(value != null) { chukwaRecord.add(key, value); } http://git-wip-us.apache.org/repos/asf/chukwa/blob/c914d340/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/NamenodeProcessor.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/NamenodeProcessor.java b/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/NamenodeProcessor.java index 
58b2df9..1e6e9d7 100644 --- a/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/NamenodeProcessor.java +++ b/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/NamenodeProcessor.java @@ -36,155 +36,151 @@ import org.apache.log4j.Logger; import org.json.simple.JSONObject; import org.json.simple.JSONValue; -@Tables(annotations={ -@Table(name="Namenode",columnFamily="summary"), -@Table(name="Namenode",columnFamily="hdfs"), -@Table(name="Namenode",columnFamily="rpc"), -@Table(name="Namenode",columnFamily="jvm") -}) -public class NamenodeProcessor extends AbstractProcessor{ - static Map<String, Long> rateMap = new ConcurrentHashMap<String,Long>(); - - static { - long zero = 0L; - rateMap.put("AddBlockOps", zero); - rateMap.put("CreateFileOps", zero); - rateMap.put("DeleteFileOps", zero); - rateMap.put("FileInfoOps", zero); - rateMap.put("FilesAppended", zero); - rateMap.put("FilesCreated", zero); - rateMap.put("FilesDeleted", zero); - rateMap.put("FileInGetListingOps", zero); - rateMap.put("FilesRenamed", zero); - rateMap.put("GetBlockLocations", zero); - rateMap.put("GetListingOps", zero); - rateMap.put("SentBytes", zero); - rateMap.put("ReceivedBytes", zero); - rateMap.put("rpcAuthorizationSuccesses", zero); - rateMap.put("rpcAuthorizationFailures", zero); - rateMap.put("RpcQueueTime_num_ops", zero); - rateMap.put("RpcProcessingTime_num_ops", zero); - rateMap.put("gcCount", zero); - } - - @Override - protected void parse(String recordEntry, - OutputCollector<ChukwaRecordKey, ChukwaRecord> output, - Reporter reporter) throws Throwable { - try{ - Logger log = Logger.getLogger(NamenodeProcessor.class); - long timeStamp = Calendar.getInstance(TimeZone.getTimeZone("UTC")).getTimeInMillis(); - - final ChukwaRecord hdfs_overview = new ChukwaRecord(); - final ChukwaRecord hdfs_namenode = new ChukwaRecord(); - final ChukwaRecord namenode_jvm = new ChukwaRecord(); - final ChukwaRecord namenode_rpc = new ChukwaRecord(); - - 
Map<String, ChukwaRecord> metricsMap = new HashMap<String,ChukwaRecord>(){ - private static final long serialVersionUID = 1L; - { - put("BlockCapacity", hdfs_overview); - put("BlocksTotal", hdfs_overview); - put("CapacityTotalGB", hdfs_overview); - put("CapacityUsedGB", hdfs_overview); - put("CapacityRemainingGB", hdfs_overview); - put("CorruptBlocks", hdfs_overview); - put("ExcessBlocks", hdfs_overview); - put("FilesTotal", hdfs_overview); - put("MissingBlocks", hdfs_overview); - put("PendingDeletionBlocks", hdfs_overview); - put("PendingReplicationBlocks", hdfs_overview); - put("ScheduledReplicationBlocks", hdfs_overview); - put("TotalLoad", hdfs_overview); - put("UnderReplicatedBlocks", hdfs_overview); - - put("gcCount", namenode_jvm); - put("gcTimeMillis", namenode_jvm); - put("logError", namenode_jvm); - put("logFatal", namenode_jvm); - put("logInfo", namenode_jvm); - put("logWarn", namenode_jvm); - put("memHeapCommittedM", namenode_jvm); - put("memHeapUsedM", namenode_jvm); - put("threadsBlocked", namenode_jvm); - put("threadsNew", namenode_jvm); - put("threadsRunnable", namenode_jvm); - put("threadsTerminated", namenode_jvm); - put("threadsTimedWaiting", namenode_jvm); - put("threadsWaiting", namenode_jvm); - - put("ReceivedBytes", namenode_rpc); - put("RpcProcessingTime_avg_time", namenode_rpc); - put("RpcProcessingTime_num_ops", namenode_rpc); - put("RpcQueueTime_avg_time", namenode_rpc); - put("RpcQueueTime_num_ops", namenode_rpc); - put("SentBytes", namenode_rpc); - put("rpcAuthorizationSuccesses", namenode_rpc); - put("rpcAuthenticationFailures", namenode_rpc); - put("rpcAuthenticationSuccesses", namenode_rpc); - } - }; - - - JSONObject obj = (JSONObject) JSONValue.parse(recordEntry); - String ttTag = chunk.getTag("timeStamp"); - if(ttTag == null){ - log.warn("timeStamp tag not set in JMX adaptor for namenode"); - } - else{ - timeStamp = Long.parseLong(ttTag); - } - Iterator<JSONObject> iter = obj.entrySet().iterator(); - - - while(iter.hasNext()){ - 
Map.Entry entry = (Map.Entry)iter.next(); - String key = (String) entry.getKey(); - Object value = entry.getValue(); - String valueString = (value == null)?"":value.toString(); - - //These metrics are string types with JSON structure. So we parse them and get the count - if(key.equals("LiveNodes") || key.equals("DeadNodes") || key.equals("DecomNodes") || key.equals("NameDirStatuses")){ - JSONObject jobj = (JSONObject) JSONValue.parse(valueString); - valueString = Integer.toString(jobj.size()); - } - - //Calculate rate for some of the metrics - if(rateMap.containsKey(key)){ - long oldValue = rateMap.get(key); - long curValue = Long.parseLong(valueString); - rateMap.put(key, curValue); - long newValue = curValue - oldValue; - if(newValue < 0){ - log.error("NamenodeProcessor's rateMap might be reset or corrupted for metric "+key); - newValue = 0L; - } - valueString = Long.toString(newValue); - } - - //Check if metric belongs to one of the categories in metricsMap. If not just write it in group Hadoop.HDFS.NameNode - if(metricsMap.containsKey(key)){ - ChukwaRecord rec = metricsMap.get(key); - rec.add(key, valueString); - } - else{ - hdfs_namenode.add(key, valueString); - } - } - buildGenericRecord(hdfs_overview, null, timeStamp, "summary"); - output.collect(key, hdfs_overview); - buildGenericRecord(hdfs_namenode, null, timeStamp, "hdfs"); - output.collect(key, hdfs_namenode); - buildGenericRecord(namenode_jvm, null, timeStamp, "jvm"); - output.collect(key, namenode_jvm); - buildGenericRecord(namenode_rpc, null, timeStamp, "rpc"); - output.collect(key, namenode_rpc); - } - catch(Exception e){ - log.error(ExceptionUtil.getStackTrace(e)); - } - - } - -} +@Tables(annotations = { @Table(name = "Namenode", columnFamily = "summary"), + @Table(name = "Namenode", columnFamily = "hdfs"), + @Table(name = "Namenode", columnFamily = "rpc"), + @Table(name = "Namenode", columnFamily = "jvm") }) +public class NamenodeProcessor extends AbstractProcessor { + static Map<String, Long> 
rateMap = new ConcurrentHashMap<String, Long>(); + + static { + long zero = 0L; + rateMap.put("AddBlockOps", zero); + rateMap.put("CreateFileOps", zero); + rateMap.put("DeleteFileOps", zero); + rateMap.put("FileInfoOps", zero); + rateMap.put("FilesAppended", zero); + rateMap.put("FilesCreated", zero); + rateMap.put("FilesDeleted", zero); + rateMap.put("FileInGetListingOps", zero); + rateMap.put("FilesRenamed", zero); + rateMap.put("GetBlockLocations", zero); + rateMap.put("GetListingOps", zero); + rateMap.put("SentBytes", zero); + rateMap.put("ReceivedBytes", zero); + rateMap.put("rpcAuthorizationSuccesses", zero); + rateMap.put("rpcAuthorizationFailures", zero); + rateMap.put("RpcQueueTime_num_ops", zero); + rateMap.put("RpcProcessingTime_num_ops", zero); + rateMap.put("gcCount", zero); + } + + @Override + protected void parse(String recordEntry, + OutputCollector<ChukwaRecordKey, ChukwaRecord> output, Reporter reporter) + throws Throwable { + try { + Logger log = Logger.getLogger(NamenodeProcessor.class); + long timeStamp = Calendar.getInstance(TimeZone.getTimeZone("UTC")) + .getTimeInMillis(); + + final ChukwaRecord hdfs_overview = new ChukwaRecord(); + final ChukwaRecord hdfs_namenode = new ChukwaRecord(); + final ChukwaRecord namenode_jvm = new ChukwaRecord(); + final ChukwaRecord namenode_rpc = new ChukwaRecord(); + + Map<String, ChukwaRecord> metricsMap = new HashMap<String, ChukwaRecord>() { + private static final long serialVersionUID = 1L; + { + put("BlockCapacity", hdfs_overview); + put("BlocksTotal", hdfs_overview); + put("CapacityTotalGB", hdfs_overview); + put("CapacityUsedGB", hdfs_overview); + put("CapacityRemainingGB", hdfs_overview); + put("CorruptBlocks", hdfs_overview); + put("ExcessBlocks", hdfs_overview); + put("FilesTotal", hdfs_overview); + put("MissingBlocks", hdfs_overview); + put("PendingDeletionBlocks", hdfs_overview); + put("PendingReplicationBlocks", hdfs_overview); + put("ScheduledReplicationBlocks", hdfs_overview); + put("TotalLoad", 
hdfs_overview); + put("UnderReplicatedBlocks", hdfs_overview); + + put("gcCount", namenode_jvm); + put("gcTimeMillis", namenode_jvm); + put("logError", namenode_jvm); + put("logFatal", namenode_jvm); + put("logInfo", namenode_jvm); + put("logWarn", namenode_jvm); + put("memHeapCommittedM", namenode_jvm); + put("memHeapUsedM", namenode_jvm); + put("threadsBlocked", namenode_jvm); + put("threadsNew", namenode_jvm); + put("threadsRunnable", namenode_jvm); + put("threadsTerminated", namenode_jvm); + put("threadsTimedWaiting", namenode_jvm); + put("threadsWaiting", namenode_jvm); + + put("ReceivedBytes", namenode_rpc); + put("RpcProcessingTime_avg_time", namenode_rpc); + put("RpcProcessingTime_num_ops", namenode_rpc); + put("RpcQueueTime_avg_time", namenode_rpc); + put("RpcQueueTime_num_ops", namenode_rpc); + put("SentBytes", namenode_rpc); + put("rpcAuthorizationSuccesses", namenode_rpc); + put("rpcAuthenticationFailures", namenode_rpc); + put("rpcAuthenticationSuccesses", namenode_rpc); + } + }; + JSONObject obj = (JSONObject) JSONValue.parse(recordEntry); + String ttTag = chunk.getTag("timeStamp"); + if (ttTag == null) { + log.warn("timeStamp tag not set in JMX adaptor for namenode"); + } else { + timeStamp = Long.parseLong(ttTag); + } + Iterator<String> keys = obj.keySet().iterator(); + + while (keys.hasNext()) { + String key = keys.next(); + Object value = obj.get(key); + String valueString = (value == null) ? "" : value.toString(); + + // These metrics are string types with JSON structure. 
So we parse them + // and get the count + if (key.equals("LiveNodes") || key.equals("DeadNodes") + || key.equals("DecomNodes") || key.equals("NameDirStatuses")) { + JSONObject jobj = (JSONObject) JSONValue.parse(valueString); + valueString = Integer.toString(jobj.size()); + } + + // Calculate rate for some of the metrics + if (rateMap.containsKey(key)) { + long oldValue = rateMap.get(key); + long curValue = Long.parseLong(valueString); + rateMap.put(key, curValue); + long newValue = curValue - oldValue; + if (newValue < 0) { + log.error("NamenodeProcessor's rateMap might be reset or corrupted for metric " + + key); + newValue = 0L; + } + valueString = Long.toString(newValue); + } + + // Check if metric belongs to one of the categories in metricsMap. If + // not just write it in group Hadoop.HDFS.NameNode + if (metricsMap.containsKey(key)) { + ChukwaRecord rec = metricsMap.get(key); + rec.add(key, valueString); + } else { + hdfs_namenode.add(key, valueString); + } + } + buildGenericRecord(hdfs_overview, null, timeStamp, "summary"); + output.collect(key, hdfs_overview); + buildGenericRecord(hdfs_namenode, null, timeStamp, "hdfs"); + output.collect(key, hdfs_namenode); + buildGenericRecord(namenode_jvm, null, timeStamp, "jvm"); + output.collect(key, namenode_jvm); + buildGenericRecord(namenode_rpc, null, timeStamp, "rpc"); + output.collect(key, namenode_rpc); + } catch (Exception e) { + log.error(ExceptionUtil.getStackTrace(e)); + } + + } + +} http://git-wip-us.apache.org/repos/asf/chukwa/blob/c914d340/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/ZookeeperProcessor.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/ZookeeperProcessor.java b/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/ZookeeperProcessor.java index a1e3435..417fbb5 100644 --- 
a/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/ZookeeperProcessor.java +++ b/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/ZookeeperProcessor.java @@ -36,67 +36,64 @@ import org.apache.log4j.Logger; import org.json.simple.JSONObject; import org.json.simple.JSONValue; -@Tables(annotations={ -@Table(name="Zookeeper",columnFamily="zk") -}) -public class ZookeeperProcessor extends AbstractProcessor{ +@Tables(annotations = { @Table(name = "Zookeeper", columnFamily = "zk") }) +public class ZookeeperProcessor extends AbstractProcessor { - static Map<String, Long> rateMap = new ConcurrentHashMap<String,Long>(); - static { - long zero = 0L; - rateMap.put("PacketsSent", zero); - rateMap.put("PacketsReceived", zero); - } - @Override - protected void parse(String recordEntry, - OutputCollector<ChukwaRecordKey, ChukwaRecord> output, - Reporter reporter) throws Throwable { - Logger log = Logger.getLogger(ZookeeperProcessor.class); - long timeStamp = Calendar.getInstance(TimeZone.getTimeZone("UTC")).getTimeInMillis(); - final ChukwaRecord record = new ChukwaRecord(); - - Map<String, ChukwaRecord> metricsMap = new HashMap<String, ChukwaRecord>(){ - private static final long serialVersionUID = 1L; + static Map<String, Long> rateMap = new ConcurrentHashMap<String, Long>(); + static { + long zero = 0L; + rateMap.put("PacketsSent", zero); + rateMap.put("PacketsReceived", zero); + } - { - put("MinRequestLatency", record); - put("AvgRequestLatency", record); - put("MaxRequestLatency", record); - put("PacketsReceived", record); - put("PacketsSent", record); - put("OutstandingRequests", record); - put("NodeCount", record); - put("WatchCount", record); - } - }; - try{ - JSONObject obj = (JSONObject) JSONValue.parse(recordEntry); - String ttTag = chunk.getTag("timeStamp"); - if(ttTag == null){ - log.warn("timeStamp tag not set in JMX adaptor for zookeeper"); - } - else{ - timeStamp = Long.parseLong(ttTag); - } - Iterator<JSONObject> 
iter = obj.entrySet().iterator(); - - while(iter.hasNext()){ - Map.Entry entry = (Map.Entry)iter.next(); - String key = (String) entry.getKey(); - Object value = entry.getValue(); - String valueString = value == null?"":value.toString(); - - if(metricsMap.containsKey(key)){ - ChukwaRecord rec = metricsMap.get(key); - rec.add(key, valueString); - } - } - - buildGenericRecord(record, null, timeStamp, "zk"); - output.collect(key, record); - } - catch(Exception e){ - log.error(ExceptionUtil.getStackTrace(e)); - } - } + @Override + protected void parse(String recordEntry, + OutputCollector<ChukwaRecordKey, ChukwaRecord> output, Reporter reporter) + throws Throwable { + Logger log = Logger.getLogger(ZookeeperProcessor.class); + long timeStamp = Calendar.getInstance(TimeZone.getTimeZone("UTC")) + .getTimeInMillis(); + final ChukwaRecord record = new ChukwaRecord(); + + Map<String, ChukwaRecord> metricsMap = new HashMap<String, ChukwaRecord>() { + private static final long serialVersionUID = 1L; + + { + put("MinRequestLatency", record); + put("AvgRequestLatency", record); + put("MaxRequestLatency", record); + put("PacketsReceived", record); + put("PacketsSent", record); + put("OutstandingRequests", record); + put("NodeCount", record); + put("WatchCount", record); + } + }; + try { + JSONObject obj = (JSONObject) JSONValue.parse(recordEntry); + String ttTag = chunk.getTag("timeStamp"); + if (ttTag == null) { + log.warn("timeStamp tag not set in JMX adaptor for zookeeper"); + } else { + timeStamp = Long.parseLong(ttTag); + } + Iterator<String> keys = ((JSONObject) obj).keySet().iterator(); + + while (keys.hasNext()) { + String key = keys.next(); + Object value = obj.get(key); + String valueString = value == null ? 
"" : value.toString(); + + if (metricsMap.containsKey(key)) { + ChukwaRecord rec = metricsMap.get(key); + rec.add(key, valueString); + } + } + + buildGenericRecord(record, null, timeStamp, "zk"); + output.collect(key, record); + } catch (Exception e) { + log.error(ExceptionUtil.getStackTrace(e)); + } + } } http://git-wip-us.apache.org/repos/asf/chukwa/blob/c914d340/src/main/java/org/apache/hadoop/chukwa/extraction/hbase/SystemMetrics.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/hadoop/chukwa/extraction/hbase/SystemMetrics.java b/src/main/java/org/apache/hadoop/chukwa/extraction/hbase/SystemMetrics.java index a72e1bd..c2695f2 100644 --- a/src/main/java/org/apache/hadoop/chukwa/extraction/hbase/SystemMetrics.java +++ b/src/main/java/org/apache/hadoop/chukwa/extraction/hbase/SystemMetrics.java @@ -70,9 +70,9 @@ public class SystemMetrics extends AbstractProcessor { user = user + Double.parseDouble(cpu.get("user").toString()); sys = sys + Double.parseDouble(cpu.get("sys").toString()); idle = idle + Double.parseDouble(cpu.get("idle").toString()); - for (@SuppressWarnings("unchecked") - Iterator<String> iterator = (Iterator<String>) cpu.keySet().iterator(); iterator - .hasNext();) { + @SuppressWarnings("unchecked") + Iterator<String> iterator = (Iterator<String>) cpu.keySet().iterator(); + while(iterator.hasNext()) { String key = iterator.next(); addRecord("cpu." + key + "." 
+ i, cpu.get(key).toString()); } http://git-wip-us.apache.org/repos/asf/chukwa/blob/c914d340/src/main/java/org/apache/hadoop/chukwa/hicc/JSONLoader.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/hadoop/chukwa/hicc/JSONLoader.java b/src/main/java/org/apache/hadoop/chukwa/hicc/JSONLoader.java index 517a32d..f721978 100644 --- a/src/main/java/org/apache/hadoop/chukwa/hicc/JSONLoader.java +++ b/src/main/java/org/apache/hadoop/chukwa/hicc/JSONLoader.java @@ -20,8 +20,13 @@ package org.apache.hadoop.chukwa.hicc; import java.net.*; +import java.text.ParseException; import java.io.*; -import org.json.*; + +import org.json.simple.JSONArray; +import org.json.simple.JSONObject; +import org.json.simple.JSONValue; + import org.apache.log4j.Logger; import org.apache.hadoop.chukwa.util.ExceptionUtil; @@ -57,9 +62,9 @@ public class JSONLoader { public JSONLoader(String source) { String buffer = getContents(source); try { - JSONObject rows = new JSONObject(buffer); - jsonData = new JSONArray(rows.get("rows").toString()); - } catch (JSONException e) { + JSONObject rows = (JSONObject) JSONValue.parse(buffer); + jsonData = (JSONArray) JSONValue.parse(rows.get("rows").toString()); + } catch (Exception e) { log.debug(ExceptionUtil.getStackTrace(e)); } } @@ -68,7 +73,7 @@ public class JSONLoader { String ts = null; try { ts = ((JSONObject) jsonData.get(i)).get("ts").toString(); - } catch (JSONException e) { + } catch (Exception e) { log.debug(ExceptionUtil.getStackTrace(e)); } return ts; @@ -79,7 +84,7 @@ public class JSONLoader { try { tags = ((JSONObject) jsonData.get(i)).get("tags") .toString(); - } catch (JSONException e) { + } catch (Exception e) { log.debug(ExceptionUtil.getStackTrace(e)); } return tags; @@ -90,13 +95,13 @@ public class JSONLoader { try { value = ((JSONObject) jsonData.get(i)).get("value") .toString(); - } catch (JSONException e) { + } catch (Exception e) { log.debug(ExceptionUtil.getStackTrace(e)); 
} return value; } public int length() { - return jsonData.length(); + return jsonData.size(); } } http://git-wip-us.apache.org/repos/asf/chukwa/blob/c914d340/src/main/java/org/apache/hadoop/chukwa/hicc/Views.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/hadoop/chukwa/hicc/Views.java b/src/main/java/org/apache/hadoop/chukwa/hicc/Views.java index 58859c5..dbb6707 100644 --- a/src/main/java/org/apache/hadoop/chukwa/hicc/Views.java +++ b/src/main/java/org/apache/hadoop/chukwa/hicc/Views.java @@ -21,7 +21,11 @@ package org.apache.hadoop.chukwa.hicc; import java.io.*; import java.util.*; -import org.json.*; + +import org.json.simple.JSONArray; +import org.json.simple.JSONObject; +import org.json.simple.JSONValue; + import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.chukwa.util.ExceptionUtil; @@ -65,8 +69,8 @@ public class Views { File aFile = new File(path); String buffer = getContents(aFile); try { - viewsData = new JSONArray(buffer); - } catch (JSONException e) { + viewsData = (JSONArray) JSONValue.parse(buffer); + } catch (Exception e) { log.debug(ExceptionUtil.getStackTrace(e)); } } @@ -76,7 +80,7 @@ public class Views { try { owner = ((JSONObject) viewsData.get(i)).get("owner") .toString(); - } catch (JSONException e) { + } catch (Exception e) { log.debug(ExceptionUtil.getStackTrace(e)); } return owner; @@ -86,8 +90,8 @@ public class Views { Iterator permission = null; try { permission = ((JSONObject) ((JSONObject) viewsData.get(i)) - .get("permission")).keys(); - } catch (JSONException e) { + .get("permission")).keySet().iterator(); + } catch (Exception e) { log.debug(ExceptionUtil.getStackTrace(e)); } return permission; @@ -100,7 +104,7 @@ public class Views { JSONObject permission = (JSONObject) view.get("permission"); JSONObject user = (JSONObject) permission.get(who); read = user.get("read").toString(); - } catch (JSONException e) { + } 
catch (Exception e) { log.debug(ExceptionUtil.getStackTrace(e)); } return read; @@ -110,7 +114,7 @@ public class Views { String write = null; try { write = ((JSONObject) ((JSONObject) ((JSONObject) viewsData.get(i)).get("permission")).get(who)).get("write").toString(); - } catch (JSONException e) { + } catch (Exception e) { log.debug(ExceptionUtil.getStackTrace(e)); } return write; @@ -121,7 +125,7 @@ public class Views { try { description = ((JSONObject) viewsData.get(i)).get( "description").toString(); - } catch (JSONException e) { + } catch (Exception e) { log.debug(ExceptionUtil.getStackTrace(e)); } return description; @@ -131,13 +135,13 @@ public class Views { String key = null; try { key = ((JSONObject) viewsData.get(i)).get("key").toString(); - } catch (JSONException e) { + } catch (Exception e) { log.debug(ExceptionUtil.getStackTrace(e)); } return key; } public int length() { - return viewsData.length(); + return viewsData.size(); } } http://git-wip-us.apache.org/repos/asf/chukwa/blob/c914d340/src/main/java/org/apache/hadoop/chukwa/hicc/Workspace.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/hadoop/chukwa/hicc/Workspace.java b/src/main/java/org/apache/hadoop/chukwa/hicc/Workspace.java index 29dab01..b6789b4 100644 --- a/src/main/java/org/apache/hadoop/chukwa/hicc/Workspace.java +++ b/src/main/java/org/apache/hadoop/chukwa/hicc/Workspace.java @@ -1,376 +1,383 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.chukwa.hicc; - - -import java.io.*; -import java.util.*; -import javax.servlet.*; -import javax.servlet.http.*; -import java.sql.*; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.chukwa.util.XssFilter; -import org.json.*; -import org.apache.hadoop.chukwa.util.ExceptionUtil; - -public class Workspace extends HttpServlet { - public static final long serialVersionUID = 101L; - private static final Log log = LogFactory.getLog(Workspace.class); - private String path = System.getenv("CHUKWA_DATA_DIR"); - transient private JSONObject hash = new JSONObject(); - transient private XssFilter xf; - - @Override - protected void doTrace(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException { - resp.sendError(HttpServletResponse.SC_METHOD_NOT_ALLOWED); - } - - public void doGet(HttpServletRequest request, HttpServletResponse response) - throws IOException, ServletException { - xf = new XssFilter(request); - response.setContentType("text/plain"); - String method = xf.getParameter("method"); - if (method.equals("get_views_list")) { - getViewsList(request, response); - } - if (method.equals("get_view")) { - getView(request, response); - } - if (method.equals("save_view")) { - saveView(request, response); - } - if (method.equals("change_view_info")) { - changeViewInfo(request, response); - } - if (method.equals("get_widget_list")) { - getWidgetList(request, response); - } - if (method.equals("clone_view")) { - cloneView(request, response); 
- } - if (method.equals("delete_view")) { - deleteView(request, response); - } - } - - public void doPost(HttpServletRequest request, HttpServletResponse response) - throws IOException, ServletException { - doGet(request, response); - } - - static public String getContents(File aFile) { - // ...checks on aFile are elided - StringBuffer contents = new StringBuffer(); - - try { - // use buffering, reading one line at a time - // FileReader always assumes default encoding is OK! - BufferedReader input = new BufferedReader(new FileReader(aFile)); - try { - String line = null; // not declared within while loop - /* - * readLine is a bit quirky : it returns the content of a line MINUS the - * newline. it returns null only for the END of the stream. it returns - * an empty String if two newlines appear in a row. - */ - while ((line = input.readLine()) != null) { - contents.append(line); - contents.append(System.getProperty("line.separator")); - } - } finally { - input.close(); - } - } catch (IOException ex) { - ex.printStackTrace(); - } - - return contents.toString(); - } - - public void setContents(String fName, String buffer) { - try { - FileWriter fstream = new FileWriter(fName); - BufferedWriter out = new BufferedWriter(fstream); - out.write(buffer); - out.close(); - } catch (Exception e) { - System.err.println("Error: " + e.getMessage()); - } - } - - public void cloneView(HttpServletRequest request, HttpServletResponse response) - throws IOException, ServletException { - PrintWriter out = response.getWriter(); - String name = xf.getParameter("name"); - String template = xf.getParameter("clone_name"); - File aFile = new File(path + "/views/" + template); - String config = getContents(aFile); - int i = 0; - boolean check = true; - while (check) { - String tmpName = name; - if (i > 0) { - tmpName = name + i; - } - File checkFile = new File(path + "/views/" + tmpName + ".view"); - check = checkFile.exists(); - if (!check) { - name = tmpName; - } - i = i + 1; - } - 
setContents(path + "/views/" + name + ".view", config); - File deleteCache = new File(path + "/views/workspace_view_list.cache"); - if(!deleteCache.delete()) { - log.warn("Can not delete "+path + "/views/workspace_view_list.cache"); - } - genViewCache(path + "/views"); - aFile = new File(path + "/views/workspace_view_list.cache"); - String viewsCache = getContents(aFile); - out.println(viewsCache); - } - - public void deleteView(HttpServletRequest request, - HttpServletResponse response) throws IOException, ServletException { - String name = xf.getParameter("name"); - File aFile = new File(path + "/views/" + name + ".view"); - if(!aFile.delete()) { - log.warn("Can not delete " + path + "/views/" + name + ".view"); - } - File deleteCache = new File(path + "/views/workspace_view_list.cache"); - if(!deleteCache.delete()) { - log.warn("Can not delete "+path + "/views/workspace_view_list.cache"); - } - genViewCache(path + "/views"); - } - - public void getViewsList(HttpServletRequest request, - HttpServletResponse response) throws IOException, ServletException { - PrintWriter out = response.getWriter(); - genViewCache(path + "/views"); - File aFile = new File(path + "/views/workspace_view_list.cache"); - String viewsCache = getContents(aFile); - out.println(viewsCache); - } - - public void getView(HttpServletRequest request, HttpServletResponse response) - throws IOException, ServletException { - PrintWriter out = response.getWriter(); - String id = xf.getParameter("id"); - genViewCache(path + "/views"); - File aFile = new File(path + "/views/" + id + ".view"); - String view = getContents(aFile); - out.println(view); - } - - public void changeViewInfo(HttpServletRequest request, - HttpServletResponse response) throws IOException, ServletException { - PrintWriter out = response.getWriter(); - String id = xf.getParameter("name"); - String config = request.getParameter("config"); - try { - JSONObject jt = new JSONObject(config); - File aFile = new File(path + "/views/" + 
id + ".view"); - String original = getContents(aFile); - JSONObject updateObject = new JSONObject(original); - updateObject.put("description", jt.get("description")); - setContents(path + "/views/" + id + ".view", updateObject.toString()); - if (!rename(id, jt.get("description").toString())) { - throw new Exception("Rename view file failed"); - } - File deleteCache = new File(path + "/views/workspace_view_list.cache"); - if(!deleteCache.delete()) { - log.warn("Can not delete "+path + "/views/workspace_view_list.cache"); - } - genViewCache(path + "/views"); - out.println("Workspace is stored successfully."); - } catch (Exception e) { - out.println("Workspace store failed."); - } - } - - public void saveView(HttpServletRequest request, HttpServletResponse response) - throws IOException, ServletException { - PrintWriter out = response.getWriter(); - String id = xf.getParameter("name"); - String config = request.getParameter("config"); - setContents(path + "/views/" + id + ".view", config); - out.println("Workspace is stored successfully."); - } - - public void getWidgetList(HttpServletRequest request, - HttpServletResponse response) throws IOException, ServletException { - PrintWriter out = response.getWriter(); - genWidgetCache(path + "/descriptors"); - File aFile = new File(path + "/descriptors/workspace_plugin.cache"); - String viewsCache = getContents(aFile); - out.println(viewsCache); - } - - private void genViewCache(String source) { - File cacheFile = new File(source + "/workspace_view_list.cache"); - if (!cacheFile.exists()) { - File dir = new File(source); - File[] filesWanted = dir.listFiles(new FilenameFilter() { - public boolean accept(File dir, String name) { - return name.endsWith(".view"); - } - }); - JSONObject[] cacheGroup = new JSONObject[filesWanted.length]; - for (int i = 0; i < filesWanted.length; i++) { - String buffer = getContents(filesWanted[i]); - try { - JSONObject jt = new JSONObject(buffer); - String fn = filesWanted[i].getName(); - 
jt.put("key", fn.substring(0, (fn.length() - 5))); - cacheGroup[i] = jt; - } catch (Exception e) { - log.debug(ExceptionUtil.getStackTrace(e)); - } - } - String viewList = convertObjectsToViewList(cacheGroup); - setContents(source + "/workspace_view_list.cache", viewList); - } - } - - public String convertObjectsToViewList(JSONObject[] objArray) { - JSONArray jsonArr = new JSONArray(); - JSONObject permission = new JSONObject(); - JSONObject user = new JSONObject(); - try { - permission.put("read", 1); - permission.put("modify", 1); - user.put("all", permission); - } catch (Exception e) { - System.err.println("JSON Exception: " + e.getMessage()); - } - for (int i = 0; i < objArray.length; i++) { - try { - JSONObject jsonObj = new JSONObject(); - jsonObj.put("key", objArray[i].get("key")); - jsonObj.put("description", objArray[i].get("description")); - jsonObj.put("owner", ""); - jsonObj.put("permission", user); - jsonArr.put(jsonObj); - } catch (Exception e) { - System.err.println("JSON Exception: " + e.getMessage()); - } - } - return jsonArr.toString(); - } - - private void genWidgetCache(String source) { - File cacheFile = new File(source + "/workspace_plugin.cache"); - File cacheDir = new File(source); - if (!cacheFile.exists() - || cacheFile.lastModified() < cacheDir.lastModified()) { - File dir = new File(source); - File[] filesWanted = dir.listFiles(new FilenameFilter() { - public boolean accept(File dir, String name) { - return name.endsWith(".descriptor"); - } - }); - JSONObject[] cacheGroup = new JSONObject[filesWanted.length]; - for (int i = 0; i < filesWanted.length; i++) { - String buffer = getContents(filesWanted[i]); - try { - JSONObject jt = new JSONObject(buffer); - cacheGroup[i] = jt; - } catch (Exception e) { - log.debug(ExceptionUtil.getStackTrace(e)); - } - } - String widgetList = convertObjectsToWidgetList(cacheGroup); - setContents(source + "/workspace_plugin.cache", widgetList); - } - } - - public String 
convertObjectsToWidgetList(JSONObject[] objArray) { - JSONObject jsonObj = new JSONObject(); - JSONArray jsonArr = new JSONArray(); - for (int i = 0; i < objArray.length; i++) { - jsonArr.put(objArray[i]); - } - try { - jsonObj.put("detail", jsonArr); - } catch (Exception e) { - System.err.println("JSON Exception: " + e.getMessage()); - } - for (int i = 0; i < objArray.length; i++) { - try { - String[] categoriesArray = objArray[i].get("categories").toString() - .split(","); - hash = addToHash(hash, categoriesArray, objArray[i]); - } catch (JSONException e) { - System.err.println("JSON Exception: " + e.getMessage()); - } - } - try { - jsonObj.put("children", hash); - } catch (Exception e) { - System.err.println("JSON Exception: " + e.getMessage()); - } - return jsonObj.toString(); - } - - public JSONObject addToHash(JSONObject hash, String[] categoriesArray, - JSONObject obj) { - JSONObject subHash = hash; - for (int i = 0; i < categoriesArray.length; i++) { - String id = categoriesArray[i]; - if (i >= categoriesArray.length - 1) { - try { - subHash.put("leaf:" + obj.get("title"), obj.get("id")); - } catch (Exception e) { - System.err.println("JSON Exception: " + e.getMessage()); - } - } else { - try { - subHash = subHash.getJSONObject("node:" + id); - } catch (JSONException e) { - try { - JSONObject tmpHash = new JSONObject(); - subHash.put("node:" + id, tmpHash); - subHash = tmpHash; - } catch (JSONException ex) { - log.debug(ExceptionUtil.getStackTrace(e)); - } - } - } - } - return hash; - } - - private boolean rename(String id, String desc) { - try { - File view = new File(path + "/views/" + id + ".view"); - File newFile = new File(path + File.separator + "views" + File.separator - + desc + ".view"); - if(!view.renameTo(newFile)) { - log.warn("Can not rename " + path + "/views/" + id + ".view to " + - path + File.separator + "views" + File.separator + desc + ".view"); - } - } catch (Exception e) { - return false; - } - return true; - } - -} +/* + * Licensed to 
the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.chukwa.hicc;


import java.io.*;
import java.nio.charset.Charset;
import java.util.*;

import javax.servlet.*;
import javax.servlet.http.*;

import java.sql.*;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.chukwa.util.XssFilter;

import org.json.simple.JSONArray;
import org.json.simple.JSONObject;
import org.json.simple.JSONValue;

import org.apache.hadoop.chukwa.util.ExceptionUtil;

/**
 * Servlet that manages workspace views and the widget catalogue.
 *
 * <p>Views are JSON files named {@code <name>.view} under
 * {@code $CHUKWA_DATA_DIR/views}; widget descriptors are JSON files named
 * {@code *.descriptor} under {@code $CHUKWA_DATA_DIR/descriptors}. Each
 * directory has a generated cache file that is what list requests return.
 *
 * <p>The operation is selected by the {@code method} request parameter. No
 * per-request state is kept in instance fields, because a servlet instance is
 * shared by every request the container dispatches to it.
 */
public class Workspace extends HttpServlet {
  public static final long serialVersionUID = 101L;
  private static final Log log = LogFactory.getLog(Workspace.class);
  /** Encoding used for every file this servlet reads or writes. */
  private static final Charset UTF_8 = Charset.forName("UTF-8");
  private String path = System.getenv("CHUKWA_DATA_DIR");

  @Override
  protected void doTrace(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException {
    resp.sendError(HttpServletResponse.SC_METHOD_NOT_ALLOWED);
  }

  /**
   * Dispatches on the {@code method} request parameter.
   *
   * <p>A missing or unrecognised method is answered with HTTP 400 instead of
   * failing with a {@code NullPointerException} or returning an empty body.
   */
  @Override
  public void doGet(HttpServletRequest request, HttpServletResponse response)
      throws IOException, ServletException {
    response.setContentType("text/plain");
    String method = param(request, "method");
    if ("get_views_list".equals(method)) {
      getViewsList(request, response);
    } else if ("get_view".equals(method)) {
      getView(request, response);
    } else if ("save_view".equals(method)) {
      saveView(request, response);
    } else if ("change_view_info".equals(method)) {
      changeViewInfo(request, response);
    } else if ("get_widget_list".equals(method)) {
      getWidgetList(request, response);
    } else if ("clone_view".equals(method)) {
      cloneView(request, response);
    } else if ("delete_view".equals(method)) {
      deleteView(request, response);
    } else {
      response.sendError(HttpServletResponse.SC_BAD_REQUEST,
          "Unknown or missing method");
    }
  }

  /** Handles POST exactly like GET. */
  @Override
  public void doPost(HttpServletRequest request, HttpServletResponse response)
      throws IOException, ServletException {
    doGet(request, response);
  }

  /**
   * Reads a whole text file as UTF-8.
   *
   * @param aFile file to read
   * @return the file content with the platform line separator appended after
   *         every line (including the last), or whatever was read so far
   *         (possibly the empty string) if the file cannot be read
   */
  static public String getContents(File aFile) {
    StringBuilder contents = new StringBuilder();
    if (aFile == null) {
      return contents.toString();
    }
    String eol = System.getProperty("line.separator");
    BufferedReader input = null;
    try {
      input = new BufferedReader(
          new InputStreamReader(new FileInputStream(aFile), UTF_8));
      String line;
      // readLine() strips the terminator, so it is re-added explicitly.
      while ((line = input.readLine()) != null) {
        contents.append(line).append(eol);
      }
    } catch (IOException ex) {
      log.error("Can not read " + aFile, ex);
    } finally {
      closeQuietly(input);
    }
    return contents.toString();
  }

  /**
   * Writes {@code buffer} to the file {@code fName} as UTF-8, replacing any
   * previous content. Failures are logged and not propagated.
   *
   * @param fName  path of the file to write
   * @param buffer text to store
   */
  public void setContents(String fName, String buffer) {
    if (fName == null || buffer == null) {
      log.error("Can not write " + fName + ": missing file name or content");
      return;
    }
    Writer out = null;
    try {
      out = new BufferedWriter(
          new OutputStreamWriter(new FileOutputStream(fName), UTF_8));
      out.write(buffer);
      // Close inside the try so a failed flush is reported, not swallowed.
      out.close();
      out = null;
    } catch (Exception e) {
      // Best-effort boundary: callers never expected this method to throw.
      log.error("Can not write " + fName, e);
    } finally {
      closeQuietly(out);
    }
  }

  /**
   * Copies the file named by {@code clone_name} to a new view called
   * {@code name} (or {@code name1}, {@code name2}, ... if that is taken) and
   * prints the refreshed view list.
   */
  public void cloneView(HttpServletRequest request, HttpServletResponse response)
      throws IOException, ServletException {
    String name = param(request, "name");
    String template = param(request, "clone_name");
    if (!isSafeName(name) || !isSafeName(template)) {
      response.sendError(HttpServletResponse.SC_BAD_REQUEST, "Invalid view name");
      return;
    }
    PrintWriter out = response.getWriter();
    String config = getContents(new File(path + "/views/" + template));
    // Probe name, name1, name2, ... until an unused file name is found.
    String target = name;
    for (int i = 1; new File(path + "/views/" + target + ".view").exists(); i++) {
      target = name + i;
    }
    setContents(path + "/views/" + target + ".view", config);
    refreshViewCache();
    out.println(getContents(new File(path + "/views/workspace_view_list.cache")));
  }

  /** Deletes the view named by the {@code name} parameter and rebuilds the view cache. */
  public void deleteView(HttpServletRequest request,
      HttpServletResponse response) throws IOException, ServletException {
    String name = param(request, "name");
    if (!isSafeName(name)) {
      response.sendError(HttpServletResponse.SC_BAD_REQUEST, "Invalid view name");
      return;
    }
    File aFile = new File(path + "/views/" + name + ".view");
    if (!aFile.delete()) {
      log.warn("Can not delete " + path + "/views/" + name + ".view");
    }
    refreshViewCache();
  }

  /** Prints the cached list of views, generating the cache first if it is missing. */
  public void getViewsList(HttpServletRequest request,
      HttpServletResponse response) throws IOException, ServletException {
    PrintWriter out = response.getWriter();
    genViewCache(path + "/views");
    out.println(getContents(new File(path + "/views/workspace_view_list.cache")));
  }

  /** Prints the stored JSON of the view named by the {@code id} parameter. */
  public void getView(HttpServletRequest request, HttpServletResponse response)
      throws IOException, ServletException {
    String id = param(request, "id");
    if (!isSafeName(id)) {
      response.sendError(HttpServletResponse.SC_BAD_REQUEST, "Invalid view name");
      return;
    }
    PrintWriter out = response.getWriter();
    genViewCache(path + "/views");
    out.println(getContents(new File(path + "/views/" + id + ".view")));
  }

  /**
   * Replaces the description of the view named by the {@code name} parameter
   * with the {@code description} member of the JSON {@code config} parameter,
   * then renames the view file after that description.
   *
   * <p>Prints "Workspace is stored successfully." on success and
   * "Workspace store failed." otherwise.
   */
  public void changeViewInfo(HttpServletRequest request,
      HttpServletResponse response) throws IOException, ServletException {
    PrintWriter out = response.getWriter();
    String id = param(request, "name");
    String config = request.getParameter("config");
    boolean stored = false;
    try {
      stored = updateDescription(id, config);
    } catch (RuntimeException e) {
      log.warn("Can not update view " + id, e);
    }
    out.println(stored ? "Workspace is stored successfully."
        : "Workspace store failed.");
  }

  /** Stores the {@code config} parameter as the content of the view named by {@code name}. */
  public void saveView(HttpServletRequest request, HttpServletResponse response)
      throws IOException, ServletException {
    PrintWriter out = response.getWriter();
    String id = param(request, "name");
    String config = request.getParameter("config");
    if (!isSafeName(id) || config == null) {
      out.println("Workspace store failed.");
      return;
    }
    setContents(path + "/views/" + id + ".view", config);
    out.println("Workspace is stored successfully.");
  }

  /** Prints the cached widget catalogue, regenerating it when it is missing or stale. */
  public void getWidgetList(HttpServletRequest request,
      HttpServletResponse response) throws IOException, ServletException {
    PrintWriter out = response.getWriter();
    genWidgetCache(path + "/descriptors");
    out.println(getContents(new File(path + "/descriptors/workspace_plugin.cache")));
  }

  /**
   * Builds {@code workspace_view_list.cache} in {@code source} from every
   * {@code *.view} file there, unless the cache file already exists.
   */
  @SuppressWarnings("unchecked") // json-simple's JSONObject is a raw Map
  private void genViewCache(String source) {
    File cacheFile = new File(source + "/workspace_view_list.cache");
    if (cacheFile.exists()) {
      return;
    }
    File[] viewFiles = listBySuffix(new File(source), ".view");
    JSONObject[] cacheGroup = new JSONObject[viewFiles.length];
    for (int i = 0; i < viewFiles.length; i++) {
      // JSONValue.parse returns null (it does not throw) on malformed input.
      Object parsed = JSONValue.parse(getContents(viewFiles[i]));
      if (!(parsed instanceof JSONObject)) {
        log.debug("Skipping unparsable view " + viewFiles[i]);
        continue;
      }
      JSONObject view = (JSONObject) parsed;
      String fn = viewFiles[i].getName();
      // The key is the file name without its ".view" suffix.
      view.put("key", fn.substring(0, fn.length() - ".view".length()));
      cacheGroup[i] = view;
    }
    setContents(source + "/workspace_view_list.cache",
        convertObjectsToViewList(cacheGroup));
  }

  /**
   * Serialises views into the JSON array returned by list requests.
   *
   * @param objArray parsed views; {@code null} entries are skipped
   * @return a JSON array whose elements carry {@code key}, {@code description},
   *         an empty {@code owner} and an "all may read and modify" permission
   */
  @SuppressWarnings("unchecked") // json-simple's collections are raw types
  public String convertObjectsToViewList(JSONObject[] objArray) {
    JSONArray jsonArr = new JSONArray();
    JSONObject permission = new JSONObject();
    permission.put("read", 1);
    permission.put("modify", 1);
    JSONObject user = new JSONObject();
    user.put("all", permission);
    if (objArray == null) {
      return jsonArr.toString();
    }
    for (JSONObject view : objArray) {
      if (view == null) {
        continue; // file that could not be parsed
      }
      JSONObject jsonObj = new JSONObject();
      jsonObj.put("key", view.get("key"));
      jsonObj.put("description", view.get("description"));
      jsonObj.put("owner", "");
      jsonObj.put("permission", user);
      jsonArr.add(jsonObj);
    }
    return jsonArr.toString();
  }

  /**
   * Builds {@code workspace_plugin.cache} in {@code source} from every
   * {@code *.descriptor} file there when the cache is missing or older than
   * the directory's last modification time.
   */
  private void genWidgetCache(String source) {
    File cacheFile = new File(source + "/workspace_plugin.cache");
    File dir = new File(source);
    if (cacheFile.exists() && cacheFile.lastModified() >= dir.lastModified()) {
      return;
    }
    File[] descriptors = listBySuffix(dir, ".descriptor");
    JSONObject[] cacheGroup = new JSONObject[descriptors.length];
    for (int i = 0; i < descriptors.length; i++) {
      Object parsed = JSONValue.parse(getContents(descriptors[i]));
      if (parsed instanceof JSONObject) {
        cacheGroup[i] = (JSONObject) parsed;
      } else {
        log.debug("Skipping unparsable descriptor " + descriptors[i]);
      }
    }
    setContents(source + "/workspace_plugin.cache",
        convertObjectsToWidgetList(cacheGroup));
  }

  /**
   * Serialises widget descriptors into the catalogue JSON.
   *
   * @param objArray parsed descriptors; {@code null} entries are skipped
   * @return a JSON object with {@code detail} (the descriptors) and
   *         {@code children} (the category tree built by {@link #addToHash})
   */
  @SuppressWarnings("unchecked") // json-simple's collections are raw types
  public String convertObjectsToWidgetList(JSONObject[] objArray) {
    JSONArray jsonArr = new JSONArray();
    // Built fresh on every call so removed widgets do not linger in the tree
    // and no mutable state is shared between requests.
    JSONObject tree = new JSONObject();
    if (objArray != null) {
      for (JSONObject widget : objArray) {
        if (widget == null) {
          continue; // descriptor that could not be parsed
        }
        jsonArr.add(widget);
        Object categories = widget.get("categories");
        if (categories == null) {
          log.warn("Widget without categories: " + widget.get("id"));
          continue;
        }
        addToHash(tree, categories.toString().split(","), widget);
      }
    }
    JSONObject jsonObj = new JSONObject();
    jsonObj.put("detail", jsonArr);
    jsonObj.put("children", tree);
    return jsonObj.toString();
  }

  /**
   * Inserts a widget into the category tree.
   *
   * <p>Every category except the last becomes (or reuses) a nested object
   * under the key {@code "node:" + category}; the widget itself is stored in
   * the innermost object as {@code "leaf:" + title -> id}.
   *
   * @param hash            root of the tree, modified in place
   * @param categoriesArray category path of the widget
   * @param obj             widget descriptor providing {@code title} and {@code id}
   * @return {@code hash}
   */
  @SuppressWarnings("unchecked") // json-simple's JSONObject is a raw Map
  public JSONObject addToHash(JSONObject hash, String[] categoriesArray,
      JSONObject obj) {
    if (hash == null || categoriesArray == null || obj == null) {
      return hash;
    }
    JSONObject subHash = hash;
    int last = categoriesArray.length - 1;
    for (int i = 0; i < last; i++) {
      String key = "node:" + categoriesArray[i];
      // get() yields null for an absent key, so the node has to be created
      // explicitly rather than from an exception handler.
      Object child = subHash.get(key);
      if (!(child instanceof JSONObject)) {
        child = new JSONObject();
        subHash.put(key, child);
      }
      subHash = (JSONObject) child;
    }
    if (last >= 0) {
      subHash.put("leaf:" + obj.get("title"), obj.get("id"));
    }
    return hash;
  }

  /**
   * Renames the file of view {@code id} to {@code desc + ".view"}.
   *
   * @return {@code true} if the file now carries the new name (or already
   *         did); {@code false} if a name is unsafe, the target is already
   *         taken by another view, or the rename failed
   */
  private boolean rename(String id, String desc) {
    if (!isSafeName(id) || !isSafeName(desc)) {
      log.warn("Can not rename view: invalid name");
      return false;
    }
    if (id.equals(desc)) {
      return true; // nothing to do
    }
    File view = new File(path + "/views/" + id + ".view");
    File newFile = new File(path + File.separator + "views" + File.separator
        + desc + ".view");
    try {
      // renameTo may silently replace an existing file on some platforms;
      // refuse instead of destroying another view.
      if (!newFile.exists() && view.renameTo(newFile)) {
        return true;
      }
    } catch (SecurityException e) {
      log.warn("Rename denied for " + view, e);
    }
    log.warn("Can not rename " + path + "/views/" + id + ".view to " +
        path + File.separator + "views" + File.separator + desc + ".view");
    return false;
  }

  /**
   * Performs the work of {@link #changeViewInfo}: validates the input,
   * rewrites the description, renames the file and refreshes the cache.
   *
   * @return {@code true} only if every step succeeded
   */
  @SuppressWarnings("unchecked") // json-simple's JSONObject is a raw Map
  private boolean updateDescription(String id, String config) {
    if (!isSafeName(id) || config == null) {
      return false;
    }
    Object requested = JSONValue.parse(config);
    Object stored = JSONValue.parse(
        getContents(new File(path + "/views/" + id + ".view")));
    if (!(requested instanceof JSONObject) || !(stored instanceof JSONObject)) {
      return false;
    }
    Object description = ((JSONObject) requested).get("description");
    // The description becomes a file name, so it is validated before
    // anything is written.
    if (description == null || !isSafeName(description.toString())) {
      return false;
    }
    JSONObject updateObject = (JSONObject) stored;
    updateObject.put("description", description);
    setContents(path + "/views/" + id + ".view", updateObject.toString());
    if (!rename(id, description.toString())) {
      return false;
    }
    refreshViewCache();
    return true;
  }

  /** Deletes the view list cache (if present) and generates it again. */
  private void refreshViewCache() {
    File deleteCache = new File(path + "/views/workspace_view_list.cache");
    if (deleteCache.exists() && !deleteCache.delete()) {
      log.warn("Can not delete "+path + "/views/workspace_view_list.cache");
    }
    genViewCache(path + "/views");
  }

  /** Returns the filtered value of a request parameter, or {@code null} if absent. */
  private static String param(HttpServletRequest request, String name) {
    return new XssFilter(request).getParameter(name);
  }

  /**
   * Tells whether a client-supplied name can be used as a single file name
   * component, i.e. cannot address anything outside its directory.
   */
  private static boolean isSafeName(String name) {
    if (name == null || name.length() == 0
        || ".".equals(name) || "..".equals(name)) {
      return false;
    }
    return name.indexOf('/') < 0 && name.indexOf('\\') < 0
        && name.indexOf('\0') < 0;
  }

  /** Lists the files in {@code dir} ending with {@code suffix}; never returns {@code null}. */
  private static File[] listBySuffix(File dir, final String suffix) {
    File[] found = dir.listFiles(new FilenameFilter() {
      public boolean accept(File parent, String name) {
        return name.endsWith(suffix);
      }
    });
    // listFiles() returns null when dir is not a readable directory.
    return found == null ? new File[0] : found;
  }

  /** Closes {@code resource} if it is not {@code null}, logging instead of throwing. */
  private static void closeQuietly(Closeable resource) {
    if (resource == null) {
      return;
    }
    try {
      resource.close();
    } catch (IOException e) {
      log.debug("Error while closing stream", e);
    }
  }

}
