Repository: incubator-samza Updated Branches: refs/heads/master 95cee714e -> 7929e47c2
SAMZA-267; offset manager should not fail if task.inputs is changed Project: http://git-wip-us.apache.org/repos/asf/incubator-samza/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-samza/commit/7929e47c Tree: http://git-wip-us.apache.org/repos/asf/incubator-samza/tree/7929e47c Diff: http://git-wip-us.apache.org/repos/asf/incubator-samza/diff/7929e47c Branch: refs/heads/master Commit: 7929e47c27d480d9987357d5532b10d983b7ee32 Parents: 95cee71 Author: Chris Riccomini <[email protected]> Authored: Thu May 29 08:53:02 2014 -0700 Committer: Chris Riccomini <[email protected]> Committed: Thu May 29 08:53:02 2014 -0700 ---------------------------------------------------------------------- .../apache/samza/checkpoint/OffsetManager.scala | 11 ++++++++++- .../samza/checkpoint/TestOffsetManager.scala | 19 +++++++++++++++++++ 2 files changed, 29 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/7929e47c/samza-core/src/main/scala/org/apache/samza/checkpoint/OffsetManager.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/main/scala/org/apache/samza/checkpoint/OffsetManager.scala b/samza-core/src/main/scala/org/apache/samza/checkpoint/OffsetManager.scala index a8333db..9487b58 100644 --- a/samza-core/src/main/scala/org/apache/samza/checkpoint/OffsetManager.scala +++ b/samza-core/src/main/scala/org/apache/samza/checkpoint/OffsetManager.scala @@ -253,7 +253,16 @@ class OffsetManager( checkpointManager.start - lastProcessedOffsets ++= getPartitions.flatMap(restoreOffsetsFromCheckpoint(_)) + lastProcessedOffsets ++= getPartitions + .flatMap(restoreOffsetsFromCheckpoint(_)) + .filter { + case (systemStreamPartition, offset) => + val shouldKeep = offsetSettings.contains(systemStreamPartition.getSystemStream) + if (!shouldKeep) { + info("Ignoring previously checkpointed offset %s for %s since the offset is for a stream that is not currently an input stream." format (offset, systemStreamPartition)) + } + shouldKeep + } } else { debug("Skipping offset load from checkpoint manager because no manager was defined.") } http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/7929e47c/samza-core/src/test/scala/org/apache/samza/checkpoint/TestOffsetManager.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/test/scala/org/apache/samza/checkpoint/TestOffsetManager.scala b/samza-core/src/test/scala/org/apache/samza/checkpoint/TestOffsetManager.scala index e021327..552f8c2 100644 --- a/samza-core/src/test/scala/org/apache/samza/checkpoint/TestOffsetManager.scala +++ b/samza-core/src/test/scala/org/apache/samza/checkpoint/TestOffsetManager.scala @@ -187,6 +187,25 @@ class TestOffsetManager { } } + @Test + def testOutdatedStreamInCheckpoint { + val systemStream0 = new SystemStream("test-system-0", "test-stream") + val systemStream1 = new SystemStream("test-system-1", "test-stream") + val partition0 = new Partition(0) + val systemStreamPartition0 = new SystemStreamPartition(systemStream0, partition0) + val systemStreamPartition1 = new SystemStreamPartition(systemStream1, partition0) + val testStreamMetadata = new SystemStreamMetadata(systemStream0.getStream, Map(partition0 -> new SystemStreamPartitionMetadata("0", "1", "2"))) + val systemStreamMetadata = Map(systemStream0 -> testStreamMetadata) + val offsetSettings = Map(systemStream0 -> OffsetSetting(testStreamMetadata, OffsetType.UPCOMING, false)) + val checkpointManager = getCheckpointManager(systemStreamPartition1) + val offsetManager = new OffsetManager(offsetSettings, checkpointManager) + offsetManager.register(systemStreamPartition0) + offsetManager.start + assertTrue(checkpointManager.isStarted) + assertEquals(1, checkpointManager.registered.size) + assertNull(offsetManager.getLastProcessedOffset(systemStreamPartition1).getOrElse(null)) + } + private def 
getCheckpointManager(systemStreamPartition: SystemStreamPartition) = { val checkpoint = new Checkpoint(Map(systemStreamPartition.getSystemStream -> "45"))
