Repository: eagle Updated Branches: refs/heads/master a64b622cc -> c01a258b6
[EAGLE-971] fix a bug where duplicate queues are generated under a monitored stream https://issues.apache.org/jira/browse/EAGLE-971 New policies for alert spec generation: 1. each alert bolt has no more than 'coordinator.policiesPerBolt' policies. 2. each alert bolt has no more than 'coordinator.streamsPerBolt' queues if 'reuseBoltInStreams' is true. 3. no two queues on one alert bolt have the same StreamGroup. Author: Zhao, Qingwen <[email protected]> Closes #895 from qingwen220/EAGLE-971. Project: http://git-wip-us.apache.org/repos/asf/eagle/repo Commit: http://git-wip-us.apache.org/repos/asf/eagle/commit/c01a258b Tree: http://git-wip-us.apache.org/repos/asf/eagle/tree/c01a258b Diff: http://git-wip-us.apache.org/repos/asf/eagle/diff/c01a258b Branch: refs/heads/master Commit: c01a258b6f9b8fa0ca340b605e7ef942b95c3afd Parents: a64b622 Author: Zhao, Qingwen <[email protected]> Authored: Wed Mar 29 14:48:09 2017 +0800 Committer: Zhao, Qingwen <[email protected]> Committed: Wed Mar 29 14:48:09 2017 +0800 ---------------------------------------------------------------------- eagle-assembly/src/main/conf/eagle.conf | 6 ++++-- .../alert/coordinator/TopologyMgmtService.java | 2 +- .../impl/strategies/SameTopologySlotStrategy.java | 18 +++++++++++------- .../alert/engine/spout/CorrelationSpout.java | 7 ++++++- 4 files changed, 22 insertions(+), 11 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/eagle/blob/c01a258b/eagle-assembly/src/main/conf/eagle.conf ---------------------------------------------------------------------- diff --git a/eagle-assembly/src/main/conf/eagle.conf b/eagle-assembly/src/main/conf/eagle.conf index 2912682..496e3d0 100644 --- a/eagle-assembly/src/main/conf/eagle.conf +++ b/eagle-assembly/src/main/conf/eagle.conf @@ -142,12 +142,14 @@ application { # Coordinator Configuration coordinator { - policiesPerBolt = 5 - boltParallelism = 5 +# boltParallelism = 5 policyDefaultParallelism = 5 
boltLoadUpbound = 0.8 topologyLoadUpbound = 0.8 numOfAlertBoltsPerTopology = 5 + policiesPerBolt = 10 + streamsPerBolt = 10 + reuseBoltInStreams = true zkConfig { zkQuorum = "server.eagle.apache.org:2181" zkRoot = "/alert" http://git-wip-us.apache.org/repos/asf/eagle/blob/c01a258b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/TopologyMgmtService.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/TopologyMgmtService.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/TopologyMgmtService.java index 4ca9d5e..4ede29d 100644 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/TopologyMgmtService.java +++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/TopologyMgmtService.java @@ -56,7 +56,7 @@ public class TopologyMgmtService { public TopologyMgmtService() { Config config = ConfigFactory.load().getConfig(CONFIG_ITEM_COORDINATOR); - boltParallelism = config.getInt(CoordinatorConstants.BOLT_PARALLELISM); + //boltParallelism = config.getInt(CoordinatorConstants.BOLT_PARALLELISM); numberOfBoltsPerTopology = config.getInt(NUM_OF_ALERT_BOLTS_PER_TOPOLOGY); } http://git-wip-us.apache.org/repos/asf/eagle/blob/c01a258b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/impl/strategies/SameTopologySlotStrategy.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/impl/strategies/SameTopologySlotStrategy.java 
b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/impl/strategies/SameTopologySlotStrategy.java index 0e5fd00..e401e98 100644 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/impl/strategies/SameTopologySlotStrategy.java +++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/impl/strategies/SameTopologySlotStrategy.java @@ -29,6 +29,7 @@ import org.apache.eagle.alert.coordinator.model.AlertBoltUsage; import org.apache.eagle.alert.coordinator.model.TopologyUsage; import com.typesafe.config.Config; import com.typesafe.config.ConfigFactory; +import org.apache.eagle.alert.engine.coordinator.StreamPartition; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -50,7 +51,7 @@ public class SameTopologySlotStrategy implements IWorkSlotStrategy { private final StreamGroup partitionGroup; private final TopologyMgmtService mgmtService; - // private final int numOfPoliciesBoundPerBolt; + private final int numOfPoliciesBoundPerBolt; private final double topoLoadUpbound; private final boolean reuseBoltInStreams; private final int streamsPerBolt; @@ -62,7 +63,7 @@ public class SameTopologySlotStrategy implements IWorkSlotStrategy { this.mgmtService = mgmtService; Config config = ConfigFactory.load().getConfig(CoordinatorConstants.CONFIG_ITEM_COORDINATOR); - // numOfPoliciesBoundPerBolt = config.getInt(CoordinatorConstants.POLICIES_PER_BOLT); + numOfPoliciesBoundPerBolt = config.getInt(CoordinatorConstants.POLICIES_PER_BOLT); topoLoadUpbound = config.getDouble(CONFIG_ITEM_TOPOLOGY_LOAD_UPBOUND); if (config.hasPath(CoordinatorConstants.REUSE_BOLT_IN_STREAMS)) { reuseBoltInStreams = config.getBoolean(CoordinatorConstants.REUSE_BOLT_IN_STREAMS); @@ -162,12 +163,15 @@ public class SameTopologySlotStrategy implements IWorkSlotStrategy { if (!reuseBoltInStreams && alertUsage.getQueueSize() > 0) { 
return false; } - if (reuseBoltInStreams && alertUsage.getQueueSize() >= streamsPerBolt) { - return false; + if (reuseBoltInStreams) { + if (alertUsage.getQueueSize() >= streamsPerBolt) { + return false; + } + if (alertUsage.getPartitions().contains(partitionGroup)) { + return false; + } } - // actually it's now 0; - return true; - // return alertUsage.getPolicies().size() < numOfPoliciesBoundPerBolt; + return alertUsage.getPolicies().size() < numOfPoliciesBoundPerBolt; } } http://git-wip-us.apache.org/repos/asf/eagle/blob/c01a258b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/spout/CorrelationSpout.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/spout/CorrelationSpout.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/spout/CorrelationSpout.java index 60a9b98..e9ee892 100644 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/spout/CorrelationSpout.java +++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/spout/CorrelationSpout.java @@ -168,7 +168,12 @@ public class CorrelationSpout extends BaseRichSpout implements SpoutSpecListener @Override public void nextTuple() { for (KafkaSpoutWrapper wrapper : kafkaSpoutList.values()) { - wrapper.nextTuple(); + try { + wrapper.nextTuple(); + } catch (Exception e) { + LOG.error("unexpected exception is caught: {}", e.getMessage(), e); + } + } }
