cyy0714 commented on a change in pull request #4505: [TE] Update
AnomalyFlattenResource
URL: https://github.com/apache/incubator-pinot/pull/4505#discussion_r312226588
##########
File path:
thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/dashboard/resources/AnomalyFlattenResource.java
##########
@@ -23,87 +23,205 @@
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiOperation;
import io.swagger.annotations.ApiParam;
+import java.time.Duration;
import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import javax.validation.constraints.NotNull;
import javax.ws.rs.GET;
import javax.ws.rs.Path;
+import javax.ws.rs.Produces;
import javax.ws.rs.QueryParam;
+import org.apache.commons.lang3.StringUtils;
import org.apache.pinot.thirdeye.anomalydetection.context.AnomalyFeedback;
import org.apache.pinot.thirdeye.api.Constants;
import org.apache.pinot.thirdeye.common.dimension.DimensionMap;
+import org.apache.pinot.thirdeye.dataframe.DataFrame;
+import org.apache.pinot.thirdeye.dataframe.util.MetricSlice;
+import org.apache.pinot.thirdeye.datalayer.bao.DatasetConfigManager;
import org.apache.pinot.thirdeye.datalayer.bao.MergedAnomalyResultManager;
+import org.apache.pinot.thirdeye.datalayer.bao.MetricConfigManager;
import org.apache.pinot.thirdeye.datalayer.dto.MergedAnomalyResultDTO;
+import org.apache.pinot.thirdeye.datalayer.dto.MetricConfigDTO;
+import org.apache.pinot.thirdeye.datasource.ThirdEyeCacheRegistry;
+import org.apache.pinot.thirdeye.datasource.loader.AggregationLoader;
+import org.apache.pinot.thirdeye.datasource.loader.DefaultAggregationLoader;
/**
- * Flatten the anomaly results for UI purpose.
- * Convert a list of anomalies to rows of columns so that UI can directly
convert the anomalies to table
+ * Provide a table combining metrics data and anomaly feedback for UI
representation
*/
@Path("thirdeye/table")
@Api(tags = {Constants.ANOMALY_TAG})
public class AnomalyFlattenResource {
- private MergedAnomalyResultManager mergedAnomalyResultDAO;
- public static final String ANOMALY_ID = "anomalyId";
+ private final MergedAnomalyResultManager mergedAnomalyResultDAO;
+ private final MetricConfigManager metricConfigDAO;
+ private final DatasetConfigManager datasetConfgDAO;
+
+ private final ExecutorService executor;
+
public static final String ANOMALY_COMMENT = "comment";
+ private static final String DATAFRAME_VALUE = "value";
+ private static final String DEFAULT_COMMENT_FORMAT = "#%d is %s";
+ private static final int DEFAULT_THREAD_POOLS_SIZE = 5;
+ private static final long DEFAULT_TIME_OUT_IN_MINUTES = 1;
- public AnomalyFlattenResource(MergedAnomalyResultManager
mergedAnomalyResultDAO) {
+ public AnomalyFlattenResource(MergedAnomalyResultManager
mergedAnomalyResultDAO, DatasetConfigManager datasetConfgDAO,
+ MetricConfigManager metricConfigDAO) {
this.mergedAnomalyResultDAO = mergedAnomalyResultDAO;
+ this.datasetConfgDAO = datasetConfgDAO;
+ this.metricConfigDAO = metricConfigDAO;
+ this.executor = Executors.newFixedThreadPool(DEFAULT_THREAD_POOLS_SIZE);
}
+
/**
- * Returns a list of formatted merged anomalies for UI to render a table
- * @param detectionConfigId detectionConfigId
+ * Returns a list of formatted metric values and anomaly comments for UI to
generate a table
+ * @param metricIdStr a string of metric ids separated by comma
* @param start start time in epoc milliseconds
* @param end end time in epoc milliseconds
- * @param dimensionKeys a list of keys in dimensions; if null, will return
all dimension keys for the anomaly
- * @return a list of formatted anomalies
+ * @param dimensionStrings a list of keys in dimensions joined by comma
+ * @return a list of formatted metric info and anomaly comments
*/
@GET
- @ApiOperation(value = "Returns a list of formatted merged anomalies ")
- public List<Map<String, Object>> flatAnomalyResults(
- @ApiParam("detection config id") @QueryParam("detectionConfigId") long
detectionConfigId,
+ @ApiOperation(value = "View a collection of metrics and anonalies feedback
in a list of maps")
+ @Produces("Application/json")
+ public List<Map<String, Object>> listDimensionValues(
+ @ApiParam("metric config id") @NotNull @QueryParam("metricIds") String
metricIdStr,
@ApiParam("start time for anomalies") @QueryParam("start") long start,
@ApiParam("end time for anomalies") @QueryParam("end") long end,
- @ApiParam("dimension keys") @QueryParam("dimensionKeys") List<String>
dimensionKeys) {
- // Retrieve anomalies
- List<MergedAnomalyResultDTO> anomalies = mergedAnomalyResultDAO.
- findByStartTimeInRangeAndDetectionConfigId(start, end,
detectionConfigId);
+ @ApiParam("dimension keys") @NotNull @QueryParam("dimensionKeys") String
dimensionStrings) throws Exception {
+ Preconditions.checkArgument(StringUtils.isNotBlank(metricIdStr));
+ List<String> dimensionKeys = Arrays.asList(dimensionStrings.split(","));
+ List<MergedAnomalyResultDTO> anomalies = new ArrayList<>();
+ List<MetricConfigDTO> metrics = new ArrayList<>();
+ for (String metricId : metricIdStr.split(",")) {
+ long id = Long.valueOf(metricId);
+ metrics.add(metricConfigDAO.findById(id));
+
anomalies.addAll(mergedAnomalyResultDAO.findAnomaliesByMetricIdAndTimeRange(id,
start, end));
+ }
- // flatten anomaly result information
- List<Map<String, Object>> resultList = new ArrayList<>();
- for (MergedAnomalyResultDTO result : anomalies) {
- resultList.add(flatAnomalyResult(result, dimensionKeys));
+ Map<String, DataFrame> metricDataFrame = new HashMap<>();
+
+ AggregationLoader aggregationLoader =
+ new DefaultAggregationLoader(metricConfigDAO, datasetConfgDAO,
+ ThirdEyeCacheRegistry.getInstance().getQueryCache(),
+ ThirdEyeCacheRegistry.getInstance().getDatasetMaxDataTimeCache());
+
+ Map<String, Future<DataFrame>> futureMap = new HashMap<String,
Future<DataFrame>>() {{
+ for (MetricConfigDTO metricDTO : metrics) {
+ Future<DataFrame> future = fetchAggregatedMetric(aggregationLoader,
metricDTO.getId(), start, end,
+ dimensionKeys);
+ put(metricDTO.getName(), future);
+ }
+ }};
+ for (String metric : futureMap.keySet()) {
+ Future<DataFrame> future = futureMap.get(metric);
+ metricDataFrame.put(metric, future.get(DEFAULT_TIME_OUT_IN_MINUTES,
TimeUnit.MINUTES));
}
- return resultList;
+
+ return reformatDataFrameAndAnomalies(metricDataFrame, anomalies,
dimensionKeys);
+ }
+
+ private Future<DataFrame> fetchAggregatedMetric(AggregationLoader
aggregationLoader, long metricId, long start,
+ long end, List<String> dimensionKeys) {
+ return executor.submit(() -> {
+ MetricSlice metricSlice = MetricSlice.from(metricId, start, end);
+ DataFrame resultDataFrame = null;
+ try {
+ resultDataFrame = aggregationLoader.loadAggregate(metricSlice,
dimensionKeys, -1);
+ } catch (Exception e) {
+ throw new ExecutionException(e);
+ }
+ return resultDataFrame;
+ });
}
/**
- * Flat an anomaly to a flat map structure
- * @param anomalyResult an instance of MergedAnomalyResultDTO
- * @param tableKeys a list of keys in dimensions; if null, use all keys in
dimension
- * @return a map of information in the anomaly result with the required keys
+ * Reformat the rows in data frame and anomaly information into a list of map
+ * @param metricDataFrame a map from metric name to dataframe with
aggregated time series
+ * @param anomalies a list of anomalies within the same time interval as
dataframe
+ * @param dimensions a list of dimensions
+ * @return a list of maps with dataframe and anomaly information
*/
- public static Map<String, Object> flatAnomalyResult(MergedAnomalyResultDTO
anomalyResult,
- List<String> tableKeys) {
- Preconditions.checkNotNull(anomalyResult);
- Map<String, Object> flatMap = new HashMap<>();
- flatMap.put(ANOMALY_ID, anomalyResult.getId());
- DimensionMap dimension = anomalyResult.getDimensions();
- if (tableKeys == null) {
- tableKeys = new ArrayList<>(dimension.keySet());
+ public static List<Map<String, Object>>
reformatDataFrameAndAnomalies(Map<String, DataFrame> metricDataFrame,
+ List<MergedAnomalyResultDTO> anomalies, List<String> dimensions) {
+ List<Map<String, Object>> resultList = new ArrayList<>();
+ Map<DimensionMap, Map<Long, String>> anomalyCommentMap =
extractComments(anomalies);
+ List<String> metrics = new ArrayList<>(metricDataFrame.keySet());
+ Map<DimensionMap, Map<String, Double>> metricValues = new HashMap<>();
+ for (String metric : metrics) {
+ DataFrame dataFrame = metricDataFrame.get(metric);
+ for (int i = 0; i < dataFrame.size(); i++) {
+ DimensionMap dimensionMap = new DimensionMap();
+ for (String dimensionKey : dimensions) {
+ String dimensionValue = dataFrame.getString(dimensionKey, i);
+ dimensionMap.put(dimensionKey, dimensionValue);
+ }
+ if (!metricValues.containsKey(dimensionMap)) {
+ metricValues.put(dimensionMap, new HashMap<>());
+ }
+ metricValues.get(dimensionMap).put(metric,
dataFrame.getDouble(DATAFRAME_VALUE, i));
+ }
}
- for (String key : tableKeys) {
- flatMap.put(key, dimension.getOrDefault(key, ""));
+
+ for (DimensionMap dimensionMap : metricValues.keySet()) {
+ Map<String, Object> resultMap = new HashMap<>();
+ for (String key : dimensionMap.keySet()) {
+ resultMap.put(key, dimensionMap.get(key));
+ }
+ for (String metric : metrics) {
+ if (metricValues.containsKey(dimensionMap)) {
Review comment:
If dimensionMap is not in metricValues, metricValues.get(dimensionMap) will
return null, and a NullPointerException will be thrown during processing.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]