Updated Branches: refs/heads/master 2981d6aa8 -> 027eea291
[HELIX-356] add a tool for grep zk transaction/snapshot logs based on time,rb=16935 Project: http://git-wip-us.apache.org/repos/asf/helix/repo Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/027eea29 Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/027eea29 Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/027eea29 Branch: refs/heads/master Commit: 027eea291e86335063ca3bfa923515c3511f88d9 Parents: 2981d6a Author: zzhang <[email protected]> Authored: Wed Jan 15 17:56:27 2014 -0800 Committer: zzhang <[email protected]> Committed: Wed Jan 15 17:56:27 2014 -0800 ---------------------------------------------------------------------- helix-core/pom.xml | 8 +- .../java/org/apache/helix/tools/ZkGrep.java | 641 +++++++++++++++++++ .../org/apache/helix/tools/ZkLogAnalyzer.java | 444 ------------- 3 files changed, 645 insertions(+), 448 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/helix/blob/027eea29/helix-core/pom.xml ---------------------------------------------------------------------- diff --git a/helix-core/pom.xml b/helix-core/pom.xml index f30abac..3bd9812 100644 --- a/helix-core/pom.xml +++ b/helix-core/pom.xml @@ -201,10 +201,6 @@ under the License. <name>start-standalone-zookeeper</name> </program> <program> - <mainClass>org.apache.helix.tools.ZkLogAnalyzer</mainClass> - <name>zk-log-analyzer</name> - </program> - <program> <mainClass>org.apache.helix.tools.JmxDumper</mainClass> <name>JmxDumper</name> </program> @@ -216,6 +212,10 @@ under the License. <mainClass>org.apache.helix.tools.IntegrationTestUtil</mainClass> <name>test-util</name> </program> + <program> + <mainClass>org.apache.helix.tools.ZkGrep</mainClass> + <name>zkgrep</name> + </program> </programs> </configuration> </plugin> http://git-wip-us.apache.org/repos/asf/helix/blob/027eea29/helix-core/src/main/java/org/apache/helix/tools/ZkGrep.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/tools/ZkGrep.java b/helix-core/src/main/java/org/apache/helix/tools/ZkGrep.java new file mode 100644 index 0000000..3abf78f --- /dev/null +++ b/helix-core/src/main/java/org/apache/helix/tools/ZkGrep.java @@ -0,0 +1,641 @@ +package org.apache.helix.tools; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import java.io.BufferedReader; +import java.io.File; +import java.io.FileFilter; +import java.io.FileInputStream; +import java.io.FileOutputStream; +import java.io.IOException; +import java.io.InputStreamReader; +import java.text.SimpleDateFormat; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Comparator; +import java.util.Date; +import java.util.List; +import java.util.zip.GZIPInputStream; + +import org.apache.commons.cli.CommandLine; +import org.apache.commons.cli.CommandLineParser; +import org.apache.commons.cli.GnuParser; +import org.apache.commons.cli.HelpFormatter; +import org.apache.commons.cli.Option; +import org.apache.commons.cli.OptionBuilder; +import org.apache.commons.cli.OptionGroup; +import org.apache.commons.cli.Options; +import org.apache.commons.cli.ParseException; +import org.apache.commons.io.FileUtils; +import org.apache.log4j.Logger; + +/** + * utility for grep zk transaction/snapshot logs + * - to grep a pattern by t1 use: + * zkgrep --zkCfg zkCfg --by t1 --pattern patterns... + * - to grep a pattern between t1 and t2 use: + * zkgrep --zkCfg zkCfg --between t1 t2 --pattern patterns... + * for example, to find fail-over latency between t1 and t2, use: + * 1) zkgrep --zkCfg zkCfg --by t1 --pattern "/{cluster}/LIVEINSTNCES/" | grep {fail-node} + * 2) zkgrep --zkCfg zkCfg --between t1 t2 --pattern "closeSession" | grep {fail-node session-id} + * 3) zkgrep --zkCfg zkCfg --between t1 t2 --pattern "/{cluster}" | grep "CURRENTSTATES" | + * grep "setData" | tail -1 + * fail-over latency = timestamp difference between 2) and 3) + */ +public class ZkGrep { + private static Logger LOG = Logger.getLogger(ZkGrep.class); + + private static final String zkCfg = "zkCfg"; + private static final String pattern = "pattern"; + private static final String by = "by"; + private static final String between = "between"; + + public static final String log = "log"; + public static final String snapshot = "snapshot"; + + private static final String gzSuffix = ".gz"; + + @SuppressWarnings("static-access") + private static Options constructCommandLineOptions() { + Option zkCfgOption = + OptionBuilder.hasArgs(1).isRequired(false).withLongOpt(zkCfg).withArgName("zoo.cfg") + .withDescription("provide zoo.cfg").create(); + + Option patternOption = + OptionBuilder.hasArgs().isRequired(true).withLongOpt(pattern) + .withArgName("grep-patterns...").withDescription("provide patterns (required)") + .create(); + + Option betweenOption = + OptionBuilder.hasArgs(2).isRequired(false).withLongOpt(between) + .withArgName("t1 t2 (timestamp in ms or yyMMdd_hhmmss_SSS)") + .withDescription("grep between t1 and t2").create(); + + Option byOption = + OptionBuilder.hasArgs(1).isRequired(false).withLongOpt(by) + .withArgName("t (timestamp in ms or yyMMdd_hhmmss_SSS)").withDescription("grep by t") + .create(); + + OptionGroup group = new OptionGroup(); + group.setRequired(true); + group.addOption(betweenOption); + group.addOption(byOption); + + Options options = new Options(); + options.addOption(zkCfgOption); + options.addOption(patternOption); + options.addOptionGroup(group); + return options; + } + + /** + * get zk transaction log dir and zk snapshot log dir + * @param zkCfgFile + * @return String[0]: zk-transaction-log-dir, String[1]: zk-snapshot-dir + */ + static String[] getZkDataDirs(String zkCfgFile) { + String[] zkDirs = new String[2]; + + FileInputStream fis = null; + BufferedReader br = null; + try { + fis = new FileInputStream(zkCfgFile); + br = new BufferedReader(new InputStreamReader(fis)); + + String line; + while ((line = br.readLine()) != null) { + String key = "dataDir="; + if (line.startsWith(key)) { + zkDirs[1] = zkDirs[0] = line.substring(key.length()) + "/version-2"; + } + + key = "dataLogDir="; + if (line.startsWith(key)) { + zkDirs[0] = line.substring(key.length()) + "/version-2"; + } + } + } catch (Exception e) { + LOG.error("exception in read file: " + zkCfgFile, e); + } finally { + try { + if (br != null) { + br.close(); + } + + if (fis != null) { + fis.close(); + } + + } catch (Exception e) { + LOG.error("exception in closing file: " + zkCfgFile, e); + } + } + + return zkDirs; + } + + // debug + static void printFiles(File[] files) { + System.out.println("START print"); + for (int i = 0; i < files.length; i++) { + File file = files[i]; + System.out.println(file.getName() + ", " + file.lastModified()); + } + System.out.println("END print"); + } + + /** + * get files under dir in order of last modified time + * @param dir + * @param pattern + * @return + */ + static File[] getSortedFiles(String dirPath, final String pattern) { + File dir = new File(dirPath); + File[] files = dir.listFiles(new FileFilter() { + + @Override + public boolean accept(File file) { + return file.isFile() && (file.getName().indexOf(pattern) != -1); + } + }); + + Arrays.sort(files, new Comparator<File>() { + + @Override + public int compare(File o1, File o2) { + int sign = (int) Math.signum(o1.lastModified() - o2.lastModified()); + return sign; + } + + }); + return files; + } + + /** + * get value for an attribute in a parsed zk log; e.g. + * "time:1384984016778 session:0x14257d1d17e0004 cxid:0x5 zxid:0x46899 type:error err:-101" + * given "time" return "1384984016778" + * @param line + * @param attribute + * @return value + */ + static String getAttributeValue(String line, String attribute) { + if (line == null) { + return null; + } + + if (!attribute.endsWith(":")) { + attribute = attribute + ":"; + } + + String[] parts = line.split("\\s"); + if (parts != null && parts.length > 0) { + for (int i = 0; i < parts.length; i++) { + if (parts[i].startsWith(attribute)) { + String val = parts[i].substring(attribute.length()); + return val; + } + } + } + return null; + } + + static long getTimestamp(String line) { + String timestamp = getAttributeValue(line, "time"); + return Long.parseLong(timestamp); + } + + /** + * parse a time string either in timestamp form or "yyMMdd_hhmmss_SSS" form + * @param time + * @return timestamp or -1 on error + */ + static long parseTimeString(String time) { + try { + return Long.parseLong(time); + } catch (NumberFormatException e) { + try { + SimpleDateFormat formatter = new SimpleDateFormat("yyMMdd_hhmmss_SSS"); + Date date = formatter.parse(time); + return date.getTime(); + } catch (java.text.ParseException ex) { + LOG.error("fail to parse time string: " + time, e); + } + } + return -1; + } + + public static void grepZkLog(File zkLog, long start, long end, String... patterns) { + FileInputStream fis = null; + BufferedReader br = null; + try { + fis = new FileInputStream(zkLog); + br = new BufferedReader(new InputStreamReader(fis)); + + String line; + while ((line = br.readLine()) != null) { + try { + long timestamp = getTimestamp(line); + if (timestamp > end) { + break; + } + + if (timestamp < start) { + continue; + } + + boolean match = true; + for (String pattern : patterns) { + if (line.indexOf(pattern) == -1) { + match = false; + break; + } + } + + if (match) { + System.out.println(line); + } + + } catch (NumberFormatException e) { + // ignore + } + } + } catch (Exception e) { + LOG.error("exception in grep zk-log: " + zkLog, e); + } finally { + try { + if (br != null) { + br.close(); + } + + if (fis != null) { + fis.close(); + } + + } catch (Exception e) { + LOG.error("exception in closing zk-log: " + zkLog, e); + } + } + } + + public static void grepZkLogDir(List<File> parsedZkLogs, long start, long end, String... patterns) { + for (File file : parsedZkLogs) { + grepZkLog(file, start, end, patterns); + + } + + } + + public static void grepZkSnapshot(File zkSnapshot, String... patterns) { + FileInputStream fis = null; + BufferedReader br = null; + try { + fis = new FileInputStream(zkSnapshot); + br = new BufferedReader(new InputStreamReader(fis)); + + String line; + while ((line = br.readLine()) != null) { + try { + boolean match = true; + for (String pattern : patterns) { + if (line.indexOf(pattern) == -1) { + match = false; + break; + } + } + + if (match) { + System.out.println(line); + } + + } catch (NumberFormatException e) { + // ignore + } + } + } catch (Exception e) { + LOG.error("exception in grep zk-snapshot: " + zkSnapshot, e); + } finally { + try { + if (br != null) { + br.close(); + } + + if (fis != null) { + fis.close(); + } + + } catch (Exception e) { + LOG.error("exception in closing zk-snapshot: " + zkSnapshot, e); + } + } + } + + /** + * guess zoo.cfg dir + * @return absolute path to zoo.cfg + */ + static String guessZkCfgDir() { + // TODO impl this + return null; + } + + public static void printUsage(Options cliOptions) { + HelpFormatter helpFormatter = new HelpFormatter(); + helpFormatter.setWidth(1000); + helpFormatter.printHelp("java " + ZkGrep.class.getName(), cliOptions); + } + + /** + * parse zk-transaction-logs between start and end, if not already parsed + * @param zkLogDir + * @param start + * @param end + * @return list of parsed zklogs between start and end, in order of last modified timestamp + */ + static List<File> parseZkLogs(String zkLogDir, long start, long end) { + File zkParsedDir = new File(String.format("%s/zklog-parsed", System.getProperty("user.home"))); + File[] zkLogs = getSortedFiles(zkLogDir, log); + // printFiles(zkDataFiles); + List<File> parsedZkLogs = new ArrayList<File>(); + + boolean stop = false; + for (File zkLog : zkLogs) { + if (stop) { + break; + } + + if (zkLog.lastModified() < start) { + continue; + } + + if (zkLog.lastModified() > end) { + stop = true; + } + + try { + File parsedZkLog = new File(zkParsedDir, stripGzSuffix(zkLog.getName()) + ".parsed"); + if (!parsedZkLog.exists() || parsedZkLog.lastModified() <= zkLog.lastModified()) { + + if (zkLog.getName().endsWith(gzSuffix)) { + // copy and gunzip it + FileUtils.copyFileToDirectory(zkLog, zkParsedDir); + File zkLogGz = new File(zkParsedDir, zkLog.getName()); + File tmpZkLog = gunzip(zkLogGz); + + // parse gunzip file + ZKLogFormatter.main(new String[] { + log, tmpZkLog.getAbsolutePath(), parsedZkLog.getAbsolutePath() + }); + + // delete it + zkLogGz.delete(); + tmpZkLog.delete(); + } else { + // parse it directly + ZKLogFormatter.main(new String[] { + log, zkLog.getAbsolutePath(), parsedZkLog.getAbsolutePath() + }); + } + } + parsedZkLogs.add(parsedZkLog); + } catch (Exception e) { + LOG.error("fail to parse zkLog: " + zkLog, e); + } + } + + return parsedZkLogs; + } + + /** + * Strip off a .gz suffix if any + * @param filename + * @return + */ + static String stripGzSuffix(String filename) { + if (filename.endsWith(gzSuffix)) { + return filename.substring(0, filename.length() - gzSuffix.length()); + } + return filename; + } + + /** + * Gunzip a file + * @param zipFile + * @return + */ + static File gunzip(File zipFile) { + File outputFile = new File(stripGzSuffix(zipFile.getAbsolutePath())); + + byte[] buffer = new byte[1024]; + + try { + + GZIPInputStream gzis = new GZIPInputStream(new FileInputStream(zipFile)); + FileOutputStream out = new FileOutputStream(outputFile); + + int len; + while ((len = gzis.read(buffer)) > 0) { + out.write(buffer, 0, len); + } + + gzis.close(); + out.close(); + + return outputFile; + } catch (IOException e) { + LOG.error("fail to gunzip file: " + zipFile, e); + } + + return null; + } + + /** + * parse the last zk-snapshots by by-time, if not already parsed + * @param zkSnapshotDir + * @param byTime + * @return File array which the first element is the last zk-snapshot by by-time and the second + * element is its parsed file + */ + static File[] parseZkSnapshot(String zkSnapshotDir, long byTime) { + File[] retFiles = new File[2]; + File zkParsedDir = new File(String.format("%s/zklog-parsed", System.getProperty("user.home"))); + File[] zkSnapshots = getSortedFiles(zkSnapshotDir, snapshot); + // printFiles(zkDataFiles); + File lastZkSnapshot = null; + for (int i = 0; i < zkSnapshots.length; i++) { + File zkSnapshot = zkSnapshots[i]; + if (zkSnapshot.lastModified() >= byTime) { + break; + } + lastZkSnapshot = zkSnapshot; + retFiles[0] = lastZkSnapshot; + } + + try { + File parsedZkSnapshot = + new File(zkParsedDir, stripGzSuffix(lastZkSnapshot.getName()) + ".parsed"); + if (!parsedZkSnapshot.exists() + || parsedZkSnapshot.lastModified() <= lastZkSnapshot.lastModified()) { + + if (lastZkSnapshot.getName().endsWith(gzSuffix)) { + // copy and gunzip it + FileUtils.copyFileToDirectory(lastZkSnapshot, zkParsedDir); + File lastZkSnapshotGz = new File(zkParsedDir, lastZkSnapshot.getName()); + File tmpLastZkSnapshot = gunzip(lastZkSnapshotGz); + + // parse gunzip file + ZKLogFormatter.main(new String[] { + snapshot, tmpLastZkSnapshot.getAbsolutePath(), parsedZkSnapshot.getAbsolutePath() + }); + + // delete it + lastZkSnapshotGz.delete(); + tmpLastZkSnapshot.delete(); + } else { + // parse it directly + ZKLogFormatter.main(new String[] { + snapshot, lastZkSnapshot.getAbsolutePath(), parsedZkSnapshot.getAbsolutePath() + }); + } + + } + retFiles[1] = parsedZkSnapshot; + return retFiles; + } catch (Exception e) { + LOG.error("fail to parse zkSnapshot: " + lastZkSnapshot, e); + } + + return null; + } + + public static void processCommandLineArgs(String[] cliArgs) { + CommandLineParser cliParser = new GnuParser(); + Options cliOptions = constructCommandLineOptions(); + CommandLine cmd = null; + + try { + cmd = cliParser.parse(cliOptions, cliArgs); + } catch (ParseException pe) { + System.err.println("CommandLineClient: failed to parse command-line options: " + pe); + printUsage(cliOptions); + System.exit(1); + } + + String zkCfgDirValue = null; + String zkCfgFile = null; + + if (cmd.hasOption(zkCfg)) { + zkCfgDirValue = cmd.getOptionValue(zkCfg); + } + + if (zkCfgDirValue == null) { + zkCfgDirValue = guessZkCfgDir(); + } + + if (zkCfgDirValue == null) { + LOG.error("couldn't figure out path to zkCfg file"); + System.exit(1); + } + + // get zoo.cfg path from cfg-dir + zkCfgFile = zkCfgDirValue; + if (!zkCfgFile.endsWith(".cfg")) { + // append with default zoo.cfg + zkCfgFile = zkCfgFile + "/zoo.cfg"; + } + + if (!new File(zkCfgFile).exists()) { + LOG.error("zoo.cfg file doen't exist: " + zkCfgFile); + System.exit(1); + } + + String[] patterns = cmd.getOptionValues(pattern); + + String[] zkDataDirs = getZkDataDirs(zkCfgFile); + + // parse zk data files + if (zkDataDirs == null || zkDataDirs[0] == null || zkDataDirs[1] == null) { + LOG.error("invalid zkCfgDir: " + zkCfgDirValue); + System.exit(1); + } + + File zkParsedDir = new File(String.format("%s/zklog-parsed", System.getProperty("user.home"))); + if (!zkParsedDir.exists()) { + LOG.info("creating zklog-parsed dir: " + zkParsedDir.getAbsolutePath()); + zkParsedDir.mkdir(); + } + + if (cmd.hasOption(between)) { + String[] timeStrings = cmd.getOptionValues(between); + + long startTime = parseTimeString(timeStrings[0]); + if (startTime == -1) { + LOG.error("invalid start time string: " + timeStrings[0] + + ", should be either timestamp or yyMMdd_hhmmss_SSS"); + System.exit(1); + } + + long endTime = parseTimeString(timeStrings[1]); + if (endTime == -1) { + LOG.error("invalid end time string: " + timeStrings[1] + + ", should be either timestamp or yyMMdd_hhmmss_SSS"); + System.exit(1); + } + + if (startTime > endTime) { + LOG.warn("empty window: " + startTime + " - " + endTime); + System.exit(1); + } + // zkDataDirs[0] is the transaction log dir + List<File> parsedZkLogs = parseZkLogs(zkDataDirs[0], startTime, endTime); + grepZkLogDir(parsedZkLogs, startTime, endTime, patterns); + + } else if (cmd.hasOption(by)) { + String timeString = cmd.getOptionValue(by); + + long byTime = parseTimeString(timeString); + if (byTime == -1) { + LOG.error("invalid by time string: " + timeString + + ", should be either timestamp or yyMMdd_hhmmss_SSS"); + System.exit(1); + } + + // zkDataDirs[1] is the snapshot dir + File[] lastZkSnapshot = parseZkSnapshot(zkDataDirs[1], byTime); + + // lastZkSnapshot[1] is the parsed last snapshot by byTime + grepZkSnapshot(lastZkSnapshot[1], patterns); + + // need to grep transaction logs between last-modified-time of snapshot and byTime also + // lastZkSnapshot[0] is the last snapshot by byTime + long startTime = lastZkSnapshot[0].lastModified(); + + // zkDataDirs[0] is the transaction log dir + List<File> parsedZkLogs = parseZkLogs(zkDataDirs[0], startTime, byTime); + grepZkLogDir(parsedZkLogs, startTime, byTime, patterns); + } + } + + public static void main(String[] args) { + processCommandLineArgs(args); + } + +} http://git-wip-us.apache.org/repos/asf/helix/blob/027eea29/helix-core/src/main/java/org/apache/helix/tools/ZkLogAnalyzer.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/tools/ZkLogAnalyzer.java b/helix-core/src/main/java/org/apache/helix/tools/ZkLogAnalyzer.java deleted file mode 100644 index 8b32ddc..0000000 --- a/helix-core/src/main/java/org/apache/helix/tools/ZkLogAnalyzer.java +++ /dev/null @@ -1,444 +0,0 @@ -package org.apache.helix.tools; - -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -import java.io.BufferedReader; -import java.io.File; -import java.io.FileFilter; -import java.io.FileInputStream; -import java.io.InputStreamReader; -import java.sql.Timestamp; -import java.text.SimpleDateFormat; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Date; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.TreeMap; - -import org.apache.helix.ZNRecord; -import org.apache.helix.manager.zk.ZNRecordSerializer; -import org.apache.helix.model.LiveInstance; -import org.apache.helix.model.Message; -import org.apache.helix.model.Message.MessageState; -import org.apache.log4j.Logger; - -public class ZkLogAnalyzer { - private static Logger LOG = Logger.getLogger(ZkLogAnalyzer.class); - private static boolean dump = false;; - final static ZNRecordSerializer _deserializer = new ZNRecordSerializer(); - - static class Stats { - int msgSentCount = 0; - int msgSentCount_O2S = 0; // Offline to Slave - int msgSentCount_S2M = 0; // Slave to Master - int msgSentCount_M2S = 0; // Master to Slave - int msgDeleteCount = 0; - int msgModifyCount = 0; - int curStateCreateCount = 0; - int curStateUpdateCount = 0; - int extViewCreateCount = 0; - int extViewUpdateCount = 0; - } - - static String getAttributeValue(String line, String attribute) { - if (line == null) - return null; - String[] parts = line.split("\\s"); - if (parts != null && parts.length > 0) { - for (int i = 0; i < parts.length; i++) { - if (parts[i].startsWith(attribute)) { - String val = parts[i].substring(attribute.length()); - return val; - } - } - } - return null; - } - - static String findLastCSUpdateBetween(List<String> csUpdateLines, long start, long end) { - long lastCSUpdateTimestamp = Long.MIN_VALUE; - String lastCSUpdateLine = null; - for (String line : csUpdateLines) { - // ZNRecord record = getZNRecord(line); - long timestamp = Long.parseLong(getAttributeValue(line, "time:")); - if (timestamp >= start && timestamp <= end && timestamp > lastCSUpdateTimestamp) { - lastCSUpdateTimestamp = timestamp; - lastCSUpdateLine = line; - } - } - assert (lastCSUpdateLine != null) : "No CS update between " + start + " - " + end; - return lastCSUpdateLine; - } - - static ZNRecord getZNRecord(String line) { - ZNRecord record = null; - String value = getAttributeValue(line, "data:"); - if (value != null) { - record = (ZNRecord) _deserializer.deserialize(value.getBytes()); - // if (record == null) - // { - // System.out.println(line); - // } - } - return record; - } - - public static void main(String[] args) throws Exception { - if (args.length != 3) { - System.err - .println("USAGE: ZkLogAnalyzer zkLogDir clusterName testStartTime (yyMMdd_hhmmss_SSS)"); - System.exit(1); - } - - System.out.println("ZkLogAnalyzer called with args: " + Arrays.toString(args)); - // get create-timestamp of "/" + clusterName - // find all zk logs after that create-timestamp and parse them - // save parsed log in /tmp/zkLogAnalyzor_zklog.parsed0,1,2... - - String zkLogDir = args[0]; - String clusterName = args[1]; - // String zkAddr = args[2]; - String startTimeStr = args[2]; - // ZkClient zkClient = new ZkClient(zkAddr); - // Stat clusterCreateStat = zkClient.getStat("/" + clusterName); - SimpleDateFormat formatter = new SimpleDateFormat("yyMMdd_hhmmss_SSS"); - Date date = formatter.parse(startTimeStr); - long startTimeStamp = date.getTime(); - - System.out.println(clusterName + " created at " + date); - while (zkLogDir.endsWith("/")) { - zkLogDir = zkLogDir.substring(0, zkLogDir.length() - 1); - } - if (!zkLogDir.endsWith("/version-2")) { - zkLogDir = zkLogDir + "/version-2"; - } - File dir = new File(zkLogDir); - File[] zkLogs = dir.listFiles(new FileFilter() { - - @Override - public boolean accept(File file) { - return file.isFile() && (file.getName().indexOf("log") != -1); - } - }); - - // lastModified time -> zkLog - TreeMap<Long, String> lastZkLogs = new TreeMap<Long, String>(); - for (File file : zkLogs) { - if (file.lastModified() > startTimeStamp) { - lastZkLogs.put(file.lastModified(), file.getAbsolutePath()); - } - } - - List<String> parsedZkLogs = new ArrayList<String>(); - int i = 0; - System.out.println("zk logs last modified later than " + new Timestamp(startTimeStamp)); - for (Long lastModified : lastZkLogs.keySet()) { - String fileName = lastZkLogs.get(lastModified); - System.out.println(new Timestamp(lastModified) + ": " - + (fileName.substring(fileName.lastIndexOf('/') + 1))); - - String parsedFileName = "zkLogAnalyzor_zklog.parsed" + i; - i++; - ZKLogFormatter.main(new String[] { - "log", fileName, parsedFileName - }); - parsedZkLogs.add(parsedFileName); - } - - // sessionId -> create liveInstance line - Map<String, String> sessionMap = new HashMap<String, String>(); - - // message send lines in time order - // List<String> sendMessageLines = new ArrayList<String>(); - - // CS update lines in time order - List<String> csUpdateLines = new ArrayList<String>(); - - String leaderSession = null; - - System.out.println(); - Stats stats = new Stats(); - long lastTestStartTimestamp = Long.MAX_VALUE; - long controllerStartTime = 0; - for (String parsedZkLog : parsedZkLogs) { - - FileInputStream fis = new FileInputStream(parsedZkLog); - BufferedReader br = new BufferedReader(new InputStreamReader(fis)); - - String inputLine; - while ((inputLine = br.readLine()) != null) { - String timestamp = getAttributeValue(inputLine, "time:"); - if (timestamp == null) { - continue; - } - long timestampVal = Long.parseLong(timestamp); - if (timestampVal < startTimeStamp) { - continue; - } - - if (dump == true) { - String printLine = inputLine.replaceAll("data:.*", ""); - printLine = - new Timestamp(timestampVal) + " " - + printLine.substring(printLine.indexOf("session:")); - System.err.println(printLine); - } - - if (inputLine.indexOf("/start_disable") != -1) { - dump = true; - } - if (inputLine.indexOf("/" + clusterName + "/CONFIGS/CLUSTER/verify") != -1) { - String type = getAttributeValue(inputLine, "type:"); - if (type.equals("delete")) { - System.out.println(timestamp + ": verify done"); - System.out.println("lastTestStartTimestamp:" + lastTestStartTimestamp); - String lastCSUpdateLine = - findLastCSUpdateBetween(csUpdateLines, lastTestStartTimestamp, timestampVal); - long lastCSUpdateTimestamp = - Long.parseLong(getAttributeValue(lastCSUpdateLine, "time:")); - System.out.println("Last CS Update:" + lastCSUpdateTimestamp); - - System.out.println("state transition latency: " - + +(lastCSUpdateTimestamp - lastTestStartTimestamp) + "ms"); - - System.out.println("state transition latency since controller start: " - + +(lastCSUpdateTimestamp - controllerStartTime) + "ms"); - - System.out.println("Create MSG\t" + stats.msgSentCount + "\t" + stats.msgSentCount_O2S - + "\t" + stats.msgSentCount_S2M + "\t" + stats.msgSentCount_M2S); - System.out.println("Modify MSG\t" + stats.msgModifyCount); - System.out.println("Delete MSG\t" + stats.msgDeleteCount); - System.out.println("Create CS\t" + stats.curStateCreateCount); - System.out.println("Update CS\t" + stats.curStateUpdateCount); - System.out.println("Create EV\t" + stats.extViewCreateCount); - System.out.println("Update EV\t" + stats.extViewUpdateCount); - - System.out.println(); - stats = new Stats(); - lastTestStartTimestamp = Long.MAX_VALUE; - } - } else if (inputLine.indexOf("/" + clusterName + "/LIVEINSTANCES/") != -1) { - // cluster startup - if (timestampVal < lastTestStartTimestamp) { - System.out.println("START cluster. SETTING lastTestStartTimestamp to " - + new Timestamp(timestampVal) + "\nline:" + inputLine); - lastTestStartTimestamp = timestampVal; - } - - ZNRecord record = getZNRecord(inputLine); - LiveInstance liveInstance = new LiveInstance(record); - String session = getAttributeValue(inputLine, "session:"); - sessionMap.put(session, inputLine); - System.out.println(new Timestamp(Long.parseLong(timestamp)) + ": create LIVEINSTANCE " - + liveInstance.getInstanceName()); - } else if (inputLine.indexOf("closeSession") != -1) { - // kill any instance - String session = getAttributeValue(inputLine, "session:"); - if (sessionMap.containsKey(session)) { - if (timestampVal < lastTestStartTimestamp) { - System.out.println("KILL node. SETTING lastTestStartTimestamp to " + timestampVal - + " line:" + inputLine); - lastTestStartTimestamp = timestampVal; - } - String line = sessionMap.get(session); - ZNRecord record = getZNRecord(line); - LiveInstance liveInstance = new LiveInstance(record); - - System.out.println(new Timestamp(Long.parseLong(timestamp)) + ": close session " - + liveInstance.getInstanceName()); - dump = true; - } - } else if (inputLine.indexOf("/" + clusterName + "/CONFIGS/PARTICIPANT") != -1) { - // disable a partition - String type = getAttributeValue(inputLine, "type:"); - if (type.equals("setData") && inputLine.indexOf("HELIX_DISABLED_PARTITION") != -1) { - if (timestampVal < lastTestStartTimestamp) { - System.out.println("DISABLE partition. SETTING lastTestStartTimestamp to " - + timestampVal + " line:" + inputLine); - lastTestStartTimestamp = timestampVal; - } - } - } else if (inputLine.indexOf("/" + clusterName + "/CONTROLLER/LEADER") != -1) { - // leaderLine = inputLine; - ZNRecord record = getZNRecord(inputLine); - LiveInstance liveInstance = new LiveInstance(record); - String session = getAttributeValue(inputLine, "session:"); - leaderSession = session; - controllerStartTime = Long.parseLong(getAttributeValue(inputLine, "time:")); - sessionMap.put(session, inputLine); - System.out.println(new Timestamp(Long.parseLong(timestamp)) + ": create LEADER " - + liveInstance.getInstanceName()); - } else if (inputLine.indexOf("/" + clusterName + "/") != -1 - && inputLine.indexOf("/CURRENTSTATES/") != -1) { - String type = getAttributeValue(inputLine, "type:"); - if (type.equals("create")) { - stats.curStateCreateCount++; - } else if (type.equals("setData")) { - String path = getAttributeValue(inputLine, "path:"); - csUpdateLines.add(inputLine); - stats.curStateUpdateCount++; - // getAttributeValue(line, "data"); - System.out.println("Update currentstate:" + new Timestamp(Long.parseLong(timestamp)) - + ":" + timestamp + " path:" + path); - } - } else if (inputLine.indexOf("/" + clusterName + "/EXTERNALVIEW/") != -1) { - String session = getAttributeValue(inputLine, "session:"); - if (session.equals(leaderSession)) { - String type = getAttributeValue(inputLine, "type:"); - if (type.equals("create")) { - stats.extViewCreateCount++; - } else if (type.equals("setData")) { - stats.extViewUpdateCount++; - } - } - - // pos = inputLine.indexOf("EXTERNALVIEW"); - // pos = inputLine.indexOf("data:{", pos); - // if (pos != -1) - // { - // String timestamp = getAttributeValue(inputLine, "time:"); - // ZNRecord record = - // (ZNRecord) _deserializer.deserialize(inputLine.substring(pos + 5) - // .getBytes()); - // ExternalView extView = new ExternalView(record); - // int masterCnt = ClusterStateVerifier.countStateNbInExtView(extView, - // "MASTER"); - // int slaveCnt = ClusterStateVerifier.countStateNbInExtView(extView, "SLAVE"); - // if (masterCnt == 1200) - // { - // System.out.println(timestamp + ": externalView " + extView.getResourceName() - // + " has " + masterCnt + " MASTER, " + slaveCnt + " SLAVE"); - // } - // } - } else if (inputLine.indexOf("/" + clusterName + "/") != -1 - && inputLine.indexOf("/MESSAGES/") != -1) { - String type = getAttributeValue(inputLine, "type:"); - - if (type.equals("create")) { - ZNRecord record = getZNRecord(inputLine); - Message msg = new Message(record); - String sendSession = getAttributeValue(inputLine, "session:"); - if (sendSession.equals(leaderSession) && msg.getMsgType().equals("STATE_TRANSITION") - && msg.getMsgState() == MessageState.NEW) { - // sendMessageLines.add(inputLine); - stats.msgSentCount++; - - if (msg.getTypedFromState().toString().equals("OFFLINE") - && msg.getTypedToState().toString().equals("SLAVE")) { - stats.msgSentCount_O2S++; - } else if (msg.getTypedFromState().toString().equals("SLAVE") - && msg.getTypedToState().toString().equals("MASTER")) { - stats.msgSentCount_S2M++; - } else if (msg.getTypedFromState().toString().equals("MASTER") - && msg.getTypedToState().toString().equals("SLAVE")) { - stats.msgSentCount_M2S++; - } - // System.out.println("Message create:"+new - // Timestamp(Long.parseLong(timestamp))); - } - - // pos = inputLine.indexOf("MESSAGES"); - // pos = inputLine.indexOf("data:{", pos); - // if (pos != -1) - // { - // - // byte[] msgBytes = inputLine.substring(pos + 5).getBytes(); - // ZNRecord record = (ZNRecord) _deserializer.deserialize(msgBytes); - // Message msg = new Message(record); - // MessageState msgState = msg.getMsgState(); - // String msgType = msg.getMsgType(); - // if (msgType.equals("STATE_TRANSITION") && msgState == MessageState.NEW) - // { - // if (!msgs.containsKey(msg.getMsgId())) - // { - // msgs.put(msg.getMsgId(), new MsgItem(Long.parseLong(timestamp), msg)); - // } - // else - // { - // LOG.error("msg: " + msg.getMsgId() + " already sent"); - // } - // - // System.out.println(timestamp + ": sendMsg " + msg.getPartitionName() + "(" - // + msg.getFromState() + "->" + msg.getToState() + ") to " - // + msg.getTgtName() + ", size: " + msgBytes.length); - // } - // } - } else if (type.equals("setData")) { - stats.msgModifyCount++; - // pos = inputLine.indexOf("MESSAGES"); - // pos = inputLine.indexOf("data:{", pos); - // if (pos != -1) - // { - // - // byte[] msgBytes = inputLine.substring(pos + 5).getBytes(); - // ZNRecord record = (ZNRecord) _deserializer.deserialize(msgBytes); - // Message msg = new Message(record); - // MessageState msgState = msg.getMsgState(); - // String msgType = msg.getMsgType(); - // if (msgType.equals("STATE_TRANSITION") && msgState == MessageState.READ) - // { - // if (!msgs.containsKey(msg.getMsgId())) - // { - // LOG.error("msg: " + msg.getMsgId() + " never sent"); - // } - // else - // { - // MsgItem msgItem = msgs.get(msg.getMsgId()); - // if (msgItem.readTime == 0) - // { - // msgItem.readTime = Long.parseLong(timestamp); - // msgs.put(msg.getMsgId(), msgItem); - // // System.out.println(timestamp + ": readMsg " + msg.getPartitionName() - // // + "(" - // // + msg.getFromState() + "->" + msg.getToState() + ") to " - // // + msg.getTgtName() + ", latency: " + (msgItem.readTime - - // // msgItem.sendTime)); - // } - // } - // - // } - // } - } else if (type.equals("delete")) { - stats.msgDeleteCount++; - // String msgId = path.substring(path.lastIndexOf('/') + 1); - // if (msgs.containsKey(msgId)) - // { - // MsgItem msgItem = msgs.get(msgId); - // Message msg = msgItem.msg; - // msgItem.deleteTime = Long.parseLong(timestamp); - // msgs.put(msgId, msgItem); - // msgItem.latency = msgItem.deleteTime - msgItem.sendTime; - // System.out.println(timestamp + ": delMsg " + msg.getPartitionName() + "(" - // + msg.getFromState() + "->" + msg.getToState() + ") to " - // + msg.getTgtName() + ", latency: " + msgItem.latency); - // } - // else - // { - // // messages other than STATE_TRANSITION message - // // LOG.error("msg: " + msgId + " never sent"); - // } - } - } - } // end of [br.readLine()) != null] - } - } -}
