Repository: incubator-eagle Updated Branches: refs/heads/master 4250e2d32 -> 4ff963b47
[EAGLE-693] fix application could not detect stream change Author: wujinhu <wujinhu...@126.com> Closes #580 from wujinhu/EAGLE-700. Project: http://git-wip-us.apache.org/repos/asf/incubator-eagle/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-eagle/commit/4ff963b4 Tree: http://git-wip-us.apache.org/repos/asf/incubator-eagle/tree/4ff963b4 Diff: http://git-wip-us.apache.org/repos/asf/incubator-eagle/diff/4ff963b4 Branch: refs/heads/master Commit: 4ff963b47fd82cb1ca8394027518a9183f39176e Parents: 4250e2d Author: wujinhu <wujinhu...@126.com> Authored: Sat Oct 29 22:20:20 2016 +0800 Committer: wujinhu <wujinhu...@126.com> Committed: Sat Oct 29 22:20:20 2016 +0800 ---------------------------------------------------------------------- .../eagle/app/service/ApplicationAction.java | 81 +++++++++++--------- .../src/main/bin/createTables.sql | 20 +++++ 2 files changed, 65 insertions(+), 36 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/4ff963b4/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/service/ApplicationAction.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/service/ApplicationAction.java b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/service/ApplicationAction.java index b7258d3..bd0adfe 100644 --- a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/service/ApplicationAction.java +++ b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/service/ApplicationAction.java @@ -88,45 +88,51 @@ public class ApplicationAction implements Serializable { } public void doInstall() { - if (metadata.getDescriptor().getStreams() != null) { - List<StreamDesc> streamDescToInstall = metadata.getDescriptor().getStreams().stream().map((streamDefinition -> { - StreamDefinition copied = streamDefinition.copy(); - copied.setSiteId(metadata.getSite().getSiteId()); - copied.setStreamId(StreamIdConversions.formatSiteStreamId(metadata.getSite().getSiteId(), copied.getStreamId())); - StreamSinkConfig streamSinkConfig = this.runtime.environment() + processStreams(); + } + + private void processStreams() { + if (metadata.getDescriptor().getStreams() == null) { + return; + } + + List<StreamDesc> streamDescToInstall = metadata.getDescriptor().getStreams().stream().map((streamDefinition -> { + StreamDefinition copied = streamDefinition.copy(); + copied.setSiteId(metadata.getSite().getSiteId()); + copied.setStreamId(StreamIdConversions.formatSiteStreamId(metadata.getSite().getSiteId(), copied.getStreamId())); + StreamSinkConfig streamSinkConfig = this.runtime.environment() .streamSink().getSinkConfig(StreamIdConversions.parseStreamTypeId(copied.getSiteId(), copied.getStreamId()), this.effectiveConfig); - StreamDesc streamDesc = new StreamDesc(); - streamDesc.setSchema(copied); - streamDesc.setSink(streamSinkConfig); - streamDesc.setStreamId(copied.getStreamId()); - return streamDesc; - })).collect(Collectors.toList()); - metadata.setStreams(streamDescToInstall); + StreamDesc streamDesc = new StreamDesc(); + streamDesc.setSchema(copied); + streamDesc.setSink(streamSinkConfig); + streamDesc.setStreamId(copied.getStreamId()); + return streamDesc; + })).collect(Collectors.toList()); + metadata.setStreams(streamDescToInstall); - // TODO: Decouple converting from StreamSink to Alert DataSource - // iterate each stream descriptor and create alert datasource for each - for (StreamDesc streamDesc : streamDescToInstall) { - // only take care of Kafka sink - if (streamDesc.getSink() instanceof KafkaStreamSinkConfig) { - KafkaStreamSinkConfig kafkaCfg = (KafkaStreamSinkConfig) streamDesc.getSink(); - Kafka2TupleMetadata datasource = new Kafka2TupleMetadata(); - datasource.setType("KAFKA"); - datasource.setName(metadata.getAppId()); - datasource.setTopic(kafkaCfg.getTopicId()); - datasource.setSchemeCls(JsonScheme.class.getCanonicalName()); - Tuple2StreamMetadata tuple2Stream = new Tuple2StreamMetadata(); - Properties prop = new Properties(); - prop.put(JsonStringStreamNameSelector.USER_PROVIDED_STREAM_NAME_PROPERTY, streamDesc.getStreamId()); - tuple2Stream.setStreamNameSelectorProp(prop); - tuple2Stream.setTimestampColumn("timestamp"); - tuple2Stream.setStreamNameSelectorCls(JsonStringStreamNameSelector.class.getCanonicalName()); - datasource.setCodec(tuple2Stream); - alertMetadataService.addDataSource(datasource); + // TODO: Decouple converting from StreamSink to Alert DataSource + // iterate each stream descriptor and create alert datasource for each + for (StreamDesc streamDesc : streamDescToInstall) { + // only take care of Kafka sink + if (streamDesc.getSink() instanceof KafkaStreamSinkConfig) { + KafkaStreamSinkConfig kafkaCfg = (KafkaStreamSinkConfig) streamDesc.getSink(); + Kafka2TupleMetadata datasource = new Kafka2TupleMetadata(); + datasource.setType("KAFKA"); + datasource.setName(metadata.getAppId()); + datasource.setTopic(kafkaCfg.getTopicId()); + datasource.setSchemeCls(JsonScheme.class.getCanonicalName()); + Tuple2StreamMetadata tuple2Stream = new Tuple2StreamMetadata(); + Properties prop = new Properties(); + prop.put(JsonStringStreamNameSelector.USER_PROVIDED_STREAM_NAME_PROPERTY, streamDesc.getStreamId()); + tuple2Stream.setStreamNameSelectorProp(prop); + tuple2Stream.setTimestampColumn("timestamp"); + tuple2Stream.setStreamNameSelectorCls(JsonStringStreamNameSelector.class.getCanonicalName()); + datasource.setCodec(tuple2Stream); + alertMetadataService.addDataSource(datasource); - StreamDefinition sd = streamDesc.getSchema(); - sd.setDataSource(metadata.getAppId()); - alertMetadataService.createStream(streamDesc.getSchema()); - } + StreamDefinition sd = streamDesc.getSchema(); + sd.setDataSource(metadata.getAppId()); + alertMetadataService.createStream(streamDesc.getSchema()); } } } @@ -144,6 +150,9 @@ public class ApplicationAction implements Serializable { } public void doStart() { + if (metadata.getStreams() == null) { + processStreams(); + } this.runtime.start(this.application, this.effectiveConfig); } http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/4ff963b4/eagle-server-assembly/src/main/bin/createTables.sql ---------------------------------------------------------------------- diff --git a/eagle-server-assembly/src/main/bin/createTables.sql b/eagle-server-assembly/src/main/bin/createTables.sql index 749032b..da67d3d 100644 --- a/eagle-server-assembly/src/main/bin/createTables.sql +++ b/eagle-server-assembly/src/main/bin/createTables.sql @@ -38,4 +38,24 @@ CREATE TABLE IF NOT EXISTS sites ( createdtime bigint(20) DEFAULT NULL, modifiedtime bigint(20) DEFAULT NULL, UNIQUE (siteid) +); + +CREATE TABLE IF NOT EXISTS hdfs_sensitivity_entity ( + site varchar(20) DEFAULT NULL, + filedir varchar(100) DEFAULT NULL, + sensitivity_type varchar(20) DEFAULT NULL, + primary key (site, filedir) +); + +CREATE TABLE IF NOT EXISTS ip_securityzone ( + iphost varchar(100) DEFAULT NULL, + security_zone varchar(100) DEFAULT NULL, + primary key (iphost) +); + +CREATE TABLE IF NOT EXISTS hbase_sensitivity_entity ( + site varchar(20) DEFAULT NULL, + hbase_resource varchar(100) DEFAULT NULL, + sensitivity_type varchar(20) DEFAULT NULL, + primary key (site, hbase_resource) ); \ No newline at end of file