This is an automated email from the ASF dual-hosted git repository.

hulee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/helix.git

commit 6b4ecc79cd0240071ce94d006c454be13d8b138e
Author: Hunter Lee <[email protected]>
AuthorDate: Mon Apr 8 21:44:40 2019 -0700

    TASK: Add deleteJob namespaced job name support
    
    Current deletion of jobs from JobQueues only support denamespaced job 
names. This makes it impossible for users to list all jobs and delete them 
because they cannot recover denamespaced names sometimes.
    Changelist:
    1. Add support for namespaced job names for deletion
    
    RB=1624395
    G=helix-reviewers
    A=jxue
    
    Signed-off-by: Hunter Lee <[email protected]>
---
 .../java/org/apache/helix/task/TaskDriver.java     | 57 ++++++++++++----------
 1 file changed, 31 insertions(+), 26 deletions(-)

diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java 
b/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
index baa5467..5f4ac14 100644
--- a/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
+++ b/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
@@ -244,32 +244,40 @@ public class TaskDriver {
   /**
    * Delete a job from an existing named queue,
    * the queue has to be stopped prior to this call
-   *
    * @param queue queue name
-   * @param job  job name
+   * @param job job name, denamespaced
    */
   public void deleteJob(final String queue, final String job) {
-    deleteJob(queue, job, false);
+    deleteNamespacedJob(queue, TaskUtil.getNamespacedJobName(queue, job), 
false);
   }
 
   /**
    * Delete a job from an existing named queue,
    * the queue has to be stopped prior to this call
-   *
    * @param queue queue name
-   * @param job  job name
-   * @param forceDelete  CAUTION: if set true, all job's related zk nodes will
-   *                     be clean up from zookeeper even if its workflow 
information can not be found.
+   * @param job job name, denamespaced
+   * @param forceDelete
    */
   public void deleteJob(final String queue, final String job, boolean 
forceDelete) {
+    deleteNamespacedJob(queue, TaskUtil.getNamespacedJobName(queue, job), 
forceDelete);
+  }
+
+  /**
+   * Delete a job from an existing named queue,
+   * the queue has to be stopped prior to this call
+   * @param queue queue name
+   * @param job job name: namespaced job name
+   * @param forceDelete CAUTION: if set true, all job's related zk nodes will
+   *          be clean up from zookeeper even if its workflow information can 
not be found.
+   */
+  public void deleteNamespacedJob(final String queue, final String job, 
boolean forceDelete) {
     WorkflowConfig workflowCfg = TaskUtil.getWorkflowConfig(_accessor, queue);
 
     if (workflowCfg == null) {
       if (forceDelete) {
         // remove all job znodes if its original workflow config was already 
gone.
         LOG.info("Forcefully removing job: " + job + " from queue: " + queue);
-        boolean success = TaskUtil
-            .removeJob(_accessor, _propertyStore, 
TaskUtil.getNamespacedJobName(queue, job));
+        boolean success = TaskUtil.removeJob(_accessor, _propertyStore, job);
         if (!success) {
           LOG.info("Failed to delete job: " + job + " from queue: " + queue);
           throw new HelixException("Failed to delete job: " + job + " from 
queue: " + queue);
@@ -280,13 +288,14 @@ public class TaskDriver {
       return;
     }
 
-    if (workflowCfg.isTerminable()) {
+    if (!workflowCfg.isJobQueue()) {
       throw new IllegalArgumentException(queue + " is not a queue!");
     }
 
     boolean isRecurringWorkflow =
         (workflowCfg.getScheduleConfig() != null && 
workflowCfg.getScheduleConfig().isRecurring());
 
+    String denamespacedJob = TaskUtil.getDenamespacedJobName(queue, job);
     if (isRecurringWorkflow) {
       // delete job from the last scheduled queue if there exists one.
       WorkflowContext wCtx = TaskUtil.getWorkflowContext(_propertyStore, 
queue);
@@ -297,46 +306,42 @@ public class TaskDriver {
       if (lastScheduledQueue != null) {
         WorkflowConfig lastWorkflowCfg = TaskUtil.getWorkflowConfig(_accessor, 
lastScheduledQueue);
         if (lastWorkflowCfg != null) {
-          deleteJobFromQueue(lastScheduledQueue, job);
+          deleteJobFromQueue(lastScheduledQueue, denamespacedJob);
         }
       }
     }
-
-    deleteJobFromQueue(queue, job);
+    deleteJobFromQueue(queue, denamespacedJob);
   }
 
   /**
-   * delete a job from a scheduled (non-recurrent) queue.
-   *
+   * Delete a job from a scheduled (non-recurrent) queue.
    * @param queue
-   * @param job
+   * @param job this must be a namespaced job name
    */
-
   private void deleteJobFromQueue(final String queue, final String job) {
     WorkflowContext workflowCtx = TaskUtil.getWorkflowContext(_propertyStore, 
queue);
-    String workflowState = (workflowCtx != null)
-        ? workflowCtx.getWorkflowState().name()
+    String workflowState = (workflowCtx != null) ? 
workflowCtx.getWorkflowState().name()
         : TaskState.NOT_STARTED.name();
 
     if (workflowState.equals(TaskState.IN_PROGRESS.name())) {
       throw new IllegalStateException("Queue " + queue + " is still running!");
     }
 
-    if (workflowState.equals(TaskState.COMPLETED.name()) || 
workflowState.equals(
-        TaskState.FAILED.name()) || 
workflowState.equals(TaskState.ABORTED.name())) {
-      LOG.warn("Queue " + queue + " has already reached its final state, skip 
deleting job from it.");
+    if (workflowState.equals(TaskState.COMPLETED.name())
+        || workflowState.equals(TaskState.FAILED.name())
+        || workflowState.equals(TaskState.ABORTED.name())) {
+      LOG.warn(
+          "Queue " + queue + " has already reached its final state, skip 
deleting job from it.");
       return;
     }
 
-    String namespacedJobName = TaskUtil.getNamespacedJobName(queue, job);
-    Set<String> jobs = new HashSet<>(Arrays.asList(namespacedJobName));
-    if (!TaskUtil.removeJobsFromWorkflow(_accessor, _propertyStore, queue, 
jobs, true)) {
+    if (!TaskUtil.removeJobsFromWorkflow(_accessor, _propertyStore, queue,
+        Collections.singleton(TaskUtil.getNamespacedJobName(queue, job)), 
true)) {
       LOG.error("Failed to delete job " + job + " from queue " + queue);
       throw new HelixException("Failed to delete job " + job + " from queue " 
+ queue);
     }
   }
 
-
   /**
    * Adds a new job to the end an existing named queue.
    *

Reply via email to