Repository: incubator-eagle
Updated Branches:
  refs/heads/master 880ba738c -> 7639ff223


[EAGLE-841] CorrelationSpout reads zk connection from datasource if exists

Author: wujinhu <wujinhu...@126.com>

Closes #741 from wujinhu/EAGLE_841.


Project: http://git-wip-us.apache.org/repos/asf/incubator-eagle/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-eagle/commit/7639ff22
Tree: http://git-wip-us.apache.org/repos/asf/incubator-eagle/tree/7639ff22
Diff: http://git-wip-us.apache.org/repos/asf/incubator-eagle/diff/7639ff22

Branch: refs/heads/master
Commit: 7639ff2237352884c76e862fd14826cb053bb0fc
Parents: 880ba73
Author: wujinhu <wujinhu...@126.com>
Authored: Wed Dec 14 17:26:05 2016 +0800
Committer: wujinhu <wujinhu...@126.com>
Committed: Wed Dec 14 17:26:05 2016 +0800

----------------------------------------------------------------------
 .../eagle/alert/utils/AlertConstants.java       |  3 ++
 .../alert/engine/spout/CorrelationSpout.java    | 38 +++++++++++---------
 .../engine/topology/CorrelationSpoutTest.java   |  8 +++--
 .../eagle/app/service/ApplicationAction.java    | 10 ++++++
 .../src/main/bin/metadata-ddl.sql               | 10 +++---
 5 files changed, 44 insertions(+), 25 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/7639ff22/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/utils/AlertConstants.java
----------------------------------------------------------------------
diff --git 
a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/utils/AlertConstants.java
 
b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/utils/AlertConstants.java
index ee2c28c..2740836 100644
--- 
a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/utils/AlertConstants.java
+++ 
b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/utils/AlertConstants.java
@@ -28,4 +28,7 @@ public class AlertConstants {
     public static final String ALERT_SERVICE_ENDPOINT_NAME = "AlertService";
 
     public static final String COORDINATOR = "coordinator";
+
+    public static final String KAFKA_BROKER_ZK_BASE_PATH = 
"spout.kafkaBrokerZkBasePath";
+    public static final String KAFKA_BROKER_ZK_QUORUM = 
"spout.kafkaBrokerZkQuorum";
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/7639ff22/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/spout/CorrelationSpout.java
----------------------------------------------------------------------
diff --git 
a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/spout/CorrelationSpout.java
 
b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/spout/CorrelationSpout.java
index 63e94ca..60a9b98 100644
--- 
a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/spout/CorrelationSpout.java
+++ 
b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/spout/CorrelationSpout.java
@@ -27,6 +27,7 @@ import backtype.storm.topology.OutputFieldsDeclarer;
 import backtype.storm.topology.base.BaseRichSpout;
 import backtype.storm.tuple.Fields;
 import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
 import org.apache.commons.collections.CollectionUtils;
 import org.apache.eagle.alert.coordination.model.Kafka2TupleMetadata;
 import org.apache.eagle.alert.coordination.model.SpoutSpec;
@@ -235,8 +236,10 @@ public class CorrelationSpout extends BaseRichSpout 
implements SpoutSpecListener
 
         // build lookup table for scheme
         Map<String, String> newSchemaName = new HashMap<String, String>();
+        Map<String, Map<String, String>> dataSourceProperties = new 
HashMap<>();
         for (Kafka2TupleMetadata ds : 
newMeta.getKafka2TupleMetadataMap().values()) {
             newSchemaName.put(ds.getTopic(), ds.getSchemeCls());
+            dataSourceProperties.put(ds.getTopic(), ds.getProperties());
         }
 
         // copy and swap
@@ -248,7 +251,8 @@ public class CorrelationSpout extends BaseRichSpout 
implements SpoutSpecListener
                 LOG.warn(MessageFormat.format("try to create new topic {0}, 
but found in the active spout list, this may indicate some inconsistency", 
topic));
                 continue;
             }
