EAGLE-493 Create alert metadata based on application stream sink configuration

Author: @yonzhang <yonzhang2...@apache.org>

Closes: #389


Project: http://git-wip-us.apache.org/repos/asf/incubator-eagle/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-eagle/commit/c5d05abd
Tree: http://git-wip-us.apache.org/repos/asf/incubator-eagle/tree/c5d05abd
Diff: http://git-wip-us.apache.org/repos/asf/incubator-eagle/diff/c5d05abd

Branch: refs/heads/master
Commit: c5d05abd1d5f71d78dbe9a577a3975dc5b964d51
Parents: 7f37267
Author: yonzhang <yonzhang2...@gmail.com>
Authored: Thu Aug 25 14:13:45 2016 -0700
Committer: yonzhang <yonzhang2...@gmail.com>
Committed: Thu Aug 25 14:13:45 2016 -0700

----------------------------------------------------------------------
 .../src/main/resources/application.conf         |  2 +-
 eagle-core/eagle-app/eagle-app-base/pom.xml     |  5 ++
 .../eagle/app/service/ApplicationContext.java   | 49 ++++++++++++++++++--
 .../impl/ApplicationManagementServiceImpl.java  | 14 ++++--
 .../apache/eagle/app/sink/KafkaStreamSink.java  |  2 +-
 ...ecurity.auditlog.HdfsAuditLogAppProvider.xml |  6 +--
 6 files changed, 63 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/c5d05abd/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/resources/application.conf
----------------------------------------------------------------------
diff --git 
a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/resources/application.conf
 
