Repository: samza Updated Branches: refs/heads/master 4323003dc -> e801ab259
SAMZA-662: fixed auto-created changelog stream does not have enough partitions when container number > 1 Project: http://git-wip-us.apache.org/repos/asf/samza/repo Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/e801ab25 Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/e801ab25 Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/e801ab25 Branch: refs/heads/master Commit: e801ab2598b98ed6fdd3826760a059026c0dfe55 Parents: 4323003 Author: Guozhang Wang <[email protected]> Authored: Wed May 13 11:06:18 2015 -0700 Committer: Yan Fang <[email protected]> Committed: Wed May 13 11:06:18 2015 -0700 ---------------------------------------------------------------------- .../apache/samza/container/SamzaContainer.scala | 17 +++++++++-------- .../apache/samza/job/local/ThreadJobFactory.scala | 2 +- 2 files changed, 10 insertions(+), 9 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/samza/blob/e801ab25/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala index e8e830e..bdd491b 100644 --- a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala +++ b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala @@ -92,7 +92,7 @@ object SamzaContainer extends Logging { try { jmxServer = newJmxServer() - SamzaContainer(containerModel, config).run + SamzaContainer(containerModel, jobModel).run } finally { if (jmxServer != null) { jmxServer.stop @@ -133,7 +133,8 @@ object SamzaContainer extends Logging { serde } - def apply(containerModel: ContainerModel, config: Config) = { + def apply(containerModel: ContainerModel, jobModel: JobModel) = { + val config = jobModel.getConfig val containerId = containerModel.getContainerId val containerName = "samza-container-%s" format containerId val containerPID = Util.getContainerPID @@ -408,12 +409,12 @@ object SamzaContainer extends Logging { val containerContext = new SamzaContainerContext(containerId, config, taskNames) - // compute the number of partitions necessary for the change log stream creation. - // Increment by 1 because partition starts from 0, but we need the absolute count, - // this value is used for change log topic creation. - val maxChangeLogStreamPartitions = containerModel.getTasks.values - .max(Ordering.by { task:TaskModel => task.getChangelogPartition.getPartitionId }) - .getChangelogPartition.getPartitionId + 1 + // Compute the number of change log stream partitions as the maximum partition-id + // of all total number of tasks of the job; Increment by 1 because partition ids + // start from 0 while we need the absolute count. + val maxChangeLogStreamPartitions = jobModel.getContainers.values.flatMap { container: ContainerModel => + container.getTasks.values.map(_.getChangelogPartition.getPartitionId) + }.max + 1 val taskInstances: Map[TaskName, TaskInstance] = containerModel.getTasks.values.map(taskModel => { debug("Setting up task instance: %s" format taskModel) http://git-wip-us.apache.org/repos/asf/samza/blob/e801ab25/samza-core/src/main/scala/org/apache/samza/job/local/ThreadJobFactory.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/main/scala/org/apache/samza/job/local/ThreadJobFactory.scala b/samza-core/src/main/scala/org/apache/samza/job/local/ThreadJobFactory.scala index 60ee36f..3f2f70e 100644 --- a/samza-core/src/main/scala/org/apache/samza/job/local/ThreadJobFactory.scala +++ b/samza-core/src/main/scala/org/apache/samza/job/local/ThreadJobFactory.scala @@ -48,7 +48,7 @@ class ThreadJobFactory extends StreamJobFactory with Logging { try { coordinator.start - new ThreadJob(SamzaContainer(containerModel, config)) + new ThreadJob(SamzaContainer(containerModel, coordinator.jobModel)) } finally { coordinator.stop }
