This is an automated email from the ASF dual-hosted git repository.

alizamus pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/helix.git


The following commit(s) were added to refs/heads/master by this push:
     new 2c61554  ExpiredJob Workaround for Selective Update Race Conditions 
(#1470)
2c61554 is described below

commit 2c615543bd86e50d76dd4d7006d1c90e130e9755
Author: Neal Sun <[email protected]>
AuthorDate: Wed Oct 21 15:31:01 2020 -0700

    ExpiredJob Workaround for Selective Update Race Conditions (#1470)
    
    This PR implements a workaround for determining expired jobs
    that avoids the selective update race condition: if a JobConfig
    doesn't exist in the cache, it is read from ZK directly.
---
 .../apache/helix/controller/stages/TaskGarbageCollectionStage.java | 4 ++--
 helix-core/src/main/java/org/apache/helix/task/TaskUtil.java       | 7 ++++++-
 helix-core/src/test/java/org/apache/helix/task/TestTaskUtil.java   | 4 ++--
 3 files changed, 10 insertions(+), 5 deletions(-)

diff --git 
a/helix-core/src/main/java/org/apache/helix/controller/stages/TaskGarbageCollectionStage.java
 
b/helix-core/src/main/java/org/apache/helix/controller/stages/TaskGarbageCollectionStage.java
index 82def48..0b0d7d4 100644
--- 
a/helix-core/src/main/java/org/apache/helix/controller/stages/TaskGarbageCollectionStage.java
+++ 
b/helix-core/src/main/java/org/apache/helix/controller/stages/TaskGarbageCollectionStage.java
@@ -81,8 +81,8 @@ public class TaskGarbageCollectionStage extends 
AbstractAsyncBaseStage {
         if (nextPurgeTime <= currentTime) {
           nextPurgeTime = currentTime + purgeInterval;
           // Find jobs that are ready to be purged
-          Set<String> expiredJobs =
-              TaskUtil.getExpiredJobsFromCache(dataProvider, workflowConfig, 
workflowContext);
+          Set<String> expiredJobs = TaskUtil
+              .getExpiredJobsFromCache(dataProvider, workflowConfig, 
workflowContext, manager);
           if (!expiredJobs.isEmpty()) {
             expiredJobsMap.put(workflowConfig.getWorkflowId(), expiredJobs);
           }
diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java 
b/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java
index fa89cde..890b151 100644
--- a/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java
+++ b/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java
@@ -789,7 +789,7 @@ public class TaskUtil {
    */
   public static Set<String> getExpiredJobsFromCache(
       WorkflowControllerDataProvider workflowControllerDataProvider, 
WorkflowConfig workflowConfig,
-      WorkflowContext workflowContext) {
+      WorkflowContext workflowContext, HelixManager manager) {
     Set<String> expiredJobs = new HashSet<>();
     Map<String, TaskState> jobStates = workflowContext.getJobStates();
     for (String job : workflowConfig.getJobDag().getAllNodes()) {
@@ -797,6 +797,11 @@ public class TaskUtil {
         continue;
       }
       JobConfig jobConfig = workflowControllerDataProvider.getJobConfig(job);
+      // TODO: Temporary solution for cache selective update race conditions
+      if (jobConfig == null) {
+        jobConfig = TaskUtil.getJobConfig(manager, job);
+      }
+
       JobContext jobContext = 
workflowControllerDataProvider.getJobContext(job);
       TaskState jobState = jobStates.get(job);
       if (isJobExpired(job, jobConfig, jobContext, jobState)) {
diff --git a/helix-core/src/test/java/org/apache/helix/task/TestTaskUtil.java 
b/helix-core/src/test/java/org/apache/helix/task/TestTaskUtil.java
index a85ba67..c930954 100644
--- a/helix-core/src/test/java/org/apache/helix/task/TestTaskUtil.java
+++ b/helix-core/src/test/java/org/apache/helix/task/TestTaskUtil.java
@@ -98,7 +98,7 @@ public class TestTaskUtil extends TaskTestBase {
     expectedJobs.add(workflowName + "_Job_3");
     Assert.assertEquals(TaskUtil
         .getExpiredJobsFromCache(workflowControllerDataProvider, 
jobQueue.getWorkflowConfig(),
-            workflowContext), expectedJobs);
+            workflowContext, _manager), expectedJobs);
   }
 
   @Test
@@ -188,7 +188,7 @@ public class TestTaskUtil extends TaskTestBase {
     expectedJobs.add(workflowName + "_Job_8");
     Assert.assertEquals(TaskUtil
         .getExpiredJobsFromCache(workflowControllerDataProvider, 
workflow.getWorkflowConfig(),
-            workflowContext), expectedJobs);
+            workflowContext, _manager), expectedJobs);
   }
 
   @Test

Reply via email to