METRON-1794 Include User Details When Escalating Alerts (nickwallen) closes apache/metron#1212
Project: http://git-wip-us.apache.org/repos/asf/metron/repo Commit: http://git-wip-us.apache.org/repos/asf/metron/commit/c0fb2625 Tree: http://git-wip-us.apache.org/repos/asf/metron/tree/c0fb2625 Diff: http://git-wip-us.apache.org/repos/asf/metron/diff/c0fb2625 Branch: refs/heads/feature/METRON-1090-stellar-assignment Commit: c0fb26258f9a78d375a50dd5d746404130dfe06b Parents: 9c9e295 Author: nickwallen <n...@nickallen.org> Authored: Mon Oct 1 09:49:17 2018 -0400 Committer: nickallen <nickal...@apache.org> Committed: Mon Oct 1 09:49:17 2018 -0400 ---------------------------------------------------------------------- .../apache/metron/rest/MetronRestConstants.java | 3 + .../rest/service/impl/AlertsUIServiceImpl.java | 53 ++++++++++++---- .../service/impl/AlertsUIServiceImplTest.java | 63 ++++++++++++++++---- .../src/test/resources/log4j.properties | 5 +- 4 files changed, 98 insertions(+), 26 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/metron/blob/c0fb2625/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/MetronRestConstants.java ---------------------------------------------------------------------- diff --git a/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/MetronRestConstants.java b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/MetronRestConstants.java index e3bf698..94e8e35 100644 --- a/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/MetronRestConstants.java +++ b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/MetronRestConstants.java @@ -53,6 +53,9 @@ public class MetronRestConstants { public static final String KAFKA_BROKER_URL_SPRING_PROPERTY = "kafka.broker.url"; public static final String KAFKA_TOPICS_ESCALATION_PROPERTY = "kafka.topics.escalation"; + public static final String METRON_ESCALATION_USER_FIELD = "metron_escalation_user"; + public static final String METRON_ESCALATION_TIMESTAMP_FIELD = "metron_escalation_timestamp"; + public static final String KERBEROS_ENABLED_SPRING_PROPERTY = "kerberos.enabled"; public static final String KERBEROS_PRINCIPLE_SPRING_PROPERTY = "kerberos.principal"; public static final String KERBEROS_KEYTAB_SPRING_PROPERTY = "kerberos.keytab"; http://git-wip-us.apache.org/repos/asf/metron/blob/c0fb2625/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/AlertsUIServiceImpl.java ---------------------------------------------------------------------- diff --git a/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/AlertsUIServiceImpl.java b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/AlertsUIServiceImpl.java index 7d0a8f8..bf035e2 100644 --- a/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/AlertsUIServiceImpl.java +++ b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/AlertsUIServiceImpl.java @@ -19,26 +19,32 @@ package org.apache.metron.rest.service.impl; import com.fasterxml.jackson.annotation.JsonInclude; import com.fasterxml.jackson.core.JsonProcessingException; - -import java.io.IOException; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Optional; - import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.metron.common.system.Clock; import org.apache.metron.common.utils.JSONUtils; -import org.apache.metron.rest.MetronRestConstants; +import org.apache.metron.hbase.client.UserSettingsClient; import org.apache.metron.rest.RestException; import org.apache.metron.rest.model.AlertsUIUserSettings; -import org.apache.metron.hbase.client.UserSettingsClient; import org.apache.metron.rest.security.SecurityUtils; import org.apache.metron.rest.service.AlertsUIService; import org.apache.metron.rest.service.KafkaService; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.core.env.Environment; import org.springframework.stereotype.Service; +import java.io.IOException; +import java.lang.invoke.MethodHandles; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; + +import static org.apache.metron.rest.MetronRestConstants.KAFKA_TOPICS_ESCALATION_PROPERTY; +import static org.apache.metron.rest.MetronRestConstants.METRON_ESCALATION_TIMESTAMP_FIELD; +import static org.apache.metron.rest.MetronRestConstants.METRON_ESCALATION_USER_FIELD; + /** * The default service layer implementation of {@link AlertsUIService}. * @@ -47,6 +53,7 @@ import org.springframework.stereotype.Service; @Service public class AlertsUIServiceImpl implements AlertsUIService { + static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); public static final String ALERT_USER_SETTING_TYPE = "metron-alerts-ui"; public static ThreadLocal<ObjectMapper> _mapper = ThreadLocal.withInitial(() -> new ObjectMapper().setSerializationInclusion(JsonInclude.Include.NON_NULL)); @@ -54,6 +61,7 @@ public class AlertsUIServiceImpl implements AlertsUIService { private Environment environment; private final KafkaService kafkaService; private UserSettingsClient userSettingsClient; + private Clock clock; @Autowired public AlertsUIServiceImpl(final KafkaService kafkaService, @@ -62,15 +70,25 @@ public class AlertsUIServiceImpl implements AlertsUIService { this.kafkaService = kafkaService; this.environment = environment; this.userSettingsClient = userSettingsClient; + this.clock = new Clock(); } @Override public void escalateAlerts(List<Map<String, Object>> alerts) throws RestException { + String user = SecurityUtils.getCurrentUser(); + String topic = environment.getProperty(KAFKA_TOPICS_ESCALATION_PROPERTY); + Long now = clock.currentTimeMillis(); + LOG.info("Escalating {} alert(s): user={}, topic={}, timestamp={}", alerts.size(), user, topic, now); + try { for (Map<String, Object> alert : alerts) { - kafkaService.produceMessage( - environment.getProperty(MetronRestConstants.KAFKA_TOPICS_ESCALATION_PROPERTY), - JSONUtils.INSTANCE.toJSON(alert, false)); + // attribute the escalation to the current user + alert.put(METRON_ESCALATION_USER_FIELD, user); + alert.put(METRON_ESCALATION_TIMESTAMP_FIELD, now); + + // serialize the alert and push it to the escalation topic + String message = JSONUtils.INSTANCE.toJSON(alert, false); + kafkaService.produceMessage(topic, message); } } catch (JsonProcessingException e) { throw new RestException(e); @@ -128,4 +146,15 @@ public class AlertsUIServiceImpl implements AlertsUIService { } return success; } + + /** + * Set the {@link Clock} used by this service. + * + * <p>Calling this method is only needed to override the default behavior. This is useful when testing. + * + * @param clock + */ + public void setClock(Clock clock) { + this.clock = clock; + } } http://git-wip-us.apache.org/repos/asf/metron/blob/c0fb2625/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/service/impl/AlertsUIServiceImplTest.java ---------------------------------------------------------------------- diff --git a/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/service/impl/AlertsUIServiceImplTest.java b/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/service/impl/AlertsUIServiceImplTest.java index dc52712..545d7f9 100644 --- a/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/service/impl/AlertsUIServiceImplTest.java +++ b/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/service/impl/AlertsUIServiceImplTest.java @@ -41,6 +41,7 @@ import java.util.Optional; import com.fasterxml.jackson.annotation.JsonInclude; import com.fasterxml.jackson.databind.ObjectMapper; import org.adrianwalker.multilinestring.Multiline; +import org.apache.metron.common.system.FakeClock; import org.apache.metron.rest.MetronRestConstants; import org.apache.metron.rest.model.AlertsUIUserSettings; import org.apache.metron.hbase.client.UserSettingsClient; @@ -79,9 +80,10 @@ public class AlertsUIServiceImplTest { private KafkaService kafkaService; private Environment environment; private UserSettingsClient userSettingsClient; - private AlertsUIService alertsUIService; + private AlertsUIServiceImpl alertsUIService; private String user1 = "user1"; private String user2 = "user2"; + private FakeClock clock; @SuppressWarnings("unchecked") @Before @@ -91,6 +93,11 @@ public class AlertsUIServiceImplTest { userSettingsClient = mock(UserSettingsClient.class); alertsUIService = new AlertsUIServiceImpl(kafkaService, environment, userSettingsClient); + // use a fake clock for testing + clock = new FakeClock(); + clock.elapseSeconds(1000); + alertsUIService.setClock(clock); + // assume user1 is logged in for tests Authentication authentication = Mockito.mock(Authentication.class); UserDetails userDetails = Mockito.mock(UserDetails.class); @@ -100,21 +107,26 @@ public class AlertsUIServiceImplTest { } @Test - public void produceMessageShouldProperlyProduceMessage() throws Exception { - String escalationTopic = "escalation"; - final Map<String, Object> message1 = new HashMap<>(); - message1.put("field", "value1"); - final Map<String, Object> message2 = new HashMap<>(); - message2.put("field", "value2"); - List<Map<String, Object>> messages = Arrays.asList(message1, message2); + public void escalateAlertShouldSendMessageToKafka() throws Exception { + final String field = "field"; + final String value1 = "value1"; + final String value2 = "value2"; + + // define the escalation topic + final String escalationTopic = "escalation"; when(environment.getProperty(MetronRestConstants.KAFKA_TOPICS_ESCALATION_PROPERTY)).thenReturn(escalationTopic); - alertsUIService.escalateAlerts(messages); + // create an alert along with the expected escalation message that is sent to kafka + final Map<String, Object> alert1 = mapOf(field, value1); + String escalationMessage1 = escalationMessage(field, value1, user1, clock.currentTimeMillis()); + + final Map<String, Object> alert2 = mapOf(field, value2); + String escalationMessage2 = escalationMessage(field, value2, user1, clock.currentTimeMillis()); - String expectedMessage1 = "{\"field\":\"value1\"}"; - String expectedMessage2 = "{\"field\":\"value2\"}"; - verify(kafkaService).produceMessage("escalation", expectedMessage1); - verify(kafkaService).produceMessage("escalation", expectedMessage2); + // escalate the alerts and validate + alertsUIService.escalateAlerts(Arrays.asList(alert1, alert2)); + verify(kafkaService).produceMessage(escalationTopic, escalationMessage1); + verify(kafkaService).produceMessage(escalationTopic, escalationMessage2); verifyZeroInteractions(kafkaService); } @@ -177,4 +189,29 @@ public class AlertsUIServiceImplTest { verify(userSettingsClient, times(2)).delete(user1, AlertsUIServiceImpl.ALERT_USER_SETTING_TYPE); verifyNoMoreInteractions(userSettingsClient); } + + /** + * Defines what the message sent to Kafka should look-like when an alert is escalated. + * + * @param field The field name. + * @param value The value of the field. + * @param user The user who escalated the alert. + * @param timestamp When the alert was escalated. + * @return The escalated message. + */ + private String escalationMessage(String field, String value, String user, Long timestamp) { + return String.format("{\"%s\":\"%s\",\"%s\":\"%s\",\"%s\":%d}", + field, + value, + MetronRestConstants.METRON_ESCALATION_USER_FIELD, + user, + MetronRestConstants.METRON_ESCALATION_TIMESTAMP_FIELD, + timestamp); + } + + private Map<String, Object> mapOf(String key, Object value) { + Map<String, Object> map = new HashMap<>(); + map.put(key, value); + return map; + } } http://git-wip-us.apache.org/repos/asf/metron/blob/c0fb2625/metron-interface/metron-rest/src/test/resources/log4j.properties ---------------------------------------------------------------------- diff --git a/metron-interface/metron-rest/src/test/resources/log4j.properties b/metron-interface/metron-rest/src/test/resources/log4j.properties index 492cecf..edf866e 100644 --- a/metron-interface/metron-rest/src/test/resources/log4j.properties +++ b/metron-interface/metron-rest/src/test/resources/log4j.properties @@ -13,4 +13,7 @@ log4j.rootLogger=ERROR, stdout log4j.appender.stdout=org.apache.log4j.ConsoleAppender log4j.appender.stdout.layout=org.apache.log4j.PatternLayout -log4j.appender.stdout.layout.ConversionPattern=%d{yyyy-MM-dd'T'HH:mm:ss.SSS} %-5p [%c] - %m%n \ No newline at end of file +log4j.appender.stdout.layout.ConversionPattern=%d{yyyy-MM-dd'T'HH:mm:ss.SSS} %-5p [%c] - %m%n + +# uncomment the following line to enable debug +#log4j.logger.org.apache.metron.rest=DEBUG