SAMZA-568; allow tasks to override offsets in init method
Project: http://git-wip-us.apache.org/repos/asf/samza/repo Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/537262e2 Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/537262e2 Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/537262e2 Branch: refs/heads/samza-sql Commit: 537262e284d6c126ea434c1af74e66e92cf08e5e Parents: cad2c82 Author: Ben Kirwin <[email protected]> Authored: Mon Mar 16 16:05:28 2015 -0700 Committer: Chris Riccomini <[email protected]> Committed: Mon Mar 16 16:05:28 2015 -0700 ---------------------------------------------------------------------- .../java/org/apache/samza/task/TaskContext.java | 10 ++++ .../apache/samza/container/TaskInstance.scala | 4 ++ .../samza/container/TestTaskInstance.scala | 57 +++++++++++++++++--- 3 files changed, 65 insertions(+), 6 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/samza/blob/537262e2/samza-api/src/main/java/org/apache/samza/task/TaskContext.java ---------------------------------------------------------------------- diff --git a/samza-api/src/main/java/org/apache/samza/task/TaskContext.java b/samza-api/src/main/java/org/apache/samza/task/TaskContext.java index 6d10212..929409e 100644 --- a/samza-api/src/main/java/org/apache/samza/task/TaskContext.java +++ b/samza-api/src/main/java/org/apache/samza/task/TaskContext.java @@ -37,4 +37,14 @@ public interface TaskContext { Object getStore(String name); TaskName getTaskName(); + + /** + * Set the starting offset for the given {@link org.apache.samza.system.SystemStreamPartition}. Offsets + * can only be set for a {@link org.apache.samza.system.SystemStreamPartition} assigned to this task + * (as returned by {@link #getSystemStreamPartitions()}); trying to set the offset for any other partition + * will have no effect. + * + * NOTE: this feature is experimental, and the API may change in a future release. + */ + void setStartingOffset(SystemStreamPartition ssp, String offset); } http://git-wip-us.apache.org/repos/asf/samza/blob/537262e2/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala b/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala index a583ff9..be0b55a 100644 --- a/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala +++ b/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala @@ -65,6 +65,10 @@ class TaskInstance( null } def getTaskName = taskName + + override def setStartingOffset(ssp: SystemStreamPartition, offset: String): Unit = { + offsetManager.startingOffsets += (ssp -> offset) + } } def registerMetrics { http://git-wip-us.apache.org/repos/asf/samza/blob/537262e2/samza-core/src/test/scala/org/apache/samza/container/TestTaskInstance.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/test/scala/org/apache/samza/container/TestTaskInstance.scala b/samza-core/src/test/scala/org/apache/samza/container/TestTaskInstance.scala index 11eab16..54b4df8 100644 --- a/samza-core/src/test/scala/org/apache/samza/container/TestTaskInstance.scala +++ b/samza-core/src/test/scala/org/apache/samza/container/TestTaskInstance.scala @@ -40,12 +40,7 @@ import org.apache.samza.system.SystemStreamMetadata import org.apache.samza.system.SystemStreamMetadata.SystemStreamPartitionMetadata import org.apache.samza.system.SystemStreamPartition import org.apache.samza.system.chooser.RoundRobinChooser -import org.apache.samza.task.MessageCollector -import org.apache.samza.task.ReadableCoordinator -import org.apache.samza.task.StreamTask -import org.apache.samza.task.TaskCoordinator -import org.apache.samza.task.TaskInstanceCollector -import org.apache.samza.task.WindowableTask +import org.apache.samza.task._ import org.junit.Assert._ import org.junit.Test import org.scalatest.Assertions.intercept @@ -240,4 +235,54 @@ class TestTaskInstance { assertEquals(2L, getCount(group, classOf[NonFatalException].getName)) assertEquals(1L, getCount(group, classOf[FatalException].getName)) } + + + /** + * Tests that the init() method of task can override the existing offset + * assignment. + */ + @Test + def testManualOffsetReset { + + val partition0 = new SystemStreamPartition("system", "stream", new Partition(0)) + val partition1 = new SystemStreamPartition("system", "stream", new Partition(1)) + + val task = new StreamTask with InitableTask { + + override def init(config: Config, context: TaskContext): Unit = { + + assertTrue("Can only update offsets for assigned partition", + context.getSystemStreamPartitions.contains(partition1) + ) + + context.setStartingOffset(partition1, "10") + } + + override def process(envelope: IncomingMessageEnvelope, collector: MessageCollector, coordinator: TaskCoordinator): Unit = {} + } + + val chooser = new RoundRobinChooser() + val consumers = new SystemConsumers(chooser, consumers = Map.empty) + val producers = new SystemProducers(Map.empty, new SerdeManager()) + val collector = new TaskInstanceCollector(producers) + + val offsetManager = new OffsetManager() + + offsetManager.startingOffsets ++= Map(partition0 -> "0", partition1 -> "0") + + val taskInstance = new TaskInstance( + task = task, + taskName = new TaskName("Offset Reset Task 0"), + config = new MapConfig(), + metrics = new TaskInstanceMetrics(), + consumerMultiplexer = consumers, + collector = collector, + offsetManager = offsetManager, + systemStreamPartitions = Set(partition0, partition1) ) + + taskInstance.initTask + + assertEquals(Some("0"), offsetManager.getStartingOffset(partition0)) + assertEquals(Some("10"), offsetManager.getStartingOffset(partition1)) + } }
