EAGLE-493 Create alert metadata based on application stream sink configuration
Author: @yonzhang <yonzhang2...@apache.org> Closes: #389 Project: http://git-wip-us.apache.org/repos/asf/incubator-eagle/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-eagle/commit/c5d05abd Tree: http://git-wip-us.apache.org/repos/asf/incubator-eagle/tree/c5d05abd Diff: http://git-wip-us.apache.org/repos/asf/incubator-eagle/diff/c5d05abd Branch: refs/heads/master Commit: c5d05abd1d5f71d78dbe9a577a3975dc5b964d51 Parents: 7f37267 Author: yonzhang <yonzhang2...@gmail.com> Authored: Thu Aug 25 14:13:45 2016 -0700 Committer: yonzhang <yonzhang2...@gmail.com> Committed: Thu Aug 25 14:13:45 2016 -0700 ---------------------------------------------------------------------- .../src/main/resources/application.conf | 2 +- eagle-core/eagle-app/eagle-app-base/pom.xml | 5 ++ .../eagle/app/service/ApplicationContext.java | 49 ++++++++++++++++++-- .../impl/ApplicationManagementServiceImpl.java | 14 ++++-- .../apache/eagle/app/sink/KafkaStreamSink.java | 2 +- ...ecurity.auditlog.HdfsAuditLogAppProvider.xml | 6 +-- 6 files changed, 63 insertions(+), 15 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/c5d05abd/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/resources/application.conf ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/resources/application.conf b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/resources/application.conf index 60595b1..7030e45 100644 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/resources/application.conf +++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/resources/application.conf @@ -48,7 +48,7 @@ "metadataService": { "context" : "/rest", "host" : "localhost", - "port" : 8080 + "port" : 9090 }, "coordinatorService": { "host": "localhost", 
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/c5d05abd/eagle-core/eagle-app/eagle-app-base/pom.xml ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-app/eagle-app-base/pom.xml b/eagle-core/eagle-app/eagle-app-base/pom.xml index d89b78c..12b056c 100644 --- a/eagle-core/eagle-app/eagle-app-base/pom.xml +++ b/eagle-core/eagle-app/eagle-app-base/pom.xml @@ -78,6 +78,11 @@ </exclusions> </dependency> <dependency> + <groupId>org.apache.eagle</groupId> + <artifactId>alert-engine</artifactId> + <version>${project.version}</version> + </dependency> + <dependency> <groupId>com.sun.jersey</groupId> <artifactId>jersey-server</artifactId> <version>${jersey.version}</version> http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/c5d05abd/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/service/ApplicationContext.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/service/ApplicationContext.java b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/service/ApplicationContext.java index 91d33ca..52eb628 100644 --- a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/service/ApplicationContext.java +++ b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/service/ApplicationContext.java @@ -19,18 +19,23 @@ package org.apache.eagle.app.service; import com.google.common.base.Preconditions; import com.typesafe.config.Config; import com.typesafe.config.ConfigFactory; +import org.apache.eagle.alert.coordination.model.Kafka2TupleMetadata; +import org.apache.eagle.alert.coordination.model.Tuple2StreamMetadata; +import org.apache.eagle.alert.engine.coordinator.StreamDefinition; +import org.apache.eagle.alert.engine.scheme.JsonScheme; +import org.apache.eagle.alert.engine.scheme.JsonStringStreamNameSelector; +import 
org.apache.eagle.alert.metadata.IMetadataDao; import org.apache.eagle.app.Application; import org.apache.eagle.app.ApplicationLifecycle; import org.apache.eagle.app.environment.ExecutionRuntime; import org.apache.eagle.app.environment.ExecutionRuntimeManager; +import org.apache.eagle.app.sink.KafkaStreamSinkConfig; import org.apache.eagle.metadata.model.ApplicationEntity; import org.apache.eagle.metadata.model.StreamDesc; import org.apache.eagle.metadata.model.StreamSinkConfig; import java.io.Serializable; -import java.util.Collections; -import java.util.List; -import java.util.Map; +import java.util.*; import java.util.stream.Collectors; /** @@ -47,12 +52,13 @@ public class ApplicationContext implements Serializable, ApplicationLifecycle { private final Application application; private final ExecutionRuntime runtime; private final ApplicationEntity metadata; + private final IMetadataDao alertMetadataService; /** * @param metadata ApplicationEntity * @param application Application */ - public ApplicationContext(Application application, ApplicationEntity metadata, Config envConfig){ + public ApplicationContext(Application application, ApplicationEntity metadata, Config envConfig, IMetadataDao alertMetadataService){ Preconditions.checkNotNull(application,"Application is null"); Preconditions.checkNotNull(metadata,"ApplicationEntity is null"); this.application = application; @@ -69,6 +75,7 @@ public class ApplicationContext implements Serializable, ApplicationLifecycle { executionConfig.put("appId", metadata.getAppId()); executionConfig.put("jarPath", metadata.getJarPath()); this.config = ConfigFactory.parseMap(executionConfig).withFallback(envConfig); + this.alertMetadataService = alertMetadataService; } @Override @@ -83,12 +90,44 @@ public class ApplicationContext implements Serializable, ApplicationLifecycle { return streamDesc; })).collect(Collectors.toList()); metadata.setStreams(streamDescCollection); + + // iterate each stream descriptor and create alert 
datasource for each + for(StreamDesc streamDesc : streamDescCollection) { + // only take care of Kafka sink + if (streamDesc.getSink() instanceof KafkaStreamSinkConfig) { + KafkaStreamSinkConfig kafkaCfg = (KafkaStreamSinkConfig) streamDesc.getSink(); + Kafka2TupleMetadata datasource = new Kafka2TupleMetadata(); + datasource.setType("KAFKA"); + datasource.setName(metadata.getAppId()); + datasource.setTopic(kafkaCfg.getTopicId()); + datasource.setSchemeCls(JsonScheme.class.getCanonicalName()); + Tuple2StreamMetadata tuple2Stream = new Tuple2StreamMetadata(); + Set<String> activeStreamNames = new HashSet<>(); + activeStreamNames.add(streamDesc.getSchema().getStreamId()); + tuple2Stream.setActiveStreamNames(activeStreamNames); + tuple2Stream.setTimestampColumn("timestamp"); + tuple2Stream.setStreamNameSelectorCls(JsonStringStreamNameSelector.class.getCanonicalName()); + datasource.setCodec(tuple2Stream); + alertMetadataService.addDataSource(datasource); + + StreamDefinition sd = streamDesc.getSchema(); + sd.setDataSource(metadata.getAppId()); + alertMetadataService.createStream(streamDesc.getSchema()); + } + } } } @Override public void onUninstall() { - // + // we should remove alert data source and stream definition while we do uninstall + if(metadata.getStreams() == null) + return; + // iterate each stream descriptor and remove its alert datasource and stream definition + for(StreamDesc streamDesc : metadata.getStreams()) { + alertMetadataService.removeDataSource(metadata.getAppId()); + alertMetadataService.removeStream(streamDesc.getStreamId()); + } } @Override http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/c5d05abd/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/service/impl/ApplicationManagementServiceImpl.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/service/impl/ApplicationManagementServiceImpl.java 
b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/service/impl/ApplicationManagementServiceImpl.java index 314b0fb..c355a10 100644 --- a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/service/impl/ApplicationManagementServiceImpl.java +++ b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/service/impl/ApplicationManagementServiceImpl.java @@ -21,6 +21,7 @@ import com.google.inject.Inject; import com.google.inject.Singleton; import com.typesafe.config.Config; import com.typesafe.config.ConfigFactory; +import org.apache.eagle.alert.metadata.IMetadataDao; import org.apache.eagle.app.service.ApplicationContext; import org.apache.eagle.app.service.ApplicationOperations; import org.apache.eagle.app.service.ApplicationManagementService; @@ -45,6 +46,7 @@ public class ApplicationManagementServiceImpl implements ApplicationManagementSe private final SiteEntityService siteEntityService; private final ApplicationProviderService applicationProviderService; private final ApplicationEntityService applicationEntityService; + private final IMetadataDao alertMetadataService; private final Config config; private final static Logger LOGGER = LoggerFactory.getLogger(ApplicationManagementServiceImpl.class); @@ -53,11 +55,13 @@ public class ApplicationManagementServiceImpl implements ApplicationManagementSe Config config, SiteEntityService siteEntityService, ApplicationProviderService applicationProviderService, - ApplicationEntityService applicationEntityService){ + ApplicationEntityService applicationEntityService, + IMetadataDao alertMetadataService){ this.config = config; this.siteEntityService = siteEntityService; this.applicationProviderService = applicationProviderService; this.applicationEntityService = applicationEntityService; + this.alertMetadataService = alertMetadataService; } @Override @@ -97,7 +101,7 @@ public class ApplicationManagementServiceImpl implements ApplicationManagementSe 
applicationEntity.setConfiguration(appConfig); ApplicationContext applicationContext = new ApplicationContext( applicationProviderService.getApplicationProviderByType(applicationEntity.getDescriptor().getType()).getApplication(), - applicationEntity,config); + applicationEntity,config, alertMetadataService); applicationContext.onInstall(); return applicationEntityService.create(applicationEntity); } @@ -107,7 +111,7 @@ public class ApplicationManagementServiceImpl implements ApplicationManagementSe ApplicationEntity applicationEntity = applicationEntityService.getByUUIDOrAppId(operation.getUuid(),operation.getAppId()); ApplicationContext applicationContext = new ApplicationContext( applicationProviderService.getApplicationProviderByType(applicationEntity.getDescriptor().getType()).getApplication(), - applicationEntity,config); + applicationEntity,config, alertMetadataService); // TODO: Check status, skip stop if already STOPPED try { applicationContext.onStop(); @@ -123,7 +127,7 @@ public class ApplicationManagementServiceImpl implements ApplicationManagementSe ApplicationEntity applicationEntity = applicationEntityService.getByUUIDOrAppId(operation.getUuid(),operation.getAppId()); ApplicationContext applicationContext = new ApplicationContext( applicationProviderService.getApplicationProviderByType(applicationEntity.getDescriptor().getType()).getApplication(), - applicationEntity,config); + applicationEntity,config, alertMetadataService); applicationContext.onStart(); return applicationEntity; } @@ -133,7 +137,7 @@ public class ApplicationManagementServiceImpl implements ApplicationManagementSe ApplicationEntity applicationEntity = applicationEntityService.getByUUIDOrAppId(operation.getUuid(),operation.getAppId()); ApplicationContext applicationContext = new ApplicationContext( applicationProviderService.getApplicationProviderByType(applicationEntity.getDescriptor().getType()).getApplication(), - applicationEntity,config); + applicationEntity,config, 
alertMetadataService); applicationContext.onStop(); return applicationEntity; } http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/c5d05abd/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/sink/KafkaStreamSink.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/sink/KafkaStreamSink.java b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/sink/KafkaStreamSink.java index 5c33c94..27848d9 100644 --- a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/sink/KafkaStreamSink.java +++ b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/sink/KafkaStreamSink.java @@ -57,7 +57,7 @@ public class KafkaStreamSink extends StormStreamSink<KafkaStreamSinkConfig> { protected void execute(Object key, Map event,BasicOutputCollector collector) { try { String output = new ObjectMapper().writeValueAsString(event); - producer.send(new KeyedMessage(this.topicId, event.get("user"), output)); + producer.send(new KeyedMessage(this.topicId, key, output)); }catch(Exception ex){ LOG.error(ex.getMessage(), ex); collector.reportError(ex); http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/c5d05abd/eagle-security/eagle-security-hdfs-auditlog/src/main/resources/META-INF/providers/org.apache.eagle.security.auditlog.HdfsAuditLogAppProvider.xml ---------------------------------------------------------------------- diff --git a/eagle-security/eagle-security-hdfs-auditlog/src/main/resources/META-INF/providers/org.apache.eagle.security.auditlog.HdfsAuditLogAppProvider.xml b/eagle-security/eagle-security-hdfs-auditlog/src/main/resources/META-INF/providers/org.apache.eagle.security.auditlog.HdfsAuditLogAppProvider.xml index 2419747..2a8ff0f 100644 --- a/eagle-security/eagle-security-hdfs-auditlog/src/main/resources/META-INF/providers/org.apache.eagle.security.auditlog.HdfsAuditLogAppProvider.xml +++ 
b/eagle-security/eagle-security-hdfs-auditlog/src/main/resources/META-INF/providers/org.apache.eagle.security.auditlog.HdfsAuditLogAppProvider.xml @@ -117,7 +117,7 @@ <property> <name>dataSinkConfig.topic</name> <displayName>dataSinkConfig.topic</displayName> - <value>hdfs_audit_log_parsed</value> + <value>hdfs_audit_log_enriched</value> <description>topic for kafka data sink</description> </property> <property> @@ -149,8 +149,8 @@ </configuration> <streams> <stream> - <streamId>hdfs_audit_log_stream</streamId> - <description>Hdfs Audit Log Stream</description> + <streamId>hdfs_audit_log_enriched_stream</streamId> + <description>Hdfs Audit Log Enriched Stream</description> <validate>true</validate> <timeseries>true</timeseries> <columns>