This is an automated email from the ASF dual-hosted git repository. lhaiesp pushed a commit to branch 1.3.1 in repository https://gitbox.apache.org/repos/asf/samza.git
commit 964252aed6fb16b7952b93e56c3a0cd8f1072861 Author: bkonold <[email protected]> AuthorDate: Tue Jan 28 15:43:59 2020 -0800 SAMZA-2446: Invoke onCheckpoint only for registered SSPs (#1260) --- .../org/apache/samza/checkpoint/OffsetManager.scala | 17 +++++++++++------ .../org/apache/samza/checkpoint/TestOffsetManager.scala | 10 +++++++++- 2 files changed, 20 insertions(+), 7 deletions(-) diff --git a/samza-core/src/main/scala/org/apache/samza/checkpoint/OffsetManager.scala b/samza-core/src/main/scala/org/apache/samza/checkpoint/OffsetManager.scala index 33fca8f..442d83f 100644 --- a/samza-core/src/main/scala/org/apache/samza/checkpoint/OffsetManager.scala +++ b/samza-core/src/main/scala/org/apache/samza/checkpoint/OffsetManager.scala @@ -353,13 +353,18 @@ class OffsetManager( } } - // invoke checkpoint listeners - checkpoint.getOffsets.asScala.groupBy { case (ssp, _) => ssp.getSystem }.foreach { - case (systemName:String, offsets: Map[SystemStreamPartition, String]) => { - // Option is empty if there is no checkpointListener for this systemName - checkpointListeners.get(systemName).foreach(_.onCheckpoint(offsets.asJava)) + // Invoke checkpoint listeners only for SSPs that are registered with the OffsetManager. For example, + // changelog SSPs are not registered but may be present in the Checkpoint if transactional state checkpointing + // is enabled. + val registeredSSPs = systemStreamPartitions.getOrElse(taskName, Set[SystemStreamPartition]()) + checkpoint.getOffsets.asScala + .filterKeys(registeredSSPs.contains) + .groupBy { case (ssp, _) => ssp.getSystem }.foreach { + case (systemName:String, offsets: Map[SystemStreamPartition, String]) => { + // Option is empty if there is no checkpointListener for this systemName + checkpointListeners.get(systemName).foreach(_.onCheckpoint(offsets.asJava)) + } } - } } // delete corresponding startpoints after checkpoint is supposed to be committed diff --git a/samza-core/src/test/scala/org/apache/samza/checkpoint/TestOffsetManager.scala b/samza-core/src/test/scala/org/apache/samza/checkpoint/TestOffsetManager.scala index 50c793c..677504d 100644 --- a/samza-core/src/test/scala/org/apache/samza/checkpoint/TestOffsetManager.scala +++ b/samza-core/src/test/scala/org/apache/samza/checkpoint/TestOffsetManager.scala @@ -388,9 +388,11 @@ class TestOffsetManager { val systemName2 = "test-system2" val systemStream = new SystemStream(systemName, "test-stream") val systemStream2 = new SystemStream(systemName2, "test-stream2") + val systemStream3 = new SystemStream(systemName, "test-stream3") val partition = new Partition(0) val systemStreamPartition = new SystemStreamPartition(systemStream, partition) val systemStreamPartition2 = new SystemStreamPartition(systemStream2, partition) + val unregisteredSystemStreamPartition = new SystemStreamPartition(systemStream3, partition) val testStreamMetadata = new SystemStreamMetadata(systemStream.getStream, Map(partition -> new SystemStreamPartitionMetadata("0", "1", "2")).asJava) val testStreamMetadata2 = new SystemStreamMetadata(systemStream2.getStream, Map(partition -> new SystemStreamPartitionMetadata("0", "1", "2")).asJava) val systemStreamMetadata = Map(systemStream -> testStreamMetadata, systemStream2->testStreamMetadata2) @@ -420,7 +422,10 @@ class TestOffsetManager { offsetManager.start // Should get offset 45 back from the checkpoint manager, which is last processed, and system admin should return 46 as starting offset. assertTrue(startpointManagerUtil.getStartpointManager.getFanOutForTask(taskName).containsKey(systemStreamPartition)) - checkpoint(offsetManager, taskName) + val offsetsToCheckpoint = new java.util.HashMap[SystemStreamPartition, String]() + offsetsToCheckpoint.putAll(offsetManager.buildCheckpoint(taskName).getOffsets) + offsetsToCheckpoint.put(unregisteredSystemStreamPartition, "50") + offsetManager.writeCheckpoint(taskName, new Checkpoint(offsetsToCheckpoint)) intercept[IllegalStateException] { // StartpointManager should stop after last fan out is removed @@ -434,6 +439,9 @@ class TestOffsetManager { assertEquals("45", consumer.recentCheckpoint.get(systemStreamPartition)) // make sure only the system with the callbacks gets them assertNull(consumer.recentCheckpoint.get(systemStreamPartition2)) + // even though systemStream and systemStream3 share the same checkpointListener, callback should not execute for + // systemStream3 since it is not registered with the OffsetManager + assertNull(consumer.recentCheckpoint.get(unregisteredSystemStreamPartition)) offsetManager.update(taskName, systemStreamPartition, "46") offsetManager.update(taskName, systemStreamPartition, "47")
