SAMZA-588; expose SamzaContainerContext through TaskContext
Project: http://git-wip-us.apache.org/repos/asf/samza/repo Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/2c9020b7 Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/2c9020b7 Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/2c9020b7 Branch: refs/heads/samza-sql Commit: 2c9020b7f20d609113868c224fb8138c36521e0e Parents: 91289f4 Author: Benjamin Fradet <[email protected]> Authored: Tue Mar 24 12:47:00 2015 -0700 Committer: Chris Riccomini <[email protected]> Committed: Tue Mar 24 12:47:00 2015 -0700 ---------------------------------------------------------------------- .../java/org/apache/samza/task/TaskContext.java | 3 +++ .../apache/samza/container/SamzaContainer.scala | 3 ++- .../apache/samza/container/TaskInstance.scala | 2 ++ .../samza/container/TestSamzaContainer.scala | 5 +++- .../samza/container/TestTaskInstance.scala | 25 ++++++++++++++------ 5 files changed, 29 insertions(+), 9 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/samza/blob/2c9020b7/samza-api/src/main/java/org/apache/samza/task/TaskContext.java ---------------------------------------------------------------------- diff --git a/samza-api/src/main/java/org/apache/samza/task/TaskContext.java b/samza-api/src/main/java/org/apache/samza/task/TaskContext.java index 929409e..5b337a6 100644 --- a/samza-api/src/main/java/org/apache/samza/task/TaskContext.java +++ b/samza-api/src/main/java/org/apache/samza/task/TaskContext.java @@ -19,6 +19,7 @@ package org.apache.samza.task; +import org.apache.samza.container.SamzaContainerContext; import org.apache.samza.container.TaskName; import org.apache.samza.metrics.MetricsRegistry; import org.apache.samza.system.SystemStreamPartition; @@ -38,6 +39,8 @@ public interface TaskContext { TaskName getTaskName(); + SamzaContainerContext getSamzaContainerContext(); + /** * Set the starting offset for the given {@link org.apache.samza.system.SystemStreamPartition}. Offsets * can only be set for a {@link org.apache.samza.system.SystemStreamPartition} assigned to this task http://git-wip-us.apache.org/repos/asf/samza/blob/2c9020b7/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala index 9fc3b55..5416dd6 100644 --- a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala +++ b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala @@ -414,7 +414,7 @@ object SamzaContainer extends Logging { // Increment by 1 because partition starts from 0, but we need the absolute count, // this value is used for change log topic creation. val maxChangeLogStreamPartitions = containerModel.getTasks.values - .max(Ordering.by{task:TaskModel => task.getChangelogPartition.getPartitionId}) + .max(Ordering.by { task:TaskModel => task.getChangelogPartition.getPartitionId }) .getChangelogPartition.getPartitionId + 1 val taskInstances: Map[TaskName, TaskInstance] = containerModel.getTasks.values.map(taskModel => { @@ -494,6 +494,7 @@ object SamzaContainer extends Logging { metrics = taskInstanceMetrics, consumerMultiplexer = consumerMultiplexer, collector = collector, + containerContext = containerContext, offsetManager = offsetManager, storageManager = storageManager, reporters = reporters, http://git-wip-us.apache.org/repos/asf/samza/blob/2c9020b7/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala b/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala index be0b55a..c5a5ea5 100644 --- a/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala +++ b/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala @@ -46,6 +46,7 @@ class TaskInstance( metrics: TaskInstanceMetrics, consumerMultiplexer: SystemConsumers, collector: TaskInstanceCollector, + containerContext: SamzaContainerContext, offsetManager: OffsetManager = new OffsetManager, storageManager: TaskStorageManager = null, reporters: Map[String, MetricsReporter] = Map(), @@ -65,6 +66,7 @@ class TaskInstance( null } def getTaskName = taskName + def getSamzaContainerContext = containerContext override def setStartingOffset(ssp: SystemStreamPartition, offset: String): Unit = { offsetManager.startingOffsets += (ssp -> offset) http://git-wip-us.apache.org/repos/asf/samza/blob/2c9020b7/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala b/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala index 81742bc..cab31ca 100644 --- a/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala +++ b/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala @@ -121,13 +121,16 @@ class TestSamzaContainer extends AssertionsForJUnit { Map[String, SystemProducer](), new SerdeManager) val collector = new TaskInstanceCollector(producerMultiplexer) + val containerContext = new SamzaContainerContext(0, config, Set[TaskName](taskName)) val taskInstance: TaskInstance = new TaskInstance( task, taskName, config, new TaskInstanceMetrics, consumerMultiplexer, - collector) + collector, + containerContext + ) val runLoop = new RunLoop( taskInstances = Map(taskName -> taskInstance), consumerMultiplexer = consumerMultiplexer, http://git-wip-us.apache.org/repos/asf/samza/blob/2c9020b7/samza-core/src/test/scala/org/apache/samza/container/TestTaskInstance.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/test/scala/org/apache/samza/container/TestTaskInstance.scala b/samza-core/src/test/scala/org/apache/samza/container/TestTaskInstance.scala index 54b4df8..7caad28 100644 --- a/samza-core/src/test/scala/org/apache/samza/container/TestTaskInstance.scala +++ b/samza-core/src/test/scala/org/apache/samza/container/TestTaskInstance.scala @@ -69,6 +69,7 @@ class TestTaskInstance { val offsetManager = OffsetManager(Map(systemStream -> testSystemStreamMetadata), config) val taskName = new TaskName("taskName") val collector = new TaskInstanceCollector(producerMultiplexer) + val containerContext = new SamzaContainerContext(0, config, Set[TaskName](taskName)) val taskInstance: TaskInstance = new TaskInstance( task, taskName, @@ -76,6 +77,7 @@ class TestTaskInstance { new TaskInstanceMetrics, consumerMultiplexer, collector, + containerContext, offsetManager) // Pretend we got a message with offset 2 and next offset 3. val coordinator = new ReadableCoordinator(taskName) @@ -159,6 +161,7 @@ class TestTaskInstance { val offsetManager = OffsetManager(Map(systemStream -> testSystemStreamMetadata), config) val taskName = new TaskName("taskName") val collector = new TaskInstanceCollector(producerMultiplexer) + val containerContext = new SamzaContainerContext(0, config, Set[TaskName](taskName)) val registry = new MetricsRegistryMap val taskMetrics = new TaskInstanceMetrics(registry = registry) @@ -169,6 +172,7 @@ class TestTaskInstance { taskMetrics, consumerMultiplexer, collector, + containerContext, offsetManager, exceptionHandler = TaskInstanceExceptionHandler(taskMetrics, config)) @@ -211,6 +215,7 @@ class TestTaskInstance { val offsetManager = OffsetManager(Map(systemStream -> testSystemStreamMetadata), config) val taskName = new TaskName("taskName") val collector = new TaskInstanceCollector(producerMultiplexer) + val containerContext = new SamzaContainerContext(0, config, Set[TaskName](taskName)) val registry = new MetricsRegistryMap val taskMetrics = new TaskInstanceMetrics(registry = registry) @@ -221,6 +226,7 @@ class TestTaskInstance { taskMetrics, consumerMultiplexer, collector, + containerContext, offsetManager, exceptionHandler = TaskInstanceExceptionHandler(taskMetrics, config)) @@ -261,23 +267,28 @@ class TestTaskInstance { override def process(envelope: IncomingMessageEnvelope, collector: MessageCollector, coordinator: TaskCoordinator): Unit = {} } + val config = new MapConfig() val chooser = new RoundRobinChooser() val consumers = new SystemConsumers(chooser, consumers = Map.empty) val producers = new SystemProducers(Map.empty, new SerdeManager()) + val metrics = new TaskInstanceMetrics() + val taskName = new TaskName("Offset Reset Task 0") val collector = new TaskInstanceCollector(producers) + val containerContext = new SamzaContainerContext(0, config, Set[TaskName](taskName)) val offsetManager = new OffsetManager() offsetManager.startingOffsets ++= Map(partition0 -> "0", partition1 -> "0") val taskInstance = new TaskInstance( - task = task, - taskName = new TaskName("Offset Reset Task 0"), - config = new MapConfig(), - metrics = new TaskInstanceMetrics(), - consumerMultiplexer = consumers, - collector = collector, - offsetManager = offsetManager, + task, + taskName, + config, + metrics, + consumers, + collector, + containerContext, + offsetManager, systemStreamPartitions = Set(partition0, partition1) ) taskInstance.initTask
