Repository: incubator-eagle Updated Branches: refs/heads/master 880ba738c -> 7639ff223
[EAGLE-841] CorrelationSpout reads zk connection from datasource if exists Author: wujinhu <wujinhu...@126.com> Closes #741 from wujinhu/EAGLE_841. Project: http://git-wip-us.apache.org/repos/asf/incubator-eagle/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-eagle/commit/7639ff22 Tree: http://git-wip-us.apache.org/repos/asf/incubator-eagle/tree/7639ff22 Diff: http://git-wip-us.apache.org/repos/asf/incubator-eagle/diff/7639ff22 Branch: refs/heads/master Commit: 7639ff2237352884c76e862fd14826cb053bb0fc Parents: 880ba73 Author: wujinhu <wujinhu...@126.com> Authored: Wed Dec 14 17:26:05 2016 +0800 Committer: wujinhu <wujinhu...@126.com> Committed: Wed Dec 14 17:26:05 2016 +0800 ---------------------------------------------------------------------- .../eagle/alert/utils/AlertConstants.java | 3 ++ .../alert/engine/spout/CorrelationSpout.java | 38 +++++++++++--------- .../engine/topology/CorrelationSpoutTest.java | 8 +++-- .../eagle/app/service/ApplicationAction.java | 10 ++++++ .../src/main/bin/metadata-ddl.sql | 10 +++--- 5 files changed, 44 insertions(+), 25 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/7639ff22/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/utils/AlertConstants.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/utils/AlertConstants.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/utils/AlertConstants.java index ee2c28c..2740836 100644 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/utils/AlertConstants.java +++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/utils/AlertConstants.java @@ -28,4 +28,7 @@ public class AlertConstants { public static final String ALERT_SERVICE_ENDPOINT_NAME = "AlertService"; public static final String COORDINATOR = "coordinator"; + + public static final String KAFKA_BROKER_ZK_BASE_PATH = "spout.kafkaBrokerZkBasePath"; + public static final String KAFKA_BROKER_ZK_QUORUM = "spout.kafkaBrokerZkQuorum"; } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/7639ff22/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/spout/CorrelationSpout.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/spout/CorrelationSpout.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/spout/CorrelationSpout.java index 63e94ca..60a9b98 100644 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/spout/CorrelationSpout.java +++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/spout/CorrelationSpout.java @@ -27,6 +27,7 @@ import backtype.storm.topology.OutputFieldsDeclarer; import backtype.storm.topology.base.BaseRichSpout; import backtype.storm.tuple.Fields; import com.typesafe.config.Config; +import com.typesafe.config.ConfigFactory; import org.apache.commons.collections.CollectionUtils; import org.apache.eagle.alert.coordination.model.Kafka2TupleMetadata; import org.apache.eagle.alert.coordination.model.SpoutSpec; @@ -235,8 +236,10 @@ public class CorrelationSpout extends BaseRichSpout implements SpoutSpecListener // build lookup table for scheme Map<String, String> newSchemaName = new HashMap<String, String>(); + Map<String, Map<String, String>> dataSourceProperties = new HashMap<>(); for (Kafka2TupleMetadata ds : newMeta.getKafka2TupleMetadataMap().values()) { newSchemaName.put(ds.getTopic(), ds.getSchemeCls()); + dataSourceProperties.put(ds.getTopic(), ds.getProperties()); } // copy and swap @@ -248,7 +251,8 @@ public class CorrelationSpout extends BaseRichSpout implements SpoutSpecListener LOG.warn(MessageFormat.format("try to create new topic {0}, but found in the active spout list, this may indicate some inconsistency", topic)); continue; } - KafkaSpoutWrapper newWrapper = createKafkaSpout(conf, context, collector, topic, newSchemaName.get(topic), newMeta, sds); + KafkaSpoutWrapper newWrapper = createKafkaSpout(ConfigFactory.parseMap(dataSourceProperties.get(topic)).withFallback(this.config), + conf, context, collector, topic, newSchemaName.get(topic), newMeta, sds); newKafkaSpoutList.put(topic, newWrapper); } // iterate remove topics and then close KafkaSpout @@ -297,47 +301,47 @@ public class CorrelationSpout extends BaseRichSpout implements SpoutSpecListener * @return */ @SuppressWarnings("rawtypes") - protected KafkaSpoutWrapper createKafkaSpout(Map conf, TopologyContext context, SpoutOutputCollector collector, final String topic, + protected KafkaSpoutWrapper createKafkaSpout(Config configure, Map conf, TopologyContext context, SpoutOutputCollector collector, final String topic, String schemeClsName, SpoutSpec spoutSpec, Map<String, StreamDefinition> sds) throws Exception { - String kafkaBrokerZkQuorum = config.getString("spout.kafkaBrokerZkQuorum"); + String kafkaBrokerZkQuorum = configure.getString(AlertConstants.KAFKA_BROKER_ZK_QUORUM); BrokerHosts hosts = null; - if (config.hasPath("spout.kafkaBrokerZkBasePath")) { - hosts = new ZkHosts(kafkaBrokerZkQuorum, config.getString("spout.kafkaBrokerZkBasePath")); + if (configure.hasPath("spout.kafkaBrokerZkBasePath")) { + hosts = new ZkHosts(kafkaBrokerZkQuorum, configure.getString(AlertConstants.KAFKA_BROKER_ZK_BASE_PATH)); } else { hosts = new ZkHosts(kafkaBrokerZkQuorum); } String transactionZkRoot = DEFAULT_STORM_KAFKA_TRANSACTION_ZK_ROOT; - if (config.hasPath("spout.stormKafkaTransactionZkPath")) { - transactionZkRoot = config.getString("spout.stormKafkaTransactionZkPath"); + if (configure.hasPath("spout.stormKafkaTransactionZkPath")) { + transactionZkRoot = configure.getString("spout.stormKafkaTransactionZkPath"); } boolean logEventEnabled = false; - if (config.hasPath("topology.logEventEnabled")) { - logEventEnabled = config.getBoolean("topology.logEventEnabled"); + if (configure.hasPath("topology.logEventEnabled")) { + logEventEnabled = configure.getBoolean("topology.logEventEnabled"); } // write partition offset etc. into zkRoot+id, see PartitionManager.committedPath String zkStateTransactionRelPath = DEFAULT_STORM_KAFKA_TRANSACTION_ZK_RELATIVE_PATH; - if (config.hasPath("spout.stormKafkaEagleConsumer")) { - zkStateTransactionRelPath = config.getString("spout.stormKafkaEagleConsumer"); + if (configure.hasPath("spout.stormKafkaEagleConsumer")) { + zkStateTransactionRelPath = configure.getString("spout.stormKafkaEagleConsumer"); } SpoutConfig spoutConfig = new SpoutConfig(hosts, topic, transactionZkRoot, zkStateTransactionRelPath + "/" + topic + "/" + topologyId); // transaction zkServers - boolean stormKafkaUseSameZkQuorumWithKafkaBroker = config.getBoolean("spout.stormKafkaUseSameZkQuorumWithKafkaBroker"); + boolean stormKafkaUseSameZkQuorumWithKafkaBroker = configure.getBoolean("spout.stormKafkaUseSameZkQuorumWithKafkaBroker"); if (stormKafkaUseSameZkQuorumWithKafkaBroker) { ZkServerPortUtils utils = new ZkServerPortUtils(kafkaBrokerZkQuorum); spoutConfig.zkServers = utils.getZkHosts(); spoutConfig.zkPort = utils.getZkPort(); } else { - ZkServerPortUtils utils = new ZkServerPortUtils(config.getString("spout.stormKafkaTransactionZkQuorum")); + ZkServerPortUtils utils = new ZkServerPortUtils(configure.getString("spout.stormKafkaTransactionZkQuorum")); spoutConfig.zkServers = utils.getZkHosts(); spoutConfig.zkPort = utils.getZkPort(); } // transaction update interval - spoutConfig.stateUpdateIntervalMs = config.hasPath("spout.stormKafkaStateUpdateIntervalMs") ? config.getInt("spout.stormKafkaStateUpdateIntervalMs") : 2000; + spoutConfig.stateUpdateIntervalMs = configure.hasPath("spout.stormKafkaStateUpdateIntervalMs") ? configure.getInt("spout.stormKafkaStateUpdateIntervalMs") : 2000; // Kafka fetch size - spoutConfig.fetchSizeBytes = config.hasPath("spout.stormKafkaFetchSizeBytes") ? config.getInt("spout.stormKafkaFetchSizeBytes") : 1048586; + spoutConfig.fetchSizeBytes = configure.hasPath("spout.stormKafkaFetchSizeBytes") ? configure.getInt("spout.stormKafkaFetchSizeBytes") : 1048586; // "startOffsetTime" is for test usage, prod should not use this - if (config.hasPath("spout.stormKafkaStartOffsetTime")) { - spoutConfig.startOffsetTime = config.getInt("spout.stormKafkaStartOffsetTime"); + if (configure.hasPath("spout.stormKafkaStartOffsetTime")) { + spoutConfig.startOffsetTime = configure.getInt("spout.stormKafkaStartOffsetTime"); } spoutConfig.scheme = createMultiScheme(conf, topic, schemeClsName); http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/7639ff22/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/topology/CorrelationSpoutTest.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/topology/CorrelationSpoutTest.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/topology/CorrelationSpoutTest.java index 9deb4b2..5a86cd2 100644 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/topology/CorrelationSpoutTest.java +++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/topology/CorrelationSpoutTest.java @@ -56,7 +56,7 @@ public class CorrelationSpoutTest { AtomicBoolean validated = new AtomicBoolean(false); CorrelationSpout spout = new CorrelationSpout(config, topoId, null, 1) { @Override - protected KafkaSpoutWrapper createKafkaSpout(Map conf, TopologyContext context, + protected KafkaSpoutWrapper createKafkaSpout(Config config, Map conf, TopologyContext context, SpoutOutputCollector collector, String topic, String schemeClsName, SpoutSpec streamMetadatas, Map<String, StreamDefinition> sds) throws Exception { validated.set(true); @@ -94,7 +94,8 @@ public class CorrelationSpoutTest { final AtomicBoolean verified = new AtomicBoolean(false); CorrelationSpout spout = new CorrelationSpout(config, topoId, null, 1) { @Override - protected KafkaSpoutWrapper createKafkaSpout(Map conf, + protected KafkaSpoutWrapper createKafkaSpout(Config config, + Map conf, TopologyContext context, SpoutOutputCollector collector, String topic, @@ -147,7 +148,8 @@ public class CorrelationSpoutTest { final AtomicBoolean verified = new AtomicBoolean(false); CorrelationSpout spout = new CorrelationSpout(config, topoId, null, 1) { @Override - protected KafkaSpoutWrapper createKafkaSpout(Map conf, TopologyContext context, SpoutOutputCollector collector, final String topic, + protected KafkaSpoutWrapper createKafkaSpout(Config config, + Map conf, TopologyContext context, SpoutOutputCollector collector, final String topic, String schemeClsName, SpoutSpec streamMetadatas, Map<String, StreamDefinition> sds) { return new KafkaSpoutWrapper(null, null); http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/7639ff22/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/service/ApplicationAction.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/service/ApplicationAction.java b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/service/ApplicationAction.java index 8c7c8d6..a502f81 100644 --- a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/service/ApplicationAction.java +++ b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/service/ApplicationAction.java @@ -31,6 +31,7 @@ import org.apache.eagle.app.Application; import org.apache.eagle.app.environment.ExecutionRuntime; import org.apache.eagle.app.environment.ExecutionRuntimeManager; import org.apache.eagle.app.messaging.KafkaStreamSinkConfig; +import org.apache.eagle.app.messaging.KafkaStreamSourceConfig; import org.apache.eagle.metadata.model.ApplicationEntity; import org.apache.eagle.metadata.model.StreamSourceConfig; import org.apache.eagle.metadata.utils.StreamIdConversions; @@ -144,6 +145,15 @@ public class ApplicationAction implements Serializable { datasource.setName(metadata.getAppId()); datasource.setTopic(kafkaCfg.getTopicId()); datasource.setSchemeCls(JsonScheme.class.getCanonicalName()); + datasource.setProperties(new HashMap<>()); + + KafkaStreamSourceConfig streamSourceConfig = (KafkaStreamSourceConfig) streamDesc.getSourceConfig(); + if (streamSourceConfig != null) { + Map<String, String> properties = datasource.getProperties(); + properties.put(AlertConstants.KAFKA_BROKER_ZK_BASE_PATH, streamSourceConfig.getBrokerZkPath()); + properties.put(AlertConstants.KAFKA_BROKER_ZK_QUORUM, streamSourceConfig.getBrokerZkQuorum()); + } + Tuple2StreamMetadata tuple2Stream = new Tuple2StreamMetadata(); Properties prop = new Properties(); prop.put(JsonStringStreamNameSelector.USER_PROVIDED_STREAM_NAME_PROPERTY, streamDesc.getStreamId()); http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/7639ff22/eagle-server-assembly/src/main/bin/metadata-ddl.sql ---------------------------------------------------------------------- diff --git a/eagle-server-assembly/src/main/bin/metadata-ddl.sql b/eagle-server-assembly/src/main/bin/metadata-ddl.sql index f80a7ea..4bed927 100644 --- a/eagle-server-assembly/src/main/bin/metadata-ddl.sql +++ b/eagle-server-assembly/src/main/bin/metadata-ddl.sql @@ -46,21 +46,21 @@ CREATE TABLE IF NOT EXISTS sites ( -- eagle security module metadata CREATE TABLE IF NOT EXISTS hdfs_sensitivity_entity ( - site varchar(20) DEFAULT NULL, - filedir varchar(100) DEFAULT NULL, + site varchar(20) NOT NULL, + filedir varchar(100) NOT NULL, sensitivity_type varchar(20) DEFAULT NULL, primary key (site, filedir) ); CREATE TABLE IF NOT EXISTS ip_securityzone ( - iphost varchar(100) DEFAULT NULL, + iphost varchar(100) NOT NULL, security_zone varchar(100) DEFAULT NULL, primary key (iphost) ); CREATE TABLE IF NOT EXISTS hbase_sensitivity_entity ( - site varchar(20) DEFAULT NULL, - hbase_resource varchar(100) DEFAULT NULL, + site varchar(20) NOT NULL, + hbase_resource varchar(100) NOT NULL, sensitivity_type varchar(20) DEFAULT NULL, primary key (site, hbase_resource) );