Repository: incubator-eagle Updated Branches: refs/heads/master ab1c9b64f -> c4a0b94d7
[EAGLE-694] alert engine could not reduce alert bolt number when parallelism of p⦠Author: wujinhu <wujinhu...@126.com> Closes #577 from wujinhu/EAGLE-694. Project: http://git-wip-us.apache.org/repos/asf/incubator-eagle/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-eagle/commit/c4a0b94d Tree: http://git-wip-us.apache.org/repos/asf/incubator-eagle/tree/c4a0b94d Diff: http://git-wip-us.apache.org/repos/asf/incubator-eagle/diff/c4a0b94d Branch: refs/heads/master Commit: c4a0b94d760b54b3613e11d67e355e982a18910f Parents: ab1c9b6 Author: wujinhu <wujinhu...@126.com> Authored: Fri Oct 28 14:54:51 2016 +0800 Committer: wujinhu <wujinhu...@126.com> Committed: Fri Oct 28 14:54:51 2016 +0800 ---------------------------------------------------------------------- .../alert/engine/coordinator/PolicyDefinition.java | 2 +- .../coordinator/provider/ScheduleContextBuilder.java | 4 ++-- .../alert/coordinator/ScheduleContextBuilderTest.java | 13 +++++++------ 3 files changed, 10 insertions(+), 9 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/c4a0b94d/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/PolicyDefinition.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/PolicyDefinition.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/PolicyDefinition.java index 6df682a..cfd7fef 100644 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/PolicyDefinition.java +++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/PolicyDefinition.java @@ -153,7 +153,7 @@ public class PolicyDefinition implements Serializable { && (another.definition != null && another.definition.equals(this.definition)) && Objects.equals(this.definition, another.definition) && CollectionUtils.isEqualCollection(another.partitionSpec, this.partitionSpec) - // && another.parallelismHint == this.parallelismHint + && another.parallelismHint == this.parallelismHint ) { return true; } http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/c4a0b94d/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/provider/ScheduleContextBuilder.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/provider/ScheduleContextBuilder.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/provider/ScheduleContextBuilder.java index 69225da..98b598a 100644 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/provider/ScheduleContextBuilder.java +++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/provider/ScheduleContextBuilder.java @@ -224,8 +224,8 @@ public class ScheduleContextBuilder { } else { StreamWorkSlotQueue queue = queueMap.get(assignment.getQueueId()); if (queue == null - || policies.get(assignment.getPolicyName()).getParallelismHint() > queue.getQueueSize()) { - // queue not found or policy has hint bigger than queue (possible a poilcy update) + || policies.get(assignment.getPolicyName()).getParallelismHint() != queue.getQueueSize()) { + // queue not found or policy has hint not equal to queue (possible a poilcy update) LOG.info("Policy assignment {} 's policy doesnt match queue: {}!", assignment, queue); paIt.remove(); } http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/c4a0b94d/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/test/java/org/apache/alert/coordinator/ScheduleContextBuilderTest.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/test/java/org/apache/alert/coordinator/ScheduleContextBuilderTest.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/test/java/org/apache/alert/coordinator/ScheduleContextBuilderTest.java index ac83c73..e7efbd7 100644 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/test/java/org/apache/alert/coordinator/ScheduleContextBuilderTest.java +++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/test/java/org/apache/alert/coordinator/ScheduleContextBuilderTest.java @@ -157,14 +157,15 @@ public class ScheduleContextBuilderTest { StreamWorkSlotQueue queue = SchedulerTest.getQueue(context, assignment1.getQueueId()).getRight(); PolicyDefinition pd1 = client.listPolicies().get(0); - pd1.setParallelismHint(4); // default queue is 5 , change to smaller, has no effect + pd1.setParallelismHint(4); // default queue is 5 , change to smaller, same like change bigger context = builder.buildContext(); - PolicyAssignment assignmentNew = context.getPolicyAssignments().values().iterator().next(); - StreamWorkSlotQueue queueNew = SchedulerTest.getQueue(context, assignmentNew.getQueueId()).getRight(); - Assert.assertNotNull(queueNew); + Assert.assertFalse(context.getPolicyAssignments().values().iterator().hasNext()); + //PolicyAssignment assignmentNew = context.getPolicyAssignments().values().iterator().next(); + //StreamWorkSlotQueue queueNew = SchedulerTest.getQueue(context, assignmentNew.getQueueId()).getRight(); + //Assert.assertNotNull(queueNew); // just to make sure queueNew is present - Assert.assertEquals(queue.getQueueId(), queueNew.getQueueId()); + //Assert.assertEquals(queue.getQueueId(), queueNew.getQueueId()); // default queue is 5 , change to bigger 6, policy assignment removed pd1.setParallelismHint(queue.getQueueSize() + 1); @@ -333,7 +334,7 @@ public class ScheduleContextBuilderTest { slots.add(slot2); slots.add(slot3); slots.add(slot4); - slots.add(slot5); + //slots.add(slot5); StreamWorkSlotQueue q = new StreamWorkSlotQueue(streamGroup, false, new HashMap<>(), slots); ms.addQueues(q);