This is an automated email from the ASF dual-hosted git repository.

jxue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/helix.git

commit 733be5b92c0bbff5d3c82225b8d7a509ea2e081f
Author: Ali Reza Zamani Zadeh Najari <[email protected]>
AuthorDate: Fri Jan 29 11:25:21 2021 -0800

    Drop current state of the task on disabled instances
    
    In this commit, if an instance is not enabled, the controller first drops
    the current state of the task on the disabled node before assigning the
    task to a new instance. Otherwise, once the instance becomes enabled again,
    the controller will see two running tasks.
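
A minimal, self-contained sketch of the decision this commit adds to
AbstractTaskDispatcher.filterTasks(). The types below are simplified stand-ins,
not the real Helix classes (JobContext, CurrentStateOutput, PartitionAssignment):

import java.util.Map;
import java.util.Set;

public class DropBeforeReassignSketch {

  enum TaskPartitionState { RUNNING, DROPPED }

  // Hypothetical stand-in for Helix's PartitionAssignment: a target state for an instance.
  static class Assignment {
    final String instance;
    final String targetState;

    Assignment(String instance, String targetState) {
      this.instance = instance;
      this.targetState = targetState;
    }
  }

  // Decide what to do with a task whose assigned participant is missing from the
  // live-instance list. In the Task pipeline that list only contains instances that
  // are both live and enabled, so a missing participant is either dead or disabled.
  static void handleMissingParticipant(int partition, String assignedParticipant,
      Set<String> disabledInstances, boolean currentStateStillExists,
      Map<Integer, Assignment> pendingTransitions, Map<Integer, TaskPartitionState> jobContext,
      Set<Integer> tasksToReassign) {
    if (disabledInstances.contains(assignedParticipant) && currentStateStillExists) {
      // Disabled but still holding a current state: ask that node to drop it first.
      pendingTransitions.put(partition,
          new Assignment(assignedParticipant, TaskPartitionState.DROPPED.name()));
    } else {
      // The node is gone, or the drop already completed: mark the task DROPPED in the
      // context and free it for reassignment to another instance.
      jobContext.put(partition, TaskPartitionState.DROPPED);
      tasksToReassign.add(partition);
    }
  }
}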
---
 .../apache/helix/task/AbstractTaskDispatcher.java  | 39 +++++++----
 .../integration/task/TestTaskCurrentStateDrop.java | 75 ++++++++++++++++++++++
 2 files changed, 102 insertions(+), 12 deletions(-)

diff --git a/helix-core/src/main/java/org/apache/helix/task/AbstractTaskDispatcher.java b/helix-core/src/main/java/org/apache/helix/task/AbstractTaskDispatcher.java
index ef0d6b6..02933f2 100644
--- a/helix-core/src/main/java/org/apache/helix/task/AbstractTaskDispatcher.java
+++ b/helix-core/src/main/java/org/apache/helix/task/AbstractTaskDispatcher.java
@@ -562,8 +562,9 @@ public abstract class AbstractTaskDispatcher {
 
     // The following is filtering of tasks before passing them to the assigner
     // Only feed in tasks that need to be assigned (have state equal to null, STOPPED, TIMED_OUT,
-    // TASK_ERROR, or DROPPED) or their assigned participant is not live anymore
-    Set<Integer> filteredTaskPartitionNumbers = filterTasks(allPartitions, jobCtx, liveInstances);
+    // TASK_ERROR, or DROPPED) or their assigned participant is disabled or not live anymore
+    Set<Integer> filteredTaskPartitionNumbers = filterTasks(jobResource, allPartitions, jobCtx,
+        liveInstances, cache.getDisabledInstances(), currStateOutput, paMap);
     // Remove all excludeSet tasks to be safer because some STOPPED tasks have been already
     // re-started (excludeSet includes already-assigned partitions). Also tasks with their retry
     // limit exceed (addGiveupPartitions) will be removed as well
@@ -795,13 +796,17 @@ public abstract class AbstractTaskDispatcher {
    * only the
    * tasks whose contexts are in these states are eligible to be assigned or re-tried.
    * Also, for those tasks in non-terminal states whose previously assigned instances are no longer
-   * LiveInstances are re-added so that they could be re-assigned.
-   * @param allPartitions
-   * @param jobContext
-   * @return a filter Iterable of task partition numbers
+   * LiveInstances are re-added so that they could be re-assigned. Since in the Task pipeline the
+   * LiveInstance list only contains instances that are both live and enabled, an instance missing
+   * from that list is either not live or not enabled. If the instance is merely disabled, the
+   * controller should first drop the task on that participant; only after the drop completes can
+   * the task be filtered for a new assignment. Otherwise, once the participant is re-enabled, the
+   * controller would see two tasks in RUNNING state on two different participants, which causes
+   * quota and scheduling issues.
    */
-  private Set<Integer> filterTasks(Iterable<Integer> allPartitions, JobContext jobContext,
-      Collection<String> liveInstances) {
+  private Set<Integer> filterTasks(String jobResource, Iterable<Integer> allPartitions,
+      JobContext jobContext, Collection<String> liveInstances, Set<String> disableInstances,
+      CurrentStateOutput currStateOutput, Map<Integer, PartitionAssignment> paMap) {
     Set<Integer> filteredTasks = new HashSet<>();
     for (int partitionNumber : allPartitions) {
       TaskPartitionState state = jobContext.getPartitionState(partitionNumber);
@@ -811,13 +816,23 @@ public abstract class AbstractTaskDispatcher {
           || state == TaskPartitionState.DROPPED) {
         filteredTasks.add(partitionNumber);
       }
-      // Allow tasks whose assigned instances are no longer live for rescheduling
+      // Allow tasks whose assigned instances are no longer live or enabled for rescheduling
       if (isTaskNotInTerminalState(state)) {
         String assignedParticipant = jobContext.getAssignedParticipant(partitionNumber);
+        final String pName = pName(jobResource, partitionNumber);
         if (assignedParticipant != null && !liveInstances.contains(assignedParticipant)) {
-          // The assigned instance is no longer live, so mark it as DROPPED in the context
-          jobContext.setPartitionState(partitionNumber, TaskPartitionState.DROPPED);
-          filteredTasks.add(partitionNumber);
+          // The assigned instance is no longer in the liveInstance list, so it is either not live
+          // or it is disabled. If the instance is disabled and a current state still exists on it,
+          // the controller first needs to drop that current state; otherwise the task can be
+          // marked as dropped and reassigned to another instance.
+          if (disableInstances.contains(assignedParticipant) && currStateOutput
+              .getCurrentState(jobResource, new Partition(pName), assignedParticipant) != null) {
+            paMap.put(partitionNumber,
+                new PartitionAssignment(assignedParticipant, TaskPartitionState.DROPPED.name()));
+          } else {
+            jobContext.setPartitionState(partitionNumber, TaskPartitionState.DROPPED);
+            filteredTasks.add(partitionNumber);
+          }
         }
       }
     }
diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskCurrentStateDrop.java b/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskCurrentStateDrop.java
index 8ca89e1..7f4dbc6 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskCurrentStateDrop.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskCurrentStateDrop.java
@@ -23,12 +23,14 @@ import org.apache.helix.AccessOption;
 import org.apache.helix.HelixDataAccessor;
 import org.apache.helix.TestHelper;
 import org.apache.helix.ZkTestHelper;
+import org.apache.helix.integration.manager.ClusterControllerManager;
 import org.apache.helix.model.MasterSlaveSMD;
 import org.apache.helix.task.JobConfig;
 import org.apache.helix.task.JobQueue;
 import org.apache.helix.task.TaskPartitionState;
 import org.apache.helix.task.TaskState;
 import org.apache.helix.task.TaskUtil;
+import org.apache.helix.task.Workflow;
 import org.apache.helix.zookeeper.datamodel.ZNRecord;
 import org.apache.helix.zookeeper.impl.client.ZkClient;
 import org.apache.zookeeper.data.Stat;
@@ -126,5 +128,78 @@ public class TestTaskCurrentStateDrop extends TaskTestBase {
       return (taskRecord == null && dataBase != null);
     }, TestHelper.WAIT_DURATION);
     Assert.assertTrue(isCurrentStateExpected);
+    _driver.stop(jobQueueName);
+  }
+
+  @Test (dependsOnMethods = "testCurrentStateDropAfterReconnecting")
+  public void testDropCurrentStateDisableInstance() throws Exception {
+    // Start the Controller
+    String controllerName = CONTROLLER_PREFIX + "_0";
+    _controller = new ClusterControllerManager(ZK_ADDR, CLUSTER_NAME, controllerName);
+    _controller.syncStart();
+
+    String workflowName1 = TestHelper.getTestMethodName() + "_1";
+    String jobName = "JOB0";
+    JobConfig.Builder jobBuilder1 =
+        new JobConfig.Builder().setWorkflow(workflowName1).setNumberOfTasks(1)
+            .setNumConcurrentTasksPerInstance(100).setCommand(MockTask.TASK_COMMAND)
+            .setJobCommandConfigMap(ImmutableMap.of(MockTask.JOB_DELAY, "99999999"));
+
+
+    Workflow.Builder workflowBuilder1 =
+        new Workflow.Builder(workflowName1).addJob(jobName, jobBuilder1);
+
+    _driver.start(workflowBuilder1.build());
+    String namespacedJobName = TaskUtil.getNamespacedJobName(workflowName1, jobName);
+    // Make sure the current state and the job context reach the expected RUNNING state
+    String instanceP0 = PARTICIPANT_PREFIX + "_" + (_startPort + 0);
+    ZkClient clientP0 = (ZkClient) _participants[0].getZkClient();
+    String sessionIdP0 = ZkTestHelper.getSessionId(clientP0);
+    String currentStatePathP0 = _manager.getHelixDataAccessor().keyBuilder()
+        .taskCurrentState(instanceP0, sessionIdP0, namespacedJobName).toString();
+
+    _driver.pollForJobState(workflowName1, namespacedJobName, TaskState.IN_PROGRESS);
+
+    boolean isCurrentStateCreated = TestHelper.verify(() -> {
+      ZNRecord record = _manager.getHelixDataAccessor().getBaseDataAccessor()
+          .get(currentStatePathP0, new Stat(), AccessOption.PERSISTENT);
+      return record != null;
+    }, TestHelper.WAIT_DURATION);
+    Assert.assertTrue(isCurrentStateCreated);
+
+    Assert.assertTrue(TestHelper
+        .verify(() -> (TaskPartitionState.RUNNING
+                .equals(_driver.getJobContext(namespacedJobName)
+                    .getPartitionState(0))),
+            TestHelper.WAIT_DURATION));
+
+
+    // Disable the instance and make sure the task current state is dropped
+    String disabledInstance = _participants[0].getInstanceName();
+    _gSetupTool.getClusterManagementTool().enableInstance(CLUSTER_NAME, disabledInstance, false);
+
+
+    boolean isCurrentStateDeleted = TestHelper.verify(() -> {
+      ZNRecord record = _manager.getHelixDataAccessor().getBaseDataAccessor()
+          .get(currentStatePathP0, new Stat(), AccessOption.PERSISTENT);
+      return record == null;
+    }, TestHelper.WAIT_DURATION);
+
+    Assert.assertTrue(TestHelper
+        .verify(() -> (TaskPartitionState.DROPPED
+                .equals(_driver.getJobContext(namespacedJobName)
+                    .getPartitionState(0))),
+            TestHelper.WAIT_DURATION));
+    Assert.assertTrue(isCurrentStateDeleted);
+
+    // Enable the participant again and make sure the task is retried and the number of attempts increases
+    _gSetupTool.getClusterManagementTool().enableInstance(CLUSTER_NAME, disabledInstance, true);
+
+    Assert.assertTrue(TestHelper
+        .verify(() -> (TaskPartitionState.RUNNING
+                .equals(_driver.getJobContext(namespacedJobName)
+                    .getPartitionState(0)) && _driver.getJobContext(namespacedJobName)
+                .getPartitionNumAttempts(0) == 2),
+            TestHelper.WAIT_DURATION));
   }
 }
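
For reference, the disable/enable operation that the new test issues through
_gSetupTool can also be performed with a standalone ZKHelixAdmin client. A rough
sketch, with the ZooKeeper address, cluster name, and instance name as placeholders:

import org.apache.helix.manager.zk.ZKHelixAdmin;

public class DisableInstanceExample {
  public static void main(String[] args) {
    // Placeholder ZooKeeper address; point this at the cluster's actual ensemble.
    ZKHelixAdmin admin = new ZKHelixAdmin("localhost:2181");
    // Disabling the participant makes the controller drop the current state of any
    // running task on it before the task is handed to another instance.
    admin.enableInstance("MY_CLUSTER", "localhost_12918", false);
    // Re-enabling the participant allows it to receive new task assignments again.
    admin.enableInstance("MY_CLUSTER", "localhost_12918", true);
    admin.close();
  }
}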
