This is an automated email from the ASF dual-hosted git repository.

jxue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/helix.git

commit ddb3690486a631efa9db704531781745d02ee546
Author: narendly <naren...@gmail.com>
AuthorDate: Mon Feb 25 17:49:45 2019 -0800

    [HELIX-794] TASK: Fix double-booking of tasks upon Participant disconnect
    
    It has been observed in production use cases that, when there are transient
    Participant connection issues, the Controller fails to honor the
    maxNumberOfTasksPerInstance limit. That is, if the user wants at most 1 task
    from a job on any instance (the limit is set to 1), Helix must assign no more
    than 1 of that job's tasks to each instance. But upon short Participant
    disconnects, we saw 2 such tasks in RUNNING on the same instance at the same
    time.
    The cause is an incorrect calculation of jobCfgLimitation in
    AbstractTaskDispatcher, which subtracted the instance's previous assignment
    (pSet) rather than the tasks assigned in the current pass. This patch fixes
    the issue by using a per-instance Map (assignedPartitions) to calculate the
    correct number of tasks to assign, as illustrated in the sketch below.
    Changelist:
    1. Change the internal data structure assignedPartitions from a Set to a
    per-instance Map
    2. Fix the logic that calculates the number of tasks to assign
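
    For illustration, a minimal self-contained sketch of the corrected
    per-instance throttling arithmetic. This is not code from the patch: the
    class name ThrottleSketch and the instance name are made up for the example,
    but the Map<String, Set<Integer>> shape and the subtraction mirror the
    change to AbstractTaskDispatcher in the diff below.

        import java.util.HashMap;
        import java.util.HashSet;
        import java.util.Map;
        import java.util.Set;

        public class ThrottleSketch {
          public static void main(String[] args) {
            // Tasks assigned to each instance in the current rebalance pass
            Map<String, Set<Integer>> assignedPartitions = new HashMap<>();
            assignedPartitions.put("instance_0", new HashSet<Integer>());
            // Suppose task partition 0 is already RUNNING on instance_0
            assignedPartitions.get("instance_0").add(0);

            // The job-level limit (maxNumberOfTasksPerInstance in the message above)
            int numConcurrentTasksPerInstance = 1;

            // Headroom for instance_0: the limit minus what this instance already
            // holds in this pass, not minus a possibly stale previous assignment
            int jobCfgLimitation =
                numConcurrentTasksPerInstance - assignedPartitions.get("instance_0").size();
            System.out.println(jobCfgLimitation); // prints 0: no additional task
          }
        }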
---
 .../apache/helix/task/AbstractTaskDispatcher.java  |  38 +++--
 .../java/org/apache/helix/task/JobDispatcher.java  |   3 +-
 .../helix/integration/task/TestNoDoubleAssign.java | 171 +++++++++++++++++++++
 3 files changed, 195 insertions(+), 17 deletions(-)

diff --git a/helix-core/src/main/java/org/apache/helix/task/AbstractTaskDispatcher.java b/helix-core/src/main/java/org/apache/helix/task/AbstractTaskDispatcher.java
index 6fe5f7b..1bfab8b 100644
--- a/helix-core/src/main/java/org/apache/helix/task/AbstractTaskDispatcher.java
+++ b/helix-core/src/main/java/org/apache/helix/task/AbstractTaskDispatcher.java
@@ -45,7 +45,7 @@ public abstract class AbstractTaskDispatcher {
       Map<String, SortedSet<Integer>> prevInstanceToTaskAssignments, Set<String> excludedInstances,
       String jobResource, CurrentStateOutput currStateOutput, JobContext jobCtx, JobConfig jobCfg,
       ResourceAssignment prevTaskToInstanceStateAssignment, TaskState jobState,
-      Set<Integer> assignedPartitions, Set<Integer> partitionsToDropFromIs,
+      Map<String, Set<Integer>> assignedPartitions, Set<Integer> partitionsToDropFromIs,
       Map<Integer, PartitionAssignment> paMap, TargetState jobTgtState,
       Set<Integer> skippedPartitions, WorkflowControllerDataProvider cache) {
 
@@ -58,6 +58,8 @@ public abstract class AbstractTaskDispatcher {
         continue;
       }
 
+      // If not an excluded instance, we must instantiate its entry in assignedPartitions
+      assignedPartitions.put(instance, new HashSet<Integer>());
       Set<Integer> pSet = prevInstanceToTaskAssignments.get(instance);
       // Used to keep track of partitions that are in one of the final states: COMPLETED, TIMED_OUT,
       // TASK_ERROR, ERROR.
@@ -122,7 +124,7 @@ public abstract class AbstractTaskDispatcher {
           }
 
           paMap.put(pId, new PartitionAssignment(instance, requestedState.name()));
-          assignedPartitions.add(pId);
+          assignedPartitions.get(instance).add(pId);
           if (LOG.isDebugEnabled()) {
             LOG.debug(
                 String.format("Instance %s requested a state transition to %s 
for partition %s.",
@@ -146,7 +148,7 @@ public abstract class AbstractTaskDispatcher {
           }
 
           paMap.put(pId, new PartitionAssignment(instance, nextState.name()));
-          assignedPartitions.add(pId);
+          assignedPartitions.get(instance).add(pId);
           if (LOG.isDebugEnabled()) {
             LOG.debug(String.format("Setting task partition %s state to %s on 
instance %s.", pName,
                 nextState, instance));
@@ -170,7 +172,7 @@ public abstract class AbstractTaskDispatcher {
             assignableInstanceManager.release(instance, taskConfig, quotaType);
           }
           paMap.put(pId, new JobRebalancer.PartitionAssignment(instance, nextState.name()));
-          assignedPartitions.add(pId);
+          assignedPartitions.get(instance).add(pId);
 
           if (LOG.isDebugEnabled()) {
             LOG.debug(String.format("Setting task partition %s state to %s on 
instance %s.", pName,
@@ -252,7 +254,7 @@ public abstract class AbstractTaskDispatcher {
             // Job is in progress, implying that tasks are being re-tried, so set it to RUNNING
             paMap.put(pId,
                 new JobRebalancer.PartitionAssignment(instance, TaskPartitionState.RUNNING.name()));
-            assignedPartitions.add(pId);
+            assignedPartitions.get(instance).add(pId);
           }
         }
 
@@ -335,7 +337,7 @@ public abstract class AbstractTaskDispatcher {
   private void processTaskWithPendingMessage(ResourceAssignment prevAssignment, Integer pId,
       String pName, String instance, Message pendingMessage, TaskState jobState,
       TaskPartitionState currState, Map<Integer, PartitionAssignment> paMap,
-      Set<Integer> assignedPartitions) {
+      Map<String, Set<Integer>> assignedPartitions) {
 
     // stateMap is a mapping of Instance -> TaskPartitionState (String)
     Map<String, String> stateMap = prevAssignment.getReplicaMap(new Partition(pName));
@@ -352,7 +354,7 @@ public abstract class AbstractTaskDispatcher {
         // While job is timing out, if the task is pending on INIT->RUNNING, set it back to INIT,
         // so that Helix will cancel the transition.
         paMap.put(pId, new PartitionAssignment(instance, TaskPartitionState.INIT.name()));
-        assignedPartitions.add(pId);
+        assignedPartitions.get(instance).add(pId);
         if (LOG.isDebugEnabled()) {
           LOG.debug(String.format(
               "Task partition %s has a pending state transition on instance %s 
INIT->RUNNING. Previous state %s"
@@ -363,7 +365,7 @@ public abstract class AbstractTaskDispatcher {
         // Otherwise, Just copy forward
         // the state assignment from the previous ideal state.
         paMap.put(pId, new PartitionAssignment(instance, prevState));
-        assignedPartitions.add(pId);
+        assignedPartitions.get(instance).add(pId);
         if (LOG.isDebugEnabled()) {
           LOG.debug(String.format(
               "Task partition %s has a pending state transition on instance 
%s. Using the previous ideal state which was %s.",
@@ -440,11 +442,11 @@ public abstract class AbstractTaskDispatcher {
   protected void handleAdditionalTaskAssignment(
       Map<String, SortedSet<Integer>> prevInstanceToTaskAssignments, Set<String> excludedInstances,
       String jobResource, CurrentStateOutput currStateOutput, JobContext jobCtx, JobConfig jobCfg,
-      WorkflowConfig workflowConfig, WorkflowContext workflowCtx, WorkflowControllerDataProvider cache,
-      ResourceAssignment prevTaskToInstanceStateAssignment, Set<Integer> assignedPartitions,
-      Map<Integer, PartitionAssignment> paMap, Set<Integer> skippedPartitions,
-      TaskAssignmentCalculator taskAssignmentCal, Set<Integer> allPartitions, long currentTime,
-      Collection<String> liveInstances) {
+      WorkflowConfig workflowConfig, WorkflowContext workflowCtx,
+      WorkflowControllerDataProvider cache, ResourceAssignment prevTaskToInstanceStateAssignment,
+      Map<String, Set<Integer>> assignedPartitions, Map<Integer, PartitionAssignment> paMap,
+      Set<Integer> skippedPartitions, TaskAssignmentCalculator taskAssignmentCal,
+      Set<Integer> allPartitions, long currentTime, Collection<String> liveInstances) {
 
     // See if there was LiveInstance change and cache LiveInstances from this iteration of pipeline
     boolean existsLiveInstanceOrCurrentStateChange =
@@ -453,7 +455,11 @@ public abstract class AbstractTaskDispatcher {
     // The excludeSet contains the set of task partitions that must be excluded from consideration
     // when making any new assignments.
     // This includes all completed, failed, delayed, and already assigned partitions.
-    Set<Integer> excludeSet = Sets.newTreeSet(assignedPartitions);
+    Set<Integer> excludeSet = Sets.newTreeSet();
+    // Add all assigned partitions to excludeSet
+    for (Set<Integer> assignedSet : assignedPartitions.values()) {
+      excludeSet.addAll(assignedSet);
+    }
     addCompletedTasks(excludeSet, jobCtx, allPartitions);
     addGiveupPartitions(excludeSet, jobCtx, allPartitions, jobCfg);
     excludeSet.addAll(skippedPartitions);
@@ -540,8 +546,8 @@ public abstract class AbstractTaskDispatcher {
       }
       // 1. throttled by job configuration
       // Contains the set of task partitions currently assigned to the instance.
-      Set<Integer> pSet = entry.getValue();
-      int jobCfgLimitation = jobCfg.getNumConcurrentTasksPerInstance() - pSet.size();
+      int jobCfgLimitation =
+          jobCfg.getNumConcurrentTasksPerInstance() - assignedPartitions.get(instance).size();
       // 2. throttled by participant capacity
       int participantCapacity = cache.getInstanceConfigMap().get(instance).getMaxConcurrentTask();
       if (participantCapacity == InstanceConfig.MAX_CONCURRENT_TASK_NOT_SET) {
diff --git a/helix-core/src/main/java/org/apache/helix/task/JobDispatcher.java b/helix-core/src/main/java/org/apache/helix/task/JobDispatcher.java
index 59a37ba..408c9f1 100644
--- a/helix-core/src/main/java/org/apache/helix/task/JobDispatcher.java
+++ b/helix-core/src/main/java/org/apache/helix/task/JobDispatcher.java
@@ -176,7 +176,8 @@ public class JobDispatcher extends AbstractTaskDispatcher {
       Set<Integer> partitionsToDropFromIs, WorkflowControllerDataProvider cache) {
 
     // Used to keep track of tasks that have already been assigned to instances.
-    Set<Integer> assignedPartitions = new HashSet<>();
+    // InstanceName -> Set of task partitions assigned to that instance in this iteration
+    Map<String, Set<Integer>> assignedPartitions = new HashMap<>();
 
     // Used to keep track of tasks that have failed, but whose failure is acceptable
     Set<Integer> skippedPartitions = new HashSet<>();
diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TestNoDoubleAssign.java b/helix-core/src/test/java/org/apache/helix/integration/task/TestNoDoubleAssign.java
new file mode 100644
index 0000000..8fa3f94
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/integration/task/TestNoDoubleAssign.java
@@ -0,0 +1,171 @@
+package org.apache.helix.integration.task;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Random;
+import java.util.Set;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.helix.TestHelper;
+import org.apache.helix.task.JobConfig;
+import org.apache.helix.task.JobContext;
+import org.apache.helix.task.TaskConfig;
+import org.apache.helix.task.TaskPartitionState;
+import org.apache.helix.task.TaskState;
+import org.apache.helix.task.Workflow;
+import org.testng.Assert;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import com.google.common.collect.ImmutableMap;
+
+public class TestNoDoubleAssign extends TaskTestBase {
+  private static final int THREAD_COUNT = 20;
+  private static final long CONNECTION_DELAY = 100L;
+  private static final long POLL_DELAY = 100L;
+  private static final String TASK_DURATION = "200";
+  private static final Random RANDOM = new Random();
+
+  private ScheduledExecutorService _executorServicePoll;
+  private ScheduledExecutorService _executorServiceConnection;
+  private AtomicBoolean _existsDoubleAssign = new AtomicBoolean(false);
+  private Set<String> _jobNames = new HashSet<>();
+
+  @BeforeClass
+  public void beforeClass() throws Exception {
+    _numDbs = 0;
+    _numPartitions = 0;
+    _numReplicas = 0;
+    super.beforeClass();
+  }
+
+  /**
+   * Tests that no Participant has more tasks from the same job than is specified in the config
+   * (MaxConcurrentTaskPerInstance, default value = 1).
+   * NOTE: this test is expected to generate many Participant-side ERROR messages (ZkClient
+   * already closed!) because we disconnect Participants on purpose.
+   */
+  @Test
+  public void testNoDoubleAssign() throws InterruptedException {
+    // Some arbitrary workload that creates a reasonably large amount of tasks
+    int workload = 10;
+
+    // Create a workflow with jobs and tasks
+    String workflowName = TestHelper.getTestMethodName();
+    Workflow.Builder builder = new Workflow.Builder(workflowName);
+    for (int i = 0; i < workload; i++) {
+      List<TaskConfig> taskConfigs = new ArrayList<>();
+      for (int j = 0; j < workload; j++) {
+        String taskID = "JOB_" + i + "_TASK_" + j;
+        TaskConfig.Builder taskConfigBuilder = new TaskConfig.Builder();
+        taskConfigBuilder.setTaskId(taskID).setCommand(MockTask.TASK_COMMAND)
+            .addConfig(MockTask.JOB_DELAY, TASK_DURATION);
+        taskConfigs.add(taskConfigBuilder.build());
+      }
+      String jobName = "JOB_" + i;
+      _jobNames.add(workflowName + "_" + jobName); // Add the namespaced job name
+      JobConfig.Builder jobBuilder =
+          new JobConfig.Builder().setCommand(MockTask.TASK_COMMAND).setMaxAttemptsPerTask(10000)
+              .setJobCommandConfigMap(WorkflowGenerator.DEFAULT_COMMAND_CONFIG)
+              .addTaskConfigs(taskConfigs).setIgnoreDependentJobFailure(true)
+              .setFailureThreshold(100000)
+              .setJobCommandConfigMap(ImmutableMap.of(MockTask.JOB_DELAY, TASK_DURATION));
+      builder.addJob(jobName, jobBuilder);
+    }
+    // Start the workflow
+    _driver.start(builder.build());
+    _driver.pollForWorkflowState(workflowName, TaskState.IN_PROGRESS);
+    breakConnection();
+    pollForDoubleAssign();
+    _driver.pollForWorkflowState(workflowName, TaskState.COMPLETED);
+
+    Assert.assertFalse(_existsDoubleAssign.get());
+
+    // Shut down thread pools
+    _executorServicePoll.shutdown();
+    _executorServiceConnection.shutdown();
+    try {
+      if (!_executorServicePoll.awaitTermination(60, TimeUnit.SECONDS)) {
+        _executorServicePoll.shutdownNow();
+      }
+      if (!_executorServiceConnection.awaitTermination(60, TimeUnit.SECONDS)) {
+        _executorServiceConnection.shutdownNow();
+      }
+    } catch (InterruptedException e) {
+      _executorServicePoll.shutdownNow();
+      _executorServiceConnection.shutdownNow();
+    }
+  }
+
+  /**
+   * Fetch the JobContext for all jobs in ZK and check that no two tasks are running on the same
+   * Participant.
+   */
+  private void pollForDoubleAssign() {
+    _executorServicePoll = Executors.newScheduledThreadPool(THREAD_COUNT);
+    _executorServicePoll.scheduleAtFixedRate(new Runnable() {
+      @Override
+      public void run() {
+        if (!_existsDoubleAssign.get()) {
+          // Get JobContexts and test that they are assigned to disparate Participants
+          for (String job : _jobNames) {
+            JobContext jobContext = _driver.getJobContext(job);
+            if (jobContext == null) {
+              continue;
+            }
+            Set<String> instanceCache = new HashSet<>();
+            for (int partition : jobContext.getPartitionSet()) {
+              if (jobContext.getPartitionState(partition) == TaskPartitionState.RUNNING) {
+                if (instanceCache.contains(jobContext.getAssignedParticipant(partition))) {
+                  // Two tasks running on the same instance at the same time
+                  _existsDoubleAssign.set(true);
+                  return;
+                }
+                instanceCache.add(jobContext.getAssignedParticipant(partition));
+              }
+            }
+          }
+        }
+      }
+    }, 0L, POLL_DELAY, TimeUnit.MILLISECONDS);
+  }
+
+  /**
+   * Randomly causes Participants to lose their connection temporarily.
+   */
+  private void breakConnection() {
+    _executorServiceConnection = Executors.newScheduledThreadPool(THREAD_COUNT);
+    _executorServiceConnection.scheduleAtFixedRate(new Runnable() {
+      @Override
+      public void run() {
+        // Randomly pick a Participant and cause a transient connection issue
+        int participantIndex = RANDOM.nextInt(_numNodes);
+        _participants[participantIndex].syncStop();
+        startParticipant(participantIndex);
+      }
+    }, 0L, CONNECTION_DELAY, TimeUnit.MILLISECONDS);
+  }
+}
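
For reference, a compact standalone restatement of the detection idea used in
pollForDoubleAssign above. The class DoubleAssignCheck and the
partition-to-instance map are hypothetical stand-ins for JobContext; only the
duplicate-participant check mirrors the test.

    import java.util.HashSet;
    import java.util.Map;
    import java.util.Set;

    public class DoubleAssignCheck {
      // Returns true if two RUNNING task partitions of one job share a participant
      static boolean hasDoubleAssign(Map<Integer, String> runningPartitionToInstance) {
        Set<String> seen = new HashSet<>();
        for (String instance : runningPartitionToInstance.values()) {
          if (!seen.add(instance)) {
            return true; // this instance already holds a RUNNING task of the job
          }
        }
        return false;
      }

      public static void main(String[] args) {
        System.out.println(hasDoubleAssign(Map.of(0, "i0", 1, "i1"))); // false
        System.out.println(hasDoubleAssign(Map.of(0, "i0", 1, "i0"))); // true
      }
    }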
