CHUKWA-817. Prevent connection leaks when HBaseWriter or the parser generates errors. (Eric Yang)
Project: http://git-wip-us.apache.org/repos/asf/chukwa/repo Commit: http://git-wip-us.apache.org/repos/asf/chukwa/commit/97164192 Tree: http://git-wip-us.apache.org/repos/asf/chukwa/tree/97164192 Diff: http://git-wip-us.apache.org/repos/asf/chukwa/diff/97164192 Branch: refs/heads/master Commit: 971641927fd399143329bcc0480e0f1e6b032a74 Parents: 5bf5070 Author: Eric Yang <[email protected]> Authored: Sun Mar 5 12:38:17 2017 -0800 Committer: Eric Yang <[email protected]> Committed: Sun Mar 5 12:38:17 2017 -0800 ---------------------------------------------------------------------- CHANGES.txt | 6 +- .../writer/PipelineStageWriter.java | 9 ++- .../datacollection/writer/WriterException.java | 5 +- .../writer/hbase/HBaseWriter.java | 27 ++++++-- .../hbase/HadoopMetricsProcessor.java | 70 ++++++++++++-------- .../inputtools/log4j/Log4jMetricsSink.java | 2 +- 6 files changed, 79 insertions(+), 40 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/chukwa/blob/97164192/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 1d0eb56..882c9ab 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -11,6 +11,10 @@ Trunk (unreleased changes) BUGS + CHUKWA-818. Convert Solr client to singleton to prevent connection leaks. (Eric Yang) + + CHUKWA-817. Prevent connections leaks when HBaseWriter or parser generate errors. (Eric Yang) + CHUKWA-813. Fix logviewer for sorting log entries. (Eric Yang) CHUKWA-812. Added throttle to dashboard save. (Eric Yang) @@ -29,8 +33,6 @@ Release 0.8 - 05/22/2016 BUGS - CHUKWA-818. Convert Solr client to singleton to prevent connection leaks. (Eric Yang) - CHUKWA-808. Fix solr startup for Docker support. (Eric Yang) CHUKWA-807. Update Docker support to current. 
(Eric Yang) http://git-wip-us.apache.org/repos/asf/chukwa/blob/97164192/src/main/java/org/apache/hadoop/chukwa/datacollection/writer/PipelineStageWriter.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/hadoop/chukwa/datacollection/writer/PipelineStageWriter.java b/src/main/java/org/apache/hadoop/chukwa/datacollection/writer/PipelineStageWriter.java index 141be20..bd98fb6 100644 --- a/src/main/java/org/apache/hadoop/chukwa/datacollection/writer/PipelineStageWriter.java +++ b/src/main/java/org/apache/hadoop/chukwa/datacollection/writer/PipelineStageWriter.java @@ -92,7 +92,11 @@ public class PipelineStageWriter implements ChukwaWriter { // if authentication type is kerberos; login using the specified kerberos principal and keytab file for(int i=0; i<classes.length; i++) { if(classes[i].contains("HBaseWriter")) { - loginToKerberos (conf); + try { + loginToKerberos (conf); + } catch(IOException e) { + throw new WriterException("Unable to login to Kerberos."); + } } } @@ -111,8 +115,7 @@ public class PipelineStageWriter implements ChukwaWriter { writer = (ChukwaWriter) st; // one stage pipeline } return; - } catch (IOException | - WriterException | + } catch (WriterException | ClassNotFoundException | IllegalAccessException | InstantiationException e) { http://git-wip-us.apache.org/repos/asf/chukwa/blob/97164192/src/main/java/org/apache/hadoop/chukwa/datacollection/writer/WriterException.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/hadoop/chukwa/datacollection/writer/WriterException.java b/src/main/java/org/apache/hadoop/chukwa/datacollection/writer/WriterException.java index e2a2e45..dc0f0b1 100644 --- a/src/main/java/org/apache/hadoop/chukwa/datacollection/writer/WriterException.java +++ b/src/main/java/org/apache/hadoop/chukwa/datacollection/writer/WriterException.java @@ -17,13 +17,14 @@ */ package 
org.apache.hadoop.chukwa.datacollection.writer; +import java.io.IOException; -public class WriterException extends Exception { +public class WriterException extends IOException { /** * */ - private static final long serialVersionUID = -4207275200546397145L; + private static final long serialVersionUID = -4207275200546397146L; public WriterException() { } http://git-wip-us.apache.org/repos/asf/chukwa/blob/97164192/src/main/java/org/apache/hadoop/chukwa/datacollection/writer/hbase/HBaseWriter.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/hadoop/chukwa/datacollection/writer/hbase/HBaseWriter.java b/src/main/java/org/apache/hadoop/chukwa/datacollection/writer/hbase/HBaseWriter.java index 5ba87bd..4a67226 100644 --- a/src/main/java/org/apache/hadoop/chukwa/datacollection/writer/hbase/HBaseWriter.java +++ b/src/main/java/org/apache/hadoop/chukwa/datacollection/writer/hbase/HBaseWriter.java @@ -132,9 +132,18 @@ public class HBaseWriter extends PipelineableWriter { @Override public CommitStatus add(List<Chunk> chunks) throws WriterException { CommitStatus rv = ChukwaWriter.COMMIT_OK; + Table hbase; + Table meta; try { - Table hbase = connection.getTable(TableName.valueOf(CHUKWA_TABLE)); - Table meta = connection.getTable(TableName.valueOf(CHUKWA_META_TABLE)); + if (connection == null || connection.isClosed()) { + try { + connection = ConnectionFactory.createConnection(hconf); + } catch (IOException e) { + throw new WriterException("HBase is offline, retry later..."); + } + } + hbase = connection.getTable(TableName.valueOf(CHUKWA_TABLE)); + meta = connection.getTable(TableName.valueOf(CHUKWA_META_TABLE)); for(Chunk chunk : chunks) { synchronized (this) { try { @@ -143,7 +152,8 @@ public class HBaseWriter extends PipelineableWriter { hbase.put(output); meta.put(reporter.getInfo()); } catch (Throwable e) { - log.warn(output); + log.warn("Unable to process data:"); + log.warn(new String(chunk.getData())); 
log.warn(ExceptionUtil.getStackTrace(e)); } dataSize += chunk.getData().length; @@ -155,8 +165,15 @@ public class HBaseWriter extends PipelineableWriter { meta.close(); } catch (Exception e) { log.error(ExceptionUtil.getStackTrace(e)); - throw new WriterException("Failed to store data to HBase."); - } + if(connection != null) { + try { + connection.close(); + } catch(IOException e2) { + connection = null; + throw new WriterException("HBase connection maybe leaking."); + } + } + } if (next != null) { rv = next.add(chunks); //pass data through } http://git-wip-us.apache.org/repos/asf/chukwa/blob/97164192/src/main/java/org/apache/hadoop/chukwa/extraction/hbase/HadoopMetricsProcessor.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/hadoop/chukwa/extraction/hbase/HadoopMetricsProcessor.java b/src/main/java/org/apache/hadoop/chukwa/extraction/hbase/HadoopMetricsProcessor.java index 9c719e4..b0aab83 100644 --- a/src/main/java/org/apache/hadoop/chukwa/extraction/hbase/HadoopMetricsProcessor.java +++ b/src/main/java/org/apache/hadoop/chukwa/extraction/hbase/HadoopMetricsProcessor.java @@ -28,6 +28,7 @@ import java.util.Map.Entry; import org.apache.log4j.Logger; import org.json.simple.JSONObject; import org.json.simple.JSONValue; +import org.apache.hadoop.chukwa.util.ExceptionUtil; public class HadoopMetricsProcessor extends AbstractProcessor { @@ -44,38 +45,53 @@ public class HadoopMetricsProcessor extends AbstractProcessor { @Override protected void parse(byte[] recordEntry) throws Throwable { - String body = new String(recordEntry, Charset.forName("UTF-8")); - int start = body.indexOf('{'); - JSONObject json = (JSONObject) JSONValue.parse(body.substring(start)); + String body = new String(recordEntry, Charset.forName("UTF-8")); + int start = 0; + int end = 0; + try { + while(true) { + start = body.indexOf('{', end); + end = body.indexOf('}', start)+1; + if (start == -1) + break; - time = ((Long) 
json.get(timestampField)).longValue(); - String contextName = (String) json.get(contextNameField); - String recordName = (String) json.get(recordNameField); - String src = ((String) json.get(hostName)).toLowerCase(); - if(json.get(processName)!=null) { - src = new StringBuilder(src).append(":").append(json.get(processName)).toString(); - } - for(Entry<String, Object> entry : (Set<Map.Entry>) json.entrySet()) { - String keyName = entry.getKey(); - if (timestampField.intern() == keyName.intern()) { - continue; - } else if (contextNameField.intern() == keyName.intern()) { - continue; - } else if (recordNameField.intern() == keyName.intern()) { - continue; - } else if (hostName.intern() == keyName.intern()) { - continue; - } else if (processName.intern() == keyName.intern()) { - continue; - } else { - if(json.get(keyName)!=null) { - String v = entry.getValue().toString(); - String primaryKey = new StringBuilder(contextName).append(".") + JSONObject json = (JSONObject) JSONValue.parse(body.substring(start,end)); + + time = ((Long) json.get(timestampField)).longValue(); + String contextName = (String) json.get(contextNameField); + String recordName = (String) json.get(recordNameField); + String src = ((String) json.get(hostName)).toLowerCase(); + if(json.get(processName)!=null) { + src = new StringBuilder(src).append(":").append(json.get(processName)).toString(); + } + for(Entry<String, Object> entry : (Set<Map.Entry>) json.entrySet()) { + String keyName = entry.getKey(); + if (timestampField.intern() == keyName.intern()) { + continue; + } else if (contextNameField.intern() == keyName.intern()) { + continue; + } else if (recordNameField.intern() == keyName.intern()) { + continue; + } else if (hostName.intern() == keyName.intern()) { + continue; + } else if (processName.intern() == keyName.intern()) { + continue; + } else { + if(json.get(keyName)!=null) { + String v = entry.getValue().toString(); + String primaryKey = new StringBuilder(contextName).append(".") 
.append(recordName).append(".").append(keyName).toString(); - addRecord(time, primaryKey, src, v.getBytes(Charset.forName("UTF-8")), output); + addRecord(time, primaryKey, src, v.getBytes(Charset.forName("UTF-8")), output); + } } } } + } catch(Exception e) { + LOG.warn("Unparsable data:"); + LOG.warn(body); + LOG.warn(ExceptionUtil.getStackTrace(e)); + // Skip unparsable data. + } } } http://git-wip-us.apache.org/repos/asf/chukwa/blob/97164192/src/main/java/org/apache/hadoop/chukwa/inputtools/log4j/Log4jMetricsSink.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/hadoop/chukwa/inputtools/log4j/Log4jMetricsSink.java b/src/main/java/org/apache/hadoop/chukwa/inputtools/log4j/Log4jMetricsSink.java index cb97223..3e6189d 100644 --- a/src/main/java/org/apache/hadoop/chukwa/inputtools/log4j/Log4jMetricsSink.java +++ b/src/main/java/org/apache/hadoop/chukwa/inputtools/log4j/Log4jMetricsSink.java @@ -38,7 +38,7 @@ public class Log4jMetricsSink implements MetricsSink { protected String context = "HadoopMetrics"; protected String host = "localhost"; protected int port = 9095; - protected Logger out = null; + protected static Logger out = null; @Override public void init(SubsetConfiguration conf) {
