Repository: samza
Updated Branches: refs/heads/master b6219f181 -> bf2a2f76d
SAMZA-1643: StreamPartitionCountMonitor should only restart/shut down the job if partition count increases.

As an aside, also update the gauge to report the current number of partitions instead of the change, since that's what its name indicates.

Author: Prateek Maheshwari <pmahe...@linkedin.com>
Reviewers: Jagadish V <jagad...@apache.org>
Closes #468 from prateekm/partition-monitor-fixes

Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/bf2a2f76
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/bf2a2f76
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/bf2a2f76

Branch: refs/heads/master
Commit: bf2a2f76d8632c762a9c337ad6550a56abfb1946
Parents: b6219f1
Author: Prateek Maheshwari <pmahe...@linkedin.com>
Authored: Fri Apr 13 08:37:20 2018 -0700
Committer: Jagadish <jvenkatra...@linkedin.com>
Committed: Fri Apr 13 08:37:20 2018 -0700

----------------------------------------------------------------------
 .../samza/coordinator/StreamPartitionCountMonitor.java | 9 +++++++--
 .../samza/coordinator/TestStreamPartitionCountMonitor.scala | 8 ++++----
 2 files changed, 11 insertions(+), 6 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/samza/blob/bf2a2f76/samza-core/src/main/java/org/apache/samza/coordinator/StreamPartitionCountMonitor.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/coordinator/StreamPartitionCountMonitor.java b/samza-core/src/main/java/org/apache/samza/coordinator/StreamPartitionCountMonitor.java
index 65b266e..7d76c8c 100644
--- a/samza-core/src/main/java/org/apache/samza/coordinator/StreamPartitionCountMonitor.java
+++ b/samza-core/src/main/java/org/apache/samza/coordinator/StreamPartitionCountMonitor.java
@@ -184,11 +184,16 @@ public class StreamPartitionCountMonitor {
 int prevPartitionCount = 
metadata.getSystemStreamPartitionMetadata().size(); Gauge gauge = gauges.get(systemStream); - gauge.set(currentPartitionCount - prevPartitionCount); + gauge.set(currentPartitionCount); if (currentPartitionCount != prevPartitionCount) { log.warn(String.format("Change of partition count detected in stream %s. old partition count: %d, current partition count: %d", systemStream.toString(), prevPartitionCount, currentPartitionCount)); - streamsChanged.add(systemStream); + if (currentPartitionCount > prevPartitionCount) { + log.error(String.format("Shutting down (stateful) or restarting (stateless) the job since current " + + "partition count %d is greater than the old partition count %d for stream %s.", + currentPartitionCount, prevPartitionCount, systemStream.toString())); + streamsChanged.add(systemStream); + } } } catch (Exception e) { log.error(String.format("Error comparing partition count differences for stream: %s", metadataEntry.getKey().toString())); http://git-wip-us.apache.org/repos/asf/samza/blob/bf2a2f76/samza-core/src/test/scala/org/apache/samza/coordinator/TestStreamPartitionCountMonitor.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/test/scala/org/apache/samza/coordinator/TestStreamPartitionCountMonitor.scala b/samza-core/src/test/scala/org/apache/samza/coordinator/TestStreamPartitionCountMonitor.scala index b66cd64..399543c 100644 --- a/samza-core/src/test/scala/org/apache/samza/coordinator/TestStreamPartitionCountMonitor.scala +++ b/samza-core/src/test/scala/org/apache/samza/coordinator/TestStreamPartitionCountMonitor.scala @@ -83,13 +83,13 @@ class TestStreamPartitionCountMonitor extends AssertionsForJUnit with MockitoSug partitionCountMonitor.updatePartitionCountMetric() assertNotNull(partitionCountMonitor.getGauges().get(inputSystemStream)) - assertEquals(1, partitionCountMonitor.getGauges().get(inputSystemStream).getValue) + assertEquals(3, 
partitionCountMonitor.getGauges().get(inputSystemStream).getValue) assertNotNull(metrics.getGroup("job-coordinator")) val metricGroup = metrics.getGroup("job-coordinator") assertTrue(metricGroup.get("test-system-test-stream-partitionCount").isInstanceOf[Gauge[Int]]) - assertEquals(1, metricGroup.get("test-system-test-stream-partitionCount").asInstanceOf[Gauge[Int]].getValue) + assertEquals(3, metricGroup.get("test-system-test-stream-partitionCount").asInstanceOf[Gauge[Int]].getValue) verify(mockCallback, times(1)).onSystemStreamPartitionChange(any()) @@ -148,13 +148,13 @@ class TestStreamPartitionCountMonitor extends AssertionsForJUnit with MockitoSug partitionCountMonitor.updatePartitionCountMetric() assertNotNull(partitionCountMonitor.getGauges().get(inputSystemStream)) - assertEquals(1, partitionCountMonitor.getGauges().get(inputSystemStream).getValue) + assertEquals(3, partitionCountMonitor.getGauges().get(inputSystemStream).getValue) assertNotNull(metrics.getGroup("job-coordinator")) val metricGroup = metrics.getGroup("job-coordinator") assertTrue(metricGroup.get("test-system-test-stream-partitionCount").isInstanceOf[Gauge[Int]]) - assertEquals(1, metricGroup.get("test-system-test-stream-partitionCount").asInstanceOf[Gauge[Int]].getValue) + assertEquals(3, metricGroup.get("test-system-test-stream-partitionCount").asInstanceOf[Gauge[Int]].getValue) // Make sure as long as one of the input stream topic partition change is detected, the callback is invoked verify(mockCallback, times(1)).onSystemStreamPartitionChange(any())