This is an automated email from the ASF dual-hosted git repository.

hulee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/helix.git


The following commit(s) were added to refs/heads/master by this push:
     new d0422b7  TASK: Fix incorrect counting of numAttempts for tasks (#432)
d0422b7 is described below

commit d0422b73205e12de4b5bb070c24905a53e3ed6af
Author: Hunter Lee <[email protected]>
AuthorDate: Mon Aug 26 13:48:21 2019 -0700

    TASK: Fix incorrect counting of numAttempts for tasks (#432)
    
    TASK: Fix incorrect counting of numAttempts for tasks
    
    It was discovered that the tasks' NUM_ATTEMPTS field in JobContext was
    sometimes incremented even though the tasks had not been retried. This
    happened because the numAttempts field was being incremented in places
    other than at scheduling time, which is the only correct place to count
    an attempt. In this diff, the logic for incrementing the number of
    attempts has been moved into the scheduling logic.
    Changelist:
    1. Modify tests so that they test for numAttempts more tightly
    2. Fix the increment logic
    3. Add a new integration test: TestTaskNumAttempts
---
 .../apache/helix/task/AbstractTaskDispatcher.java  | 20 +++---
 .../java/org/apache/helix/task/JobDispatcher.java  |  2 +-
 .../integration/task/TestTaskConditionalRetry.java |  4 +-
 .../integration/task/TestTaskNumAttempts.java      | 79 ++++++++++++++++++++++
 .../task/TestTaskRebalancerRetryLimit.java         |  5 +-
 .../helix/integration/task/TestTaskRetryDelay.java |  4 +-
 6 files changed, 93 insertions(+), 21 deletions(-)

diff --git 
a/helix-core/src/main/java/org/apache/helix/task/AbstractTaskDispatcher.java 
b/helix-core/src/main/java/org/apache/helix/task/AbstractTaskDispatcher.java
index 698730e..8c1e651 100644
--- a/helix-core/src/main/java/org/apache/helix/task/AbstractTaskDispatcher.java
+++ b/helix-core/src/main/java/org/apache/helix/task/AbstractTaskDispatcher.java
@@ -30,7 +30,6 @@ import java.util.Map;
 import java.util.Set;
 import java.util.SortedSet;
 import java.util.TreeSet;
-import java.util.concurrent.Callable;
 import org.apache.helix.HelixDefinedState;
 import org.apache.helix.HelixManager;
 import org.apache.helix.common.caches.TaskDataCache;
@@ -122,7 +121,9 @@ public abstract class AbstractTaskDispatcher {
         if (currState == TaskPartitionState.ERROR || currState == 
TaskPartitionState.TASK_ERROR
             || currState == TaskPartitionState.TIMED_OUT
             || currState == TaskPartitionState.TASK_ABORTED) {
-          markPartitionError(jobCtx, pId, currState, true);
+          // Do not increment the task attempt count here - it will be 
incremented at scheduling
+          // time
+          markPartitionError(jobCtx, pId, currState);
         }
 
         // Check for pending state transitions on this (partition, instance). 
If there is a pending
@@ -431,22 +432,16 @@ public abstract class AbstractTaskDispatcher {
   protected static void markPartitionCompleted(JobContext ctx, int pId) {
     ctx.setPartitionState(pId, TaskPartitionState.COMPLETED);
     ctx.setPartitionFinishTime(pId, System.currentTimeMillis());
-    ctx.incrementNumAttempts(pId);
   }
 
-  protected static void markPartitionError(JobContext ctx, int pId, 
TaskPartitionState state,
-      boolean incrementAttempts) {
+  protected static void markPartitionError(JobContext ctx, int pId, 
TaskPartitionState state) {
     ctx.setPartitionState(pId, state);
     ctx.setPartitionFinishTime(pId, System.currentTimeMillis());
-    if (incrementAttempts) {
-      ctx.incrementNumAttempts(pId);
-    }
   }
 
-  protected static void markAllPartitionsError(JobContext ctx, 
TaskPartitionState state,
-      boolean incrementAttempts) {
+  protected static void markAllPartitionsError(JobContext ctx) {
     for (int pId : ctx.getPartitionSet()) {
-      markPartitionError(ctx, pId, state, incrementAttempts);
+      markPartitionError(ctx, pId, TaskPartitionState.ERROR);
     }
   }
 
@@ -622,6 +617,7 @@ public abstract class AbstractTaskDispatcher {
         List<Integer> nextPartitions = 
getNextPartitions(tgtPartitionAssignments.get(instance),
             excludeSet, throttledSet, numToAssign);
         for (Integer pId : nextPartitions) {
+          // The following is the actual scheduling of the tasks
           String pName = pName(jobResource, pId);
           paMap.put(pId, new PartitionAssignment(instance, 
TaskPartitionState.RUNNING.name()));
           excludeSet.add(pId);
@@ -635,6 +631,8 @@ public abstract class AbstractTaskDispatcher {
             reportSubmissionToScheduleDelay(cache, _clusterStatusMonitor, 
workflowConfig, jobCfg,
                 currentTimestamp);
           }
+          // Increment the task attempt count at schedule time
+          jobCtx.incrementNumAttempts(pId);
           if (LOG.isDebugEnabled()) {
             LOG.debug(String.format("Setting task partition %s state to %s on 
instance %s.", pName,
                 TaskPartitionState.RUNNING, instance));
diff --git a/helix-core/src/main/java/org/apache/helix/task/JobDispatcher.java 
b/helix-core/src/main/java/org/apache/helix/task/JobDispatcher.java
index c21330b..6f011f5 100644
--- a/helix-core/src/main/java/org/apache/helix/task/JobDispatcher.java
+++ b/helix-core/src/main/java/org/apache/helix/task/JobDispatcher.java
@@ -228,7 +228,7 @@ public class JobDispatcher extends AbstractTaskDispatcher {
       LOG.info(failureMsg);
       jobCtx.setInfo(failureMsg);
       failJob(jobResource, workflowCtx, jobCtx, workflowConfig, 
cache.getJobConfigMap(), cache);
-      markAllPartitionsError(jobCtx, TaskPartitionState.ERROR, false);
+      markAllPartitionsError(jobCtx);
       return new ResourceAssignment(jobResource);
     }
 
diff --git 
a/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskConditionalRetry.java
 
b/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskConditionalRetry.java
index 2dafcf7..0178fff 100644
--- 
a/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskConditionalRetry.java
+++ 
b/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskConditionalRetry.java
@@ -93,9 +93,7 @@ public class TestTaskConditionalRetry extends TaskTestBase {
         Assert.assertEquals(retriedCount, 1);
       } else if (taskId.equals("task_" + failedTask) || taskId.equals("task_" 
+ exceptionTask)) {
         Assert.assertEquals(state, TaskPartitionState.TASK_ERROR);
-        // The following retry count seems to be a race condition specific to 
tests
-        // TODO: fix so that the second condition could be removed
-        Assert.assertTrue(retriedCount == taskRetryCount || retriedCount == 
taskRetryCount + 1);
+        Assert.assertEquals(taskRetryCount, retriedCount);
       } else {
         Assert.assertEquals(state, TaskPartitionState.COMPLETED);
         Assert.assertEquals(retriedCount, 1);
diff --git 
a/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskNumAttempts.java
 
b/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskNumAttempts.java
new file mode 100644
index 0000000..2318b99
--- /dev/null
+++ 
b/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskNumAttempts.java
@@ -0,0 +1,79 @@
+package org.apache.helix.integration.task;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import com.google.common.collect.ImmutableMap;
+import org.apache.helix.TestHelper;
+import org.apache.helix.task.JobConfig;
+import org.apache.helix.task.JobContext;
+import org.apache.helix.task.TaskState;
+import org.apache.helix.task.TaskUtil;
+import org.apache.helix.task.Workflow;
+import org.testng.Assert;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+/**
+ * TestTaskNumAttempts tests that number of attempts for tasks are incremented 
correctly respecting
+ * the retry delay.
+ * NOTE: This test relies heavily upon timing.
+ */
+public class TestTaskNumAttempts extends TaskTestBase {
+  @BeforeClass
+  public void beforeClass() throws Exception {
+    _numPartitions = 1;
+    super.beforeClass();
+  }
+
+  @Test
+  public void testTaskNumAttemptsWithDelay() throws Exception {
+    int maxAttempts = Integer.MAX_VALUE; // Allow the count to increase 
infinitely
+    // Use a delay that's long enough for multiple rounds of pipeline
+    long retryDelay = 4000L;
+
+    String workflowName = TestHelper.getTestMethodName();
+    JobConfig.Builder jobBuilder = 
JobConfig.Builder.fromMap(WorkflowGenerator.DEFAULT_JOB_CONFIG);
+    jobBuilder.setJobCommandConfigMap(WorkflowGenerator.DEFAULT_COMMAND_CONFIG)
+        .setMaxAttemptsPerTask(maxAttempts).setCommand(MockTask.TASK_COMMAND)
+        .setWorkflow(workflowName).setFailureThreshold(Integer.MAX_VALUE)
+        .setTaskRetryDelay(retryDelay).setJobCommandConfigMap(
+            ImmutableMap.of(MockTask.FAILURE_COUNT_BEFORE_SUCCESS, 
String.valueOf(maxAttempts)));
+    Workflow flow =
+        WorkflowGenerator.generateSingleJobWorkflowBuilder(workflowName, 
jobBuilder).build();
+    _driver.start(flow);
+
+    // Wait until the job is running
+    _driver.pollForWorkflowState(workflowName, TaskState.IN_PROGRESS);
+
+    JobContext jobContext =
+        _driver.getJobContext(TaskUtil.getNamespacedJobName(workflowName, 
workflowName));
+    int expectedAttempts = jobContext.getPartitionNumAttempts(0);
+
+    // Check 3 times to see if maxAttempts match up
+    for (int i = 0; i < 3; i++) {
+      // Add a small delay to make sure the check falls in the middle of the 
scheduling timeline
+      Thread.sleep(retryDelay + 1000L);
+      JobContext ctx =
+          _driver.getJobContext(TaskUtil.getNamespacedJobName(workflowName, 
workflowName));
+      expectedAttempts++;
+      Assert.assertEquals(ctx.getPartitionNumAttempts(0), expectedAttempts);
+    }
+  }
+}
diff --git 
a/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancerRetryLimit.java
 
b/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancerRetryLimit.java
index d8632f9..7a0069a 100644
--- 
a/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancerRetryLimit.java
+++ 
b/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancerRetryLimit.java
@@ -58,10 +58,7 @@ public class TestTaskRebalancerRetryLimit extends 
TaskTestBase {
       TaskPartitionState state = ctx.getPartitionState(i);
       if (state != null) {
         Assert.assertEquals(state, TaskPartitionState.TASK_ERROR);
-        // The following retry count seems to be a race condition specific to 
tests
-        // TODO: fix so that the second condition could be removed ( == 3 )
-        Assert
-            .assertTrue(ctx.getPartitionNumAttempts(i) == 2 || 
ctx.getPartitionNumAttempts(i) == 3);
+        Assert.assertEquals(ctx.getPartitionNumAttempts(i), 2);
       }
     }
   }
diff --git 
a/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRetryDelay.java
 
b/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRetryDelay.java
index 2e94f81..015bad2 100644
--- 
a/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRetryDelay.java
+++ 
b/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRetryDelay.java
@@ -54,7 +54,7 @@ public class TestTaskRetryDelay extends TaskTestBase {
     long startTime = _driver.getWorkflowContext(jobResource).getStartTime();
     long finishedTime = 
_driver.getWorkflowContext(jobResource).getFinishTime();
 
-    // It should finished at least 4 secs
+    // It should finish and take more than 2 secs with the retry delay built in
     Assert.assertTrue(finishedTime - startTime >= 2000L);
   }
 
@@ -76,7 +76,7 @@ public class TestTaskRetryDelay extends TaskTestBase {
     long startTime = _driver.getWorkflowContext(jobResource).getStartTime();
     long finishedTime = 
_driver.getWorkflowContext(jobResource).getFinishTime();
 
-    // It should finished at less than 2 sec
+    // It should have finished within less than 2 sec
     Assert.assertTrue(finishedTime - startTime <= 2000L);
   }
 }

Reply via email to