This is an automated email from the ASF dual-hosted git repository.

pmaheshwari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git


The following commit(s) were added to refs/heads/master by this push:
     new 36bdc07  SAMZA-2169: Preventing task-shuffle after task mode addition
36bdc07 is described below

commit 36bdc0759d4333bbda36ec349af712a6c766edfc
Author: Ray Matharu <[email protected]>
AuthorDate: Fri Apr 19 10:16:10 2019 -0700

    SAMZA-2169: Preventing task-shuffle after task mode addition
    
    The issue is described in detail at
https://issues.apache.org/jira/browse/SAMZA-2169
    
    The fix is to treat a task with no task mode as an active task and still read
the remaining task-container mapping from the existing coordinator stream.
    
    Author: Ray Matharu <[email protected]>
    
    Reviewers: Prateek Maheshwari <[email protected]>
    
    Closes #1004 from rmatharu/bugfix-taskmode
---
 .../scala/org/apache/samza/coordinator/JobModelManager.scala | 12 +++++-------
 1 file changed, 5 insertions(+), 7 deletions(-)

diff --git 
a/samza-core/src/main/scala/org/apache/samza/coordinator/JobModelManager.scala 
b/samza-core/src/main/scala/org/apache/samza/coordinator/JobModelManager.scala
index 4ada2f5..135f4c2 100644
--- 
a/samza-core/src/main/scala/org/apache/samza/coordinator/JobModelManager.scala
+++ 
b/samza-core/src/main/scala/org/apache/samza/coordinator/JobModelManager.scala
@@ -122,9 +122,9 @@ object JobModelManager extends Logging {
     val processorLocality: util.Map[String, LocationId] = 
getProcessorLocality(config, localityManager)
     val taskModes: util.Map[TaskName, TaskMode] = 
taskAssignmentManager.readTaskModes()
 
-    // We read the taskAssignment only for ActiveTasks
+    // We read the taskAssignment only for ActiveTasks, i.e., tasks that have 
no task-mode or have an active task mode
     val taskAssignment: util.Map[String, String] = 
taskAssignmentManager.readTaskAssignment().
-      filterKeys(taskName => taskModes.get(new 
TaskName(taskName)).eq(TaskMode.Active))
+      filterKeys(taskName => !taskModes.containsKey(new TaskName(taskName)) || 
taskModes.get(new TaskName(taskName)).eq(TaskMode.Active))
 
 
     val taskNameToProcessorId: util.Map[TaskName, String] = new 
util.HashMap[TaskName, String]()
@@ -149,11 +149,9 @@ object JobModelManager extends Logging {
       for (task <- taskNames) {
         val taskName: TaskName = new TaskName(task)
 
-        // We read the partition assignments only for active-tasks
-        if (taskModes.get(taskName).eq(TaskMode.Active)) {
-          if (!taskPartitionAssignments.containsKey(taskName)) {
-            taskPartitionAssignments.put(taskName, new 
util.ArrayList[SystemStreamPartition]())
-          }
+        // We read the partition assignments only for active-tasks, i.e., 
tasks that have no task-mode or have an active task mode
+        if (!taskModes.containsKey(taskName) || 
taskModes.get(taskName).eq(TaskMode.Active)) {
+          taskPartitionAssignments.putIfAbsent(taskName, new 
util.ArrayList[SystemStreamPartition]())
           taskPartitionAssignments.get(taskName).add(systemStreamPartition)
         }
       }

Reply via email to