b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/resources/application.conf
index 60595b1..7030e45 100644
--- 
a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/resources/application.conf
+++ 
b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/resources/application.conf
@@ -48,7 +48,7 @@
   "metadataService": {
     "context" : "/rest",
     "host" : "localhost",
-    "port" : 8080
+    "port" : 9090
   },
   "coordinatorService": {
     "host": "localhost",

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/c5d05abd/eagle-core/eagle-app/eagle-app-base/pom.xml
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-app-base/pom.xml 
b/eagle-core/eagle-app/eagle-app-base/pom.xml
index d89b78c..12b056c 100644
--- a/eagle-core/eagle-app/eagle-app-base/pom.xml
+++ b/eagle-core/eagle-app/eagle-app-base/pom.xml
@@ -78,6 +78,11 @@
             </exclusions>
         </dependency>
         <dependency>
+            <groupId>org.apache.eagle</groupId>
+            <artifactId>alert-engine</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
             <groupId>com.sun.jersey</groupId>
             <artifactId>jersey-server</artifactId>
             <version>${jersey.version}</version>

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/c5d05abd/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/service/ApplicationContext.java
----------------------------------------------------------------------
diff --git 
a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/service/ApplicationContext.java
 
b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/service/ApplicationContext.java
index 91d33ca..52eb628 100644
--- 
a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/service/ApplicationContext.java
+++ 
b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/service/ApplicationContext.java
@@ -19,18 +19,23 @@ package org.apache.eagle.app.service;
 import com.google.common.base.Preconditions;
 import com.typesafe.config.Config;
 import com.typesafe.config.ConfigFactory;
+import org.apache.eagle.alert.coordination.model.Kafka2TupleMetadata;
+import org.apache.eagle.alert.coordination.model.Tuple2StreamMetadata;
+import org.apache.eagle.alert.engine.coordinator.StreamDefinition;
+import org.apache.eagle.alert.engine.scheme.JsonScheme;
+import org.apache.eagle.alert.engine.scheme.JsonStringStreamNameSelector;
+import org.apache.eagle.alert.metadata.IMetadataDao;
 import org.apache.eagle.app.Application;
 import org.apache.eagle.app.ApplicationLifecycle;
 import org.apache.eagle.app.environment.ExecutionRuntime;
 import org.apache.eagle.app.environment.ExecutionRuntimeManager;
+import org.apache.eagle.app.sink.KafkaStreamSinkConfig;
 import org.apache.eagle.metadata.model.ApplicationEntity;
 import org.apache.eagle.metadata.model.StreamDesc;
 import org.apache.eagle.metadata.model.StreamSinkConfig;
 
 import java.io.Serializable;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
+import java.util.*;
 import java.util.stream.Collectors;
 
 /**
@@ -47,12 +52,13 @@ public class ApplicationContext implements Serializable, 
ApplicationLifecycle {
     private final Application application;
     private final ExecutionRuntime runtime;
     private final ApplicationEntity metadata;
+    private final IMetadataDao alertMetadataService;
 
     /**
      * @param metadata ApplicationEntity
      * @param application Application
      */
-    public ApplicationContext(Application application, ApplicationEntity 
metadata, Config envConfig){
+    public ApplicationContext(Application application, ApplicationEntity 
metadata, Config envConfig, IMetadataDao alertMetadataService){
         Preconditions.checkNotNull(application,"Application is null");
         Preconditions.checkNotNull(metadata,"ApplicationEntity is null");
         this.application = application;
@@ -69,6 +75,7 @@ public class ApplicationContext implements Serializable, 
ApplicationLifecycle {
         executionConfig.put("appId", metadata.getAppId());
         executionConfig.put("jarPath", metadata.getJarPath());
         this.config = 
ConfigFactory.parseMap(executionConfig).withFallback(envConfig);
+        this.alertMetadataService = alertMetadataService;
     }
 
     @Override
@@ -83,12 +90,44 @@ public class ApplicationContext implements Serializable, 
ApplicationLifecycle {
                 return streamDesc;
             })).collect(Collectors.toList());
             metadata.setStreams(streamDescCollection);
+
+            // iterate each stream descriptor and create alert datasource for 
each
+            for(StreamDesc streamDesc : streamDescCollection) {
+                // only take care of Kafka sink
+                if (streamDesc.getSink() instanceof KafkaStreamSinkConfig) {
+                    KafkaStreamSinkConfig kafkaCfg = (KafkaStreamSinkConfig) 
streamDesc.getSink();
+                    Kafka2TupleMetadata datasource = new Kafka2TupleMetadata();
+                    datasource.setType("KAFKA");
+                    datasource.setName(metadata.getAppId());
+                    datasource.setTopic(kafkaCfg.getTopicId());
+                    
datasource.setSchemeCls(JsonScheme.class.getCanonicalName());
+                    Tuple2StreamMetadata tuple2Stream = new 
Tuple2StreamMetadata();
+                    Set<String> activeStreamNames = new HashSet<>();
+                    
activeStreamNames.add(streamDesc.getSchema().getStreamId());
+                    tuple2Stream.setActiveStreamNames(activeStreamNames);
+                    tuple2Stream.setTimestampColumn("timestamp");
+                    
tuple2Stream.setStreamNameSelectorCls(JsonStringStreamNameSelector.class.getCanonicalName());
+                    datasource.setCodec(tuple2Stream);
+                    alertMetadataService.addDataSource(datasource);
+
+                    StreamDefinition sd = streamDesc.getSchema();
+                    sd.setDataSource(metadata.getAppId());
+                    alertMetadataService.createStream(streamDesc.getSchema());
+                }
+            }
         }
     }
 
     @Override
     public void onUninstall() {
-        //
+        // we should remove alert data source and stream definition while we 
do uninstall
+        if(metadata.getStreams() == null)
+            return;
+        // iterate each stream descriptor and create alert datasource for each
+        for(StreamDesc streamDesc : metadata.getStreams()) {
+            alertMetadataService.removeDataSource(metadata.getAppId());
+            alertMetadataService.removeStream(streamDesc.getStreamId());
+        }
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/c5d05abd/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/service/impl/ApplicationManagementServiceImpl.java
----------------------------------------------------------------------
diff --git 
a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/service/impl/ApplicationManagementServiceImpl.java
 
b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/service/impl/ApplicationManagementServiceImpl.java
index 314b0fb..c355a10 100644
--- 
a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/service/impl/ApplicationManagementServiceImpl.java
+++ 
b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/service/impl/ApplicationManagementServiceImpl.java
@@ -21,6 +21,7 @@ import com.google.inject.Inject;
 import com.google.inject.Singleton;
 import com.typesafe.config.Config;
 import com.typesafe.config.ConfigFactory;
+import org.apache.eagle.alert.metadata.IMetadataDao;
 import org.apache.eagle.app.service.ApplicationContext;
 import org.apache.eagle.app.service.ApplicationOperations;
 import org.apache.eagle.app.service.ApplicationManagementService;
@@ -45,6 +46,7 @@ public class ApplicationManagementServiceImpl implements 
ApplicationManagementSe
     private final SiteEntityService siteEntityService;
     private final ApplicationProviderService applicationProviderService;
     private final ApplicationEntityService applicationEntityService;
+    private final IMetadataDao alertMetadataService;
     private final Config config;
     private final static Logger LOGGER = 
LoggerFactory.getLogger(ApplicationManagementServiceImpl.class);
 
@@ -53,11 +55,13 @@ public class ApplicationManagementServiceImpl implements 
ApplicationManagementSe
             Config config,
             SiteEntityService siteEntityService,
             ApplicationProviderService applicationProviderService,
-            ApplicationEntityService applicationEntityService){
+            ApplicationEntityService applicationEntityService,
+            IMetadataDao alertMetadataService){
         this.config = config;
         this.siteEntityService = siteEntityService;
         this.applicationProviderService = applicationProviderService;
         this.applicationEntityService = applicationEntityService;
+        this.alertMetadataService = alertMetadataService;
     }
 
     @Override
