http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/IPolicyScheduler.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/IPolicyScheduler.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/IPolicyScheduler.java deleted file mode 100644 index 0cf4a5d..0000000 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/IPolicyScheduler.java +++ /dev/null @@ -1,33 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.eagle.alert.coordinator; - -import org.apache.eagle.alert.coordination.model.ScheduleState; - -/** - * @since Mar 24, 2016. - */ -public interface IPolicyScheduler { - - void init(IScheduleContext context, TopologyMgmtService mgmtService); - - /** - * Build the assignments for all. - */ - ScheduleState schedule(ScheduleOption option); - -}
http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/IScheduleContext.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/IScheduleContext.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/IScheduleContext.java deleted file mode 100644 index b21948b..0000000 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/IScheduleContext.java +++ /dev/null @@ -1,53 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.eagle.alert.coordinator; - -import org.apache.eagle.alert.coordination.model.Kafka2TupleMetadata; -import org.apache.eagle.alert.coordination.model.internal.MonitoredStream; -import org.apache.eagle.alert.coordination.model.internal.PolicyAssignment; -import org.apache.eagle.alert.coordination.model.internal.StreamGroup; -import org.apache.eagle.alert.coordination.model.internal.Topology; -import org.apache.eagle.alert.coordinator.model.TopologyUsage; -import org.apache.eagle.alert.engine.coordinator.PolicyDefinition; -import org.apache.eagle.alert.engine.coordinator.Publishment; -import org.apache.eagle.alert.engine.coordinator.StreamDefinition; - -import java.util.Map; - -/** - * @since Mar 28, 2016. - */ -public interface IScheduleContext { - - Map<String, Topology> getTopologies(); - - Map<String, PolicyDefinition> getPolicies(); - - // data source - Map<String, Kafka2TupleMetadata> getDataSourceMetadata(); - - Map<String, StreamDefinition> getStreamSchemas(); - - Map<String, TopologyUsage> getTopologyUsages(); - - Map<String, PolicyAssignment> getPolicyAssignments(); - - Map<StreamGroup, MonitoredStream> getMonitoredStreams(); - - Map<String, Publishment> getPublishments(); - -} http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/LockWebApplicationException.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/LockWebApplicationException.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/LockWebApplicationException.java deleted file mode 100644 index 29e8cac..0000000 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/LockWebApplicationException.java +++ /dev/null @@ -1,34 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.eagle.alert.coordinator; - -import javax.ws.rs.WebApplicationException; -import javax.ws.rs.core.Response; - -public class LockWebApplicationException extends WebApplicationException { - - private static final long serialVersionUID = 3441072187262776401L; - - public LockWebApplicationException() { - super(Response.Status.INTERNAL_SERVER_ERROR); - } - - public LockWebApplicationException(String message) { - super(Response.status(Response.Status.INTERNAL_SERVER_ERROR).entity(message).type("text/plain").build()); - } - -} http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/PolicySchedulerFactory.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/PolicySchedulerFactory.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/PolicySchedulerFactory.java deleted file mode 100644 index c1bc726..0000000 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/PolicySchedulerFactory.java +++ /dev/null @@ -1,29 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.eagle.alert.coordinator; - -import org.apache.eagle.alert.coordinator.impl.GreedyPolicyScheduler; - -/** - * @since Mar 24, 2016. - */ -public class PolicySchedulerFactory { - - public static IPolicyScheduler createScheduler() { - return new GreedyPolicyScheduler(); - } -} http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/ScheduleOption.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/ScheduleOption.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/ScheduleOption.java deleted file mode 100644 index 6fe27c5..0000000 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/ScheduleOption.java +++ /dev/null @@ -1,72 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.eagle.alert.coordinator; - -/** - * A runtime option for one schedule processing. - * <p>Could used for configuration override.</p> - * - * @since Apr 19, 2016 - */ -public class ScheduleOption { - private int policiesPerBolt; - private int boltParallelism; - private int policyDefaultParallelism; - private double boltLoadUpbound; - private double topoLoadUpbound; - - public int getPoliciesPerBolt() { - return policiesPerBolt; - } - - public void setPoliciesPerBolt(int policiesPerBolt) { - this.policiesPerBolt = policiesPerBolt; - } - - public int getBoltParallelism() { - return boltParallelism; - } - - public void setBoltParallelism(int boltParallelism) { - this.boltParallelism = boltParallelism; - } - - public int getPolicyDefaultParallelism() { - return policyDefaultParallelism; - } - - public void setPolicyDefaultParallelism(int policyDefaultParallelism) { - this.policyDefaultParallelism = policyDefaultParallelism; - } - - public double getBoltLoadUpbound() { - return boltLoadUpbound; - } - - public void setBoltLoadUpbound(double boltLoadUpbound) { - this.boltLoadUpbound = boltLoadUpbound; - } - - public double getTopoLoadUpbound() { - return topoLoadUpbound; - } - - public void setTopoLoadUpbound(double topoLoadUpbound) { - this.topoLoadUpbound = topoLoadUpbound; - } - -} http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/TopologyMgmtService.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/TopologyMgmtService.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/TopologyMgmtService.java deleted file mode 100644 index 4ede29d..0000000 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/TopologyMgmtService.java +++ /dev/null @@ -1,79 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.eagle.alert.coordinator; - -import static org.apache.eagle.alert.coordinator.CoordinatorConstants.CONFIG_ITEM_COORDINATOR; -import static org.apache.eagle.alert.coordinator.CoordinatorConstants.NUM_OF_ALERT_BOLTS_PER_TOPOLOGY; - -import org.apache.eagle.alert.coordination.model.internal.Topology; -import org.apache.eagle.alert.coordinator.model.TopologyUsage; - -import com.typesafe.config.Config; -import com.typesafe.config.ConfigFactory; -import java.util.Collections; -import java.util.List; - -/** - * @since Mar 29, 2016. - */ -public class TopologyMgmtService { - - public static class TopologyMeta { - public String topologyId; - public Topology topology; - public TopologyUsage usage; - - public String clusterId; - public String nimbusHost; - public String nimbusPort; - - } - - public static class StormClusterMeta { - public String clusterId; - public String nimbusHost; - public String nimbusPort; - public String stormVersion; - } - - @SuppressWarnings("unused") - private int boltParallelism = 0; - private int numberOfBoltsPerTopology = 0; - - public TopologyMgmtService() { - Config config = ConfigFactory.load().getConfig(CONFIG_ITEM_COORDINATOR); - //boltParallelism = config.getInt(CoordinatorConstants.BOLT_PARALLELISM); - numberOfBoltsPerTopology = config.getInt(NUM_OF_ALERT_BOLTS_PER_TOPOLOGY); - } - - public int getNumberOfAlertBoltsInTopology() { - return numberOfBoltsPerTopology; - } - - /** - * TODO: call topology mgmt API to create a topology. - */ - public TopologyMeta creatTopology() { - // TODO - throw new UnsupportedOperationException("not supported yet!"); - } - - public List<TopologyMeta> listTopologies() { - // TODO - return Collections.emptyList(); - } -} http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/ValidateState.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/ValidateState.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/ValidateState.java deleted file mode 100644 index 9dc177b..0000000 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/ValidateState.java +++ /dev/null @@ -1,90 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.eagle.alert.coordinator; - -import java.util.*; - -/** - * Created on 10/1/16. - */ -public class ValidateState { - - private boolean isOk = false; - - private List<String> unusedDataSources = new ArrayList<>(); - private List<String> unusedStreams = new ArrayList<>(); - private List<String> unPublishedPolicies = new ArrayList<>(); - - /* - * Includes validation of extension class existence - * Policy expression validation - * Inter-Reference validation - */ - private Map<String, List<String>> dataSourcesValidation = new HashMap<>(); - private Map<String, List<String>> streamsValidation = new HashMap<>(); - private Map<String, List<String>> policiesValidation = new HashMap<>(); - private Map<String, List<String>> publishmentValidation = new HashMap<>(); - private Map<String, List<String>> topoMetaValidation = new HashMap<>(); - - public void appendUnusedDatasource(String ds) { - unusedDataSources.add(ds); - } - - public void appendUnusedStreams(String s) { - unusedStreams.add(s); - } - - public void appendUnPublishedPolicies(String s) { - unPublishedPolicies.add(s); - } - - public void appendDataSourceValidation(String name, String msg) { - if (!dataSourcesValidation.containsKey(name)) { - dataSourcesValidation.putIfAbsent(name, new LinkedList<>()); - } - dataSourcesValidation.get(name).add(msg); - } - - public void appendStreamValidation(String name, String msg) { - if (!streamsValidation.containsKey(name)) { - streamsValidation.putIfAbsent(name, new LinkedList<>()); - } - streamsValidation.get(name).add(msg); - } - - public void appendPolicyValidation(String name, String msg) { - if (!policiesValidation.containsKey(name)) { - policiesValidation.putIfAbsent(name, new LinkedList<>()); - } - policiesValidation.get(name).add(msg); - } - - public void appendPublishemtnValidation(String name, String msg) { - if (!publishmentValidation.containsKey(name)) { - publishmentValidation.putIfAbsent(name, new LinkedList<>()); - } - publishmentValidation.get(name).add(msg); - } - - public void appendTopoMetaValidation(String name, String msg) { - if (!topoMetaValidation.containsKey(name)) { - topoMetaValidation.putIfAbsent(name, new LinkedList<>()); - } - topoMetaValidation.get(name).add(msg); - } - -} http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/impl/GreedyPolicyScheduler.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/impl/GreedyPolicyScheduler.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/impl/GreedyPolicyScheduler.java deleted file mode 100644 index e50b64c..0000000 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/impl/GreedyPolicyScheduler.java +++ /dev/null @@ -1,328 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.eagle.alert.coordinator.impl; - -import static org.apache.eagle.alert.coordinator.CoordinatorConstants.*; -import org.apache.eagle.alert.coordination.model.ScheduleState; -import org.apache.eagle.alert.coordination.model.WorkSlot; -import org.apache.eagle.alert.coordination.model.internal.*; -import org.apache.eagle.alert.coordinator.IPolicyScheduler; -import org.apache.eagle.alert.coordinator.IScheduleContext; -import org.apache.eagle.alert.coordinator.ScheduleOption; -import org.apache.eagle.alert.coordinator.TopologyMgmtService; -import org.apache.eagle.alert.coordinator.model.AlertBoltUsage; -import org.apache.eagle.alert.coordinator.model.TopologyUsage; -import org.apache.eagle.alert.coordinator.provider.InMemScheduleConext; -import org.apache.eagle.alert.engine.coordinator.PolicyDefinition; -import org.apache.eagle.alert.engine.coordinator.StreamDefinition; -import org.apache.eagle.alert.utils.JsonUtils; - -import com.typesafe.config.Config; -import com.typesafe.config.ConfigFactory; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.*; - -/** - * A simple greedy assigner. <br/> - * - * <p>A greedy assigner simply loop the policies, find the most suitable topology - * to locate the policy first, then assign the topics to corresponding - * spouts/group-by bolts.</p> - * - * <p>For each given policy, the greedy steps are</p> - * - * <ul> - * <li>1. Find the same topology that already serve the policy without exceed the load</li> - * <li>2. Find the topology that already take the source traffic without exceed the load</li> - * <li>3. Find the topology that available to place source topic without exceed the load</li> - * <li>4. Create a new topology and locate the policy</li> - * <li>Route table generated after all policies assigned</li> - * </ul> - * @since Mar 24, 2016 - */ -public class GreedyPolicyScheduler implements IPolicyScheduler { - - private static final Logger LOG = LoggerFactory.getLogger(GreedyPolicyScheduler.class); - - private int policiesPerBolt; - private int policyDefaultParallelism; - private int initialQueueSize; - private double boltLoadUpbound; - - // copied context for scheduling - private IScheduleContext context; - - private TopologyMgmtService mgmtService; - - private ScheduleState state; - - public GreedyPolicyScheduler() { - Config config = ConfigFactory.load().getConfig(CONFIG_ITEM_COORDINATOR); - policiesPerBolt = config.getInt(POLICIES_PER_BOLT); - policyDefaultParallelism = config.getInt(POLICY_DEFAULT_PARALLELISM); - initialQueueSize = policyDefaultParallelism; - boltLoadUpbound = config.getDouble(CONFIG_ITEM_BOLT_LOAD_UPBOUND); - } - - public synchronized ScheduleState schedule(ScheduleOption option) { - // FIXME: never re-assign right now: sticky mode - // TODO: how to identify the over-heat nodes? not yet done #Scale of policies - // Answer: Use configured policiesPerBolt and configured bolt load up-bound - // FIXME: Here could be strategy to define the topology priorities - List<WorkItem> workSets = findWorkingSets(); - /** - * <pre> - * <ul> - * <li>how to support take multiple "dumped" topology that consuming the same input as one available sets?</li> - * Answer: spout spec generated after policy assigned - * <li>How to define the input traffic partition?</li> - * Answer: input traffic might not be supported right now. - * <li>How to support traffic partition between topology?</li> - * Answer: two possible place: a global route table will be generated, those target not in current topology tuples will be dropped. This make the partition for tuple to alert - * <li>How to support add topology on demand by evaluate the available topology bandwidth(need topology level load)?</li> - * Answer: Use configured topology load up-bound, when topology load is available, will adopt - * </ul> - * </pre> - */ - List<ScheduleResult> results = new ArrayList<ScheduleResult>(); - Map<String, PolicyAssignment> newAssignments = new HashMap<String, PolicyAssignment>(); - for (WorkItem item : workSets) { - ScheduleResult r = schedulePolicy(item, newAssignments); - results.add(r); - } - - state = generateMonitorMetadata(workSets, newAssignments); - if (LOG.isDebugEnabled()) { - LOG.debug("calculated schedule state: {}", JsonUtils.writeValueAsString(state)); - } - return state; - } - - private List<WorkItem> findWorkingSets() { - // find the unassigned definition - List<WorkItem> workSets = new LinkedList<WorkItem>(); - for (PolicyDefinition def : context.getPolicies().values()) { - int expectParal = def.getParallelismHint(); - if (expectParal == 0) { - expectParal = policyDefaultParallelism; - } - // how to handle expand of an policy in a smooth transition manner - // TODO policy fix - PolicyAssignment assignment = context.getPolicyAssignments().get(def.getName()); - if (assignment != null) { - LOG.info("policy {} already allocated", def.getName()); - continue; - } - - WorkItem item = new WorkItem(def, expectParal); - workSets.add(item); - } - LOG.info("work set calculation: {}", workSets); - return workSets; - } - - private ScheduleState generateMonitorMetadata(List<WorkItem> expandworkSets, - Map<String, PolicyAssignment> newAssignments) { - MonitorMetadataGenerator generator = new MonitorMetadataGenerator(context); - return generator.generate(expandworkSets); - } - - private void placePolicy(PolicyDefinition def, AlertBoltUsage alertBoltUsage, Topology targetTopology, - TopologyUsage usage) { - String policyName = def.getName(); - - // topology usage update - alertBoltUsage.addPolicies(def); - - // update alert policy - usage.getPolicies().add(policyName); - - // update source topics - updateDataSource(usage, def); - - // update group-by - updateGrouping(usage, def); - } - - private void updateGrouping(TopologyUsage usage, PolicyDefinition def) { - // groupByMeta is removed since groupspec generate doesnt need it now. - // List<StreamPartition> policyPartitionSpec = def.getPartitionSpec(); - // Map<String, List<StreamPartition>> groupByMeta = usage.getGroupByMeta(); - // for (StreamPartition par : policyPartitionSpec) { - // List<StreamPartition> partitions = groupByMeta.get(par.getStreamId()); - // if (partitions == null) { - // partitions = new ArrayList<StreamPartition>(); - // // de-dup of the partition on the list? - // groupByMeta.put(par.getStreamId(), partitions); - // } - // if (!partitions.contains(par)) { - // partitions.add(par); - // } - // } - } - - private void updateDataSource(TopologyUsage usage, PolicyDefinition def) { - List<String> datasources = findDatasource(def); - usage.getDataSources().addAll(datasources); - } - - private List<String> findDatasource(PolicyDefinition def) { - List<String> result = new ArrayList<String>(); - - List<String> inputStreams = def.getInputStreams(); - Map<String, StreamDefinition> schemaMaps = context.getStreamSchemas(); - for (String is : inputStreams) { - StreamDefinition ss = schemaMaps.get(is); - result.add(ss.getDataSource()); - } - return result; - } - - /** - * For each given policy, the greedy steps are - * <ul> - * <li>1. Find the same topology that already serve the policy</li> - * <li>2. Find the topology that already take the source traffic</li> - * <li>3. Find the topology that available to place source topic</li> - * <li>4. Create a new topology and locate the policy</li> - * <li>Route table generated after all policies assigned</li> - * </ul> - * <br/> - * - * @param newAssignments - */ - private ScheduleResult schedulePolicy(WorkItem item, Map<String, PolicyAssignment> newAssignments) { - LOG.info(" schedule for {}", item); - - String policyName = item.def.getName(); - StreamGroup policyStreamPartition = new StreamGroup(); - if (item.def.getPartitionSpec().isEmpty()) { - LOG.error(" policy {} partition spec is empty! ", policyName); - ScheduleResult result = new ScheduleResult(); - result.policyName = policyName; - result.code = 400; - result.message = "policy doesn't have partition spec"; - return result; - } - policyStreamPartition.addStreamPartitions(item.def.getPartitionSpec(), item.def.isDedicated()); - - MonitoredStream targetdStream = context.getMonitoredStreams().get(policyStreamPartition); - if (targetdStream == null) { - targetdStream = new MonitoredStream(policyStreamPartition); - context.getMonitoredStreams().put(policyStreamPartition, targetdStream); - } - - ScheduleResult result = new ScheduleResult(); - result.policyName = policyName; - - StreamWorkSlotQueue queue = findWorkSlotQueue(targetdStream, item.def); - if (queue == null) { - result.code = 400; - result.message = String.format("unable to allocate work queue resource for policy %s !", policyName); - } else { - placePolicyToQueue(item.def, queue, newAssignments); - result.code = 200; - result.message = "OK"; - } - - LOG.info(" schedule result : {}", result); - return result; - } - - private void placePolicyToQueue(PolicyDefinition def, StreamWorkSlotQueue queue, - Map<String, PolicyAssignment> newAssignments) { - for (WorkSlot slot : queue.getWorkingSlots()) { - Topology targetTopology = context.getTopologies().get(slot.getTopologyName()); - TopologyUsage usage = context.getTopologyUsages().get(slot.getTopologyName()); - AlertBoltUsage alertBoltUsage = usage.getAlertBoltUsage(slot.getBoltId()); - placePolicy(def, alertBoltUsage, targetTopology, usage); - } - // queue.placePolicy(def); - PolicyAssignment assignment = new PolicyAssignment(def.getName(), queue.getQueueId()); - context.getPolicyAssignments().put(def.getName(), assignment); - newAssignments.put(def.getName(), assignment); - } - - private StreamWorkSlotQueue findWorkSlotQueue(MonitoredStream targetdStream, PolicyDefinition def) { - StreamWorkSlotQueue targetQueue = null; - for (StreamWorkSlotQueue queue : targetdStream.getQueues()) { - if (isQueueAvailable(queue, def)) { - targetQueue = queue; - break; - } - } - - if (targetQueue == null) { - WorkQueueBuilder builder = new WorkQueueBuilder(context, mgmtService); - // TODO : get the properties from policy definiton - targetQueue = builder.createQueue(targetdStream, def.isDedicated(), getQueueSize(def.getParallelismHint()), - new HashMap<String, Object>()); - } - return targetQueue; - } - - /** - * Some strategy to generate correct size in Startegy of queue builder. - */ - private int getQueueSize(int hint) { - if (hint == 0) { - // some policies require single bolt to execute - return 1; - } - return initialQueueSize * ((hint + initialQueueSize - 1) / initialQueueSize); - } - - private boolean isQueueAvailable(StreamWorkSlotQueue queue, PolicyDefinition def) { - if (queue.getQueueSize() < def.getParallelismHint()) { - return false; - } - - for (WorkSlot slot : queue.getWorkingSlots()) { - TopologyUsage u = context.getTopologyUsages().get(slot.getTopologyName()); - AlertBoltUsage usage = u.getAlertBoltUsage(slot.getBoltId()); - if (!isBoltAvailable(usage, def)) { - return false; - } - } - return true; - } - - private boolean isBoltAvailable(AlertBoltUsage boltUsage, PolicyDefinition def) { - // overload or over policy # or already contains - if (boltUsage == null || boltUsage.getLoad() > boltLoadUpbound - || boltUsage.getPolicies().size() >= policiesPerBolt || boltUsage.getPolicies().contains(def.getName())) { - return false; - } - return true; - } - - public void init(IScheduleContext context, TopologyMgmtService mgmtService) { - this.context = new InMemScheduleConext(context); - this.mgmtService = mgmtService; - } - - public IScheduleContext getContext() { - return context; - } - - public ScheduleState getState() { - return state; - } - -} http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/impl/MetadataValdiator.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/impl/MetadataValdiator.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/impl/MetadataValdiator.java deleted file mode 100644 index 5d7eeb1..0000000 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/impl/MetadataValdiator.java +++ /dev/null @@ -1,220 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.eagle.alert.coordinator.impl; - -import org.apache.commons.lang3.StringUtils; -import org.apache.eagle.alert.coordination.model.Kafka2TupleMetadata; -import org.apache.eagle.alert.coordination.model.internal.Topology; -import org.apache.eagle.alert.coordinator.IScheduleContext; -import org.apache.eagle.alert.coordinator.ValidateState; -import org.apache.eagle.alert.coordinator.provider.InMemScheduleConext; -import org.apache.eagle.alert.coordinator.provider.ScheduleContextBuilder; -import org.apache.eagle.alert.engine.coordinator.PolicyDefinition; -import org.apache.eagle.alert.engine.coordinator.Publishment; -import org.apache.eagle.alert.engine.coordinator.StreamColumn; -import org.apache.eagle.alert.engine.coordinator.StreamDefinition; -import org.apache.eagle.alert.service.IMetadataServiceClient; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.wso2.siddhi.core.SiddhiManager; - -import java.util.*; -import java.util.stream.Collectors; - -/** - * Created on 10/1/16. - */ -public class MetadataValdiator { - private static final Logger LOG = LoggerFactory.getLogger(MetadataValdiator.class); - - private static final Map<StreamColumn.Type, String> _EAGLE_SIDDHI_TYPE_MAPPING = new HashMap<>(); - - static { - _EAGLE_SIDDHI_TYPE_MAPPING.put(StreamColumn.Type.STRING, "STRING"); - _EAGLE_SIDDHI_TYPE_MAPPING.put(StreamColumn.Type.INT, "INT"); - _EAGLE_SIDDHI_TYPE_MAPPING.put(StreamColumn.Type.LONG, "LONG"); - _EAGLE_SIDDHI_TYPE_MAPPING.put(StreamColumn.Type.FLOAT, "FLOAT"); - _EAGLE_SIDDHI_TYPE_MAPPING.put(StreamColumn.Type.DOUBLE, "DOUBLE"); - _EAGLE_SIDDHI_TYPE_MAPPING.put(StreamColumn.Type.BOOL, "BOOL"); - _EAGLE_SIDDHI_TYPE_MAPPING.put(StreamColumn.Type.OBJECT, "OBJECT"); - } - - private IScheduleContext context; - private final ValidateState state; - - public MetadataValdiator(IMetadataServiceClient client) { - List<Topology> topologies = client.listTopologies(); - List<Kafka2TupleMetadata> datasources = client.listDataSources(); - List<StreamDefinition> streams = client.listStreams(); - // filter out disabled policies - List<PolicyDefinition> enabledPolicies = client.listPolicies(); - List<Publishment> publishments = client.listPublishment(); - - context = new InMemScheduleConext(ScheduleContextBuilder.listToMap(topologies), new HashMap<>(), - ScheduleContextBuilder.listToMap(datasources), - ScheduleContextBuilder.listToMap(enabledPolicies), - ScheduleContextBuilder.listToMap(publishments), - ScheduleContextBuilder.listToMap(streams), new HashMap<>(), new HashMap<>()); - this.state = new ValidateState(); - } - - public MetadataValdiator(IScheduleContext context) { - this.context = context; - this.state = new ValidateState(); - } - - - public ValidateState validate() { - - validateTopology(); - - validateDataSources(); - - validateStreams(); - - validatePolicies(); - - validatePublishments(); - - return state; - } - - private void validatePolicies() { - Collection<Publishment> pubs = context.getPublishments().values(); - for (PolicyDefinition pd : context.getPolicies().values()) { - if (!pubs.stream().anyMatch(p -> p.getPolicyIds().contains(pd.getName()))) { - state.appendUnPublishedPolicies(pd.getName()); - } - - boolean isStreamMiss = false; - StringBuilder builder = new StringBuilder(); - for (String inputStream : pd.getInputStreams()) { - if (context.getStreamSchemas().get(inputStream) == null) { - state.appendPublishemtnValidation(pd.getName(), String.format("policy %s contains unknown stream %s!", pd.getName(), inputStream)); - isStreamMiss = true; - break; - } - builder.append(buildStreamDefinition(context.getStreamSchemas().get(inputStream))); - builder.append("\n"); - } - - if (isStreamMiss) { - continue; - } - builder.append(pd.getDefinition().getValue()); - - // now evaluate - try { - SiddhiManager sm = new SiddhiManager(); - sm.createExecutionPlanRuntime(builder.toString()); - } catch (Exception e) { - LOG.error(String.format("siddhi creation failed! %s ", builder.toString()), e); - state.appendPolicyValidation(pd.getName(), e.getMessage()); - } - } - } - - private String buildStreamDefinition(StreamDefinition streamDefinition) { - List<String> columns = new ArrayList<>(); - if (streamDefinition.getColumns() != null) { - for (StreamColumn column : streamDefinition.getColumns()) { - columns.add(String.format("%s %s", column.getName(), _EAGLE_SIDDHI_TYPE_MAPPING.get(column.getType().toString().toLowerCase()))); - } - } else { - LOG.warn("No columns found for stream {}" + streamDefinition.getStreamId()); - } - return String.format("define stream %s( %s );", streamDefinition.getStreamId(), StringUtils.join(columns, ",")); - } - - - private void validatePublishments() { - Collection<PolicyDefinition> definitions = context.getPolicies().values(); - - for (Publishment p : context.getPublishments().values()) { - //TODO: check type; check serializer types; check dedup fields existence; check extend deduplicator... - Set<String> unknown = p.getPolicyIds().stream().filter(pid -> definitions.stream().anyMatch(pd -> pd.getName().equals(pid))).collect(Collectors.toSet()); - if (unknown.size() > 0) { - state.appendPublishemtnValidation(p.getName(), String.format("publishment %s reference unknown/uneabled policy %s!", p.getName(), unknown)); - } - } - } - - private void validateStreams() { - Collection<Kafka2TupleMetadata> datasources = context.getDataSourceMetadata().values(); - Collection<PolicyDefinition> definitions = context.getPolicies().values(); - for (StreamDefinition sd : context.getStreamSchemas().values()) { - if (!datasources.stream().anyMatch(d -> d.getName().equals(sd.getDataSource()))) { - state.appendStreamValidation(sd.getStreamId(), String.format("stream %s reference unknown data source %s !", sd.getStreamId(), sd.getDataSource())); - } - if (!definitions.stream().anyMatch(p -> p.getInputStreams().contains(sd.getStreamId()))) { - state.appendUnusedStreams(sd.getStreamId()); - } - // more on columns - if (sd.getColumns() == null || sd.getColumns().size() == 0) { - state.appendStreamValidation(sd.getStreamId(), String.format("stream %s have empty columns!", sd.getStreamId())); - } - } - } - - private void validateDataSources() { - Collection<StreamDefinition> sds = context.getStreamSchemas().values(); - for (Kafka2TupleMetadata ds : context.getDataSourceMetadata().values()) { - // simply do a O(^2) loop - if (!sds.stream().anyMatch(t -> t.getDataSource().equals(ds.getName()))) { - state.appendUnusedDatasource(ds.getName()); - } - - if (!"KAFKA".equalsIgnoreCase(ds.getType())) { - state.appendDataSourceValidation(ds.getName(), String.format(" unsupported data source type %s !", ds.getType())); - } - - //scheme - // String schemeCls = ds.getSchemeCls(); - // try { - // Object scheme = Class.forName(schemeCls).getConstructor(String.class, Map.class).newInstance(ds.getTopic(), new HashMap<>());// coul only mock empty map - // if (!(scheme instanceof MultiScheme || scheme instanceof Scheme)) { - // throw new IllegalArgumentException(" scheme class not subclass of Scheme or MultiScheme !"); - // } - // } catch (Exception e) { - // state.appendDataSourceValidation(ds.getName(), String.format("schemeCls %s expected to be qualified sub class name of %s or %s with given constructor signature!" - // +"Message: %s !", - // schemeCls, Scheme.class.getCanonicalName(), MultiScheme.class.getCanonicalName(), e.getMessage())); - // } - - // codec - if (ds.getCodec() == null) { - state.appendDataSourceValidation(ds.getName(), String.format("codec of datasource must *not* be null!")); - continue; - } - // String selectCls = ds.getCodec().getStreamNameSelectorCls(); - // try { - // StreamNameSelector cachedSelector = (StreamNameSelector) Class.forName(selectCls).getConstructor(Properties.class) - // .newInstance(ds.getCodec().getStreamNameSelectorProp()); - // } catch (Exception e) { - // state.appendDataSourceValidation(ds.getName(), String.format("streamNameSelectorCls %s expected to be subclass of %s and with given constructor signature! Message: %s !", - // selectCls, StreamNameSelector.class.getCanonicalName(), e.getMessage())); - // } - - } - } - - private void validateTopology() { - for (Topology t : context.getTopologies().values()) { - } - } - -} http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/impl/MonitorMetadataGenerator.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/impl/MonitorMetadataGenerator.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/impl/MonitorMetadataGenerator.java deleted file mode 100644 index fb20e66..0000000 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/impl/MonitorMetadataGenerator.java +++ /dev/null @@ -1,292 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.eagle.alert.coordinator.impl; - -import org.apache.eagle.alert.coordination.model.*; -import org.apache.eagle.alert.coordination.model.internal.MonitoredStream; -import org.apache.eagle.alert.coordination.model.internal.PolicyAssignment; -import org.apache.eagle.alert.coordination.model.internal.StreamWorkSlotQueue; -import org.apache.eagle.alert.coordination.model.internal.Topology; -import org.apache.eagle.alert.coordinator.IScheduleContext; -import org.apache.eagle.alert.coordinator.model.AlertBoltUsage; -import org.apache.eagle.alert.coordinator.model.TopologyUsage; -import org.apache.eagle.alert.engine.coordinator.PolicyDefinition; -import org.apache.eagle.alert.engine.coordinator.Publishment; -import org.apache.eagle.alert.engine.coordinator.StreamDefinition; -import org.apache.eagle.alert.engine.coordinator.StreamPartition; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; - -/** - * Given current policy placement, figure out monitor metadata - * <p>TODO: refactor to eliminate the duplicate of stupid if-notInMap-then-create.... - * FIXME: too many duplicated code logic : check null; add list to map; add to list..</p> - * - * @since Apr 26, 2016 - */ -public class MonitorMetadataGenerator { - - private static final Logger LOG = LoggerFactory.getLogger(MonitorMetadataGenerator.class); - - private IScheduleContext context; - - public MonitorMetadataGenerator(IScheduleContext context) { - this.context = context; - } - - public ScheduleState generate(List<WorkItem> expandworkSets) { - // topologyId -> SpoutSpec - Map<String, SpoutSpec> topoSpoutSpecsMap = generateSpoutMonitorMetadata(); - - // grp-by meta spec(sort & grp) - Map<String, RouterSpec> groupSpecsMap = generateGroupbyMonitorMetadata(); - - // alert bolt spec - Map<String, AlertBoltSpec> alertSpecsMap = generateAlertMonitorMetadata(); - - Map<String, PublishSpec> publishSpecsMap = generatePublishMetadata(); - - String uniqueVersion = generateVersion(); - ScheduleState status = new ScheduleState(uniqueVersion, - topoSpoutSpecsMap, - groupSpecsMap, - alertSpecsMap, - publishSpecsMap, - context.getPolicyAssignments().values(), - context.getMonitoredStreams().values(), - context.getPolicies().values(), - context.getStreamSchemas().values()); - return status; - } - - private Map<String, PublishSpec> generatePublishMetadata() { - Map<String, PublishSpec> pubSpecs = new HashMap<String, PublishSpec>(); - // prebuild policy to publishment map - Map<String, List<Publishment>> policyToPub = new HashMap<String, List<Publishment>>(); - for (Publishment pub : context.getPublishments().values()) { - for (String policyId : pub.getPolicyIds()) { - List<Publishment> policyPubs = policyToPub.get(policyId); - if (policyPubs == null) { - policyPubs = new ArrayList<>(); - policyToPub.put(policyId, policyPubs); - } - policyPubs.add(pub); - } - } - - // build per topology - for (TopologyUsage u : context.getTopologyUsages().values()) { - PublishSpec pubSpec = pubSpecs.get(u.getTopoName()); - if (pubSpec == null) { - pubSpec = new PublishSpec(u.getTopoName(), context.getTopologies().get(u.getTopoName()).getPubBoltId()); - pubSpecs.put(u.getTopoName(), pubSpec); - } - - for (String p : u.getPolicies()) { - PolicyDefinition definition = context.getPolicies().get(p); - if (definition == null) { - continue; - } - if (policyToPub.containsKey(p)) { - for (Publishment pub : policyToPub.get(p)) { - pubSpec.addPublishment(pub); - } - } - } - } - return pubSpecs; - } - - /** - * FIXME: add auto-increment version number?. - */ - private String generateVersion() { - return "spec_version_" + System.currentTimeMillis(); - } - - private Map<String, AlertBoltSpec> generateAlertMonitorMetadata() { - Map<String, AlertBoltSpec> alertSpecs = new HashMap<String, AlertBoltSpec>(); - for (TopologyUsage u : context.getTopologyUsages().values()) { - AlertBoltSpec alertSpec = alertSpecs.get(u.getTopoName()); - if (alertSpec == null) { - alertSpec = new AlertBoltSpec(u.getTopoName()); - alertSpecs.put(u.getTopoName(), alertSpec); - } - for (AlertBoltUsage boltUsage : u.getAlertUsages().values()) { - for (String policyName : boltUsage.getPolicies()) { - PolicyDefinition definition = context.getPolicies().get(policyName); - alertSpec.addBoltPolicy(boltUsage.getBoltId(), definition.getName()); - - for (Publishment publish : context.getPublishments().values()) { - if (!publish.getPolicyIds().contains(definition.getName())) { - continue; - } - - List<String> streamIds = new ArrayList<>(); - // add the publish to the bolt - if (publish.getStreamIds() == null || publish.getStreamIds().size() <= 0) { - streamIds.add(Publishment.STREAM_NAME_DEFAULT); - } else { - streamIds.addAll(publish.getStreamIds()); - } - for (String streamId : streamIds) { - alertSpec.addPublishPartition(streamId, policyName, publish.getName(), publish.getPartitionColumns()); - } - } - } - } - } - return alertSpecs; - } - - private Map<String, RouterSpec> generateGroupbyMonitorMetadata() { - Map<String, RouterSpec> groupSpecsMap = new HashMap<String, RouterSpec>(); - for (TopologyUsage u : context.getTopologyUsages().values()) { - RouterSpec spec = groupSpecsMap.get(u.getTopoName()); - if (spec == null) { - spec = new RouterSpec(u.getTopoName()); - groupSpecsMap.put(u.getTopoName(), spec); - } - - for (MonitoredStream ms : u.getMonitoredStream()) { - // mutiple stream on the same policy group : for correlation group case: - for (StreamPartition partiton : ms.getStreamGroup().getStreamPartitions()) { - StreamRouterSpec routeSpec = new StreamRouterSpec(); - routeSpec.setPartition(partiton); - routeSpec.setStreamId(partiton.getStreamId()); - - for (StreamWorkSlotQueue sq : ms.getQueues()) { - PolicyWorkerQueue queue = new PolicyWorkerQueue(); - queue.setWorkers(sq.getWorkingSlots()); - queue.setPartition(partiton); - routeSpec.addQueue(queue); - } - - spec.addRouterSpec(routeSpec); - } - } - } - - return groupSpecsMap; - } - - private Map<String, SpoutSpec> generateSpoutMonitorMetadata() { - Map<String, StreamWorkSlotQueue> queueMap = buildQueueMap(); - - Map<String, SpoutSpec> topoSpoutSpecsMap = new HashMap<String, SpoutSpec>(); - // streamName -> StreamDefinition - Map<String, StreamDefinition> streamSchemaMap = context.getStreamSchemas(); - Map<String, Kafka2TupleMetadata> datasourcesMap = context.getDataSourceMetadata(); - for (TopologyUsage usage : context.getTopologyUsages().values()) { - Topology topo = context.getTopologies().get(usage.getTopoName()); - - // based on data source schemas - // generate topic -> Kafka2TupleMetadata - // generate topic -> Tuple2StreamMetadata (actually the schema selector) - Map<String, Kafka2TupleMetadata> dss = new HashMap<String, Kafka2TupleMetadata>(); - Map<String, Tuple2StreamMetadata> tss = new HashMap<String, Tuple2StreamMetadata>(); - for (String dataSourceId : usage.getDataSources()) { - Kafka2TupleMetadata ds = datasourcesMap.get(dataSourceId); - dss.put(ds.getTopic(), ds); - tss.put(ds.getTopic(), ds.getCodec()); - } - - // generate topicId -> StreamRepartitionMetadata - Map<String, List<StreamRepartitionMetadata>> streamsMap = new HashMap<String, List<StreamRepartitionMetadata>>(); - for (String policyName : usage.getPolicies()) { - PolicyDefinition def = context.getPolicies().get(policyName); - - PolicyAssignment assignment = context.getPolicyAssignments().get(policyName); - if (assignment == null) { - LOG.error(" can not find assignment for policy {} ! ", policyName); - continue; - } - - for (StreamPartition policyStreamPartition : def.getPartitionSpec()) { - String stream = policyStreamPartition.getStreamId(); - StreamDefinition schema = streamSchemaMap.get(stream); - String topic = datasourcesMap.get(schema.getDataSource()).getTopic(); - - // add stream name to tuple metadata - if (tss.containsKey(topic)) { - Tuple2StreamMetadata tupleMetadata = tss.get(topic); - tupleMetadata.getActiveStreamNames().add(stream); - } - - // grouping strategy - StreamRepartitionStrategy gs = new StreamRepartitionStrategy(); - gs.partition = policyStreamPartition; - gs.numTotalParticipatingRouterBolts = queueMap.get(assignment.getQueueId()).getNumberOfGroupBolts(); - gs.startSequence = queueMap.get(assignment.getQueueId()).getTopologyGroupStartIndex(topo.getName()); - gs.totalTargetBoltIds = new ArrayList<String>(topo.getGroupNodeIds()); - - // add to map - addGroupingStrategy(streamsMap, stream, schema, topic, schema.getDataSource(), gs); - } - } - - SpoutSpec spoutSpec = new SpoutSpec(topo.getName(), streamsMap, tss, dss); - topoSpoutSpecsMap.put(topo.getName(), spoutSpec); - } - return topoSpoutSpecsMap; - } - - /** - * Work queue not a root level object, thus we need to build a map from - * MonitoredStream for later quick lookup. - */ - private Map<String, StreamWorkSlotQueue> buildQueueMap() { - Map<String, StreamWorkSlotQueue> queueMap = new HashMap<String, StreamWorkSlotQueue>(); - for (MonitoredStream ms : context.getMonitoredStreams().values()) { - for (StreamWorkSlotQueue queue : ms.getQueues()) { - queueMap.put(queue.getQueueId(), queue); - } - } - return queueMap; - } - - private void addGroupingStrategy(Map<String, List<StreamRepartitionMetadata>> streamsMap, String stream, - StreamDefinition schema, String topicName, String datasourceName, StreamRepartitionStrategy gs) { - List<StreamRepartitionMetadata> dsStreamMeta; - if (streamsMap.containsKey(topicName)) { - dsStreamMeta = streamsMap.get(topicName); - } else { - dsStreamMeta = new ArrayList<StreamRepartitionMetadata>(); - streamsMap.put(topicName, dsStreamMeta); - } - StreamRepartitionMetadata targetSm = null; - for (StreamRepartitionMetadata sm : dsStreamMeta) { - if (stream.equalsIgnoreCase(sm.getStreamId())) { - targetSm = sm; - break; - } - } - if (targetSm == null) { - targetSm = new StreamRepartitionMetadata(topicName, schema.getStreamId()); - dsStreamMeta.add(targetSm); - } - if (!targetSm.groupingStrategies.contains(gs)) { - targetSm.addGroupStrategy(gs); - } - } - -} http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/impl/ScheduleResult.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/impl/ScheduleResult.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/impl/ScheduleResult.java deleted file mode 100644 index a46537d..0000000 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/impl/ScheduleResult.java +++ /dev/null @@ -1,39 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.eagle.alert.coordinator.impl; - -import org.apache.eagle.alert.coordination.model.internal.PolicyAssignment; -import org.apache.eagle.alert.engine.coordinator.StreamPartition; -import java.util.List; - -/** - * Schedule result for one policy. - * - * @since Apr 26, 2016 - */ -public class ScheduleResult { - int code; - String message; - String policyName; - StreamPartition partition; - int index; - List<PolicyAssignment> topoliciesScheduled; - - public String toString() { - return String.format("policy: %s, result code: %d ", policyName, code, message); - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/impl/WorkItem.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/impl/WorkItem.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/impl/WorkItem.java deleted file mode 100644 index baa489d..0000000 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/impl/WorkItem.java +++ /dev/null @@ -1,33 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.eagle.alert.coordinator.impl; - -import org.apache.eagle.alert.engine.coordinator.PolicyDefinition; - -public class WorkItem { - public final PolicyDefinition def; - public final int requestParallelism; - - public WorkItem(PolicyDefinition def, int workNum) { - this.def = def; - this.requestParallelism = workNum; - } - - public String toString() { - return "policy name: " + def.getName() + "(" + requestParallelism + ")"; - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/impl/WorkQueueBuilder.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/impl/WorkQueueBuilder.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/impl/WorkQueueBuilder.java deleted file mode 100644 index a717b1c..0000000 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/impl/WorkQueueBuilder.java +++ /dev/null @@ -1,92 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.eagle.alert.coordinator.impl; - -import org.apache.eagle.alert.coordination.model.WorkSlot; -import org.apache.eagle.alert.coordination.model.internal.MonitoredStream; -import org.apache.eagle.alert.coordination.model.internal.StreamWorkSlotQueue; -import org.apache.eagle.alert.coordinator.IScheduleContext; -import org.apache.eagle.alert.coordinator.TopologyMgmtService; -import org.apache.eagle.alert.coordinator.impl.strategies.IWorkSlotStrategy; -import org.apache.eagle.alert.coordinator.impl.strategies.SameTopologySlotStrategy; -import org.apache.eagle.alert.coordinator.model.AlertBoltUsage; -import org.apache.eagle.alert.coordinator.model.TopologyUsage; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.HashMap; -import java.util.List; -import java.util.Map; - -/** - * @since Apr 27, 2016. - */ -public class WorkQueueBuilder { - - private static final Logger LOG = LoggerFactory.getLogger(WorkQueueBuilder.class); - - private final IScheduleContext context; - private final TopologyMgmtService mgmtService; - - public WorkQueueBuilder(IScheduleContext context, TopologyMgmtService mgmtService) { - this.context = context; - this.mgmtService = mgmtService; - } - - public StreamWorkSlotQueue createQueue(MonitoredStream stream, boolean isDedicated, int size, - Map<String, Object> properties) { - // FIXME: make extensible and configurable - IWorkSlotStrategy strategy = new SameTopologySlotStrategy(context, stream.getStreamGroup(), mgmtService); - List<WorkSlot> slots = strategy.reserveWorkSlots(size, isDedicated, properties); - if (slots.size() < size) { - LOG.error("allocate stream work queue failed, required size"); - return null; - } - StreamWorkSlotQueue queue = new StreamWorkSlotQueue(stream.getStreamGroup(), isDedicated, properties, - slots); - calculateGroupIndexAndCount(queue); - assignQueueSlots(stream, queue);// build reverse reference - stream.addQueues(queue); - - return queue; - } - - private void assignQueueSlots(MonitoredStream stream, StreamWorkSlotQueue queue) { - for (WorkSlot slot : queue.getWorkingSlots()) { - TopologyUsage u = context.getTopologyUsages().get(slot.getTopologyName()); - AlertBoltUsage boltUsage = u.getAlertBoltUsage(slot.getBoltId()); - boltUsage.addQueue(stream.getStreamGroup(), queue); - u.addMonitoredStream(stream); - } - } - - private void calculateGroupIndexAndCount(StreamWorkSlotQueue queue) { - Map<String, Integer> result = new HashMap<String, Integer>(); - int total = 0; - for (WorkSlot slot : queue.getWorkingSlots()) { - if (result.containsKey(slot.getTopologyName())) { - continue; - } - result.put(slot.getTopologyName(), total); - total += context.getTopologies().get(slot.getTopologyName()).getNumOfGroupBolt(); - } - - queue.setNumberOfGroupBolts(total); - queue.setTopoGroupStartIndex(result); - } - -} http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/impl/strategies/IWorkSlotStrategy.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/impl/strategies/IWorkSlotStrategy.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/impl/strategies/IWorkSlotStrategy.java deleted file mode 100644 index 8528606..0000000 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/impl/strategies/IWorkSlotStrategy.java +++ /dev/null @@ -1,30 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.eagle.alert.coordinator.impl.strategies; - -import org.apache.eagle.alert.coordination.model.WorkSlot; -import java.util.List; -import java.util.Map; - -/** - * @since Apr 27, 2016. - */ -public interface IWorkSlotStrategy { - - List<WorkSlot> reserveWorkSlots(int size, boolean isDedicated, Map<String, Object> properties); - -} http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/impl/strategies/SameTopologySlotStrategy.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/impl/strategies/SameTopologySlotStrategy.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/impl/strategies/SameTopologySlotStrategy.java deleted file mode 100644 index e401e98..0000000 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/impl/strategies/SameTopologySlotStrategy.java +++ /dev/null @@ -1,177 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.eagle.alert.coordinator.impl.strategies; - -import static org.apache.eagle.alert.coordinator.CoordinatorConstants.CONFIG_ITEM_TOPOLOGY_LOAD_UPBOUND; - -import org.apache.eagle.alert.coordination.model.WorkSlot; -import org.apache.eagle.alert.coordination.model.internal.StreamGroup; -import org.apache.eagle.alert.coordination.model.internal.Topology; -import org.apache.eagle.alert.coordinator.CoordinatorConstants; -import org.apache.eagle.alert.coordinator.IScheduleContext; -import org.apache.eagle.alert.coordinator.TopologyMgmtService; -import org.apache.eagle.alert.coordinator.TopologyMgmtService.TopologyMeta; -import org.apache.eagle.alert.coordinator.model.AlertBoltUsage; -import org.apache.eagle.alert.coordinator.model.TopologyUsage; -import com.typesafe.config.Config; -import com.typesafe.config.ConfigFactory; -import org.apache.eagle.alert.engine.coordinator.StreamPartition; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.*; - -/** - * A simple strategy that only find the bolts in the same topology as the - * required work slots. - * Invariant:<br/> - * One slot queue only on the one topology.<br/> - * One topology doesn't contains two same partition slot queues. - * @since Apr 27, 2016 - */ -public class SameTopologySlotStrategy implements IWorkSlotStrategy { - - private static final Logger LOG = LoggerFactory.getLogger(SameTopologySlotStrategy.class); - - private final IScheduleContext context; - private final StreamGroup partitionGroup; - private final TopologyMgmtService mgmtService; - - private final int numOfPoliciesBoundPerBolt; - private final double topoLoadUpbound; - private final boolean reuseBoltInStreams; - private final int streamsPerBolt; - - public SameTopologySlotStrategy(IScheduleContext context, StreamGroup streamPartitionGroup, - TopologyMgmtService mgmtService) { - this.context = context; - this.partitionGroup = streamPartitionGroup; - this.mgmtService = mgmtService; - - Config config = ConfigFactory.load().getConfig(CoordinatorConstants.CONFIG_ITEM_COORDINATOR); - numOfPoliciesBoundPerBolt = config.getInt(CoordinatorConstants.POLICIES_PER_BOLT); - topoLoadUpbound = config.getDouble(CONFIG_ITEM_TOPOLOGY_LOAD_UPBOUND); - if (config.hasPath(CoordinatorConstants.REUSE_BOLT_IN_STREAMS)) { - reuseBoltInStreams = config.getBoolean(CoordinatorConstants.REUSE_BOLT_IN_STREAMS); - } else { - reuseBoltInStreams = false; - } - if (config.hasPath(CoordinatorConstants.STREAMS_PER_BOLT)) { - streamsPerBolt = config.getInt(CoordinatorConstants.STREAMS_PER_BOLT); - } else { - streamsPerBolt = 1; - } - } - - /** - * @param isDedicated - not used yet!. - */ - @Override - public List<WorkSlot> reserveWorkSlots(int size, boolean isDedicated, Map<String, Object> properties) { - Iterator<Topology> it = context.getTopologies().values().stream().filter((t) -> t.getNumOfAlertBolt() >= size) - .iterator(); - // priority strategy first??? - List<WorkSlot> slots = new ArrayList<WorkSlot>(); - while (it.hasNext()) { - Topology t = it.next(); - if (getQueueOnTopology(size, slots, t)) { - break; - } - } - - if (slots.size() == 0) { - int supportedSize = mgmtService.getNumberOfAlertBoltsInTopology(); - if (size > supportedSize) { - LOG.error("can not find available slots for queue, required size {}, supported size {} !", size, supportedSize); - return Collections.emptyList(); - } - TopologyMeta topoMeta = mgmtService.creatTopology(); - if (topoMeta == null) { - LOG.error("can not create topology for given queue requirement, required size {}, requried partition group {} !", size, partitionGroup); - return Collections.emptyList(); - } - - context.getTopologies().put(topoMeta.topologyId, topoMeta.topology); - context.getTopologyUsages().put(topoMeta.topologyId, topoMeta.usage); - boolean placed = getQueueOnTopology(size, slots, topoMeta.topology); - if (!placed) { - LOG.error("can not find available slots from new created topology, required size {}. This indicates an error !", size); - } - } - return slots; - } - - private boolean getQueueOnTopology(int size, List<WorkSlot> slots, Topology t) { - TopologyUsage u = context.getTopologyUsages().get(t.getName()); - if (!isTopologyAvailable(u)) { - return false; - } - - List<String> bolts = new ArrayList<String>(); - for (AlertBoltUsage alertUsage : u.getAlertUsages().values()) { - if (isBoltAvailable(alertUsage)) { - bolts.add(alertUsage.getBoltId()); - } - - if (bolts.size() == size) { - break; - } - } - - if (bolts.size() == size) { - for (String boltId : bolts) { - WorkSlot slot = new WorkSlot(t.getName(), boltId); - slots.add(slot); - } - return true; - } - return false; - } - - private boolean isTopologyAvailable(TopologyUsage u) { - // for (MonitoredStream stream : u.getMonitoredStream()) { - // if (partition.equals(stream.getStreamParitition())) { - // return false; - // } - // } - if (u == null || u.getLoad() > topoLoadUpbound) { - return false; - } - - return true; - } - - private boolean isBoltAvailable(AlertBoltUsage alertUsage) { - // FIXME : more detail to compare on queue exclusion check - if (alertUsage.getPartitions().stream().filter(partition -> partition.isDedicated()).count() > 0) { - return false; - } - if (!reuseBoltInStreams && alertUsage.getQueueSize() > 0) { - return false; - } - if (reuseBoltInStreams) { - if (alertUsage.getQueueSize() >= streamsPerBolt) { - return false; - } - if (alertUsage.getPartitions().contains(partitionGroup)) { - return false; - } - } - return alertUsage.getPolicies().size() < numOfPoliciesBoundPerBolt; - } - -} http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/model/AlertBoltUsage.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/model/AlertBoltUsage.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/model/AlertBoltUsage.java deleted file mode 100644 index 36c0bce..0000000 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/model/AlertBoltUsage.java +++ /dev/null @@ -1,91 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.eagle.alert.coordinator.model; - -import org.apache.eagle.alert.coordination.model.internal.StreamGroup; -import org.apache.eagle.alert.coordination.model.internal.StreamWorkSlotQueue; -import org.apache.eagle.alert.engine.coordinator.PolicyDefinition; -import java.util.ArrayList; -import java.util.List; - -/** - * @since Mar 28, 2016. - */ -public class AlertBoltUsage { - - private String boltId; - private List<String> policies = new ArrayList<String>(); - // the stream partitions group that scheduled for this given alert bolt - private List<StreamGroup> partitions = new ArrayList<StreamGroup>(); - // the slot queue that scheduled for this given aler bolt - private List<StreamWorkSlotQueue> referQueues = new ArrayList<StreamWorkSlotQueue>(); - private double load; - - public AlertBoltUsage(String anid) { - this.boltId = anid; - } - - public String getBoltId() { - return boltId; - } - - public void setBoltId(String boltId) { - this.boltId = boltId; - } - - public List<String> getPolicies() { - return policies; - } - - public void addPolicies(PolicyDefinition pd) { - policies.add(pd.getName()); - // add first partition - // for (StreamPartition par : pd.getPartitionSpec()) { - // partitions.add(par); - // } - } - - public double getLoad() { - return load; - } - - public void setLoad(double load) { - this.load = load; - } - - public List<StreamGroup> getPartitions() { - return partitions; - } - - public List<StreamWorkSlotQueue> getReferQueues() { - return referQueues; - } - - public int getQueueSize() { - return referQueues.size(); - } - - public void addQueue(StreamGroup streamPartition, StreamWorkSlotQueue queue) { - this.referQueues.add(queue); - this.partitions.add(streamPartition); - } - - public void removeQueue(StreamWorkSlotQueue queue) { - this.referQueues.remove(queue); - } - -} http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/model/GroupBoltUsage.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/model/GroupBoltUsage.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/model/GroupBoltUsage.java deleted file mode 100644 index 39788d5..0000000 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/model/GroupBoltUsage.java +++ /dev/null @@ -1,65 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.eagle.alert.coordinator.model; - -/** - * @since Mar 28, 2016. - */ -public class GroupBoltUsage { - - private String boltId; - private double load; - - public GroupBoltUsage(String boltId) { - this.boltId = boltId; - } - - // private final Set<String> streams = new HashSet<String>(); - // private final Map<String, StreamFilter> filters = new HashMap<String, StreamFilter>(); - - // private final Map<String, List<StreamPartition>> groupByMeta; - - public double getLoad() { - return load; - } - - public void setLoad(double load) { - this.load = load; - } - - // public Set<String> getStreams() { - // return streams; - // } - // - // - // public Map<String, StreamFilter> getFilters() { - // return filters; - // } - - // public Map<String, List<StreamPartition>> getGroupByMeta() { - // return groupByMeta; - // } - - public String getBoltId() { - return boltId; - } - - public void setBoltId(String boltId) { - this.boltId = boltId; - } - -} http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/model/TopologyUsage.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/model/TopologyUsage.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/model/TopologyUsage.java deleted file mode 100644 index 3cfc505..0000000 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/model/TopologyUsage.java +++ /dev/null @@ -1,100 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.eagle.alert.coordinator.model; - -import org.apache.eagle.alert.coordination.model.internal.MonitoredStream; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Set; - -/** - * @since Mar 27, 2016. - */ -public class TopologyUsage { - // topo info - private String topoName; - private final Set<String> datasources = new HashSet<String>(); - // usage information - private final Set<String> policies = new HashSet<String>(); - private final Map<String, AlertBoltUsage> alertUsages = new HashMap<String, AlertBoltUsage>(); - private final Map<String, GroupBoltUsage> groupUsages = new HashMap<String, GroupBoltUsage>(); - private final List<MonitoredStream> monitoredStream = new ArrayList<MonitoredStream>(); - - private double load; - - /** - * This is to be the existing/previous meta-data. <br/> - * Only one group meta-data for all of the group bolts in this topology. - */ - - public TopologyUsage() { - } - - public TopologyUsage(String name) { - this.topoName = name; - } - - public String getTopoName() { - return topoName; - } - - public void setTopoName(String topoId) { - this.topoName = topoId; - } - - public Set<String> getDataSources() { - return datasources; - } - - public Set<String> getPolicies() { - return policies; - } - - public Map<String, AlertBoltUsage> getAlertUsages() { - return alertUsages; - } - - public AlertBoltUsage getAlertBoltUsage(String boltId) { - return alertUsages.get(boltId); - } - - public Map<String, GroupBoltUsage> getGroupUsages() { - return groupUsages; - } - - public List<MonitoredStream> getMonitoredStream() { - return monitoredStream; - } - - public void addMonitoredStream(MonitoredStream par) { - if (!this.monitoredStream.contains(par)) { - this.monitoredStream.add(par); - } - } - - public double getLoad() { - return load; - } - - public void setLoad(double load) { - this.load = load; - } - -}
