Repository: incubator-eagle
Updated Branches:
  refs/heads/master 8d662e3a2 -> 435451eff
[EAGLE-665] Refactor kafka stream sink and hdfs audit topology using shuffle grouping

* Refactor kafka stream sink
* hdfs audit topology using shuffle grouping

https://issues.apache.org/jira/browse/EAGLE-665

Author: Hao Chen <h...@apache.org>

Closes #551 from haoch/refactorHdfsAuditApp.


Project: http://git-wip-us.apache.org/repos/asf/incubator-eagle/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-eagle/commit/435451ef
Tree: http://git-wip-us.apache.org/repos/asf/incubator-eagle/tree/435451ef
Diff: http://git-wip-us.apache.org/repos/asf/incubator-eagle/diff/435451ef

Branch: refs/heads/master
Commit: 435451eff2d845b1953d99f57948ee79c70cadf3
Parents: 8d662e3
Author: Hao Chen <h...@apache.org>
Authored: Sat Oct 22 21:54:27 2016 +0800
Committer: Hao Chen <h...@apache.org>
Committed: Sat Oct 22 21:54:27 2016 +0800

----------------------------------------------------------------------
 .../apache/eagle/app/sink/KafkaStreamSink.java  |   9 +-
 .../eagle/app/sink/LoggingStreamSink.java       |   4 +-
 .../apache/eagle/app/sink/StormStreamSink.java  |  14 ++-
 .../eagle/security/hdfs/HDFSAuditLogParser.java | 113 ++++++++++---------
 .../security/hbase/HBaseAuditLogParserBolt.java |  19 ++--
 .../HbaseResourceSensitivityDataJoinBolt.java   |  12 +-
 .../AbstractHdfsAuditLogApplication.java        |  48 +++++---
 .../auditlog/HdfsAuditLogParserBolt.java        |   9 +-
 .../auditlog/HdfsSensitivityDataEnrichBolt.java |   6 +-
 .../security/auditlog/IPZoneDataEnrichBolt.java |  52 +++++----
 .../auditlog/kafka/MessageJsonScheme.java       |   2 +-
 .../auditlog/TestHdfsAuditLogApplication.java   |  86 ++++++++++++++
 .../auditlog/MapRFSAuditLogParserBolt.java      |   4 +-
 13 files changed, 246 insertions(+), 132 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/435451ef/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/sink/KafkaStreamSink.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/sink/KafkaStreamSink.java b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/sink/KafkaStreamSink.java
index e2a4b70..cf5351b 100644
--- a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/sink/KafkaStreamSink.java
+++ b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/sink/KafkaStreamSink.java
@@ -16,6 +16,7 @@
  */
package org.apache.eagle.app.sink;

+import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.BasicOutputCollector;
import com.fasterxml.jackson.databind.ObjectMapper;
@@ -43,8 +44,8 @@ public class KafkaStreamSink extends StormStreamSink<KafkaStreamSinkConfig> {
    }

    @Override
-   public void prepare(Map stormConf, TopologyContext context) {
-       super.prepare(stormConf, context);
+   public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
+       super.prepare(stormConf, context, collector);
        Properties properties = new Properties();
        properties.put("metadata.broker.list", config.getBrokerList());
        properties.put("serializer.class", config.getSerializerClass());
@@ -59,13 +60,13 @@ public class KafkaStreamSink extends StormStreamSink<KafkaStreamSinkConfig> {
    }

    @Override
-   protected void execute(Object key, Map event, BasicOutputCollector collector) {
+   protected void execute(Object key, Map event, OutputCollector collector) throws Exception {
        try {
            String output = new ObjectMapper().writeValueAsString(event);
            producer.send(new KeyedMessage(this.topicId, key, output));
        } catch (Exception ex) {
            LOG.error(ex.getMessage(), ex);
-           collector.reportError(ex);
+           throw ex;
        }
    }
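For context on what prepare() wires up here: KafkaStreamSink still targets the legacy Kafka 0.8 javaapi producer, so the two properties it sets map directly onto ProducerConfig. A minimal standalone sketch of that producer setup follows; the broker address, topic name and serializer class are illustrative placeholders, not values taken from this commit.

    import kafka.javaapi.producer.Producer;
    import kafka.producer.KeyedMessage;
    import kafka.producer.ProducerConfig;

    import java.util.Properties;

    public class KafkaSinkSketch {
        public static void main(String[] args) {
            Properties properties = new Properties();
            // Mirrors config.getBrokerList() / config.getSerializerClass() in prepare()
            properties.put("metadata.broker.list", "localhost:6667");
            properties.put("serializer.class", "kafka.serializer.StringEncoder");
            Producer<String, String> producer = new Producer<>(new ProducerConfig(properties));
            // Mirrors execute(): serialize the event map to JSON and send it under the stream key
            producer.send(new KeyedMessage<>("hdfs_audit_log_sample", "user1", "{\"cmd\":\"getfileinfo\"}"));
            producer.close();
        }
    }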

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/435451ef/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/sink/LoggingStreamSink.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/sink/LoggingStreamSink.java b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/sink/LoggingStreamSink.java
index 8256aba..3a02caf 100644
--- a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/sink/LoggingStreamSink.java
+++ b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/sink/LoggingStreamSink.java
@@ -16,7 +16,7 @@
  */
package org.apache.eagle.app.sink;

-import backtype.storm.topology.BasicOutputCollector;
+import backtype.storm.task.OutputCollector;
import com.typesafe.config.Config;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -37,7 +37,7 @@ public class LoggingStreamSink extends StormStreamSink<DefaultStreamSinkConfig>
    }

    @Override
-   protected void execute(Object key, Map event, BasicOutputCollector collector) {
+   protected void execute(Object key, Map event, OutputCollector collector) throws Exception {
        LOGGER.info("Receiving {}", event);
    }


http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/435451ef/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/sink/StormStreamSink.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/sink/StormStreamSink.java b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/sink/StormStreamSink.java
index 7ea234a..ad40772 100644
--- a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/sink/StormStreamSink.java
+++ b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/sink/StormStreamSink.java
@@ -16,6 +16,8 @@
  */
package org.apache.eagle.app.sink;

+import backtype.storm.task.OutputCollector;
+import backtype.storm.topology.base.BaseRichBolt;
import org.apache.eagle.metadata.model.StreamSinkConfig;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.BasicOutputCollector;
@@ -28,9 +30,10 @@ import org.slf4j.LoggerFactory;
import java.util.HashMap;
import java.util.Map;

-public abstract class StormStreamSink<K extends StreamSinkConfig> extends BaseBasicBolt implements StreamSink<K> {
+public abstract class StormStreamSink<K extends StreamSinkConfig> extends BaseRichBolt implements StreamSink<K> {
    private static final Logger LOG = LoggerFactory.getLogger(StormStreamSink.class);
    private String streamId;
+   private OutputCollector collector;

    @Override
    public void init(String streamId, K config) {
@@ -38,15 +41,15 @@ public abstract class StormStreamSink<K extends StreamSinkConfig> extends BaseBa
    }

    @Override
-   public void prepare(Map stormConf, TopologyContext context) {
-       super.prepare(stormConf, context);
+   public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
+       this.collector = collector;
    }

    /**
     * Implicitly hides the Tuple protocol inside code as Tuple[Key,Map].
     */
    @Override
-   public void execute(Tuple input, BasicOutputCollector collector) {
+   public void execute(Tuple input) {
        try {
            Map event = null;
            Object key = input.getValue(0);
@@ -63,13 +66,14 @@ public abstract class StormStreamSink<K extends StreamSinkConfig> extends BaseBa
                }
            }
            execute(key, event, collector);
+           collector.ack(input);
        } catch (Exception ex) {
            LOG.error(ex.getMessage(), ex);
            collector.reportError(ex);
        }
    }

-   protected abstract void execute(Object key, Map event, BasicOutputCollector collector);
+   protected abstract void execute(Object key, Map event, OutputCollector collector) throws Exception;

    private Map tupleAsMap(Tuple tuple) {
        Map values = new HashMap<>();
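The substantive change above is the move from BaseBasicBolt to BaseRichBolt: a basic bolt is acked automatically when execute() returns, while a rich bolt owns its OutputCollector and must ack (or fail) each tuple itself, which is why collector.ack(input) now follows the delegated execute(). A minimal sketch of that contract, independent of the Eagle classes:

    import backtype.storm.task.OutputCollector;
    import backtype.storm.task.TopologyContext;
    import backtype.storm.topology.OutputFieldsDeclarer;
    import backtype.storm.topology.base.BaseRichBolt;
    import backtype.storm.tuple.Tuple;

    import java.util.Map;

    public class ManualAckBolt extends BaseRichBolt {
        private OutputCollector collector;

        @Override
        public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
            // BaseRichBolt hands the collector to prepare(); keep it for execute()
            this.collector = collector;
        }

        @Override
        public void execute(Tuple input) {
            try {
                // ... process the tuple ...
                collector.ack(input);      // success must be acknowledged explicitly
            } catch (Exception ex) {
                collector.reportError(ex); // surface the error to the Storm UI
                collector.fail(input);     // request replay; StormStreamSink instead
                                           // only reports and lets the tuple time out
            }
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            // this sketch emits nothing
        }
    }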

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/435451ef/eagle-security/eagle-security-common/src/main/java/org/apache/eagle/security/hdfs/HDFSAuditLogParser.java
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-security-common/src/main/java/org/apache/eagle/security/hdfs/HDFSAuditLogParser.java b/eagle-security/eagle-security-common/src/main/java/org/apache/eagle/security/hdfs/HDFSAuditLogParser.java
index 734cc8c..7257975 100644
--- a/eagle-security/eagle-security-common/src/main/java/org/apache/eagle/security/hdfs/HDFSAuditLogParser.java
+++ b/eagle-security/eagle-security-common/src/main/java/org/apache/eagle/security/hdfs/HDFSAuditLogParser.java
@@ -20,67 +20,78 @@ import org.apache.eagle.common.DateTimeUtil;
import org.apache.eagle.security.util.LogParseUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+
import java.io.Serializable;

/**
 * e.g. 2015-09-21 21:36:52,172 INFO FSNamesystem.audit: allowed=true ugi=hadoop (auth:KERBEROS) ip=/x.x.x.x cmd=getfileinfo src=/tmp dst=null perm=null proto=rpc
 */
-public final class HDFSAuditLogParser implements Serializable{
-    private final static Logger LOG = LoggerFactory.getLogger(HDFSAuditLogParser.class);
+public final class HDFSAuditLogParser implements Serializable {
+    private final static Logger LOG = LoggerFactory.getLogger(HDFSAuditLogParser.class);

-    public HDFSAuditLogParser(){
-    }
+    public HDFSAuditLogParser() {
+    }

-    public static String parseUser(String ugi) {
-        /** e.g.
-         * .1)u...@apd.xyz.com
-         * .2)hadoop/123.dc1.xyz....@xyz.com (auth:KERBEROS)
-         * .3)hadoop (auth:KERBEROS)
-         */
-        int index = ugi.indexOf("/");
-        if (index != -1) return ugi.substring(0, index).trim();
-        index = ugi.indexOf("@");
-        if (index != -1) return ugi.substring(0, index).trim();
-        index = ugi.indexOf("(");
-        return ugi.substring(0, index).trim();
-    }
+    public static String parseUser(String ugi) {
+        /** e.g.
+         * .1)u...@apd.xyz.com
+         * .2)hadoop/123.dc1.xyz....@xyz.com (auth:KERBEROS)
+         * .3)hadoop (auth:KERBEROS)
+         */
+        int index = ugi.indexOf("/");
+        if (index != -1) {
+            return ugi.substring(0, index).trim();
+        }
+        index = ugi.indexOf("@");
+        if (index != -1) {
+            return ugi.substring(0, index).trim();
+        }
+        index = ugi.indexOf("(");
+        return ugi.substring(0, index).trim();
+    }

-    public HDFSAuditLogObject parse(String log) throws Exception{
-        int index0 = log.indexOf(" ");
-        index0 = log.indexOf(" ",index0+1);
-        String data = log.substring(0, index0).trim();
-        int index1 = log.indexOf("allowed="); int len1 = 8;
-        int index2 = log.indexOf("ugi="); int len2 = 4;
-        int index3 = log.indexOf("ip=/"); int len3 = 4;
-        int index4 = log.indexOf("cmd="); int len4 = 4;
-        int index5 = log.indexOf("src="); int len5= 4;
-        int index6 = log.indexOf("dst="); int len6 = 4;
-        int index7 = log.indexOf("perm=");
+    public HDFSAuditLogObject parse(String log) throws Exception {
+        int index0 = log.indexOf(" ");
+        index0 = log.indexOf(" ", index0 + 1);
+        String data = log.substring(0, index0).trim();
+        int index1 = log.indexOf("allowed=");
+        int len1 = 8;
+        int index2 = log.indexOf("ugi=");
+        int len2 = 4;
+        int index3 = log.indexOf("ip=/");
+        int len3 = 4;
+        int index4 = log.indexOf("cmd=");
+        int len4 = 4;
+        int index5 = log.indexOf("src=");
+        int len5 = 4;
+        int index6 = log.indexOf("dst=");
+        int len6 = 4;
+        int index7 = log.indexOf("perm=");

-        String allowed = log.substring(index1 + len1, index2).trim();
-        String ugi = log.substring(index2 + len2, index3).trim();
-        String ip = log.substring(index3 + len3, index4).trim();
-        String cmd = log.substring(index4 + len4, index5).trim();
-        String src = log.substring(index5 + len5, index6).trim();
-        String dst = log.substring(index6 + len6, index7).trim();
+        String allowed = log.substring(index1 + len1, index2).trim();
+        String ugi = log.substring(index2 + len2, index3).trim();
+        String ip = log.substring(index3 + len3, index4).trim();
+        String cmd = log.substring(index4 + len4, index5).trim();
+        String src = log.substring(index5 + len5, index6).trim();
+        String dst = log.substring(index6 + len6, index7).trim();

-        HDFSAuditLogObject entity = new HDFSAuditLogObject();
-        String user = LogParseUtil.parseUserFromUGI(ugi);
-        if (src != null && src.equals("null")) {
-            src = null;
-        }
+        HDFSAuditLogObject entity = new HDFSAuditLogObject();
+        String user = LogParseUtil.parseUserFromUGI(ugi);
+        if (src != null && src.equals("null")) {
+            src = null;
+        }

-        if (dst != null && dst.equals("null")) {
-            dst = null;
-        }
-        entity.user = user;
-        entity.cmd = cmd;
-        entity.src = src;
-        entity.dst = dst;
-        entity.host = ip;
-        entity.allowed = Boolean.valueOf(allowed);
-        entity.timestamp = DateTimeUtil.humanDateToMilliseconds(data);
-        return entity;
-    }
+        if (dst != null && dst.equals("null")) {
+            dst = null;
+        }
+        entity.user = user;
+        entity.cmd = cmd;
+        entity.src = src;
+        entity.dst = dst;
+        entity.host = ip;
+        entity.allowed = Boolean.valueOf(allowed);
+        entity.timestamp = DateTimeUtil.humanDateToMilliseconds(data);
+        return entity;
+    }
}
\ No newline at end of file
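To make the fixed-offset parsing above concrete, here is a hedged usage sketch that feeds the class javadoc's sample line through parse(); the IP literal stands in for the redacted x.x.x.x, and the expected values in the comments are inferred from the substring offsets rather than taken from a recorded run.

    import org.apache.eagle.security.hdfs.HDFSAuditLogObject;
    import org.apache.eagle.security.hdfs.HDFSAuditLogParser;

    public class HdfsAuditParserSketch {
        public static void main(String[] args) throws Exception {
            String line = "2015-09-21 21:36:52,172 INFO FSNamesystem.audit: allowed=true "
                    + "ugi=hadoop (auth:KERBEROS) ip=/10.0.0.1 cmd=getfileinfo "
                    + "src=/tmp dst=null perm=null proto=rpc";
            HDFSAuditLogObject entity = new HDFSAuditLogParser().parse(line);
            System.out.println(entity.user);    // presumably "hadoop", via LogParseUtil.parseUserFromUGI
            System.out.println(entity.cmd);     // "getfileinfo"
            System.out.println(entity.src);     // "/tmp"
            System.out.println(entity.dst);     // null: the literal "null" is normalized away
            System.out.println(entity.allowed); // true
            System.out.println(entity.host);    // "10.0.0.1"
        }
    }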

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/435451ef/eagle-security/eagle-security-hbase-auditlog/src/main/java/org/apache/eagle/security/hbase/HBaseAuditLogParserBolt.java
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-security-hbase-auditlog/src/main/java/org/apache/eagle/security/hbase/HBaseAuditLogParserBolt.java b/eagle-security/eagle-security-hbase-auditlog/src/main/java/org/apache/eagle/security/hbase/HBaseAuditLogParserBolt.java
index 79eadd1..ffed0ef 100644
--- a/eagle-security/eagle-security-hbase-auditlog/src/main/java/org/apache/eagle/security/hbase/HBaseAuditLogParserBolt.java
+++ b/eagle-security/eagle-security-hbase-auditlog/src/main/java/org/apache/eagle/security/hbase/HBaseAuditLogParserBolt.java
@@ -26,9 +26,7 @@ import backtype.storm.tuple.Tuple;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

-import java.util.Arrays;
-import java.util.Map;
-import java.util.TreeMap;
+import java.util.*;

/**
 * Since 6/7/16.
@@ -36,6 +34,7 @@ import java.util.TreeMap;
public class HBaseAuditLogParserBolt extends BaseRichBolt {
    private static Logger LOG = LoggerFactory.getLogger(HBaseAuditLogParserBolt.class);
    private OutputCollector collector;
+   private static final HbaseAuditLogParser parser = new HbaseAuditLogParser();

    @Override
    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
@@ -44,12 +43,10 @@ public class HBaseAuditLogParserBolt extends BaseRichBolt {

    @Override
    public void execute(Tuple input) {
-       String logLine = new String(input.getString(0));
-
-       HbaseAuditLogParser parser = new HbaseAuditLogParser();
-       try{
+       String logLine = input.getString(0);
+       try {
            HbaseAuditLogObject entity = parser.parse(logLine);
-           Map<String, Object> map = new TreeMap<String, Object>();
+           Map<String, Object> map = new TreeMap<>();
            map.put("action", entity.action);
            map.put("host", entity.host);
            map.put("status", entity.status);
@@ -57,10 +54,10 @@ public class HBaseAuditLogParserBolt extends BaseRichBolt {
            map.put("scope", entity.scope);
            map.put("user", entity.user);
            map.put("timestamp", entity.timestamp);
-           collector.emit(Arrays.asList(map));
-       }catch(Exception ex){
+           collector.emit(Collections.singletonList(map));
+       } catch (Exception ex) {
            LOG.error("Failing parse and ignore audit log {} ", logLine, ex);
-       }finally {
+       } finally {
            collector.ack(input);
        }
    }


http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/435451ef/eagle-security/eagle-security-hbase-auditlog/src/main/java/org/apache/eagle/security/hbase/HbaseResourceSensitivityDataJoinBolt.java
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-security-hbase-auditlog/src/main/java/org/apache/eagle/security/hbase/HbaseResourceSensitivityDataJoinBolt.java b/eagle-security/eagle-security-hbase-auditlog/src/main/java/org/apache/eagle/security/hbase/HbaseResourceSensitivityDataJoinBolt.java
index c1005cd..a1545c2 100644
--- a/eagle-security/eagle-security-hbase-auditlog/src/main/java/org/apache/eagle/security/hbase/HbaseResourceSensitivityDataJoinBolt.java
+++ b/eagle-security/eagle-security-hbase-auditlog/src/main/java/org/apache/eagle/security/hbase/HbaseResourceSensitivityDataJoinBolt.java
@@ -30,10 +30,10 @@ import java.util.Map;
import java.util.TreeMap;
import java.util.regex.Pattern;

-public class HbaseResourceSensitivityDataJoinBolt extends AbstractDataEnrichBolt<HBaseSensitivityEntity, String> {
+public class HbaseResourceSensitivityDataJoinBolt extends AbstractDataEnrichBolt<HBaseSensitivityEntity, String> {
    private final static Logger LOG = LoggerFactory.getLogger(HbaseResourceSensitivityDataJoinBolt.class);

-   public HbaseResourceSensitivityDataJoinBolt(Config config){
+   public HbaseResourceSensitivityDataJoinBolt(Config config) {
        super(config, new HBaseSensitivityDataEnrichLCM(config));
    }
@@ -57,8 +57,8 @@ public class HbaseResourceSensitivityDataJoinBolt extends AbstractDataEnrichBolt
            }
        }
        Map<String, Object> newEvent = new TreeMap<String, Object>(event);
-       newEvent.put("sensitivityType", sensitivityEntity == null ?
-               "NA" : sensitivityEntity.getSensitivityType());
+       newEvent.put("sensitivityType", sensitivityEntity == null
+               ? "NA" : sensitivityEntity.getSensitivityType());
        newEvent.put("scope", resource);
        if (LOG.isDebugEnabled()) {
            LOG.debug("After hbase resource sensitivity lookup: " + newEvent);
@@ -66,9 +66,9 @@ public class HbaseResourceSensitivityDataJoinBolt extends AbstractDataEnrichBolt
            LOG.info("After hbase resource sensitivity lookup: " + newEvent);
            // push to Kafka sink
            collector.emit(Arrays.asList(newEvent.get("user"), newEvent));
-       }catch(Exception ex){
+       } catch (Exception ex) {
            LOG.error("error joining data, ignore it", ex);
-       }finally {
+       } finally {
            collector.ack(input);
        }
    }


http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/435451ef/eagle-security/eagle-security-hdfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/AbstractHdfsAuditLogApplication.java
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-security-hdfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/AbstractHdfsAuditLogApplication.java b/eagle-security/eagle-security-hdfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/AbstractHdfsAuditLogApplication.java
index b9f480b..a1daf89 100644
--- a/eagle-security/eagle-security-hdfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/AbstractHdfsAuditLogApplication.java
+++ b/eagle-security/eagle-security-hdfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/AbstractHdfsAuditLogApplication.java
@@ -61,34 +61,50 @@ public abstract class AbstractHdfsAuditLogApplication extends StormApplication {

        builder.setSpout("ingest", spout, numOfSpoutTasks).setNumTasks(numOfSpoutTasks);

+       // ---------------------
+       // ingest -> parserBolt
+       // ---------------------
        BaseRichBolt parserBolt = getParserBolt();
-       BoltDeclarer boltDeclarer = builder.setBolt("parserBolt", parserBolt, numOfParserTasks).setNumTasks(numOfParserTasks);
+       BoltDeclarer boltDeclarer = builder.setBolt("parserBolt", parserBolt, numOfParserTasks).setNumTasks(numOfParserTasks).shuffleGrouping("ingest");
+       boltDeclarer.shuffleGrouping("ingest");

-       Boolean useDefaultPartition = !config.hasPath("eagleProps.useDefaultPartition") || config.getBoolean("eagleProps.useDefaultPartition");
-       if(useDefaultPartition){
-           boltDeclarer.fieldsGrouping("ingest", new Fields(StringScheme.STRING_SCHEME_KEY));
-       }else{
-           boltDeclarer.customGrouping("ingest", new CustomPartitionGrouping(createStrategy(config)));
-       }
+       // Boolean useDefaultPartition = !config.hasPath("eagleProps.useDefaultPartition") || config.getBoolean("eagleProps.useDefaultPartition");
+       // if (useDefaultPartition) {
+       //     boltDeclarer.fieldsGrouping("ingest", new Fields(StringScheme.STRING_SCHEME_KEY));
+       // } else {
+       //     boltDeclarer.customGrouping("ingest", new CustomPartitionGrouping(createStrategy(config)));
+       // }
+
+       // ------------------------------
+       // parserBolt -> sensitivityJoin
+       // ------------------------------
        HdfsSensitivityDataEnrichBolt sensitivityDataJoinBolt = new HdfsSensitivityDataEnrichBolt(config);
        BoltDeclarer sensitivityDataJoinBoltDeclarer = builder.setBolt("sensitivityJoin", sensitivityDataJoinBolt, numOfSensitivityJoinTasks).setNumTasks(numOfSensitivityJoinTasks);
-       sensitivityDataJoinBoltDeclarer.fieldsGrouping("parserBolt", new Fields("f1"));
+       // sensitivityDataJoinBoltDeclarer.fieldsGrouping("parserBolt", new Fields("f1"));
+       sensitivityDataJoinBoltDeclarer.shuffleGrouping("parserBolt");

+       // ------------------------------
+       // sensitivityJoin -> ipZoneJoin
+       // ------------------------------
        IPZoneDataEnrichBolt ipZoneDataJoinBolt = new IPZoneDataEnrichBolt(config);
        BoltDeclarer ipZoneDataJoinBoltDeclarer = builder.setBolt("ipZoneJoin", ipZoneDataJoinBolt, numOfIPZoneJoinTasks).setNumTasks(numOfIPZoneJoinTasks);
-       ipZoneDataJoinBoltDeclarer.fieldsGrouping("sensitivityJoin", new Fields("user"));
+       // ipZoneDataJoinBoltDeclarer.fieldsGrouping("sensitivityJoin", new Fields("user"));
+       ipZoneDataJoinBoltDeclarer.shuffleGrouping("sensitivityJoin");
+
+       // ------------------------
+       // ipZoneJoin -> kafkaSink
+       // ------------------------

-       StormStreamSink sinkBolt = environment.getStreamSink("hdfs_audit_log_stream",config);
+       StormStreamSink sinkBolt = environment.getStreamSink("hdfs_audit_log_stream", config);
        BoltDeclarer kafkaBoltDeclarer = builder.setBolt("kafkaSink", sinkBolt, numOfSinkTasks).setNumTasks(numOfSinkTasks);
-       kafkaBoltDeclarer.fieldsGrouping("ipZoneJoin", new Fields("user"));
+       kafkaBoltDeclarer.shuffleGrouping("ipZoneJoin");

        return builder.createTopology();
-
-
    }

    public abstract BaseRichBolt getParserBolt();
+
+   public abstract String getSinkStreamName();

    public static PartitionStrategy createStrategy(Config config) {
@@ -103,10 +119,8 @@ public abstract class AbstractHdfsAuditLogApplication extends StormApplication {
        String key1 = EagleConfigConstants.EAGLE_PROPS + ".partitionRefreshIntervalInMin";
        Integer partitionRefreshIntervalInMin = config.hasPath(key1) ? config.getInt(key1) : 60;
        String key2 = EagleConfigConstants.EAGLE_PROPS + ".kafkaStatisticRangeInMin";
-       Integer kafkaStatisticRangeInMin = config.hasPath(key2) ? config.getInt(key2) : 60;
+       Integer kafkaStatisticRangeInMin = config.hasPath(key2) ? config.getInt(key2) : 60;
        PartitionStrategy strategy = new PartitionStrategyImpl(dao, algorithm, partitionRefreshIntervalInMin * DateUtils.MILLIS_PER_MINUTE, kafkaStatisticRangeInMin * DateUtils.MILLIS_PER_MINUTE);
        return strategy;
    }
-
-
-}
+}
\ No newline at end of file
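The wiring change above is the heart of the commit: each fieldsGrouping keyed the stream (on the raw-message key or on the "user" field), so a single hot user or key could pin most tuples to one task, while shuffleGrouping distributes tuples evenly; that is safe here because the enrichment bolts keep no per-key state. A standalone sketch of the two wirings, with the Eagle bolts abstracted behind IRichBolt:

    import backtype.storm.topology.BoltDeclarer;
    import backtype.storm.topology.IRichBolt;
    import backtype.storm.topology.TopologyBuilder;
    import backtype.storm.tuple.Fields;

    public class GroupingSketch {
        static void wire(TopologyBuilder builder, IRichBolt parser, IRichBolt join, boolean shuffle) {
            builder.setBolt("parserBolt", parser, 2).shuffleGrouping("ingest");
            BoltDeclarer declarer = builder.setBolt("sensitivityJoin", join, 2);
            if (shuffle) {
                // after: round-robin distribution, even load across both tasks
                declarer.shuffleGrouping("parserBolt");
            } else {
                // before: every tuple sharing a key lands on the same task
                declarer.fieldsGrouping("parserBolt", new Fields("f1"));
            }
        }
    }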

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/435451ef/eagle-security/eagle-security-hdfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/HdfsAuditLogParserBolt.java
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-security-hdfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/HdfsAuditLogParserBolt.java b/eagle-security/eagle-security-hdfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/HdfsAuditLogParserBolt.java
index 1134cb5..4590e8a 100644
--- a/eagle-security/eagle-security-hdfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/HdfsAuditLogParserBolt.java
+++ b/eagle-security/eagle-security-hdfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/HdfsAuditLogParserBolt.java
@@ -30,7 +30,7 @@ import org.apache.eagle.security.hdfs.HDFSAuditLogParser;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

-import java.util.Arrays;
+import java.util.Collections;
import java.util.Map;
import java.util.TreeMap;

@@ -40,6 +40,7 @@ import java.util.TreeMap;
public class HdfsAuditLogParserBolt extends BaseRichBolt {
    private static Logger LOG = LoggerFactory.getLogger(HdfsAuditLogParserBolt.class);
    private OutputCollector collector;
+   private static final HDFSAuditLogParser parser = new HDFSAuditLogParser();

    @Override
    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
@@ -49,12 +50,10 @@ public class HdfsAuditLogParserBolt extends BaseRichBolt {
    @Override
    public void execute(Tuple input) {
        String logLine = input.getString(0);
-
-       HDFSAuditLogParser parser = new HDFSAuditLogParser();
        HDFSAuditLogObject entity = null;
        try {
            entity = parser.parse(logLine);
-           Map<String, Object> map = new TreeMap<String, Object>();
+           Map<String, Object> map = new TreeMap<>();
            map.put("src", entity.src);
            map.put("dst", entity.dst);
            map.put("host", entity.host);
@@ -62,7 +61,7 @@ public class HdfsAuditLogParserBolt extends BaseRichBolt {
            map.put("allowed", entity.allowed);
            map.put("user", entity.user);
            map.put("cmd", entity.cmd);
-           collector.emit(Arrays.asList(map));
+           collector.emit(Collections.singletonList(map));
        } catch (Exception ex) {
            LOG.error("Failing parse audit log message {}", logLine, ex);
        } finally {

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/435451ef/eagle-security/eagle-security-hdfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/HdfsSensitivityDataEnrichBolt.java
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-security-hdfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/HdfsSensitivityDataEnrichBolt.java b/eagle-security/eagle-security-hdfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/HdfsSensitivityDataEnrichBolt.java
index 2031108..f8e7c4d 100644
--- a/eagle-security/eagle-security-hdfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/HdfsSensitivityDataEnrichBolt.java
+++ b/eagle-security/eagle-security-hdfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/HdfsSensitivityDataEnrichBolt.java
@@ -35,7 +35,7 @@ import java.util.regex.Pattern;
public class HdfsSensitivityDataEnrichBolt extends AbstractDataEnrichBolt<HdfsSensitivityEntity, String> {
    private static Logger LOG = LoggerFactory.getLogger(HdfsSensitivityDataEnrichBolt.class);

-   public HdfsSensitivityDataEnrichBolt(Config config){
+   public HdfsSensitivityDataEnrichBolt(Config config) {
        super(config, new HdfsSensitivityDataEnrichLCM(config));
    }

@@ -68,9 +68,9 @@ public class HdfsSensitivityDataEnrichBolt extends AbstractDataEnrichBolt<HdfsSe
            }
            // LOG.info(">>>> After file sensitivity lookup: " + event);
            collector.emit(Arrays.asList(event.get("user"), event));
-       }catch(Exception ex){
+       } catch (Exception ex) {
            LOG.error("error joining data, ignore it", ex);
-       }finally {
+       } finally {
            collector.ack(input);
        }
    }


http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/435451ef/eagle-security/eagle-security-hdfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/IPZoneDataEnrichBolt.java
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-security-hdfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/IPZoneDataEnrichBolt.java b/eagle-security/eagle-security-hdfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/IPZoneDataEnrichBolt.java
index ed0e17b..faab22a 100644
--- a/eagle-security/eagle-security-hdfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/IPZoneDataEnrichBolt.java
+++ b/eagle-security/eagle-security-hdfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/IPZoneDataEnrichBolt.java
@@ -30,33 +30,35 @@ import java.util.Map;
import java.util.TreeMap;

public class IPZoneDataEnrichBolt extends AbstractDataEnrichBolt<IPZoneEntity, String> {
-   private static final Logger LOG = LoggerFactory.getLogger(IPZoneDataEnrichBolt.class);
+   private static final Logger LOG = LoggerFactory.getLogger(IPZoneDataEnrichBolt.class);

-   public IPZoneDataEnrichBolt(Config config){
-       super(config, new IPZoneDataEnrichLCM(config));
-   }
+   public IPZoneDataEnrichBolt(Config config) {
+       super(config, new IPZoneDataEnrichLCM(config));
+   }

-   @Override
-   public void executeWithEnrich(Tuple input, Map<String, IPZoneEntity> map) {
-       try {
-           Map<String, Object> toBeCopied = (Map<String, Object>) input.getValue(1);
-           Map<String, Object> event = new TreeMap<String, Object>(toBeCopied); // shallow copy
-           IPZoneEntity e = null;
-           if (map != null) {
-               e = map.get(event.get("host"));
-           }
-           event.put("securityZone", e == null ? "NA" : e.getSecurityZone());
-           if (LOG.isDebugEnabled()) LOG.debug("After IP zone lookup: " + event);
-           collector.emit(Arrays.asList(event.get("user"), event));
-       }catch(Exception ex){
-           LOG.error("error joining data, ignore it", ex);
-       }finally {
-           collector.ack(input);
-       }
+   @Override
+   public void executeWithEnrich(Tuple input, Map<String, IPZoneEntity> map) {
+       try {
+           Map<String, Object> toBeCopied = (Map<String, Object>) input.getValue(1);
+           Map<String, Object> event = new TreeMap<String, Object>(toBeCopied); // shallow copy
+           IPZoneEntity e = null;
+           if (map != null) {
+               e = map.get(event.get("host"));
+           }
+           event.put("securityZone", e == null ? "NA" : e.getSecurityZone());
+           if (LOG.isDebugEnabled()) {
+               LOG.debug("After IP zone lookup: " + event);
+           }
+           collector.emit(Arrays.asList(event.get("user"), event));
+       } catch (Exception ex) {
+           LOG.error("error joining data, ignore it", ex);
+       } finally {
+           collector.ack(input);
+       }
    }

-   @Override
-   public void declareOutputFields(OutputFieldsDeclarer declarer) {
-       declarer.declare(new Fields("user", "message"));
-   }
+   @Override
+   public void declareOutputFields(OutputFieldsDeclarer declarer) {
+       declarer.declare(new Fields("user", "message"));
+   }
}


http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/435451ef/eagle-security/eagle-security-hdfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/kafka/MessageJsonScheme.java
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-security-hdfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/kafka/MessageJsonScheme.java b/eagle-security/eagle-security-hdfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/kafka/MessageJsonScheme.java
index 9ffcaf9..1d48208 100644
--- a/eagle-security/eagle-security-hdfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/kafka/MessageJsonScheme.java
+++ b/eagle-security/eagle-security-hdfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/kafka/MessageJsonScheme.java
@@ -21,7 +21,7 @@ package org.apache.eagle.security.auditlog.kafka;
import backtype.storm.spout.Scheme;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
-import org.codehaus.jackson.map.ObjectMapper;
+import com.fasterxml.jackson.databind.ObjectMapper;
import org.slf4j.Logger;
import storm.kafka.StringScheme;

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/435451ef/eagle-security/eagle-security-hdfs-auditlog/src/test/java/org/apache/eagle/security/auditlog/TestHdfsAuditLogApplication.java
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-security-hdfs-auditlog/src/test/java/org/apache/eagle/security/auditlog/TestHdfsAuditLogApplication.java b/eagle-security/eagle-security-hdfs-auditlog/src/test/java/org/apache/eagle/security/auditlog/TestHdfsAuditLogApplication.java
new file mode 100644
index 0000000..e09f55d
--- /dev/null
+++ b/eagle-security/eagle-security-hdfs-auditlog/src/test/java/org/apache/eagle/security/auditlog/TestHdfsAuditLogApplication.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.security.auditlog;
+
+
+import com.google.inject.Inject;
+import org.apache.eagle.app.resource.ApplicationResource;
+import org.apache.eagle.app.service.ApplicationOperations;
+import org.apache.eagle.app.test.ApplicationTestBase;
+import org.apache.eagle.metadata.model.ApplicationEntity;
+import org.apache.eagle.metadata.model.SiteEntity;
+import org.apache.eagle.metadata.resource.SiteResource;
+import org.apache.eagle.metadata.service.ApplicationStatusUpdateService;
+import org.junit.Assert;
+import org.junit.Ignore;
+import org.junit.Test;
+
+import java.util.HashMap;
+import java.util.Map;
+
+@Ignore
+public class TestHdfsAuditLogApplication extends ApplicationTestBase {
+
+    @Inject
+    private SiteResource siteResource;
+    @Inject
+    private ApplicationResource applicationResource;
+    @Inject
+    ApplicationStatusUpdateService statusUpdateService;
+
+    @Test
+    public void testHdfsAuditLogApplication() {
+        // Create local site
+        SiteEntity siteEntity = new SiteEntity();
+        siteEntity.setSiteId("test_site");
+        siteEntity.setSiteName("Test Site");
+        siteEntity.setDescription("Test Site for HdfsAuditLogApplication");
+        siteResource.createSite(siteEntity);
+        Assert.assertNotNull(siteEntity.getUuid());
+
+        ApplicationOperations.InstallOperation installOperation = new ApplicationOperations.InstallOperation("test_site", "HdfsAuditLogApplication", ApplicationEntity.Mode.LOCAL);
+        installOperation.setConfiguration(getConf());
+        // Install application
+        ApplicationEntity applicationEntity = applicationResource.installApplication(installOperation).getData();
+        // Start application
+        applicationResource.startApplication(new ApplicationOperations.StartOperation(applicationEntity.getUuid()));
+        statusUpdateService.updateApplicationEntityStatus(applicationEntity);
+        // Stop application
+        applicationResource.stopApplication(new ApplicationOperations.StopOperation(applicationEntity.getUuid()));
+        statusUpdateService.updateApplicationEntityStatus(applicationEntity);
+        // Uninstall application
+        applicationResource.uninstallApplication(new ApplicationOperations.UninstallOperation(applicationEntity.getUuid()));
+        try {
+            applicationResource.getApplicationEntityByUUID(applicationEntity.getUuid());
+            Assert.fail("Application instance (UUID: " + applicationEntity.getUuid() + ") should have been uninstalled");
+        } catch (Exception ex) {
+            // Expected exception
+        }
+    }
+
+
+    private Map<String, Object> getConf() {
+        Map<String, Object> conf = new HashMap<>();
+        conf.put("dataSinkConfig.topic", "hdfs_audit_log_test");
+        conf.put("dataSinkConfig.brokerList", "localhost:6667");
+        conf.put("dataSinkConfig.serializerClass", "serializerClass");
+        conf.put("dataSinkConfig.keySerializerClass", "keySerializerClass");
+        conf.put("spoutNum", 2);
+        conf.put("mode", "LOCAL");
+        return conf;
+    }
+}


http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/435451ef/eagle-security/eagle-security-maprfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/MapRFSAuditLogParserBolt.java
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-security-maprfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/MapRFSAuditLogParserBolt.java b/eagle-security/eagle-security-maprfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/MapRFSAuditLogParserBolt.java
index 37e55c6..e2f0520 100644
--- a/eagle-security/eagle-security-maprfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/MapRFSAuditLogParserBolt.java
+++ b/eagle-security/eagle-security-maprfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/MapRFSAuditLogParserBolt.java
@@ -52,7 +52,7 @@ public class MapRFSAuditLogParserBolt extends BaseRichBolt {

        MAPRFSAuditLogParser parser = new MAPRFSAuditLogParser();
        MAPRFSAuditLogObject entity = null;
-       try{
+       try {
            entity = parser.parse(logLine);
            Map<String, Object> map = new TreeMap<String, Object>();
            map.put("src", entity.src);
@@ -64,7 +64,7 @@ public class MapRFSAuditLogParserBolt extends BaseRichBolt {
            map.put("cmd", entity.cmd);
            map.put("volume", entity.volume);
            collector.emit(Arrays.asList(map));
-       }catch(Exception ex) {
+       } catch (Exception ex) {
            LOG.error("Failing parse audit log message", ex);
        }
    }
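One pattern worth noting across the parser bolts: HdfsAuditLogParserBolt and HBaseAuditLogParserBolt now hold a single static parser instead of constructing one per tuple, while this MapR bolt still news up its parser inside execute(). A sketch of the hoisted form, assuming (as the commit implicitly does) that the parser keeps no mutable state across parse() calls:

    import backtype.storm.task.OutputCollector;
    import backtype.storm.task.TopologyContext;
    import backtype.storm.topology.OutputFieldsDeclarer;
    import backtype.storm.topology.base.BaseRichBolt;
    import backtype.storm.tuple.Fields;
    import backtype.storm.tuple.Tuple;
    import org.apache.eagle.security.hdfs.HDFSAuditLogParser;

    import java.util.Collections;
    import java.util.Map;

    public class ReusedParserBolt extends BaseRichBolt {
        // one parser per worker JVM instead of one allocation per tuple
        private static final HDFSAuditLogParser PARSER = new HDFSAuditLogParser();
        private OutputCollector collector;

        @Override
        public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
            this.collector = collector;
        }

        @Override
        public void execute(Tuple input) {
            try {
                collector.emit(Collections.singletonList((Object) PARSER.parse(input.getString(0))));
            } catch (Exception ex) {
                // unparseable line: log-and-ignore, matching the Eagle bolts
            } finally {
                collector.ack(input);
            }
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("f1"));
        }
    }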