CHUKWA-771. Fixed code quality issues identified by findbugs. (Eric Yang)
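The change touches 141 files, mostly to clear recurring findbugs warnings: redundant String constructors such as new String("") are replaced with plain literals, composite identifiers are built with StringBuilder instead of chained concatenation, boxing constructors such as new Long(...) and new Integer(...) give way to Long.toString(...) and Integer.valueOf(...), FSMIntermedEntry gains a hashCode() to match its equals() (both now guarded by instanceof checks), the dynamic SQL in the salsa visualization classes is annotated with a findbugs suppression, and the build moves from commons-lang 2.4 to commons-lang3 3.4.

For reference, a minimal sketch of the equals()/hashCode() pattern adopted in FSMIntermedEntry, assuming commons-lang3 on the classpath (the class and field names below are placeholders, not Chukwa code):

  import org.apache.commons.lang3.builder.HashCodeBuilder;

  public class UniqueIdKey {
    private final String uniqueId;

    public UniqueIdKey(String uniqueId) {
      this.uniqueId = uniqueId;
    }

    @Override
    public int hashCode() {
      // Same odd seed pair (13, 71) that FSMIntermedEntry.hashCode() uses.
      return new HashCodeBuilder(13, 71).append(uniqueId).toHashCode();
    }

    @Override
    public boolean equals(Object o) {
      // Check the type before casting so equals() is safe for any argument.
      if (o instanceof UniqueIdKey) {
        return uniqueId.equals(((UniqueIdKey) o).uniqueId);
      }
      return false;
    }
  }

Deriving hashCode() from the same field that equals() compares keeps the class safe to use in hashed collections, which is what the findbugs complaint about defining equals() without hashCode() is about.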
Project: http://git-wip-us.apache.org/repos/asf/chukwa/repo
Commit: http://git-wip-us.apache.org/repos/asf/chukwa/commit/7f662e8c
Tree: http://git-wip-us.apache.org/repos/asf/chukwa/tree/7f662e8c
Diff: http://git-wip-us.apache.org/repos/asf/chukwa/diff/7f662e8c

Branch: refs/heads/master
Commit: 7f662e8c625b7f14164329ff75ef038456e600ca
Parents: 0961ec1
Author: Eric Yang <[email protected]>
Authored: Sat Jul 18 15:55:36 2015 -0700
Committer: Eric Yang <[email protected]>
Committed: Sat Jul 25 17:49:13 2015 -0700

----------------------------------------------------------------------
 pom.xml | 6 +-
 .../salsa/fsm/DataNodeClientTraceMapper.java | 60 ++++----
 .../chukwa/analysis/salsa/fsm/FSMBuilder.java | 114 +++++++--------
 .../analysis/salsa/fsm/FSMIntermedEntry.java | 140 ++++++++++---------
 .../salsa/fsm/JobHistoryTaskDataMapper.java | 63 ++++-----
 .../analysis/salsa/fsm/ParseUtilities.java | 12 +-
 .../salsa/fsm/TaskTrackerClientTraceMapper.java | 50 +++----
 .../analysis/salsa/visualization/Heatmap.java | 99 +++++++------
 .../analysis/salsa/visualization/Swimlanes.java | 51 +++----
 .../hadoop/chukwa/database/Aggregator.java | 7 +-
 .../hadoop/chukwa/database/DataExpiration.java | 41 +++---
 .../apache/hadoop/chukwa/database/Macro.java | 46 +++---
 .../chukwa/database/MetricsAggregation.java | 6 +-
 .../hadoop/chukwa/database/TableCreator.java | 63 +++++----
 .../chukwa/datacollection/DataFactory.java | 19 ++-
 .../datacollection/OffsetStatsManager.java | 16 +--
 .../datacollection/adaptor/AbstractWrapper.java | 1 -
 .../datacollection/adaptor/ExecAdaptor.java | 3 -
 .../datacollection/adaptor/FileAdaptor.java | 35 ++---
 .../datacollection/adaptor/JMXAdaptor.java | 18 ++-
 .../datacollection/adaptor/OozieAdaptor.java | 2 +-
 .../datacollection/adaptor/SocketAdaptor.java | 10 +-
 .../datacollection/adaptor/SyslogAdaptor.java | 2 +-
 .../adaptor/WriteaheadBuffered.java | 21 ++-
 .../adaptor/filetailer/FileTailingAdaptor.java | 3 +-
 .../adaptor/filetailer/LWFTAdaptor.java | 4 +-
 .../adaptor/filetailer/RCheckFTAdaptor.java | 20 ++-
 .../adaptor/heartbeat/HttpStatusChecker.java | 3 +-
 .../datacollection/adaptor/jms/JMSAdaptor.java | 17 ++-
 .../jms/JMSMessagePropertyTransformer.java | 8 +-
 .../adaptor/jms/JMSTextMessageTransformer.java | 4 +-
 .../agent/AdaptorResetThread.java | 6 +-
 .../agent/AgentControlSocketListener.java | 2 +-
 .../agent/rest/AdaptorConfig.java | 1 -
 .../agent/rest/AdaptorController.java | 15 +-
 .../collector/servlet/CommitCheckServlet.java | 31 +++-
 .../collector/servlet/LogDisplayServlet.java | 29 ++--
 .../collector/servlet/ServletCollector.java | 28 +---
 .../connector/ChunkCatcherConnector.java | 2 +-
 .../connector/PipelineConnector.java | 2 +-
 .../connector/http/HttpConnector.java | 10 +-
 .../datacollection/sender/AsyncAckSender.java | 18 ++-
 .../datacollection/sender/ChukwaHttpSender.java | 18 ++-
 .../sender/RetryListOfCollectors.java | 7 +-
 .../test/ConsoleOutConnector.java | 4 +-
 .../datacollection/test/FilePerPostWriter.java | 22 +--
 .../test/FileTailerStressTest.java | 19 ++-
 .../datacollection/test/SinkFileValidator.java | 12 +-
 .../datacollection/writer/ConsoleWriter.java | 4 +-
 .../chukwa/datacollection/writer/Dedup.java | 2 +-
 .../datacollection/writer/ExtractorWriter.java | 6 +-
 .../datacollection/writer/InMemoryWriter.java | 3 -
 .../writer/PipelineStageWriter.java | 6 +-
 .../datacollection/writer/SeqFileWriter.java | 90 ++++++------
 .../datacollection/writer/SocketTeeWriter.java | 27 ++--
 .../writer/localfs/LocalToRemoteHdfsMover.java | 5 +-
 .../writer/localfs/LocalWriter.java | 29 ++--
 .../datacollection/writer/solr/SolrWriter.java | 5 +-
 .../chukwa/dataloader/DataLoaderFactory.java | 19 ++-
 .../hadoop/chukwa/dataloader/FSMDataLoader.java | 4 +-
 .../chukwa/dataloader/MetricDataLoader.java | 45 +++--
 .../chukwa/dataloader/MetricDataLoaderPool.java | 4 +-
 .../chukwa/dataloader/SocketDataLoader.java | 7 +-
 .../hadoop/chukwa/datastore/UserStore.java | 16 ++-
 .../hadoop/chukwa/datastore/ViewStore.java | 14 +-
 .../hadoop/chukwa/datastore/WidgetStore.java | 11 +-
 .../chukwa/datatrigger/HttpTriggerAction.java | 17 ++-
 .../archive/ChukwaArchiveManager.java | 4 +-
 .../demux/DailyChukwaRecordRolling.java | 2 +-
 .../hadoop/chukwa/extraction/demux/Demux.java | 32 +++--
 .../chukwa/extraction/demux/DemuxManager.java | 8 +-
 .../demux/HourlyChukwaRecordRolling.java | 2 +-
 .../extraction/demux/PostProcessorManager.java | 18 ++-
 .../chukwa/extraction/demux/RecordMerger.java | 6 +-
 .../processor/mapper/AbstractProcessor.java | 4 +-
 .../demux/processor/mapper/ChunkSaver.java | 6 +-
 .../processor/mapper/ClientTraceProcessor.java | 2 +-
 .../processor/mapper/DatanodeProcessor.java | 8 +-
 .../processor/mapper/HBaseMasterProcessor.java | 11 +-
 .../mapper/HBaseRegionServerProcessor.java | 11 +-
 .../mapper/HadoopMetricsProcessor.java | 17 ++-
 .../processor/mapper/JobConfProcessor.java | 20 ++-
 .../mapper/JobLogHistoryProcessor.java | 12 +-
 .../processor/mapper/JobTrackerProcessor.java | 11 +-
 .../mapper/Log4JMetricsContextProcessor.java | 11 +-
 .../mapper/Log4jJobHistoryProcessor.java | 5 +-
 .../demux/processor/mapper/LogEntry.java | 6 +-
 .../processor/mapper/NamenodeProcessor.java | 8 +-
 .../demux/processor/mapper/SysLog.java | 7 -
 .../demux/processor/mapper/SystemMetrics.java | 72 ++++++----
 .../processor/mapper/ZookeeperProcessor.java | 8 +-
 .../demux/processor/reducer/ClientTrace.java | 4 +-
 .../processor/reducer/MRJobReduceProcessor.java | 10 +-
 .../chukwa/extraction/engine/ChukwaRecord.java | 10 +-
 .../extraction/engine/ChukwaRecordJT.java | 8 +-
 .../extraction/engine/ChukwaRecordKey.java | 1 +
 .../hadoop/chukwa/extraction/engine/Token.java | 4 +
 .../engine/datasource/DsDirectory.java | 2 +-
 .../engine/datasource/database/DatabaseDS.java | 14 +-
 .../record/ChukwaDSInternalResult.java | 8 ++
 .../datasource/record/ChukwaFileParser.java | 6 +-
 .../record/ChukwaRecordDataSource.java | 29 ++--
 .../record/ChukwaSequenceFileParser.java | 2 +-
 .../apache/hadoop/chukwa/hicc/JSONLoader.java | 4 +-
 .../hadoop/chukwa/hicc/OfflineTimeHandler.java | 6 +-
 .../org/apache/hadoop/chukwa/hicc/Views.java | 4 +-
 .../org/apache/hadoop/chukwa/hicc/ViewsTag.java | 10 +-
 .../apache/hadoop/chukwa/hicc/Workspace.java | 58 ++++----
 .../hadoop/chukwa/hicc/bean/BarOptions.java | 40 ++++++
 .../hadoop/chukwa/hicc/bean/LineOptions.java | 17 ++-
 .../hadoop/chukwa/hicc/bean/PointOptions.java | 18 ++-
 .../hadoop/chukwa/hicc/bean/SeriesOptions.java | 28 +++-
 .../log4j/ChukwaDailyRollingFileAppender.java | 44 +++---
 .../inputtools/log4j/ChukwaTaskLogAppender.java | 79 +++++++++++
 .../inputtools/log4j/TaskLogAppender.java | 80 ----------
 .../chukwa/inputtools/mdl/DataConfig.java | 2 +-
 .../chukwa/inputtools/mdl/ErStreamHandler.java | 4 +-
 .../chukwa/inputtools/mdl/LoaderServer.java | 19 +--
 .../chukwa/inputtools/plugin/ExecPlugin.java | 24 ++--
 .../hadoop/chukwa/rest/bean/ParametersBean.java | 2 +-
 .../hadoop/chukwa/rest/bean/ViewBean.java | 4 +-
 .../hadoop/chukwa/rest/bean/WidgetBean.java | 4 +-
 .../chukwa/rest/resource/ClientTrace.java | 3 +-
 .../hadoop/chukwa/util/AdaptorNamingUtils.java | 7 +-
.../apache/hadoop/chukwa/util/ClassUtils.java | 12 +- .../hadoop/chukwa/util/CopySequenceFile.java | 4 +- .../hadoop/chukwa/util/CreateRecordFile.java | 15 +- .../apache/hadoop/chukwa/util/DumpArchive.java | 6 +- .../apache/hadoop/chukwa/util/DumpChunks.java | 4 +- .../org/apache/hadoop/chukwa/util/Filter.java | 4 +- .../hadoop/chukwa/util/HierarchyDataType.java | 4 +- .../apache/hadoop/chukwa/util/TempFileUtil.java | 29 ++-- .../apache/hadoop/chukwa/util/XssFilter.java | 29 ---- .../metrics/spi/AbstractMetricsContext.java | 14 +- .../org/apache/hadoop/chukwa/ChunkImplTest.java | 1 + .../adaptor/TestDirTailingAdaptor.java | 1 - .../agent/rest/TestAdaptorController.java | 16 +++ .../collector/TestDelayedAcks.java | 2 - .../collector/TestFailedCollectorAck.java | 4 +- .../datacollection/writer/TestHBaseWriter.java | 8 +- src/test/resources/tasklog-log4j.properties | 2 +- 141 files changed, 1394 insertions(+), 1091 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/chukwa/blob/7f662e8c/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index ba4f745..6fab0be 100644 --- a/pom.xml +++ b/pom.xml @@ -175,9 +175,9 @@ <version>3.1</version> </dependency> <dependency> - <groupId>commons-lang</groupId> - <artifactId>commons-lang</artifactId> - <version>2.4</version> + <groupId>org.apache.commons</groupId> + <artifactId>commons-lang3</artifactId> + <version>3.4</version> </dependency> <dependency> <groupId>commons-logging</groupId> http://git-wip-us.apache.org/repos/asf/chukwa/blob/7f662e8c/src/main/java/org/apache/hadoop/chukwa/analysis/salsa/fsm/DataNodeClientTraceMapper.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/hadoop/chukwa/analysis/salsa/fsm/DataNodeClientTraceMapper.java b/src/main/java/org/apache/hadoop/chukwa/analysis/salsa/fsm/DataNodeClientTraceMapper.java index ec37f7a..0445d41 100644 --- a/src/main/java/org/apache/hadoop/chukwa/analysis/salsa/fsm/DataNodeClientTraceMapper.java +++ b/src/main/java/org/apache/hadoop/chukwa/analysis/salsa/fsm/DataNodeClientTraceMapper.java @@ -104,25 +104,25 @@ public class DataNodeClientTraceMapper src_add = src_regex.group(1); } else { log.warn("Failed to match src IP:"+val.getValue("src")+""); - src_add = new String(""); + src_add = ""; } Matcher dest_regex = ipPattern.matcher(val.getValue("dest")); if (dest_regex.matches()) { dest_add = dest_regex.group(1); } else { log.warn("Failed to match dest IP:"+val.getValue("dest")+""); - dest_add = new String(""); + dest_add = ""; } Matcher datanodeserver_regex = ipPattern.matcher(val.getValue("srvID")); if (datanodeserver_regex.matches()) { datanodeserver_add = datanodeserver_regex.group(1); } else { log.warn("Failed to match DataNode server address:"+val.getValue("srvID")+""); - datanodeserver_add = new String(""); + datanodeserver_add = ""; } - start_rec.host_exec = new String(src_add); - end_rec.host_exec = new String(src_add); + start_rec.host_exec = src_add; + end_rec.host_exec = src_add; blkid = val.getValue("blockid").trim(); if (fieldNamesList.contains("cliID")) { @@ -131,7 +131,7 @@ public class DataNodeClientTraceMapper cli_id = cli_id.substring(10); } } else { - cli_id = new String(""); + cli_id = ""; } current_op = val.getValue("op"); String [] k = key.getKey().split("/"); @@ -148,21 +148,21 @@ public class DataNodeClientTraceMapper } start_rec.time_orig_epoch = k[0]; - start_rec.time_orig = (new 
Long(actual_time_ms)).toString(); // not actually used - start_rec.timestamp = (new Long(actual_time_ms)).toString(); - start_rec.time_end = new String(""); - start_rec.time_start = new String(start_rec.timestamp); + start_rec.time_orig = Long.toString(actual_time_ms); // not actually used + start_rec.timestamp = Long.toString(actual_time_ms); + start_rec.time_end = ""; + start_rec.time_start = start_rec.timestamp; end_rec.time_orig_epoch = k[0]; - end_rec.time_orig = val.getValue("actual_time"); - end_rec.timestamp = new String(val.getValue("actual_time")); - end_rec.time_end = new String(val.getValue("actual_time")); - end_rec.time_start = new String(""); + end_rec.time_orig = val.getValue("actual_time"); + end_rec.timestamp = val.getValue("actual_time"); + end_rec.time_end = val.getValue("actual_time"); + end_rec.time_start = ""; log.debug("Duration: " + (Long.parseLong(end_rec.time_end) - Long.parseLong(start_rec.time_start))); - end_rec.job_id = new String(cli_id); // use job id = block id - start_rec.job_id = new String(cli_id); + end_rec.job_id = cli_id; // use job id = block id + start_rec.job_id = cli_id; if (current_op.equals("HDFS_READ")) { if (src_add != null && src_add.equals(dest_add)) { @@ -171,43 +171,41 @@ public class DataNodeClientTraceMapper start_rec.state_hdfs = new HDFSState(HDFSState.READ_REMOTE); } // should these ALWAYS be dest? - start_rec.host_other = new String(dest_add); - end_rec.host_other = new String(dest_add); + start_rec.host_other = dest_add; + end_rec.host_other = dest_add; } else if (current_op.equals("HDFS_WRITE")) { if (src_add != null && dest_add.equals(datanodeserver_add)) { start_rec.state_hdfs = new HDFSState(HDFSState.WRITE_LOCAL); - } else if (dest_add != null && !dest_add.equals(datanodeserver_add)) { + } else if (!dest_add.equals(datanodeserver_add)) { start_rec.state_hdfs = new HDFSState(HDFSState.WRITE_REMOTE); } else { - start_rec.state_hdfs = new HDFSState(HDFSState.WRITE_REPLICATED); + start_rec.state_hdfs = new HDFSState(HDFSState.WRITE_REPLICATED); } - start_rec.host_other = new String(dest_add); - end_rec.host_other = new String(dest_add); + start_rec.host_other = dest_add; + end_rec.host_other = dest_add; } else { log.warn("Invalid state: " + current_op); } end_rec.state_hdfs = start_rec.state_hdfs; start_rec.state_name = start_rec.state_hdfs.toString(); end_rec.state_name = end_rec.state_hdfs.toString(); - start_rec.identifier = new String(blkid); - end_rec.identifier = new String(blkid); + start_rec.identifier = blkid; + end_rec.identifier = blkid; - start_rec.unique_id = new String(start_rec.state_name + "@" + - start_rec.identifier + "@" + start_rec.job_id); - end_rec.unique_id = new String(end_rec.state_name + "@" + - end_rec.identifier + "@" + end_rec.job_id); + start_rec.unique_id = new StringBuilder().append(start_rec.state_name).append("@").append(start_rec.identifier).append("@").append(start_rec.job_id).toString(); + end_rec.unique_id = new StringBuilder().append(end_rec.state_name).append("@").append(end_rec.identifier).append("@").append(end_rec.job_id).toString(); start_rec.add_info.put(Record.tagsField,val.getValue(Record.tagsField)); start_rec.add_info.put("csource",val.getValue("csource")); end_rec.add_info.put(Record.tagsField,val.getValue(Record.tagsField)); end_rec.add_info.put("csource",val.getValue("csource")); - end_rec.add_info.put("STATE_STRING",new String("SUCCESS")); // by default + end_rec.add_info.put("STATE_STRING","SUCCESS"); // by default // add counter value 
end_rec.add_info.put("BYTES",val.getValue("bytes")); - String crk_mid_string_start = new String(start_rec.getUniqueID() + "_" + start_rec.timestamp); - String crk_mid_string_end = new String(end_rec.getUniqueID() + "_" + start_rec.timestamp); + String crk_mid_string_start = new StringBuilder().append(start_rec.getUniqueID()).append("_").append(start_rec.timestamp).toString(); + String crk_mid_string_end = new StringBuilder().append(end_rec.getUniqueID()).append("_").append(start_rec.timestamp).toString(); output.collect(new ChukwaRecordKey(FSM_CRK_ReduceType, crk_mid_string_start), start_rec); output.collect(new ChukwaRecordKey(FSM_CRK_ReduceType, crk_mid_string_end), end_rec); http://git-wip-us.apache.org/repos/asf/chukwa/blob/7f662e8c/src/main/java/org/apache/hadoop/chukwa/analysis/salsa/fsm/FSMBuilder.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/hadoop/chukwa/analysis/salsa/fsm/FSMBuilder.java b/src/main/java/org/apache/hadoop/chukwa/analysis/salsa/fsm/FSMBuilder.java index d3a1656..c4b2536 100644 --- a/src/main/java/org/apache/hadoop/chukwa/analysis/salsa/fsm/FSMBuilder.java +++ b/src/main/java/org/apache/hadoop/chukwa/analysis/salsa/fsm/FSMBuilder.java @@ -96,13 +96,10 @@ public class FSMBuilder extends Configured implements Tool { assert(fnl.contains("TIME_START")); assert(fnl.contains("COUNTER_BYTES")); - String id1 = new String(cr.getValue("TASK_ID")+JCDF_SEP+cr.getValue("TIME_START")); - String id2 = new String("map"+JCDF_SEP+cr.getValue("JOB_ID")); - String et = new String( - (new Long(Long.parseLong(cr.getValue("TIME_END")) - - Long.parseLong(cr.getValue("TIME_START")))).toString() - ); - String ev = new String(cr.getValue("COUNTER_BYTES")); + String id1 = new StringBuilder().append(cr.getValue("TASK_ID")).append(JCDF_SEP).append(cr.getValue("TIME_START")).toString(); + String id2 = new StringBuilder().append("map").append(JCDF_SEP).append(cr.getValue("JOB_ID")).toString(); + String et = Long.toString((Long.parseLong(cr.getValue("TIME_END")) - Long.parseLong(cr.getValue("TIME_START")))); + String ev = new StringBuilder().append(cr.getValue("COUNTER_BYTES")).toString(); cr.add(JCDF_ID1, id1); cr.add(JCDF_ID2, id2); cr.add(JCDF_EDGE_TIME, et); @@ -120,13 +117,10 @@ public class FSMBuilder extends Configured implements Tool { assert(fnl.contains("TIME_START")); assert(fnl.contains("COUNTER_INPUT_BYTES")); - String id1 = new String("map"+JCDF_SEP+cr.getValue("TASK_ID")); - String id2 = new String("shuf"+JCDF_SEP+cr.getValue("TASK_ID")); - String et = new String( - (new Long(Long.parseLong(cr.getValue("TIME_END")) - - Long.parseLong(cr.getValue("TIME_START")))).toString() - ); - String ev = new String(cr.getValue("COUNTER_INPUT_BYTES")); + String id1 = new StringBuilder().append("map").append(JCDF_SEP).append(cr.getValue("TASK_ID")).toString(); + String id2 = new StringBuilder().append("shuf").append(JCDF_SEP).append(cr.getValue("TASK_ID")).toString(); + String et = Long.toString((Long.parseLong(cr.getValue("TIME_END")) - Long.parseLong(cr.getValue("TIME_START")))); + String ev = cr.getValue("COUNTER_INPUT_BYTES"); cr.add(JCDF_ID1, id1); cr.add(JCDF_ID2, id2); cr.add(JCDF_EDGE_TIME, et); @@ -154,13 +148,13 @@ public class FSMBuilder extends Configured implements Tool { redid = id_parts[0]; mapid = id_parts[1]; - String id1 = new String("shuf"+JCDF_SEP+mapid); - String id2 = new String("shufred"+JCDF_SEP+redid); - String et = new String( - (new Long(Long.parseLong(cr.getValue("TIME_END")) - - 
Long.parseLong(cr.getValue("TIME_START")))).toString() + String id1 = new StringBuilder().append("shuf").append(JCDF_SEP).append(mapid).toString(); + String id2 = new StringBuilder().append("shufred").append(JCDF_SEP).append(redid).toString(); + String et = Long.toString( + Long.parseLong(cr.getValue("TIME_END")) - + Long.parseLong(cr.getValue("TIME_START")) ); - String ev = new String(cr.getValue("COUNTER_BYTES")); + String ev = cr.getValue("COUNTER_BYTES"); cr.add(JCDF_ID1, id1); cr.add(JCDF_ID2, id2); cr.add(JCDF_EDGE_TIME, et); @@ -178,13 +172,13 @@ public class FSMBuilder extends Configured implements Tool { assert(fnl.contains("TIME_START")); assert(fnl.contains("COUNTER_INPUT_BYTES")); - String id1 = new String("shufred"+JCDF_SEP+cr.getValue("TASK_ID")); - String id2 = new String("redsort"+JCDF_SEP+cr.getValue("TASK_ID")); - String et = new String( - (new Long(Long.parseLong(cr.getValue("TIME_END")) - - Long.parseLong(cr.getValue("TIME_START")))).toString() + String id1 = new StringBuilder().append("shufred").append(JCDF_SEP).append(cr.getValue("TASK_ID")).toString(); + String id2 = new StringBuilder().append("redsort").append(JCDF_SEP).append(cr.getValue("TASK_ID")).toString(); + String et = Long.toString( + (Long.parseLong(cr.getValue("TIME_END")) - + Long.parseLong(cr.getValue("TIME_START"))) ); - String ev = new String(cr.getValue("COUNTER_INPUT_BYTES")); + String ev = new StringBuilder().append(cr.getValue("COUNTER_INPUT_BYTES")).toString(); cr.add(JCDF_ID1, id1); cr.add(JCDF_ID2, id2); cr.add(JCDF_EDGE_TIME, et); @@ -202,13 +196,13 @@ public class FSMBuilder extends Configured implements Tool { assert(fnl.contains("TIME_START")); assert(fnl.contains("COUNTER_INPUT_BYTES")); - String id1 = new String("redsort"+JCDF_SEP+cr.getValue("TASK_ID")); - String id2 = new String("red"+JCDF_SEP+cr.getValue("TASK_ID")); - String et = new String( - (new Long(Long.parseLong(cr.getValue("TIME_END")) - - Long.parseLong(cr.getValue("TIME_START")))).toString() + String id1 = new StringBuilder().append("redsort").append(JCDF_SEP).append(cr.getValue("TASK_ID")).toString(); + String id2 = new StringBuilder().append("red").append(JCDF_SEP).append(cr.getValue("TASK_ID")).toString(); + String et = Long.toString( + Long.parseLong(cr.getValue("TIME_END")) - + Long.parseLong(cr.getValue("TIME_START")) ); - String ev = new String(cr.getValue("COUNTER_INPUT_BYTES")); + String ev = new StringBuilder().append(cr.getValue("COUNTER_INPUT_BYTES")).toString(); cr.add(JCDF_ID1, id1); cr.add(JCDF_ID2, id2); cr.add(JCDF_EDGE_TIME, et); @@ -226,19 +220,16 @@ public class FSMBuilder extends Configured implements Tool { assert(fnl.contains("TIME_START")); assert(fnl.contains("COUNTER_INPUT_BYTES")); - String id1 = new String("red"+JCDF_SEP+cr.getValue("TASK_ID")); - String id2 = new String("redout"+JCDF_SEP+cr.getValue("TASK_ID")); - String et = new String( - (new Long(Long.parseLong(cr.getValue("TIME_END")) - - Long.parseLong(cr.getValue("TIME_START")))).toString() - ); - String ev = new String(cr.getValue("COUNTER_INPUT_BYTES")); + String id1 = new StringBuilder().append("red").append(JCDF_SEP).append(cr.getValue("TASK_ID")).toString(); + String id2 = new StringBuilder().append("redout").append(JCDF_SEP).append(cr.getValue("TASK_ID")).toString(); + String et = Long.toString(Long.parseLong(cr.getValue("TIME_END")) - Long.parseLong(cr.getValue("TIME_START"))); + String ev = cr.getValue("COUNTER_INPUT_BYTES"); cr.add(JCDF_ID1, id1); cr.add(JCDF_ID2, id2); cr.add(JCDF_EDGE_TIME, et); cr.add(JCDF_EDGE_VOL, ev); - } - + } 
+ protected void addStitchingFields_blockwrite (ChukwaRecord cr, ArrayList<String> fnl) { @@ -248,13 +239,10 @@ public class FSMBuilder extends Configured implements Tool { assert(fnl.contains("TIME_START")); assert(fnl.contains("COUNTER_BYTES")); - String id1 = new String("redout"+JCDF_SEP+cr.getValue("JOB_ID")); - String id2 = new String(cr.getValue("TASK_ID")+JCDF_SEP+cr.getValue("TIME_START")); - String et = new String( - (new Long(Long.parseLong(cr.getValue("TIME_END")) - - Long.parseLong(cr.getValue("TIME_START")))).toString() - ); - String ev = new String(cr.getValue("COUNTER_BYTES")); + String id1 = new StringBuilder().append("redout").append(JCDF_SEP).append(cr.getValue("JOB_ID")).toString(); + String id2 = new StringBuilder().append(cr.getValue("TASK_ID")).append(JCDF_SEP).append(cr.getValue("TIME_START")).toString(); + String et = new StringBuilder().append(Long.toString(Long.parseLong(cr.getValue("TIME_END")) - Long.parseLong(cr.getValue("TIME_START")))).toString(); + String ev = cr.getValue("COUNTER_BYTES"); cr.add(JCDF_ID1, id1); cr.add(JCDF_ID2, id2); cr.add(JCDF_EDGE_TIME, et); @@ -340,17 +328,17 @@ public class FSMBuilder extends Configured implements Tool { // error handling? } - cr.add(new String("STATE_NAME"),start_rec.state_name); - cr.add(new String("STATE_UNIQ_ID"),start_rec.getUniqueID()); - cr.add(new String("TIMESTAMP"),start_rec.timestamp); - cr.add(new String("TIME_START"),start_rec.time_start); - cr.add(new String("TIME_END"),end_rec.time_end); - cr.add(new String("TIME_START_MILLIS"),start_rec.time_start.substring(start_rec.time_start.length()-3)); - cr.add(new String("TIME_END_MILLIS"),end_rec.time_end.substring(end_rec.time_end.length()-3)); - cr.add(new String("HOST"),start_rec.host_exec); - cr.add(new String("HOST_OTHER"),start_rec.host_other); - cr.add(new String("JOB_ID"),start_rec.job_id); - cr.add(new String("TASK_ID"),start_rec.getFriendlyID()); + cr.add("STATE_NAME",start_rec.state_name); + cr.add("STATE_UNIQ_ID",start_rec.getUniqueID()); + cr.add("TIMESTAMP",start_rec.timestamp); + cr.add("TIME_START",start_rec.time_start); + cr.add("TIME_END",end_rec.time_end); + cr.add("TIME_START_MILLIS",start_rec.time_start.substring(start_rec.time_start.length()-3)); + cr.add("TIME_END_MILLIS",end_rec.time_end.substring(end_rec.time_end.length()-3)); + cr.add("HOST",start_rec.host_exec); + cr.add("HOST_OTHER",start_rec.host_other); + cr.add("JOB_ID",start_rec.job_id); + cr.add("TASK_ID",start_rec.getFriendlyID()); Set<String> treemapkeys = end_rec.add_info.keySet(); Iterator<String> keyIter = treemapkeys.iterator(); @@ -360,17 +348,17 @@ public class FSMBuilder extends Configured implements Tool { String currkey = keyIter.next(); if (currkey != null && !noncounters.contains(currkey)) { - cr.add(new String("COUNTER_" + currkey), end_rec.add_info.get(currkey)); + cr.add("COUNTER_" + currkey, end_rec.add_info.get(currkey)); } else if (currkey != null && noncounters.contains(currkey)) { - cr.add(new String(currkey), end_rec.add_info.get(currkey)); + cr.add(currkey, end_rec.add_info.get(currkey)); } } assert(!keyIter.hasNext()); cr.setTime(Long.parseLong(start_rec.timestamp)); newkey = null; - newkey = new String(start_rec.time_orig_epoch + - SEP + start_rec.getUniqueID() + SEP + start_rec.time_orig); + newkey = new StringBuilder().append(start_rec.time_orig_epoch).append(SEP).append(start_rec.getUniqueID()). 
+ append(SEP).append(start_rec.time_orig).toString(); log.info("Key ["+newkey+"] Task ["+start_rec.getUniqueID()+"] Job ["+start_rec.job_id+"] Friendly ["+start_rec.getFriendlyID()+"]"); http://git-wip-us.apache.org/repos/asf/chukwa/blob/7f662e8c/src/main/java/org/apache/hadoop/chukwa/analysis/salsa/fsm/FSMIntermedEntry.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/hadoop/chukwa/analysis/salsa/fsm/FSMIntermedEntry.java b/src/main/java/org/apache/hadoop/chukwa/analysis/salsa/fsm/FSMIntermedEntry.java index 163e81a..aecc1f6 100644 --- a/src/main/java/org/apache/hadoop/chukwa/analysis/salsa/fsm/FSMIntermedEntry.java +++ b/src/main/java/org/apache/hadoop/chukwa/analysis/salsa/fsm/FSMIntermedEntry.java @@ -24,8 +24,10 @@ import java.io.DataOutput; import java.util.Iterator; import java.util.TreeMap; import java.util.Set; +import java.util.Map.Entry; import org.apache.hadoop.io.WritableComparable; +import org.apache.commons.lang3.builder.HashCodeBuilder; /* * FSM Intermediate State Entry @@ -79,20 +81,20 @@ public class FSMIntermedEntry this.state_hdfs = new HDFSState(HDFSState.NONE); this.state_type = new StateType(StateType.STATE_NOOP); this.add_info = new TreeMap<String, String>(); - this.host_other = new String(""); - this.job_id = new String(""); - this.time_orig_epoch = new String(""); - this.time_orig = new String(""); + this.host_other = ""; + this.job_id = ""; + this.time_orig_epoch = ""; + this.time_orig = ""; } public String getUniqueID() { - return new String(this.unique_id); + return this.unique_id; } public String getFriendlyID() { - return new String(this.identifier); + return this.identifier; } /** @@ -103,13 +105,13 @@ public class FSMIntermedEntry if (this.fsm_type.val == FSMType.MAPREDUCE_FSM || this.fsm_type.val == FSMType.MAPREDUCE_FSM_INCOMPLETE) { - this.state_name = new String(this.state_mapred.toString()); - } else if (this.fsm_type.val == FSMType.MAPREDUCE_FSM || - this.fsm_type.val == FSMType.MAPREDUCE_FSM_INCOMPLETE) + this.state_name = this.state_mapred.toString(); + } else if (this.fsm_type.val == FSMType.FILESYSTEM_FSM || + this.fsm_type.val == FSMType.FILESYSTEM_FSM_INCOMPLETE) { - this.state_name = new String(this.state_hdfs.toString()); + this.state_name = this.state_hdfs.toString(); } - this.unique_id = new String(this.state_name + "@" + this.identifier); + this.unique_id = new StringBuilder().append(this.state_name).append("@").append(this.identifier).toString(); } public void write(DataOutput out) throws IOException { @@ -142,26 +144,20 @@ public class FSMIntermedEntry if (job_id.length() > 0) out.writeUTF(job_id); out.writeInt(identifier.length()); if (identifier.length() > 0) out.writeUTF(identifier); - + mapKeys = this.add_info.keySet(); out.writeInt(mapKeys.size()); - Iterator<String> keyIter = mapKeys.iterator(); - - for (int i = 0; i < mapKeys.size(); i++) { - assert(keyIter.hasNext()); - String currKey = keyIter.next(); - if (currKey != null) { - String currvalue = this.add_info.get(currKey); - out.writeUTF(currKey); - out.writeInt(currvalue.length()); - if (currvalue.length() > 0) { - out.writeUTF(currvalue); - } - } else { - out.writeUTF(new String("NULL")); - out.writeInt(0); - } + for(Entry<String, String> entry : this.add_info.entrySet()) { + String value = entry.getValue(); + if(value.length() > 0) { + out.writeUTF(entry.getKey()); + out.writeInt(value.length()); + out.writeUTF(value); + } else { + out.writeUTF("NULL"); + out.writeInt(0); + } } } @@ -176,47 +172,47 @@ public 
class FSMIntermedEntry currlen = in.readInt(); if (currlen > 0) this.state_name = in.readUTF(); - else this.state_name = new String(""); + else this.state_name = ""; currlen = in.readInt(); if (currlen > 0) this.unique_id = in.readUTF(); - else this.unique_id = new String(""); + else this.unique_id = ""; currlen = in.readInt(); if (currlen > 0) this.timestamp = in.readUTF(); - else this.timestamp = new String(""); + else this.timestamp = ""; currlen = in.readInt(); if (currlen > 0) this.time_start = in.readUTF(); - else this.time_start = new String(""); + else this.time_start = ""; currlen = in.readInt(); if (currlen > 0) this.time_end = in.readUTF(); - else this.time_end = new String(""); + else this.time_end = ""; currlen = in.readInt(); if (currlen > 0) this.host_exec = in.readUTF(); - else this.host_exec = new String(""); + else this.host_exec = ""; currlen = in.readInt(); if (currlen > 0) this.host_other = in.readUTF(); - else this.host_other = new String(""); + else this.host_other = ""; currlen = in.readInt(); if (currlen > 0) this.time_orig_epoch = in.readUTF(); - else this.time_orig_epoch = new String(""); + else this.time_orig_epoch = ""; currlen = in.readInt(); if (currlen > 0) this.time_orig = in.readUTF(); - else this.time_orig = new String(""); + else this.time_orig = ""; currlen = in.readInt(); if (currlen > 0) this.job_id = in.readUTF(); - else this.job_id = new String(""); + else this.job_id = ""; currlen = in.readInt(); if (currlen > 0) this.identifier = in.readUTF(); - else this.identifier = new String(""); + else this.identifier = ""; numkeys = in.readInt(); @@ -235,15 +231,34 @@ public class FSMIntermedEntry } } - + @Override + public int hashCode() { + return new HashCodeBuilder(13, 71). + append(this.unique_id). + toHashCode(); + } + + @Override public boolean equals (Object o) { - FSMIntermedEntry other = (FSMIntermedEntry) o; - return this.unique_id.equals(other.unique_id); + if((o instanceof FSMIntermedEntry)) { + FSMIntermedEntry other = (FSMIntermedEntry) o; + return this.unique_id.equals(other.unique_id); + } + return false; } public int compareTo (Object o) { - FSMIntermedEntry other = (FSMIntermedEntry) o; - return this.unique_id.compareTo(other.unique_id); + final int BEFORE = -1; + final int EQUAL = 0; + //this optimization is usually worthwhile, and can + //always be added + if ( this == o ) return EQUAL; + + if((o instanceof FSMIntermedEntry)) { + FSMIntermedEntry other = (FSMIntermedEntry) o; + return this.unique_id.compareTo(other.unique_id); + } + return BEFORE; } /* @@ -260,36 +275,29 @@ public class FSMIntermedEntry newObj.fsm_type = new FSMType(this.fsm_type.val); /* Deep copy all strings */ - newObj.state_name = new String(this.state_name); - newObj.unique_id = new String(this.unique_id); - newObj.timestamp = new String(this.timestamp); - newObj.time_start = new String(this.time_start); - newObj.time_end = new String(this.time_end); + newObj.state_name = this.state_name; + newObj.unique_id = this.unique_id; + newObj.timestamp = this.timestamp; + newObj.time_start = this.time_start; + newObj.time_end = this.time_end; - newObj.time_orig_epoch = new String(this.time_orig_epoch); - newObj.time_orig = new String(this.time_orig); - newObj.job_id = new String(this.job_id); + newObj.time_orig_epoch = this.time_orig_epoch; + newObj.time_orig = this.time_orig; + newObj.job_id = this.job_id; /* Deep copy of TreeMap */ newObj.add_info = new TreeMap<String,String>(); - mapKeys = this.add_info.keySet(); - Iterator<String> keyIter = mapKeys.iterator(); - String 
currKey = null; - - for (int i = 0; i < mapKeys.size(); i++) { - assert(keyIter.hasNext()); - currKey = keyIter.next(); - if (currKey != null) { - newObj.add_info.put(currKey, this.add_info.get(currKey)); - } - } - + for(Entry<String, String> entry : this.add_info.entrySet()) { + String currKey = entry.getKey(); + String value = entry.getValue(); + newObj.add_info.put(currKey, value); + } return newObj; } public String toString() { - return new String(this.state_name + "@" + this.unique_id); + return new StringBuilder().append(this.state_name).append("@").append(this.unique_id).toString(); } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/chukwa/blob/7f662e8c/src/main/java/org/apache/hadoop/chukwa/analysis/salsa/fsm/JobHistoryTaskDataMapper.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/hadoop/chukwa/analysis/salsa/fsm/JobHistoryTaskDataMapper.java b/src/main/java/org/apache/hadoop/chukwa/analysis/salsa/fsm/JobHistoryTaskDataMapper.java index 3de268e..9da5461 100644 --- a/src/main/java/org/apache/hadoop/chukwa/analysis/salsa/fsm/JobHistoryTaskDataMapper.java +++ b/src/main/java/org/apache/hadoop/chukwa/analysis/salsa/fsm/JobHistoryTaskDataMapper.java @@ -24,11 +24,8 @@ import java.util.ArrayList; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.chukwa.extraction.demux.*; import org.apache.hadoop.chukwa.extraction.engine.*; -import org.apache.hadoop.conf.*; import org.apache.hadoop.mapred.*; -import org.apache.hadoop.util.*; /** * Pluggable mapper for FSMBuilder @@ -84,8 +81,8 @@ public class JobHistoryTaskDataMapper this_rec.add_info.put(mapCounterDestNames[i], val.getValue(mapCounterNames[i])); } } - this_rec.add_info.put("FILE_BYTES_READ",new String("0")); // to have same fields as reduce - this_rec.add_info.put("INPUT_GROUPS",new String("0")); // to have same fields as reduce + this_rec.add_info.put("FILE_BYTES_READ","0"); // to have same fields as reduce + this_rec.add_info.put("INPUT_GROUPS","0"); // to have same fields as reduce return this_rec; } @@ -128,7 +125,7 @@ public class JobHistoryTaskDataMapper } } - this_rec.add_info.put("OUTPUT_BYTES",new String("0")); // to have same fields as map + this_rec.add_info.put("OUTPUT_BYTES","0"); // to have same fields as map return this_rec; } @@ -162,17 +159,17 @@ public class JobHistoryTaskDataMapper /* Check if this is a start or end entry, set state type, extract start/end times */ if (fieldNamesList.contains("START_TIME")) { this_rec.state_type.val = StateType.STATE_START; - this_rec.timestamp = new String(val.getValue("START_TIME")); - this_rec.time_start = new String(val.getValue("START_TIME")); - this_rec.time_end = new String(""); + this_rec.timestamp = val.getValue("START_TIME"); + this_rec.time_start = val.getValue("START_TIME"); + this_rec.time_end = ""; if (val.getValue("START_TIME").length() < 4+2) { // needs to at least have milliseconds add_record = add_record & false; } } else if (fieldNamesList.contains("FINISH_TIME")) { this_rec.state_type.val = StateType.STATE_END; - this_rec.timestamp = new String(val.getValue("FINISH_TIME")); - this_rec.time_start = new String(""); - this_rec.time_end = new String(val.getValue("FINISH_TIME")); + this_rec.timestamp = val.getValue("FINISH_TIME"); + this_rec.time_start = ""; + this_rec.time_end = val.getValue("FINISH_TIME"); if (val.getValue("FINISH_TIME").length() < 4+2) { // needs to at least have milliseconds add_record = add_record & 
false; } @@ -201,31 +198,31 @@ public class JobHistoryTaskDataMapper } // Fill state name, unique ID - this_rec.state_name = new String(this_rec.state_mapred.toString()); - this_rec.identifier = new String(val.getValue("TASK_ATTEMPT_ID")); + this_rec.state_name = this_rec.state_mapred.toString(); + this_rec.identifier = val.getValue("TASK_ATTEMPT_ID"); this_rec.generateUniqueID(); // Extract hostname from tracker name (if present), or directly fill from hostname (<= 0.18) if (fieldNamesList.contains("HOSTNAME")) { - this_rec.host_exec = new String(val.getValue("HOSTNAME")); + this_rec.host_exec = val.getValue("HOSTNAME"); this_rec.host_exec = ParseUtilities.removeRackFromHostname(this_rec.host_exec); } else if (fieldNamesList.contains("TRACKER_NAME")) { this_rec.host_exec = ParseUtilities.extractHostnameFromTrackerName(val.getValue("TRACKER_NAME")); } else { - this_rec.host_exec = new String(""); + this_rec.host_exec = ""; } if (this_rec.state_type.val == StateType.STATE_END) { assert(fieldNamesList.contains("TASK_STATUS")); String tmpstring = null; tmpstring = val.getValue("TASK_STATUS"); - if (tmpstring.equals("KILLED") || tmpstring.equals("FAILED")) { + if (tmpstring != null && (tmpstring.equals("KILLED") || tmpstring.equals("FAILED"))) { add_record = add_record & false; } if (tmpstring != null && tmpstring.length() > 0) { this_rec.add_info.put("STATE_STRING",tmpstring); } else { - this_rec.add_info.put("STATE_STRING",new String("")); + this_rec.add_info.put("STATE_STRING",""); } switch(this_rec.state_mapred.val) { @@ -281,9 +278,9 @@ public class JobHistoryTaskDataMapper redshuf_start_rec.state_type = new StateType(StateType.STATE_START); redshuf_start_rec.state_mapred = new MapRedState(MapRedState.REDUCE_SHUFFLEWAIT); - redshuf_start_rec.timestamp = new String(this_rec.timestamp); - redshuf_start_rec.time_start = new String(this_rec.timestamp); - redshuf_start_rec.time_end = new String(""); + redshuf_start_rec.timestamp = this_rec.timestamp; + redshuf_start_rec.time_start = this_rec.timestamp; + redshuf_start_rec.time_end = ""; redshuf_start_rec.generateUniqueID(); @@ -352,20 +349,20 @@ public class JobHistoryTaskDataMapper } else { return false; } - redshuf_end_rec.timestamp = new String(val.getValue("SHUFFLE_FINISHED")); - redshuf_end_rec.time_start = new String(""); - redshuf_end_rec.time_end = new String(val.getValue("SHUFFLE_FINISHED")); - redsort_start_rec.timestamp = new String(val.getValue("SHUFFLE_FINISHED")); - redsort_start_rec.time_start = new String(val.getValue("SHUFFLE_FINISHED")); - redsort_start_rec.time_end = new String(""); + redshuf_end_rec.timestamp = val.getValue("SHUFFLE_FINISHED"); + redshuf_end_rec.time_start = ""; + redshuf_end_rec.time_end = val.getValue("SHUFFLE_FINISHED"); + redsort_start_rec.timestamp = val.getValue("SHUFFLE_FINISHED"); + redsort_start_rec.time_start = val.getValue("SHUFFLE_FINISHED"); + redsort_start_rec.time_end = ""; assert(fieldNamesList.contains("SORT_FINISHED")); - redsort_end_rec.timestamp = new String(val.getValue("SORT_FINISHED")); - redsort_end_rec.time_start = new String(""); - redsort_end_rec.time_end = new String(val.getValue("SORT_FINISHED")); - redred_start_rec.timestamp = new String(val.getValue("SORT_FINISHED")); - redred_start_rec.time_start = new String(val.getValue("SORT_FINISHED")); - redred_start_rec.time_end = new String(""); + redsort_end_rec.timestamp = val.getValue("SORT_FINISHED"); + redsort_end_rec.time_start = ""; + redsort_end_rec.time_end = val.getValue("SORT_FINISHED"); + redred_start_rec.timestamp = 
val.getValue("SORT_FINISHED"); + redred_start_rec.time_start = val.getValue("SORT_FINISHED"); + redred_start_rec.time_end = ""; /* redred_end times are exactly the same as the original red_end times */ http://git-wip-us.apache.org/repos/asf/chukwa/blob/7f662e8c/src/main/java/org/apache/hadoop/chukwa/analysis/salsa/fsm/ParseUtilities.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/hadoop/chukwa/analysis/salsa/fsm/ParseUtilities.java b/src/main/java/org/apache/hadoop/chukwa/analysis/salsa/fsm/ParseUtilities.java index 311d140..0840695 100644 --- a/src/main/java/org/apache/hadoop/chukwa/analysis/salsa/fsm/ParseUtilities.java +++ b/src/main/java/org/apache/hadoop/chukwa/analysis/salsa/fsm/ParseUtilities.java @@ -35,9 +35,9 @@ public class ParseUtilities { if (st.countTokens() != 3) { throw new Exception("Expected 3 tokens from ChukwaRecordKey but only found " + st.countTokens() + "."); } - rec.time_orig_epoch = new String(st.nextToken()); - rec.job_id = new String(st.nextToken()); - rec.time_orig = new String(st.nextToken()); + rec.time_orig_epoch = st.nextToken(); + rec.job_id = st.nextToken(); + rec.time_orig = st.nextToken(); return rec; } @@ -45,7 +45,7 @@ public class ParseUtilities { { int firstPos = "tracker_".length(); int secondPos; - String hostname = new String(""); + String hostname = ""; if (trackerName.startsWith("tracker_")) { secondPos = trackerName.indexOf(":",firstPos); @@ -59,9 +59,9 @@ public class ParseUtilities { { int pos = origHostname.lastIndexOf("/"); if (pos > -1) { - return new String(origHostname.substring(pos)); + return origHostname.substring(pos); } else { - return new String(origHostname); + return origHostname; } } http://git-wip-us.apache.org/repos/asf/chukwa/blob/7f662e8c/src/main/java/org/apache/hadoop/chukwa/analysis/salsa/fsm/TaskTrackerClientTraceMapper.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/hadoop/chukwa/analysis/salsa/fsm/TaskTrackerClientTraceMapper.java b/src/main/java/org/apache/hadoop/chukwa/analysis/salsa/fsm/TaskTrackerClientTraceMapper.java index 188b076..a87c426 100644 --- a/src/main/java/org/apache/hadoop/chukwa/analysis/salsa/fsm/TaskTrackerClientTraceMapper.java +++ b/src/main/java/org/apache/hadoop/chukwa/analysis/salsa/fsm/TaskTrackerClientTraceMapper.java @@ -103,35 +103,35 @@ public class TaskTrackerClientTraceMapper src_add = src_regex.group(1); } else { log.warn("Failed to match src IP:"+val.getValue("src")+""); - src_add = new String(""); + src_add = ""; } Matcher dest_regex = ipPattern.matcher(val.getValue("dest")); if (dest_regex.matches()) { dest_add = dest_regex.group(1); } else { log.warn("Failed to match dest IP:"+val.getValue("dest")+""); - dest_add = new String(""); + dest_add = ""; } if (fieldNamesList.contains("reduceID")) { - reduce_id = new String(val.getValue("reduceID")); + reduce_id = val.getValue("reduceID"); } else { // add a random number so we get unique keys or the CRK will break Random r = new Random(); - reduce_id = new String("noreduce" + r.nextInt()); + reduce_id = "noreduce" + r.nextInt(); } if (fieldNamesList.contains("cliID")) { - map_id = new String(val.getValue("cliID").trim()); + map_id = val.getValue("cliID").trim(); } else { - map_id = new String("nomap"); + map_id = "nomap"; } current_op = val.getValue("op"); - start_rec.host_exec = new String(src_add); - end_rec.host_exec = new String(src_add); - start_rec.host_other = new String(dest_add); - end_rec.host_other = 
new String(dest_add); + start_rec.host_exec = src_add; + end_rec.host_exec = src_add; + start_rec.host_other = dest_add; + end_rec.host_other = dest_add; // timestamp of the log entry is the end time; // subtract duration to get start time @@ -149,21 +149,21 @@ public class TaskTrackerClientTraceMapper String [] k = key.getKey().split("/"); start_rec.time_orig_epoch = k[0]; - start_rec.time_orig = (Long.valueOf(actual_time_ms)).toString(); // not actually used - start_rec.timestamp = (Long.valueOf(actual_time_ms)).toString(); - start_rec.time_end = new String(""); - start_rec.time_start = new String(start_rec.timestamp); + start_rec.time_orig = Long.toString(actual_time_ms); // not actually used + start_rec.timestamp = Long.toString(actual_time_ms); + start_rec.time_end = ""; + start_rec.time_start = start_rec.timestamp; end_rec.time_orig_epoch = k[0]; - end_rec.time_orig = val.getValue("actual_time"); - end_rec.timestamp = new String(val.getValue("actual_time")); - end_rec.time_end = new String(val.getValue("actual_time")); - end_rec.time_start = new String(""); + end_rec.time_orig = val.getValue("actual_time"); + end_rec.timestamp = val.getValue("actual_time"); + end_rec.time_end = val.getValue("actual_time"); + end_rec.time_start = ""; log.debug("Duration: " + (Long.parseLong(end_rec.time_end) - Long.parseLong(start_rec.time_start))); - start_rec.job_id = new String(reduce_id); // use job id = block id - end_rec.job_id = new String(reduce_id); + start_rec.job_id = reduce_id; // use job id = block id + end_rec.job_id = reduce_id; if (current_op.equals("MAPRED_SHUFFLE")) { if (src_add != null && src_add.equals(dest_add)) { @@ -177,8 +177,8 @@ public class TaskTrackerClientTraceMapper end_rec.state_mapred = start_rec.state_mapred; start_rec.state_name = start_rec.state_mapred.toString(); end_rec.state_name = end_rec.state_mapred.toString(); - start_rec.identifier = new String(reduce_id + "@" + map_id); - end_rec.identifier = new String(reduce_id + "@" + map_id); + start_rec.identifier = new StringBuilder().append(reduce_id).append("@").append(map_id).toString(); + end_rec.identifier = new StringBuilder().append(reduce_id).append("@").append(map_id).toString(); start_rec.generateUniqueID(); end_rec.generateUniqueID(); @@ -187,13 +187,13 @@ public class TaskTrackerClientTraceMapper start_rec.add_info.put("csource",val.getValue("csource")); end_rec.add_info.put(Record.tagsField,val.getValue(Record.tagsField)); end_rec.add_info.put("csource",val.getValue("csource")); - end_rec.add_info.put("STATE_STRING",new String("SUCCESS")); // by default + end_rec.add_info.put("STATE_STRING","SUCCESS"); // by default // add counter value end_rec.add_info.put("BYTES",val.getValue("bytes")); - String crk_mid_string_start = new String(start_rec.getUniqueID() + "_" + start_rec.timestamp); - String crk_mid_string_end = new String(end_rec.getUniqueID() + "_" + start_rec.timestamp); + String crk_mid_string_start = new StringBuilder().append(start_rec.getUniqueID()).append("_").append(start_rec.timestamp).toString(); + String crk_mid_string_end = new StringBuilder().append(end_rec.getUniqueID()).append("_").append(start_rec.timestamp).toString(); output.collect(new ChukwaRecordKey(FSM_CRK_ReduceType, crk_mid_string_start), start_rec); output.collect(new ChukwaRecordKey(FSM_CRK_ReduceType, crk_mid_string_end), end_rec); http://git-wip-us.apache.org/repos/asf/chukwa/blob/7f662e8c/src/main/java/org/apache/hadoop/chukwa/analysis/salsa/visualization/Heatmap.java 
---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/hadoop/chukwa/analysis/salsa/visualization/Heatmap.java b/src/main/java/org/apache/hadoop/chukwa/analysis/salsa/visualization/Heatmap.java index eb2cc32..37f341c 100644 --- a/src/main/java/org/apache/hadoop/chukwa/analysis/salsa/visualization/Heatmap.java +++ b/src/main/java/org/apache/hadoop/chukwa/analysis/salsa/visualization/Heatmap.java @@ -96,7 +96,7 @@ public class Heatmap { protected String timezone; protected String query_state; protected String query_stat_type; - protected final String table = new String("filesystem_fsm"); + protected final String table = "filesystem_fsm"; protected boolean plot_legend = false; // controls whether to plot hostnames protected boolean sort_nodes = true; protected boolean plot_additional_info = true; @@ -122,11 +122,11 @@ public class Heatmap { final String addinfoshapegroup = "AddInfoShape"; public Heatmap() { - this.cluster = new String(""); - this.timezone = new String(""); - this.query_state = new String(""); - this.query_stat_type = new String(""); - param_map = new HashMap<String, String>(); + this.cluster = ""; + this.timezone = ""; + this.query_state = ""; + this.query_stat_type = ""; + param_map = new HashMap<String, String>(); } /** @@ -141,14 +141,14 @@ public class Heatmap { String query_stat_type, HashMap<String, String> valmap) { - this.cluster = new String(cluster); + this.cluster = cluster; if (timezone != null) { - this.timezone = new String(timezone); + this.timezone = timezone; } else { this.timezone = null; } - this.query_state = new String(event_type); - this.query_stat_type = new String(query_stat_type); + this.query_state = event_type; + this.query_stat_type = query_stat_type; /* This should "simulate" an HttpServletRequest * Need to have "start" and "end" in seconds since Epoch @@ -162,14 +162,14 @@ public class Heatmap { HashMap<String, String> valmap, String shuffles) { - this.cluster = new String(cluster); + this.cluster = cluster; if (timezone != null) { - this.timezone = new String(timezone); + this.timezone = timezone; } else { this.timezone = null; } - this.query_state = new String(query_state); - this.query_stat_type = new String(query_stat_type); + this.query_state = query_state; + this.query_stat_type = query_stat_type; /* This should "simulate" an HttpServletRequest * Need to have "start" and "end" in seconds since Epoch @@ -185,14 +185,14 @@ public class Heatmap { int w, int h) { - this.cluster = new String(cluster); + this.cluster = cluster; if (timezone != null) { - this.timezone = new String(timezone); + this.timezone = timezone; } else { this.timezone = null; } - this.query_state = new String(query_state); - this.query_stat_type = new String(query_stat_type); + this.query_state = query_state; + this.query_stat_type = query_stat_type; /* This should "simulate" an HttpServletRequest * Need to have "start" and "end" in seconds since Epoch @@ -211,15 +211,15 @@ public class Heatmap { this.cluster = session.getAttribute("cluster").toString(); String query_state = xf.getParameter("query_state"); if (query_state != null) { - this.query_state = new String(query_state); + this.query_state = query_state; } else { - this.query_state = new String("read"); + this.query_state = "read"; } String query_stat_type = xf.getParameter("query_stat_type"); if (query_stat_type != null) { - this.query_stat_type = new String(query_stat_type); + this.query_stat_type = query_stat_type; } else { - this.query_stat_type = new 
String("transaction_count"); + this.query_stat_type = "transaction_count"; } this.timezone = session.getAttribute("time_zone").toString(); } @@ -262,9 +262,6 @@ public class Heatmap { this.viz.setRendererFactory(new RendererFactory(){ AbstractShapeRenderer sr = new ShapeRenderer(); ShapeRenderer sr_big = new ShapeRenderer(BOXWIDTH); - Renderer arY = new AxisRenderer(Constants.LEFT, Constants.TOP); - Renderer arX = new AxisRenderer(Constants.CENTER, Constants.BOTTOM); - PolygonRenderer pr = new PolygonRenderer(Constants.POLY_TYPE_LINE); LabelRenderer lr = new LabelRenderer("label"); LabelRenderer lr_legend = new LabelRenderer("label"); @@ -363,18 +360,18 @@ public class Heatmap { VisualTable legend_labels_table_viz = this.viz.addTable(addinfogroup, legend_labels_table); - legend_labels_table_viz.setFloat(0, VisualItem.X, this.SIZE_X/2); - legend_labels_table_viz.setFloat(0, VisualItem.Y, BORDER[1]/2); + legend_labels_table_viz.setFloat(0, VisualItem.X, this.SIZE_X/2f); + legend_labels_table_viz.setFloat(0, VisualItem.Y, BORDER[1]/2f); legend_labels_table_viz.setTextColor(0,ColorLib.color(java.awt.Color.BLACK)); legend_labels_table_viz.setFont(0,new Font(Font.SANS_SERIF,Font.PLAIN,LEGEND_FONT_SIZE)); - legend_labels_table_viz.setFloat(1, VisualItem.X, this.SIZE_X/2); - legend_labels_table_viz.setFloat(1, VisualItem.Y, BORDER[1] + (BOXWIDTH*hd.num_hosts) + BORDER[3]/2); + legend_labels_table_viz.setFloat(1, VisualItem.X, this.SIZE_X/2f); + legend_labels_table_viz.setFloat(1, VisualItem.Y, BORDER[1] + (BOXWIDTH*hd.num_hosts) + BORDER[3]/2f); legend_labels_table_viz.setTextColor(1,ColorLib.color(java.awt.Color.BLACK)); legend_labels_table_viz.setFont(1,new Font(Font.SANS_SERIF,Font.PLAIN,LEGEND_FONT_SIZE)); - legend_labels_table_viz.setFloat(2, VisualItem.X, BORDER[0] + (BOXWIDTH*hd.num_hosts) + BORDER[2]/2); - legend_labels_table_viz.setFloat(2, VisualItem.Y, this.SIZE_Y/2); + legend_labels_table_viz.setFloat(2, VisualItem.X, BORDER[0] + (BOXWIDTH*hd.num_hosts) + BORDER[2]/2f); + legend_labels_table_viz.setFloat(2, VisualItem.Y, this.SIZE_Y/2f); legend_labels_table_viz.setTextColor(2,ColorLib.color(java.awt.Color.BLACK)); legend_labels_table_viz.setFont(2,new Font(Font.SANS_SERIF,Font.PLAIN,LEGEND_FONT_SIZE)); @@ -530,6 +527,9 @@ public class Heatmap { * Interfaces with database to get data and * populate data structures for rendering */ + @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = + "SQL_NONCONSTANT_STRING_PASSED_TO_EXECUTE", + justification = "Dynamic based upon tables in the database") public HeatmapData getData() { // preliminary setup OfflineTimeHandler time_offline; @@ -549,25 +549,27 @@ public class Heatmap { DatabaseWriter dbw = new DatabaseWriter(this.cluster); // setup query + + String sqlTemplate = "select block_id,start_time,finish_time,start_time_millis,finish_time_millis,status,state_name,hostname,other_host,bytes from [%s] where finish_time between '[start]' and '[end]' and (%s) order by start_time"; String query; if (this.query_state != null && this.query_state.equals("read")) { - query = "select block_id,start_time,finish_time,start_time_millis,finish_time_millis,status,state_name,hostname,other_host,bytes from ["+table+"] where finish_time between '[start]' and '[end]' and (state_name like 'read_local' or state_name like 'read_remote')"; + query = String.format(sqlTemplate, table,"state_name like 'read_local' or state_name like 'read_remote'"); } else if (this.query_state != null && this.query_state.equals("write")) { - query = "select 
block_id,start_time,finish_time,start_time_millis,finish_time_millis,status,state_name,hostname,other_host,bytes from ["+table+"] where finish_time between '[start]' and '[end]' and (state_name like 'write_local' or state_name like 'write_remote' or state_name like 'write_replicated')"; + query = String.format(sqlTemplate, table, "state_name like 'write_local' or state_name like 'write_remote' or state_name like 'write_replicated'"); } else { - query = "select block_id,start_time,finish_time,start_time_millis,finish_time_millis,status,state_name,hostname,other_host,bytes from ["+table+"] where finish_time between '[start]' and '[end]' and state_name like '" + query_state + "'"; + query = String.format(sqlTemplate, table, "state_name like '" + query_state + "'"); } Macro mp = new Macro(start,end,query); - query = mp.toString() + " order by start_time"; + String q = mp.toString(); ArrayList<HashMap<String, Object>> events = new ArrayList<HashMap<String, Object>>(); ResultSet rs = null; - log.debug("Query: " + query); + log.debug("Query: " + q); // run query, extract results try { - rs = dbw.query(query); + rs = dbw.query(q); ResultSetMetaData rmeta = rs.getMetaData(); int col = rmeta.getColumnCount(); while (rs.next()) { @@ -609,8 +611,8 @@ public class Heatmap { Iterator<String> host_iter = host_set.iterator(); for (int i = 0; i < num_hosts && host_iter.hasNext(); i++) { String curr_host = host_iter.next(); - host_indices.put(curr_host, new Integer(i)); - host_rev_indices.put(new Integer(i),curr_host); + host_indices.put(curr_host, i); + host_rev_indices.put(i,curr_host); } System.out.println("Number of hosts: " + num_hosts); @@ -747,28 +749,25 @@ public class Heatmap { // collate data HeatmapData hd = new HeatmapData(); - hd.stats = new long[num_hosts][num_hosts]; hd.stats = stats; hd.min = min; hd.max = max; hd.num_hosts = num_hosts; hd.agg_tab = agg_tab; - this.add_info_extra = new String("\nState: "+this.prettyStateNames.get(this.query_state)+ - " ("+events.size()+" "+this.query_state+"'s ["+this.query_stat_type+"])\n" + - "Plotted value range: ["+hd.min+","+hd.max+"] (Zeros in black)"); + this.add_info_extra = new StringBuilder().append("\nState: ").append(this.prettyStateNames.get(this.query_state)). + append(" (").append(events.size()).append(" ").append(this.query_state). + append("'s [").append(this.query_stat_type).append("])\n"). + append("Plotted value range: [").append(hd.min).append(",").append(hd.max). 
+ append("] (Zeros in black)").toString(); hd.hostnames = new String [num_hosts]; for (int i = 0; i < num_hosts; i++) { - String curr_host = host_rev_indices.get(new Integer(permute[i])); - if (sort_nodes) { - hd.hostnames[i] = new String(curr_host); - } else { - hd.hostnames[i] = new String(curr_host); - } + String curr_host = host_rev_indices.get(permute[i]); + hd.hostnames[i] = curr_host; } return hd; } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/chukwa/blob/7f662e8c/src/main/java/org/apache/hadoop/chukwa/analysis/salsa/visualization/Swimlanes.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/hadoop/chukwa/analysis/salsa/visualization/Swimlanes.java b/src/main/java/org/apache/hadoop/chukwa/analysis/salsa/visualization/Swimlanes.java index 87bf40c..07d9576 100644 --- a/src/main/java/org/apache/hadoop/chukwa/analysis/salsa/visualization/Swimlanes.java +++ b/src/main/java/org/apache/hadoop/chukwa/analysis/salsa/visualization/Swimlanes.java @@ -79,7 +79,7 @@ public class Swimlanes { * Modifier for generic Swimlanes plots to plot shuffle, sort, and reducer * states of same reduce on same line */ - protected class MapReduceSwimlanes { + protected static class MapReduceSwimlanes { protected Table plot_tab; protected HashMap<String, ArrayList<Tuple> > reducepart_hash; protected boolean collate_reduces = false; @@ -302,7 +302,7 @@ public class Swimlanes { palette = ColorLib.getCategoryPalette(states.length); colourmap = new HashMap<String,Integer>(); for (int i = 0; i < states.length; i++) { - colourmap.put(states[i], new Integer(palette[i])); + colourmap.put(states[i], Integer.valueOf(palette[i])); } } public int getColour(String state_name) { @@ -413,7 +413,7 @@ public class Swimlanes { protected String cluster; protected String timezone; protected String shuffle_option; - protected final String table = new String("mapreduce_fsm"); + protected final String table = "mapreduce_fsm"; protected boolean plot_legend = true; protected String jobname = null; @@ -436,9 +436,9 @@ public class Swimlanes { final String legendshapegroup = "LegendShape"; public Swimlanes() { - this.cluster = new String(""); - this.timezone = new String(""); - this.shuffle_option = new String(""); + this.cluster = ""; + this.timezone = ""; + this.shuffle_option = ""; param_map = new HashMap<String, String>(); } @@ -453,13 +453,13 @@ public class Swimlanes { (String timezone, String cluster, String event_type, HashMap<String, String> valmap) { - this.cluster = new String(cluster); + this.cluster = cluster; if (timezone != null) { - this.timezone = new String(timezone); + this.timezone = timezone; } else { this.timezone = null; } - this.shuffle_option = new String(event_type); + this.shuffle_option = event_type; /* This should "simulate" an HttpServletRequest * Need to have "start" and "end" in seconds since Epoch @@ -471,13 +471,13 @@ public class Swimlanes { (String timezone, String cluster, String event_type, HashMap<String, String> valmap, int width, int height) { - this.cluster = new String(cluster); + this.cluster = cluster; if (timezone != null) { - this.timezone = new String(timezone); + this.timezone = timezone; } else { this.timezone = null; } - this.shuffle_option = new String(event_type); + this.shuffle_option = event_type; /* This should "simulate" an HttpServletRequest * Need to have "start" and "end" in seconds since Epoch @@ -493,13 +493,13 @@ public class Swimlanes { HashMap<String, String> valmap, int width, int 
height, String legend_opt) { - this.cluster = new String(cluster); + this.cluster = cluster; if (timezone != null) { - this.timezone = new String(timezone); + this.timezone = timezone; } else { this.timezone = null; } - this.shuffle_option = new String(event_type); + this.shuffle_option = event_type; /* This should "simulate" an HttpServletRequest * Need to have "start" and "end" in seconds since Epoch @@ -523,9 +523,9 @@ public class Swimlanes { this.cluster = session.getAttribute("cluster").toString(); String evt_type = xf.getParameter("event_type"); if (evt_type != null) { - this.shuffle_option = new String(evt_type); + this.shuffle_option = evt_type; } else { - this.shuffle_option = new String("noshuffle"); + this.shuffle_option = "noshuffle"; } this.timezone = session.getAttribute("time_zone").toString(); } @@ -535,7 +535,7 @@ public class Swimlanes { * Call before calling @see #run */ public void setJobName(String s) { - this.jobname = new String(s); + this.jobname = s; } /** @@ -692,11 +692,11 @@ public class Swimlanes { textlabels_table.addColumn("label",String.class); textlabels_table.addColumn("type",String.class); textlabels_table.addRow(); - textlabels_table.setString(0,"label",new String("Time/s")); - textlabels_table.setString(0,"type",new String("xaxisname")); + textlabels_table.setString(0,"label","Time/s"); + textlabels_table.setString(0,"type","xaxisname"); VisualTable textlabelsviz = this.viz.addTable(labelgroup, textlabels_table); - textlabelsviz.setX(0,SIZE_X/2); + textlabelsviz.setX(0,SIZE_X/2d); textlabelsviz.setY(0,SIZE_Y - BORDER[2] + (BORDER[2]*0.1)); textlabelsviz.setTextColor(0,ColorLib.color(java.awt.Color.GRAY)); textlabelsviz.setFont(0,new Font(Font.SANS_SERIF,Font.PLAIN,AXIS_NAME_FONT_SIZE)); @@ -835,6 +835,9 @@ public class Swimlanes { } + @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = + "SQL_NONCONSTANT_STRING_PASSED_TO_EXECUTE", + justification = "Dynamic based upon tables in the database") public Table getData() { // preliminary setup OfflineTimeHandler time_offline; @@ -885,16 +888,14 @@ public class Swimlanes { } HashMap<String, Integer> state_counts = new HashMap<String, Integer>(); - HashSet<String> states = new HashSet<String>(); for (int i = 0; i < rs_tab.getRowCount(); i++) { String curr_state = rs_tab.getString(i, "state_name"); - states.add(curr_state); Integer cnt = state_counts.get(curr_state); if (cnt == null) { - state_counts.put(curr_state, new Integer(1)); + state_counts.put(curr_state, Integer.valueOf(1)); } else { state_counts.remove(curr_state); - state_counts.put(curr_state, new Integer(cnt.intValue()+1)); + state_counts.put(curr_state, Integer.valueOf(cnt.intValue()+1)); } } http://git-wip-us.apache.org/repos/asf/chukwa/blob/7f662e8c/src/main/java/org/apache/hadoop/chukwa/database/Aggregator.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/hadoop/chukwa/database/Aggregator.java b/src/main/java/org/apache/hadoop/chukwa/database/Aggregator.java index 1aa50af..1f44b0a 100644 --- a/src/main/java/org/apache/hadoop/chukwa/database/Aggregator.java +++ b/src/main/java/org/apache/hadoop/chukwa/database/Aggregator.java @@ -42,7 +42,7 @@ public class Aggregator { private String jdbc = null; private int[] intervals; private long current = 0; - private static DatabaseWriter db = null; + private DatabaseWriter db = null; public Aggregator() { Calendar now = Calendar.getInstance(); @@ -76,6 +76,8 @@ public class Aggregator { } catch(Exception e) { log.error("Query: "+query); 
       throw new Exception("Aggregation failed for: "+query);
+    } finally {
+      db.close();
     }
   }
@@ -111,7 +113,6 @@ public class Aggregator {
     if (cluster == null) {
       cluster = "unknown";
     }
-    db = new DatabaseWriter(cluster);
     String queries = Aggregator.getContents(new File(System
         .getenv("CHUKWA_CONF_DIR") + File.separator + "aggregator.sql"));
@@ -122,6 +123,7 @@ public class Aggregator {
         log.debug("skipping: " + query[i]);
       } else if(!query[i].equals("")) {
         Aggregator dba = new Aggregator();
+        dba.setWriter(new DatabaseWriter(cluster));
         long start = Calendar.getInstance().getTimeInMillis();
         try {
           if(startTime!=0 && endTime!=0) {
@@ -142,7 +144,6 @@ public class Aggregator {
       }
       startTime = startTime + 5*60000;
     }
-    db.close();
     long aggregatorEnd = Calendar.getInstance().getTimeInMillis();
     log.info("Longest running query: " + longQuery + " (" + (double) longest / 1000 + " seconds)");

http://git-wip-us.apache.org/repos/asf/chukwa/blob/7f662e8c/src/main/java/org/apache/hadoop/chukwa/database/DataExpiration.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/hadoop/chukwa/database/DataExpiration.java b/src/main/java/org/apache/hadoop/chukwa/database/DataExpiration.java
index 7e1ca77..fc10795 100644
--- a/src/main/java/org/apache/hadoop/chukwa/database/DataExpiration.java
+++ b/src/main/java/org/apache/hadoop/chukwa/database/DataExpiration.java
@@ -18,12 +18,14 @@ package org.apache.hadoop.chukwa.database;
-
+import java.sql.SQLException;
+import java.text.ParseException;
 import java.text.SimpleDateFormat;
 import java.util.Calendar;
 import java.util.Date;
 import java.util.HashMap;
-import java.util.Iterator;
+import java.util.Map.Entry;
+
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.chukwa.util.DatabaseWriter;
@@ -39,6 +41,9 @@ public class DataExpiration {
     }
   }
+  @edu.umd.cs.findbugs.annotations.SuppressWarnings(value =
+      "SQL_NONCONSTANT_STRING_PASSED_TO_EXECUTE",
+      justification = "Dynamic based upon tables in the database")
   public void dropTables(long start, long end) {
     String cluster = System.getProperty("CLUSTER");
     if (cluster == null) {
@@ -47,10 +52,8 @@ public class DataExpiration {
     DatabaseWriter dbw = new DatabaseWriter(cluster);
     try {
       HashMap<String, String> dbNames = dbc.startWith("report.db.name.");
-      Iterator<String> ki = dbNames.keySet().iterator();
-      while (ki.hasNext()) {
-        String name = ki.next();
-        String tableName = dbNames.get(name);
+      for(Entry<String, String> entry : dbNames.entrySet()) {
+        String tableName = entry.getValue();
         if (!RegexUtil.isRegex(tableName)) {
           log.warn("Skipping tableName: '" + tableName
               + "' because there was an error parsing it as a regex: "
@@ -63,22 +66,24 @@ public class DataExpiration {
           try {
             String[] parts = tl.split("_");
             int partition = Integer.parseInt(parts[parts.length - 2]);
-            String table = "";
+            StringBuilder table = new StringBuilder();
             for (int i = 0; i < parts.length - 2; i++) {
               if (i != 0) {
-                table = table + "_";
+                table.append("_");
               }
-              table = table + parts[i];
+              table.append(parts[i]);
             }
             partition = partition - 3;
-            String dropPartition = "drop table if exists " + table + "_"
-                + partition + "_" + parts[parts.length - 1];
-            dbw.execute(dropPartition);
-            partition--;
             if(partition>=0) {
-              dropPartition = "drop table if exists " + table + "_" + partition
-                  + "_" + parts[parts.length - 1];
-              dbw.execute(dropPartition);
+              StringBuilder dropPartition = new StringBuilder();
+              dropPartition.append("drop table if exists ");
+              dropPartition.append(table);
+              dropPartition.append("_");
+              dropPartition.append(partition);
+              dropPartition.append("_");
+              dropPartition.append(parts[parts.length - 1]);
+              final String query = dropPartition.toString();
+              dbw.execute(query);
             }
           } catch (NumberFormatException e) {
             log
@@ -91,7 +96,7 @@ public class DataExpiration {
       }
     }
     dbw.close();
-    } catch (Exception e) {
+    } catch (SQLException e) {
       e.printStackTrace();
     }
   }
@@ -118,7 +123,7 @@ public class DataExpiration {
       de.dropTables(start, end);
       long dataExpEnd = Calendar.getInstance().getTimeInMillis();
       log.info("DataExpiration for: "+args[0]+" "+args[1]+" finished: ("+(double) (dataExpEnd-dataExpStart)/1000+" seconds)");
-    } catch (Exception e) {
+    } catch (ParseException e) {
       usage();
     }
   } else {

http://git-wip-us.apache.org/repos/asf/chukwa/blob/7f662e8c/src/main/java/org/apache/hadoop/chukwa/database/Macro.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/hadoop/chukwa/database/Macro.java b/src/main/java/org/apache/hadoop/chukwa/database/Macro.java
index 7c60dfc..f550c05 100644
--- a/src/main/java/org/apache/hadoop/chukwa/database/Macro.java
+++ b/src/main/java/org/apache/hadoop/chukwa/database/Macro.java
@@ -21,7 +21,7 @@ import java.sql.DatabaseMetaData;
 import java.sql.ResultSet;
 import java.sql.SQLException;
 import java.util.HashMap;
-import java.util.Iterator;
+import java.util.Map.Entry;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
@@ -38,7 +38,7 @@ public class Macro {
   private long start = 0;
   private long end = 0;
   private static DatabaseConfig dbc = new DatabaseConfig();
-  private static DatabaseWriter db = null;
+  private DatabaseWriter db = null;
   private String query = null;
   private HttpServletRequest request = null;
@@ -272,30 +272,30 @@ public class Macro {
   }
   public String toString() {
     try {
-      HashMap<String, String> macroList = findMacros(query);
-      Iterator<String> macroKeys = macroList.keySet().iterator();
-      while(macroKeys.hasNext()) {
-        String mkey = macroKeys.next();
-        if(macroList.get(mkey).contains("|")) {
-          StringBuffer buf = new StringBuffer();
-          String[] tableList = macroList.get(mkey).split("\\|");
-          boolean first = true;
-          for(String table : tableList) {
-            String newQuery = query.replace("["+mkey+"]", table);
-            if(!first) {
-              buf.append(" union ");
-            }
-            buf.append("(");
-            buf.append(newQuery);
-            buf.append(")");
-            first = false;
+      HashMap<String, String> macroList = findMacros(query);
+      for(Entry<String, String> entry : macroList.entrySet()) {
+        String mkey = entry.getKey();
+        String value = entry.getValue();
+        if(value.contains("|")) {
+          StringBuffer buf = new StringBuffer();
+          String[] tableList = value.split("\\|");
+          boolean first = true;
+          for(String table : tableList) {
+            String newQuery = query.replace("["+mkey+"]", table);
+            if(!first) {
+              buf.append(" union ");
            }
-          query = buf.toString();
+            buf.append("(");
+            buf.append(newQuery);
+            buf.append(")");
+            first = false;
+          }
+          query = buf.toString();
         } else {
-          log.debug("replacing:"+mkey+" with "+macroList.get(mkey));
-          query = query.replace("["+mkey+"]", macroList.get(mkey));
+          log.debug("replacing:"+mkey+" with "+macroList.get(mkey));
+          query = query.replace("["+mkey+"]", macroList.get(mkey));
         }
-      }
+      }
     } catch(SQLException ex) {
       log.error(query);
       log.error(ex.getMessage());

http://git-wip-us.apache.org/repos/asf/chukwa/blob/7f662e8c/src/main/java/org/apache/hadoop/chukwa/database/MetricsAggregation.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/hadoop/chukwa/database/MetricsAggregation.java b/src/main/java/org/apache/hadoop/chukwa/database/MetricsAggregation.java
index 2eaae38..9d8e8ba 100644
--- a/src/main/java/org/apache/hadoop/chukwa/database/MetricsAggregation.java
+++ b/src/main/java/org/apache/hadoop/chukwa/database/MetricsAggregation.java
@@ -41,6 +41,9 @@ public class MetricsAggregation {
    * @param args
    * @throws SQLException
    */
+  @edu.umd.cs.findbugs.annotations.SuppressWarnings(value =
+      "SQL_NONCONSTANT_STRING_PASSED_TO_EXECUTE",
+      justification = "Dynamic based upon tables in the database")
   public static void main(String[] args) throws SQLException {
     mdlConfig = new DatabaseConfig();
@@ -147,7 +150,8 @@ public class MetricsAggregation {
       // run query
       conn.setAutoCommit(false);
       stmt = conn.createStatement();
-      stmt.execute(sb0.toString());
+      final String query = sb0.toString();
+      stmt.execute(query);
       // update last run
       stmt = conn.createStatement();

http://git-wip-us.apache.org/repos/asf/chukwa/blob/7f662e8c/src/main/java/org/apache/hadoop/chukwa/database/TableCreator.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/hadoop/chukwa/database/TableCreator.java b/src/main/java/org/apache/hadoop/chukwa/database/TableCreator.java
index 5d8c799..70eaf56 100644
--- a/src/main/java/org/apache/hadoop/chukwa/database/TableCreator.java
+++ b/src/main/java/org/apache/hadoop/chukwa/database/TableCreator.java
@@ -24,7 +24,8 @@ import java.sql.SQLException;
 import java.text.SimpleDateFormat;
 import java.util.Date;
 import java.util.HashMap;
-import java.util.Iterator;
+import java.util.Map.Entry;
+
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.chukwa.util.DatabaseWriter;
@@ -46,6 +47,9 @@ public class TableCreator {
     createTables(now, now);
   }
+  @edu.umd.cs.findbugs.annotations.SuppressWarnings(value =
+      "SQL_NONCONSTANT_STRING_PASSED_TO_EXECUTE",
+      justification = "Dynamic based upon tables in the database")
   public void createTables(long start, long end) throws Exception {
     String cluster = System.getProperty("CLUSTER");
     if (cluster == null) {
@@ -53,10 +57,8 @@ public class TableCreator {
     }
     DatabaseWriter dbw = new DatabaseWriter(cluster);
     HashMap<String, String> dbNames = dbc.startWith("report.db.name.");
-    Iterator<String> ki = dbNames.keySet().iterator();
-    while (ki.hasNext()) {
-      String name = ki.next();
-      String tableName = dbNames.get(name);
+    for(Entry<String, String> entry : dbNames.entrySet()) {
+      String tableName = entry.getValue();
       if (!RegexUtil.isRegex(tableName)) {
         log.warn("Skipping tableName: '" + tableName
             + "' because there was an error parsing it as a regex: "
@@ -68,39 +70,44 @@ public class TableCreator {
        try {
          String[] parts = tableList[0].split("_");
          int partition = Integer.parseInt(parts[parts.length - 2]);
-          String table = "";
+          StringBuilder tableNameBuffer = new StringBuilder();
          for (int i = 0; i < parts.length - 2; i++) {
            if (i != 0) {
-              table = table + "_";
+              tableNameBuffer.append("_");
            }
-            table = table + parts[i];
+            tableNameBuffer.append(parts[i]);
          }
-          String query = "show create table " + table + "_template;";
+          String table = tableNameBuffer.toString();
+          StringBuilder q = new StringBuilder();
+          q.append("show create table ");
+          q.append(table);
+          q.append("_template;");
+          final String query = q.toString();
          ResultSet rs = dbw.query(query);
          while (rs.next()) {
log.debug("table schema: " + rs.getString(2)); - query = rs.getString(2); + String tbl = rs.getString(2); log.debug("template table name:" + table + "_template"); log.debug("replacing with table name:" + table + "_" + partition + "_" + parts[parts.length - 1]); - log.debug("creating table: " + query); - String createPartition = query.replaceFirst(table + "_template", - table + "_" + partition + "_" + parts[parts.length - 1]); - createPartition = createPartition.replaceFirst("TABLE", - "TABLE IF NOT EXISTS"); - dbw.execute(createPartition); - partition++; - createPartition = query.replaceFirst(table + "_template", table - + "_" + partition + "_" + parts[parts.length - 1]); - createPartition = createPartition.replaceFirst("TABLE", - "TABLE IF NOT EXISTS"); - dbw.execute(createPartition); - partition++; - createPartition = query.replaceFirst(table + "_template", table - + "_" + partition + "_" + parts[parts.length - 1]); - createPartition = createPartition.replaceFirst("TABLE", - "TABLE IF NOT EXISTS"); - dbw.execute(createPartition); + log.debug("creating table: " + tbl); + + for(int i=0;i<2;i++) { + StringBuilder templateName = new StringBuilder(); + templateName.append(table); + templateName.append("_template"); + StringBuilder partitionName = new StringBuilder(); + partitionName.append(table); + partitionName.append("_"); + partitionName.append(partition); + partitionName.append("_"); + partitionName.append(parts[parts.length - 1]); + tbl = tbl.replaceFirst("TABLE", "TABLE IF NOT EXISTS"); + tbl = tbl.replaceFirst(templateName.toString(), partitionName.toString()); + final String createTable = tbl; + dbw.execute(createTable); + partition++; + } } } catch (NumberFormatException e) { log.error("Error in parsing table partition number, skipping table:" http://git-wip-us.apache.org/repos/asf/chukwa/blob/7f662e8c/src/main/java/org/apache/hadoop/chukwa/datacollection/DataFactory.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/hadoop/chukwa/datacollection/DataFactory.java b/src/main/java/org/apache/hadoop/chukwa/datacollection/DataFactory.java index 957752b..f27b806 100644 --- a/src/main/java/org/apache/hadoop/chukwa/datacollection/DataFactory.java +++ b/src/main/java/org/apache/hadoop/chukwa/datacollection/DataFactory.java @@ -25,6 +25,7 @@ import java.lang.reflect.Constructor; import java.util.Iterator; import org.apache.hadoop.conf.*; +import org.apache.hadoop.chukwa.Chunk; import org.apache.hadoop.chukwa.datacollection.agent.ChukwaAgent; import org.apache.hadoop.chukwa.datacollection.agent.MemLimitQueue; import org.apache.hadoop.chukwa.datacollection.sender.RetryListOfCollectors; @@ -35,15 +36,11 @@ public class DataFactory { static final String COLLECTORS_FILENAME = "collectors"; static final String CHUNK_QUEUE = "chukwaAgent.chunk.queue"; - private static DataFactory dataFactory = null; + protected static final DataFactory dataFactory = new DataFactory(); private ChunkQueue chunkQueue = null; private String defaultTags = ""; - static { - dataFactory = new DataFactory(); - } - private DataFactory() { } @@ -57,7 +54,11 @@ public class DataFactory { } return chunkQueue; } - + + public void put(Chunk c) throws InterruptedException { + chunkQueue.add(c); + } + public synchronized ChunkQueue createEventQueue() { Configuration conf = ChukwaAgent.getStaticConfiguration(); if(conf == null){ @@ -96,7 +97,11 @@ public class DataFactory { public String getDefaultTags() { return defaultTags; } - + + public void setDefaultTags(String 
tags) { + defaultTags = tags; + } + public void addDefaultTag(String tag) { this.defaultTags += " " + tag.trim(); } http://git-wip-us.apache.org/repos/asf/chukwa/blob/7f662e8c/src/main/java/org/apache/hadoop/chukwa/datacollection/OffsetStatsManager.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/hadoop/chukwa/datacollection/OffsetStatsManager.java b/src/main/java/org/apache/hadoop/chukwa/datacollection/OffsetStatsManager.java index 249e247..497f738 100644 --- a/src/main/java/org/apache/hadoop/chukwa/datacollection/OffsetStatsManager.java +++ b/src/main/java/org/apache/hadoop/chukwa/datacollection/OffsetStatsManager.java @@ -23,6 +23,7 @@ import java.util.Map; import java.util.LinkedList; import java.util.Date; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.TimeUnit; /** * Manages stats for multiple objects of type T. T can be any class that is used @@ -87,12 +88,10 @@ public class OffsetStatsManager<T> { public void addOffsetDataPoint(T key, long offset, long timestamp) { OffsetDataStats stats = null; - synchronized (offsetStatsMap) { if (offsetStatsMap.get(key) == null) offsetStatsMap.put(key, new OffsetDataStats()); stats = offsetStatsMap.get(key); - } stats.add(new OffsetData(offset, timestamp)); stats.prune(statsDataTTL); @@ -176,18 +175,14 @@ public class OffsetStatsManager<T> { * @param key key of stats to be removed */ public void remove(T key) { - synchronized (offsetStatsMap) { offsetStatsMap.remove(key); - } } /** * Remove all objectst that we're tracking stats for. */ public void clear() { - synchronized (offsetStatsMap) { offsetStatsMap.clear(); - } } /** @@ -195,9 +190,7 @@ public class OffsetStatsManager<T> { * @param key key that stats are to be returned for */ private OffsetDataStats get(T key) { - synchronized (offsetStatsMap) { return offsetStatsMap.get(key); - } } public class OffsetData { @@ -214,9 +207,10 @@ public class OffsetStatsManager<T> { public double averageRate(OffsetData previous) { if (previous == null) return -1; - - return new Double((offset - previous.getOffset())) / - new Double((timestamp - previous.getTimestamp())) * 1000L; + double elapseOffset = offset - previous.getOffset(); + double elapseTime = (timestamp - previous.getTimestamp()) / 1000d; + double rate = elapseOffset / elapseTime; + return rate; } public boolean olderThan(long timestamp) { http://git-wip-us.apache.org/repos/asf/chukwa/blob/7f662e8c/src/main/java/org/apache/hadoop/chukwa/datacollection/adaptor/AbstractWrapper.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/hadoop/chukwa/datacollection/adaptor/AbstractWrapper.java b/src/main/java/org/apache/hadoop/chukwa/datacollection/adaptor/AbstractWrapper.java index 4d9b05d..d3546de 100644 --- a/src/main/java/org/apache/hadoop/chukwa/datacollection/adaptor/AbstractWrapper.java +++ b/src/main/java/org/apache/hadoop/chukwa/datacollection/adaptor/AbstractWrapper.java @@ -17,7 +17,6 @@ */ package org.apache.hadoop.chukwa.datacollection.adaptor; -import java.util.*; import java.util.regex.*; import org.apache.hadoop.chukwa.Chunk; import org.apache.hadoop.chukwa.datacollection.ChunkReceiver; http://git-wip-us.apache.org/repos/asf/chukwa/blob/7f662e8c/src/main/java/org/apache/hadoop/chukwa/datacollection/adaptor/ExecAdaptor.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/hadoop/chukwa/datacollection/adaptor/ExecAdaptor.java 
b/src/main/java/org/apache/hadoop/chukwa/datacollection/adaptor/ExecAdaptor.java index 1a0e2a3..57959fb 100644 --- a/src/main/java/org/apache/hadoop/chukwa/datacollection/adaptor/ExecAdaptor.java +++ b/src/main/java/org/apache/hadoop/chukwa/datacollection/adaptor/ExecAdaptor.java @@ -24,7 +24,6 @@ import org.apache.hadoop.chukwa.inputtools.plugin.ExecPlugin; import org.apache.log4j.Logger; import org.apache.log4j.helpers.ISO8601DateFormat; import org.json.simple.JSONObject; -import org.json.simple.parser.ParseException; import java.nio.charset.Charset; import java.util.*; @@ -155,8 +154,6 @@ public class ExecAdaptor extends AbstractAdaptor { @Override public void start(long offset) throws AdaptorException { - if(FULL_PATHS && !(new java.io.File(cmd)).exists()) - throw new AdaptorException("Can't start ExecAdaptor. No command " + cmd); this.sendOffset = offset; this.exec = new EmbeddedExec(cmd); TimerTask execTimer = new RunToolTask();
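For reference (illustrative only, not part of the patch): two cleanups recur throughout the hunks above (Heatmap, Swimlanes, DataExpiration, Macro, TableCreator): map iteration is switched from keySet() plus a per-key get() to entrySet(), and the boxing constructors new Integer(...)/new String(...) are replaced by autoboxing or Integer.valueOf(). A minimal, self-contained sketch of both patterns, using hypothetical names rather than the Chukwa classes:

    import java.util.HashMap;
    import java.util.Map;
    import java.util.Map.Entry;

    public class FindbugsIterationSketch {
      public static void main(String[] args) {
        Map<String, String> dbNames = new HashMap<String, String>();
        dbNames.put("report.db.name.example", "example_table");

        // entrySet() yields key and value together, avoiding the extra
        // lookup that keySet() iteration followed by get() performs.
        for (Entry<String, String> entry : dbNames.entrySet()) {
          System.out.println(entry.getKey() + " -> " + entry.getValue());
        }

        // Autoboxing (or Integer.valueOf) reuses cached Integer instances;
        // new Integer(i) always allocates and is flagged by FindBugs.
        Map<String, Integer> hostIndices = new HashMap<String, Integer>();
        hostIndices.put("host-0", 0);
        hostIndices.put("host-1", Integer.valueOf(1));
      }
    }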

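The database classes touched above (Aggregator, DataExpiration, MetricsAggregation, TableCreator) also get the same treatment for dynamic SQL: the statement text is assembled with a StringBuilder, the FindBugs SQL_NONCONSTANT_STRING_PASSED_TO_EXECUTE warning is suppressed with a justification, and resources are closed in a finally block. The sketch below shows that shape with plain JDBC; the class, method, and parameter names (DynamicSqlSketch, dropPartition, jdbcUrl) are hypothetical, it assumes the findbugs-annotations jar is on the classpath, and it is not code from the Chukwa tree:

    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.SQLException;
    import java.sql.Statement;

    public class DynamicSqlSketch {

      @edu.umd.cs.findbugs.annotations.SuppressWarnings(value =
          "SQL_NONCONSTANT_STRING_PASSED_TO_EXECUTE",
          justification = "Table name is derived from trusted configuration")
      public static void dropPartition(String jdbcUrl, String table, int partition)
          throws SQLException {
        // Build the non-constant statement explicitly, as the patch does.
        StringBuilder sql = new StringBuilder();
        sql.append("drop table if exists ");
        sql.append(table);
        sql.append("_");
        sql.append(partition);
        final String query = sql.toString();

        Connection conn = DriverManager.getConnection(jdbcUrl);
        Statement stmt = null;
        try {
          stmt = conn.createStatement();
          stmt.execute(query);
        } finally {
          // Close resources even on failure, mirroring the finally block
          // added to Aggregator.
          if (stmt != null) {
            stmt.close();
          }
          conn.close();
        }
      }
    }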