Repository: samza Updated Branches: refs/heads/master 49e5073c3 -> 3f9b96704
SAMZA-1611: BootstrappingChooser should use systemAdmin offsetComparator API to compare the offsets Author: Aditya Toomula <atoom...@linkedin.com> Reviewers: Jagadish <jagad...@apache.org> Closes #443 from atoomula/bootstrap Project: http://git-wip-us.apache.org/repos/asf/samza/repo Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/3f9b9670 Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/3f9b9670 Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/3f9b9670 Branch: refs/heads/master Commit: 3f9b967041e4108c0071033a73f2595e56ed3e0b Parents: 49e5073 Author: Aditya Toomula <atoom...@linkedin.com> Authored: Tue Mar 13 17:43:45 2018 -0700 Committer: Jagadish <jvenkatra...@linkedin.com> Committed: Tue Mar 13 17:43:45 2018 -0700 ---------------------------------------------------------------------- .../samza/system/chooser/BootstrappingChooser.scala | 12 ++++++++++-- .../samza/system/chooser/TestBootstrappingChooser.scala | 6 +++--- .../samza/system/chooser/TestDefaultChooser.scala | 10 +++++----- 3 files changed, 18 insertions(+), 10 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/samza/blob/3f9b9670/samza-core/src/main/scala/org/apache/samza/system/chooser/BootstrappingChooser.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/main/scala/org/apache/samza/system/chooser/BootstrappingChooser.scala b/samza-core/src/main/scala/org/apache/samza/system/chooser/BootstrappingChooser.scala index 212ec05..ffc0c90 100644 --- a/samza-core/src/main/scala/org/apache/samza/system/chooser/BootstrappingChooser.scala +++ b/samza-core/src/main/scala/org/apache/samza/system/chooser/BootstrappingChooser.scala @@ -266,9 +266,17 @@ class BootstrappingChooser( trace("Check %s offset %s against %s for %s." format (offsetType, offset, offsetToCheck, systemStreamPartition)) - // The SSP is no longer lagging if the envelope's offset equals the + // Let's compare offset of the chosen message with offsetToCheck. + val comparatorResult: Integer = if (offset == null || offsetToCheck == null) { + -1 + } else { + val systemAdmin = systemAdmins.getSystemAdmin(systemStreamPartition.getSystem) + systemAdmin.offsetComparator(offset, offsetToCheck) + } + + // The SSP is no longer lagging if the envelope's offset is greater than or equal to the // latest offset. - if (offset != null && offset.equals(offsetToCheck)) { + if (comparatorResult != null && comparatorResult.intValue() >= 0) { laggingSystemStreamPartitions -= systemStreamPartition systemStreamLagCounts += systemStream -> (systemStreamLagCounts(systemStream) - 1) http://git-wip-us.apache.org/repos/asf/samza/blob/3f9b9670/samza-core/src/test/scala/org/apache/samza/system/chooser/TestBootstrappingChooser.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/test/scala/org/apache/samza/system/chooser/TestBootstrappingChooser.scala b/samza-core/src/test/scala/org/apache/samza/system/chooser/TestBootstrappingChooser.scala index e56206a..02791bb 100644 --- a/samza-core/src/test/scala/org/apache/samza/system/chooser/TestBootstrappingChooser.scala +++ b/samza-core/src/test/scala/org/apache/samza/system/chooser/TestBootstrappingChooser.scala @@ -36,9 +36,9 @@ import scala.collection.JavaConverters._ @RunWith(value = classOf[Parameterized]) class TestBootstrappingChooser(getChooser: (MessageChooser, Map[SystemStream, SystemStreamMetadata]) => MessageChooser) { - val envelope1 = new IncomingMessageEnvelope(new SystemStreamPartition("kafka", "stream", new Partition(0)), null, null, 1); - val envelope2 = new IncomingMessageEnvelope(new SystemStreamPartition("kafka", "stream1", new Partition(1)), null, null, 2); - val envelope3 = new IncomingMessageEnvelope(new SystemStreamPartition("kafka", "stream1", new Partition(0)), null, null, 3); + val envelope1 = new IncomingMessageEnvelope(new SystemStreamPartition("kafka", "stream", new Partition(0)), "120", null, 1); + val envelope2 = new IncomingMessageEnvelope(new SystemStreamPartition("kafka", "stream1", new Partition(1)), "121", null, 2); + val envelope3 = new IncomingMessageEnvelope(new SystemStreamPartition("kafka", "stream1", new Partition(0)), "122", null, 3); val envelope4 = new IncomingMessageEnvelope(new SystemStreamPartition("kafka", "stream", new Partition(0)), "123", null, 4); /** http://git-wip-us.apache.org/repos/asf/samza/blob/3f9b9670/samza-core/src/test/scala/org/apache/samza/system/chooser/TestDefaultChooser.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/test/scala/org/apache/samza/system/chooser/TestDefaultChooser.scala b/samza-core/src/test/scala/org/apache/samza/system/chooser/TestDefaultChooser.scala index df5282c..e21bd9c 100644 --- a/samza-core/src/test/scala/org/apache/samza/system/chooser/TestDefaultChooser.scala +++ b/samza-core/src/test/scala/org/apache/samza/system/chooser/TestDefaultChooser.scala @@ -32,13 +32,13 @@ import org.junit.Test import scala.collection.JavaConverters._ class TestDefaultChooser { - val envelope1 = new IncomingMessageEnvelope(new SystemStreamPartition("kafka", "stream", new Partition(0)), null, null, 1); - val envelope2 = new IncomingMessageEnvelope(new SystemStreamPartition("kafka", "stream1", new Partition(1)), null, null, 2); - val envelope3 = new IncomingMessageEnvelope(new SystemStreamPartition("kafka", "stream2", new Partition(0)), null, null, 3); + val envelope1 = new IncomingMessageEnvelope(new SystemStreamPartition("kafka", "stream", new Partition(0)), "120", null, 1); + val envelope2 = new IncomingMessageEnvelope(new SystemStreamPartition("kafka", "stream1", new Partition(1)), "121", null, 2); + val envelope3 = new IncomingMessageEnvelope(new SystemStreamPartition("kafka", "stream2", new Partition(0)), "122", null, 3); val envelope4 = new IncomingMessageEnvelope(new SystemStreamPartition("kafka", "stream", new Partition(0)), "123", null, 4); - val envelope5 = new IncomingMessageEnvelope(new SystemStreamPartition("kafka", "stream", new Partition(1)), null, null, 5); + val envelope5 = new IncomingMessageEnvelope(new SystemStreamPartition("kafka", "stream", new Partition(1)), "320", null, 5); val envelope6 = new IncomingMessageEnvelope(new SystemStreamPartition("kafka", "stream", new Partition(1)), "321", null, 6); - val envelope7 = new IncomingMessageEnvelope(new SystemStreamPartition("kafka", "stream1", new Partition(0)), null, null, 7); + val envelope7 = new IncomingMessageEnvelope(new SystemStreamPartition("kafka", "stream1", new Partition(0)), "653", null, 7); val envelope8 = new IncomingMessageEnvelope(new SystemStreamPartition("kafka", "stream3", new Partition(0)), "654", null, 8); @Test