Updated Branches: refs/heads/flume-1.4 42fd6a991 -> 84bdfc717
FLUME-1945: HBase Serializer allow key from regular expression group (Sravya Tirukkovalur via Brock Noland) Project: http://git-wip-us.apache.org/repos/asf/flume/repo Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/84bdfc71 Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/84bdfc71 Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/84bdfc71 Branch: refs/heads/flume-1.4 Commit: 84bdfc7175ded9dabe03456817844528075eaf0c Parents: 42fd6a9 Author: Brock Noland <[email protected]> Authored: Mon Jun 3 21:44:53 2013 -0700 Committer: Brock Noland <[email protected]> Committed: Mon Jun 3 21:45:04 2013 -0700 ---------------------------------------------------------------------- .../sink/hbase/RegexHbaseEventSerializer.java | 59 +++++++++++---- .../sink/hbase/TestRegexHbaseEventSerializer.java | 36 ++++++++- 2 files changed, 75 insertions(+), 20 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flume/blob/84bdfc71/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/RegexHbaseEventSerializer.java ---------------------------------------------------------------------- diff --git a/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/RegexHbaseEventSerializer.java b/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/RegexHbaseEventSerializer.java index 27974d9..7d2b8b7 100644 --- a/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/RegexHbaseEventSerializer.java +++ b/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/RegexHbaseEventSerializer.java @@ -56,15 +56,21 @@ public class RegexHbaseEventSerializer implements HbaseEventSerializer { /** Regular expression used to parse groups from event data. */ public static final String REGEX_CONFIG = "regex"; public static final String REGEX_DEFAULT = "(.*)"; - + /** Whether to ignore case when performing regex matches. */ public static final String IGNORE_CASE_CONFIG = "regexIgnoreCase"; public static final boolean INGORE_CASE_DEFAULT = false; - + /** Comma separated list of column names to place matching groups in. */ public static final String COL_NAME_CONFIG = "colNames"; public static final String COLUMN_NAME_DEFAULT = "payload"; + /** Index of the row key in matched regex groups */ + public static final String ROW_KEY_INDEX_CONFIG = "rowKeyIndex"; + + /** Placeholder in colNames for row key */ + public static final String ROW_KEY_NAME = "ROW_KEY"; + /** Whether to deposit event headers into corresponding column qualifiers */ public static final String DEPOSIT_HEADERS_CONFIG = "depositHeaders"; public static final boolean DEPOSIT_HEADERS_DEFAULT = false; @@ -72,12 +78,12 @@ public class RegexHbaseEventSerializer implements HbaseEventSerializer { /** What charset to use when serializing into HBase's byte arrays */ public static final String CHARSET_CONFIG = "charset"; public static final String CHARSET_DEFAULT = "UTF-8"; - + /* This is a nonce used in HBase row-keys, such that the same row-key * never gets written more than once from within this JVM. */ protected static final AtomicInteger nonce = new AtomicInteger(0); protected static String randomKey = RandomStringUtils.randomAlphanumeric(10); - + protected byte[] cf; private byte[] payload; private List<byte[]> colNames = Lists.newArrayList(); @@ -86,11 +92,12 @@ public class RegexHbaseEventSerializer implements HbaseEventSerializer { private boolean depositHeaders; private Pattern inputPattern; private Charset charset; - + private int rowKeyIndex; + @Override public void configure(Context context) { String regex = context.getString(REGEX_CONFIG, REGEX_DEFAULT); - regexIgnoreCase = context.getBoolean(IGNORE_CASE_CONFIG, + regexIgnoreCase = context.getBoolean(IGNORE_CASE_CONFIG, INGORE_CASE_DEFAULT); depositHeaders = context.getBoolean(DEPOSIT_HEADERS_CONFIG, DEPOSIT_HEADERS_DEFAULT); @@ -98,12 +105,26 @@ public class RegexHbaseEventSerializer implements HbaseEventSerializer { + (regexIgnoreCase ? Pattern.CASE_INSENSITIVE : 0)); charset = Charset.forName(context.getString(CHARSET_CONFIG, CHARSET_DEFAULT)); - + String colNameStr = context.getString(COL_NAME_CONFIG, COLUMN_NAME_DEFAULT); String[] columnNames = colNameStr.split(","); - for (String s: columnNames) { + for (String s: columnNames) { colNames.add(s.getBytes(charset)); } + + //Rowkey is optional, default is -1 + rowKeyIndex = context.getInteger(ROW_KEY_INDEX_CONFIG, -1); + //if row key is being used, make sure it is specified correct + if(rowKeyIndex >=0){ + if(rowKeyIndex >= columnNames.length) { + throw new IllegalArgumentException(ROW_KEY_INDEX_CONFIG + " must be " + + "less than num columns " + columnNames.length); + } + if(!ROW_KEY_NAME.equalsIgnoreCase(columnNames[rowKeyIndex])) { + throw new IllegalArgumentException("Column at " + rowKeyIndex + " must be " + + ROW_KEY_NAME + " and is " + columnNames[rowKeyIndex]); + } + } } @Override @@ -116,7 +137,7 @@ public class RegexHbaseEventSerializer implements HbaseEventSerializer { this.payload = event.getBody(); this.cf = columnFamily; } - + /** * Returns a row-key with the following format: * [time in millis]-[random key]-[nonce] @@ -141,11 +162,11 @@ public class RegexHbaseEventSerializer implements HbaseEventSerializer { randomKey, nonce.getAndIncrement()); return rowKey.getBytes(charset); } - + protected byte[] getRowKey() { return getRowKey(Calendar.getInstance()); } - + @Override public List<Row> getActions() throws FlumeException { List<Row> actions = Lists.newArrayList(); @@ -154,17 +175,23 @@ public class RegexHbaseEventSerializer implements HbaseEventSerializer { if (!m.matches()) { return Lists.newArrayList(); } - + if (m.groupCount() != colNames.size()) { return Lists.newArrayList(); } - + try { - rowKey = getRowKey(); + if(rowKeyIndex < 0){ + rowKey = getRowKey(); + }else{ + rowKey = m.group(rowKeyIndex + 1).getBytes(Charsets.UTF_8); + } Put put = new Put(rowKey); - + for (int i = 0; i < colNames.size(); i++) { - put.add(cf, colNames.get(i), m.group(i + 1).getBytes(charset)); + if(i != rowKeyIndex) { + put.add(cf, colNames.get(i), m.group(i + 1).getBytes(Charsets.UTF_8)); + } } if (depositHeaders) { for (Map.Entry<String, String> entry : headers.entrySet()) { http://git-wip-us.apache.org/repos/asf/flume/blob/84bdfc71/flume-ng-sinks/flume-ng-hbase-sink/src/test/java/org/apache/flume/sink/hbase/TestRegexHbaseEventSerializer.java ---------------------------------------------------------------------- diff --git a/flume-ng-sinks/flume-ng-hbase-sink/src/test/java/org/apache/flume/sink/hbase/TestRegexHbaseEventSerializer.java b/flume-ng-sinks/flume-ng-hbase-sink/src/test/java/org/apache/flume/sink/hbase/TestRegexHbaseEventSerializer.java index 191dc54..b102b49 100644 --- a/flume-ng-sinks/flume-ng-hbase-sink/src/test/java/org/apache/flume/sink/hbase/TestRegexHbaseEventSerializer.java +++ b/flume-ng-sinks/flume-ng-hbase-sink/src/test/java/org/apache/flume/sink/hbase/TestRegexHbaseEventSerializer.java @@ -72,15 +72,43 @@ public class TestRegexHbaseEventSerializer { assertEquals("The sky is falling!", resultMap.get(RegexHbaseEventSerializer.COLUMN_NAME_DEFAULT)); } - + @Test + public void testRowIndexKey() throws Exception { + RegexHbaseEventSerializer s = new RegexHbaseEventSerializer(); + Context context = new Context(); + context.put(RegexHbaseEventSerializer.REGEX_CONFIG,"^([^\t]+)\t([^\t]+)\t" + + "([^\t]+)$"); + context.put(RegexHbaseEventSerializer.COL_NAME_CONFIG, "col1,col2,ROW_KEY"); + context.put("rowKeyIndex", "2"); + s.configure(context); + + String body = "val1\tval2\trow1"; + Event e = EventBuilder.withBody(Bytes.toBytes(body)); + s.initialize(e, "CF".getBytes()); + List<Row> actions = s.getActions(); + + Put put = (Put)actions.get(0); + + List<KeyValue> kvPairs = put.getFamilyMap().get(s.cf); + assertTrue(kvPairs.size() == 2); + + Map<String, String> resultMap = Maps.newHashMap(); + for (KeyValue kv : kvPairs) { + resultMap.put(new String(kv.getQualifier()), new String(kv.getValue())); + } + assertEquals("val1", resultMap.get("col1")); + assertEquals("val2", resultMap.get("col2")); + assertEquals("row1", Bytes.toString(put.getRow())); + } + @Test /** Test a common case where regex is used to parse apache log format. */ public void testApacheRegex() throws Exception { RegexHbaseEventSerializer s = new RegexHbaseEventSerializer(); Context context = new Context(); - context.put(RegexHbaseEventSerializer.REGEX_CONFIG, - "([^ ]*) ([^ ]*) ([^ ]*) (-|\\[[^\\]]*\\]) \"([^ ]+) ([^ ]+)" + - " ([^\"]+)\" (-|[0-9]*) (-|[0-9]*)(?: ([^ \"]*|\"[^\"]*\")" + + context.put(RegexHbaseEventSerializer.REGEX_CONFIG, + "([^ ]*) ([^ ]*) ([^ ]*) (-|\\[[^\\]]*\\]) \"([^ ]+) ([^ ]+)" + + " ([^\"]+)\" (-|[0-9]*) (-|[0-9]*)(?: ([^ \"]*|\"[^\"]*\")" + " ([^ \"]*|\"[^\"]*\"))?"); context.put(RegexHbaseEventSerializer.COL_NAME_CONFIG, "host,identity,user,time,method,request,protocol,status,size," +
