Repository: samza
Updated Branches:
  refs/heads/master 091caef99 -> 0937c1dc0


http://git-wip-us.apache.org/repos/asf/samza/blob/0937c1dc/samza-core/src/test/scala/org/apache/samza/checkpoint/TestOffsetManager.scala
----------------------------------------------------------------------
diff --git 
a/samza-core/src/test/scala/org/apache/samza/checkpoint/TestOffsetManager.scala 
b/samza-core/src/test/scala/org/apache/samza/checkpoint/TestOffsetManager.scala
index 7a16099..31ff9cf 100644
--- 
a/samza-core/src/test/scala/org/apache/samza/checkpoint/TestOffsetManager.scala
+++ 
b/samza-core/src/test/scala/org/apache/samza/checkpoint/TestOffsetManager.scala
@@ -20,14 +20,17 @@
 package org.apache.samza.checkpoint
 
 import java.util
+
+import org.apache.samza.{Partition, SamzaException}
+import org.apache.samza.config.MapConfig
 import org.apache.samza.container.TaskName
-import org.apache.samza.Partition
-import org.apache.samza.system._
+import org.apache.samza.metadatastore.InMemoryMetadataStoreFactory
+import org.apache.samza.startpoint.{StartpointManager, StartpointOldest, 
StartpointUpcoming}
 import org.apache.samza.system.SystemStreamMetadata.{OffsetType, 
SystemStreamPartitionMetadata}
+import org.apache.samza.system._
+import org.apache.samza.util.NoOpMetricsRegistry
 import org.junit.Assert._
 import org.junit.Test
-import org.apache.samza.SamzaException
-import org.apache.samza.config.MapConfig
 import org.mockito.Mockito.{mock, when}
 import org.scalatest.Assertions.intercept
 
