Repository: helix Updated Branches: refs/heads/master 0c251bbf6 -> 566d4f166
[HELIX-773] add getLastScheduledTaskTimestamp information in workflow rest api Project: http://git-wip-us.apache.org/repos/asf/helix/repo Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/566d4f16 Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/566d4f16 Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/566d4f16 Branch: refs/heads/master Commit: 566d4f166473b477ea0db1cfba5d04c8f3d6bf30 Parents: 0c251bb Author: Harry Zhang <hrzh...@linkedin.com> Authored: Tue Oct 30 16:43:25 2018 -0700 Committer: Harry Zhang <hrzh...@linkedin.com> Committed: Wed Oct 31 13:48:46 2018 -0700 ---------------------------------------------------------------------- .../java/org/apache/helix/task/TaskDriver.java | 28 +++- .../apache/helix/task/TaskExecutionInfo.java | 92 ++++++++++++++ .../task/TestGetLastScheduledTaskExecInfo.java | 122 +++++++++++++++++++ .../task/TestGetLastScheduledTaskTimestamp.java | 110 ----------------- .../resources/helix/WorkflowAccessor.java | 5 +- .../helix/rest/server/TestWorkflowAccessor.java | 9 +- 6 files changed, 246 insertions(+), 120 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/helix/blob/566d4f16/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java b/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java index e6256ed..54e3ab3 100644 --- a/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java +++ b/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java @@ -85,7 +85,6 @@ public class TaskDriver { // TODO Implement or configure the limitation in ZK server.
private final static long DEFAULT_CONFIGS_LIMITATION = HelixUtil.getSystemPropertyAsLong(SystemPropertyKeys.TASK_CONFIG_LIMITATION, 100000L); - private final static long TIMESTAMP_NOT_SET = -1L; private final static String TASK_START_TIME_KEY = "START_TIME"; protected long _configsLimitation = DEFAULT_CONFIGS_LIMITATION; @@ -977,14 +976,28 @@ public class TaskDriver { * -1L if timestamp is not set (either nothing is scheduled or no start time recorded). */ public long getLastScheduledTaskTimestamp(String workflowName) { - long lastScheduledTaskTimestamp = TIMESTAMP_NOT_SET; + return getLastScheduledTaskExecutionInfo(workflowName).getStartTimeStamp(); + } + + /** + * Returns execution info of the task with the latest start time among a workflow's started jobs. + * @param workflowName name of the workflow + * @return info of the last scheduled task; if none is found, the job name, partition index + * and state are null and the start timestamp is TaskExecutionInfo.TIMESTAMP_NOT_SET + */ + public TaskExecutionInfo getLastScheduledTaskExecutionInfo(String workflowName) { + long lastScheduledTaskTimestamp = TaskExecutionInfo.TIMESTAMP_NOT_SET; + String jobName = null; + Integer taskPartitionIndex = null; + TaskPartitionState state = null; + WorkflowContext workflowContext = getWorkflowContext(workflowName); if (workflowContext != null) { Map<String, TaskState> allJobStates = workflowContext.getJobStates(); - for (String job : allJobStates.keySet()) { - if (!allJobStates.get(job).equals(TaskState.NOT_STARTED)) { - JobContext jobContext = getJobContext(job); + for (Map.Entry<String, TaskState> jobState : allJobStates.entrySet()) { + if (!jobState.getValue().equals(TaskState.NOT_STARTED)) { + JobContext jobContext = getJobContext(jobState.getKey()); if (jobContext != null) { Set<Integer> allPartitions = jobContext.getPartitionSet(); for (Integer partition : allPartitions) { @@ -993,6 +1006,9 @@ public class TaskDriver { long startTimeLong = Long.parseLong(startTime); if (startTimeLong > lastScheduledTaskTimestamp) { lastScheduledTaskTimestamp = startTimeLong; + jobName = jobState.getKey(); + 
taskPartitionIndex = partition; + state = jobContext.getPartitionState(partition); } } } @@ -1000,7 +1016,7 @@ public class TaskDriver { } } } - return lastScheduledTaskTimestamp; + return new
TaskExecutionInfo(jobName, taskPartitionIndex, state, lastScheduledTaskTimestamp); } /** http://git-wip-us.apache.org/repos/asf/helix/blob/566d4f16/helix-core/src/main/java/org/apache/helix/task/TaskExecutionInfo.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskExecutionInfo.java b/helix-core/src/main/java/org/apache/helix/task/TaskExecutionInfo.java new file mode 100644 index 0000000..03d66b4 --- /dev/null +++ b/helix-core/src/main/java/org/apache/helix/task/TaskExecutionInfo.java @@ -0,0 +1,92 @@ +package org.apache.helix.task; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import java.io.IOException; +import java.util.Objects; +import org.codehaus.jackson.annotate.JsonCreator; +import org.codehaus.jackson.annotate.JsonIgnoreProperties; +import org.codehaus.jackson.annotate.JsonProperty; +import org.codehaus.jackson.map.ObjectMapper; + +/** + * Describes one task execution: the job it belongs to, its partition index and state, and its + * start timestamp. Serialized to JSON by toJson() and deserialized through the annotated constructor. 
+ */ +@JsonIgnoreProperties(ignoreUnknown = true) +public class TaskExecutionInfo { + private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); + /** Start timestamp reported when no start time has been recorded. */ + public final static long TIMESTAMP_NOT_SET = -1L; + private final String _jobName; + private final Integer _taskPartitionIndex; + private final TaskPartitionState _taskPartitionState; + private final Long _startTimeStamp; + + @JsonCreator + public TaskExecutionInfo( + @JsonProperty("jobName") String job, + @JsonProperty("taskPartitionIndex") Integer index, + @JsonProperty("taskPartitionState") TaskPartitionState state, + @JsonProperty("startTimeStamp") Long timeStamp) { + _jobName = job; + _taskPartitionIndex = index; + _taskPartitionState = state; + _startTimeStamp = timeStamp == null ?
TIMESTAMP_NOT_SET : timeStamp; + } + + public String getJobName() { + return _jobName; + } + + public Integer getTaskPartitionIndex() { + return _taskPartitionIndex; + } + + public TaskPartitionState getTaskPartitionState() { + return _taskPartitionState; + } + + public Long getStartTimeStamp() { + return _startTimeStamp; + } + + public String toJson() throws IOException { + return OBJECT_MAPPER.writeValueAsString(this); + } + + @Override + public boolean equals(Object obj) { + if (!(obj instanceof TaskExecutionInfo)) { + return false; + } + TaskExecutionInfo infoObj = (TaskExecutionInfo) obj; + return Objects.equals(getJobName(), infoObj.getJobName()) + && Objects.equals(getTaskPartitionIndex(), infoObj.getTaskPartitionIndex()) + && Objects.equals(getTaskPartitionState(), infoObj.getTaskPartitionState()) + && Objects.equals(getStartTimeStamp(), infoObj.getStartTimeStamp()); + } + + // equals() is overridden, so hashCode() must agree with it (equal objects => equal hashes) + @Override + public int hashCode() { + return Objects.hash(_jobName, _taskPartitionIndex, _taskPartitionState, _startTimeStamp); + } +} http://git-wip-us.apache.org/repos/asf/helix/blob/566d4f16/helix-core/src/test/java/org/apache/helix/task/TestGetLastScheduledTaskExecInfo.java ---------------------------------------------------------------------- diff --git a/helix-core/src/test/java/org/apache/helix/task/TestGetLastScheduledTaskExecInfo.java b/helix-core/src/test/java/org/apache/helix/task/TestGetLastScheduledTaskExecInfo.java new file mode 100644 index 0000000..73fe674 --- /dev/null +++ b/helix-core/src/test/java/org/apache/helix/task/TestGetLastScheduledTaskExecInfo.java @@ -0,0 +1,122 @@ +package org.apache.helix.task; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership.
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Set; +import org.apache.helix.integration.task.MockTask; +import org.apache.helix.integration.task.TaskTestBase; +import org.apache.helix.integration.task.TaskTestUtil; +import org.testng.Assert; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Test; + + +public class TestGetLastScheduledTaskExecInfo extends TaskTestBase { + private final static String TASK_START_TIME_KEY = "START_TIME"; + private final static long INVALID_TIMESTAMP = -1L; + + @BeforeClass + public void beforeClass() throws Exception { + setSingleTestEnvironment(); + super.beforeClass(); + } + + @Test + public void testGetLastScheduledTaskExecInfo() throws InterruptedException { + List<Long> startTimesWithStuckTasks = setupTasks("TestWorkflow_2", 5, 99999999); + + // First two must be -1 (two tasks are stuck), and API call must return the last value (most recent timestamp) + Assert.assertEquals(startTimesWithStuckTasks.get(0).longValue(), INVALID_TIMESTAMP); + Assert.assertEquals(startTimesWithStuckTasks.get(1).longValue(), INVALID_TIMESTAMP); + TaskExecutionInfo execInfo = _driver.getLastScheduledTaskExecutionInfo("TestWorkflow_2"); + Long lastScheduledTaskTs = _driver.getLastScheduledTaskTimestamp("TestWorkflow_2"); + Assert.assertEquals(startTimesWithStuckTasks.get(startTimesWithStuckTasks.size() - 1),
lastScheduledTaskTs); + + Assert.assertEquals(execInfo.getJobName(), "TestWorkflow_2_job_0"); + // TestWorkflow_2's tasks are stuck, so the last scheduled task's partition state is RUNNING + Assert.assertEquals(execInfo.getTaskPartitionState(), TaskPartitionState.RUNNING); + Assert.assertEquals(execInfo.getStartTimeStamp(), lastScheduledTaskTs); + + List<Long> startTimesFastTasks = setupTasks("TestWorkflow_3", 4, 10); + // API call needs to return the most recent timestamp (value at last index) + lastScheduledTaskTs = _driver.getLastScheduledTaskTimestamp("TestWorkflow_3"); + execInfo = _driver.getLastScheduledTaskExecutionInfo("TestWorkflow_3"); + + Assert.assertEquals(startTimesFastTasks.get(startTimesFastTasks.size() - 1), lastScheduledTaskTs); + Assert.assertEquals(execInfo.getJobName(), "TestWorkflow_3_job_0"); + Assert.assertEquals(execInfo.getTaskPartitionState(), TaskPartitionState.COMPLETED); + Assert.assertEquals(execInfo.getStartTimeStamp(), lastScheduledTaskTs); + } + + /** + * Helper method for gathering start times for all tasks. Returns start times in ascending order. Null start times + * are recorded as INVALID_TIMESTAMP (-1).
+ * + * @param jobQueueName name of the queue + * @param numTasks number of tasks to schedule + * @param taskTimeout duration of each task to be run for + * @return list of timestamps for all tasks in ascending order + * @throws InterruptedException + */ + private List<Long> setupTasks(String jobQueueName, int numTasks, long taskTimeout) throws InterruptedException { + // Create a queue + JobQueue.Builder queueBuilder = TaskTestUtil.buildJobQueue(jobQueueName); + + // Create and enqueue a job + JobConfig.Builder jobConfig = new JobConfig.Builder(); + + // Create tasks + List<TaskConfig> taskConfigs = new ArrayList<>(); + for (int i = 0; i < numTasks; i++) { + taskConfigs.add(new TaskConfig.Builder() + .setTaskId("task_" + i) + .setCommand(MockTask.TASK_COMMAND) + .addConfig(MockTask.JOB_DELAY, String.valueOf(taskTimeout)) + .build()); + } + // Run up to 2 tasks at a time + jobConfig.addTaskConfigs(taskConfigs).setNumConcurrentTasksPerInstance(2); + queueBuilder.enqueueJob("job_0", jobConfig); + _driver.start(queueBuilder.build()); + // 1 second delay for the Controller + Thread.sleep(1000); + + // Pull jobContexts and look at the start times + List<Long> startTimes = new ArrayList<>(); + WorkflowConfig workflowConfig = TaskUtil.getWorkflowConfig(_manager.getHelixDataAccessor(), jobQueueName); + for (String job : workflowConfig.getJobDag().getAllNodes()) { + JobContext jobContext = _driver.getJobContext(job); + Set<Integer> allPartitions = jobContext.getPartitionSet(); + for (Integer partition : allPartitions) { + String timestamp = jobContext.getMapField(partition).get(TASK_START_TIME_KEY); + if (timestamp == null) { + startTimes.add(INVALID_TIMESTAMP); + } else { + startTimes.add(Long.parseLong(timestamp)); + } + } + } + Collections.sort(startTimes); + return startTimes; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/helix/blob/566d4f16/helix-core/src/test/java/org/apache/helix/task/TestGetLastScheduledTaskTimestamp.java
---------------------------------------------------------------------- diff --git a/helix-core/src/test/java/org/apache/helix/task/TestGetLastScheduledTaskTimestamp.java b/helix-core/src/test/java/org/apache/helix/task/TestGetLastScheduledTaskTimestamp.java deleted file mode 100644 index 174d8c6..0000000 --- a/helix-core/src/test/java/org/apache/helix/task/TestGetLastScheduledTaskTimestamp.java +++ /dev/null @@ -1,110 +0,0 @@ -package org.apache.helix.task; - -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License.
- */ - -import java.util.ArrayList; -import java.util.Collections; -import java.util.List; -import java.util.Set; -import org.apache.helix.integration.task.MockTask; -import org.apache.helix.integration.task.TaskTestBase; -import org.apache.helix.integration.task.TaskTestUtil; -import org.testng.Assert; -import org.testng.annotations.BeforeClass; -import org.testng.annotations.Test; - - -public class TestGetLastScheduledTaskTimestamp extends TaskTestBase { - private final static String TASK_START_TIME_KEY = "START_TIME"; - private final static long INVALID_TIMESTAMP = -1L; - - @BeforeClass - public void beforeClass() throws Exception { - setSingleTestEnvironment(); - super.beforeClass(); - } - - @Test - public void testGetLastScheduledTaskTimestamp() throws InterruptedException { - List<Long> startTimesWithStuckTasks = setupTasks("TestWorkflow_2", 5, 99999999); - // First two must be -1 (two tasks are stuck), and API call must return the last value (most recent timestamp) - Assert.assertEquals(startTimesWithStuckTasks.get(0).longValue(), INVALID_TIMESTAMP); - Assert.assertEquals(startTimesWithStuckTasks.get(1).longValue(), INVALID_TIMESTAMP); - Assert.assertEquals(startTimesWithStuckTasks.get(3).longValue(), - _driver.getLastScheduledTaskTimestamp("TestWorkflow_2")); - - List<Long> startTimesFastTasks = setupTasks("TestWorkflow_3", 4, 10); - // API call needs to return the most recent timestamp (value at last index) - Assert.assertEquals(startTimesFastTasks.get(startTimesFastTasks.size() - 1).longValue(), - _driver.getLastScheduledTaskTimestamp("TestWorkflow_3")); - } - - /** - * Helper method for gathering start times for all tasks. Returns start times in ascending order. Null start times - * are recorded as 0.
- * - * @param jobQueueName name of the queue - * @param numTasks number of tasks to schedule - * @param taskTimeout duration of each task to be run for - * @return list of timestamps for all tasks in ascending order - * @throws InterruptedException - */ - private List<Long> setupTasks(String jobQueueName, int numTasks, long taskTimeout) throws InterruptedException { - // Create a queue - JobQueue.Builder queueBuilder = TaskTestUtil.buildJobQueue(jobQueueName); - - // Create and enqueue a job - JobConfig.Builder jobConfig = new JobConfig.Builder(); - - // Create tasks - List<TaskConfig> taskConfigs = new ArrayList<>(); - for (int i = 0; i < numTasks; i++) { - taskConfigs.add(new TaskConfig.Builder() - .setTaskId("task_" + i) - .setCommand(MockTask.TASK_COMMAND) - .addConfig(MockTask.JOB_DELAY, String.valueOf(taskTimeout)) - .build()); - } - // Run up to 2 tasks at a time - jobConfig.addTaskConfigs(taskConfigs).setNumConcurrentTasksPerInstance(2); - queueBuilder.enqueueJob("job_0", jobConfig); - _driver.start(queueBuilder.build()); - // 1 second delay for the Controller - Thread.sleep(1000); - - // Pull jobContexts and look at the start times - List<Long> startTimes = new ArrayList<>(); - WorkflowConfig workflowConfig = TaskUtil.getWorkflowConfig(_manager.getHelixDataAccessor(), jobQueueName); - for (String job : workflowConfig.getJobDag().getAllNodes()) { - JobContext jobContext = _driver.getJobContext(job); - Set<Integer> allPartitions = jobContext.getPartitionSet(); - for (Integer partition : allPartitions) { - String timestamp = jobContext.getMapField(partition).get(TASK_START_TIME_KEY); - if (timestamp == null) { - startTimes.add(INVALID_TIMESTAMP); - } else { - startTimes.add(Long.parseLong(timestamp)); - } - } - } - Collections.sort(startTimes); - return startTimes; - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/helix/blob/566d4f16/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/WorkflowAccessor.java
---------------------------------------------------------------------- diff --git a/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/WorkflowAccessor.java b/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/WorkflowAccessor.java index 7cfbe85..9a9a62b 100644 --- a/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/WorkflowAccessor.java +++ b/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/WorkflowAccessor.java @@ -62,7 +62,8 @@ public class WorkflowAccessor extends AbstractHelixResource { WorkflowConfig, WorkflowContext, Jobs, - ParentJobs + ParentJobs, + LastScheduledTask } public enum TaskCommand { @@ -112,7 +113,7 @@ public class WorkflowAccessor extends AbstractHelixResource { ObjectNode parentJobs = OBJECT_MAPPER.valueToTree(jobDag.getChildrenToParents()); root.put(WorkflowProperties.Jobs.name(), jobs); root.put(WorkflowProperties.ParentJobs.name(), parentJobs); - + root.put(WorkflowProperties.LastScheduledTask.name(), OBJECT_MAPPER.valueToTree(taskDriver.getLastScheduledTaskExecutionInfo(workflowId))); return JSONRepresentation(root); } http://git-wip-us.apache.org/repos/asf/helix/blob/566d4f16/helix-rest/src/test/java/org/apache/helix/rest/server/TestWorkflowAccessor.java ---------------------------------------------------------------------- diff --git a/helix-rest/src/test/java/org/apache/helix/rest/server/TestWorkflowAccessor.java b/helix-rest/src/test/java/org/apache/helix/rest/server/TestWorkflowAccessor.java index ad8894a..3e3b8ae 100644 --- a/helix-rest/src/test/java/org/apache/helix/rest/server/TestWorkflowAccessor.java +++ b/helix-rest/src/test/java/org/apache/helix/rest/server/TestWorkflowAccessor.java @@ -6,12 +6,12 @@ import java.util.Set; import javax.ws.rs.client.Entity; import javax.ws.rs.core.MediaType; import javax.ws.rs.core.Response; - import org.apache.helix.TestHelper; import org.apache.helix.rest.server.resources.helix.WorkflowAccessor; import
org.apache.helix.task.JobQueue; import org.apache.helix.task.TargetState; import org.apache.helix.task.TaskDriver; +import org.apache.helix.task.TaskExecutionInfo; import org.apache.helix.task.TaskState; import org.apache.helix.task.WorkflowConfig; import org.codehaus.jackson.JsonNode; @@ -51,12 +51,17 @@ public class TestWorkflowAccessor extends AbstractTestClass { @Test(dependsOnMethods = "testGetWorkflows") public void testGetWorkflow() throws IOException { System.out.println("Start test :" + TestHelper.getTestMethodName()); - String body = get("clusters/" + CLUSTER_NAME + "/workflows/" + WORKFLOW_NAME, Response.Status.OK.getStatusCode(), true); JsonNode node = OBJECT_MAPPER.readTree(body); Assert.assertNotNull(node.get(WorkflowAccessor.WorkflowProperties.WorkflowConfig.name())); Assert.assertNotNull(node.get(WorkflowAccessor.WorkflowProperties.WorkflowContext.name())); + + TaskExecutionInfo lastScheduledTask = OBJECT_MAPPER + .treeToValue(node.get(WorkflowAccessor.WorkflowProperties.LastScheduledTask.name()), + TaskExecutionInfo.class); + Assert.assertEquals(lastScheduledTask, + new TaskExecutionInfo(null, null, null, TaskExecutionInfo.TIMESTAMP_NOT_SET)); String workflowId = node.get(WorkflowAccessor.WorkflowProperties.WorkflowConfig.name()).get("WorkflowID") .getTextValue();