Repository: incubator-eagle
Updated Branches:
  refs/heads/master 30b127b8d -> dacb86f31
[EAGLE-789] Add a new publisher to write alerts into a local file

https://issues.apache.org/jira/browse/EAGLE-789

Author: Zhao, Qingwen <qingwz...@apache.org>
Author: Qingwen Zhao <qingwen...@gmail.com>

Closes #669 from qingwen220/EAGLE-789.


Project: http://git-wip-us.apache.org/repos/asf/incubator-eagle/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-eagle/commit/dacb86f3
Tree: http://git-wip-us.apache.org/repos/asf/incubator-eagle/tree/dacb86f3
Diff: http://git-wip-us.apache.org/repos/asf/incubator-eagle/diff/dacb86f3

Branch: refs/heads/master
Commit: dacb86f3178bff034dbc7efea17f555bf453d390
Parents: 30b127b
Author: Zhao, Qingwen <qingwz...@apache.org>
Authored: Tue Nov 22 19:15:59 2016 +0800
Committer: Zhao, Qingwen <qingwz...@apache.org>
Committed: Tue Nov 22 19:15:59 2016 +0800

----------------------------------------------------------------------
 .../alert/engine/model/AlertPublishEvent.java   |  11 ++
 .../engine/publisher/PublishConstants.java      |   5 +
 .../publisher/impl/AlertEagleStorePlugin.java   |   8 +-
 .../publisher/impl/AlertFilePublisher.java      | 103 +++++++++++++++++++
 .../publisher/AlertFilePublisherTest.java       |  62 +++++++++++
 .../metadata/impl/InMemMetadataDaoImpl.java     |   6 +-
 .../metadata/impl/JdbcMetadataDaoImpl.java      |   6 +-
 .../metadata/impl/MongoMetadataDaoImpl.java     |   6 +-
 .../app/dev/public/js/services/policySrv.js     |   5 +
 .../hdfs/HdfsTopologyEntityParser.java          |   1 +
 .../extractor/mr/MRTopologyEntityParser.java    |   1 +
 11 files changed, 198 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/dacb86f3/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/model/AlertPublishEvent.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/model/AlertPublishEvent.java 
