http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/provider/InMemScheduleConext.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/provider/InMemScheduleConext.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/provider/InMemScheduleConext.java deleted file mode 100644 index 8f6cbc6..0000000 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/provider/InMemScheduleConext.java +++ /dev/null @@ -1,150 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.eagle.alert.coordinator.provider; - -import org.apache.eagle.alert.coordination.model.Kafka2TupleMetadata; -import org.apache.eagle.alert.coordination.model.internal.MonitoredStream; -import org.apache.eagle.alert.coordination.model.internal.PolicyAssignment; -import org.apache.eagle.alert.coordination.model.internal.StreamGroup; -import org.apache.eagle.alert.coordination.model.internal.Topology; -import org.apache.eagle.alert.coordinator.IScheduleContext; -import org.apache.eagle.alert.coordinator.model.TopologyUsage; -import org.apache.eagle.alert.engine.coordinator.PolicyDefinition; -import org.apache.eagle.alert.engine.coordinator.Publishment; -import org.apache.eagle.alert.engine.coordinator.StreamDefinition; -import java.util.HashMap; -import java.util.Map; - -/** - * @since Mar 28, 2016. - */ -public class InMemScheduleConext implements IScheduleContext { - - private Map<String, Topology> topologies = new HashMap<String, Topology>(); - private Map<String, TopologyUsage> usages = new HashMap<String, TopologyUsage>(); - private Map<String, PolicyDefinition> policies = new HashMap<String, PolicyDefinition>(); - private Map<String, Kafka2TupleMetadata> datasources = new HashMap<String, Kafka2TupleMetadata>(); - private Map<String, PolicyAssignment> policyAssignments = new HashMap<String, PolicyAssignment>(); - private Map<String, StreamDefinition> schemas = new HashMap<String, StreamDefinition>(); - private Map<StreamGroup, MonitoredStream> monitoredStreams = new HashMap<StreamGroup, MonitoredStream>(); - private Map<String, Publishment> publishments = new HashMap<String, Publishment>(); - - public InMemScheduleConext() { - } - - public InMemScheduleConext(IScheduleContext context) { - this.topologies = new HashMap<String, Topology>(context.getTopologies()); - this.usages = new HashMap<String, TopologyUsage>(context.getTopologyUsages()); - this.policies = new HashMap<String, PolicyDefinition>(context.getPolicies()); - this.datasources = 
new HashMap<String, Kafka2TupleMetadata>(context.getDataSourceMetadata()); - this.policyAssignments = new HashMap<String, PolicyAssignment>(context.getPolicyAssignments()); - this.schemas = new HashMap<String, StreamDefinition>(context.getStreamSchemas()); - this.monitoredStreams = new HashMap<StreamGroup, MonitoredStream>(context.getMonitoredStreams()); - this.publishments = new HashMap<String, Publishment>(context.getPublishments()); - } - - public InMemScheduleConext(Map<String, Topology> topologies2, Map<String, PolicyAssignment> assignments, - Map<String, Kafka2TupleMetadata> kafkaSources, Map<String, PolicyDefinition> policies2, - Map<String, Publishment> publishments2, Map<String, StreamDefinition> streamDefinitions, - Map<StreamGroup, MonitoredStream> monitoredStreamMap, Map<String, TopologyUsage> usages2) { - this.topologies = topologies2; - this.policyAssignments = assignments; - this.datasources = kafkaSources; - this.policies = policies2; - this.publishments = publishments2; - this.schemas = streamDefinitions; - this.monitoredStreams = monitoredStreamMap; - this.usages = usages2; - } - - public Map<String, Topology> getTopologies() { - return topologies; - } - - public void addTopology(Topology topo) { - topologies.put(topo.getName(), topo); - } - - public Map<String, TopologyUsage> getTopologyUsages() { - return usages; - } - - public void addTopologyUsages(TopologyUsage usage) { - usages.put(usage.getTopoName(), usage); - } - - public Map<String, PolicyDefinition> getPolicies() { - return policies; - } - - public void addPoilcy(PolicyDefinition pd) { - this.policies.put(pd.getName(), pd); - } - - public Map<String, Kafka2TupleMetadata> getDatasources() { - return datasources; - } - - public void setDatasources(Map<String, Kafka2TupleMetadata> datasources) { - this.datasources = datasources; - } - - public void addDataSource(Kafka2TupleMetadata dataSource) { - this.datasources.put(dataSource.getName(), dataSource); - } - - @Override - public 
Map<String, Kafka2TupleMetadata> getDataSourceMetadata() { - return datasources; - } - - public void setPolicyOrderedTopologies(Map<String, PolicyAssignment> policyAssignments) { - this.policyAssignments = policyAssignments; - } - - public Map<String, PolicyAssignment> getPolicyAssignments() { - return this.policyAssignments; - } - - @Override - public Map<String, StreamDefinition> getStreamSchemas() { - return schemas; - } - - public void addSchema(StreamDefinition schema) { - this.schemas.put(schema.getStreamId(), schema); - } - - public void setStreamSchemas(Map<String, StreamDefinition> schemas) { - this.schemas = schemas; - } - - @Override - public Map<StreamGroup, MonitoredStream> getMonitoredStreams() { - return monitoredStreams; - } - - @Override - public Map<String, Publishment> getPublishments() { - return publishments; - } - - public void addPublishment(Publishment pub) { - this.publishments.put(pub.getName(), pub); - } - -}
http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/provider/NodataMetadataGenerator.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/provider/NodataMetadataGenerator.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/provider/NodataMetadataGenerator.java deleted file mode 100644 index 7f5dcdc..0000000 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/provider/NodataMetadataGenerator.java +++ /dev/null @@ -1,337 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.eagle.alert.coordinator.provider; - -import org.apache.eagle.alert.coordination.model.Kafka2TupleMetadata; -import org.apache.eagle.alert.coordination.model.Tuple2StreamMetadata; -import org.apache.eagle.alert.engine.coordinator.*; -import org.apache.eagle.alert.utils.TimePeriodUtils; -import com.typesafe.config.Config; -import org.apache.commons.lang3.StringUtils; -import org.joda.time.Period; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.*; - -public class NodataMetadataGenerator { - - private static final Logger LOG = LoggerFactory.getLogger(NodataMetadataGenerator.class); - - private static final String NODATA_ALERT_AGGR_STREAM = "nodata_alert_aggregation_stream"; - private static final String NODATA_ALERT_AGGR_OUTPUT_STREAM = "nodata_alert_aggregation_output_stream"; - private static final String NODATA_ALERT_AGGR_DATASOURCE_NAME = "nodata_alert_aggregation_ds"; - private static final String NODATA_ALERT_AGGR_OUTPUT_DATASOURCE_NAME = "nodata_alert_aggregation_output_ds"; - private static final String NODATA_ALERT_AGGR_TOPIC_NAME = "nodata_alert_aggregation"; - private static final String NODATA_ALERT_AGGR_OUTPUT_TOPIC_NAME = "nodata_alert"; - - private static final String DATASOURCE_TYPE = "KAFKA"; - private static final String DATASOURCE_SCHEME_CLS = "org.apache.eagle.alert.engine.scheme.JsonScheme"; - - private static final String NODATA_ALERT_AGGR_POLICY_TYPE = "nodataalert"; - private static final String NODATA_ALERT_AGGR_OUTPUT_POLICY_TYPE = "siddhi"; - - private static final String JSON_STRING_STREAM_NAME_SELECTOR_CLS = "org.apache.eagle.alert.engine.scheme.JsonStringStreamNameSelector"; - private static final String STREAM_TIMESTAMP_COLUMN_NAME = "timestamp"; - private static final String STREAM_TIMESTAMP_FORMAT = ""; - - private static final String KAFKA_PUBLISHMENT_TYPE = "org.apache.eagle.alert.engine.publisher.impl.AlertKafkaPublisher"; - private static final String EMAIL_PUBLISHMENT_TYPE = 
"org.apache.eagle.alert.engine.publisher.impl.AlertEmailPublisher"; - - private static final String PUBLISHMENT_DEDUP_DURATION = "PT0M"; - private static final String PUBLISHMENT_SERIALIZER = "org.apache.eagle.alert.engine.publisher.impl.JsonEventSerializer"; - - public NodataMetadataGenerator() { - } - - public void execute(Config config, Map<String, StreamDefinition> streamDefinitionsMap, - Map<String, Kafka2TupleMetadata> kafkaSources, - Map<String, PolicyDefinition> policies, Map<String, Publishment> publishments) { - Collection<StreamDefinition> streamDefinitions = streamDefinitionsMap.values(); - for (StreamDefinition streamDefinition : streamDefinitions) { - StreamColumn columnWithNodataExpression = null; - for (StreamColumn column : streamDefinition.getColumns()) { - if (StringUtils.isNotBlank(column.getNodataExpression())) { - // has nodata alert setting, needs to generate the nodata alert policy - if (columnWithNodataExpression != null) { - columnWithNodataExpression = null; - LOG.warn("Only one column in one stream is allowed to configure nodata alert"); - break; - } - columnWithNodataExpression = column; - } - } - if (columnWithNodataExpression != null) { - final String streamName = streamDefinition.getStreamId(); - - // create nodata alert aggr stream - if (streamDefinitionsMap.containsKey(NODATA_ALERT_AGGR_STREAM)) { - LOG.info("Nodata alert aggregation stream: {} already exists", NODATA_ALERT_AGGR_STREAM); - } else { - streamDefinitionsMap.put(NODATA_ALERT_AGGR_STREAM, buildAggregationStream()); - LOG.info("Created nodata alert aggregation stream: {}", NODATA_ALERT_AGGR_STREAM); - } - - // create nodata alert aggr output stream - if (streamDefinitionsMap.containsKey(NODATA_ALERT_AGGR_OUTPUT_STREAM)) { - LOG.info("Nodata alert aggregation output stream: {} already exists", NODATA_ALERT_AGGR_OUTPUT_STREAM); - } else { - streamDefinitionsMap.put(NODATA_ALERT_AGGR_OUTPUT_STREAM, buildAggregationOutputStream()); - LOG.info("Created nodata alert 
aggregation output stream: {}", NODATA_ALERT_AGGR_OUTPUT_STREAM); - } - - // create nodata alert data source - if (kafkaSources.containsKey(NODATA_ALERT_AGGR_DATASOURCE_NAME)) { - LOG.info("Stream: {} nodata alert aggregation datasource: {} already exists", - NODATA_ALERT_AGGR_STREAM, NODATA_ALERT_AGGR_DATASOURCE_NAME); - } else { - kafkaSources.put(NODATA_ALERT_AGGR_DATASOURCE_NAME, buildAggregationDatasource()); - LOG.info("Created nodata alert aggregation datasource {} for stream {}", - NODATA_ALERT_AGGR_DATASOURCE_NAME, NODATA_ALERT_AGGR_STREAM); - } - - // create nodata alert aggregation output datasource - if (kafkaSources.containsKey(NODATA_ALERT_AGGR_OUTPUT_DATASOURCE_NAME)) { - LOG.info("Stream: {} nodata alert aggregation output datasource: {} already exists", - NODATA_ALERT_AGGR_OUTPUT_STREAM, NODATA_ALERT_AGGR_OUTPUT_DATASOURCE_NAME); - } else { - kafkaSources.put(NODATA_ALERT_AGGR_OUTPUT_DATASOURCE_NAME, buildAggregationOutputDatasource()); - LOG.info("Created nodata alert aggregation output datasource {} for stream {}", - NODATA_ALERT_AGGR_DATASOURCE_NAME, NODATA_ALERT_AGGR_OUTPUT_STREAM); - } - - // create nodata alert policy - String policyName = streamName + "_nodata_alert"; - String nodataExpression = columnWithNodataExpression.getNodataExpression(); - String[] segments = nodataExpression.split(","); - long windowPeriodInSeconds = TimePeriodUtils.getSecondsOfPeriod(Period.parse(segments[0])); - if (policies.containsKey(policyName)) { - LOG.info("Stream: {} nodata alert policy: {} already exists", streamName, policyName); - } else { - policies.put(policyName, buildDynamicNodataPolicy( - streamName, - policyName, - columnWithNodataExpression.getName(), - nodataExpression, - Arrays.asList(streamName))); - LOG.info("Created nodata alert policy {} with expression {} for stream {}", - policyName, nodataExpression, streamName); - } - - // create nodata alert aggregation - String aggrPolicyName = NODATA_ALERT_AGGR_STREAM + "_policy"; - if 
(policies.containsKey(aggrPolicyName)) { - LOG.info("Stream: {} nodata alert aggregation policy: {} already exists", - NODATA_ALERT_AGGR_OUTPUT_STREAM, aggrPolicyName); - } else { - policies.put(aggrPolicyName, buildAggregationPolicy( - aggrPolicyName, - columnWithNodataExpression.getName(), - windowPeriodInSeconds)); - LOG.info("Created nodata alert aggregation policy {} for stream {}", - aggrPolicyName, NODATA_ALERT_AGGR_OUTPUT_STREAM); - } - - // create nodata alert publish - String publishmentName = policyName + "_publish"; - if (publishments.containsKey(publishmentName)) { - LOG.info("Stream: {} nodata alert publishment: {} already exists", streamName, publishmentName); - } else { - String kafkaBroker = config.getString("kafkaProducer.bootstrapServers"); - publishments.put(publishmentName, buildKafkaAlertPublishment( - publishmentName, policyName, kafkaBroker, NODATA_ALERT_AGGR_TOPIC_NAME)); - publishments.put(publishmentName + "_email", buildEmailAlertPublishment(config, - publishmentName + "_email", policyName, kafkaBroker, NODATA_ALERT_AGGR_TOPIC_NAME)); - LOG.info("Created nodata alert publishment {} for stream {}", policyName + "_publish", streamName); - } - - // create nodata alert aggregation publish - String aggrPublishName = aggrPolicyName + "_publish"; - if (publishments.containsKey(aggrPublishName)) { - LOG.info("Stream: {} publishment: {} already exists", NODATA_ALERT_AGGR_STREAM, aggrPublishName); - } else { - String kafkaBroker = config.getString("kafkaProducer.bootstrapServers"); - publishments.put(aggrPublishName, buildKafkaAlertPublishment( - aggrPublishName, aggrPolicyName, kafkaBroker, NODATA_ALERT_AGGR_OUTPUT_TOPIC_NAME)); - publishments.put(aggrPublishName + "_email", buildEmailAlertPublishment(config, - aggrPublishName + "_email", aggrPolicyName, kafkaBroker, NODATA_ALERT_AGGR_OUTPUT_TOPIC_NAME)); - LOG.info("Created nodata alert publishment {} for stream {}", policyName + "_publish", streamName); - } - } - } - } - - private 
Kafka2TupleMetadata buildAggregationDatasource() { - Kafka2TupleMetadata datasource = new Kafka2TupleMetadata(); - datasource.setName(NODATA_ALERT_AGGR_DATASOURCE_NAME); - datasource.setType(DATASOURCE_TYPE); - datasource.setSchemeCls(DATASOURCE_SCHEME_CLS); - datasource.setTopic(NODATA_ALERT_AGGR_TOPIC_NAME); - Tuple2StreamMetadata codec = new Tuple2StreamMetadata(); - codec.setStreamNameSelectorCls(JSON_STRING_STREAM_NAME_SELECTOR_CLS); - codec.setTimestampColumn(STREAM_TIMESTAMP_COLUMN_NAME); - codec.setTimestampFormat(STREAM_TIMESTAMP_FORMAT); - Properties codecProperties = new Properties(); - codecProperties.put("userProvidedStreamName", NODATA_ALERT_AGGR_STREAM); - codecProperties.put("streamNameFormat", "%s"); - codec.setStreamNameSelectorProp(codecProperties); - datasource.setCodec(codec); - return datasource; - } - - private Kafka2TupleMetadata buildAggregationOutputDatasource() { - Kafka2TupleMetadata datasource = new Kafka2TupleMetadata(); - datasource.setName(NODATA_ALERT_AGGR_OUTPUT_DATASOURCE_NAME); - datasource.setType(DATASOURCE_TYPE); - datasource.setSchemeCls(DATASOURCE_SCHEME_CLS); - datasource.setTopic(NODATA_ALERT_AGGR_OUTPUT_TOPIC_NAME); - Tuple2StreamMetadata codec = new Tuple2StreamMetadata(); - codec.setStreamNameSelectorCls(JSON_STRING_STREAM_NAME_SELECTOR_CLS); - codec.setTimestampColumn(STREAM_TIMESTAMP_COLUMN_NAME); - codec.setTimestampFormat(STREAM_TIMESTAMP_FORMAT); - Properties codecProperties = new Properties(); - codecProperties.put("userProvidedStreamName", NODATA_ALERT_AGGR_OUTPUT_STREAM); - codecProperties.put("streamNameFormat", "%s"); - codec.setStreamNameSelectorProp(codecProperties); - datasource.setCodec(codec); - return datasource; - } - - private PolicyDefinition buildDynamicNodataPolicy(String streamName, String policyName, - String columnName, String expression, List<String> inputStream) { - PolicyDefinition.Definition def = new PolicyDefinition.Definition(); - //expression, something like "PT5S,dynamic,1,host" - 
def.setValue(expression); - def.setType(NODATA_ALERT_AGGR_POLICY_TYPE); - Map<String, Object> properties = new HashMap<String, Object>(); - properties.put("nodataColumnName", columnName); - def.setProperties(properties); - PolicyDefinition pd = new PolicyDefinition(); - pd.setDefinition(def); - pd.setInputStreams(inputStream); - pd.setOutputStreams(Arrays.asList(NODATA_ALERT_AGGR_STREAM)); - pd.setName(policyName); - pd.setDescription(String.format("Nodata alert policy for stream %s", streamName)); - - StreamPartition sp = new StreamPartition(); - sp.setStreamId(streamName); - sp.setColumns(Arrays.asList(columnName)); - sp.setType(StreamPartition.Type.GROUPBY); - pd.addPartition(sp); - return pd; - } - - private PolicyDefinition buildAggregationPolicy(String policyName, String columnName, - long windowPeriodInSeconds) { - final PolicyDefinition pd = new PolicyDefinition(); - PolicyDefinition.Definition def = new PolicyDefinition.Definition(); - String siddhiQL = String.format( - "from %s#window.timeBatch(%s sec) select eagle:collectWithDistinct(%s) as hosts, " - + "originalStreamName as streamName group by originalStreamName insert into %s", - NODATA_ALERT_AGGR_STREAM, windowPeriodInSeconds * 2, - columnName, NODATA_ALERT_AGGR_OUTPUT_STREAM); - LOG.info("Generated SiddhiQL {} for stream: {}", siddhiQL, NODATA_ALERT_AGGR_STREAM); - def.setValue(siddhiQL); - def.setType(NODATA_ALERT_AGGR_OUTPUT_POLICY_TYPE); - pd.setDefinition(def); - pd.setInputStreams(Arrays.asList(NODATA_ALERT_AGGR_STREAM)); - pd.setOutputStreams(Arrays.asList(NODATA_ALERT_AGGR_OUTPUT_STREAM)); - pd.setName(policyName); - pd.setDescription("Nodata alert aggregation policy, used to merge alerts from multiple bolts"); - - StreamPartition sp = new StreamPartition(); - sp.setStreamId(NODATA_ALERT_AGGR_STREAM); - sp.setColumns(Arrays.asList(columnName)); - sp.setType(StreamPartition.Type.GROUPBY); - pd.addPartition(sp); - pd.setParallelismHint(0); - return pd; - } - - private Publishment 
buildKafkaAlertPublishment(String publishmentName, String policyName, String kafkaBroker, String topic) { - Publishment publishment = new Publishment(); - publishment.setName(publishmentName); - publishment.setType(KAFKA_PUBLISHMENT_TYPE); - publishment.setPolicyIds(Arrays.asList(policyName)); - publishment.setDedupIntervalMin(PUBLISHMENT_DEDUP_DURATION); - Map<String, Object> publishmentProperties = new HashMap<>(); - publishmentProperties.put("kafka_broker", kafkaBroker); - publishmentProperties.put("topic", topic); - publishment.setProperties(publishmentProperties); - publishment.setSerializer(PUBLISHMENT_SERIALIZER); - return publishment; - } - - private Publishment buildEmailAlertPublishment(Config config, - String publishmentName, String policyName, String kafkaBroker, String topic) { - Publishment publishment = new Publishment(); - publishment.setName(publishmentName); - publishment.setType(EMAIL_PUBLISHMENT_TYPE); - publishment.setPolicyIds(Arrays.asList(policyName)); - publishment.setDedupIntervalMin(PUBLISHMENT_DEDUP_DURATION); - Map<String, Object> publishmentProperties = new HashMap<>(); - publishmentProperties.put("subject", String.format("Eagle Alert - %s", topic)); - publishmentProperties.put("template", ""); - publishmentProperties.put("sender", config.getString("email.sender")); - publishmentProperties.put("recipients", config.getString("email.recipients")); - publishmentProperties.put("mail.smtp.host", config.getString("email.mailSmtpHost")); - publishmentProperties.put("mail.smtp.port", config.getString("email.mailSmtpPort")); - publishmentProperties.put("connection", "plaintext"); - publishment.setProperties(publishmentProperties); - publishment.setSerializer(PUBLISHMENT_SERIALIZER); - return publishment; - } - - private StreamDefinition buildAggregationStream() { - final StreamDefinition sd = new StreamDefinition(); - StreamColumn tsColumn = new StreamColumn(); - tsColumn.setName("timestamp"); - tsColumn.setType(StreamColumn.Type.LONG); - - 
StreamColumn hostColumn = new StreamColumn(); - hostColumn.setName("host"); - hostColumn.setType(StreamColumn.Type.STRING); - - StreamColumn originalStreamNameColumn = new StreamColumn(); - originalStreamNameColumn.setName("originalStreamName"); - originalStreamNameColumn.setType(StreamColumn.Type.STRING); - - sd.setColumns(Arrays.asList(tsColumn, hostColumn, originalStreamNameColumn)); - sd.setDataSource(NODATA_ALERT_AGGR_DATASOURCE_NAME); - sd.setStreamId(NODATA_ALERT_AGGR_STREAM); - sd.setDescription("Nodata alert aggregation stream"); - return sd; - } - - private StreamDefinition buildAggregationOutputStream() { - final StreamDefinition sd = new StreamDefinition(); - StreamColumn hostColumn = new StreamColumn(); - hostColumn.setName("hosts"); - hostColumn.setType(StreamColumn.Type.STRING); - - StreamColumn osnColumn = new StreamColumn(); - osnColumn.setName("streamName"); - osnColumn.setType(StreamColumn.Type.STRING); - - sd.setColumns(Arrays.asList(hostColumn, osnColumn)); - sd.setDataSource(NODATA_ALERT_AGGR_OUTPUT_DATASOURCE_NAME); - sd.setStreamId(NODATA_ALERT_AGGR_OUTPUT_STREAM); - sd.setDescription("Nodata alert aggregation output stream"); - return sd; - } - -} http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/provider/ScheduleContextBuilder.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/provider/ScheduleContextBuilder.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/provider/ScheduleContextBuilder.java deleted file mode 100644 index b8e3824..0000000 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/provider/ScheduleContextBuilder.java +++ /dev/null @@ -1,432 
+0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.eagle.alert.coordinator.provider; - -import com.typesafe.config.Config; -import org.apache.eagle.alert.coordination.model.Kafka2TupleMetadata; -import org.apache.eagle.alert.coordination.model.ScheduleState; -import org.apache.eagle.alert.coordination.model.WorkSlot; -import org.apache.eagle.alert.coordination.model.internal.*; -import org.apache.eagle.alert.coordinator.IScheduleContext; -import org.apache.eagle.alert.coordinator.model.AlertBoltUsage; -import org.apache.eagle.alert.coordinator.model.GroupBoltUsage; -import org.apache.eagle.alert.coordinator.model.TopologyUsage; -import org.apache.eagle.alert.engine.coordinator.PolicyDefinition; -import org.apache.eagle.alert.engine.coordinator.PolicyDefinition.PolicyStatus; -import org.apache.eagle.alert.engine.coordinator.Publishment; -import org.apache.eagle.alert.engine.coordinator.StreamDefinition; -import org.apache.eagle.alert.service.IMetadataServiceClient; -import org.apache.eagle.alert.service.MetadataServiceClientImpl; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.*; -import java.util.stream.Collectors; - -/** - * FIXME: this class focus on 
correctness, not the efficiency now. There might - * be problem when metadata size grows too big. - * - * @since May 3, 2016 - */ -public class ScheduleContextBuilder { - - private static final Logger LOG = LoggerFactory.getLogger(ScheduleContextBuilder.class); - private static final String UNIQUE_BOLT_ID = "%s-%s";// toponame-boltname - - private Config config; - private IMetadataServiceClient client; - - private Map<String, Topology> topologies; - private Map<String, PolicyAssignment> assignments; - private Map<String, Kafka2TupleMetadata> kafkaSources; - private Map<String, PolicyDefinition> policies; - private Map<String, Publishment> publishments; - private Map<String, StreamDefinition> streamDefinitions; - private Map<StreamGroup, MonitoredStream> monitoredStreamMap; - private Map<String, TopologyUsage> usages; - private IScheduleContext builtContext; - - public ScheduleContextBuilder(Config config) { - this.config = config; - client = new MetadataServiceClientImpl(config); - } - - public ScheduleContextBuilder(Config config, IMetadataServiceClient client) { - this.config = config; - this.client = client; - } - - /** - * Built a shcedule context for metadata client service. 
- * - * @return - */ - public IScheduleContext buildContext() { - topologies = listToMap(client.listTopologies()); - kafkaSources = listToMap(client.listDataSources()); - // filter out disabled policies - List<PolicyDefinition> enabledPolicies = client.listPolicies().stream().filter( - (t) -> t.getPolicyStatus() != PolicyStatus.DISABLED).collect(Collectors.toList()); - policies = listToMap(enabledPolicies); - publishments = listToMap(client.listPublishment()); - streamDefinitions = listToMap(client.listStreams()); - // generate data sources, policies, publishments for nodata alert - new NodataMetadataGenerator().execute(config, streamDefinitions, kafkaSources, policies, publishments); - - // TODO: See ScheduleState comments on how to improve the storage - ScheduleState state = client.getVersionedSpec(); - - // detect policy update, remove the policy assignments. - // definition change : the assignment would NOT change, the runtime will do reload and check - // stream change : the assignment would NOT change, the runtime will do reload and check - // data source change : the assignment would NOT change, the runtime will do reload and check - // parallelism change : the policies' assignment would be dropped when it's bigger than assign queue, and expect - // to be assigned in scheduler. - assignments = listToMap(state == null ? new ArrayList<PolicyAssignment>() : detectAssignmentsChange(state.getAssignments(), state)); - - monitoredStreamMap = listToMap(state == null ? new ArrayList<MonitoredStream>() : detectMonitoredStreams(state.getMonitoredStreams())); - - // build based on existing data - usages = buildTopologyUsage(); - - // copy to shedule context now - builtContext = new InMemScheduleConext(topologies, assignments, kafkaSources, policies, publishments, - streamDefinitions, monitoredStreamMap, usages); - return builtContext; - } - - public IScheduleContext getBuiltContext() { - return builtContext; - } - - /** - * 1. 
- * <pre> - * Check for deprecated policy stream group with its assigned monitored stream groups. - * If this is unmatched, we think the policy' stream group has been changed, remove the policy assignments - * If finally, no assignment refer to a given monitored stream, this monitored stream could be removed. - * Log when every time a remove happens. - * </pre> - * 2. - * <pre> - * if monitored stream's queue's is on non-existing topology, remove it. - * </pre> - * - * @param monitoredStreams - * @return - */ - private List<MonitoredStream> detectMonitoredStreams(List<MonitoredStream> monitoredStreams) { - List<MonitoredStream> result = new ArrayList<MonitoredStream>(monitoredStreams); - - // clear deprecated streams - clearMonitoredStreams(monitoredStreams); - - // build queueId-> streamGroup - Map<String, StreamGroup> queue2StreamGroup = new HashMap<String, StreamGroup>(); - for (MonitoredStream ms : result) { - for (StreamWorkSlotQueue q : ms.getQueues()) { - queue2StreamGroup.put(q.getQueueId(), ms.getStreamGroup()); - } - } - - // decide the assignment delete set - Set<StreamGroup> usedGroups = new HashSet<StreamGroup>(); - Set<String> toRemove = new HashSet<String>(); - // check if queue is still referenced by policy assignments - for (PolicyAssignment assignment : assignments.values()) { - PolicyDefinition def = policies.get(assignment.getPolicyName()); - StreamGroup group = queue2StreamGroup.get(assignment.getQueueId()); - if (group == null || !Objects.equals(group.getStreamPartitions(), def.getPartitionSpec())) { - LOG.warn(" policy assgiment {} 's policy group {} is different to the monitored stream's partition group {}, " - + "this indicates a policy stream partition spec change, the assignment would be removed! ", - assignment, def.getPartitionSpec(), group == null ? 
"'not found'" : group.getStreamPartitions()); - toRemove.add(assignment.getPolicyName()); - } else { - usedGroups.add(group); - } - } - - // remove useless - assignments.keySet().removeAll(toRemove); - // remove non-referenced monitored streams - result.removeIf((t) -> { - boolean used = usedGroups.contains(t.getStreamGroup()); - if (!used) { - LOG.warn("monitor stream with stream group {} is not referenced, " - + "this monitored stream and its worker queu will be removed", t.getStreamGroup()); - } - return !used; - }); - - return result; - } - - private void clearMonitoredStreams(List<MonitoredStream> monitoredStreams) { - Iterator<MonitoredStream> it = monitoredStreams.iterator(); - while (it.hasNext()) { - MonitoredStream ms = it.next(); - Iterator<StreamWorkSlotQueue> queueIt = ms.getQueues().iterator(); - Set<String> usedQueueSet = new HashSet<>(); - assignments.values().stream().forEach(assignment -> usedQueueSet.add(assignment.getQueueId())); - - // clean queues that underlying topology is changed(removed/down) - // clear queues that are no longer used - while (queueIt.hasNext()) { - StreamWorkSlotQueue queue = queueIt.next(); - boolean deprecated = false; - for (WorkSlot ws : queue.getWorkingSlots()) { - // check if topology available or bolt available - if (!topologies.containsKey(ws.topologyName) - || !topologies.get(ws.topologyName).getAlertBoltIds().contains(ws.boltId)) { - deprecated = true; - break; - } - } - if (deprecated || !usedQueueSet.contains(queue.getQueueId())) { - queueIt.remove(); - } - } - - if (ms.getQueues().isEmpty()) { - it.remove(); - } - } - } - - private List<PolicyAssignment> detectAssignmentsChange(List<PolicyAssignment> list, ScheduleState state) { - // FIXME: duplicated build map ? 
- Map<String, StreamWorkSlotQueue> queueMap = new HashMap<String, StreamWorkSlotQueue>(); - for (MonitoredStream ms : state.getMonitoredStreams()) { - for (StreamWorkSlotQueue q : ms.getQueues()) { - queueMap.put(q.getQueueId(), q); - } - } - - List<PolicyAssignment> result = new ArrayList<PolicyAssignment>(list); - Iterator<PolicyAssignment> paIt = result.iterator(); - while (paIt.hasNext()) { - PolicyAssignment assignment = paIt.next(); - - if (!policies.containsKey(assignment.getPolicyName())) { - LOG.info("Policy assignment {} 's policy not found, this assignment will be removed!", assignment); - paIt.remove(); - } else { - StreamWorkSlotQueue queue = queueMap.get(assignment.getQueueId()); - if (queue == null - || policies.get(assignment.getPolicyName()).getParallelismHint() != queue.getQueueSize()) { - // queue not found or policy has hint not equal to queue (possible a poilcy update) - LOG.info("Policy assignment {} 's policy doesnt match queue: {}!", assignment, queue); - paIt.remove(); - } - } - } - return result; - } - - public static <T, K> Map<K, T> listToMap(List<T> collections) { - Map<K, T> maps = new HashMap<K, T>(collections.size()); - for (T t : collections) { - maps.put(getKey(t), t); - } - return maps; - } - - /* - * One drawback, once we add class, this code need to be changed! 
- */ - @SuppressWarnings("unchecked") - private static <T, K> K getKey(T t) { - if (t instanceof Topology) { - return (K) ((Topology) t).getName(); - } else if (t instanceof PolicyAssignment) { - return (K) ((PolicyAssignment) t).getPolicyName(); - } else if (t instanceof Kafka2TupleMetadata) { - return (K) ((Kafka2TupleMetadata) t).getName(); - } else if (t instanceof PolicyDefinition) { - return (K) ((PolicyDefinition) t).getName(); - } else if (t instanceof Publishment) { - return (K) ((Publishment) t).getName(); - } else if (t instanceof StreamDefinition) { - return (K) ((StreamDefinition) t).getStreamId(); - } else if (t instanceof MonitoredStream) { - return (K) ((MonitoredStream) t).getStreamGroup(); - } - throw new RuntimeException("unexpected key class " + t.getClass()); - } - - private Map<String, TopologyUsage> buildTopologyUsage() { - Map<String, TopologyUsage> usages = new HashMap<String, TopologyUsage>(); - - // pre-build data structure for help - Map<String, Set<MonitoredStream>> topo2MonitorStream = new HashMap<String, Set<MonitoredStream>>(); - Map<String, Set<String>> topo2Policies = new HashMap<String, Set<String>>(); - // simply assume no bolt with the same id - Map<String, Set<String>> bolt2Policies = new HashMap<String, Set<String>>(); - // simply assume no bolt with the same id - Map<String, Set<StreamGroup>> bolt2Partition = new HashMap<String, Set<StreamGroup>>(); - // simply assume no bolt with the same id - Map<String, Set<String>> bolt2QueueIds = new HashMap<String, Set<String>>(); - Map<String, StreamWorkSlotQueue> queueMap = new HashMap<String, StreamWorkSlotQueue>(); - - preBuildQueue2TopoMap(topo2MonitorStream, topo2Policies, bolt2Policies, bolt2Partition, bolt2QueueIds, queueMap); - - for (Topology t : topologies.values()) { - TopologyUsage u = new TopologyUsage(t.getName()); - // add group/bolt usages - for (String grpBolt : t.getGroupNodeIds()) { - GroupBoltUsage grpUsage = new GroupBoltUsage(grpBolt); - 
u.getGroupUsages().put(grpBolt, grpUsage); - } - for (String alertBolt : t.getAlertBoltIds()) { - String uniqueBoltId = String.format(UNIQUE_BOLT_ID, t.getName(), alertBolt); - - AlertBoltUsage alertUsage = new AlertBoltUsage(alertBolt); - u.getAlertUsages().put(alertBolt, alertUsage); - // complete usage - addBoltUsageInfo(bolt2Policies, bolt2Partition, bolt2QueueIds, uniqueBoltId, alertUsage, queueMap); - } - - // policy -- policy assignment - if (topo2Policies.containsKey(u.getTopoName())) { - u.getPolicies().addAll(topo2Policies.get(u.getTopoName())); - } - - // data source - buildTopologyDataSource(u); - - // topology usage monitored stream -- from monitored steams' queue slot item info - if (topo2MonitorStream.containsKey(u.getTopoName())) { - u.getMonitoredStream().addAll(topo2MonitorStream.get(u.getTopoName())); - } - - usages.put(u.getTopoName(), u); - } - - return usages; - } - - private void addBoltUsageInfo(Map<String, Set<String>> bolt2Policies, - Map<String, Set<StreamGroup>> bolt2Partition, Map<String, Set<String>> bolt2QueueIds, String uniqueAlertBolt, - AlertBoltUsage alertUsage, Map<String, StreamWorkSlotQueue> queueMap) { - // - if (bolt2Policies.containsKey(uniqueAlertBolt)) { - alertUsage.getPolicies().addAll(bolt2Policies.get(uniqueAlertBolt)); - } - // - if (bolt2Partition.containsKey(uniqueAlertBolt)) { - alertUsage.getPartitions().addAll(bolt2Partition.get(uniqueAlertBolt)); - } - // - if (bolt2QueueIds.containsKey(uniqueAlertBolt)) { - for (String qId : bolt2QueueIds.get(uniqueAlertBolt)) { - if (queueMap.containsKey(qId)) { - alertUsage.getReferQueues().add(queueMap.get(qId)); - } else { - LOG.error(" queue id {} not found in queue map !", qId); - } - } - } - } - - private void buildTopologyDataSource(TopologyUsage u) { - for (String policyName : u.getPolicies()) { - PolicyDefinition def = policies.get(policyName); - if (def != null) { - u.getDataSources().addAll(findDatasource(def)); - } else { - LOG.error(" policy not find {}, but 
reference in topology usage {} !", policyName, u.getTopoName()); - } - } - } - - private List<String> findDatasource(PolicyDefinition def) { - List<String> result = new ArrayList<String>(); - List<String> inputStreams = def.getInputStreams(); - for (String is : inputStreams) { - StreamDefinition ss = this.streamDefinitions.get(is); - if (ss == null) { - LOG.error("policy {} referenced stream definition {} not found in definiton !", def.getName(), is); - } else { - result.add(ss.getDataSource()); - } - } - return result; - } - - private void preBuildQueue2TopoMap( - Map<String, Set<MonitoredStream>> topo2MonitorStream, - Map<String, Set<String>> topo2Policies, - Map<String, Set<String>> bolt2Policies, - Map<String, Set<StreamGroup>> bolt2Partition, - Map<String, Set<String>> bolt2QueueIds, - Map<String, StreamWorkSlotQueue> queueMap) { - // pre-build structure - // why don't reuse the queue.getPolicies - Map<String, Set<String>> queue2Policies = new HashMap<String, Set<String>>(); - for (PolicyAssignment pa : assignments.values()) { - if (!queue2Policies.containsKey(pa.getQueueId())) { - queue2Policies.put(pa.getQueueId(), new HashSet<String>()); - } - queue2Policies.get(pa.getQueueId()).add(pa.getPolicyName()); - } - - for (MonitoredStream stream : monitoredStreamMap.values()) { - for (StreamWorkSlotQueue q : stream.getQueues()) { - queueMap.put(q.getQueueId(), q); - Set<String> policiesOnQ = queue2Policies.containsKey(q.getQueueId()) ? 
queue2Policies.get(q.getQueueId()) : new HashSet<String>(); - - for (WorkSlot slot : q.getWorkingSlots()) { - // topo2monitoredstream - if (!topo2MonitorStream.containsKey(slot.getTopologyName())) { - topo2MonitorStream.put(slot.getTopologyName(), new HashSet<MonitoredStream>()); - } - topo2MonitorStream.get(slot.getTopologyName()).add(stream); - - // topo2policy - if (!topo2Policies.containsKey(slot.getTopologyName())) { - topo2Policies.put(slot.getTopologyName(), new HashSet<String>()); - } - topo2Policies.get(slot.getTopologyName()).addAll(policiesOnQ); - - // bolt2Policy - if (!bolt2Policies.containsKey(getUniqueBoltId(slot))) { - bolt2Policies.put(getUniqueBoltId(slot), new HashSet<String>()); - } - bolt2Policies.get(getUniqueBoltId(slot)).addAll(policiesOnQ); - - // bolt2Queue - if (!bolt2QueueIds.containsKey(getUniqueBoltId(slot))) { - bolt2QueueIds.put(getUniqueBoltId(slot), new HashSet<String>()); - } - bolt2QueueIds.get(getUniqueBoltId(slot)).add(q.getQueueId()); - - // bolt2Partition - if (!bolt2Partition.containsKey(getUniqueBoltId(slot))) { - bolt2Partition.put(getUniqueBoltId(slot), new HashSet<StreamGroup>()); - } - bolt2Partition.get(getUniqueBoltId(slot)).add(stream.getStreamGroup()); - } - } - } - } - - private String getUniqueBoltId(WorkSlot slot) { - return String.format(UNIQUE_BOLT_ID, slot.getTopologyName(), slot.getBoltId()); - } - -} http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/resource/CoordinatorResource.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/resource/CoordinatorResource.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/resource/CoordinatorResource.java deleted file mode 100644 index 
9d83c4a..0000000 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/resource/CoordinatorResource.java +++ /dev/null @@ -1,94 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.eagle.alert.coordinator.resource; - -import org.apache.eagle.alert.coordination.model.ScheduleState; -import org.apache.eagle.alert.coordinator.Coordinator; -import org.apache.eagle.alert.coordinator.ScheduleOption; -import org.apache.eagle.alert.coordinator.ValidateState; -import org.apache.eagle.alert.utils.JsonUtils; - -import javax.ws.rs.GET; -import javax.ws.rs.POST; -import javax.ws.rs.Path; -import javax.ws.rs.Produces; - -/** - * This is to provide API access even we don't have ZK as intermediate access. - * FIXME : more elogant status code - * - * @since Mar 24, 2016 <br/> - */ -@Path("/coordinator") -@Produces( {"application/json"}) -public class CoordinatorResource { - - // sprint config here? 
- private Coordinator alertCoordinator = new Coordinator(); - - @GET - @Path("/assignments") - public String getAssignments() throws Exception { - ScheduleState state = alertCoordinator.getState(); - return JsonUtils.writeValueAsString(state); - } - - @POST - @Path("/build") - public String build() throws Exception { - ScheduleOption option = new ScheduleOption(); - ScheduleState state = alertCoordinator.schedule(option); - return JsonUtils.writeValueAsString(state); - } - - @POST - @Path("/validate") - public String validate() throws Exception { - ValidateState state = alertCoordinator.validate(); - return JsonUtils.writeValueAsString(state); - } - - @POST - @Path("/enablePeriodicForceBuild") - public void enforcePeriodicallyBuild() { - alertCoordinator.enforcePeriodicallyBuild(); - } - - @POST - @Path("/disablePeriodicForceBuild") - public void disablePeriodicallyBuild() { - alertCoordinator.disablePeriodicallyBuild(); - } - - @SuppressWarnings("static-access") - @GET - @Path("/periodicForceBuildState") - public boolean statPeriodicallyBuild() { - return alertCoordinator.isPeriodicallyForceBuildEnable(); - } - - /** - * Manually update the topology usages, for administration. 
- */ - @POST - @Path("/refreshUsages") - public String refreshUsages() { - // TODO - return ""; - } - -} http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/trigger/CoordinatorTrigger.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/trigger/CoordinatorTrigger.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/trigger/CoordinatorTrigger.java deleted file mode 100644 index 454f47c..0000000 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/trigger/CoordinatorTrigger.java +++ /dev/null @@ -1,82 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.eagle.alert.coordinator.trigger; - -import com.google.common.base.Stopwatch; -import com.typesafe.config.Config; -import org.apache.eagle.alert.config.ConfigBusProducer; -import org.apache.eagle.alert.config.ZKConfig; -import org.apache.eagle.alert.config.ZKConfigBuilder; -import org.apache.eagle.alert.coordination.model.ScheduleState; -import org.apache.eagle.alert.coordinator.*; -import org.apache.eagle.alert.coordinator.provider.ScheduleContextBuilder; -import org.apache.eagle.alert.service.IMetadataServiceClient; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.concurrent.TimeUnit; - -/** - * @since Jun 27, 2016. - */ -public class CoordinatorTrigger implements Runnable { - - private static final Logger LOG = LoggerFactory.getLogger(CoordinatorTrigger.class); - - private Config config; - private IMetadataServiceClient client; - - public CoordinatorTrigger(Config config, IMetadataServiceClient client) { - this.config = config; - this.client = client; - } - - @Override - public void run() { - if (Coordinator.isPeriodicallyForceBuildEnable()) { - LOG.info("CoordinatorTrigger started ... 
"); - - Stopwatch watch = Stopwatch.createStarted(); - ZKConfig zkConfig = ZKConfigBuilder.getZKConfig(config); - try (ExclusiveExecutor executor = new ExclusiveExecutor(zkConfig)) { - executor.execute(Coordinator.GREEDY_SCHEDULER_ZK_PATH, () -> { - // schedule - IScheduleContext context = new ScheduleContextBuilder(config, client).buildContext(); - TopologyMgmtService mgmtService = new TopologyMgmtService(); - IPolicyScheduler scheduler = PolicySchedulerFactory.createScheduler(); - - scheduler.init(context, mgmtService); - - ScheduleState state = scheduler.schedule(new ScheduleOption()); - - // use try catch to use AutoCloseable interface to close producer automatically - try (ConfigBusProducer producer = new ConfigBusProducer(ZKConfigBuilder.getZKConfig(config))) { - Coordinator.postSchedule(client, state, producer); - } - - watch.stop(); - LOG.info("CoordinatorTrigger ended, used time {} sm.", watch.elapsed(TimeUnit.MILLISECONDS)); - }); - } catch (Exception e) { - LOG.error("trigger schedule failed!", e); - } - } else { - LOG.info("CoordinatorTrigger found isPeriodicallyForceBuildEnable = false, skipped build"); - } - } - -} http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/trigger/DynamicPolicyLoader.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/trigger/DynamicPolicyLoader.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/trigger/DynamicPolicyLoader.java deleted file mode 100644 index 7d0ead5..0000000 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/trigger/DynamicPolicyLoader.java +++ /dev/null @@ -1,103 +0,0 @@ -/* - * Licensed to the Apache Software Foundation 
(ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.eagle.alert.coordinator.trigger; - -import org.apache.eagle.alert.engine.coordinator.PolicyDefinition; -import org.apache.eagle.alert.service.IMetadataServiceClient; -import com.google.common.base.Stopwatch; -import org.apache.commons.collections.CollectionUtils; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.*; -import java.util.concurrent.TimeUnit; - -/** - * Poll policy change and notify listeners. - */ -public class DynamicPolicyLoader implements Runnable { - private static Logger LOG = LoggerFactory.getLogger(DynamicPolicyLoader.class); - - private IMetadataServiceClient client; - // initial cachedPolicies should be empty - private Map<String, PolicyDefinition> cachedPolicies = new HashMap<>(); - private List<PolicyChangeListener> listeners = new ArrayList<>(); - - public DynamicPolicyLoader(IMetadataServiceClient client) { - this.client = client; - } - - public synchronized void addPolicyChangeListener(PolicyChangeListener listener) { - listeners.add(listener); - } - - /** - * When it is run at the first time, due to cachedPolicies being empty, all existing policies are expected - * to be addedPolicies. 
- */ - @SuppressWarnings("unchecked") - @Override - public void run() { - // we should catch every exception to avoid zombile thread - try { - final Stopwatch watch = Stopwatch.createStarted(); - LOG.info("Starting to load policies"); - List<PolicyDefinition> current = client.listPolicies(); - Map<String, PolicyDefinition> currPolicies = new HashMap<>(); - current.forEach(pe -> currPolicies.put(pe.getName(), pe)); - - Collection<String> addedPolicies = CollectionUtils.subtract(currPolicies.keySet(), cachedPolicies.keySet()); - Collection<String> removedPolicies = CollectionUtils.subtract(cachedPolicies.keySet(), currPolicies.keySet()); - Collection<String> potentiallyModifiedPolicies = CollectionUtils.intersection(currPolicies.keySet(), cachedPolicies.keySet()); - - List<String> reallyModifiedPolicies = new ArrayList<>(); - for (String updatedPolicy : potentiallyModifiedPolicies) { - if (currPolicies.get(updatedPolicy) != null - && !currPolicies.get(updatedPolicy).equals(cachedPolicies.get(updatedPolicy))) { - reallyModifiedPolicies.add(updatedPolicy); - } - } - - boolean policyChanged = false; - if (addedPolicies.size() != 0 - || removedPolicies.size() != 0 - || reallyModifiedPolicies.size() != 0) { - policyChanged = true; - } - - if (!policyChanged) { - LOG.info("No policy (totally {}) changed since last round", current.size()); - return; - } - - synchronized (this) { - for (PolicyChangeListener listener : listeners) { - listener.onPolicyChange(current, addedPolicies, removedPolicies, reallyModifiedPolicies); - } - } - - watch.stop(); - - LOG.info("Finished loading {} policies, added: {}, removed: {}, modified: {}, taken: {} ms", - current.size(), addedPolicies.size(), removedPolicies.size(), potentiallyModifiedPolicies.size(), watch.elapsed(TimeUnit.MILLISECONDS)); - // reset cached policies - cachedPolicies = currPolicies; - } catch (Throwable t) { - LOG.warn("Error loading policy, but continue to run", t); - } - } -} 
http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/trigger/PolicyChangeListener.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/trigger/PolicyChangeListener.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/trigger/PolicyChangeListener.java deleted file mode 100644 index d36765f..0000000 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/trigger/PolicyChangeListener.java +++ /dev/null @@ -1,25 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
/**
 * Listener notified when a set of policies has changed.
 */
public interface PolicyChangeListener {
    /**
     * Called with the outcome of a policy comparison.
     *
     * @param allPolicies      all policy definitions of the current round
     * @param addedPolicies    presumably the names of policies that are new in this round (verify against caller)
     * @param removedPolicies  presumably the names of policies that disappeared since the last round (verify against caller)
     * @param modifiedPolicies presumably the names of policies whose definition changed (verify against caller)
     */
    void onPolicyChange(List<PolicyDefinition> allPolicies, Collection<String> addedPolicies, Collection<String> removedPolicies, Collection<String> modifiedPolicies);
}
- * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.eagle.alert.coordinator.trigger; - -import com.google.common.base.Stopwatch; -import org.apache.eagle.alert.service.IMetadataServiceClient; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.concurrent.TimeUnit; - -public class ScheduleStateCleaner implements Runnable { - - private static Logger LOG = LoggerFactory.getLogger(ScheduleStateCleaner.class); - - private IMetadataServiceClient client; - private int reservedCapacity; - - public ScheduleStateCleaner(IMetadataServiceClient client, int capacity) { - this.client = client; - this.reservedCapacity = capacity; - } - - @Override - public void run() { - // we should catch every exception to avoid zombile thread - try { - final Stopwatch watch = Stopwatch.createStarted(); - LOG.info("clear schedule states start."); - client.clearScheduleState(reservedCapacity); - watch.stop(); - LOG.info("clear schedule states completed. 
used time milliseconds: {}", watch.elapsed(TimeUnit.MILLISECONDS)); - // reset cached policies - } catch (Throwable t) { - LOG.error("fail to clear schedule states due to {}, but continue to run", t.getMessage()); - } - } -} http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/test/java/org/apache/alert/coordinator/CoordinatorTest.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/test/java/org/apache/alert/coordinator/CoordinatorTest.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/test/java/org/apache/alert/coordinator/CoordinatorTest.java deleted file mode 100644 index 7dadbf5..0000000 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/test/java/org/apache/alert/coordinator/CoordinatorTest.java +++ /dev/null @@ -1,135 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
/**
 * Tests for {@code Coordinator} that run against an embedded ZooKeeper started once per class.
 *
 * @since May 5, 2016
 */
public class CoordinatorTest {

    private static ZookeeperEmbedded zkEmbed;

    /**
     * Starts the embedded ZooKeeper (requested port 2181) and publishes the port it returns
     * through the {@code coordinator.zkConfig.zkQuorum} system property.
     */
    @BeforeClass
    public static void setup() throws Exception {
        zkEmbed = new ZookeeperEmbedded(2181);
        int zkPort = zkEmbed.start();
        System.setProperty("coordinator.zkConfig.zkQuorum","localhost:"+ zkPort);
    }

    /** Shuts the embedded ZooKeeper down. */
    @AfterClass
    public static void teardown() {
        zkEmbed.shutdown();
    }

    /**
     * Schedules with a {@code MetadataServiceClientImpl} and expects a consumer on
     * {@code topo1/spout} to receive the scheduled version within one second. Currently ignored.
     */
    @SuppressWarnings( {"resource", "unused"})
    @Ignore
    @Test
    public void test() throws Exception {
        before();
        Config config = ConfigFactory.load().getConfig("coordinator");
        ZKConfig zkConfig = ZKConfigBuilder.getZKConfig(config);
        IMetadataServiceClient client = new MetadataServiceClientImpl(config);

        Coordinator coordinator = new Coordinator(config, zkConfig, client);
        ScheduleOption option = new ScheduleOption();
        ScheduleState state = coordinator.schedule(option);
        String v = state.getVersion();

        AtomicBoolean validated = new AtomicBoolean(false);
        ConfigBusConsumer consumer = new ConfigBusConsumer(zkConfig, "topo1/spout", new ConfigChangeCallback() {
            @Override
            public void onNewConfig(ConfigValue value) {
                String vId = value.getValue().toString();
                Assert.assertEquals(v, vId);
                validated.set(true);
            }
        });

        // fixed wait for the callback; the flag is only set once the callback ran
        Thread.sleep(1000);
        Assert.assertTrue(validated.get());
    }

    /**
     * Same scenario as {@link #test()} but with the sample metadata service, waiting up to three
     * seconds on a latch released by the callback.
     */
    @SuppressWarnings( {"resource", "unused"})
    @Test
    public void test_01() throws Exception {
        before();
        Config config = ConfigFactory.load().getConfig("coordinator");
        ZKConfig zkConfig = ZKConfigBuilder.getZKConfig(config);
        IMetadataServiceClient client = ScheduleContextBuilderTest.getSampleMetadataService();

        Coordinator coordinator = new Coordinator(config, zkConfig, client);
        ScheduleOption option = new ScheduleOption();
        ScheduleState state = coordinator.schedule(option);
        String v = state.getVersion();

        // TODO : assert version

        CountDownLatch latch = new CountDownLatch(1);
        AtomicBoolean validated = new AtomicBoolean(false);
        ConfigBusConsumer consumer = new ConfigBusConsumer(zkConfig, "topo1/spout", new ConfigChangeCallback() {
            @Override
            public void onNewConfig(ConfigValue value) {
                String vId = value.getValue().toString();
                Assert.assertEquals(v, vId);
                validated.set(true);
                latch.countDown();
            }
        });

        // the await result is not checked; a timeout surfaces through the assertion below
        latch.await(3, TimeUnit.SECONDS);
        Assert.assertTrue(validated.get());
    }

    /**
     * Points the configuration at {@code /test-application.conf} and reloads it. Runs before each
     * test and is additionally called explicitly by some tests.
     */
    @Before
    public void before() {
        System.setProperty("config.resource", "/test-application.conf");
        ConfigFactory.invalidateCaches();
        ConfigFactory.load().getConfig("coordinator");
    }

    /** Smoke test: only checks that {@code Coordinator.startSchedule()} does not throw. */
    @Test
    public void test_Schedule() {
        Coordinator.startSchedule();
    }

}
---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/test/java/org/apache/alert/coordinator/DynamicPolicyLoaderTest.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/test/java/org/apache/alert/coordinator/DynamicPolicyLoaderTest.java deleted file mode 100644 index a86dd04..0000000 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/test/java/org/apache/alert/coordinator/DynamicPolicyLoaderTest.java +++ /dev/null @@ -1,27 +0,0 @@ -/* - * - * * Licensed to the Apache Software Foundation (ASF) under one or more - * * contributor license agreements. See the NOTICE file distributed with - * * this work for additional information regarding copyright ownership. - * * The ASF licenses this file to You under the Apache License, Version 2.0 - * * (the "License"); you may not use this file except in compliance with - * * the License. You may obtain a copy of the License at - * * - * * http://www.apache.org/licenses/LICENSE-2.0 - * * - * * Unless required by applicable law or agreed to in writing, software - * * distributed under the License is distributed on an "AS IS" BASIS, - * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * * See the License for the specific language governing permissions and - * * limitations under the License. - * - */ - -package org.apache.alert.coordinator; - -/** - * Since 4/28/16. 
- */ -@org.junit.Ignore -public class DynamicPolicyLoaderTest { -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/test/java/org/apache/alert/coordinator/MetadataServiceClientImplTest.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/test/java/org/apache/alert/coordinator/MetadataServiceClientImplTest.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/test/java/org/apache/alert/coordinator/MetadataServiceClientImplTest.java deleted file mode 100644 index d71dd88..0000000 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/test/java/org/apache/alert/coordinator/MetadataServiceClientImplTest.java +++ /dev/null @@ -1,56 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License.
- */ -package org.apache.alert.coordinator; - -import org.apache.eagle.alert.config.ConfigBusProducer; -import org.apache.eagle.alert.config.ZKConfig; -import org.apache.eagle.alert.config.ZKConfigBuilder; -import org.apache.eagle.alert.coordination.model.ScheduleState; -import org.apache.eagle.alert.coordinator.Coordinator; -import org.apache.eagle.alert.service.MetadataServiceClientImpl; -import org.junit.Ignore; -import org.junit.Test; - -import com.typesafe.config.Config; -import com.typesafe.config.ConfigFactory; - -/** - * @since May 9, 2016 - */ -public class MetadataServiceClientImplTest { - - @Ignore - @Test - public void addScheduleState() throws Exception { - ConfigFactory.invalidateCaches(); - System.setProperty("config.resource", "/test-application.conf"); - Config config = ConfigFactory.load("test-application.conf").getConfig("coordinator"); - MetadataServiceClientImpl client = new MetadataServiceClientImpl(config); - - ScheduleState ss = new ScheduleState(); - ss.setVersion("spec_version_1463764252582"); - - client.addScheduleState(ss); - - client.close(); - - ss.setVersion("spec_version_1464764252582"); - ZKConfig zkConfig = ZKConfigBuilder.getZKConfig(config); - ConfigBusProducer producer = new ConfigBusProducer(zkConfig); - Coordinator.postSchedule(client, ss, producer); - } -} http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/test/java/org/apache/alert/coordinator/NodataMetadataGeneratorTest.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/test/java/org/apache/alert/coordinator/NodataMetadataGeneratorTest.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/test/java/org/apache/alert/coordinator/NodataMetadataGeneratorTest.java deleted file mode 100644 index 9d2b9c7..0000000 --- 
a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/test/java/org/apache/alert/coordinator/NodataMetadataGeneratorTest.java +++ /dev/null @@ -1,103 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.alert.coordinator; - -import java.util.Arrays; -import java.util.HashMap; -import java.util.Map; - -import org.apache.commons.lang3.builder.ToStringBuilder; -import org.apache.eagle.alert.coordination.model.Kafka2TupleMetadata; -import org.apache.eagle.alert.coordinator.provider.NodataMetadataGenerator; -import org.apache.eagle.alert.engine.coordinator.PolicyDefinition; -import org.apache.eagle.alert.engine.coordinator.Publishment; -import org.apache.eagle.alert.engine.coordinator.StreamColumn; -import org.apache.eagle.alert.engine.coordinator.StreamDefinition; -import org.junit.Assert; -import org.junit.Before; -import org.junit.Test; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.typesafe.config.Config; -import com.typesafe.config.ConfigFactory; - -public class NodataMetadataGeneratorTest { - - private static final Logger LOG = LoggerFactory.getLogger(NodataMetadataGeneratorTest.class); - - Config config = ConfigFactory.load().getConfig("coordinator"); - 
private NodataMetadataGenerator generator; - - @Before - public void setup() { - generator = new NodataMetadataGenerator(); - } - - @Test - public void testNormal() throws Exception { - StreamDefinition sd = createStreamDefinitionWithNodataAlert(); - Map<String, StreamDefinition> streamDefinitionsMap = new HashMap<String, StreamDefinition>(); - streamDefinitionsMap.put(sd.getStreamId(), sd); - - Map<String, Kafka2TupleMetadata> kafkaSources = new HashMap<String, Kafka2TupleMetadata>(); - Map<String, PolicyDefinition> policies = new HashMap<String, PolicyDefinition>(); - Map<String, Publishment> publishments = new HashMap<String, Publishment>(); - - generator.execute(config, streamDefinitionsMap, kafkaSources, policies, publishments); - - Assert.assertEquals(2, kafkaSources.size()); - - kafkaSources.forEach((key, value) -> { - LOG.info("KafkaSources > {}: {}", key, ToStringBuilder.reflectionToString(value)); - }); - - Assert.assertEquals(2, policies.size()); - - policies.forEach((key, value) -> { - LOG.info("Policies > {}: {}", key, ToStringBuilder.reflectionToString(value)); - }); - - Assert.assertEquals(4, publishments.size()); - - publishments.forEach((key, value) -> { - LOG.info("Publishments > {}: {}", key, ToStringBuilder.reflectionToString(value)); - }); - } - - private StreamDefinition createStreamDefinitionWithNodataAlert() { - StreamDefinition sd = new StreamDefinition(); - StreamColumn tsColumn = new StreamColumn(); - tsColumn.setName("timestamp"); - tsColumn.setType(StreamColumn.Type.LONG); - - StreamColumn hostColumn = new StreamColumn(); - hostColumn.setName("host"); - hostColumn.setType(StreamColumn.Type.STRING); - hostColumn.setNodataExpression("PT1M,dynamic,1,host"); - - StreamColumn valueColumn = new StreamColumn(); - valueColumn.setName("value"); - valueColumn.setType(StreamColumn.Type.DOUBLE); - - sd.setColumns(Arrays.asList(tsColumn, hostColumn, valueColumn)); - sd.setDataSource("testDataSource"); - sd.setStreamId("testStreamId"); - return sd; - 
} - -}
