SAMZA-982 - Add null check for offset update in OffsetManager
Project: http://git-wip-us.apache.org/repos/asf/samza/repo Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/b3d5c4cc Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/b3d5c4cc Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/b3d5c4cc Branch: refs/heads/master Commit: b3d5c4cc0d2babedc48015aebbbec6fb99d2a1ce Parents: 74b0f84 Author: Xinyu Liu <xi...@linkedin.com> Authored: Fri Jul 29 13:10:00 2016 -0700 Committer: Navina Ramesh <nram...@linkedin.com> Committed: Fri Jul 29 13:10:43 2016 -0700 ---------------------------------------------------------------------- .../main/scala/org/apache/samza/checkpoint/OffsetManager.scala | 4 +++- .../scala/org/apache/samza/checkpoint/TestOffsetManager.scala | 2 ++ 2 files changed, 5 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/samza/blob/b3d5c4cc/samza-core/src/main/scala/org/apache/samza/checkpoint/OffsetManager.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/main/scala/org/apache/samza/checkpoint/OffsetManager.scala b/samza-core/src/main/scala/org/apache/samza/checkpoint/OffsetManager.scala index f8033c5..c41eadb 100644 --- a/samza-core/src/main/scala/org/apache/samza/checkpoint/OffsetManager.scala +++ b/samza-core/src/main/scala/org/apache/samza/checkpoint/OffsetManager.scala @@ -186,7 +186,9 @@ class OffsetManager( */ def update(taskName: TaskName, systemStreamPartition: SystemStreamPartition, offset: String) { lastProcessedOffsets.putIfAbsent(taskName, new ConcurrentHashMap[SystemStreamPartition, String]()) - lastProcessedOffsets.get(taskName).put(systemStreamPartition, offset) + if (offset != null) { + lastProcessedOffsets.get(taskName).put(systemStreamPartition, offset) + } } /** http://git-wip-us.apache.org/repos/asf/samza/blob/b3d5c4cc/samza-core/src/test/scala/org/apache/samza/checkpoint/TestOffsetManager.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/test/scala/org/apache/samza/checkpoint/TestOffsetManager.scala b/samza-core/src/test/scala/org/apache/samza/checkpoint/TestOffsetManager.scala index 75ba8af..cb78223 100644 --- a/samza-core/src/test/scala/org/apache/samza/checkpoint/TestOffsetManager.scala +++ b/samza-core/src/test/scala/org/apache/samza/checkpoint/TestOffsetManager.scala @@ -81,6 +81,8 @@ class TestOffsetManager { assertEquals("47", offsetManager.getLastProcessedOffset(taskName, systemStreamPartition).get) // Should never update starting offset. assertEquals("46", offsetManager.getStartingOffset(taskName, systemStreamPartition).get) + // Should not update null offset + offsetManager.update(taskName, systemStreamPartition, null) offsetManager.checkpoint(taskName) val expectedCheckpoint = new Checkpoint(Map(systemStreamPartition -> "47")) assertEquals(expectedCheckpoint, checkpointManager.readLastCheckpoint(taskName))