SAMZA-982 - Add null check for offset update in OffsetManager

Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/b3d5c4cc
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/b3d5c4cc
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/b3d5c4cc

Branch: refs/heads/master
Commit: b3d5c4cc0d2babedc48015aebbbec6fb99d2a1ce
Parents: 74b0f84
Author: Xinyu Liu <xi...@linkedin.com>
Authored: Fri Jul 29 13:10:00 2016 -0700
Committer: Navina Ramesh <nram...@linkedin.com>
Committed: Fri Jul 29 13:10:43 2016 -0700

----------------------------------------------------------------------
 .../main/scala/org/apache/samza/checkpoint/OffsetManager.scala   | 4 +++-
 .../scala/org/apache/samza/checkpoint/TestOffsetManager.scala    | 2 ++
 2 files changed, 5 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/b3d5c4cc/samza-core/src/main/scala/org/apache/samza/checkpoint/OffsetManager.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/checkpoint/OffsetManager.scala b/samza-core/src/main/scala/org/apache/samza/checkpoint/OffsetManager.scala
index f8033c5..c41eadb 100644
--- a/samza-core/src/main/scala/org/apache/samza/checkpoint/OffsetManager.scala
+++ b/samza-core/src/main/scala/org/apache/samza/checkpoint/OffsetManager.scala
@@ -186,7 +186,9 @@ class OffsetManager(
    */
   def update(taskName: TaskName, systemStreamPartition: SystemStreamPartition, offset: String) {
     lastProcessedOffsets.putIfAbsent(taskName, new ConcurrentHashMap[SystemStreamPartition, String]())
-    lastProcessedOffsets.get(taskName).put(systemStreamPartition, offset)
+    if (offset != null) {
+      lastProcessedOffsets.get(taskName).put(systemStreamPartition, offset)
+    }
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/samza/blob/b3d5c4cc/samza-core/src/test/scala/org/apache/samza/checkpoint/TestOffsetManager.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/test/scala/org/apache/samza/checkpoint/TestOffsetManager.scala b/samza-core/src/test/scala/org/apache/samza/checkpoint/TestOffsetManager.scala
index 75ba8af..cb78223 100644
--- a/samza-core/src/test/scala/org/apache/samza/checkpoint/TestOffsetManager.scala
+++ b/samza-core/src/test/scala/org/apache/samza/checkpoint/TestOffsetManager.scala
@@ -81,6 +81,8 @@ class TestOffsetManager {
     assertEquals("47", offsetManager.getLastProcessedOffset(taskName, systemStreamPartition).get)
     // Should never update starting offset.
     assertEquals("46", offsetManager.getStartingOffset(taskName, systemStreamPartition).get)
+    // Should not update null offset
+    offsetManager.update(taskName, systemStreamPartition, null)
     offsetManager.checkpoint(taskName)
     val expectedCheckpoint = new Checkpoint(Map(systemStreamPartition -> "47"))
     assertEquals(expectedCheckpoint, checkpointManager.readLastCheckpoint(taskName))

Reply via email to