@@ -61,10 +64,13 @@ class TestOffsetManager {
     val systemStreamMetadata = Map(systemStream -> testStreamMetadata)
     val config = new MapConfig
     val checkpointManager = getCheckpointManager(systemStreamPartition, 
taskName)
+    val startpointManager = getStartpointManager()
     val systemAdmins = mock(classOf[SystemAdmins])
     when(systemAdmins.getSystemAdmin("test-system")).thenReturn(getSystemAdmin)
-    val offsetManager = OffsetManager(systemStreamMetadata, config, 
checkpointManager, systemAdmins, Map(), new OffsetManagerMetrics)
+    val offsetManager = OffsetManager(systemStreamMetadata, config, 
checkpointManager, getStartpointManager(), systemAdmins, Map(), new 
OffsetManagerMetrics)
     offsetManager.register(taskName, Set(systemStreamPartition))
+    startpointManager.writeStartpoint(systemStreamPartition, taskName, new 
StartpointOldest)
+    assertNotNull(startpointManager.readStartpoint(systemStreamPartition, 
taskName))
     offsetManager.start
     assertTrue(checkpointManager.isStarted)
     assertEquals(1, checkpointManager.registered.size)
@@ -82,9 +88,41 @@ class TestOffsetManager {
     // Should not update null offset
     offsetManager.update(taskName, systemStreamPartition, null)
     checkpoint(offsetManager, taskName)
+    assertNull(startpointManager.readStartpoint(systemStreamPartition, 
taskName)) // Startpoint should delete after checkpoint commit
     val expectedCheckpoint = new Checkpoint(Map(systemStreamPartition -> 
"47").asJava)
     assertEquals(expectedCheckpoint, 
checkpointManager.readLastCheckpoint(taskName))
   }
+  @Test
+  def testGetAndSetStartpoint {
+    val taskName1 = new TaskName("c")
+    val taskName2 = new TaskName("d")
+    val systemStream = new SystemStream("test-system", "test-stream")
+    val partition = new Partition(0)
+    val systemStreamPartition = new SystemStreamPartition(systemStream, 
partition)
+    val testStreamMetadata = new SystemStreamMetadata(systemStream.getStream, 
Map(partition -> new SystemStreamPartitionMetadata("0", "1", "2")).asJava)
+    val systemStreamMetadata = Map(systemStream -> testStreamMetadata)
+    val config = new MapConfig
+    val checkpointManager = getCheckpointManager(systemStreamPartition, 
taskName1)
+    val startpointManager = getStartpointManager()
+    val systemAdmins = mock(classOf[SystemAdmins])
+    when(systemAdmins.getSystemAdmin("test-system")).thenReturn(getSystemAdmin)
+    val offsetManager = OffsetManager(systemStreamMetadata, config, 
checkpointManager, getStartpointManager(), systemAdmins, Map(), new 
OffsetManagerMetrics)
+
+    offsetManager.register(taskName1, Set(systemStreamPartition))
+    val startpoint1 = new StartpointOldest
+    startpointManager.writeStartpoint(systemStreamPartition, taskName1, 
startpoint1)
+    assertNotNull(startpointManager.readStartpoint(systemStreamPartition, 
taskName1))
+    offsetManager.start
+    val startpoint2 = new StartpointUpcoming
+    offsetManager.setStartpoint(taskName2, systemStreamPartition, startpoint2)
+
+    assertEquals(Option(startpoint1), offsetManager.getStartpoint(taskName1, 
systemStreamPartition))
+    assertEquals(Option(startpoint2), offsetManager.getStartpoint(taskName2, 
systemStreamPartition))
+
+    assertEquals(startpoint1, 
startpointManager.readStartpoint(systemStreamPartition, taskName1))
+    // Startpoint written to offset manager, but not directly to startpoint 
manager.
+    assertNull(startpointManager.readStartpoint(systemStreamPartition, 
taskName2))
+  }
 
   @Test
   def testGetCheckpointedOffsetMetric{
@@ -96,20 +134,33 @@ class TestOffsetManager {
     val systemStreamMetadata = Map(systemStream -> testStreamMetadata)
     val config = new MapConfig
     val checkpointManager = getCheckpointManager(systemStreamPartition, 
taskName)
+    val startpointManager = getStartpointManager()
     val systemAdmins = mock(classOf[SystemAdmins])
     when(systemAdmins.getSystemAdmin("test-system")).thenReturn(getSystemAdmin)
-    val offsetManager = OffsetManager(systemStreamMetadata, config, 
checkpointManager, systemAdmins, Map(), new OffsetManagerMetrics)
+    val offsetManager = OffsetManager(systemStreamMetadata, config, 
checkpointManager, getStartpointManager(), systemAdmins, Map(), new 
OffsetManagerMetrics)
     offsetManager.register(taskName, Set(systemStreamPartition))
+
+    // Pre-populate startpoint
+    startpointManager.writeStartpoint(systemStreamPartition, taskName, new 
StartpointOldest)
     offsetManager.start
     // Should get offset 45 back from the checkpoint manager, which is last 
processed, and system admin should return 46 as starting offset.
+    assertNotNull(startpointManager.readStartpoint(systemStreamPartition, 
taskName))
     checkpoint(offsetManager, taskName)
+    assertNull(startpointManager.readStartpoint(systemStreamPartition, 
taskName)) // Startpoint should delete after checkpoint commit
     assertEquals("45", 
offsetManager.offsetManagerMetrics.checkpointedOffsets.get(systemStreamPartition).getValue)
     offsetManager.update(taskName, systemStreamPartition, "46")
+
     offsetManager.update(taskName, systemStreamPartition, "47")
+    startpointManager.writeStartpoint(systemStreamPartition, taskName, new 
StartpointOldest)
+    assertNotNull(startpointManager.readStartpoint(systemStreamPartition, 
taskName))
     checkpoint(offsetManager, taskName)
+    assertNotNull(startpointManager.readStartpoint(systemStreamPartition, 
taskName)) // Startpoint should only be deleted at first checkpoint
     assertEquals("47", 
offsetManager.offsetManagerMetrics.checkpointedOffsets.get(systemStreamPartition).getValue)
+
     offsetManager.update(taskName, systemStreamPartition, "48")
+    assertNotNull(startpointManager.readStartpoint(systemStreamPartition, 
taskName))
     checkpoint(offsetManager, taskName)
+    assertNotNull(startpointManager.readStartpoint(systemStreamPartition, 
taskName)) // Startpoint should only be deleted at first checkpoint
     assertEquals("48", 
offsetManager.offsetManagerMetrics.checkpointedOffsets.get(systemStreamPartition).getValue)
   }
 
@@ -153,18 +204,22 @@ class TestOffsetManager {
     val checkpoint = new Checkpoint(Map(systemStreamPartition1 -> "45").asJava)
     // Checkpoint manager only has partition 1.
     val checkpointManager = getCheckpointManager(systemStreamPartition1, 
taskName1)
+    val startpointManager = getStartpointManager()
     val config = new MapConfig
     val systemAdmins = mock(classOf[SystemAdmins])
     when(systemAdmins.getSystemAdmin("test-system")).thenReturn(getSystemAdmin)
-    val offsetManager = OffsetManager(systemStreamMetadata, config, 
checkpointManager, systemAdmins)
+    val offsetManager = OffsetManager(systemStreamMetadata, config, 
checkpointManager, getStartpointManager(), systemAdmins)
     // Register both partitions. Partition 2 shouldn't have a checkpoint.
     offsetManager.register(taskName1, Set(systemStreamPartition1))
     offsetManager.register(taskName2, Set(systemStreamPartition2))
+    startpointManager.writeStartpoint(systemStreamPartition1, taskName1, new 
StartpointOldest)
+    assertNotNull(startpointManager.readStartpoint(systemStreamPartition1, 
taskName1))
     offsetManager.start
     assertTrue(checkpointManager.isStarted)
     assertEquals(2, checkpointManager.registered.size)
     assertEquals(checkpoint, checkpointManager.readLastCheckpoint(taskName1))
     assertNull(checkpointManager.readLastCheckpoint(taskName2))
+    assertNotNull(startpointManager.readStartpoint(systemStreamPartition1, 
taskName1)) // no checkpoint commit so this should still be there
   }
 
   @Test
@@ -256,6 +311,7 @@ class TestOffsetManager {
     val checkpointManager = getCheckpointManager1(systemStreamPartition,
                                                  new 
Checkpoint(Map(systemStreamPartition -> "45", systemStreamPartition2 -> 
"100").asJava),
                                                  taskName)
+    val startpointManager = getStartpointManager()
     val consumer = new SystemConsumerWithCheckpointCallback
     val systemAdmins = mock(classOf[SystemAdmins])
     when(systemAdmins.getSystemAdmin(systemName)).thenReturn(getSystemAdmin)
@@ -266,13 +322,16 @@ class TestOffsetManager {
     else
       Map()
 
-    val offsetManager = OffsetManager(systemStreamMetadata, config, 
checkpointManager, systemAdmins,
+    val offsetManager = OffsetManager(systemStreamMetadata, config, 
checkpointManager, getStartpointManager(), systemAdmins,
       checkpointListeners, new OffsetManagerMetrics)
     offsetManager.register(taskName, Set(systemStreamPartition, 
systemStreamPartition2))
 
+    startpointManager.writeStartpoint(systemStreamPartition, taskName, new 
StartpointOldest)
+    assertNotNull(startpointManager.readStartpoint(systemStreamPartition, 
taskName))
     offsetManager.start
     // Should get offset 45 back from the checkpoint manager, which is last 
processed, and system admin should return 46 as starting offset.
     checkpoint(offsetManager, taskName)
+    assertNull(startpointManager.readStartpoint(systemStreamPartition, 
taskName)) // Startpoint be deleted at first checkpoint
     assertEquals("45", 
offsetManager.offsetManagerMetrics.checkpointedOffsets.get(systemStreamPartition).getValue)
     assertEquals("100", 
offsetManager.offsetManagerMetrics.checkpointedOffsets.get(systemStreamPartition2).getValue)
     assertEquals("45", consumer.recentCheckpoint.get(systemStreamPartition))
@@ -281,7 +340,10 @@ class TestOffsetManager {
 
     offsetManager.update(taskName, systemStreamPartition, "46")
     offsetManager.update(taskName, systemStreamPartition, "47")
+    startpointManager.writeStartpoint(systemStreamPartition, taskName, new 
StartpointOldest)
+    assertNotNull(startpointManager.readStartpoint(systemStreamPartition, 
taskName))
     checkpoint(offsetManager, taskName)
+    assertNotNull(startpointManager.readStartpoint(systemStreamPartition, 
taskName)) // Startpoint should only be deleted at first checkpoint
     assertEquals("47", 
offsetManager.offsetManagerMetrics.checkpointedOffsets.get(systemStreamPartition).getValue)
     assertEquals("100", 
offsetManager.offsetManagerMetrics.checkpointedOffsets.get(systemStreamPartition2).getValue)
     assertEquals("47", consumer.recentCheckpoint.get(systemStreamPartition))
@@ -289,7 +351,10 @@ class TestOffsetManager {
 
     offsetManager.update(taskName, systemStreamPartition, "48")
     offsetManager.update(taskName, systemStreamPartition2, "101")
+    startpointManager.writeStartpoint(systemStreamPartition, taskName, new 
StartpointOldest)
+    assertNotNull(startpointManager.readStartpoint(systemStreamPartition, 
taskName))
     checkpoint(offsetManager, taskName)
+    assertNotNull(startpointManager.readStartpoint(systemStreamPartition, 
taskName)) // Startpoint should only be deleted at first checkpoint
     assertEquals("48", 
offsetManager.offsetManagerMetrics.checkpointedOffsets.get(systemStreamPartition).getValue)
     assertEquals("101", 
offsetManager.offsetManagerMetrics.checkpointedOffsets.get(systemStreamPartition2).getValue)
     assertEquals("48", consumer.recentCheckpoint.get(systemStreamPartition))
@@ -312,27 +377,35 @@ class TestOffsetManager {
     val testStreamMetadata = new SystemStreamMetadata(systemStream.getStream, 
Map(partition -> new SystemStreamPartitionMetadata("0", "1", "2")).asJava)
     val systemStreamMetadata = Map(systemStream -> testStreamMetadata)
     val checkpointManager = getCheckpointManager(systemStreamPartition, 
taskName)
+    val startpointManager = getStartpointManager()
     val systemAdmins = mock(classOf[SystemAdmins])
     when(systemAdmins.getSystemAdmin("test-system")).thenReturn(getSystemAdmin)
-    val offsetManager = OffsetManager(systemStreamMetadata, new MapConfig, 
checkpointManager, systemAdmins, Map(), new OffsetManagerMetrics)
+    val offsetManager = OffsetManager(systemStreamMetadata, new MapConfig, 
checkpointManager, getStartpointManager(), systemAdmins, Map(), new 
OffsetManagerMetrics)
     offsetManager.register(taskName, Set(systemStreamPartition))
+    startpointManager.writeStartpoint(systemStreamPartition, taskName, new 
StartpointOldest)
     offsetManager.start
 
     // Should get offset 45 back from the checkpoint manager, which is last 
processed, and system admin should return 46 as starting offset.
+    assertNotNull(startpointManager.readStartpoint(systemStreamPartition, 
taskName))
     checkpoint(offsetManager, taskName)
+    assertNull(startpointManager.readStartpoint(systemStreamPartition, 
taskName)) // Startpoint be deleted at first checkpoint
     assertEquals("45", 
offsetManager.offsetManagerMetrics.checkpointedOffsets.get(systemStreamPartition).getValue)
 
+    startpointManager.writeStartpoint(systemStreamPartition, taskName, new 
StartpointOldest)
+
     offsetManager.update(taskName, systemStreamPartition, "46")
     // Get checkpoint snapshot like we do at the beginning of 
TaskInstance.commit()
     val checkpoint46 = offsetManager.buildCheckpoint(taskName)
     offsetManager.update(taskName, systemStreamPartition, "47") // Offset 
updated before checkpoint
     offsetManager.writeCheckpoint(taskName, checkpoint46)
+    assertNotNull(startpointManager.readStartpoint(systemStreamPartition, 
taskName)) // Startpoint should only be deleted at first checkpoint
     assertEquals(Some("47"), offsetManager.getLastProcessedOffset(taskName, 
systemStreamPartition))
     assertEquals("46", 
offsetManager.offsetManagerMetrics.checkpointedOffsets.get(systemStreamPartition).getValue)
 
     // Now write the checkpoint for the latest offset
     val checkpoint47 = offsetManager.buildCheckpoint(taskName)
     offsetManager.writeCheckpoint(taskName, checkpoint47)
+    assertNotNull(startpointManager.readStartpoint(systemStreamPartition, 
taskName)) // Startpoint should only be deleted at first checkpoint
     assertEquals(Some("47"), offsetManager.getLastProcessedOffset(taskName, 
systemStreamPartition))
     assertEquals("47", 
offsetManager.offsetManagerMetrics.checkpointedOffsets.get(systemStreamPartition).getValue)
   }
@@ -380,6 +453,12 @@ class TestOffsetManager {
     }
   }
 
+  private def getStartpointManager() = {
+    val startpointManager = new StartpointManager(new 
InMemoryMetadataStoreFactory, new MapConfig, new NoOpMetricsRegistry)
+    startpointManager.start
+    startpointManager
+  }
+
   private def getSystemAdmin: SystemAdmin = {
     new SystemAdmin {
       def getOffsetsAfter(offsets: java.util.Map[SystemStreamPartition, 
String]) =

http://git-wip-us.apache.org/repos/asf/samza/blob/0937c1dc/samza-test/src/main/java/org/apache/samza/test/framework/TestRunner.java
----------------------------------------------------------------------
diff --git 
a/samza-test/src/main/java/org/apache/samza/test/framework/TestRunner.java 
b/samza-test/src/main/java/org/apache/samza/test/framework/TestRunner.java
index e1ca2e6..bfea961 100644
--- a/samza-test/src/main/java/org/apache/samza/test/framework/TestRunner.java
+++ b/samza-test/src/main/java/org/apache/samza/test/framework/TestRunner.java
@@ -45,6 +45,7 @@ import org.apache.samza.config.TaskConfig;
 import org.apache.samza.container.grouper.task.SingleContainerGrouperFactory;
 import org.apache.samza.context.ExternalContext;
 import org.apache.samza.job.ApplicationStatus;
+import org.apache.samza.metadatastore.InMemoryMetadataStoreFactory;
 import org.apache.samza.operators.KV;
 import org.apache.samza.runtime.LocalApplicationRunner;
 import org.apache.samza.standalone.PassthroughJobCoordinatorFactory;
@@ -106,6 +107,7 @@ public class TestRunner {
     configs.put(JobConfig.JOB_NAME(), JOB_NAME);
     configs.put(JobConfig.PROCESSOR_ID(), "1");
     configs.put(JobCoordinatorConfig.JOB_COORDINATOR_FACTORY, 
PassthroughJobCoordinatorFactory.class.getName());
+    configs.put(JobConfig.STARTPOINT_METADATA_STORE_FACTORY(), 
InMemoryMetadataStoreFactory.class.getCanonicalName());
     configs.put(TaskConfig.GROUPER_FACTORY(), 
SingleContainerGrouperFactory.class.getName());
     // Changing the base directory for non-changelog stores used by Samza 
application to separate the
     // on-disk store locations for concurrently executing tests

Reply via email to