@@ -97,7 +101,7 @@ public class ApplicationManagementServiceImpl implements 
ApplicationManagementSe
         applicationEntity.setConfiguration(appConfig);
         ApplicationContext applicationContext = new ApplicationContext(
                 
applicationProviderService.getApplicationProviderByType(applicationEntity.getDescriptor().getType()).getApplication(),
-                applicationEntity,config);
+                applicationEntity,config, alertMetadataService);
         applicationContext.onInstall();
         return applicationEntityService.create(applicationEntity);
     }
@@ -107,7 +111,7 @@ public class ApplicationManagementServiceImpl implements 
ApplicationManagementSe
         ApplicationEntity applicationEntity = 
applicationEntityService.getByUUIDOrAppId(operation.getUuid(),operation.getAppId());
         ApplicationContext applicationContext = new ApplicationContext(
                 
applicationProviderService.getApplicationProviderByType(applicationEntity.getDescriptor().getType()).getApplication(),
-                applicationEntity,config);
+                applicationEntity,config, alertMetadataService);
         // TODO: Check status, skip stop if already STOPPED
         try {
             applicationContext.onStop();
@@ -123,7 +127,7 @@ public class ApplicationManagementServiceImpl implements 
ApplicationManagementSe
         ApplicationEntity applicationEntity = 
applicationEntityService.getByUUIDOrAppId(operation.getUuid(),operation.getAppId());
         ApplicationContext applicationContext = new ApplicationContext(
                 
applicationProviderService.getApplicationProviderByType(applicationEntity.getDescriptor().getType()).getApplication(),
-                applicationEntity,config);
+                applicationEntity,config, alertMetadataService);
         applicationContext.onStart();
         return applicationEntity;
     }
@@ -133,7 +137,7 @@ public class ApplicationManagementServiceImpl implements 
ApplicationManagementSe
         ApplicationEntity applicationEntity = 
applicationEntityService.getByUUIDOrAppId(operation.getUuid(),operation.getAppId());
         ApplicationContext applicationContext = new ApplicationContext(
                 
applicationProviderService.getApplicationProviderByType(applicationEntity.getDescriptor().getType()).getApplication(),
-                applicationEntity,config);
+                applicationEntity,config, alertMetadataService);
         applicationContext.onStop();
         return applicationEntity;
     }

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/c5d05abd/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/sink/KafkaStreamSink.java
----------------------------------------------------------------------
diff --git 
a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/sink/KafkaStreamSink.java
 
b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/sink/KafkaStreamSink.java
index 5c33c94..27848d9 100644
--- 
a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/sink/KafkaStreamSink.java
+++ 
b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/sink/KafkaStreamSink.java
@@ -57,7 +57,7 @@ public class KafkaStreamSink extends 
StormStreamSink<KafkaStreamSinkConfig> {
     protected void execute(Object key, Map event,BasicOutputCollector 
collector) {
         try {
             String output = new ObjectMapper().writeValueAsString(event);
-            producer.send(new KeyedMessage(this.topicId, event.get("user"), 
output));
+            producer.send(new KeyedMessage(this.topicId, key, output));
         }catch(Exception ex){
             LOG.error(ex.getMessage(), ex);
             collector.reportError(ex);

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/c5d05abd/eagle-security/eagle-security-hdfs-auditlog/src/main/resources/META-INF/providers/org.apache.eagle.security.auditlog.HdfsAuditLogAppProvider.xml
----------------------------------------------------------------------
diff --git 
a/eagle-security/eagle-security-hdfs-auditlog/src/main/resources/META-INF/providers/org.apache.eagle.security.auditlog.HdfsAuditLogAppProvider.xml
 
b/eagle-security/eagle-security-hdfs-auditlog/src/main/resources/META-INF/providers/org.apache.eagle.security.auditlog.HdfsAuditLogAppProvider.xml
index 2419747..2a8ff0f 100644
--- 
a/eagle-security/eagle-security-hdfs-auditlog/src/main/resources/META-INF/providers/org.apache.eagle.security.auditlog.HdfsAuditLogAppProvider.xml
+++ 
b/eagle-security/eagle-security-hdfs-auditlog/src/main/resources/META-INF/providers/org.apache.eagle.security.auditlog.HdfsAuditLogAppProvider.xml
@@ -117,7 +117,7 @@
         <property>
             <name>dataSinkConfig.topic</name>
             <displayName>dataSinkConfig.topic</displayName>
-            <value>hdfs_audit_log_parsed</value>
+            <value>hdfs_audit_log_enriched</value>
             <description>topic for kafka data sink</description>
         </property>
         <property>
@@ -149,8 +149,8 @@
     </configuration>
     <streams>
         <stream>
-            <streamId>hdfs_audit_log_stream</streamId>
-            <description>Hdfs Audit Log Stream</description>
+            <streamId>hdfs_audit_log_enriched_stream</streamId>
+            <description>Hdfs Audit Log Enriched Stream</description>
             <validate>true</validate>
             <timeseries>true</timeseries>
             <columns>

Reply via email to