Repository: eagle Updated Branches: refs/heads/master 62f8c78d3 -> 77fbff720
[EAGLE-846] HDFS audit log traffic monitoring https://issues.apache.org/jira/browse/EAGLE-846 Author: Zhao, Qingwen <qingwz...@apache.org> Author: Qingwen Zhao <qingwen...@gmail.com> Closes #756 from qingwen220/EAGLE-846. Project: http://git-wip-us.apache.org/repos/asf/eagle/repo Commit: http://git-wip-us.apache.org/repos/asf/eagle/commit/77fbff72 Tree: http://git-wip-us.apache.org/repos/asf/eagle/tree/77fbff72 Diff: http://git-wip-us.apache.org/repos/asf/eagle/diff/77fbff72 Branch: refs/heads/master Commit: 77fbff720145e08e5ea6ad30117317ef5b1e031d Parents: 62f8c78 Author: Zhao, Qingwen <qingwz...@apache.org> Authored: Thu Dec 29 12:11:35 2016 +0800 Committer: Zhao, Qingwen <qingwz...@apache.org> Committed: Thu Dec 29 12:11:35 2016 +0800 ---------------------------------------------------------------------- .../app/environment/impl/StormEnvironment.java | 4 + .../app/messaging/EntityStreamPersist.java | 98 ++++++++++++++ .../impl/ApplicationHealthCheckServiceImpl.java | 1 + .../org/apache/eagle/common/DateTimeUtil.java | 22 ++++ .../apache/eagle/common/TestDateTimeUtil.java | 15 +++ .../eagle/security/hdfs/HDFSAuditLogObject.java | 8 ++ .../eagle/security/hdfs/HDFSAuditLogParser.java | 11 +- .../traffic/HadoopLogAccumulatorBolt.java | 127 +++++++++++++++++++ .../traffic/HadoopLogTrafficPersist.java | 81 ++++++++++++ .../security/traffic/SimpleWindowCounter.java | 77 +++++++++++ .../AbstractHdfsAuditLogApplication.java | 13 +- .../auditlog/HdfsAuditLogApplication.java | 4 +- .../auditlog/HdfsAuditLogParserBolt.java | 33 +++-- ...ecurity.auditlog.HdfsAuditLogAppProvider.xml | 26 ++++ .../auditlog/TestUserCommandReassembler.java | 3 - .../auditlog/MapRFSAuditLogApplication.java | 2 +- ...urity.auditlog.MapRFSAuditLogAppProvider.xml | 9 +- .../apache/eagle/server/ServerApplication.java | 10 +- .../eagle/topology/TopologyCheckAppConfig.java | 4 +- ....eagle.topology.TopologyCheckAppProvider.xml | 18 ++- .../src/main/resources/application.conf | 8 +- 
.../src/test/resources/application.conf | 16 ++- 22 files changed, 554 insertions(+), 36 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/eagle/blob/77fbff72/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/impl/StormEnvironment.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/impl/StormEnvironment.java b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/impl/StormEnvironment.java index 59c8277..942a0ac 100644 --- a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/impl/StormEnvironment.java +++ b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/impl/StormEnvironment.java @@ -48,6 +48,10 @@ public class StormEnvironment extends AbstractEnvironment { return new MetricStreamPersist(metricDefinition, config); } + public EntityStreamPersist getEntityPersist(Config config) { + return new EntityStreamPersist(config); + } + public MetricSchemaGenerator getMetricSchemaGenerator(MetricDefinition metricDefinition, Config config) { return new MetricSchemaGenerator(metricDefinition, config); } http://git-wip-us.apache.org/repos/asf/eagle/blob/77fbff72/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/messaging/EntityStreamPersist.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/messaging/EntityStreamPersist.java b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/messaging/EntityStreamPersist.java new file mode 100644 index 0000000..e216dc6 --- /dev/null +++ b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/messaging/EntityStreamPersist.java @@ -0,0 +1,98 @@ +/* + * Licensed to 
the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.eagle.app.messaging; + +import backtype.storm.task.OutputCollector; +import backtype.storm.task.TopologyContext; +import backtype.storm.topology.OutputFieldsDeclarer; +import backtype.storm.topology.base.BaseRichBolt; +import backtype.storm.tuple.Tuple; +import com.typesafe.config.Config; +import org.apache.eagle.log.base.taggedlog.TaggedLogAPIEntity; +import org.apache.eagle.log.entity.GenericServiceAPIResponseEntity; +import org.apache.eagle.service.client.IEagleServiceClient; +import org.apache.eagle.service.client.impl.EagleServiceClientImpl; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.*; +import java.util.concurrent.CopyOnWriteArrayList; + +public class EntityStreamPersist extends BaseRichBolt { + private static final Logger LOG = LoggerFactory.getLogger(EntityStreamPersist.class); + + private final Config config; + private IEagleServiceClient client; + private OutputCollector collector; + private int batchSize; + private List<TaggedLogAPIEntity> entityBucket = new CopyOnWriteArrayList<>(); + + public EntityStreamPersist(Config config) { + this.config = config; + this.batchSize = 
config.hasPath("service.batchSize") ? config.getInt("service.batchSize") : 1; + } + + @Override + public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) { + this.client = new EagleServiceClientImpl(config); + this.collector = collector; + } + + @Override + public void execute(Tuple input) { + List<? extends TaggedLogAPIEntity> entities = (List<? extends TaggedLogAPIEntity>) input.getValue(0); + entityBucket.addAll(entities); + + if (entityBucket.size() < batchSize) { + return; + } + + try { + GenericServiceAPIResponseEntity response = client.create(entityBucket); + if (response.isSuccess()) { + LOG.info("persist {} entities with starttime={}", entityBucket.size(), entityBucket.get(0).getTimestamp()); + collector.ack(input); + } else { + LOG.error("Service side error: {}", response.getException()); + collector.reportError(new IllegalStateException(response.getException())); + } + } catch (Exception e) { + LOG.error(e.getMessage(), e); + collector.fail(input); + } + entityBucket.clear(); + } + + @Override + public void cleanup() { + try { + this.client.getJerseyClient().destroy(); + this.client.close(); + } catch (IOException e) { + LOG.error("Close client error: {}", e.getMessage(), e); + } finally { + super.cleanup(); + } + } + + @Override + public void declareOutputFields(OutputFieldsDeclarer declarer) { + + } +} http://git-wip-us.apache.org/repos/asf/eagle/blob/77fbff72/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/service/impl/ApplicationHealthCheckServiceImpl.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/service/impl/ApplicationHealthCheckServiceImpl.java b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/service/impl/ApplicationHealthCheckServiceImpl.java index 1607b0f..8403d55 100644 --- 
a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/service/impl/ApplicationHealthCheckServiceImpl.java +++ b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/service/impl/ApplicationHealthCheckServiceImpl.java @@ -49,6 +49,7 @@ public class ApplicationHealthCheckServiceImpl extends ApplicationHealthCheckSer private int initialDelay = 10; private int period = 300; + public static final String HEALTH_CHECK_PATH = "application.healthCheck"; private static final String HEALTH_INITIAL_DELAY_PATH = "application.healthCheck.initialDelay"; private static final String HEALTH_PERIOD_PATH = "application.healthCheck.period"; private static final String HEALTH_PUBLISHER_PATH = "application.healthCheck.publisher"; http://git-wip-us.apache.org/repos/asf/eagle/blob/77fbff72/eagle-core/eagle-common/src/main/java/org/apache/eagle/common/DateTimeUtil.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-common/src/main/java/org/apache/eagle/common/DateTimeUtil.java b/eagle-core/eagle-common/src/main/java/org/apache/eagle/common/DateTimeUtil.java index a69feda..98a5a4b 100644 --- a/eagle-core/eagle-common/src/main/java/org/apache/eagle/common/DateTimeUtil.java +++ b/eagle-core/eagle-common/src/main/java/org/apache/eagle/common/DateTimeUtil.java @@ -61,6 +61,14 @@ public class DateTimeUtil { return sdf.format(t); } + public static String secondsToHumanDate(long seconds, TimeZone timeZone) { + SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); + sdf.setTimeZone(timeZone); + Date t = new Date(); + t.setTime(seconds * 1000); + return sdf.format(t); + } + public static String millisecondsToHumanDateWithMilliseconds(long milliseconds) { SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss,SSS"); sdf.setTimeZone(CURRENT_TIME_ZONE); @@ -84,6 +92,13 @@ public class DateTimeUtil { return d.getTime() / 1000; } + public static long humanDateToSeconds(String date, TimeZone 
timeZone) throws ParseException { + SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); + sdf.setTimeZone(timeZone); + Date d = sdf.parse(date); + return d.getTime() / 1000; + } + public static long humanDateToMilliseconds(String date) throws ParseException { SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss,SSS"); sdf.setTimeZone(CURRENT_TIME_ZONE); @@ -91,6 +106,13 @@ public class DateTimeUtil { return d.getTime(); } + public static long humanDateToMilliseconds(String date, TimeZone timeZone) throws ParseException { + SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss,SSS"); + sdf.setTimeZone(timeZone); + Date d = sdf.parse(date); + return d.getTime(); + } + public static long humanDateToMillisecondsWithoutException(String date) { try { http://git-wip-us.apache.org/repos/asf/eagle/blob/77fbff72/eagle-core/eagle-common/src/test/java/org/apache/eagle/common/TestDateTimeUtil.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-common/src/test/java/org/apache/eagle/common/TestDateTimeUtil.java b/eagle-core/eagle-common/src/test/java/org/apache/eagle/common/TestDateTimeUtil.java index a008723..42dad4a 100755 --- a/eagle-core/eagle-common/src/test/java/org/apache/eagle/common/TestDateTimeUtil.java +++ b/eagle-core/eagle-common/src/test/java/org/apache/eagle/common/TestDateTimeUtil.java @@ -19,8 +19,10 @@ package org.apache.eagle.common; import org.junit.Assert; import org.junit.Test; +import java.text.ParseException; import java.util.Calendar; import java.util.GregorianCalendar; +import java.util.TimeZone; public class TestDateTimeUtil { @Test @@ -84,4 +86,17 @@ public class TestDateTimeUtil { //cal.setTimeInMillis(System.currentTimeMillis()); System.out.println(cal.get(Calendar.DAY_OF_WEEK)); } + + @Test + public void testTimeZone() throws ParseException { + for (String s : TimeZone.getAvailableIDs()) { + System.out.println(s); + } + String date = "2016-12-23 07:35:49"; + 
TimeZone timeZone = TimeZone.getTimeZone("GMT+8"); + long timestamp = DateTimeUtil.humanDateToSeconds(date, timeZone); + String dateUTC = "2016-12-22 23:35:49"; + timeZone = TimeZone.getTimeZone("UTC"); + Assert.assertTrue(DateTimeUtil.secondsToHumanDate(timestamp, timeZone).equals(dateUTC)); + } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/eagle/blob/77fbff72/eagle-security/eagle-security-common/src/main/java/org/apache/eagle/security/hdfs/HDFSAuditLogObject.java ---------------------------------------------------------------------- diff --git a/eagle-security/eagle-security-common/src/main/java/org/apache/eagle/security/hdfs/HDFSAuditLogObject.java b/eagle-security/eagle-security-common/src/main/java/org/apache/eagle/security/hdfs/HDFSAuditLogObject.java index a294ce1..1cc1a2a 100644 --- a/eagle-security/eagle-security-common/src/main/java/org/apache/eagle/security/hdfs/HDFSAuditLogObject.java +++ b/eagle-security/eagle-security-common/src/main/java/org/apache/eagle/security/hdfs/HDFSAuditLogObject.java @@ -27,4 +27,12 @@ public class HDFSAuditLogObject { public String cmd; public String src; public String dst; + + public static final String HDFS_TIMESTAMP_KEY = "timestamp"; + public static final String HDFS_HOST_KEY = "host"; + public static final String HDFS_ALLOWED_KEY = "allowed"; + public static final String HDFS_USER_KEY = "user"; + public static final String HDFS_CMD_KEY = "cmd"; + public static final String HDFS_SRC_KEY = "src"; + public static final String HDFS_DST_KEY = "dst"; } http://git-wip-us.apache.org/repos/asf/eagle/blob/77fbff72/eagle-security/eagle-security-common/src/main/java/org/apache/eagle/security/hdfs/HDFSAuditLogParser.java ---------------------------------------------------------------------- diff --git a/eagle-security/eagle-security-common/src/main/java/org/apache/eagle/security/hdfs/HDFSAuditLogParser.java 
b/eagle-security/eagle-security-common/src/main/java/org/apache/eagle/security/hdfs/HDFSAuditLogParser.java index 7257975..b4f4017 100644 --- a/eagle-security/eagle-security-common/src/main/java/org/apache/eagle/security/hdfs/HDFSAuditLogParser.java +++ b/eagle-security/eagle-security-common/src/main/java/org/apache/eagle/security/hdfs/HDFSAuditLogParser.java @@ -22,15 +22,22 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.Serializable; +import java.util.TimeZone; /** * e.g. 2015-09-21 21:36:52,172 INFO FSNamesystem.audit: allowed=true ugi=hadoop (auth:KERBEROS) ip=/x.x.x.x cmd=getfileinfo src=/tmp dst=null perm=null proto=rpc */ public final class HDFSAuditLogParser implements Serializable { - private final static Logger LOG = LoggerFactory.getLogger(HDFSAuditLogParser.class); + private static final Logger LOG = LoggerFactory.getLogger(HDFSAuditLogParser.class); + private TimeZone timeZone; public HDFSAuditLogParser() { + this.timeZone = DateTimeUtil.CURRENT_TIME_ZONE; + } + + public HDFSAuditLogParser(TimeZone timeZone) { + this.timeZone = timeZone; } public static String parseUser(String ugi) { @@ -91,7 +98,7 @@ public final class HDFSAuditLogParser implements Serializable { entity.dst = dst; entity.host = ip; entity.allowed = Boolean.valueOf(allowed); - entity.timestamp = DateTimeUtil.humanDateToMilliseconds(data); + entity.timestamp = DateTimeUtil.humanDateToMilliseconds(data, timeZone); return entity; } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/eagle/blob/77fbff72/eagle-security/eagle-security-common/src/main/java/org/apache/eagle/security/traffic/HadoopLogAccumulatorBolt.java ---------------------------------------------------------------------- diff --git a/eagle-security/eagle-security-common/src/main/java/org/apache/eagle/security/traffic/HadoopLogAccumulatorBolt.java b/eagle-security/eagle-security-common/src/main/java/org/apache/eagle/security/traffic/HadoopLogAccumulatorBolt.java new file mode 
100644 index 0000000..c5cc0df --- /dev/null +++ b/eagle-security/eagle-security-common/src/main/java/org/apache/eagle/security/traffic/HadoopLogAccumulatorBolt.java @@ -0,0 +1,127 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.eagle.security.traffic; + +import backtype.storm.task.OutputCollector; +import backtype.storm.task.TopologyContext; +import backtype.storm.topology.OutputFieldsDeclarer; +import backtype.storm.topology.base.BaseRichBolt; +import backtype.storm.tuple.Fields; +import backtype.storm.tuple.Tuple; +import backtype.storm.tuple.Values; +import com.typesafe.config.Config; +import org.apache.eagle.common.DateTimeUtil; +import org.apache.eagle.common.utils.Tuple2; +import org.apache.eagle.log.entity.GenericMetricEntity; +import org.apache.eagle.security.hdfs.HDFSAuditLogObject; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Arrays; +import java.util.Calendar; +import java.util.HashMap; +import java.util.Map; + +import static org.apache.eagle.app.utils.ApplicationExecutionConfig.APP_ID_KEY; +import static org.apache.eagle.app.utils.ApplicationExecutionConfig.SITE_ID_KEY; + +public class HadoopLogAccumulatorBolt extends BaseRichBolt { + private 
static Logger LOG = LoggerFactory.getLogger(HadoopLogAccumulatorBolt.class); + + private static final int DEFAULT_WINDOW_SIZE = 10; + private static final String HADOOP_LOG_METRIC_NAME = "hadoop.log.count.minute"; + private static final String HDFS_COUNTER_WINDOW_SIZE = "dataSinkConfig.metricWindowSize"; + + private int taskId; + private String site; + private String appId; + private HadoopLogTrafficPersist client; + private SimpleWindowCounter accumulator; + private OutputCollector collector; + private int windowSize; + + public HadoopLogAccumulatorBolt(Config config) { + if (config.hasPath(SITE_ID_KEY)) { + this.site = config.getString(SITE_ID_KEY); + } + if (config.hasPath(APP_ID_KEY)) { + this.appId = config.getString(APP_ID_KEY); + } + if (config.hasPath(HDFS_COUNTER_WINDOW_SIZE)) { + this.windowSize = config.getInt(HDFS_COUNTER_WINDOW_SIZE); + } else { + this.windowSize = DEFAULT_WINDOW_SIZE; + } + this.accumulator = new SimpleWindowCounter(windowSize); + this.client = new HadoopLogTrafficPersist(config); + + } + + @Override + public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) { + this.taskId = context.getThisTaskId(); + this.collector = collector; + } + + @Override + public void execute(Tuple input) { + Map<String, Object> toBeCopied = (Map<String, Object>) input.getValue(0); + long timeInMs = (long) toBeCopied.get(HDFSAuditLogObject.HDFS_TIMESTAMP_KEY); + long timeInMin = DateTimeUtil.roundDown(Calendar.MINUTE, timeInMs); + try { + collector.ack(input); + if (!isOrdered(timeInMin)) { + LOG.warn("data is out of order, the estimated throughput may be incorrect"); + return; + } + if (accumulator.isFull()) { + Tuple2<Long, Long> pair = accumulator.poll(); + GenericMetricEntity metric = generateMetric(pair.f0(), pair.f1()); + client.emitMetric(metric); + } else { + accumulator.insert(timeInMin, 1); + } + } catch (Exception ex) { + LOG.error(ex.getMessage(), ex); + } + } + + private boolean isOrdered(long timestamp) { + if 
(accumulator.isEmpty() || !accumulator.isFull()) { + return true; + } + return accumulator.peek() <= timestamp + windowSize * DateTimeUtil.ONEMINUTE; + } + + private GenericMetricEntity generateMetric(long timestamp, long count) { + GenericMetricEntity metricEntity = new GenericMetricEntity(); + Map<String, String> tags = new HashMap<>(); + tags.put("appId", appId); + tags.put("site", site); + tags.put("taskId", String.valueOf(taskId)); + metricEntity.setTimestamp(timestamp); + metricEntity.setTags(tags); + metricEntity.setPrefix(HADOOP_LOG_METRIC_NAME); + metricEntity.setValue(new double[] {count}); + return metricEntity; + } + + @Override + public void declareOutputFields(OutputFieldsDeclarer declarer) { + } +} http://git-wip-us.apache.org/repos/asf/eagle/blob/77fbff72/eagle-security/eagle-security-common/src/main/java/org/apache/eagle/security/traffic/HadoopLogTrafficPersist.java ---------------------------------------------------------------------- diff --git a/eagle-security/eagle-security-common/src/main/java/org/apache/eagle/security/traffic/HadoopLogTrafficPersist.java b/eagle-security/eagle-security-common/src/main/java/org/apache/eagle/security/traffic/HadoopLogTrafficPersist.java new file mode 100644 index 0000000..29f61ca --- /dev/null +++ b/eagle-security/eagle-security-common/src/main/java/org/apache/eagle/security/traffic/HadoopLogTrafficPersist.java @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.eagle.security.traffic; + +import com.typesafe.config.Config; +import org.apache.eagle.log.base.taggedlog.TaggedLogAPIEntity; +import org.apache.eagle.log.entity.GenericMetricEntity; +import org.apache.eagle.log.entity.GenericServiceAPIResponseEntity; +import org.apache.eagle.service.client.IEagleServiceClient; +import org.apache.eagle.service.client.impl.EagleServiceClientImpl; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.io.Serializable; +import java.util.List; +import java.util.concurrent.CopyOnWriteArrayList; + +public class HadoopLogTrafficPersist implements Serializable { + private static final Logger LOG = LoggerFactory.getLogger(HadoopLogTrafficPersist.class); + private static final String SINK_BATCH_SIZE = "dataSinkConfig.metricSinkBatchSize"; + private final Config config; + private IEagleServiceClient client; + private int batchSize; + private List<TaggedLogAPIEntity> entityBucket = new CopyOnWriteArrayList<>(); + + public HadoopLogTrafficPersist(Config config) { + this.config = config; + this.batchSize = config.hasPath(SINK_BATCH_SIZE) ? 
config.getInt(SINK_BATCH_SIZE) : 1; + } + + public void emitMetric(GenericMetricEntity metricEntity) { + entityBucket.add(metricEntity); + if (entityBucket.size() < batchSize) { + return; + } + + try { + client = new EagleServiceClientImpl(config); + GenericServiceAPIResponseEntity response = client.create(entityBucket); + if (response.isSuccess()) { + LOG.info("persist {} entities with starttime={}", entityBucket.size(), entityBucket.get(0).getTimestamp()); + + } else { + LOG.error("Service side error: {}", response.getException()); + } + } catch (Exception e) { + LOG.error(e.getMessage(), e); + } finally { + entityBucket.clear(); + close(); + } + } + + public void close() { + try { + if (client != null) { + this.client.getJerseyClient().destroy(); + this.client.close(); + } + } catch (IOException e) { + LOG.error("Close client error: {}", e.getMessage(), e); + } + } + +} http://git-wip-us.apache.org/repos/asf/eagle/blob/77fbff72/eagle-security/eagle-security-common/src/main/java/org/apache/eagle/security/traffic/SimpleWindowCounter.java ---------------------------------------------------------------------- diff --git a/eagle-security/eagle-security-common/src/main/java/org/apache/eagle/security/traffic/SimpleWindowCounter.java b/eagle-security/eagle-security-common/src/main/java/org/apache/eagle/security/traffic/SimpleWindowCounter.java new file mode 100644 index 0000000..5293577 --- /dev/null +++ b/eagle-security/eagle-security-common/src/main/java/org/apache/eagle/security/traffic/SimpleWindowCounter.java @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.eagle.security.traffic; + +import org.apache.eagle.common.utils.Tuple2; + +import java.io.Serializable; +import java.util.*; +import java.util.concurrent.ConcurrentHashMap; + +public class SimpleWindowCounter implements Serializable { + + private int windowSize; + + private Map<Long, Long> counter; + private Queue<Long> timeQueue; + + public SimpleWindowCounter(int size) { + this.windowSize = size; + counter = new ConcurrentHashMap<>(windowSize); + timeQueue = new PriorityQueue<>(); + } + + public boolean insert(long timestamp, long countVal) { + boolean success = true; + if (counter.containsKey(timestamp)) { + counter.put(timestamp, counter.get(timestamp) + countVal); + } else { + if (counter.size() < windowSize) { + counter.put(timestamp, countVal); + timeQueue.add(timestamp); + } else { + success =false; + } + } + return success; + } + + public int getSize() { + return counter.size(); + } + + public boolean isFull() { + return counter.size() >= windowSize; + } + + public boolean isEmpty() { + return counter.isEmpty(); + } + + public synchronized Tuple2<Long, Long> poll() { + long oldestTimestamp = timeQueue.poll(); + Tuple2<Long, Long> pair = new Tuple2<>(oldestTimestamp, counter.get(oldestTimestamp)); + counter.remove(oldestTimestamp); + return pair; + } + + public long peek() { + return timeQueue.peek(); + } + +} http://git-wip-us.apache.org/repos/asf/eagle/blob/77fbff72/eagle-security/eagle-security-hdfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/AbstractHdfsAuditLogApplication.java 
---------------------------------------------------------------------- diff --git a/eagle-security/eagle-security-hdfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/AbstractHdfsAuditLogApplication.java b/eagle-security/eagle-security-hdfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/AbstractHdfsAuditLogApplication.java index 6d7022b..bc8ceb1 100644 --- a/eagle-security/eagle-security-hdfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/AbstractHdfsAuditLogApplication.java +++ b/eagle-security/eagle-security-hdfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/AbstractHdfsAuditLogApplication.java @@ -28,9 +28,11 @@ import com.typesafe.config.Config; import org.apache.commons.lang3.time.DateUtils; import org.apache.eagle.app.StormApplication; import org.apache.eagle.app.environment.impl.StormEnvironment; +import org.apache.eagle.app.messaging.EntityStreamPersist; import org.apache.eagle.app.messaging.StormStreamSink; import org.apache.eagle.common.config.EagleConfigConstants; import org.apache.eagle.dataproc.impl.storm.partition.*; +import org.apache.eagle.security.traffic.HadoopLogAccumulatorBolt; import org.apache.eagle.security.partition.DataDistributionDaoImpl; import org.apache.eagle.security.partition.GreedyPartitionAlgorithm; import org.apache.eagle.dataproc.impl.storm.kafka.KafkaSpoutProvider; @@ -44,6 +46,7 @@ public abstract class AbstractHdfsAuditLogApplication extends StormApplication { public final static String SENSITIVITY_JOIN_TASK_NUM = "topology.numOfSensitivityJoinTasks"; public final static String IPZONE_JOIN_TASK_NUM = "topology.numOfIPZoneJoinTasks"; public final static String SINK_TASK_NUM = "topology.numOfSinkTasks"; + public final static String TRAFFIC_MONITOR_ENABLED = "dataSinkConfig.trafficMonitorEnabled"; @Override public StormTopology execute(Config config, StormEnvironment environment) { @@ -63,7 +66,7 @@ public abstract class AbstractHdfsAuditLogApplication extends StormApplication { // 
ingest -> parserBolt // --------------------- - BaseRichBolt parserBolt = getParserBolt(); + BaseRichBolt parserBolt = getParserBolt(config); BoltDeclarer boltDeclarer = builder.setBolt("parserBolt", parserBolt, numOfParserTasks).setNumTasks(numOfParserTasks).shuffleGrouping("ingest"); boltDeclarer.shuffleGrouping("ingest"); @@ -83,6 +86,12 @@ public abstract class AbstractHdfsAuditLogApplication extends StormApplication { // sensitivityDataJoinBoltDeclarer.fieldsGrouping("parserBolt", new Fields("f1")); sensitivityDataJoinBoltDeclarer.shuffleGrouping("parserBolt"); + if (config.hasPath(TRAFFIC_MONITOR_ENABLED) && config.getBoolean(TRAFFIC_MONITOR_ENABLED)) { + HadoopLogAccumulatorBolt auditLogAccumulator = new HadoopLogAccumulatorBolt(config); + BoltDeclarer auditLogAccumulatorDeclarer = builder.setBolt("logAccumulator", auditLogAccumulator, numOfParserTasks); + auditLogAccumulatorDeclarer.setNumTasks(numOfParserTasks).shuffleGrouping("parserBolt"); + } + // ------------------------------ // sensitivityJoin -> ipZoneJoin // ------------------------------ @@ -101,7 +110,7 @@ public abstract class AbstractHdfsAuditLogApplication extends StormApplication { return builder.createTopology(); } - public abstract BaseRichBolt getParserBolt(); + public abstract BaseRichBolt getParserBolt(Config config); public abstract String getSinkStreamName(); http://git-wip-us.apache.org/repos/asf/eagle/blob/77fbff72/eagle-security/eagle-security-hdfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/HdfsAuditLogApplication.java ---------------------------------------------------------------------- diff --git a/eagle-security/eagle-security-hdfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/HdfsAuditLogApplication.java b/eagle-security/eagle-security-hdfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/HdfsAuditLogApplication.java index 6d3c58c..5f300f3 100644 --- 
a/eagle-security/eagle-security-hdfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/HdfsAuditLogApplication.java +++ b/eagle-security/eagle-security-hdfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/HdfsAuditLogApplication.java @@ -30,8 +30,8 @@ import com.typesafe.config.ConfigFactory; */ public class HdfsAuditLogApplication extends AbstractHdfsAuditLogApplication { @Override - public BaseRichBolt getParserBolt() { - return new HdfsAuditLogParserBolt(); + public BaseRichBolt getParserBolt(Config config) { + return new HdfsAuditLogParserBolt(config); } @Override http://git-wip-us.apache.org/repos/asf/eagle/blob/77fbff72/eagle-security/eagle-security-hdfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/HdfsAuditLogParserBolt.java ---------------------------------------------------------------------- diff --git a/eagle-security/eagle-security-hdfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/HdfsAuditLogParserBolt.java b/eagle-security/eagle-security-hdfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/HdfsAuditLogParserBolt.java index 4590e8a..7b9d9a5 100644 --- a/eagle-security/eagle-security-hdfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/HdfsAuditLogParserBolt.java +++ b/eagle-security/eagle-security-hdfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/HdfsAuditLogParserBolt.java @@ -25,13 +25,17 @@ import backtype.storm.topology.OutputFieldsDeclarer; import backtype.storm.topology.base.BaseRichBolt; import backtype.storm.tuple.Fields; import backtype.storm.tuple.Tuple; +import com.typesafe.config.Config; +import org.apache.eagle.common.DateTimeUtil; import org.apache.eagle.security.hdfs.HDFSAuditLogObject; import org.apache.eagle.security.hdfs.HDFSAuditLogParser; +import org.joda.time.DateTime; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.Collections; import java.util.Map; +import java.util.TimeZone; import java.util.TreeMap; /** @@ -39,8 +43,19 @@ 
import java.util.TreeMap; */ public class HdfsAuditLogParserBolt extends BaseRichBolt { private static Logger LOG = LoggerFactory.getLogger(HdfsAuditLogParserBolt.class); + private static final String DATASOURCE_TIMEZONE_PATH = "dataSourceConfig.timeZone"; + private OutputCollector collector; - private static final HDFSAuditLogParser parser = new HDFSAuditLogParser(); + private HDFSAuditLogParser parser; + + public HdfsAuditLogParserBolt(Config config) { + if (config.hasPath(DATASOURCE_TIMEZONE_PATH)) { + TimeZone timeZone = TimeZone.getTimeZone(config.getString(DATASOURCE_TIMEZONE_PATH)); + parser = new HDFSAuditLogParser(timeZone); + } else { + parser = new HDFSAuditLogParser(); + } + } @Override public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) { @@ -54,14 +69,14 @@ public class HdfsAuditLogParserBolt extends BaseRichBolt { try { entity = parser.parse(logLine); Map<String, Object> map = new TreeMap<>(); - map.put("src", entity.src); - map.put("dst", entity.dst); - map.put("host", entity.host); - map.put("timestamp", entity.timestamp); - map.put("allowed", entity.allowed); - map.put("user", entity.user); - map.put("cmd", entity.cmd); - collector.emit(Collections.singletonList(map)); + map.put(HDFSAuditLogObject.HDFS_SRC_KEY, entity.src); + map.put(HDFSAuditLogObject.HDFS_DST_KEY, entity.dst); + map.put(HDFSAuditLogObject.HDFS_HOST_KEY, entity.host); + map.put(HDFSAuditLogObject.HDFS_TIMESTAMP_KEY, entity.timestamp); + map.put(HDFSAuditLogObject.HDFS_ALLOWED_KEY, entity.allowed); + map.put(HDFSAuditLogObject.HDFS_USER_KEY, entity.user); + map.put(HDFSAuditLogObject.HDFS_CMD_KEY, entity.cmd); + collector.emit(input, Collections.singletonList(map)); } catch (Exception ex) { LOG.error("Failing parse audit log message {}", logLine, ex); } finally { 
http://git-wip-us.apache.org/repos/asf/eagle/blob/77fbff72/eagle-security/eagle-security-hdfs-auditlog/src/main/resources/META-INF/providers/org.apache.eagle.security.auditlog.HdfsAuditLogAppProvider.xml ---------------------------------------------------------------------- diff --git a/eagle-security/eagle-security-hdfs-auditlog/src/main/resources/META-INF/providers/org.apache.eagle.security.auditlog.HdfsAuditLogAppProvider.xml b/eagle-security/eagle-security-hdfs-auditlog/src/main/resources/META-INF/providers/org.apache.eagle.security.auditlog.HdfsAuditLogAppProvider.xml index 1108497..90f9e5b 100644 --- a/eagle-security/eagle-security-hdfs-auditlog/src/main/resources/META-INF/providers/org.apache.eagle.security.auditlog.HdfsAuditLogAppProvider.xml +++ b/eagle-security/eagle-security-hdfs-auditlog/src/main/resources/META-INF/providers/org.apache.eagle.security.auditlog.HdfsAuditLogAppProvider.xml @@ -90,6 +90,13 @@ <description>scheme class</description> <required>true</required> </property> + <property> + <name>dataSourceConfig.timeZone</name> + <displayName>Log Time Zone</displayName> + <description>time zone of hdfs audit log </description> + <value>GMT</value> + <required>true</required> + </property> <!-- data enrich configurations --> <property> @@ -150,6 +157,25 @@ <value>0</value> <description>value controls when a produce request is considered completed</description> </property> + <property> + <name>dataSinkConfig.trafficMonitorEnabled</name> + <displayName>Log Traffic Monitor Enabled</displayName> + <value>false</value> + <description>enable the log throughput calculation</description> + <required>true</required> + </property> + <property> + <name>dataSinkConfig.metricWindowSize</name> + <displayName>Window Size for Traffic Counting</displayName> + <value>10</value> + <description>window size to calculate the throughput</description> + </property> + <property> + <name>dataSinkConfig.metricSinkBatchSize</name> + <displayName>Batch Size for Flushing 
Traffic Metrics</displayName> + <value>1</value> + <description>batch size of flushing metrics </description> + </property> <!-- web app related configurations --> <property> http://git-wip-us.apache.org/repos/asf/eagle/blob/77fbff72/eagle-security/eagle-security-hdfs-auditlog/src/test/java/org/apache/eagle/security/auditlog/TestUserCommandReassembler.java ---------------------------------------------------------------------- diff --git a/eagle-security/eagle-security-hdfs-auditlog/src/test/java/org/apache/eagle/security/auditlog/TestUserCommandReassembler.java b/eagle-security/eagle-security-hdfs-auditlog/src/test/java/org/apache/eagle/security/auditlog/TestUserCommandReassembler.java index caea37a..f7ddad2 100644 --- a/eagle-security/eagle-security-hdfs-auditlog/src/test/java/org/apache/eagle/security/auditlog/TestUserCommandReassembler.java +++ b/eagle-security/eagle-security-hdfs-auditlog/src/test/java/org/apache/eagle/security/auditlog/TestUserCommandReassembler.java @@ -30,9 +30,6 @@ import org.junit.Test; import java.util.*; -/** - * Created by yonzhang on 11/24/15. 
- */ public class TestUserCommandReassembler { private Map parseEvent(String log) throws Exception{ HDFSAuditLogParser deserializer = new HDFSAuditLogParser(); http://git-wip-us.apache.org/repos/asf/eagle/blob/77fbff72/eagle-security/eagle-security-maprfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/MapRFSAuditLogApplication.java ---------------------------------------------------------------------- diff --git a/eagle-security/eagle-security-maprfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/MapRFSAuditLogApplication.java b/eagle-security/eagle-security-maprfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/MapRFSAuditLogApplication.java index 3319164..b72cb13 100644 --- a/eagle-security/eagle-security-maprfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/MapRFSAuditLogApplication.java +++ b/eagle-security/eagle-security-maprfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/MapRFSAuditLogApplication.java @@ -27,7 +27,7 @@ import com.typesafe.config.ConfigFactory; */ public class MapRFSAuditLogApplication extends AbstractHdfsAuditLogApplication { @Override - public BaseRichBolt getParserBolt() { + public BaseRichBolt getParserBolt(Config config) { return new MapRFSAuditLogParserBolt(); } http://git-wip-us.apache.org/repos/asf/eagle/blob/77fbff72/eagle-security/eagle-security-maprfs-auditlog/src/main/resources/META-INF/providers/org.apache.eagle.security.auditlog.MapRFSAuditLogAppProvider.xml ---------------------------------------------------------------------- diff --git a/eagle-security/eagle-security-maprfs-auditlog/src/main/resources/META-INF/providers/org.apache.eagle.security.auditlog.MapRFSAuditLogAppProvider.xml b/eagle-security/eagle-security-maprfs-auditlog/src/main/resources/META-INF/providers/org.apache.eagle.security.auditlog.MapRFSAuditLogAppProvider.xml index c54dd28..074b828 100644 --- 
a/eagle-security/eagle-security-maprfs-auditlog/src/main/resources/META-INF/providers/org.apache.eagle.security.auditlog.MapRFSAuditLogAppProvider.xml +++ b/eagle-security/eagle-security-maprfs-auditlog/src/main/resources/META-INF/providers/org.apache.eagle.security.auditlog.MapRFSAuditLogAppProvider.xml @@ -21,10 +21,9 @@ --> <application> - <type>MapRFSAuditLogApplication</type> + <type>MAPR_HDFS_AUDIT_LOG_MONITOR_APP</type> <name>MapRFS Audit Log Monitoring Application</name> <version>0.5.0-incubating</version> - <appClass>org.apache.eagle.security.auditlog.MapRFSAuditLogApplication</appClass> <viewPath>/apps/example</viewPath> <configuration> <!-- topology related configurations --> @@ -52,6 +51,12 @@ <value>2</value> <description>number of sink tasks</description> </property> + <property> + <name>topology.numOfMetricSinkTasks</name> + <displayName>Topology Metric Sink Tasks</displayName> + <value>1</value> + <description>number of metric sink tasks</description> + </property> <!-- data source configurations --> <property> http://git-wip-us.apache.org/repos/asf/eagle/blob/77fbff72/eagle-server/src/main/java/org/apache/eagle/server/ServerApplication.java ---------------------------------------------------------------------- diff --git a/eagle-server/src/main/java/org/apache/eagle/server/ServerApplication.java b/eagle-server/src/main/java/org/apache/eagle/server/ServerApplication.java index ccf3c28..4118892 100644 --- a/eagle-server/src/main/java/org/apache/eagle/server/ServerApplication.java +++ b/eagle-server/src/main/java/org/apache/eagle/server/ServerApplication.java @@ -46,6 +46,8 @@ import org.slf4j.LoggerFactory; import javax.servlet.DispatcherType; import java.util.EnumSet; +import static org.apache.eagle.app.service.impl.ApplicationHealthCheckServiceImpl.HEALTH_CHECK_PATH; + class ServerApplication extends Application<ServerConfig> { private static final Logger LOG = LoggerFactory.getLogger(ServerApplication.class); @Inject @@ -121,9 +123,11 @@ class 
ServerApplication extends Application<ServerConfig> { environment.lifecycle().manage(updateAppStatusTask); // Initialize application extended health checks. - LOG.debug("Registering ApplicationHealthCheckService"); - applicationHealthCheckService.init(environment); - environment.lifecycle().manage(new ManagedService(applicationHealthCheckService)); + if (config.hasPath(HEALTH_CHECK_PATH)) { + LOG.debug("Registering ApplicationHealthCheckService"); + applicationHealthCheckService.init(environment); + environment.lifecycle().manage(new ManagedService(applicationHealthCheckService)); + } // Load application shared extension services. LOG.debug("Registering application shared extension services"); http://git-wip-us.apache.org/repos/asf/eagle/blob/77fbff72/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/TopologyCheckAppConfig.java ---------------------------------------------------------------------- diff --git a/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/TopologyCheckAppConfig.java b/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/TopologyCheckAppConfig.java index da6cd46..f6d61f6 100644 --- a/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/TopologyCheckAppConfig.java +++ b/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/TopologyCheckAppConfig.java @@ -98,14 +98,14 @@ public class TopologyCheckAppConfig implements Serializable { hBaseConfig.hbaseMasterPrincipal = getOptionalConfig("dataSourceConfig.hbase.kerberos.master.principal", null); } - if (config.hasPath("dataSourceConfig.mr")) { + if (config.hasPath("dataSourceConfig.mr") && config.getBoolean("dataSourceConfig.mr.enabled")) { topologyTypes.add(TopologyConstants.TopologyType.MR); mrConfig = new MRConfig(); mrConfig.rmUrls = config.getString("dataSourceConfig.mr.rmUrl").split(",\\s*"); mrConfig.historyServerUrl = 
getOptionalConfig("dataSourceConfig.mr.historyServerUrl", null); } - if (config.hasPath("dataSourceConfig.hdfs")) { + if (config.hasPath("dataSourceConfig.hdfs") && config.getBoolean("dataSourceConfig.hdfs.enabled")) { topologyTypes.add(TopologyConstants.TopologyType.HDFS); hdfsConfig = new HdfsConfig(); hdfsConfig.namenodeUrls = config.getString("dataSourceConfig.hdfs.namenodeUrl").split(",\\s*"); http://git-wip-us.apache.org/repos/asf/eagle/blob/77fbff72/eagle-topology-check/eagle-topology-app/src/main/resources/META-INF/providers/org.apache.eagle.topology.TopologyCheckAppProvider.xml ---------------------------------------------------------------------- diff --git a/eagle-topology-check/eagle-topology-app/src/main/resources/META-INF/providers/org.apache.eagle.topology.TopologyCheckAppProvider.xml b/eagle-topology-check/eagle-topology-app/src/main/resources/META-INF/providers/org.apache.eagle.topology.TopologyCheckAppProvider.xml index 0b83e9b..2089a2f 100644 --- a/eagle-topology-check/eagle-topology-app/src/main/resources/META-INF/providers/org.apache.eagle.topology.TopologyCheckAppProvider.xml +++ b/eagle-topology-check/eagle-topology-app/src/main/resources/META-INF/providers/org.apache.eagle.topology.TopologyCheckAppProvider.xml @@ -77,6 +77,19 @@ <displayName>Hdfs Namenode Web URL</displayName> <description>hdfs namenode web url for HDFS monitor</description> <value>http://sandbox.hortonworks.com:50070</value> + </property> + <property> + <name>dataSourceConfig.hdfs.enabled</name> + <displayName>HDFS Topology Check Enabled</displayName> + <description>HDFS topology status check enabled</description> + <value>false</value> + <required>true</required> + </property> + <property> + <name>dataSourceConfig.mr.enabled</name> + <displayName>MR Topology Check Enabled</displayName> + <description>MR topology status check enabled</description> + <value>false</value> <required>true</required> </property> <property> @@ -84,7 +97,6 @@ <displayName>Resource Manager 
URL</displayName> <description>resource manager url for YARN monitor</description> <value>http://sandbox.hortonworks.com:8088</value> - <required>true</required> </property> <property> <name>dataSourceConfig.mr.historyServerUrl</name> @@ -94,8 +106,8 @@ </property> <property> <name>dataSourceConfig.hbase.enabled</name> - <displayName>HBase Config Enabled</displayName> - <description>enabled for HBase monitor</description> + <displayName>HBase Topology Check Enabled</displayName> + <description>HBase topology status check enabled</description> <value>false</value> <required>true</required> </property> http://git-wip-us.apache.org/repos/asf/eagle/blob/77fbff72/eagle-topology-check/eagle-topology-app/src/main/resources/application.conf ---------------------------------------------------------------------- diff --git a/eagle-topology-check/eagle-topology-app/src/main/resources/application.conf b/eagle-topology-check/eagle-topology-app/src/main/resources/application.conf index 4d22912..5d5f1d3 100644 --- a/eagle-topology-check/eagle-topology-app/src/main/resources/application.conf +++ b/eagle-topology-check/eagle-topology-app/src/main/resources/application.conf @@ -33,8 +33,12 @@ } dataSourceConfig : { - hdfs.namenodeUrl: "http://sandbox.hortonworks.com:50070", + hdfs: { + enabled: false, + namenodeUrl: "http://sandbox.hortonworks.com:50070", + } mr: { + enabled: false, rmUrl: "http://sandbox.hortonworks.com:50030", historyServerUrl : "http://sandbox.hortonworks.com:19888" #if not need, then empty } @@ -49,7 +53,7 @@ eagle.principal: "", #if not need, then empty eagle.keytab: "" } - }, + } } "dataSinkConfig": { http://git-wip-us.apache.org/repos/asf/eagle/blob/77fbff72/eagle-topology-check/eagle-topology-app/src/test/resources/application.conf ---------------------------------------------------------------------- diff --git a/eagle-topology-check/eagle-topology-app/src/test/resources/application.conf 
b/eagle-topology-check/eagle-topology-app/src/test/resources/application.conf index be84c1d..da52f65 100644 --- a/eagle-topology-check/eagle-topology-app/src/test/resources/application.conf +++ b/eagle-topology-check/eagle-topology-app/src/test/resources/application.conf @@ -28,20 +28,26 @@ } dataSourceConfig : { - hdfs.namenodeUrl: "http://sandbox.hortonworks.com:50070", + hdfs: { + enabled: false, + namenodeUrl: "http://sandbox.hortonworks.com:50070", + } + mr: { + enabled: false, + rmUrl: "http://sandbox.hortonworks.com:50030", + historyServerUrl : "http://sandbox.hortonworks.com:19888" #if not need, then empty + } hbase: { + enabled: false, zkQuorum: "sandbox.hortonworks.com", zkPropertyClientPort : "2181", zkZnodeParent: "/hbase-unsecure", + zkRetryTimes : "5", kerberos : { master.principal : "hadoop/_HOST@EXAMPLE.COM" eagle.principal: "", #if not need, then empty eagle.keytab: "" } - }, - mr: { - rmUrl: "http://sandbox.hortonworks.com:8088", - historyServerUrl : "http://sandbox.hortonworks.com:19888" } }