Repository: samza
Updated Branches:
  refs/heads/master b6219f181 -> bf2a2f76d


SAMZA-1643: StreamPartitionCountMonitor should only restart/shut down the job 
if partition count increases

As an aside, also update the gauge to report current number of partitions 
instead of the change, since that's what its name indicates.

Author: Prateek Maheshwari <pmahe...@linkedin.com>

Reviewers: Jagadish  V <jagad...@apache.org>

Closes #468 from prateekm/partition-monitor-fixes


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/bf2a2f76
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/bf2a2f76
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/bf2a2f76

Branch: refs/heads/master
Commit: bf2a2f76d8632c762a9c337ad6550a56abfb1946
Parents: b6219f1
Author: Prateek Maheshwari <pmahe...@linkedin.com>
Authored: Fri Apr 13 08:37:20 2018 -0700
Committer: Jagadish <jvenkatra...@linkedin.com>
Committed: Fri Apr 13 08:37:20 2018 -0700

----------------------------------------------------------------------
 .../samza/coordinator/StreamPartitionCountMonitor.java      | 9 +++++++--
 .../samza/coordinator/TestStreamPartitionCountMonitor.scala | 8 ++++----
 2 files changed, 11 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/bf2a2f76/samza-core/src/main/java/org/apache/samza/coordinator/StreamPartitionCountMonitor.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/main/java/org/apache/samza/coordinator/StreamPartitionCountMonitor.java
 
b/samza-core/src/main/java/org/apache/samza/coordinator/StreamPartitionCountMonitor.java
index 65b266e..7d76c8c 100644
--- 
a/samza-core/src/main/java/org/apache/samza/coordinator/StreamPartitionCountMonitor.java
+++ 
b/samza-core/src/main/java/org/apache/samza/coordinator/StreamPartitionCountMonitor.java
@@ -184,11 +184,16 @@ public class StreamPartitionCountMonitor {
           int prevPartitionCount = 
metadata.getSystemStreamPartitionMetadata().size();
 
           Gauge gauge = gauges.get(systemStream);
-          gauge.set(currentPartitionCount - prevPartitionCount);
+          gauge.set(currentPartitionCount);
           if (currentPartitionCount != prevPartitionCount) {
             log.warn(String.format("Change of partition count detected in 
stream %s. old partition count: %d, current partition count: %d",
                 systemStream.toString(), prevPartitionCount, 
currentPartitionCount));
-            streamsChanged.add(systemStream);
+            if (currentPartitionCount  > prevPartitionCount) {
+              log.error(String.format("Shutting down (stateful) or restarting 
(stateless) the job since current " +
+                      "partition count %d is greater than the old partition 
count %d for stream %s.",
+                  currentPartitionCount, prevPartitionCount, 
systemStream.toString()));
+              streamsChanged.add(systemStream);
+            }
           }
         } catch (Exception e) {
           log.error(String.format("Error comparing partition count differences 
for stream: %s", metadataEntry.getKey().toString()));

http://git-wip-us.apache.org/repos/asf/samza/blob/bf2a2f76/samza-core/src/test/scala/org/apache/samza/coordinator/TestStreamPartitionCountMonitor.scala
----------------------------------------------------------------------
diff --git 
a/samza-core/src/test/scala/org/apache/samza/coordinator/TestStreamPartitionCountMonitor.scala
 
b/samza-core/src/test/scala/org/apache/samza/coordinator/TestStreamPartitionCountMonitor.scala
index b66cd64..399543c 100644
--- 
a/samza-core/src/test/scala/org/apache/samza/coordinator/TestStreamPartitionCountMonitor.scala
+++ 
b/samza-core/src/test/scala/org/apache/samza/coordinator/TestStreamPartitionCountMonitor.scala
@@ -83,13 +83,13 @@ class TestStreamPartitionCountMonitor extends 
AssertionsForJUnit with MockitoSug
     partitionCountMonitor.updatePartitionCountMetric()
 
     assertNotNull(partitionCountMonitor.getGauges().get(inputSystemStream))
-    assertEquals(1, 
partitionCountMonitor.getGauges().get(inputSystemStream).getValue)
+    assertEquals(3, 
partitionCountMonitor.getGauges().get(inputSystemStream).getValue)
 
     assertNotNull(metrics.getGroup("job-coordinator"))
 
     val metricGroup = metrics.getGroup("job-coordinator")
     
assertTrue(metricGroup.get("test-system-test-stream-partitionCount").isInstanceOf[Gauge[Int]])
-    assertEquals(1, 
metricGroup.get("test-system-test-stream-partitionCount").asInstanceOf[Gauge[Int]].getValue)
+    assertEquals(3, 
metricGroup.get("test-system-test-stream-partitionCount").asInstanceOf[Gauge[Int]].getValue)
 
     verify(mockCallback, times(1)).onSystemStreamPartitionChange(any())
 
@@ -148,13 +148,13 @@ class TestStreamPartitionCountMonitor extends 
AssertionsForJUnit with MockitoSug
     partitionCountMonitor.updatePartitionCountMetric()
 
     assertNotNull(partitionCountMonitor.getGauges().get(inputSystemStream))
-    assertEquals(1, 
partitionCountMonitor.getGauges().get(inputSystemStream).getValue)
+    assertEquals(3, 
partitionCountMonitor.getGauges().get(inputSystemStream).getValue)
 
     assertNotNull(metrics.getGroup("job-coordinator"))
 
     val metricGroup = metrics.getGroup("job-coordinator")
     
assertTrue(metricGroup.get("test-system-test-stream-partitionCount").isInstanceOf[Gauge[Int]])
-    assertEquals(1, 
metricGroup.get("test-system-test-stream-partitionCount").asInstanceOf[Gauge[Int]].getValue)
+    assertEquals(3, 
metricGroup.get("test-system-test-stream-partitionCount").asInstanceOf[Gauge[Int]].getValue)
 
     // Make sure as long as one of the input stream topic partition change is 
detected, the callback is invoked
     verify(mockCallback, times(1)).onSystemStreamPartitionChange(any())

Reply via email to