Repository: incubator-eagle
Updated Branches:
  refs/heads/master ab1c9b64f -> c4a0b94d7


[EAGLE-694] alert engine could not reduce alert bolt number when parallelism of 
p…

Author: wujinhu <wujinhu...@126.com>

Closes #577 from wujinhu/EAGLE-694.


Project: http://git-wip-us.apache.org/repos/asf/incubator-eagle/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-eagle/commit/c4a0b94d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-eagle/tree/c4a0b94d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-eagle/diff/c4a0b94d

Branch: refs/heads/master
Commit: c4a0b94d760b54b3613e11d67e355e982a18910f
Parents: ab1c9b6
Author: wujinhu <wujinhu...@126.com>
Authored: Fri Oct 28 14:54:51 2016 +0800
Committer: wujinhu <wujinhu...@126.com>
Committed: Fri Oct 28 14:54:51 2016 +0800

----------------------------------------------------------------------
 .../alert/engine/coordinator/PolicyDefinition.java     |  2 +-
 .../coordinator/provider/ScheduleContextBuilder.java   |  4 ++--
 .../alert/coordinator/ScheduleContextBuilderTest.java  | 13 +++++++------
 3 files changed, 10 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/c4a0b94d/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/PolicyDefinition.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/PolicyDefinition.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/PolicyDefinition.java
index 6df682a..cfd7fef 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/PolicyDefinition.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/PolicyDefinition.java
@@ -153,7 +153,7 @@ public class PolicyDefinition implements Serializable {
             && (another.definition != null && another.definition.equals(this.definition))
             && Objects.equals(this.definition, another.definition)
             && CollectionUtils.isEqualCollection(another.partitionSpec, this.partitionSpec)
-            // && another.parallelismHint == this.parallelismHint
+             && another.parallelismHint == this.parallelismHint
             ) {
             return true;
         }

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/c4a0b94d/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/provider/ScheduleContextBuilder.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/provider/ScheduleContextBuilder.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/provider/ScheduleContextBuilder.java
index 69225da..98b598a 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/provider/ScheduleContextBuilder.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/provider/ScheduleContextBuilder.java
@@ -224,8 +224,8 @@ public class ScheduleContextBuilder {
             } else {
                 StreamWorkSlotQueue queue = queueMap.get(assignment.getQueueId());
                 if (queue == null
-                    || policies.get(assignment.getPolicyName()).getParallelismHint() > queue.getQueueSize()) {
-                    // queue not found or policy has hint bigger than queue (possible a poilcy update)
+                    || policies.get(assignment.getPolicyName()).getParallelismHint() != queue.getQueueSize()) {
+                    // queue not found or policy has hint not equal to queue (possible a poilcy update)
                     LOG.info("Policy assignment {} 's policy doesnt match queue: {}!", assignment, queue);
                     paIt.remove();
                 }

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/c4a0b94d/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/test/java/org/apache/alert/coordinator/ScheduleContextBuilderTest.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/test/java/org/apache/alert/coordinator/ScheduleContextBuilderTest.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/test/java/org/apache/alert/coordinator/ScheduleContextBuilderTest.java
index ac83c73..e7efbd7 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/test/java/org/apache/alert/coordinator/ScheduleContextBuilderTest.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/test/java/org/apache/alert/coordinator/ScheduleContextBuilderTest.java
@@ -157,14 +157,15 @@ public class ScheduleContextBuilderTest {
         StreamWorkSlotQueue queue = SchedulerTest.getQueue(context, assignment1.getQueueId()).getRight();
 
         PolicyDefinition pd1 = client.listPolicies().get(0);
-        pd1.setParallelismHint(4); // default queue is 5 , change to smaller, has no effect
+        pd1.setParallelismHint(4); // default queue is 5 , change to smaller, same like change bigger
 
         context = builder.buildContext();
-        PolicyAssignment assignmentNew = context.getPolicyAssignments().values().iterator().next();
-        StreamWorkSlotQueue queueNew = SchedulerTest.getQueue(context, assignmentNew.getQueueId()).getRight();
-        Assert.assertNotNull(queueNew);
+        Assert.assertFalse(context.getPolicyAssignments().values().iterator().hasNext());
+        //PolicyAssignment assignmentNew = context.getPolicyAssignments().values().iterator().next();
+        //StreamWorkSlotQueue queueNew = SchedulerTest.getQueue(context, assignmentNew.getQueueId()).getRight();
+        //Assert.assertNotNull(queueNew);
         // just to make sure queueNew is present
-        Assert.assertEquals(queue.getQueueId(), queueNew.getQueueId());
+        //Assert.assertEquals(queue.getQueueId(), queueNew.getQueueId());
 
         // default queue is 5 , change to bigger 6, policy assignment removed
         pd1.setParallelismHint(queue.getQueueSize() + 1);
@@ -333,7 +334,7 @@ public class ScheduleContextBuilderTest {
         slots.add(slot2);
         slots.add(slot3);
         slots.add(slot4);
-        slots.add(slot5);
+        //slots.add(slot5);
 
         StreamWorkSlotQueue q = new StreamWorkSlotQueue(streamGroup, false, new HashMap<>(), slots);
         ms.addQueues(q);

Reply via email to