Repository: incubator-samza
Updated Branches:
  refs/heads/master a8bbf251c -> 2283fd236


SAMZA-461; fix race condition when starting job with an empty stream


Project: http://git-wip-us.apache.org/repos/asf/incubator-samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-samza/commit/2283fd23
Tree: http://git-wip-us.apache.org/repos/asf/incubator-samza/tree/2283fd23
Diff: http://git-wip-us.apache.org/repos/asf/incubator-samza/diff/2283fd23

Branch: refs/heads/master
Commit: 2283fd236b4d4e885e39b70d5c07b972d164de27
Parents: a8bbf25
Author: Ben Kirwin <[email protected]>
Authored: Fri Nov 21 10:32:18 2014 -0800
Committer: Chris Riccomini <[email protected]>
Committed: Fri Nov 21 10:32:18 2014 -0800

----------------------------------------------------------------------
 .../org/apache/samza/checkpoint/OffsetManager.scala    |  9 ++++++++-
 .../apache/samza/checkpoint/TestOffsetManager.scala    | 13 +++++++++++++
 2 files changed, 21 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/2283fd23/samza-core/src/main/scala/org/apache/samza/checkpoint/OffsetManager.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/checkpoint/OffsetManager.scala b/samza-core/src/main/scala/org/apache/samza/checkpoint/OffsetManager.scala
index 80c8d0e..a40c87f 100644
--- a/samza-core/src/main/scala/org/apache/samza/checkpoint/OffsetManager.scala
+++ b/samza-core/src/main/scala/org/apache/samza/checkpoint/OffsetManager.scala
@@ -343,7 +343,14 @@ class OffsetManager(
           .get(partition)
 
         if (systemStreamPartitionMetadata != null) {
-          val nextOffset = systemStreamPartitionMetadata.getOffset(offsetType)
+          val nextOffset = {
+            val requested = systemStreamPartitionMetadata.getOffset(offsetType)
+
+            if (requested == null) {
+              warn("Requested offset type %s in %s, but the stream is empty. 
Defaulting to the upcoming offset." format (offsetType, systemStreamPartition))
+              systemStreamPartitionMetadata.getOffset(OffsetType.UPCOMING)
+            } else requested
+          }
 
           debug("Got next default offset %s for %s" format (nextOffset, 
systemStreamPartition))
 

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/2283fd23/samza-core/src/test/scala/org/apache/samza/checkpoint/TestOffsetManager.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/test/scala/org/apache/samza/checkpoint/TestOffsetManager.scala b/samza-core/src/test/scala/org/apache/samza/checkpoint/TestOffsetManager.scala
index a79ecca..35e7f6b 100644
--- a/samza-core/src/test/scala/org/apache/samza/checkpoint/TestOffsetManager.scala
+++ b/samza-core/src/test/scala/org/apache/samza/checkpoint/TestOffsetManager.scala
@@ -216,6 +216,19 @@ class TestOffsetManager {
     
assertNull(offsetManager.getLastProcessedOffset(systemStreamPartition1).getOrElse(null))
   }
 
+  @Test
+  def testDefaultToUpcomingOnMissingDefault {
+    val taskName = new TaskName("task-name")
+    val ssp = new SystemStreamPartition(new SystemStream("test-system", 
"test-stream"), new Partition(0))
+    val sspm = new SystemStreamPartitionMetadata(null, null, "13")
+    val offsetMeta = new SystemStreamMetadata("test-stream", Map(new 
Partition(0) -> sspm))
+    val settings = new OffsetSetting(offsetMeta, OffsetType.OLDEST, 
resetOffset = false)
+    val offsetManager = new OffsetManager(offsetSettings = 
Map(ssp.getSystemStream -> settings))
+    offsetManager.register(taskName, Set(ssp))
+    offsetManager.start
+    assertEquals(Some("13"), offsetManager.getStartingOffset(ssp))
+  }
+
   private def getCheckpointManager(systemStreamPartition: 
SystemStreamPartition, taskName:TaskName = new TaskName("taskName")) = {
     val checkpoint = new Checkpoint(Map(systemStreamPartition -> "45"))
 

Reply via email to