b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/model/AlertPublishEvent.java index 6230731..462e0fb 100644 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/model/AlertPublishEvent.java +++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/model/AlertPublishEvent.java @@ -19,6 +19,7 @@ package org.apache.eagle.alert.engine.model; import com.google.common.base.Preconditions; +import org.apache.eagle.common.DateTimeUtil; import java.util.List; import java.util.Map; @@ -107,4 +108,14 @@ public class AlertPublishEvent { alertEvent.setAlertData(event.getDataMap()); return alertEvent; } + + public String toString() { + return String.format("%s %s alertId=%s, siteId=%s, policyId=%s, alertData=%s", + DateTimeUtil.millisecondsToHumanDateWithSeconds(alertTimestamp), + DateTimeUtil.CURRENT_TIME_ZONE.getID(), + alertId, + siteId, + policyId, + alertData.toString()); + } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/dacb86f3/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/PublishConstants.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/PublishConstants.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/PublishConstants.java index f2168bc..46cce29 100644 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/PublishConstants.java +++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/PublishConstants.java @@ -35,6 +35,11 @@ public class PublishConstants { public static final String 
BROKER_LIST = "kafka_broker"; public static final String WRITE_MODE = "kafka_write_mode"; + // local rotated file constants + public static final String FILE_NAME = "fileName"; + public static final String ROTATE_EVERY_KB = "rotate_every_kb"; + public static final String NUMBER_OF_FILES = "number_of_files"; + // slack specific constants public static final String TOKEN = "token"; public static final String CHANNELS = "channels"; http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/dacb86f3/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertEagleStorePlugin.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertEagleStorePlugin.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertEagleStorePlugin.java index f517078..f31d850 100644 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertEagleStorePlugin.java +++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertEagleStorePlugin.java @@ -54,12 +54,6 @@ public class AlertEagleStorePlugin extends AbstractPublishPlugin { } } - @SuppressWarnings("rawtypes") - @Override - public void update(String dedupIntervalMin, Map<String, Object> pluginProperties) { - deduplicator.setDedupIntervalMin(dedupIntervalMin); - } - @Override public void onAlert(AlertStreamEvent event) throws Exception { List<AlertStreamEvent> eventList = this.dedup(event); @@ -75,7 +69,7 @@ public class AlertEagleStorePlugin extends AbstractPublishPlugin { @Override protected Logger getLogger() { - return null; + return LOG; } } \ No newline at end of file 
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/dacb86f3/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertFilePublisher.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertFilePublisher.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertFilePublisher.java new file mode 100644 index 0000000..1848979 --- /dev/null +++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertFilePublisher.java @@ -0,0 +1,103 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ * + */ + +package org.apache.eagle.alert.engine.publisher.impl; + +import org.apache.eagle.alert.engine.coordinator.Publishment; +import org.apache.eagle.alert.engine.model.AlertPublishEvent; +import org.apache.eagle.alert.engine.model.AlertStreamEvent; +import org.apache.eagle.alert.engine.publisher.PublishConstants; +import org.apache.eagle.common.DateTimeUtil; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.typesafe.config.Config; + +import org.slf4j.LoggerFactory; + +import java.util.List; +import java.util.Map; +import java.util.logging.*; + +public class AlertFilePublisher extends AbstractPublishPlugin { + + private Logger filelogger = Logger.getLogger(AlertFilePublisher.class.getName()); + private FileHandler handler; + private static ObjectMapper objectMapper = new ObjectMapper(); + + private static final String DEFAULT_FILE_NAME = "eagle-alert.log"; + private static final int DEFAULT_ROTATE_SIZE_KB = 1024; + private static final int DEFAULT_FILE_NUMBER = 5; + + @Override + public void init(Config config, Publishment publishment, Map conf) throws Exception { + super.init(config, publishment, conf); + + String fileName = DEFAULT_FILE_NAME; + int rotateSize = DEFAULT_ROTATE_SIZE_KB; + int numOfFiles = DEFAULT_FILE_NUMBER; + if (publishment.getProperties() != null) { + if (publishment.getProperties().containsKey(PublishConstants.FILE_NAME)) { + fileName = (String) publishment.getProperties().get(PublishConstants.FILE_NAME); + } + if (publishment.getProperties().containsKey(PublishConstants.ROTATE_EVERY_KB)) { + rotateSize = Integer.valueOf(publishment.getProperties().get(PublishConstants.ROTATE_EVERY_KB).toString()); + } + if (publishment.getProperties().containsKey(PublishConstants.NUMBER_OF_FILES)) { + numOfFiles = Integer.valueOf(publishment.getProperties().get(PublishConstants.NUMBER_OF_FILES).toString()); + } + } + handler = new FileHandler(fileName, rotateSize * 1024, numOfFiles, true); + handler.setFormatter(new 
AlertFileFormatter()); + filelogger.addHandler(handler); + filelogger.setUseParentHandlers(false); + } + + class AlertFileFormatter extends Formatter { + + @Override + public String format(LogRecord record) { + return String.format("%s %s\n", DateTimeUtil.millisecondsToHumanDateWithSeconds(record.getMillis()), + record.getMessage()); + } + } + + @Override + public void onAlert(AlertStreamEvent event) throws Exception { + List<AlertStreamEvent> eventList = this.dedup(event); + if (eventList == null || eventList.isEmpty()) { + return; + } + for (AlertStreamEvent e : eventList) { + //filelogger.info(e.toString()); + AlertPublishEvent alert = AlertPublishEvent.createAlertPublishEvent(e); + filelogger.info(objectMapper.writeValueAsString(alert)); + } + } + + @Override + public void close() { + if (handler != null) { + handler.close(); + } + } + + @Override + protected org.slf4j.Logger getLogger() { + return LoggerFactory.getLogger(AlertFilePublisher.class); + } +} http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/dacb86f3/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/AlertFilePublisherTest.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/AlertFilePublisherTest.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/AlertFilePublisherTest.java new file mode 100644 index 0000000..33cb103 --- /dev/null +++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/AlertFilePublisherTest.java @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.eagle.alert.engine.publisher; + +import org.apache.eagle.alert.engine.coordinator.Publishment; +import org.apache.eagle.alert.engine.model.AlertStreamEvent; +import org.apache.eagle.alert.engine.publisher.impl.AlertFilePublisher; +import org.junit.Test; + +import java.util.Arrays; +import java.util.HashMap; +import java.util.Map; + +public class AlertFilePublisherTest { + private static final String TEST_POLICY_ID = "testPolicy"; + + @Test + public void testAlertFilePublisher() throws Exception { + Map<String, Object> properties = new HashMap<>(); + properties.put(PublishConstants.ROTATE_EVERY_KB, 1); + properties.put(PublishConstants.NUMBER_OF_FILES, 1); + + String property = "java.io.tmpdir"; + String tempDir = System.getProperty(property); + System.out.println("OS current temporary directory is " + tempDir); + + //properties.put(PublishConstants.FILE_NAME, tempDir+"eagle-alert.log"); + + Publishment publishment = new Publishment(); + publishment.setName("testFilePublishment"); + publishment.setType(org.apache.eagle.alert.engine.publisher.impl.AlertKafkaPublisher.class.getName()); + publishment.setPolicyIds(Arrays.asList(TEST_POLICY_ID)); + publishment.setDedupIntervalMin("PT0M"); + publishment.setSerializer(org.apache.eagle.alert.engine.publisher.impl.JsonEventSerializer.class.getName()); + publishment.setProperties(properties); + + 
AlertStreamEvent event = AlertPublisherTestHelper.mockEvent(TEST_POLICY_ID); + + AlertFilePublisher publisher = new AlertFilePublisher(); + publisher.init(null, publishment, null); + + publisher.onAlert(event); + publisher.close(); + + } +} http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/dacb86f3/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/main/java/org/apache/eagle/alert/metadata/impl/InMemMetadataDaoImpl.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/main/java/org/apache/eagle/alert/metadata/impl/InMemMetadataDaoImpl.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/main/java/org/apache/eagle/alert/metadata/impl/InMemMetadataDaoImpl.java index 96cb20e..b608516 100644 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/main/java/org/apache/eagle/alert/metadata/impl/InMemMetadataDaoImpl.java +++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/main/java/org/apache/eagle/alert/metadata/impl/InMemMetadataDaoImpl.java @@ -219,10 +219,10 @@ public class InMemMetadataDaoImpl implements IMetadataDao { @Override public List<AlertPublishEvent> getAlertPublishEventByPolicyId(String policyId, int size) { - if (size < 0 || size > alerts.size()) { - size = alerts.size(); - } List<AlertPublishEvent> result = alerts.stream().filter(alert -> alert.getPolicyId().equals(policyId)).collect(Collectors.toList()); + if (size < 0 || size > result.size()) { + size = result.size(); + } return result.subList(result.size() - size, result.size()); } http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/dacb86f3/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/main/java/org/apache/eagle/alert/metadata/impl/JdbcMetadataDaoImpl.java 
---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/main/java/org/apache/eagle/alert/metadata/impl/JdbcMetadataDaoImpl.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/main/java/org/apache/eagle/alert/metadata/impl/JdbcMetadataDaoImpl.java index 5e0a0e7..22435fe 100644 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/main/java/org/apache/eagle/alert/metadata/impl/JdbcMetadataDaoImpl.java +++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/main/java/org/apache/eagle/alert/metadata/impl/JdbcMetadataDaoImpl.java @@ -91,10 +91,10 @@ public class JdbcMetadataDaoImpl implements IMetadataDao { @Override public List<AlertPublishEvent> getAlertPublishEventByPolicyId(String policyId, int size) { List<AlertPublishEvent> alerts = handler.list(AlertPublishEvent.class); - if (size < 0 || size > alerts.size()) { - size = alerts.size(); - } List<AlertPublishEvent> result = alerts.stream().filter(alert -> alert.getPolicyId().equals(policyId)).collect(Collectors.toList()); + if (size < 0 || size > result.size()) { + size = result.size(); + } return result.subList(result.size() - size, result.size()); } http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/dacb86f3/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/main/java/org/apache/eagle/alert/metadata/impl/MongoMetadataDaoImpl.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/main/java/org/apache/eagle/alert/metadata/impl/MongoMetadataDaoImpl.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/main/java/org/apache/eagle/alert/metadata/impl/MongoMetadataDaoImpl.java index af2e231..af0494e 100644 --- 
a/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/main/java/org/apache/eagle/alert/metadata/impl/MongoMetadataDaoImpl.java +++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/main/java/org/apache/eagle/alert/metadata/impl/MongoMetadataDaoImpl.java @@ -376,10 +376,10 @@ public class MongoMetadataDaoImpl implements IMetadataDao { @Override public List<AlertPublishEvent> getAlertPublishEventByPolicyId(String policyId, int size) { List<AlertPublishEvent> events = list(alerts, AlertPublishEvent.class); - if (size < 0 || size > events.size()) { - size = events.size(); - } List<AlertPublishEvent> result = events.stream().filter(alert -> alert.getPolicyId().equals(policyId)).collect(Collectors.toList()); + if (size < 0 || size > result.size()) { + size = result.size(); + } return events.subList(result.size() - size, result.size()); } http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/dacb86f3/eagle-server/src/main/webapp/app/dev/public/js/services/policySrv.js ---------------------------------------------------------------------- diff --git a/eagle-server/src/main/webapp/app/dev/public/js/services/policySrv.js b/eagle-server/src/main/webapp/app/dev/public/js/services/policySrv.js index c2f3d8c..e3268f0 100644 --- a/eagle-server/src/main/webapp/app/dev/public/js/services/policySrv.js +++ b/eagle-server/src/main/webapp/app/dev/public/js/services/policySrv.js @@ -43,6 +43,11 @@ name: "Storage", displayFields: [], fields: [] + }, + 'org.apache.eagle.alert.engine.publisher.impl.AlertFilePublisher': { + name: "LocalFile", + displayFields: ["fileName"], + fields: ["fileName", "rotate_every_kb", "number_of_files"] } }, http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/dacb86f3/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/extractor/hdfs/HdfsTopologyEntityParser.java ---------------------------------------------------------------------- diff --git 
a/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/extractor/hdfs/HdfsTopologyEntityParser.java b/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/extractor/hdfs/HdfsTopologyEntityParser.java index c35ed18..627584a 100644 --- a/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/extractor/hdfs/HdfsTopologyEntityParser.java +++ b/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/extractor/hdfs/HdfsTopologyEntityParser.java @@ -252,6 +252,7 @@ public class HdfsTopologyEntityParser implements TopologyEntityParser { private HdfsServiceTopologyAPIEntity createHdfsServiceEntity(String roleType, String hostname, long updateTime) { HdfsServiceTopologyAPIEntity entity = new HdfsServiceTopologyAPIEntity(); entity.setTimestamp(updateTime); + entity.setLastUpdateTime(updateTime); Map<String, String> tags = new HashMap<String, String>(); entity.setTags(tags); tags.put(SITE_TAG, site); http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/dacb86f3/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/extractor/mr/MRTopologyEntityParser.java ---------------------------------------------------------------------- diff --git a/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/extractor/mr/MRTopologyEntityParser.java b/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/extractor/mr/MRTopologyEntityParser.java index 455b1d0..671d07b 100644 --- a/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/extractor/mr/MRTopologyEntityParser.java +++ b/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/extractor/mr/MRTopologyEntityParser.java @@ -203,6 +203,7 @@ public class MRTopologyEntityParser implements TopologyEntityParser { private MRServiceTopologyAPIEntity createEntity(String roleType, String hostname, long updateTime) { 
MRServiceTopologyAPIEntity entity = new MRServiceTopologyAPIEntity(); + entity.setTimestamp(updateTime); entity.setLastUpdateTime(updateTime); Map<String, String> tags = new HashMap<String, String>(); entity.setTags(tags);