http://git-wip-us.apache.org/repos/asf/chukwa/blob/6012e14f/src/main/java/org/apache/hadoop/chukwa/datastore/ChukwaHBaseStore.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/hadoop/chukwa/datastore/ChukwaHBaseStore.java b/src/main/java/org/apache/hadoop/chukwa/datastore/ChukwaHBaseStore.java index 8075f4d..f828ff1 100644 --- a/src/main/java/org/apache/hadoop/chukwa/datastore/ChukwaHBaseStore.java +++ b/src/main/java/org/apache/hadoop/chukwa/datastore/ChukwaHBaseStore.java @@ -22,6 +22,7 @@ import java.net.InetAddress; import java.net.URI; import java.net.URISyntaxException; import java.nio.ByteBuffer; +import java.nio.charset.Charset; import java.util.ArrayList; import java.util.Calendar; import java.util.HashMap; @@ -71,19 +72,20 @@ public class ChukwaHBaseStore { static double RESOLUTION = 360; static int MINUTE = 60000; //60 milliseconds final static int SECOND = (int) TimeUnit.SECONDS.toMillis(1); - - static byte[] COLUMN_FAMILY = "t".getBytes(); - static byte[] ANNOTATION_FAMILY = "a".getBytes(); - static byte[] KEY_NAMES = "k".getBytes(); - static byte[] CHART_TYPE = "chart_meta".getBytes(); - static byte[] CHART_FAMILY = "c".getBytes(); - static byte[] COMMON_FAMILY = "c".getBytes(); - static byte[] WIDGET_TYPE = "widget_meta".getBytes(); - static byte[] DASHBOARD_TYPE = "dashboard_meta".getBytes(); + private final static Charset UTF8 = Charset.forName("UTF-8"); + + final static byte[] COLUMN_FAMILY = "t".getBytes(UTF8); + final static byte[] ANNOTATION_FAMILY = "a".getBytes(UTF8); + final static byte[] KEY_NAMES = "k".getBytes(UTF8); + final static byte[] CHART_TYPE = "chart_meta".getBytes(UTF8); + final static byte[] CHART_FAMILY = "c".getBytes(UTF8); + final static byte[] COMMON_FAMILY = "c".getBytes(UTF8); + final static byte[] WIDGET_TYPE = "widget_meta".getBytes(UTF8); + final static byte[] DASHBOARD_TYPE = "dashboard_meta".getBytes(UTF8); private static final String CHUKWA = "chukwa"; private static final String CHUKWA_META = "chukwa_meta"; private static long MILLISECONDS_IN_DAY = 86400000L; - protected static Connection connection = null; + private static Connection connection = null; public ChukwaHBaseStore() { super(); @@ -171,7 +173,7 @@ public class ChukwaHBaseStore { byte[] key = CellUtil.cloneQualifier(kv); long timestamp = ByteBuffer.wrap(key).getLong(); double value = Double - .parseDouble(new String(CellUtil.cloneValue(kv), "UTF-8")); + .parseDouble(new String(CellUtil.cloneValue(kv), UTF8)); series.add(timestamp, value); } } @@ -179,7 +181,7 @@ public class ChukwaHBaseStore { currentDay = currentDay + (i * MILLISECONDS_IN_DAY); } table.close(); - } catch (Exception e) { + } catch (IOException e) { closeHBase(); LOG.error(ExceptionUtil.getStackTrace(e)); } @@ -191,12 +193,12 @@ public class ChukwaHBaseStore { try { getHBaseConnection(); Table table = connection.getTable(TableName.valueOf(CHUKWA_META)); - Get get = new Get(metricGroup.getBytes()); + Get get = new Get(metricGroup.getBytes(UTF8)); Result result = table.get(get); for (Cell kv : result.rawCells()) { - JSONObject json = (JSONObject) JSONValue.parse(new String(CellUtil.cloneValue(kv), "UTF-8")); + JSONObject json = (JSONObject) JSONValue.parse(new String(CellUtil.cloneValue(kv), UTF8)); if (json.get("type").equals("metric")) { - familyNames.add(new String(CellUtil.cloneQualifier(kv), "UTF-8")); + familyNames.add(new String(CellUtil.cloneQualifier(kv), UTF8)); } } table.close(); @@ -219,7 +221,7 @@ public class ChukwaHBaseStore { Iterator<Result> it = rs.iterator(); while (it.hasNext()) { Result result = it.next(); - metricGroups.add(new String(result.getRow(), "UTF-8")); + metricGroups.add(new String(result.getRow(), UTF8)); } table.close(); } catch (Exception e) { @@ -241,9 +243,9 @@ public class ChukwaHBaseStore { while (it.hasNext()) { Result result = it.next(); for (Cell cell : result.rawCells()) { - JSONObject json = (JSONObject) JSONValue.parse(new String(CellUtil.cloneValue(cell), "UTF-8")); + JSONObject json = (JSONObject) JSONValue.parse(new String(CellUtil.cloneValue(cell), UTF8)); if (json!=null && json.get("type")!=null && json.get("type").equals("source")) { - pk.add(new String(CellUtil.cloneQualifier(cell), "UTF-8")); + pk.add(new String(CellUtil.cloneQualifier(cell), UTF8)); } } } @@ -296,7 +298,7 @@ public class ChukwaHBaseStore { for(Cell cell : result.rawCells()) { byte[] dest = new byte[5]; System.arraycopy(CellUtil.cloneRow(cell), 3, dest, 0, 5); - String source = new String(dest); + String source = new String(dest, UTF8); long time = cell.getTimestamp(); // Time display in x axis long delta = time - startTime; @@ -306,11 +308,11 @@ public class ChukwaHBaseStore { if (keyMap.containsKey(source)) { y = keyMap.get(source); } else { - keyMap.put(source, new Integer(index)); + keyMap.put(source, Integer.valueOf(index)); y = index; index++; } - double v = Double.parseDouble(new String(CellUtil.cloneValue(cell))); + double v = Double.parseDouble(new String(CellUtil.cloneValue(cell), UTF8)); heatmap.put(x, y, v); if (v > max) { max = v; @@ -355,9 +357,9 @@ public class ChukwaHBaseStore { while (it.hasNext()) { Result result = it.next(); for (Cell cell : result.rawCells()) { - JSONObject json = (JSONObject) JSONValue.parse(new String(CellUtil.cloneValue(cell), "UTF-8")); + JSONObject json = (JSONObject) JSONValue.parse(new String(CellUtil.cloneValue(cell), UTF8)); if (json.get("type").equals("cluster")) { - clusters.add(new String(CellUtil.cloneQualifier(cell), "UTF-8")); + clusters.add(new String(CellUtil.cloneQualifier(cell), UTF8)); } } } @@ -382,10 +384,10 @@ public class ChukwaHBaseStore { Table table = connection.getTable(TableName.valueOf(CHUKWA_META)); Get get = new Get(CHART_TYPE); Result r = table.get(get); - byte[] value = r.getValue(CHART_FAMILY, id.getBytes()); + byte[] value = r.getValue(CHART_FAMILY, id.getBytes(UTF8)); Gson gson = new Gson(); if(value!=null) { - chart = gson.fromJson(new String(value), Chart.class); + chart = gson.fromJson(new String(value, UTF8), Chart.class); } table.close(); } catch (Exception e) { @@ -408,7 +410,7 @@ public class ChukwaHBaseStore { Put put = new Put(CHART_TYPE); Gson gson = new Gson(); String buffer = gson.toJson(chart); - put.addColumn(CHART_FAMILY, id.getBytes(), buffer.getBytes()); + put.addColumn(CHART_FAMILY, id.getBytes(UTF8), buffer.getBytes(UTF8)); table.put(put); table.close(); } catch (Exception e) { @@ -437,7 +439,7 @@ public class ChukwaHBaseStore { s.setLineOptions(l); series.add(s); } - chart.SetSeries(series); + chart.setSeries(series); return createChart(chart); } @@ -469,7 +471,7 @@ public class ChukwaHBaseStore { Put put = new Put(CHART_TYPE); Gson gson = new Gson(); String buffer = gson.toJson(chart); - put.addColumn(CHART_FAMILY, id.getBytes(), buffer.getBytes()); + put.addColumn(CHART_FAMILY, id.getBytes(UTF8), buffer.getBytes(UTF8)); table.put(put); table.close(); } catch (Exception e) { @@ -499,8 +501,8 @@ public class ChukwaHBaseStore { } // Figure out the time range and determine the best resolution // to fetch the data - long range = Math.round((endTime - startTime) - / (MINUTES_IN_HOUR * MINUTE)); + long range = (endTime - startTime) + / (long) (MINUTES_IN_HOUR * MINUTE); long sampleRate = 1; if (range <= 1) { sampleRate = 5; @@ -512,7 +514,7 @@ public class ChukwaHBaseStore { sampleRate = 87600; } double smoothing = (endTime - startTime) - / (sampleRate * SECOND ) / RESOLUTION; + / (double) (sampleRate * SECOND ) / (double) RESOLUTION; getHBaseConnection(); Table table = connection.getTable(TableName.valueOf(CHUKWA)); @@ -550,7 +552,7 @@ public class ChukwaHBaseStore { byte[] key = CellUtil.cloneQualifier(kv); long timestamp = ByteBuffer.wrap(key).getLong(); double value = Double.parseDouble(new String(CellUtil.cloneValue(kv), - "UTF-8")); + UTF8)); if(initial==0) { filteredValue = value; } @@ -558,7 +560,7 @@ public class ChukwaHBaseStore { lastTime = timestamp; // Determine if there is any gap, if there is gap in data, reset // calculation. - if (elapsedTime > sampleRate) { + if (elapsedTime > (sampleRate * 5)) { filteredValue = 0.0d; } else { if (smoothing != 0.0d) { @@ -587,7 +589,7 @@ public class ChukwaHBaseStore { list.add(clone); } table.close(); - } catch (Exception e) { + } catch (IOException|CloneNotSupportedException e) { closeHBase(); LOG.error(ExceptionUtil.getStackTrace(e)); } @@ -622,7 +624,7 @@ public class ChukwaHBaseStore { continue; } Gson gson = new Gson(); - Widget widget = gson.fromJson(new String(CellUtil.cloneValue(kv), "UTF-8"), Widget.class); + Widget widget = gson.fromJson(new String(CellUtil.cloneValue(kv), UTF8), Widget.class); list.add(widget); c++; } @@ -658,7 +660,7 @@ public class ChukwaHBaseStore { Result result = it.next(); for(Cell kv : result.rawCells()) { Gson gson = new Gson(); - Widget widget = gson.fromJson(new String(CellUtil.cloneValue(kv), "UTF-8"), Widget.class); + Widget widget = gson.fromJson(new String(CellUtil.cloneValue(kv), UTF8), Widget.class); list.add(widget); } } @@ -683,11 +685,11 @@ public class ChukwaHBaseStore { getHBaseConnection(); Table table = connection.getTable(TableName.valueOf(CHUKWA_META)); Get widget = new Get(WIDGET_TYPE); - widget.addColumn(COMMON_FAMILY, title.getBytes()); + widget.addColumn(COMMON_FAMILY, title.getBytes(UTF8)); Result rs = table.get(widget); - byte[] buffer = rs.getValue(COMMON_FAMILY, title.getBytes()); + byte[] buffer = rs.getValue(COMMON_FAMILY, title.getBytes(UTF8)); Gson gson = new Gson(); - w = gson.fromJson(new String(buffer), Widget.class); + w = gson.fromJson(new String(buffer, UTF8), Widget.class); table.close(); } catch (Exception e) { closeHBase(); @@ -708,7 +710,7 @@ public class ChukwaHBaseStore { getHBaseConnection(); Table table = connection.getTable(TableName.valueOf(CHUKWA_META)); Get widgetTest = new Get(WIDGET_TYPE); - widgetTest.addColumn(COMMON_FAMILY, widget.getTitle().getBytes()); + widgetTest.addColumn(COMMON_FAMILY, widget.getTitle().getBytes(UTF8)); if (table.exists(widgetTest)) { LOG.warn("Widget: " + widget.getTitle() + " already exists."); created = false; @@ -716,7 +718,7 @@ public class ChukwaHBaseStore { Put put = new Put(WIDGET_TYPE); Gson gson = new Gson(); String buffer = gson.toJson(widget); - put.addColumn(COMMON_FAMILY, widget.getTitle().getBytes(), buffer.getBytes()); + put.addColumn(COMMON_FAMILY, widget.getTitle().getBytes(UTF8), buffer.getBytes(UTF8)); table.put(put); created = true; } @@ -741,12 +743,12 @@ public class ChukwaHBaseStore { getHBaseConnection(); Table table = connection.getTable(TableName.valueOf(CHUKWA_META)); Delete oldWidget = new Delete(WIDGET_TYPE); - oldWidget.addColumn(COMMON_FAMILY, title.getBytes()); + oldWidget.addColumn(COMMON_FAMILY, title.getBytes(UTF8)); table.delete(oldWidget); Put put = new Put(WIDGET_TYPE); Gson gson = new Gson(); String buffer = gson.toJson(widget); - put.addColumn(COMMON_FAMILY, title.getBytes(), buffer.getBytes()); + put.addColumn(COMMON_FAMILY, title.getBytes(UTF8), buffer.getBytes(UTF8)); table.put(put); table.close(); result = true; @@ -772,7 +774,7 @@ public class ChukwaHBaseStore { getHBaseConnection(); Table table = connection.getTable(TableName.valueOf(CHUKWA_META)); Delete oldWidget = new Delete(WIDGET_TYPE); - oldWidget.addColumn(COMMON_FAMILY, title.getBytes()); + oldWidget.addColumn(COMMON_FAMILY, title.getBytes(UTF8)); table.delete(oldWidget); table.close(); result = true; @@ -790,7 +792,7 @@ public class ChukwaHBaseStore { getHBaseConnection(); Table table = connection.getTable(TableName.valueOf(CHUKWA_META)); Get dashboardTest = new Get(DASHBOARD_TYPE); - dashboardTest.addColumn(COMMON_FAMILY, "default".getBytes()); + dashboardTest.addColumn(COMMON_FAMILY, "default".getBytes(UTF8)); exists = table.exists(dashboardTest); table.close(); } catch (Exception e) { @@ -931,19 +933,19 @@ public class ChukwaHBaseStore { getHBaseConnection(); Table table = connection.getTable(TableName.valueOf(CHUKWA_META)); Get dashboard = new Get(DASHBOARD_TYPE); - dashboard.addColumn(COMMON_FAMILY, key.getBytes()); + dashboard.addColumn(COMMON_FAMILY, key.getBytes(UTF8)); Result rs = table.get(dashboard); - byte[] buffer = rs.getValue(COMMON_FAMILY, key.getBytes()); + byte[] buffer = rs.getValue(COMMON_FAMILY, key.getBytes(UTF8)); if(buffer == null) { // If user dashboard is not found, use default dashboard. key = new StringBuilder().append(id).append("|").toString(); dashboard = new Get(DASHBOARD_TYPE); - dashboard.addColumn(COMMON_FAMILY, key.getBytes()); + dashboard.addColumn(COMMON_FAMILY, key.getBytes(UTF8)); rs = table.get(dashboard); - buffer = rs.getValue(COMMON_FAMILY, key.getBytes()); + buffer = rs.getValue(COMMON_FAMILY, key.getBytes(UTF8)); } Gson gson = new Gson(); - dash = gson.fromJson(new String(buffer), Dashboard.class); + dash = gson.fromJson(new String(buffer, UTF8), Dashboard.class); table.close(); } catch (Exception e) { closeHBase(); @@ -964,7 +966,7 @@ public class ChukwaHBaseStore { Put put = new Put(DASHBOARD_TYPE); Gson gson = new Gson(); String buffer = gson.toJson(dash); - put.addColumn(COMMON_FAMILY, key.getBytes(), buffer.getBytes()); + put.addColumn(COMMON_FAMILY, key.getBytes(UTF8), buffer.getBytes(UTF8)); table.put(put); table.close(); result = true;
http://git-wip-us.apache.org/repos/asf/chukwa/blob/6012e14f/src/main/java/org/apache/hadoop/chukwa/extraction/hbase/AbstractProcessor.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/hadoop/chukwa/extraction/hbase/AbstractProcessor.java b/src/main/java/org/apache/hadoop/chukwa/extraction/hbase/AbstractProcessor.java index eb79cd7..4f5f289 100644 --- a/src/main/java/org/apache/hadoop/chukwa/extraction/hbase/AbstractProcessor.java +++ b/src/main/java/org/apache/hadoop/chukwa/extraction/hbase/AbstractProcessor.java @@ -19,6 +19,7 @@ package org.apache.hadoop.chukwa.extraction.hbase; import java.nio.ByteBuffer; +import java.nio.charset.Charset; import java.security.MessageDigest; import java.security.NoSuchAlgorithmException; import java.util.ArrayList; @@ -37,7 +38,7 @@ public abstract class AbstractProcessor { protected String sourceHelper; protected byte[] key = null; - byte[] CF = "t".getBytes(); + byte[] CF = "t".getBytes(Charset.forName("UTF-8")); boolean chunkInErrorSaved = false; ArrayList<Put> output = null; @@ -70,14 +71,14 @@ public abstract class AbstractProcessor { byte[] key = HBaseUtil.buildKey(time, primaryKey, source); Put put = new Put(key); byte[] timeInBytes = ByteBuffer.allocate(8).putLong(time).array(); - put.add(CF, timeInBytes, time, value); + put.addColumn(CF, timeInBytes, time, value); output.add(put); reporter.putMetric(chunk.getDataType(), primaryKey); reporter.putSource(chunk.getDataType(), source); } public void addRecord(String primaryKey, String value) { - addRecord(primaryKey, value.getBytes()); + addRecord(primaryKey, value.getBytes(Charset.forName("UTF-8"))); } /** @@ -96,7 +97,7 @@ public abstract class AbstractProcessor { byte[] key = HBaseUtil.buildKey(time, primaryKey, sourceHelper); Put put = new Put(key); byte[] timeInBytes = ByteBuffer.allocate(8).putLong(time).array(); - put.add(CF, timeInBytes, time, value); + put.addColumn(CF, timeInBytes, time, value); output.add(put); reporter.putMetric(chunk.getDataType(), primaryKey); } @@ -126,7 +127,7 @@ public abstract class AbstractProcessor { Put put = new Put(key); String family = "a"; byte[] timeInBytes = ByteBuffer.allocate(8).putLong(time).array(); - put.add(family.getBytes(), timeInBytes, time, chunk.getTags().getBytes()); + put.addColumn(family.getBytes(Charset.forName("UTF-8")), timeInBytes, time, chunk.getTags().getBytes(Charset.forName("UTF-8"))); output.add(put); } http://git-wip-us.apache.org/repos/asf/chukwa/blob/6012e14f/src/main/java/org/apache/hadoop/chukwa/extraction/hbase/DefaultProcessor.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/hadoop/chukwa/extraction/hbase/DefaultProcessor.java b/src/main/java/org/apache/hadoop/chukwa/extraction/hbase/DefaultProcessor.java index 2da64a3..483ac71 100644 --- a/src/main/java/org/apache/hadoop/chukwa/extraction/hbase/DefaultProcessor.java +++ b/src/main/java/org/apache/hadoop/chukwa/extraction/hbase/DefaultProcessor.java @@ -17,34 +17,46 @@ */ package org.apache.hadoop.chukwa.extraction.hbase; +import java.lang.reflect.Type; import java.nio.ByteBuffer; +import java.nio.charset.Charset; import java.security.NoSuchAlgorithmException; +import java.util.HashMap; +import java.util.Map; import org.apache.hadoop.chukwa.util.HBaseUtil; import org.apache.hadoop.hbase.client.Put; import org.apache.log4j.Logger; -import org.json.simple.JSONObject; + +import com.google.gson.Gson; +import com.google.gson.reflect.TypeToken; public class DefaultProcessor extends AbstractProcessor { - + public DefaultProcessor() throws NoSuchAlgorithmException { - super(); - // TODO Auto-generated constructor stub + super(); + // TODO Auto-generated constructor stub } -static Logger LOG = Logger.getLogger(DefaultProcessor.class); + static Logger LOG = Logger.getLogger(DefaultProcessor.class); @Override protected void parse(byte[] recordEntry) throws Throwable { - byte[] key = HBaseUtil.buildKey(time, chunk.getDataType(), chunk.getSource()); - Put put = new Put(key); - byte[] timeInBytes = ByteBuffer.allocate(8).putLong(time).array(); - put.add("t".getBytes(), timeInBytes, chunk.getData()); - output.add(put); - JSONObject json = new JSONObject(); - json.put("sig", key); - json.put("type", "unknown"); - reporter.put(chunk.getDataType(), chunk.getSource(), json.toString()); + byte[] key = HBaseUtil.buildKey(time, chunk.getDataType(), + chunk.getSource()); + Put put = new Put(key); + byte[] timeInBytes = ByteBuffer.allocate(8).putLong(time).array(); + put.addColumn("t".getBytes(Charset.forName("UTF-8")), timeInBytes, + chunk.getData()); + output.add(put); + Type defaultType = new TypeToken<Map<String, String>>() { + }.getType(); + Gson gson = new Gson(); + Map<String, String> meta = new HashMap<String, String>(); + meta.put("sig", new String(key, Charset.forName("UTF-8"))); + meta.put("type", "unknown"); + String buffer = gson.toJson(meta, defaultType); + reporter.put(chunk.getDataType(), chunk.getSource(), buffer); } } http://git-wip-us.apache.org/repos/asf/chukwa/blob/6012e14f/src/main/java/org/apache/hadoop/chukwa/extraction/hbase/HadoopMetricsProcessor.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/hadoop/chukwa/extraction/hbase/HadoopMetricsProcessor.java b/src/main/java/org/apache/hadoop/chukwa/extraction/hbase/HadoopMetricsProcessor.java index 19df607..de64a0d 100644 --- a/src/main/java/org/apache/hadoop/chukwa/extraction/hbase/HadoopMetricsProcessor.java +++ b/src/main/java/org/apache/hadoop/chukwa/extraction/hbase/HadoopMetricsProcessor.java @@ -18,13 +18,13 @@ package org.apache.hadoop.chukwa.extraction.hbase; - -import java.nio.ByteBuffer; +import java.nio.charset.Charset; import java.security.NoSuchAlgorithmException; import java.util.Iterator; +import java.util.Map; +import java.util.Set; +import java.util.Map.Entry; -import org.apache.hadoop.chukwa.util.HBaseUtil; -import org.apache.hadoop.hbase.client.Put; import org.apache.log4j.Logger; import org.json.simple.JSONObject; import org.json.simple.JSONValue; @@ -37,15 +37,14 @@ public class HadoopMetricsProcessor extends AbstractProcessor { static final String recordNameField = "recordName"; static final String hostName = "Hostname"; static final String processName = "ProcessName"; - static final byte[] cf = "t".getBytes(); + static final byte[] cf = "t".getBytes(Charset.forName("UTF-8")); public HadoopMetricsProcessor() throws NoSuchAlgorithmException { } @Override protected void parse(byte[] recordEntry) throws Throwable { - try { - String body = new String(recordEntry); + String body = new String(recordEntry, Charset.forName("UTF-8")); int start = body.indexOf('{'); JSONObject json = (JSONObject) JSONValue.parse(body.substring(start)); @@ -56,10 +55,8 @@ public class HadoopMetricsProcessor extends AbstractProcessor { if(json.get(processName)!=null) { src = new StringBuilder(src).append(":").append(json.get(processName)).toString(); } - @SuppressWarnings("unchecked") - Iterator<String> ki = json.keySet().iterator(); - while (ki.hasNext()) { - String keyName = ki.next(); + for(Entry<String, Object> entry : (Set<Map.Entry>) json.entrySet()) { + String keyName = entry.getKey(); if (timestampField.intern() == keyName.intern()) { continue; } else if (contextNameField.intern() == keyName.intern()) { @@ -71,20 +68,14 @@ public class HadoopMetricsProcessor extends AbstractProcessor { } else if (processName.intern() == keyName.intern()) { continue; } else { - if (json.get(keyName) != null) { - String v = json.get(keyName).toString(); + if(json.get(keyName)!=null) { + String v = entry.getValue().toString(); String primaryKey = new StringBuilder(contextName).append(".") .append(recordName).append(".").append(keyName).toString(); - addRecord(time, primaryKey, src, v.getBytes(), output); + addRecord(time, primaryKey, src, v.getBytes(Charset.forName("UTF-8")), output); } } } - - } catch (Exception e) { - LOG.warn("Wrong format in HadoopMetricsProcessor [" + recordEntry + "]", - e); - throw e; - } } } http://git-wip-us.apache.org/repos/asf/chukwa/blob/6012e14f/src/main/java/org/apache/hadoop/chukwa/extraction/hbase/LogEntry.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/hadoop/chukwa/extraction/hbase/LogEntry.java b/src/main/java/org/apache/hadoop/chukwa/extraction/hbase/LogEntry.java index dcbe2d4..0682c71 100644 --- a/src/main/java/org/apache/hadoop/chukwa/extraction/hbase/LogEntry.java +++ b/src/main/java/org/apache/hadoop/chukwa/extraction/hbase/LogEntry.java @@ -22,43 +22,43 @@ import java.text.SimpleDateFormat; import java.util.Date; public class LogEntry { - private final static SimpleDateFormat sdf = new SimpleDateFormat( - "yyyy-MM-dd HH:mm"); + private SimpleDateFormat sdf = new SimpleDateFormat( + "yyyy-MM-dd HH:mm"); - private Date date; - private String logLevel; - private String className; - private String body; + private Date date; + private String logLevel; + private String className; + private String body; - public LogEntry(String recordEntry) throws ParseException { - String dStr = recordEntry.substring(0, 23); - date = sdf.parse(dStr); - int start = 24; - int idx = recordEntry.indexOf(' ', start); - logLevel = recordEntry.substring(start, idx); - start = idx + 1; - idx = recordEntry.indexOf(' ', start); - className = recordEntry.substring(start, idx - 1); - body = recordEntry.substring(idx + 1); - } + public LogEntry(String recordEntry) throws ParseException { + String dStr = recordEntry.substring(0, 23); + date = sdf.parse(dStr); + int start = 24; + int idx = recordEntry.indexOf(' ', start); + logLevel = recordEntry.substring(start, idx); + start = idx + 1; + idx = recordEntry.indexOf(' ', start); + className = recordEntry.substring(start, idx - 1); + body = recordEntry.substring(idx + 1); + } - public Date getDate() { - return date; - } + public Date getDate() { + return (Date) date.clone(); + } - public void setDate(Date date) { - this.date = date; - } + public void setDate(Date date) { + this.date = (Date) date.clone(); + } - public String getLogLevel() { - return logLevel; - } + public String getLogLevel() { + return logLevel; + } - public String getClassName() { - return className; - } + public String getClassName() { + return className; + } - public String getBody() { - return body; - } + public String getBody() { + return body; + } } http://git-wip-us.apache.org/repos/asf/chukwa/blob/6012e14f/src/main/java/org/apache/hadoop/chukwa/extraction/hbase/SystemMetrics.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/hadoop/chukwa/extraction/hbase/SystemMetrics.java b/src/main/java/org/apache/hadoop/chukwa/extraction/hbase/SystemMetrics.java index c2695f2..3718fbd 100644 --- a/src/main/java/org/apache/hadoop/chukwa/extraction/hbase/SystemMetrics.java +++ b/src/main/java/org/apache/hadoop/chukwa/extraction/hbase/SystemMetrics.java @@ -22,20 +22,14 @@ */ package org.apache.hadoop.chukwa.extraction.hbase; +import java.nio.charset.Charset; import java.security.NoSuchAlgorithmException; -import java.util.ArrayList; -import java.util.Calendar; import java.util.Iterator; -import java.util.TimeZone; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Set; -import org.apache.hadoop.chukwa.Chunk; -import org.apache.hadoop.chukwa.datacollection.writer.hbase.Annotation.Table; -import org.apache.hadoop.chukwa.datacollection.writer.hbase.Annotation.Tables; import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecord; -import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecordKey; -import org.apache.hadoop.hbase.client.Put; -import org.apache.hadoop.mapred.OutputCollector; -import org.apache.hadoop.mapred.Reporter; import org.json.simple.JSONArray; import org.json.simple.JSONObject; import org.json.simple.JSONValue; @@ -48,7 +42,7 @@ public class SystemMetrics extends AbstractProcessor { @Override protected void parse(byte[] recordEntry) throws Throwable { - String buffer = new String(recordEntry); + String buffer = new String(recordEntry, Charset.forName("UTF-8")); JSONObject json = (JSONObject) JSONValue.parse(buffer); time = ((Long) json.get("timestamp")).longValue(); ChukwaRecord record = new ChukwaRecord(); @@ -70,11 +64,9 @@ public class SystemMetrics extends AbstractProcessor { user = user + Double.parseDouble(cpu.get("user").toString()); sys = sys + Double.parseDouble(cpu.get("sys").toString()); idle = idle + Double.parseDouble(cpu.get("idle").toString()); - @SuppressWarnings("unchecked") - Iterator<String> iterator = (Iterator<String>) cpu.keySet().iterator(); - while(iterator.hasNext()) { - String key = iterator.next(); - addRecord("cpu." + key + "." + i, cpu.get(key).toString()); + for(Entry<String, Object> entry : (Set<Map.Entry>) cpu.entrySet()) { + String key = entry.getKey(); + addRecord("cpu." + key + "." + i, String.valueOf(entry.getValue())); } } combined = combined / actualSize; @@ -94,20 +86,15 @@ public class SystemMetrics extends AbstractProcessor { record = new ChukwaRecord(); JSONObject memory = (JSONObject) json.get("memory"); - @SuppressWarnings("unchecked") - Iterator<String> memKeys = memory.keySet().iterator(); - while (memKeys.hasNext()) { - String key = memKeys.next(); - addRecord("memory." + key, memory.get(key).toString()); + for(Entry<String, Object> entry : (Set<Map.Entry>) memory.entrySet()) { + String key = entry.getKey(); + addRecord("memory." + key, String.valueOf(entry.getValue())); } record = new ChukwaRecord(); JSONObject swap = (JSONObject) json.get("swap"); - @SuppressWarnings("unchecked") - Iterator<String> swapKeys = swap.keySet().iterator(); - while (swapKeys.hasNext()) { - String key = swapKeys.next(); - addRecord("swap." + key, swap.get(key).toString()); + for(Map.Entry<String, Object> entry : (Set<Map.Entry>) swap.entrySet()) { + addRecord("swap." + entry.getKey(), String.valueOf(entry.getValue())); } double rxBytes = 0; @@ -122,28 +109,30 @@ public class SystemMetrics extends AbstractProcessor { JSONArray netList = (JSONArray) json.get("network"); for (int i = 0; i < netList.size(); i++) { JSONObject netIf = (JSONObject) netList.get(i); - @SuppressWarnings("unchecked") - Iterator<String> keys = netIf.keySet().iterator(); - while (keys.hasNext()) { - String key = keys.next(); - record.add(key + "." + i, netIf.get(key).toString()); + for(Map.Entry<String, Object> entry : (Set<Map.Entry>) netIf.entrySet()) { + String key = entry.getKey(); + long value = 0; + if(entry.getValue() instanceof Long) { + value = (Long) entry.getValue(); + } + record.add(key + "." + i, String.valueOf(entry.getValue())); if (i != 0) { if (key.equals("RxBytes")) { - rxBytes = rxBytes + (Long) netIf.get(key); + rxBytes = rxBytes + value; } else if (key.equals("RxDropped")) { - rxDropped = rxDropped + (Long) netIf.get(key); + rxDropped = rxDropped + value; } else if (key.equals("RxErrors")) { - rxErrors = rxErrors + (Long) netIf.get(key); + rxErrors = rxErrors + value; } else if (key.equals("RxPackets")) { - rxPackets = rxPackets + (Long) netIf.get(key); + rxPackets = rxPackets + value; } else if (key.equals("TxBytes")) { - txBytes = txBytes + (Long) netIf.get(key); + txBytes = txBytes + value; } else if (key.equals("TxCollisions")) { - txCollisions = txCollisions + (Long) netIf.get(key); + txCollisions = txCollisions + value; } else if (key.equals("TxErrors")) { - txErrors = txErrors + (Long) netIf.get(key); + txErrors = txErrors + value; } else if (key.equals("TxPackets")) { - txPackets = txPackets + (Long) netIf.get(key); + txPackets = txPackets + value; } } } @@ -168,22 +157,25 @@ public class SystemMetrics extends AbstractProcessor { JSONArray diskList = (JSONArray) json.get("disk"); for (int i = 0; i < diskList.size(); i++) { JSONObject disk = (JSONObject) diskList.get(i); - Iterator<String> keys = disk.keySet().iterator(); - while (keys.hasNext()) { - String key = keys.next(); - record.add(key + "." + i, disk.get(key).toString()); + for(Entry<String, Object> entry : (Set<Map.Entry>) disk.entrySet()) { + String key = entry.getKey(); + long value = 0; + if(entry.getValue() instanceof Long) { + value = (Long) entry.getValue(); + } + record.add(key + "." + i, String.valueOf(entry.getValue())); if (key.equals("ReadBytes")) { - readBytes = readBytes + (Long) disk.get("ReadBytes"); + readBytes = readBytes + value; } else if (key.equals("Reads")) { - reads = reads + (Long) disk.get("Reads"); + reads = reads + Long.valueOf(value);; } else if (key.equals("WriteBytes")) { - writeBytes = writeBytes + (Long) disk.get("WriteBytes"); + writeBytes = writeBytes + value; } else if (key.equals("Writes")) { - writes = writes + (Long) disk.get("Writes"); + writes = writes + value; } else if (key.equals("Total")) { - total = total + (Long) disk.get("Total"); + total = total + value; } else if (key.equals("Used")) { - used = used + (Long) disk.get("Used"); + used = used + value; } } } http://git-wip-us.apache.org/repos/asf/chukwa/blob/6012e14f/src/main/java/org/apache/hadoop/chukwa/hicc/ClusterConfig.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/hadoop/chukwa/hicc/ClusterConfig.java b/src/main/java/org/apache/hadoop/chukwa/hicc/ClusterConfig.java index 9c21bf1..02fe3b7 100644 --- a/src/main/java/org/apache/hadoop/chukwa/hicc/ClusterConfig.java +++ b/src/main/java/org/apache/hadoop/chukwa/hicc/ClusterConfig.java @@ -20,21 +20,20 @@ package org.apache.hadoop.chukwa.hicc; import java.io.*; +import java.nio.charset.Charset; import java.util.*; import org.apache.hadoop.chukwa.datastore.ChukwaHBaseStore; public class ClusterConfig { - private static Set<String> clusterMap = null; + private Set<String> clusterMap = null; static public String getContents(File aFile) { // ...checks on aFile are elided StringBuffer contents = new StringBuffer(); try { - // use buffering, reading one line at a time - // FileReader always assumes default encoding is OK! - BufferedReader input = new BufferedReader(new FileReader(aFile)); + BufferedReader input = new BufferedReader(new InputStreamReader(new FileInputStream(aFile.getAbsolutePath()), Charset.forName("UTF-8"))); try { String line = null; // not declared within while loop /* http://git-wip-us.apache.org/repos/asf/chukwa/blob/6012e14f/src/main/java/org/apache/hadoop/chukwa/hicc/HiccWebServer.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/hadoop/chukwa/hicc/HiccWebServer.java b/src/main/java/org/apache/hadoop/chukwa/hicc/HiccWebServer.java index 2d84c09..fe90941 100644 --- a/src/main/java/org/apache/hadoop/chukwa/hicc/HiccWebServer.java +++ b/src/main/java/org/apache/hadoop/chukwa/hicc/HiccWebServer.java @@ -24,6 +24,7 @@ import java.io.InputStream; import java.io.InputStreamReader; import java.net.URISyntaxException; import java.net.URL; +import java.nio.charset.Charset; import java.util.ArrayList; import java.util.Enumeration; import java.util.List; @@ -124,12 +125,12 @@ public class HiccWebServer { StringBuilder sb = new StringBuilder(); String line = null; try { - BufferedReader reader = new BufferedReader(new InputStreamReader(is)); + BufferedReader reader = new BufferedReader(new InputStreamReader(is, Charset.forName("UTF-8"))); while ((line = reader.readLine()) != null) { sb.append(line + "\n"); } FSDataOutputStream out = fs.create(dest); - out.write(sb.toString().getBytes()); + out.write(sb.toString().getBytes(Charset.forName("UTF-8"))); out.close(); reader.close(); } catch(IOException e) { http://git-wip-us.apache.org/repos/asf/chukwa/blob/6012e14f/src/main/java/org/apache/hadoop/chukwa/hicc/bean/Chart.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/hadoop/chukwa/hicc/bean/Chart.java b/src/main/java/org/apache/hadoop/chukwa/hicc/bean/Chart.java index 7c8c6a7..19cde5f 100644 --- a/src/main/java/org/apache/hadoop/chukwa/hicc/bean/Chart.java +++ b/src/main/java/org/apache/hadoop/chukwa/hicc/bean/Chart.java @@ -111,7 +111,7 @@ public class Chart { return this.id; } - public void SetSeries(List<SeriesMetaData> series) { + public void setSeries(List<SeriesMetaData> series) { this.series = series; } http://git-wip-us.apache.org/repos/asf/chukwa/blob/6012e14f/src/main/java/org/apache/hadoop/chukwa/hicc/bean/Widget.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/hadoop/chukwa/hicc/bean/Widget.java b/src/main/java/org/apache/hadoop/chukwa/hicc/bean/Widget.java index 61dc0b5..74d5e89 100644 --- a/src/main/java/org/apache/hadoop/chukwa/hicc/bean/Widget.java +++ b/src/main/java/org/apache/hadoop/chukwa/hicc/bean/Widget.java @@ -89,11 +89,11 @@ public class Widget { } public String[] getTokens() { - return tokens; + return tokens.clone(); } public void setTokens(String[] tokens) { - this.tokens = tokens; + this.tokens = tokens.clone(); } public void tokenize() { http://git-wip-us.apache.org/repos/asf/chukwa/blob/6012e14f/src/main/java/org/apache/hadoop/chukwa/hicc/proxy/HttpProxy.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/hadoop/chukwa/hicc/proxy/HttpProxy.java b/src/main/java/org/apache/hadoop/chukwa/hicc/proxy/HttpProxy.java index c9413bd..869efa4 100644 --- a/src/main/java/org/apache/hadoop/chukwa/hicc/proxy/HttpProxy.java +++ b/src/main/java/org/apache/hadoop/chukwa/hicc/proxy/HttpProxy.java @@ -24,6 +24,7 @@ import java.io.InputStreamReader; import java.net.HttpURLConnection; import java.net.URL; import java.net.URLEncoder; +import java.nio.charset.Charset; import java.util.Map; import javax.servlet.ServletException; @@ -44,11 +45,11 @@ public class HttpProxy extends HttpServlet { private final String USER_AGENT = "Mozilla/5.0"; private final static String SOLR_URL = "chukwa.solr.url"; private final static Logger LOG = Logger.getLogger(HttpProxy.class); - private ChukwaConfiguration conf = new ChukwaConfiguration(); private String solrUrl = null; public HttpProxy() { super(); + ChukwaConfiguration conf = new ChukwaConfiguration(); solrUrl = conf.get(SOLR_URL); } @@ -72,7 +73,7 @@ public class HttpProxy extends HttpServlet { LOG.info("Response Code : " + responseCode); BufferedReader in = new BufferedReader(new InputStreamReader( - con.getInputStream())); + con.getInputStream(), Charset.forName("UTF-8"))); String inputLine; StringBuffer response1 = new StringBuffer(); @@ -80,7 +81,7 @@ public class HttpProxy extends HttpServlet { while ((inputLine = in.readLine()) != null) { response1.append(inputLine); - sout.write(inputLine.getBytes()); + sout.write(inputLine.getBytes(Charset.forName("UTF-8"))); } in.close(); @@ -131,7 +132,7 @@ public class HttpProxy extends HttpServlet { LOG.debug("Response Code : " + responseCode); BufferedReader in = new BufferedReader(new InputStreamReader( - con.getInputStream())); + con.getInputStream(), Charset.forName("UTF-8"))); String inputLine; StringBuffer response1 = new StringBuffer(); @@ -139,7 +140,7 @@ public class HttpProxy extends HttpServlet { while ((inputLine = in.readLine()) != null) { response1.append(inputLine); - sout.write(inputLine.getBytes()); + sout.write(inputLine.getBytes(Charset.forName("UTF-8"))); } in.close(); http://git-wip-us.apache.org/repos/asf/chukwa/blob/6012e14f/src/main/java/org/apache/hadoop/chukwa/hicc/rest/SessionController.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/hadoop/chukwa/hicc/rest/SessionController.java b/src/main/java/org/apache/hadoop/chukwa/hicc/rest/SessionController.java index ceed0df..1441253 100644 --- a/src/main/java/org/apache/hadoop/chukwa/hicc/rest/SessionController.java +++ b/src/main/java/org/apache/hadoop/chukwa/hicc/rest/SessionController.java @@ -20,6 +20,8 @@ package org.apache.hadoop.chukwa.hicc.rest; import java.lang.reflect.Type; import java.util.HashMap; import java.util.Map; +import java.util.Map.Entry; +import java.util.Set; import javax.servlet.http.HttpServletRequest; import javax.ws.rs.GET; @@ -63,8 +65,8 @@ public class SessionController { Gson gson = new Gson(); Type stringStringMap = new TypeToken<Map<String, String>>(){}.getType(); Map<String,String> map = gson.fromJson(buffer, stringStringMap); - for(String key : map.keySet()) { - request.getSession().setAttribute(key, map.get(key)); + for(Entry<String, String> entry : (Set<Map.Entry<String, String>>) map.entrySet()) { + request.getSession().setAttribute(entry.getKey(), entry.getValue()); } return Response.ok().build(); } http://git-wip-us.apache.org/repos/asf/chukwa/blob/6012e14f/src/main/java/org/apache/hadoop/chukwa/hicc/rest/VelocityResolver.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/hadoop/chukwa/hicc/rest/VelocityResolver.java b/src/main/java/org/apache/hadoop/chukwa/hicc/rest/VelocityResolver.java index ea07797..4524922 100644 --- a/src/main/java/org/apache/hadoop/chukwa/hicc/rest/VelocityResolver.java +++ b/src/main/java/org/apache/hadoop/chukwa/hicc/rest/VelocityResolver.java @@ -39,7 +39,7 @@ public class VelocityResolver implements InjectableProvider<Context, Type> { private VelocityEngine ve; private static Logger LOG = Logger.getLogger(VelocityResolver.class); - public static String LOGGER_NAME = VelocityResolver.class.getName(); + public final static String LOGGER_NAME = VelocityResolver.class.getName(); /** * Jersey configuration for setting up Velocity configuration. http://git-wip-us.apache.org/repos/asf/chukwa/blob/6012e14f/src/main/java/org/apache/hadoop/chukwa/inputtools/mdl/TorqueInfoProcessor.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/hadoop/chukwa/inputtools/mdl/TorqueInfoProcessor.java b/src/main/java/org/apache/hadoop/chukwa/inputtools/mdl/TorqueInfoProcessor.java deleted file mode 100644 index f0a3303..0000000 --- a/src/main/java/org/apache/hadoop/chukwa/inputtools/mdl/TorqueInfoProcessor.java +++ /dev/null @@ -1,473 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.chukwa.inputtools.mdl; - - -import java.sql.SQLException; -import java.util.Calendar; -import java.util.Set; -import java.util.TreeSet; -import java.util.TreeMap; -import java.util.Iterator; -import java.sql.Timestamp; -import java.text.ParseException; -import java.text.SimpleDateFormat; -import java.util.Timer; -import java.io.BufferedReader; -import java.io.File; -import java.io.IOException; -import java.io.InputStreamReader; -import java.util.Date; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; - -public class TorqueInfoProcessor { - - private static Log log = LogFactory.getLog(TorqueInfoProcessor.class); - - private int intervalValue = 60; - private String torqueServer = null; - private String torqueBinDir = null; - private String domain = null; - - private TreeMap<String, TreeMap<String, String>> currentHodJobs; - - public TorqueInfoProcessor(DataConfig mdlConfig, int interval) { - this.intervalValue = interval; - - torqueServer = System.getProperty("TORQUE_SERVER"); - torqueBinDir = System.getProperty("TORQUE_HOME") + File.separator + "bin"; - domain = System.getProperty("DOMAIN"); - currentHodJobs = new TreeMap<String, TreeMap<String, String>>(); - } - - public void setup(boolean recover) throws Exception { - } - - private void getHodJobInfo() throws IOException { - StringBuffer sb = new StringBuffer(); - sb.append(torqueBinDir).append("/qstat -a"); - - String[] getQueueInfoCommand = new String[3]; - getQueueInfoCommand[0] = "ssh"; - getQueueInfoCommand[1] = torqueServer; - getQueueInfoCommand[2] = sb.toString(); - - String command = getQueueInfoCommand[0] + " " + getQueueInfoCommand[1] - + " " + getQueueInfoCommand[2]; - ProcessBuilder pb = new ProcessBuilder(getQueueInfoCommand); - - Process p = pb.start(); - - Timer timeout = new Timer(); - TorqueTimerTask torqueTimerTask = new TorqueTimerTask(p, command); - timeout.schedule(torqueTimerTask, TorqueTimerTask.timeoutInterval * 1000); - - BufferedReader result = new BufferedReader(new InputStreamReader(p - .getInputStream())); - ErStreamHandler errorHandler = new ErStreamHandler(p.getErrorStream(), - command, true); - errorHandler.start(); - - String line = null; - boolean start = false; - TreeSet<String> jobsInTorque = new TreeSet<String>(); - while ((line = result.readLine()) != null) { - if (line.startsWith("---")) { - start = true; - continue; - } - - if (start) { - String[] items = line.split("\\s+"); - if (items.length >= 10) { - String hodIdLong = items[0]; - String hodId = hodIdLong.split("[.]")[0]; - String userId = items[1]; - String numOfMachine = items[5]; - String status = items[9]; - jobsInTorque.add(hodId); - if (!currentHodJobs.containsKey(hodId)) { - TreeMap<String, String> aJobData = new TreeMap<String, String>(); - - aJobData.put("userId", userId); - aJobData.put("numOfMachine", numOfMachine); - aJobData.put("traceCheckCount", "0"); - aJobData.put("process", "0"); - aJobData.put("status", status); - currentHodJobs.put(hodId, aJobData); - } else { - TreeMap<String, String> aJobData = currentHodJobs.get(hodId); - aJobData.put("status", status); - currentHodJobs.put(hodId, aJobData); - }// if..else - } - } - }// while - - try { - errorHandler.join(); - } catch (InterruptedException ie) { - log.error(ie.getMessage()); - } - timeout.cancel(); - - Set<String> currentHodJobIds = currentHodJobs.keySet(); - Iterator<String> currentHodJobIdsIt = currentHodJobIds.iterator(); - TreeSet<String> finishedHodIds = new TreeSet<String>(); - while (currentHodJobIdsIt.hasNext()) { - String hodId = currentHodJobIdsIt.next(); - if (!jobsInTorque.contains(hodId)) { - TreeMap<String, String> aJobData = currentHodJobs.get(hodId); - String process = aJobData.get("process"); - if (process.equals("0") || process.equals("1")) { - aJobData.put("status", "C"); - } else { - finishedHodIds.add(hodId); - } - } - }// while - - Iterator<String> finishedHodIdsIt = finishedHodIds.iterator(); - while (finishedHodIdsIt.hasNext()) { - String hodId = finishedHodIdsIt.next(); - currentHodJobs.remove(hodId); - } - - } - - private boolean loadQstatData(String hodId) throws IOException, SQLException { - TreeMap<String, String> aJobData = currentHodJobs.get(hodId); - String userId = aJobData.get("userId"); - - StringBuffer sb = new StringBuffer(); - sb.append(torqueBinDir).append("/qstat -f -1 ").append(hodId); - String[] qstatCommand = new String[3]; - qstatCommand[0] = "ssh"; - qstatCommand[1] = torqueServer; - qstatCommand[2] = sb.toString(); - - String command = qstatCommand[0] + " " + qstatCommand[1] + " " - + qstatCommand[2]; - ProcessBuilder pb = new ProcessBuilder(qstatCommand); - Process p = pb.start(); - - Timer timeout = new Timer(); - TorqueTimerTask torqueTimerTask = new TorqueTimerTask(p, command); - timeout.schedule(torqueTimerTask, TorqueTimerTask.timeoutInterval * 1000); - - BufferedReader result = new BufferedReader(new InputStreamReader(p - .getInputStream())); - ErStreamHandler errorHandler = new ErStreamHandler(p.getErrorStream(), - command, false); - errorHandler.start(); - String line = null; - String hosts = null; - long startTimeValue = -1; - long endTimeValue = Calendar.getInstance().getTimeInMillis(); - long executeTimeValue = Calendar.getInstance().getTimeInMillis(); - boolean qstatfinished; - - while ((line = result.readLine()) != null) { - if (line.indexOf("ctime") >= 0) { - String startTime = line.split("=")[1].trim(); - // Tue Sep 9 23:44:29 2008 - SimpleDateFormat sdf = new SimpleDateFormat("EEE MMM d HH:mm:ss yyyy"); - Date startTimeDate; - try { - startTimeDate = sdf.parse(startTime); - startTimeValue = startTimeDate.getTime(); - } catch (ParseException e) { - // TODO Auto-generated catch block - e.printStackTrace(); - } - - } - if (line.indexOf("mtime") >= 0) { - String endTime = line.split("=")[1].trim(); - SimpleDateFormat sdf = new SimpleDateFormat("EEE MMM d HH:mm:ss yyyy"); - Date endTimeDate; - try { - endTimeDate = sdf.parse(endTime); - endTimeValue = endTimeDate.getTime(); - } catch (ParseException e) { - // TODO Auto-generated catch block - e.printStackTrace(); - } - - } - if (line.indexOf("etime") >= 0) { - String executeTime = line.split("=")[1].trim(); - SimpleDateFormat sdf = new SimpleDateFormat("EEE MMM d HH:mm:ss yyyy"); - Date executeTimeDate; - try { - executeTimeDate = sdf.parse(executeTime); - executeTimeValue = executeTimeDate.getTime(); - } catch (ParseException e) { - // TODO Auto-generated catch block - e.printStackTrace(); - } - - } - if (line.indexOf("exec_host") >= 0) { - hosts = line.split("=")[1].trim(); - } - } - - if (hosts != null && startTimeValue >= 0) { - String[] items2 = hosts.split("[+]"); - int num = 0; - for (int i = 0; i < items2.length; i++) { - String machinetmp = items2[i]; - if (machinetmp.length() > 3) { - String machine = items2[i].substring(0, items2[i].length() - 2); - StringBuffer data = new StringBuffer(); - data.append("HodId=").append(hodId); - data.append(", Machine=").append(machine); - if (domain != null) { - data.append(".").append(domain); - } - log.info(data); - num++; - } - } - Timestamp startTimedb = new Timestamp(startTimeValue); - Timestamp endTimedb = new Timestamp(endTimeValue); - StringBuffer data = new StringBuffer(); - long timeQueued = executeTimeValue - startTimeValue; - data.append("HodID=").append(hodId); - data.append(", UserId=").append(userId); - data.append(", StartTime=").append(startTimedb); - data.append(", TimeQueued=").append(timeQueued); - data.append(", NumOfMachines=").append(num); - data.append(", EndTime=").append(endTimedb); - log.info(data); - qstatfinished = true; - - } else { - - qstatfinished = false; - } - - try { - errorHandler.join(); - } catch (InterruptedException ie) { - log.error(ie.getMessage()); - } - result.close(); - timeout.cancel(); - - return qstatfinished; - } - - private boolean loadTraceJobData(String hodId) throws IOException, - SQLException { - TreeMap<String, String> aJobData = currentHodJobs.get(hodId); - String userId = aJobData.get("userId"); - - StringBuffer sb = new StringBuffer(); - sb.append(torqueBinDir).append("/tracejob -n 10 -l -m -s ").append(hodId); - String[] traceJobCommand = new String[3]; - traceJobCommand[0] = "ssh"; - traceJobCommand[1] = torqueServer; - traceJobCommand[2] = sb.toString(); - - String command = traceJobCommand[0] + " " + traceJobCommand[1] + " " - + traceJobCommand[2]; - ProcessBuilder pb = new ProcessBuilder(traceJobCommand); - - Process p = pb.start(); - - Timer timeout = new Timer(); - TorqueTimerTask torqueTimerTask = new TorqueTimerTask(p, command); - timeout.schedule(torqueTimerTask, TorqueTimerTask.timeoutInterval * 1000); - - BufferedReader result = new BufferedReader(new InputStreamReader(p - .getInputStream())); - ErStreamHandler errorHandler = new ErStreamHandler(p.getErrorStream(), - command, false); - errorHandler.start(); - String line = null; - String exit_status = null; - String hosts = null; - long timeQueued = -1; - long startTimeValue = -1; - long endTimeValue = -1; - boolean findResult = false; - - while ((line = result.readLine()) != null && !findResult) { - if (line.indexOf("end") >= 0 && line.indexOf("Exit_status") >= 0 - && line.indexOf("qtime") >= 0) { - TreeMap<String, String> jobData = new TreeMap<String, String>(); - String[] items = line.split("\\s+"); - for (int i = 0; i < items.length; i++) { - String[] items2 = items[i].split("="); - if (items2.length >= 2) { - jobData.put(items2[0], items2[1]); - } - - } - String startTime = jobData.get("ctime"); - startTimeValue = Long.valueOf(startTime); - startTimeValue = startTimeValue - startTimeValue % (60); - Timestamp startTimedb = new Timestamp(startTimeValue * 1000); - - String queueTime = jobData.get("qtime"); - long queueTimeValue = Long.valueOf(queueTime); - - String sTime = jobData.get("start"); - long sTimeValue = Long.valueOf(sTime); - - timeQueued = sTimeValue - queueTimeValue; - - String endTime = jobData.get("end"); - endTimeValue = Long.valueOf(endTime); - endTimeValue = endTimeValue - endTimeValue % (60); - Timestamp endTimedb = new Timestamp(endTimeValue * 1000); - - exit_status = jobData.get("Exit_status"); - hosts = jobData.get("exec_host"); - String[] items2 = hosts.split("[+]"); - int num = 0; - for (int i = 0; i < items2.length; i++) { - String machinetemp = items2[i]; - if (machinetemp.length() >= 3) { - String machine = items2[i].substring(0, items2[i].length() - 2); - StringBuffer data = new StringBuffer(); - data.append("HodId=").append(hodId); - data.append(", Machine=").append(machine); - if (domain != null) { - data.append(".").append(domain); - } - log.info(data.toString()); - num++; - } - } - - StringBuffer data = new StringBuffer(); - data.append("HodID=").append(hodId); - data.append(", UserId=").append(userId); - data.append(", Status=").append(exit_status); - data.append(", TimeQueued=").append(timeQueued); - data.append(", StartTime=").append(startTimedb); - data.append(", EndTime=").append(endTimedb); - data.append(", NumOfMachines=").append(num); - log.info(data.toString()); - findResult = true; - log.debug(" hod info for job " + hodId + " has been loaded "); - }// if - - }// while - - try { - errorHandler.join(); - } catch (InterruptedException ie) { - log.error(ie.getMessage()); - } - - timeout.cancel(); - boolean tracedone = false; - if (!findResult) { - - String traceCheckCount = aJobData.get("traceCheckCount"); - int traceCheckCountValue = Integer.valueOf(traceCheckCount); - traceCheckCountValue = traceCheckCountValue + 1; - aJobData.put("traceCheckCount", String.valueOf(traceCheckCountValue)); - - log.debug("did not find tracejob info for job " + hodId + ", after " - + traceCheckCountValue + " times checking"); - if (traceCheckCountValue >= 2) { - tracedone = true; - } - } - boolean finished = findResult | tracedone; - return finished; - } - - private void process_data() throws SQLException { - - long currentTime = System.currentTimeMillis(); - currentTime = currentTime - currentTime % (60 * 1000); - - Set<String> hodIds = currentHodJobs.keySet(); - - Iterator<String> hodIdsIt = hodIds.iterator(); - while (hodIdsIt.hasNext()) { - String hodId = hodIdsIt.next(); - TreeMap<String, String> aJobData = currentHodJobs.get(hodId); - String status = aJobData.get("status"); - String process = aJobData.get("process"); - if (process.equals("0") && (status.equals("R") || status.equals("E"))) { - try { - boolean result = loadQstatData(hodId); - if (result) { - aJobData.put("process", "1"); - currentHodJobs.put(hodId, aJobData); - } - } catch (IOException ioe) { - log.error("load qsat data Error:" + ioe.getMessage()); - - } - } - if (!process.equals("2") && status.equals("C")) { - try { - boolean result = loadTraceJobData(hodId); - - if (result) { - aJobData.put("process", "2"); - currentHodJobs.put(hodId, aJobData); - } - } catch (IOException ioe) { - log.error("loadTraceJobData Error:" + ioe.getMessage()); - } - }// if - - } // while - - } - - private void handle_jobData() throws SQLException { - try { - getHodJobInfo(); - } catch (IOException ex) { - log.error("getQueueInfo Error:" + ex.getMessage()); - return; - } - try { - process_data(); - } catch (SQLException ex) { - log.error("process_data Error:" + ex.getMessage()); - throw ex; - } - } - - public void run_forever() throws SQLException { - while (true) { - handle_jobData(); - try { - log.debug("sleeping ..."); - Thread.sleep(this.intervalValue * 1000); - } catch (InterruptedException e) { - log.error(e.getMessage()); - } - } - } - - public void shutdown() { - } -} http://git-wip-us.apache.org/repos/asf/chukwa/blob/6012e14f/src/main/java/org/apache/hadoop/chukwa/inputtools/mdl/TorqueTimerTask.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/hadoop/chukwa/inputtools/mdl/TorqueTimerTask.java b/src/main/java/org/apache/hadoop/chukwa/inputtools/mdl/TorqueTimerTask.java deleted file mode 100644 index 8ea645e..0000000 --- a/src/main/java/org/apache/hadoop/chukwa/inputtools/mdl/TorqueTimerTask.java +++ /dev/null @@ -1,51 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.chukwa.inputtools.mdl; - - -import java.util.TimerTask; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; - -public class TorqueTimerTask extends TimerTask { - private Process ps = null; - private String command; - - private static Log log = LogFactory.getLog(TorqueTimerTask.class); - // public static int timeoutInterval=300; - public static int timeoutInterval = 180; - - public TorqueTimerTask() { - super(); - // TODO Auto-generated constructor stub - } - - public TorqueTimerTask(Process process, String command) { - super(); - this.ps = process; - this.command = command; - - } - - public void run() { - ps.destroy(); - log.error("torque command: " + command + " timed out"); - - } - -} http://git-wip-us.apache.org/repos/asf/chukwa/blob/6012e14f/src/main/java/org/apache/hadoop/chukwa/util/ClusterConfig.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/hadoop/chukwa/util/ClusterConfig.java b/src/main/java/org/apache/hadoop/chukwa/util/ClusterConfig.java index 7ed3148..ff2a022 100644 --- a/src/main/java/org/apache/hadoop/chukwa/util/ClusterConfig.java +++ b/src/main/java/org/apache/hadoop/chukwa/util/ClusterConfig.java @@ -20,10 +20,11 @@ package org.apache.hadoop.chukwa.util; import java.io.*; +import java.nio.charset.Charset; import java.util.*; public class ClusterConfig { - public static final HashMap<String, String> clusterMap = new HashMap<String, String>(); + private HashMap<String, String> clusterMap = new HashMap<String, String>(); private String path = System.getenv("CHUKWA_CONF_DIR") + File.separator; static public String getContents(File aFile) { @@ -31,9 +32,7 @@ public class ClusterConfig { StringBuffer contents = new StringBuffer(); try { - // use buffering, reading one line at a time - // FileReader always assumes default encoding is OK! - BufferedReader input = new BufferedReader(new FileReader(aFile)); + BufferedReader input = new BufferedReader(new InputStreamReader(new FileInputStream(aFile.getAbsolutePath()), Charset.forName("UTF-8"))); try { String line = null; // not declared within while loop /* http://git-wip-us.apache.org/repos/asf/chukwa/blob/6012e14f/src/main/java/org/apache/hadoop/chukwa/util/HBaseUtil.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/hadoop/chukwa/util/HBaseUtil.java b/src/main/java/org/apache/hadoop/chukwa/util/HBaseUtil.java index c655e24..70a80c0 100644 --- a/src/main/java/org/apache/hadoop/chukwa/util/HBaseUtil.java +++ b/src/main/java/org/apache/hadoop/chukwa/util/HBaseUtil.java @@ -17,6 +17,7 @@ */ package org.apache.hadoop.chukwa.util; +import java.nio.charset.Charset; import java.security.MessageDigest; import java.security.NoSuchAlgorithmException; import java.util.Calendar; @@ -29,7 +30,6 @@ import org.mortbay.log.Log; public class HBaseUtil { private static Logger LOG = Logger.getLogger(HBaseUtil.class); - static Calendar c = Calendar.getInstance(TimeZone.getTimeZone("UTC")); static MessageDigest md5 = null; static { try { @@ -50,8 +50,9 @@ public class HBaseUtil { } public static byte[] buildKey(long time, String primaryKey) { + Calendar c = Calendar.getInstance(TimeZone.getTimeZone("UTC")); c.setTimeInMillis(time); - byte[] day = Integer.toString(c.get(Calendar.DAY_OF_YEAR)).getBytes(); + byte[] day = Integer.toString(c.get(Calendar.DAY_OF_YEAR)).getBytes(Charset.forName("UTF-8")); byte[] pk = getHash(primaryKey); byte[] key = new byte[12]; System.arraycopy(day, 0, key, 0, day.length); @@ -60,8 +61,9 @@ public class HBaseUtil { } public static byte[] buildKey(long time, String primaryKey, String source) { + Calendar c = Calendar.getInstance(TimeZone.getTimeZone("UTC")); c.setTimeInMillis(time); - byte[] day = Integer.toString(c.get(Calendar.DAY_OF_YEAR)).getBytes(); + byte[] day = Integer.toString(c.get(Calendar.DAY_OF_YEAR)).getBytes(Charset.forName("UTF-8")); byte[] pk = getHash(primaryKey); byte[] src = getHash(source); byte[] key = new byte[12]; @@ -73,7 +75,7 @@ public class HBaseUtil { private static byte[] getHash(String key) { byte[] hash = new byte[5]; - System.arraycopy(md5.digest(key.getBytes()), 0, hash, 0, 5); + System.arraycopy(md5.digest(key.getBytes(Charset.forName("UTF-8"))), 0, hash, 0, 5); return hash; } } http://git-wip-us.apache.org/repos/asf/chukwa/blob/6012e14f/src/test/java/org/apache/hadoop/chukwa/ChunkImplTest.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/hadoop/chukwa/ChunkImplTest.java b/src/test/java/org/apache/hadoop/chukwa/ChunkImplTest.java index 26e4beb..c1cf37f 100644 --- a/src/test/java/org/apache/hadoop/chukwa/ChunkImplTest.java +++ b/src/test/java/org/apache/hadoop/chukwa/ChunkImplTest.java @@ -43,26 +43,6 @@ public class ChunkImplTest extends TestCase { } } - public void testWrongVersion() { - ChunkBuilder cb = new ChunkBuilder(); - cb.addRecord("foo".getBytes()); - cb.addRecord("bar".getBytes()); - cb.addRecord("baz".getBytes()); - Chunk c = cb.getChunk(); - DataOutputBuffer ob = new DataOutputBuffer(c.getSerializedSizeEstimate()); - try { - c.write(ob); - DataInputBuffer ib = new DataInputBuffer(); - ib.reset(ob.getData(), c.getSerializedSizeEstimate()); - // change current chunkImpl version - ChunkImpl.PROTOCOL_VERSION = ChunkImpl.PROTOCOL_VERSION + 1; - ChunkImpl.read(ib); - fail("Should have raised an IOexception"); - } catch (IOException e) { - // right behavior, do nothing - } - } - public void testTag() { ChunkBuilder cb = new ChunkBuilder(); cb.addRecord("foo".getBytes()); http://git-wip-us.apache.org/repos/asf/chukwa/blob/6012e14f/src/test/java/org/apache/hadoop/chukwa/analysis/salsa/fsm/TestFSMBuilder.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/hadoop/chukwa/analysis/salsa/fsm/TestFSMBuilder.java b/src/test/java/org/apache/hadoop/chukwa/analysis/salsa/fsm/TestFSMBuilder.java index 93500ff..99d8dc1 100644 --- a/src/test/java/org/apache/hadoop/chukwa/analysis/salsa/fsm/TestFSMBuilder.java +++ b/src/test/java/org/apache/hadoop/chukwa/analysis/salsa/fsm/TestFSMBuilder.java @@ -172,7 +172,8 @@ public class TestFSMBuilder extends TestCase { conf.set("chukwaAgent.checkpoint.dir", System.getenv("CHUKWA_DATA_DIR")+File.separator+"tmp"); conf.set("chukwaAgent.checkpoint.interval", "10000"); int portno = conf.getInt("chukwaAgent.control.port", agentPort); - agent = new ChukwaAgent(conf); + agent = ChukwaAgent.getAgent(); + agent.start(); conn = new HttpConnector(agent, "http://localhost:"+collectorPort+"/chukwa"); conn.start(); sender = new ChukwaHttpSender(conf); http://git-wip-us.apache.org/repos/asf/chukwa/blob/6012e14f/src/test/java/org/apache/hadoop/chukwa/datacollection/adaptor/JMX/TestJMXAdaptor.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/hadoop/chukwa/datacollection/adaptor/JMX/TestJMXAdaptor.java b/src/test/java/org/apache/hadoop/chukwa/datacollection/adaptor/JMX/TestJMXAdaptor.java index a039bc6..5b163c9 100644 --- a/src/test/java/org/apache/hadoop/chukwa/datacollection/adaptor/JMX/TestJMXAdaptor.java +++ b/src/test/java/org/apache/hadoop/chukwa/datacollection/adaptor/JMX/TestJMXAdaptor.java @@ -59,7 +59,8 @@ public class TestJMXAdaptor extends TestCase{ conf.setInt("chukwaAgent.http.port", 9090); conf.setBoolean("chukwaAgent.checkpoint.enabled", false); - agent = new ChukwaAgent(conf); + agent = ChukwaAgent.getAgent(conf); + agent.start(); } public void testJMXAdaptor() { http://git-wip-us.apache.org/repos/asf/chukwa/blob/6012e14f/src/test/java/org/apache/hadoop/chukwa/datacollection/adaptor/TestAddAdaptor.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/hadoop/chukwa/datacollection/adaptor/TestAddAdaptor.java b/src/test/java/org/apache/hadoop/chukwa/datacollection/adaptor/TestAddAdaptor.java index 367484e..39f151b 100644 --- a/src/test/java/org/apache/hadoop/chukwa/datacollection/adaptor/TestAddAdaptor.java +++ b/src/test/java/org/apache/hadoop/chukwa/datacollection/adaptor/TestAddAdaptor.java @@ -57,7 +57,8 @@ public class TestAddAdaptor extends TestCase { conf.setInt("chukwaAgent.http.port", 9090); conf.setBoolean("chukwaAgent.checkpoint.enabled", false); - agent = new ChukwaAgent(conf); + agent = ChukwaAgent.getAgent(conf); + agent.start(); assertEquals(0, agent.adaptorCount()); System.out.println("adding jmx adaptor"); http://git-wip-us.apache.org/repos/asf/chukwa/blob/6012e14f/src/test/java/org/apache/hadoop/chukwa/datacollection/adaptor/TestBufferingWrappers.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/hadoop/chukwa/datacollection/adaptor/TestBufferingWrappers.java b/src/test/java/org/apache/hadoop/chukwa/datacollection/adaptor/TestBufferingWrappers.java index 466053b..948ec5a 100644 --- a/src/test/java/org/apache/hadoop/chukwa/datacollection/adaptor/TestBufferingWrappers.java +++ b/src/test/java/org/apache/hadoop/chukwa/datacollection/adaptor/TestBufferingWrappers.java @@ -63,7 +63,8 @@ public class TestBufferingWrappers extends TestCase { public void resendAfterStop(String adaptor) throws IOException, ChukwaAgent.AlreadyRunningException, InterruptedException { - ChukwaAgent agent = new ChukwaAgent(conf); + ChukwaAgent agent = ChukwaAgent.getAgent(conf); + agent.start(); String ADAPTORID = "adaptor_test" + System.currentTimeMillis(); String STR = "test data"; int PORTNO = 9878; http://git-wip-us.apache.org/repos/asf/chukwa/blob/6012e14f/src/test/java/org/apache/hadoop/chukwa/datacollection/adaptor/TestDirTailingAdaptor.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/hadoop/chukwa/datacollection/adaptor/TestDirTailingAdaptor.java b/src/test/java/org/apache/hadoop/chukwa/datacollection/adaptor/TestDirTailingAdaptor.java index 717125b..5748c9b 100644 --- a/src/test/java/org/apache/hadoop/chukwa/datacollection/adaptor/TestDirTailingAdaptor.java +++ b/src/test/java/org/apache/hadoop/chukwa/datacollection/adaptor/TestDirTailingAdaptor.java @@ -56,7 +56,8 @@ public class TestDirTailingAdaptor extends TestCase { conf.setInt("chukwaAgent.control.port", 0); conf.setBoolean("chukwaAgent.checkpoint.enabled", false); - agent = new ChukwaAgent(conf); + agent = ChukwaAgent.getAgent(conf); + agent.start(); File emptyDir = new File(baseDir, "emptyDir2"); createEmptyDir(emptyDir); @@ -90,7 +91,8 @@ public class TestDirTailingAdaptor extends TestCase { anOldFile.deleteOnExit(); aNewFile.deleteOnExit(); anOldFile.setLastModified(10);//just after epoch - agent = new ChukwaAgent(conf); //restart agent. + agent = ChukwaAgent.getAgent(conf); //restart agent. + agent.start(); Thread.sleep(3 * SCAN_INTERVAL); //wait a bit for the new file to be detected. assertTrue(aNewFile.exists()); @@ -135,7 +137,8 @@ public class TestDirTailingAdaptor extends TestCase { while(retry) { try { retry = false; - agent = new ChukwaAgent(conf); + agent = ChukwaAgent.getAgent(conf); + agent.start(); } catch(Exception e) { retry = true; } @@ -167,11 +170,12 @@ public class TestDirTailingAdaptor extends TestCase { anOldFile.deleteOnExit(); aNewFile.deleteOnExit(); anOldFile.setLastModified(10);//just after epoch - agent = new ChukwaAgent(conf); //restart agent. - + agent = ChukwaAgent.getAgent(conf); //restart agent. + agent.start(); + Thread.sleep(3 * SCAN_INTERVAL); //wait a bit for the new file to be detected. assertTrue(aNewFile.exists()); - + //make sure we started tailing the new, not the old, file. for(Map.Entry<String, String> adaptors : agent.getAdaptorList().entrySet()) { System.out.println(adaptors.getKey() +": " + adaptors.getValue()); @@ -182,7 +186,7 @@ public class TestDirTailingAdaptor extends TestCase { Thread.sleep(3 * SCAN_INTERVAL); //wait a bit for the new file to be detected. assertEquals(4, agent.adaptorCount()); agent.shutdown(); - + nukeDirContents(checkpointDir);//nuke dir checkpointDir.delete(); emptyDir.delete(); http://git-wip-us.apache.org/repos/asf/chukwa/blob/6012e14f/src/test/java/org/apache/hadoop/chukwa/datacollection/adaptor/TestExecAdaptor.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/hadoop/chukwa/datacollection/adaptor/TestExecAdaptor.java b/src/test/java/org/apache/hadoop/chukwa/datacollection/adaptor/TestExecAdaptor.java index 11a084a..1af52d0 100644 --- a/src/test/java/org/apache/hadoop/chukwa/datacollection/adaptor/TestExecAdaptor.java +++ b/src/test/java/org/apache/hadoop/chukwa/datacollection/adaptor/TestExecAdaptor.java @@ -35,7 +35,8 @@ public class TestExecAdaptor extends TestCase { Configuration conf = new Configuration(); conf.set("chukwaAgent.control.port", "0"); conf.setBoolean("chukwaAgent.checkpoint.enabled", false); - agent = new ChukwaAgent(conf); + agent = ChukwaAgent.getAgent(conf); + agent.start(); } @Override http://git-wip-us.apache.org/repos/asf/chukwa/blob/6012e14f/src/test/java/org/apache/hadoop/chukwa/datacollection/adaptor/TestFileAdaptor.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/hadoop/chukwa/datacollection/adaptor/TestFileAdaptor.java b/src/test/java/org/apache/hadoop/chukwa/datacollection/adaptor/TestFileAdaptor.java index eafa12d..fbe249a 100644 --- a/src/test/java/org/apache/hadoop/chukwa/datacollection/adaptor/TestFileAdaptor.java +++ b/src/test/java/org/apache/hadoop/chukwa/datacollection/adaptor/TestFileAdaptor.java @@ -55,7 +55,8 @@ public class TestFileAdaptor extends TestCase { public void testOnce() throws IOException, ChukwaAgent.AlreadyRunningException, InterruptedException { - ChukwaAgent agent = new ChukwaAgent(conf); + ChukwaAgent agent = ChukwaAgent.getAgent(conf); + agent.start(); assertEquals(0, agent.adaptorCount()); @@ -75,7 +76,8 @@ public class TestFileAdaptor extends TestCase { ChukwaAgent.AlreadyRunningException, InterruptedException { int tests = 10; //SHOULD SET HIGHER AND WATCH WITH lsof to find leaks - ChukwaAgent agent = new ChukwaAgent(conf); + ChukwaAgent agent = ChukwaAgent.getAgent(conf); + agent.start(); for(int i=0; i < tests; ++i) { if(i % 100 == 0) System.out.println("buzzed " + i + " times"); http://git-wip-us.apache.org/repos/asf/chukwa/blob/6012e14f/src/test/java/org/apache/hadoop/chukwa/datacollection/adaptor/TestHeartbeatAdaptor.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/hadoop/chukwa/datacollection/adaptor/TestHeartbeatAdaptor.java b/src/test/java/org/apache/hadoop/chukwa/datacollection/adaptor/TestHeartbeatAdaptor.java index bcd940c..4bbf206 100644 --- a/src/test/java/org/apache/hadoop/chukwa/datacollection/adaptor/TestHeartbeatAdaptor.java +++ b/src/test/java/org/apache/hadoop/chukwa/datacollection/adaptor/TestHeartbeatAdaptor.java @@ -24,6 +24,7 @@ import java.net.ServerSocket; import java.net.Socket; import org.apache.hadoop.chukwa.datacollection.agent.ChukwaAgent; +import org.apache.hadoop.chukwa.datacollection.agent.ChukwaAgent.AlreadyRunningException; import org.apache.hadoop.chukwa.datacollection.connector.PipelineConnector; import org.apache.hadoop.chukwa.util.ExceptionUtil; import org.apache.hadoop.conf.Configuration; @@ -34,8 +35,9 @@ import junit.framework.TestCase; public class TestHeartbeatAdaptor extends TestCase { private volatile boolean shutdown = false; private final int port = 4321; - public void testPingAdaptor() throws IOException, InterruptedException{ + public void testPingAdaptor() throws IOException, InterruptedException, AlreadyRunningException{ ChukwaAgent agent = ChukwaAgent.getAgent(); + agent.start(); Configuration conf = agent.getConfiguration(); conf.set("chukwa.http.writer.host", "localhost"); conf.set("chukwa.http.writer.port", String.valueOf(port)); http://git-wip-us.apache.org/repos/asf/chukwa/blob/6012e14f/src/test/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/TestCharFileTailingAdaptorUTF8.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/TestCharFileTailingAdaptorUTF8.java b/src/test/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/TestCharFileTailingAdaptorUTF8.java index 1e0f234..65add51 100644 --- a/src/test/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/TestCharFileTailingAdaptorUTF8.java +++ b/src/test/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/TestCharFileTailingAdaptorUTF8.java @@ -44,7 +44,8 @@ public class TestCharFileTailingAdaptorUTF8 extends TestCase { Configuration conf = new Configuration(); conf.set("chukwaAgent.control.port", "0"); - ChukwaAgent agent = new ChukwaAgent(conf); + ChukwaAgent agent = ChukwaAgent.getAgent(conf); + agent.start(); File testFile = makeTestFile("chukwaTest", 80,baseDir); String adaptorId = agent .processAddCommand("add adaptor_test = org.apache.hadoop.chukwa.datacollection.adaptor.filetailer.CharFileTailingAdaptorUTF8" http://git-wip-us.apache.org/repos/asf/chukwa/blob/6012e14f/src/test/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/TestFileExpirationPolicy.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/TestFileExpirationPolicy.java b/src/test/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/TestFileExpirationPolicy.java index 1c68a1a..fc04f25 100644 --- a/src/test/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/TestFileExpirationPolicy.java +++ b/src/test/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/TestFileExpirationPolicy.java @@ -36,7 +36,8 @@ public class TestFileExpirationPolicy extends TestCase { try { Configuration conf = new ChukwaConfiguration(); conf.set("chukwaAgent.control.port", "0"); - agent = new ChukwaAgent(conf); + agent = ChukwaAgent.getAgent(conf); + agent.start(); FileTailingAdaptor.GRACEFUL_PERIOD = 30 * 1000; @@ -79,7 +80,8 @@ public class TestFileExpirationPolicy extends TestCase { Configuration conf = new ChukwaConfiguration(); conf.set("chukwaAgent.control.port", "0"); - agent = new ChukwaAgent(conf); + agent = ChukwaAgent.getAgent(conf); + agent.start(); // Remove any adaptor left over from previous run ChukwaAgentController cli = new ChukwaAgentController("localhost", agent.getControllerPort()); http://git-wip-us.apache.org/repos/asf/chukwa/blob/6012e14f/src/test/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/TestFileTailer.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/TestFileTailer.java b/src/test/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/TestFileTailer.java index 570e7f4..d622b5d 100644 --- a/src/test/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/TestFileTailer.java +++ b/src/test/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/TestFileTailer.java @@ -49,7 +49,8 @@ public class TestFileTailer { ChukwaConfiguration cc = new ChukwaConfiguration(); cc.setInt("chukwaAgent.fileTailingAdaptor.maxReadSize", 18); // small in order to have hasMoreData=true // (with 26 letters we should have 2 chunks) - agent = new ChukwaAgent(cc); + agent = ChukwaAgent.getAgent(cc); + agent.start(); ChunkCatcherConnector chunks = new ChunkCatcherConnector(); chunks.start(); http://git-wip-us.apache.org/repos/asf/chukwa/blob/6012e14f/src/test/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/TestFileTailingAdaptorBigRecord.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/TestFileTailingAdaptorBigRecord.java b/src/test/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/TestFileTailingAdaptorBigRecord.java index 40479b5..2a82e79 100644 --- a/src/test/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/TestFileTailingAdaptorBigRecord.java +++ b/src/test/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/TestFileTailingAdaptorBigRecord.java @@ -50,7 +50,8 @@ public class TestFileTailingAdaptorBigRecord extends TestCase { ChukwaConfiguration cc = new ChukwaConfiguration(); cc.set("chukwaAgent.control.port", "0"); cc.setInt("chukwaAgent.fileTailingAdaptor.maxReadSize", 55); - ChukwaAgent agent = new ChukwaAgent(cc); + ChukwaAgent agent = ChukwaAgent.getAgent(cc); + agent.start(); int portno = agent.getControllerPort(); while (portno == -1) { Thread.sleep(1000); http://git-wip-us.apache.org/repos/asf/chukwa/blob/6012e14f/src/test/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/TestFileTailingAdaptorPreserveLines.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/TestFileTailingAdaptorPreserveLines.java b/src/test/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/TestFileTailingAdaptorPreserveLines.java index fbbfd94..4590ef3 100644 --- a/src/test/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/TestFileTailingAdaptorPreserveLines.java +++ b/src/test/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/TestFileTailingAdaptorPreserveLines.java @@ -59,7 +59,8 @@ public class TestFileTailingAdaptorPreserveLines { */ @Before public void setUp() throws Exception { - agent = new ChukwaAgent(conf); + agent = ChukwaAgent.getAgent(conf); + agent.start(); chunks = new ChunkCatcherConnector(); chunks.start();
