This is an automated email from the ASF dual-hosted git repository. jxue pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/helix.git
commit 733be5b92c0bbff5d3c82225b8d7a509ea2e081f Author: Ali Reza Zamani Zadeh Najari <[email protected]> AuthorDate: Fri Jan 29 11:25:21 2021 -0800 Drop current state of the task on disabled instances In this commit, if an instance is not enabled, the controller drops the current state of the task on the disabled node first before assigning the task to a new instance. Otherwise, once the instance becomes enabled, the controller will see two running tasks. --- .../apache/helix/task/AbstractTaskDispatcher.java | 39 +++++++---- .../integration/task/TestTaskCurrentStateDrop.java | 75 ++++++++++++++++++++++ 2 files changed, 102 insertions(+), 12 deletions(-) diff --git a/helix-core/src/main/java/org/apache/helix/task/AbstractTaskDispatcher.java b/helix-core/src/main/java/org/apache/helix/task/AbstractTaskDispatcher.java index ef0d6b6..02933f2 100644 --- a/helix-core/src/main/java/org/apache/helix/task/AbstractTaskDispatcher.java +++ b/helix-core/src/main/java/org/apache/helix/task/AbstractTaskDispatcher.java @@ -562,8 +562,9 @@ public abstract class AbstractTaskDispatcher { // The following is filtering of tasks before passing them to the assigner // Only feed in tasks that need to be assigned (have state equal to null, STOPPED, TIMED_OUT, - // TASK_ERROR, or DROPPED) or their assigned participant is not live anymore + // TASK_ERROR, or DROPPED) or their assigned participant is disabled or not live anymore - Set<Integer> filteredTaskPartitionNumbers = filterTasks(allPartitions, jobCtx, liveInstances); + Set<Integer> filteredTaskPartitionNumbers = filterTasks(jobResource, allPartitions, jobCtx, + liveInstances, cache.getDisabledInstances(), currStateOutput, paMap); // Remove all excludeSet tasks to be safer because some STOPPED tasks have been already // re-started (excludeSet includes already-assigned partitions). 
Also tasks with their retry // limit exceed (addGiveupPartitions) will be removed as well @@ -795,13 +796,17 @@ public abstract class AbstractTaskDispatcher { * only the * tasks whose contexts are in these states are eligible to be assigned or re-tried. * Also, for those tasks in non-terminal states whose previously assigned instances are no longer - * LiveInstances are re-added so that they could be re-assigned. - * @param allPartitions - * @param jobContext - * @return a filter Iterable of task partition numbers + * LiveInstances are re-added so that they could be re-assigned. Since in Task Pipeline, + * LiveInstance list contains instances that are live and enabled, if an instance is not among live + * instances, it is either not live or not enabled. If an instance is not enabled, the controller should + * first drop the task on the participant. After the task is dropped, then the task can be + * filtered for new assignment. Otherwise, once the participant is re-enabled, the controller will + * see two tasks in running state on two different participants and that causes quota and scheduling + * issues. 
*/ - private Set<Integer> filterTasks(Iterable<Integer> allPartitions, JobContext jobContext, - Collection<String> liveInstances) { + private Set<Integer> filterTasks(String jobResource, Iterable<Integer> allPartitions, + JobContext jobContext, Collection<String> liveInstances, Set<String> disableInstances, + CurrentStateOutput currStateOutput, Map<Integer, PartitionAssignment> paMap) { Set<Integer> filteredTasks = new HashSet<>(); for (int partitionNumber : allPartitions) { TaskPartitionState state = jobContext.getPartitionState(partitionNumber); @@ -811,13 +816,23 @@ public abstract class AbstractTaskDispatcher { || state == TaskPartitionState.DROPPED) { filteredTasks.add(partitionNumber); } - // Allow tasks whose assigned instances are no longer live for rescheduling + // Allow tasks whose assigned instances are no longer live or enabled for rescheduling if (isTaskNotInTerminalState(state)) { String assignedParticipant = jobContext.getAssignedParticipant(partitionNumber); + final String pName = pName(jobResource, partitionNumber); if (assignedParticipant != null && !liveInstances.contains(assignedParticipant)) { - // The assigned instance is no longer live, so mark it as DROPPED in the context - jobContext.setPartitionState(partitionNumber, TaskPartitionState.DROPPED); - filteredTasks.add(partitionNumber); + // The assigned instance is no longer in the liveInstance list. It is either not live or + // disabled. 
If instance is disabled and current state still exists on the instance, + then controller needs to drop the current state; otherwise, the task can be marked as + dropped and be reassigned to other instances + if (disableInstances.contains(assignedParticipant) && currStateOutput + .getCurrentState(jobResource, new Partition(pName), assignedParticipant) != null) { + paMap.put(partitionNumber, + new PartitionAssignment(assignedParticipant, TaskPartitionState.DROPPED.name())); + } else { + jobContext.setPartitionState(partitionNumber, TaskPartitionState.DROPPED); + filteredTasks.add(partitionNumber); + } } } } diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskCurrentStateDrop.java b/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskCurrentStateDrop.java index 8ca89e1..7f4dbc6 100644 --- a/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskCurrentStateDrop.java +++ b/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskCurrentStateDrop.java @@ -23,12 +23,14 @@ import org.apache.helix.AccessOption; import org.apache.helix.HelixDataAccessor; import org.apache.helix.TestHelper; import org.apache.helix.ZkTestHelper; +import org.apache.helix.integration.manager.ClusterControllerManager; import org.apache.helix.model.MasterSlaveSMD; import org.apache.helix.task.JobConfig; import org.apache.helix.task.JobQueue; import org.apache.helix.task.TaskPartitionState; import org.apache.helix.task.TaskState; import org.apache.helix.task.TaskUtil; +import org.apache.helix.task.Workflow; import org.apache.helix.zookeeper.datamodel.ZNRecord; import org.apache.helix.zookeeper.impl.client.ZkClient; import org.apache.zookeeper.data.Stat; @@ -126,5 +128,78 @@ public class TestTaskCurrentStateDrop extends TaskTestBase { return (taskRecord == null && dataBase != null); }, TestHelper.WAIT_DURATION); Assert.assertTrue(isCurrentStateExpected); + _driver.stop(jobQueueName); + } + + @Test (dependsOnMethods = 
"testCurrentStateDropAfterReconnecting") + public void testDropCurrentStateDisableInstance() throws Exception { + // Start the Controller + String controllerName = CONTROLLER_PREFIX + "_0"; + _controller = new ClusterControllerManager(ZK_ADDR, CLUSTER_NAME, controllerName); + _controller.syncStart(); + + String workflowName1 = TestHelper.getTestMethodName() + "_1"; + String jobName = "JOB0"; + JobConfig.Builder jobBuilder1 = + new JobConfig.Builder().setWorkflow(workflowName1).setNumberOfTasks(1) + .setNumConcurrentTasksPerInstance(100).setCommand(MockTask.TASK_COMMAND) + .setJobCommandConfigMap(ImmutableMap.of(MockTask.JOB_DELAY, "99999999")); + + + Workflow.Builder workflowBuilder1 = + new Workflow.Builder(workflowName1).addJob(jobName, jobBuilder1); + + _driver.start(workflowBuilder1.build()); + String namespacedJobName = TaskUtil.getNamespacedJobName(workflowName1, jobName); + // Make sure current state and context are going to expected state of RUNNING + String instanceP0 = PARTICIPANT_PREFIX + "_" + (_startPort + 0); + ZkClient clientP0 = (ZkClient) _participants[0].getZkClient(); + String sessionIdP0 = ZkTestHelper.getSessionId(clientP0); + String currentStatePathP0 = _manager.getHelixDataAccessor().keyBuilder() + .taskCurrentState(instanceP0, sessionIdP0, namespacedJobName).toString(); + + _driver.pollForJobState(workflowName1, namespacedJobName, TaskState.IN_PROGRESS); + + boolean isCurrentStateCreated = TestHelper.verify(() -> { + ZNRecord record = _manager.getHelixDataAccessor().getBaseDataAccessor() + .get(currentStatePathP0, new Stat(), AccessOption.PERSISTENT); + return record != null; + }, TestHelper.WAIT_DURATION); + Assert.assertTrue(isCurrentStateCreated); + + Assert.assertTrue(TestHelper + .verify(() -> (TaskPartitionState.RUNNING + .equals(_driver.getJobContext(namespacedJobName) + .getPartitionState(0))), + TestHelper.WAIT_DURATION)); + + + // Disable the instance and make sure the task current state is dropped + String disabledInstance = 
_participants[0].getInstanceName(); + _gSetupTool.getClusterManagementTool().enableInstance(CLUSTER_NAME, disabledInstance, false); + + + boolean isCurrentStateDeleted = TestHelper.verify(() -> { + ZNRecord record = _manager.getHelixDataAccessor().getBaseDataAccessor() + .get(currentStatePathP0, new Stat(), AccessOption.PERSISTENT); + return record == null; + }, TestHelper.WAIT_DURATION); + + Assert.assertTrue(TestHelper + .verify(() -> (TaskPartitionState.DROPPED + .equals(_driver.getJobContext(namespacedJobName) + .getPartitionState(0))), + TestHelper.WAIT_DURATION)); + Assert.assertTrue(isCurrentStateDeleted); + + // enable participant again and make sure task will be retried and number of attempts is increased + _gSetupTool.getClusterManagementTool().enableInstance(CLUSTER_NAME, disabledInstance, true); + + Assert.assertTrue(TestHelper + .verify(() -> (TaskPartitionState.RUNNING + .equals(_driver.getJobContext(namespacedJobName) + .getPartitionState(0)) && _driver.getJobContext(namespacedJobName) + .getPartitionNumAttempts(0) == 2), + TestHelper.WAIT_DURATION)); } }