-            KafkaSpoutWrapper newWrapper = createKafkaSpout(conf, context, 
collector, topic, newSchemaName.get(topic), newMeta, sds);
+            KafkaSpoutWrapper newWrapper = 
createKafkaSpout(ConfigFactory.parseMap(dataSourceProperties.get(topic)).withFallback(this.config),
+                    conf, context, collector, topic, newSchemaName.get(topic), 
newMeta, sds);
             newKafkaSpoutList.put(topic, newWrapper);
         }
         // iterate remove topics and then close KafkaSpout
@@ -297,47 +301,47 @@ public class CorrelationSpout extends BaseRichSpout 
implements SpoutSpecListener
      * @return
      */
     @SuppressWarnings("rawtypes")
-    protected KafkaSpoutWrapper createKafkaSpout(Map conf, TopologyContext 
context, SpoutOutputCollector collector, final String topic,
+    protected KafkaSpoutWrapper createKafkaSpout(Config configure, Map conf, 
TopologyContext context, SpoutOutputCollector collector, final String topic,
                                                  String schemeClsName, 
SpoutSpec spoutSpec, Map<String, StreamDefinition> sds) throws Exception {
-        String kafkaBrokerZkQuorum = 
config.getString("spout.kafkaBrokerZkQuorum");
+        String kafkaBrokerZkQuorum = 
configure.getString(AlertConstants.KAFKA_BROKER_ZK_QUORUM);
         BrokerHosts hosts = null;
-        if (config.hasPath("spout.kafkaBrokerZkBasePath")) {
-            hosts = new ZkHosts(kafkaBrokerZkQuorum, 
config.getString("spout.kafkaBrokerZkBasePath"));
+        if (configure.hasPath("spout.kafkaBrokerZkBasePath")) {
+            hosts = new ZkHosts(kafkaBrokerZkQuorum, 
configure.getString(AlertConstants.KAFKA_BROKER_ZK_BASE_PATH));
         } else {
             hosts = new ZkHosts(kafkaBrokerZkQuorum);
         }
         String transactionZkRoot = DEFAULT_STORM_KAFKA_TRANSACTION_ZK_ROOT;
-        if (config.hasPath("spout.stormKafkaTransactionZkPath")) {
-            transactionZkRoot = 
config.getString("spout.stormKafkaTransactionZkPath");
+        if (configure.hasPath("spout.stormKafkaTransactionZkPath")) {
+            transactionZkRoot = 
configure.getString("spout.stormKafkaTransactionZkPath");
         }
         boolean logEventEnabled = false;
-        if (config.hasPath("topology.logEventEnabled")) {
-            logEventEnabled = config.getBoolean("topology.logEventEnabled");
+        if (configure.hasPath("topology.logEventEnabled")) {
+            logEventEnabled = configure.getBoolean("topology.logEventEnabled");
         }
         // write partition offset etc. into zkRoot+id, see 
PartitionManager.committedPath
         String zkStateTransactionRelPath = 
DEFAULT_STORM_KAFKA_TRANSACTION_ZK_RELATIVE_PATH;
-        if (config.hasPath("spout.stormKafkaEagleConsumer")) {
-            zkStateTransactionRelPath = 
config.getString("spout.stormKafkaEagleConsumer");
+        if (configure.hasPath("spout.stormKafkaEagleConsumer")) {
+            zkStateTransactionRelPath = 
configure.getString("spout.stormKafkaEagleConsumer");
         }
         SpoutConfig spoutConfig = new SpoutConfig(hosts, topic, 
transactionZkRoot, zkStateTransactionRelPath + "/" + topic + "/" + topologyId);
         // transaction zkServers
-        boolean stormKafkaUseSameZkQuorumWithKafkaBroker = 
config.getBoolean("spout.stormKafkaUseSameZkQuorumWithKafkaBroker");
+        boolean stormKafkaUseSameZkQuorumWithKafkaBroker = 
configure.getBoolean("spout.stormKafkaUseSameZkQuorumWithKafkaBroker");
         if (stormKafkaUseSameZkQuorumWithKafkaBroker) {
             ZkServerPortUtils utils = new 
ZkServerPortUtils(kafkaBrokerZkQuorum);
             spoutConfig.zkServers = utils.getZkHosts();
             spoutConfig.zkPort = utils.getZkPort();
         } else {
-            ZkServerPortUtils utils = new 
ZkServerPortUtils(config.getString("spout.stormKafkaTransactionZkQuorum"));
+            ZkServerPortUtils utils = new 
ZkServerPortUtils(configure.getString("spout.stormKafkaTransactionZkQuorum"));
             spoutConfig.zkServers = utils.getZkHosts();
             spoutConfig.zkPort = utils.getZkPort();
         }
         // transaction update interval
-        spoutConfig.stateUpdateIntervalMs = 
config.hasPath("spout.stormKafkaStateUpdateIntervalMs") ? 
config.getInt("spout.stormKafkaStateUpdateIntervalMs") : 2000;
+        spoutConfig.stateUpdateIntervalMs = 
configure.hasPath("spout.stormKafkaStateUpdateIntervalMs") ? 
configure.getInt("spout.stormKafkaStateUpdateIntervalMs") : 2000;
         // Kafka fetch size
-        spoutConfig.fetchSizeBytes = 
config.hasPath("spout.stormKafkaFetchSizeBytes") ? 
config.getInt("spout.stormKafkaFetchSizeBytes") : 1048586;
+        spoutConfig.fetchSizeBytes = 
configure.hasPath("spout.stormKafkaFetchSizeBytes") ? 
configure.getInt("spout.stormKafkaFetchSizeBytes") : 1048586;
         // "startOffsetTime" is for test usage, prod should not use this
-        if (config.hasPath("spout.stormKafkaStartOffsetTime")) {
-            spoutConfig.startOffsetTime = 
config.getInt("spout.stormKafkaStartOffsetTime");
+        if (configure.hasPath("spout.stormKafkaStartOffsetTime")) {
+            spoutConfig.startOffsetTime = 
configure.getInt("spout.stormKafkaStartOffsetTime");
         }
 
         spoutConfig.scheme = createMultiScheme(conf, topic, schemeClsName);

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/7639ff22/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/topology/CorrelationSpoutTest.java
----------------------------------------------------------------------
diff --git 
a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/topology/CorrelationSpoutTest.java
 
b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/topology/CorrelationSpoutTest.java
index 9deb4b2..5a86cd2 100644
--- 
a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/topology/CorrelationSpoutTest.java
+++ 
b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/topology/CorrelationSpoutTest.java
@@ -56,7 +56,7 @@ public class CorrelationSpoutTest {
         AtomicBoolean validated = new AtomicBoolean(false);
         CorrelationSpout spout = new CorrelationSpout(config, topoId, null, 1) 
{
             @Override
-            protected KafkaSpoutWrapper createKafkaSpout(Map conf, 
TopologyContext context,
+            protected KafkaSpoutWrapper createKafkaSpout(Config config, Map 
conf, TopologyContext context,
                                                          SpoutOutputCollector 
collector, String topic, String schemeClsName, SpoutSpec streamMetadatas, 
Map<String, StreamDefinition> sds)
                 throws Exception {
                 validated.set(true);
@@ -94,7 +94,8 @@ public class CorrelationSpoutTest {
         final AtomicBoolean verified = new AtomicBoolean(false);
         CorrelationSpout spout = new CorrelationSpout(config, topoId, null, 1) 
{
             @Override
-            protected KafkaSpoutWrapper createKafkaSpout(Map conf,
+            protected KafkaSpoutWrapper createKafkaSpout(Config config,
+                                                         Map conf,
                                                          TopologyContext 
context,
                                                          SpoutOutputCollector 
collector,
                                                          String topic,
@@ -147,7 +148,8 @@ public class CorrelationSpoutTest {
         final AtomicBoolean verified = new AtomicBoolean(false);
         CorrelationSpout spout = new CorrelationSpout(config, topoId, null, 1) 
{
             @Override
-            protected KafkaSpoutWrapper createKafkaSpout(Map conf, 
TopologyContext context, SpoutOutputCollector collector, final String topic,
+            protected KafkaSpoutWrapper createKafkaSpout(Config config,
+                                                         Map conf, 
TopologyContext context, SpoutOutputCollector collector, final String topic,
                                                          String schemeClsName, 
SpoutSpec streamMetadatas,
                                                          Map<String, 
StreamDefinition> sds) {
                 return new KafkaSpoutWrapper(null, null);

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/7639ff22/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/service/ApplicationAction.java
----------------------------------------------------------------------
diff --git 
a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/service/ApplicationAction.java
 
b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/service/ApplicationAction.java
index 8c7c8d6..a502f81 100644
--- 
a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/service/ApplicationAction.java
+++ 
b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/service/ApplicationAction.java
@@ -31,6 +31,7 @@ import org.apache.eagle.app.Application;
 import org.apache.eagle.app.environment.ExecutionRuntime;
 import org.apache.eagle.app.environment.ExecutionRuntimeManager;
 import org.apache.eagle.app.messaging.KafkaStreamSinkConfig;
+import org.apache.eagle.app.messaging.KafkaStreamSourceConfig;
 import org.apache.eagle.metadata.model.ApplicationEntity;
 import org.apache.eagle.metadata.model.StreamSourceConfig;
 import org.apache.eagle.metadata.utils.StreamIdConversions;
@@ -144,6 +145,15 @@ public class ApplicationAction implements Serializable {
                 datasource.setName(metadata.getAppId());
                 datasource.setTopic(kafkaCfg.getTopicId());
                 datasource.setSchemeCls(JsonScheme.class.getCanonicalName());
+                datasource.setProperties(new HashMap<>());
+
+                KafkaStreamSourceConfig streamSourceConfig = 
(KafkaStreamSourceConfig) streamDesc.getSourceConfig();
+                if (streamSourceConfig != null) {
+                    Map<String, String> properties = 
datasource.getProperties();
+                    properties.put(AlertConstants.KAFKA_BROKER_ZK_BASE_PATH, 
streamSourceConfig.getBrokerZkPath());
+                    properties.put(AlertConstants.KAFKA_BROKER_ZK_QUORUM, 
streamSourceConfig.getBrokerZkQuorum());
+                }
+
                 Tuple2StreamMetadata tuple2Stream = new Tuple2StreamMetadata();
                 Properties prop = new Properties();
                 
prop.put(JsonStringStreamNameSelector.USER_PROVIDED_STREAM_NAME_PROPERTY, 
streamDesc.getStreamId());

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/7639ff22/eagle-server-assembly/src/main/bin/metadata-ddl.sql
----------------------------------------------------------------------
diff --git a/eagle-server-assembly/src/main/bin/metadata-ddl.sql 
b/eagle-server-assembly/src/main/bin/metadata-ddl.sql
index f80a7ea..4bed927 100644
--- a/eagle-server-assembly/src/main/bin/metadata-ddl.sql
+++ b/eagle-server-assembly/src/main/bin/metadata-ddl.sql
@@ -46,21 +46,21 @@ CREATE TABLE IF NOT EXISTS sites (
 -- eagle security module metadata
 
 CREATE TABLE IF NOT EXISTS hdfs_sensitivity_entity (
-  site varchar(20) DEFAULT NULL,
-  filedir varchar(100) DEFAULT NULL,
+  site varchar(20) NOT NULL,
+  filedir varchar(100) NOT NULL,
   sensitivity_type varchar(20) DEFAULT NULL,
   primary key (site, filedir)
 );
 
 CREATE TABLE IF NOT EXISTS ip_securityzone (
-  iphost varchar(100) DEFAULT NULL,
+  iphost varchar(100) NOT NULL,
   security_zone varchar(100) DEFAULT NULL,
   primary key (iphost)
 );
 
 CREATE TABLE IF NOT EXISTS hbase_sensitivity_entity (
-  site varchar(20) DEFAULT NULL,
-  hbase_resource varchar(100) DEFAULT NULL,
+  site varchar(20) NOT NULL,
+  hbase_resource varchar(100) NOT NULL,
   sensitivity_type varchar(20) DEFAULT NULL,
   primary key (site, hbase_resource)
 );

Reply via email to