This is an automated email from the ASF dual-hosted git repository.
jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 767a149 Add endpoint to check minion task status for a single task.
(#7353)
767a149 is described below
commit 767a149773a27fb38fadfd1448fd6162dff510ac
Author: Ramakrishna Baratam <[email protected]>
AuthorDate: Tue Aug 24 10:00:51 2021 -0700
Add endpoint to check minion task status for a single task. (#7353)
Add debug endpoint to get minion task information for a single task.
Add endpoint to get count of sub-tasks for each of the tasks for the given
task type.
Added DateTimeUtils to pinot common.
---
.../common/restlet/resources/SegmentErrorInfo.java | 24 +---
.../apache/pinot/common/utils/DateTimeUtils.java | 47 +++++++
.../api/resources/PinotTaskRestletResource.java | 22 ++-
.../core/minion/PinotHelixTaskResourceManager.java | 148 +++++++++++++--------
4 files changed, 161 insertions(+), 80 deletions(-)
diff --git
a/pinot-common/src/main/java/org/apache/pinot/common/restlet/resources/SegmentErrorInfo.java
b/pinot-common/src/main/java/org/apache/pinot/common/restlet/resources/SegmentErrorInfo.java
index 29e7257..5819e5c 100644
---
a/pinot-common/src/main/java/org/apache/pinot/common/restlet/resources/SegmentErrorInfo.java
+++
b/pinot-common/src/main/java/org/apache/pinot/common/restlet/resources/SegmentErrorInfo.java
@@ -22,11 +22,8 @@ import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonPropertyOrder;
-import java.text.SimpleDateFormat;
-import java.util.Date;
-import java.util.Locale;
-import java.util.TimeZone;
import org.apache.commons.lang3.exception.ExceptionUtils;
+import org.apache.pinot.common.utils.DateTimeUtils;
/**
@@ -37,17 +34,10 @@ import org.apache.commons.lang3.exception.ExceptionUtils;
@JsonPropertyOrder({"timestamp", "errorMessage", "stackTrace"}) // For
readability of JSON output
public class SegmentErrorInfo {
- private static final String DATE_FORMAT = "yyyy-MM-dd HH:mm:ss z";
- private static final SimpleDateFormat SIMPLE_DATE_FORMAT = new
SimpleDateFormat(DATE_FORMAT, Locale.getDefault());
-
private final String _timestamp;
private final String _errorMessage;
private final String _stackTrace;
- static {
- SIMPLE_DATE_FORMAT.setTimeZone(TimeZone.getDefault());
- }
-
/**
* This constructor is specifically for JSON ser/de.
*
@@ -71,7 +61,7 @@ public class SegmentErrorInfo {
* @param exception Exception
*/
public SegmentErrorInfo(long timestampMs, String errorMessage, Exception
exception) {
- _timestamp = epochToSDF(timestampMs);
+ _timestamp = DateTimeUtils.epochToDefaultDateFormat(timestampMs);
_errorMessage = errorMessage;
_stackTrace = (exception != null) ?
ExceptionUtils.getStackTrace(exception) : null;
}
@@ -87,14 +77,4 @@ public class SegmentErrorInfo {
public String getTimestamp() {
return _timestamp;
}
-
- /**
- * Utility function to convert epoch in millis to SDF of form "yyyy-MM-dd
HH:mm:ss z".
- *
- * @param millisSinceEpoch Time in millis to convert
- * @return SDF equivalent
- */
- private static String epochToSDF(long millisSinceEpoch) {
- return SIMPLE_DATE_FORMAT.format(new Date(millisSinceEpoch));
- }
}
diff --git
a/pinot-common/src/main/java/org/apache/pinot/common/utils/DateTimeUtils.java
b/pinot-common/src/main/java/org/apache/pinot/common/utils/DateTimeUtils.java
new file mode 100644
index 0000000..bcce064
--- /dev/null
+++
b/pinot-common/src/main/java/org/apache/pinot/common/utils/DateTimeUtils.java
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.common.utils;
+
+import java.text.SimpleDateFormat;
+import java.util.Date;
+import java.util.Locale;
+import java.util.TimeZone;
+
+
+public class DateTimeUtils {
+ private DateTimeUtils() {
+ }
+
+ private static final String DATE_FORMAT = "yyyy-MM-dd HH:mm:ss z";
+ private static final SimpleDateFormat SIMPLE_DATE_FORMAT = new
SimpleDateFormat(DATE_FORMAT, Locale.getDefault());
+
+ static {
+ SIMPLE_DATE_FORMAT.setTimeZone(TimeZone.getDefault());
+ }
+
+ /**
+ * Utility function to convert epoch in millis to SDF of form "yyyy-MM-dd
HH:mm:ss z".
+ *
+ * @param millisSinceEpoch Time in millis to convert
+ * @return SDF equivalent
+ */
+ public static String epochToDefaultDateFormat(long millisSinceEpoch) {
+ return SIMPLE_DATE_FORMAT.format(new Date(millisSinceEpoch));
+ }
+}
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTaskRestletResource.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTaskRestletResource.java
index 69f075b..a36a41f 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTaskRestletResource.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTaskRestletResource.java
@@ -135,13 +135,31 @@ public class PinotTaskRestletResource {
}
@GET
+ @Path("/tasks/{taskType}/taskcounts")
+ @ApiOperation("Fetch count of sub-tasks for each of the tasks for the given
task type")
+ public Map<String, PinotHelixTaskResourceManager.TaskCount> getTaskCounts(
+ @ApiParam(value = "Task type", required = true) @PathParam("taskType")
String taskType) {
+ return _pinotHelixTaskResourceManager.getTaskCounts(taskType);
+ }
+
+ @GET
@Path("/tasks/{taskType}/debug")
@ApiOperation("Fetch information for all the tasks for the given task type")
- public Map<String, PinotHelixTaskResourceManager.TaskDebugInfo>
getTaskDebugInfo(
+ public Map<String, PinotHelixTaskResourceManager.TaskDebugInfo>
getTasksDebugInfo(
@ApiParam(value = "Task type", required = true) @PathParam("taskType")
String taskType,
@ApiParam(value = "verbosity (By default, prints for running and error
tasks. Value of >0 prints for all tasks)")
@DefaultValue("0") @QueryParam("verbosity") int verbosity) {
- return _pinotHelixTaskResourceManager.getTaskDebugInfo(taskType,
verbosity);
+ return _pinotHelixTaskResourceManager.getTasksDebugInfo(taskType,
verbosity);
+ }
+
+ @GET
+ @Path("/tasks/task/{taskName}/debug")
+ @ApiOperation("Fetch information for the given task name")
+ public PinotHelixTaskResourceManager.TaskDebugInfo getTaskDebugInfo(
+ @ApiParam(value = "Task name", required = true) @PathParam("taskName")
String taskName,
+ @ApiParam(value = "verbosity (By default, prints for running and error
tasks. Value of >0 prints for all tasks)")
+ @DefaultValue("0") @QueryParam("verbosity") int verbosity) {
+ return _pinotHelixTaskResourceManager.getTaskDebugInfo(taskName,
verbosity);
}
@Deprecated
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotHelixTaskResourceManager.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotHelixTaskResourceManager.java
index 794fedc..e825dbb 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotHelixTaskResourceManager.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotHelixTaskResourceManager.java
@@ -22,16 +22,13 @@ import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.annotation.JsonPropertyOrder;
import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.Uninterruptibles;
-import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
-import java.util.Locale;
import java.util.Map;
import java.util.Set;
-import java.util.TimeZone;
import java.util.TreeMap;
import java.util.concurrent.TimeUnit;
import org.apache.helix.task.JobConfig;
@@ -43,6 +40,7 @@ import org.apache.helix.task.TaskPartitionState;
import org.apache.helix.task.TaskState;
import org.apache.helix.task.WorkflowConfig;
import org.apache.helix.task.WorkflowContext;
+import org.apache.pinot.common.utils.DateTimeUtils;
import org.apache.pinot.core.minion.PinotTaskConfig;
import org.apache.pinot.spi.utils.CommonConstants.Helix;
import org.slf4j.Logger;
@@ -63,8 +61,6 @@ public class PinotHelixTaskResourceManager {
private static final String TASK_QUEUE_PREFIX = "TaskQueue" +
TASK_NAME_SEPARATOR;
private static final String TASK_PREFIX = "Task" + TASK_NAME_SEPARATOR;
- private static final SimpleDateFormat SIMPLE_DATE_FORMAT =
- new SimpleDateFormat("yyyy-MM-dd HH:mm:ss z", Locale.getDefault());
private final TaskDriver _taskDriver;
@@ -382,6 +378,26 @@ public class PinotHelixTaskResourceManager {
}
/**
+ * Fetch count of sub-tasks for each of the tasks for the given taskType.
+ *
+ * @param taskType Pinot taskType / Helix JobQueue
+ * @return Map of Pinot Task Name to TaskCount
+ */
+ public synchronized Map<String, TaskCount> getTaskCounts(String taskType) {
+ Map<String, TaskCount> taskCounts = new TreeMap<>();
+ WorkflowContext workflowContext =
_taskDriver.getWorkflowContext(getHelixJobQueueName(taskType));
+ if (workflowContext == null) {
+ return taskCounts;
+ }
+ Map<String, TaskState> helixJobStates = workflowContext.getJobStates();
+ for (String helixJobName : helixJobStates.keySet()) {
+ String pinotTaskName = getPinotTaskName(helixJobName);
+ taskCounts.put(pinotTaskName, getTaskCount(pinotTaskName));
+ }
+ return taskCounts;
+ }
+
+ /**
* Given a taskType, helper method to debug all the HelixJobs for the
taskType.
* For each of the HelixJobs, collects status of the (sub)tasks in the
taskbatch.
*
@@ -390,67 +406,87 @@ public class PinotHelixTaskResourceManager {
* If verbosity > 0, shows details for all tasks.
* @return Map of Pinot Task Name to TaskDebugInfo. TaskDebugInfo contains
details for subtasks.
*/
- public synchronized Map<String, TaskDebugInfo> getTaskDebugInfo(String
taskType, int verbosity) {
- Map<String, TaskDebugInfo> taskDebugInfos = new TreeMap<String,
TaskDebugInfo>();
+ public synchronized Map<String, TaskDebugInfo> getTasksDebugInfo(String
taskType, int verbosity) {
+ Map<String, TaskDebugInfo> taskDebugInfos = new TreeMap<>();
WorkflowContext workflowContext =
_taskDriver.getWorkflowContext(getHelixJobQueueName(taskType));
if (workflowContext == null) {
return taskDebugInfos;
}
- boolean showCompleted = verbosity > 0;
- SIMPLE_DATE_FORMAT.setTimeZone(TimeZone.getDefault());
Map<String, TaskState> helixJobStates = workflowContext.getJobStates();
- for (Map.Entry<String, TaskState> entry : helixJobStates.entrySet()) {
- String helixJobName = entry.getKey();
- String pinotTaskName = getPinotTaskName(helixJobName);
- TaskDebugInfo taskDebugInfo = new TaskDebugInfo();
- taskDebugInfo.setTaskState(entry.getValue());
- long jobStartTimeMs = workflowContext.getJobStartTime(helixJobName);
- if (jobStartTimeMs > 0) {
- taskDebugInfo.setStartTime(SIMPLE_DATE_FORMAT.format(jobStartTimeMs));
+ for (String helixJobName : helixJobStates.keySet()) {
+ taskDebugInfos.put(getPinotTaskName(helixJobName),
getTaskDebugInfo(workflowContext, helixJobName, verbosity));
+ }
+ return taskDebugInfos;
+ }
+
+ /**
+ * Given a taskName, collects status of the (sub)tasks in the taskName.
+ *
+ * @param taskName Pinot taskName
+ * @param verbosity By default, does not show details for completed
tasks.
+ * If verbosity > 0, shows details for all tasks.
+ * @return TaskDebugInfo contains details for subtasks in this task batch.
+ */
+ public synchronized TaskDebugInfo getTaskDebugInfo(String taskName, int
verbosity) {
+ String taskType = getTaskType(taskName);
+ WorkflowContext workflowContext =
_taskDriver.getWorkflowContext(getHelixJobQueueName(taskType));
+ if (workflowContext == null) {
+ return null;
+ }
+ String helixJobName = getHelixJobName(taskName);
+ return getTaskDebugInfo(workflowContext, helixJobName, verbosity);
+ }
+
+ private synchronized TaskDebugInfo getTaskDebugInfo(WorkflowContext
workflowContext, String helixJobName,
+ int verbosity) {
+ boolean showCompleted = verbosity > 0;
+ TaskDebugInfo taskDebugInfo = new TaskDebugInfo();
+ taskDebugInfo.setTaskState(workflowContext.getJobState(helixJobName));
+ long jobStartTimeMs = workflowContext.getJobStartTime(helixJobName);
+ if (jobStartTimeMs > 0) {
+
taskDebugInfo.setStartTime(DateTimeUtils.epochToDefaultDateFormat(jobStartTimeMs));
+ }
+ JobContext jobContext = _taskDriver.getJobContext(helixJobName);
+ if (jobContext != null) {
+ JobConfig jobConfig = _taskDriver.getJobConfig(helixJobName);
+ long jobExecutionStartTimeMs = jobContext.getExecutionStartTime();
+ if (jobExecutionStartTimeMs > 0) {
+
taskDebugInfo.setExecutionStartTime(DateTimeUtils.epochToDefaultDateFormat(jobExecutionStartTimeMs));
}
- JobContext jobContext = _taskDriver.getJobContext(helixJobName);
- if (jobContext != null) {
- JobConfig jobConfig = _taskDriver.getJobConfig(helixJobName);
- long jobExecutionStartTimeMs = jobContext.getExecutionStartTime();
- if (jobExecutionStartTimeMs > 0) {
-
taskDebugInfo.setExecutionStartTime(SIMPLE_DATE_FORMAT.format(jobExecutionStartTimeMs));
+ Set<Integer> partitionSet = jobContext.getPartitionSet();
+ TaskCount subtaskCount = new TaskCount();
+ for (int partition : partitionSet) {
+ // First get the partition's state and update the subtaskCount
+ TaskPartitionState partitionState =
jobContext.getPartitionState(partition);
+ subtaskCount.addTaskState(partitionState);
+ // Skip details for COMPLETED tasks
+ if (!showCompleted && partitionState == TaskPartitionState.COMPLETED) {
+ continue;
}
- Set<Integer> partitionSet = jobContext.getPartitionSet();
- TaskCount subtaskCount = new TaskCount();
- for (int partition : partitionSet) {
- // First get the partition's state and update the subtaskCount
- TaskPartitionState partitionState =
jobContext.getPartitionState(partition);
- subtaskCount.addTaskState(partitionState);
- // Skip details for COMPLETED tasks
- if (!showCompleted && partitionState ==
TaskPartitionState.COMPLETED) {
- continue;
- }
- SubtaskDebugInfo subtaskDebugInfo = new SubtaskDebugInfo();
- String taskIdForPartition =
jobContext.getTaskIdForPartition(partition);
- subtaskDebugInfo.setTaskId(taskIdForPartition);
- subtaskDebugInfo.setState(partitionState);
- long subtaskStartTimeMs =
jobContext.getPartitionStartTime(partition);
- if (subtaskStartTimeMs > 0) {
-
subtaskDebugInfo.setStartTime(SIMPLE_DATE_FORMAT.format(subtaskStartTimeMs));
- }
- long subtaskFinishTimeMs =
jobContext.getPartitionFinishTime(partition);
- if (subtaskFinishTimeMs > 0) {
-
subtaskDebugInfo.setFinishTime(SIMPLE_DATE_FORMAT.format(subtaskFinishTimeMs));
- }
-
subtaskDebugInfo.setParticipant(jobContext.getAssignedParticipant(partition));
- subtaskDebugInfo.setInfo(jobContext.getPartitionInfo(partition));
- TaskConfig helixTaskConfig =
jobConfig.getTaskConfig(taskIdForPartition);
- if (helixTaskConfig != null) {
- PinotTaskConfig pinotTaskConfig =
PinotTaskConfig.fromHelixTaskConfig(helixTaskConfig);
- subtaskDebugInfo.setTaskConfig(pinotTaskConfig);
- }
- taskDebugInfo.addSubtaskInfo(subtaskDebugInfo);
+ SubtaskDebugInfo subtaskDebugInfo = new SubtaskDebugInfo();
+ String taskIdForPartition =
jobContext.getTaskIdForPartition(partition);
+ subtaskDebugInfo.setTaskId(taskIdForPartition);
+ subtaskDebugInfo.setState(partitionState);
+ long subtaskStartTimeMs = jobContext.getPartitionStartTime(partition);
+ if (subtaskStartTimeMs > 0) {
+
subtaskDebugInfo.setStartTime(DateTimeUtils.epochToDefaultDateFormat(subtaskStartTimeMs));
+ }
+ long subtaskFinishTimeMs =
jobContext.getPartitionFinishTime(partition);
+ if (subtaskFinishTimeMs > 0) {
+
subtaskDebugInfo.setFinishTime(DateTimeUtils.epochToDefaultDateFormat(subtaskFinishTimeMs));
}
- taskDebugInfo.setSubtaskCount(subtaskCount);
+
subtaskDebugInfo.setParticipant(jobContext.getAssignedParticipant(partition));
+ subtaskDebugInfo.setInfo(jobContext.getPartitionInfo(partition));
+ TaskConfig helixTaskConfig =
jobConfig.getTaskConfig(taskIdForPartition);
+ if (helixTaskConfig != null) {
+ PinotTaskConfig pinotTaskConfig =
PinotTaskConfig.fromHelixTaskConfig(helixTaskConfig);
+ subtaskDebugInfo.setTaskConfig(pinotTaskConfig);
+ }
+ taskDebugInfo.addSubtaskInfo(subtaskDebugInfo);
}
- taskDebugInfos.put(pinotTaskName, taskDebugInfo);
+ taskDebugInfo.setSubtaskCount(subtaskCount);
}
- return taskDebugInfos;
+ return taskDebugInfo;
}
/**
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]