This is an automated email from the ASF dual-hosted git repository.
pmaheshwari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git
The following commit(s) were added to refs/heads/master by this push:
new 36bdc07 SAMZA-2169: Preventing task-shuffle after task mode addition
36bdc07 is described below
commit 36bdc0759d4333bbda36ec349af712a6c766edfc
Author: Ray Matharu <[email protected]>
AuthorDate: Fri Apr 19 10:16:10 2019 -0700
SAMZA-2169: Preventing task-shuffle after task mode addition
The issue is described in detail at
https://issues.apache.org/jira/browse/SAMZA-2169
The fix is to treat a task with no task mode as an active task and still read
the remaining task-container mapping from the existing coordinator stream.
Author: Ray Matharu <[email protected]>
Reviewers: Prateek Maheshwari <[email protected]>
Closes #1004 from rmatharu/bugfix-taskmode
---
.../scala/org/apache/samza/coordinator/JobModelManager.scala | 12 +++++-------
1 file changed, 5 insertions(+), 7 deletions(-)
diff --git
a/samza-core/src/main/scala/org/apache/samza/coordinator/JobModelManager.scala
b/samza-core/src/main/scala/org/apache/samza/coordinator/JobModelManager.scala
index 4ada2f5..135f4c2 100644
---
a/samza-core/src/main/scala/org/apache/samza/coordinator/JobModelManager.scala
+++
b/samza-core/src/main/scala/org/apache/samza/coordinator/JobModelManager.scala
@@ -122,9 +122,9 @@ object JobModelManager extends Logging {
val processorLocality: util.Map[String, LocationId] =
getProcessorLocality(config, localityManager)
val taskModes: util.Map[TaskName, TaskMode] =
taskAssignmentManager.readTaskModes()
- // We read the taskAssignment only for ActiveTasks
+ // We read the taskAssignment only for ActiveTasks, i.e., tasks that have
no task-mode or have an active task mode
val taskAssignment: util.Map[String, String] =
taskAssignmentManager.readTaskAssignment().
- filterKeys(taskName => taskModes.get(new
TaskName(taskName)).eq(TaskMode.Active))
+ filterKeys(taskName => !taskModes.containsKey(new TaskName(taskName)) ||
taskModes.get(new TaskName(taskName)).eq(TaskMode.Active))
val taskNameToProcessorId: util.Map[TaskName, String] = new
util.HashMap[TaskName, String]()
@@ -149,11 +149,9 @@ object JobModelManager extends Logging {
for (task <- taskNames) {
val taskName: TaskName = new TaskName(task)
- // We read the partition assignments only for active-tasks
- if (taskModes.get(taskName).eq(TaskMode.Active)) {
- if (!taskPartitionAssignments.containsKey(taskName)) {
- taskPartitionAssignments.put(taskName, new
util.ArrayList[SystemStreamPartition]())
- }
+ // We read the partition assignments only for active-tasks, i.e.,
tasks that have no task-mode or have an active task mode
+ if (!taskModes.containsKey(taskName) ||
taskModes.get(taskName).eq(TaskMode.Active)) {
+ taskPartitionAssignments.putIfAbsent(taskName, new
util.ArrayList[SystemStreamPartition]())
taskPartitionAssignments.get(taskName).add(systemStreamPartition)
}
}