This is an automated email from the ASF dual-hosted git repository.
jshao pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/gravitino.git
The following commit(s) were added to refs/heads/main by this push:
new 252475bc5e [#9983] feat(optimizer): support monitor code skeleton (#10001)
252475bc5e is described below
commit 252475bc5e7a5d96c4ce5e31454ae099b3bd216f
Author: FANNG <[email protected]>
AuthorDate: Wed Feb 25 12:06:22 2026 +0900
[#9983] feat(optimizer): support monitor code skeleton (#10001)
## What changes were proposed in this pull request?
- Add monitor skeleton APIs under `api/monitor`, including evaluator/provider/callback contracts and the evaluation result model.
- Add a `Monitor` orchestration skeleton to evaluate table and related job metrics around an action timestamp.
- Add monitor-related config keys and SPI loaders in `OptimizerConfig`, `ProviderUtils`, and `InstanceLoaderUtils`.
- Add unit tests and test SPI registrations for monitor provider/evaluator/callback loading and the monitor execution flow.
- Keep the scope focused on the skeleton only: no `OptimizerCmd` monitor CLI/service changes and no monitor service implementation classes.
## Why are the changes needed?
Fixes: #9983
- This PR introduces the monitor abstraction layer and a minimal execution skeleton so follow-up PRs can add concrete monitor implementations incrementally.
## Does this PR introduce any user-facing change?
- No.
## How was this patch tested?
- `./gradlew :maintenance:optimizer:spotlessApply`
- `./gradlew :maintenance:optimizer:test -PskipITs`
Co-authored-by: Qi Yu <[email protected]>
---
.../optimizer/api/monitor/EvaluationResult.java | 136 +++++++++
.../optimizer/api/monitor/JobProvider.java | 37 +++
.../optimizer/api/monitor/MetricScope.java | 102 +++++++
.../optimizer/api/monitor/MetricsEvaluator.java | 47 +++
.../optimizer/api/monitor/MetricsProvider.java | 66 +++++
.../optimizer/api/monitor/MonitorCallback.java | 35 +++
.../optimizer/common/conf/OptimizerConfig.java | 52 +++-
.../optimizer/common/util/InstanceLoaderUtils.java | 37 ++-
.../optimizer/common/util/ProviderUtils.java | 15 +
.../maintenance/optimizer/monitor/Monitor.java | 317 +++++++++++++++++++++
.../common/util/TestInstanceLoaderUtils.java | 76 +++++
.../optimizer/common/util/TestProviderUtils.java | 16 ++
.../maintenance/optimizer/monitor/TestMonitor.java | 192 +++++++++++++
.../monitor/callback/MonitorCallbackForTest.java | 55 ++++
.../monitor/evaluator/MetricsEvaluatorForTest.java | 63 ++++
.../optimizer/monitor/job/JobProviderForTest.java | 48 ++++
.../monitor/metrics/MetricsProviderForTest.java | 123 ++++++++
...itino.maintenance.optimizer.api.common.Provider | 3 +
...tenance.optimizer.api.monitor.MetricsEvaluator} | 3 +-
19 files changed, 1405 insertions(+), 18 deletions(-)
diff --git
a/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/api/monitor/EvaluationResult.java
b/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/api/monitor/EvaluationResult.java
new file mode 100644
index 0000000000..2f3f2acdd4
--- /dev/null
+++
b/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/api/monitor/EvaluationResult.java
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.gravitino.maintenance.optimizer.api.monitor;
+
+import com.google.common.base.Preconditions;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.gravitino.annotation.DeveloperApi;
+import org.apache.gravitino.maintenance.optimizer.api.common.MetricSample;
+
+/**
+ * Immutable evaluation result passed to monitor callbacks.
+ *
+ * <p>The metric maps handed to the constructor are copied into unmodifiable maps of unmodifiable
+ * lists, so later changes to the caller's collections are not visible through this object.
+ */
+@DeveloperApi
+public class EvaluationResult {
+
+  private final MetricScope scope;
+  private final boolean evaluation;
+  private final Map<String, List<MetricSample>> beforeMetrics;
+  private final Map<String, List<MetricSample>> afterMetrics;
+  private final long actionTimeSeconds;
+  private final long rangeSeconds;
+  private final String evaluatorName;
+
+  /**
+   * Create an immutable evaluation result snapshot.
+   *
+   * @param scope scope of the evaluated metrics
+   * @param evaluation evaluation outcome from the evaluator
+   * @param beforeMetrics metrics collected before the action timestamp; {@code null} is treated as
+   *     an empty map
+   * @param afterMetrics metrics collected at/after the action timestamp; {@code null} is treated
+   *     as an empty map
+   * @param actionTimeSeconds action timestamp in epoch seconds
+   * @param rangeSeconds evaluation half-window in seconds
+   * @param evaluatorName evaluator implementation name
+   * @throws IllegalArgumentException if {@code scope} or {@code evaluatorName} is null
+   */
+  public EvaluationResult(
+      MetricScope scope,
+      boolean evaluation,
+      Map<String, List<MetricSample>> beforeMetrics,
+      Map<String, List<MetricSample>> afterMetrics,
+      long actionTimeSeconds,
+      long rangeSeconds,
+      String evaluatorName) {
+    Preconditions.checkArgument(scope != null, "scope must not be null");
+    Preconditions.checkArgument(evaluatorName != null, "evaluatorName must not be null");
+    this.scope = scope;
+    this.evaluation = evaluation;
+    this.beforeMetrics = immutableMetrics(beforeMetrics);
+    this.afterMetrics = immutableMetrics(afterMetrics);
+    this.actionTimeSeconds = actionTimeSeconds;
+    this.rangeSeconds = rangeSeconds;
+    this.evaluatorName = evaluatorName;
+  }
+
+  /**
+   * @return evaluated scope (table/partition/job).
+   */
+  public MetricScope scope() {
+    return scope;
+  }
+
+  /**
+   * @return true if evaluator considers this scope successful.
+   */
+  public boolean evaluation() {
+    return evaluation;
+  }
+
+  /**
+   * @return immutable metrics map for samples before the action timestamp.
+   */
+  public Map<String, List<MetricSample>> beforeMetrics() {
+    return beforeMetrics;
+  }
+
+  /**
+   * @return immutable metrics map for samples at/after the action timestamp.
+   */
+  public Map<String, List<MetricSample>> afterMetrics() {
+    return afterMetrics;
+  }
+
+  /**
+   * @return action timestamp in epoch seconds.
+   */
+  public long actionTimeSeconds() {
+    return actionTimeSeconds;
+  }
+
+  /**
+   * @return evaluation half-window in seconds.
+   */
+  public long rangeSeconds() {
+    return rangeSeconds;
+  }
+
+  /**
+   * @return evaluator implementation name.
+   */
+  public String evaluatorName() {
+    return evaluatorName;
+  }
+
+  /**
+   * Copy a metrics map into an unmodifiable map whose values are unmodifiable lists.
+   *
+   * @param metrics source map; may be {@code null} or empty
+   * @return an empty map for {@code null}/empty input, otherwise an unmodifiable copy in which a
+   *     {@code null} sample list is replaced by an empty list
+   */
+  private static Map<String, List<MetricSample>> immutableMetrics(
+      Map<String, List<MetricSample>> metrics) {
+    if (metrics == null || metrics.isEmpty()) {
+      return Collections.emptyMap();
+    }
+
+    Map<String, List<MetricSample>> copied = new HashMap<>();
+    for (Map.Entry<String, List<MetricSample>> entry : metrics.entrySet()) {
+      // Copy each list so that later mutation of the caller's list cannot leak into the result.
+      List<MetricSample> samples =
+          entry.getValue() == null ? List.of() : new ArrayList<>(entry.getValue());
+      copied.put(entry.getKey(), Collections.unmodifiableList(samples));
+    }
+    return Collections.unmodifiableMap(copied);
+  }
+}
diff --git
a/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/api/monitor/JobProvider.java
b/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/api/monitor/JobProvider.java
new file mode 100644
index 0000000000..8b4e825582
--- /dev/null
+++
b/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/api/monitor/JobProvider.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.gravitino.maintenance.optimizer.api.monitor;
+
+import java.util.List;
+import org.apache.gravitino.NameIdentifier;
+import org.apache.gravitino.annotation.DeveloperApi;
+import org.apache.gravitino.maintenance.optimizer.api.common.Provider;
+
+/** Represents a provider that provides upstream and downstream jobs for a table. */
+@DeveloperApi
+public interface JobProvider extends Provider {
+  /**
+   * List jobs related to the provided table.
+   *
+   * @param tableIdentifier catalog/schema/table identifier
+   * @return identifiers for jobs touching this table
+   */
+  List<NameIdentifier> jobIdentifiers(NameIdentifier tableIdentifier);
+}
diff --git
a/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/api/monitor/MetricScope.java
b/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/api/monitor/MetricScope.java
new file mode 100644
index 0000000000..558daf58cd
--- /dev/null
+++
b/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/api/monitor/MetricScope.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.gravitino.maintenance.optimizer.api.monitor;
+
+import com.google.common.base.Preconditions;
+import java.util.Optional;
+import org.apache.gravitino.NameIdentifier;
+import org.apache.gravitino.annotation.DeveloperApi;
+import org.apache.gravitino.maintenance.optimizer.api.common.PartitionPath;
+
+/** Scope of metrics being evaluated (table, partition, or job). */
+@DeveloperApi
+public final class MetricScope {
+  /** Supported scope kinds for monitor evaluation. */
+  public enum Type {
+    TABLE,
+    PARTITION,
+    JOB
+  }
+
+  private final NameIdentifier identifier;
+  private final Type type;
+  private final Optional<PartitionPath> partition;
+
+  /**
+   * Instances are created only through the static factories below.
+   *
+   * @throws IllegalArgumentException if any argument is null
+   */
+  private MetricScope(NameIdentifier identifier, Type type, Optional<PartitionPath> partition) {
+    Preconditions.checkArgument(identifier != null, "identifier must not be null");
+    Preconditions.checkArgument(type != null, "type must not be null");
+    Preconditions.checkArgument(partition != null, "partition must not be null");
+    this.identifier = identifier;
+    this.type = type;
+    this.partition = partition;
+  }
+
+  /**
+   * Create a table scope.
+   *
+   * @param identifier table identifier
+   * @return table metric scope
+   * @throws IllegalArgumentException if {@code identifier} is null
+   */
+  public static MetricScope forTable(NameIdentifier identifier) {
+    return new MetricScope(identifier, Type.TABLE, Optional.empty());
+  }
+
+  /**
+   * Create a partition scope.
+   *
+   * @param identifier table identifier
+   * @param partition partition path under the table
+   * @return partition metric scope
+   * @throws IllegalArgumentException if {@code identifier} is null
+   * @throws NullPointerException if {@code partition} is null (raised by {@link Optional#of})
+   */
+  public static MetricScope forPartition(NameIdentifier identifier, PartitionPath partition) {
+    return new MetricScope(identifier, Type.PARTITION, Optional.of(partition));
+  }
+
+  /**
+   * Create a job scope.
+   *
+   * @param identifier job identifier
+   * @return job metric scope
+   * @throws IllegalArgumentException if {@code identifier} is null
+   */
+  public static MetricScope forJob(NameIdentifier identifier) {
+    return new MetricScope(identifier, Type.JOB, Optional.empty());
+  }
+
+  /**
+   * @return evaluated identifier (table or job).
+   */
+  public NameIdentifier identifier() {
+    return identifier;
+  }
+
+  /**
+   * @return scope type.
+   */
+  public Type type() {
+    return type;
+  }
+
+  /**
+   * @return partition path when {@link #type()} is {@link Type#PARTITION}; otherwise empty.
+   */
+  public Optional<PartitionPath> partition() {
+    return partition;
+  }
+}
diff --git
a/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/api/monitor/MetricsEvaluator.java
b/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/api/monitor/MetricsEvaluator.java
new file mode 100644
index 0000000000..f383ef6169
--- /dev/null
+++
b/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/api/monitor/MetricsEvaluator.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.gravitino.maintenance.optimizer.api.monitor;
+
+import java.util.List;
+import java.util.Map;
+import org.apache.gravitino.annotation.DeveloperApi;
+import org.apache.gravitino.maintenance.optimizer.api.common.MetricSample;
+
+/**
+ * Evaluator interface for the table and related job metrics before and after optimization actions.
+ */
+@DeveloperApi
+public interface MetricsEvaluator {
+  /** Human-readable evaluator name, primarily for logging and selection. */
+  String name();
+
+  /**
+   * Evaluate metrics before/after optimization to decide success/failure.
+   *
+   * @param scope context of the metrics being evaluated (table/partition/job)
+   * @param beforeMetrics metrics collected before the action timestamp, keyed by metric name
+   * @param afterMetrics metrics collected at/after the action timestamp, keyed by metric name
+   * @return true when metrics meet expectations
+   */
+  boolean evaluateMetrics(
+      MetricScope scope,
+      Map<String, List<MetricSample>> beforeMetrics,
+      Map<String, List<MetricSample>> afterMetrics);
+}
diff --git
a/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/api/monitor/MetricsProvider.java
b/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/api/monitor/MetricsProvider.java
new file mode 100644
index 0000000000..a92cbcd54d
--- /dev/null
+++
b/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/api/monitor/MetricsProvider.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.gravitino.maintenance.optimizer.api.monitor;
+
+import java.util.List;
+import java.util.Map;
+import org.apache.gravitino.NameIdentifier;
+import org.apache.gravitino.annotation.DeveloperApi;
+import org.apache.gravitino.maintenance.optimizer.api.common.MetricSample;
+import org.apache.gravitino.maintenance.optimizer.api.common.PartitionPath;
+import org.apache.gravitino.maintenance.optimizer.api.common.Provider;
+
+/** Represents a provider that provides table and job related metrics. */
+@DeveloperApi
+public interface MetricsProvider extends Provider {
+  /**
+   * Retrieve metrics for a specific job within a time range.
+   *
+   * @param jobIdentifier catalog/schema/job identifier
+   * @param startTime start timestamp (seconds)
+   * @param endTime end timestamp (seconds)
+   * @return map keyed by metric name, each containing a metric sample series
+   */
+  Map<String, List<MetricSample>> jobMetrics(
+      NameIdentifier jobIdentifier, long startTime, long endTime);
+
+  /**
+   * Retrieve metrics for a table within a time range.
+   *
+   * @param tableIdentifier catalog/schema/table identifier
+   * @param startTime start timestamp (seconds)
+   * @param endTime end timestamp (seconds)
+   * @return map keyed by metric name, each containing a metric sample series
+   */
+  Map<String, List<MetricSample>> tableMetrics(
+      NameIdentifier tableIdentifier, long startTime, long endTime);
+
+  /**
+   * Retrieve metrics for a specific partition of a table within a time range.
+   *
+   * @param tableIdentifier catalog/schema/table identifier
+   * @param partitionPath partition path
+   * @param startTime start timestamp (seconds)
+   * @param endTime end timestamp (seconds)
+   * @return map keyed by metric name, each containing a metric sample series
+   */
+  Map<String, List<MetricSample>> partitionMetrics(
+      NameIdentifier tableIdentifier, PartitionPath partitionPath, long startTime, long endTime);
+}
diff --git
a/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/api/monitor/MonitorCallback.java
b/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/api/monitor/MonitorCallback.java
new file mode 100644
index 0000000000..82e11203ce
--- /dev/null
+++
b/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/api/monitor/MonitorCallback.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.gravitino.maintenance.optimizer.api.monitor;
+
+import org.apache.gravitino.annotation.DeveloperApi;
+import org.apache.gravitino.maintenance.optimizer.api.common.Provider;
+
+/** Callback SPI invoked after metrics evaluation completes. */
+@DeveloperApi
+public interface MonitorCallback extends Provider {
+
+  /**
+   * Handle a metrics evaluation result.
+   *
+   * @param result evaluation result snapshot for one evaluated scope
+   */
+  void onEvaluation(EvaluationResult result);
+}
diff --git
a/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/common/conf/OptimizerConfig.java
b/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/common/conf/OptimizerConfig.java
index 8207b3716f..d19eb5a220 100644
---
a/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/common/conf/OptimizerConfig.java
+++
b/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/common/conf/OptimizerConfig.java
@@ -19,6 +19,7 @@
package org.apache.gravitino.maintenance.optimizer.common.conf;
+import java.util.List;
import java.util.Map;
import org.apache.commons.lang3.StringUtils;
import org.apache.gravitino.Config;
@@ -31,10 +32,9 @@ import
org.apache.gravitino.maintenance.optimizer.recommender.strategy.Gravitino
import
org.apache.gravitino.maintenance.optimizer.recommender.table.GravitinoTableMetadataProvider;
/**
- * Central configuration holder for the optimizer/recommender runtime. Keys are grouped under the
- * {@code gravitino.optimizer.*} prefix and capture both core connectivity (URI, metalake, default
- * catalog) and pluggable implementation wiring (statistics provider, strategy provider, table
- * metadata provider, job submitter).
+ * Central configuration holder for the optimizer runtime. Keys are grouped under the {@code
+ * gravitino.optimizer.*} prefix and capture both core connectivity (URI, metalake, default catalog)
+ * and pluggable implementation wiring for recommender, updater, and monitor components.
*/
public class OptimizerConfig extends Config {
@@ -56,6 +56,11 @@ public class OptimizerConfig extends Config {
private static final String UPDATER_PREFIX = OPTIMIZER_PREFIX + "updater.";
private static final String STATISTICS_UPDATER = UPDATER_PREFIX +
"statisticsUpdater";
private static final String METRICS_UPDATER = UPDATER_PREFIX +
"metricsUpdater";
+  private static final String MONITOR_PREFIX = OPTIMIZER_PREFIX + "monitor.";
+  private static final String METRICS_PROVIDER = MONITOR_PREFIX + "metricsProvider";
+  private static final String JOB_PROVIDER = MONITOR_PREFIX + "jobProvider";
+  private static final String METRICS_EVALUATOR = MONITOR_PREFIX + "metricsEvaluator";
+  private static final String MONITOR_CALLBACKS = MONITOR_PREFIX + "callbacks";
public static final ConfigEntry<String> STATISTICS_PROVIDER_CONFIG =
new ConfigBuilder(STATISTICS_PROVIDER)
@@ -115,6 +120,45 @@ public class OptimizerConfig extends Config {
.stringConf()
.create();
+  /** Name of the monitor metrics provider implementation to load. */
+  public static final ConfigEntry<String> METRICS_PROVIDER_CONFIG =
+      new ConfigBuilder(METRICS_PROVIDER)
+          .doc(
+              "Monitor metrics provider implementation name (matches Provider.name()) "
+                  + "discoverable via ServiceLoader. Example: 'metrics-provider'.")
+          .version(ConfigConstants.VERSION_1_2_0)
+          .stringConf()
+          .create();
+
+  /** Name of the monitor job provider implementation to load. */
+  public static final ConfigEntry<String> JOB_PROVIDER_CONFIG =
+      new ConfigBuilder(JOB_PROVIDER)
+          .doc(
+              "Monitor job provider implementation name (matches Provider.name()) discoverable "
+                  + "via ServiceLoader. Example: 'job-provider'.")
+          .version(ConfigConstants.VERSION_1_2_0)
+          .stringConf()
+          .create();
+
+  /** Name of the monitor metrics evaluator implementation to load. */
+  public static final ConfigEntry<String> METRICS_EVALUATOR_CONFIG =
+      new ConfigBuilder(METRICS_EVALUATOR)
+          .doc(
+              "Monitor metrics evaluator implementation name discoverable via ServiceLoader. "
+                  + "The evaluator name must match MetricsEvaluator.name(). Example: "
+                  + "'metrics-evaluator'.")
+          .version(ConfigConstants.VERSION_1_2_0)
+          .stringConf()
+          .create();
+
+  /** Names of the monitor callback implementations to load; defaults to an empty list. */
+  public static final ConfigEntry<List<String>> MONITOR_CALLBACKS_CONFIG =
+      new ConfigBuilder(MONITOR_CALLBACKS)
+          .doc(
+              "Comma-separated monitor callback implementation names (each matches "
+                  + "Provider.name()) discoverable via ServiceLoader. Example: "
+                  + "'monitor-callback-a,monitor-callback-b'.")
+          .version(ConfigConstants.VERSION_1_2_0)
+          .stringConf()
+          .toSequence()
+          .createWithDefault(List.of());
+
public static final ConfigEntry<String> GRAVITINO_URI_CONFIG =
new ConfigBuilder(GRAVITINO_URI)
.doc("The URI of the Gravitino server.")
diff --git
a/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/common/util/InstanceLoaderUtils.java
b/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/common/util/InstanceLoaderUtils.java
index 2ccdd294f2..0836143d6f 100644
---
a/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/common/util/InstanceLoaderUtils.java
+++
b/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/common/util/InstanceLoaderUtils.java
@@ -19,44 +19,57 @@
package org.apache.gravitino.maintenance.optimizer.common.util;
+import com.google.common.base.Preconditions;
import com.google.common.collect.Iterables;
import com.google.common.collect.Streams;
import java.util.List;
import java.util.ServiceLoader;
+import java.util.function.Function;
import java.util.stream.Collectors;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.gravitino.maintenance.optimizer.api.monitor.MetricsEvaluator;
import
org.apache.gravitino.maintenance.optimizer.api.updater.StatisticsCalculator;
public class InstanceLoaderUtils {
public static <T extends StatisticsCalculator> T
createStatisticsCalculatorInstance(
String calculatorName) {
- ServiceLoader<StatisticsCalculator> loader =
ServiceLoader.load(StatisticsCalculator.class);
+ return createInstanceByName(
+ StatisticsCalculator.class, calculatorName,
StatisticsCalculator::name);
+ }
+
+ public static <T extends MetricsEvaluator> T createMetricsEvaluatorInstance(
+ String evaluatorName) {
+ Preconditions.checkArgument(
+ StringUtils.isNotBlank(evaluatorName), "metrics evaluator name must
not be null or blank");
+ return createInstanceByName(MetricsEvaluator.class, evaluatorName,
MetricsEvaluator::name);
+ }
+
+ private static <B, T extends B> T createInstanceByName(
+ Class<B> typeClass, String instanceName, Function<B, String>
nameFunction) {
+ ServiceLoader<B> loader = ServiceLoader.load(typeClass);
List<Class<? extends T>> providers =
Streams.stream(loader.iterator())
- .filter(p -> p.name().equalsIgnoreCase(calculatorName))
+ .filter(p -> nameFunction.apply(p).equalsIgnoreCase(instanceName))
.map(p -> (Class<? extends T>) p.getClass())
.collect(Collectors.toList());
if (providers.isEmpty()) {
throw new IllegalArgumentException(
- "No "
- + StatisticsCalculator.class.getSimpleName()
- + " class found for: "
- + calculatorName);
+ "No " + typeClass.getSimpleName() + " class found for: " +
instanceName);
} else if (providers.size() > 1) {
throw new IllegalArgumentException(
- "Multiple "
- + StatisticsCalculator.class.getSimpleName()
- + " found for: "
- + calculatorName);
+ "Multiple " + typeClass.getSimpleName() + " found for: " +
instanceName);
} else {
Class<? extends T> providerClz = Iterables.getOnlyElement(providers);
try {
return providerClz.getDeclaredConstructor().newInstance();
} catch (Exception e) {
throw new RuntimeException(
- "Failed to instantiate StatisticsCalculator: "
- + calculatorName
+ "Failed to instantiate "
+ + typeClass.getSimpleName()
+ + ": "
+ + instanceName
+ ", class: "
+ providerClz.getName(),
e);
diff --git
a/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/common/util/ProviderUtils.java
b/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/common/util/ProviderUtils.java
index a60b75257d..bd63e389b0 100644
---
a/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/common/util/ProviderUtils.java
+++
b/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/common/util/ProviderUtils.java
@@ -25,6 +25,9 @@ import java.util.List;
import java.util.ServiceLoader;
import java.util.stream.Collectors;
import org.apache.gravitino.maintenance.optimizer.api.common.Provider;
+import org.apache.gravitino.maintenance.optimizer.api.monitor.JobProvider;
+import org.apache.gravitino.maintenance.optimizer.api.monitor.MetricsProvider;
+import org.apache.gravitino.maintenance.optimizer.api.monitor.MonitorCallback;
import org.apache.gravitino.maintenance.optimizer.api.recommender.JobSubmitter;
import
org.apache.gravitino.maintenance.optimizer.api.recommender.StatisticsProvider;
import
org.apache.gravitino.maintenance.optimizer.api.recommender.StrategyProvider;
@@ -99,4 +102,16 @@ public class ProviderUtils {
public static MetricsUpdater createMetricsUpdaterInstance(String provider) {
return createProviderInstance(MetricsUpdater.class, provider);
}
+
+  /**
+   * Create a {@link MetricsProvider} by delegating to {@code createProviderInstance}.
+   *
+   * @param provider provider name passed through to {@code createProviderInstance} (presumably
+   *     matched against {@code Provider.name()}; confirm against that helper)
+   * @return the metrics provider instance
+   */
+  public static MetricsProvider createMetricsProviderInstance(String provider) {
+    return createProviderInstance(MetricsProvider.class, provider);
+  }
+
+  /**
+   * Create a {@link JobProvider} by delegating to {@code createProviderInstance}.
+   *
+   * @param provider provider name passed through to {@code createProviderInstance} (presumably
+   *     matched against {@code Provider.name()}; confirm against that helper)
+   * @return the job provider instance
+   */
+  public static JobProvider createJobProviderInstance(String provider) {
+    return createProviderInstance(JobProvider.class, provider);
+  }
+
+  /**
+   * Create a {@link MonitorCallback} by delegating to {@code createProviderInstance}.
+   *
+   * @param provider provider name passed through to {@code createProviderInstance} (presumably
+   *     matched against {@code Provider.name()}; confirm against that helper)
+   * @return the monitor callback instance
+   */
+  public static MonitorCallback createMonitorCallbackInstance(String provider) {
+    return createProviderInstance(MonitorCallback.class, provider);
+  }
}
diff --git
a/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/monitor/Monitor.java
b/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/monitor/Monitor.java
new file mode 100644
index 0000000000..ebf88358f0
--- /dev/null
+++
b/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/monitor/Monitor.java
@@ -0,0 +1,317 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.gravitino.maintenance.optimizer.monitor;
+
+import com.google.common.base.Preconditions;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.gravitino.NameIdentifier;
+import org.apache.gravitino.maintenance.optimizer.api.common.MetricSample;
+import org.apache.gravitino.maintenance.optimizer.api.common.PartitionPath;
+import org.apache.gravitino.maintenance.optimizer.api.monitor.EvaluationResult;
+import org.apache.gravitino.maintenance.optimizer.api.monitor.JobProvider;
+import org.apache.gravitino.maintenance.optimizer.api.monitor.MetricScope;
+import org.apache.gravitino.maintenance.optimizer.api.monitor.MetricsEvaluator;
+import org.apache.gravitino.maintenance.optimizer.api.monitor.MetricsProvider;
+import org.apache.gravitino.maintenance.optimizer.api.monitor.MonitorCallback;
+import org.apache.gravitino.maintenance.optimizer.common.CloseableGroup;
+import org.apache.gravitino.maintenance.optimizer.common.OptimizerEnv;
+import org.apache.gravitino.maintenance.optimizer.common.conf.OptimizerConfig;
+import
org.apache.gravitino.maintenance.optimizer.common.util.InstanceLoaderUtils;
+import org.apache.gravitino.maintenance.optimizer.common.util.ProviderUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Entry point that wires monitor providers, evaluator, and callbacks to assess optimization
+ * outcomes from time-series metrics.
+ *
+ * <p>Purpose:
+ *
+ * <ul>
+ *   <li>Fetch table (or partition) metrics in a window around an action timestamp.
+ *   <li>Fetch related job metrics for the same time window.
+ *   <li>Split metrics into before/after groups and evaluate them with {@link MetricsEvaluator}.
+ *   <li>Publish each {@link EvaluationResult} to configured {@link MonitorCallback} instances.
+ * </ul>
+ *
+ * <p>Configuration:
+ *
+ * <ul>
+ *   <li>{@link OptimizerConfig#METRICS_PROVIDER_CONFIG} for {@link MetricsProvider}.
+ *   <li>{@link OptimizerConfig#JOB_PROVIDER_CONFIG} for {@link JobProvider}.
+ *   <li>{@link OptimizerConfig#METRICS_EVALUATOR_CONFIG} for {@link MetricsEvaluator}.
+ *   <li>{@link OptimizerConfig#MONITOR_CALLBACKS_CONFIG} for callback list.
+ * </ul>
+ *
+ * <p>Lifecycle:
+ *
+ * <ol>
+ *   <li>Create a {@link Monitor} with an initialized {@link OptimizerEnv}.
+ *   <li>The constructor resolves the configured implementations and initializes them; if any
+ *       step fails, everything registered so far is closed before the failure propagates.
+ *   <li>Call {@link #evaluateMetrics(NameIdentifier, long, long, Optional)} when needed.
+ *   <li>Consume returned {@link EvaluationResult} list and callback side effects.
+ *   <li>Call {@link #close()} to release provider/callback resources.
+ * </ol>
+ *
+ * <p>Workflow:
+ *
+ * <ol>
+ *   <li>Resolve a time range: {@code [actionTimeSeconds - rangeSeconds, actionTimeSeconds +
+ *       rangeSeconds]}.
+ *   <li>Read table/partition metrics and evaluate them.
+ *   <li>Resolve related jobs from {@link JobProvider} and evaluate each job's metrics.
+ *   <li>Return ordered results (table first, then jobs).
+ * </ol>
+ */
+public class Monitor implements AutoCloseable {
+
+  private static final Logger LOG = LoggerFactory.getLogger(Monitor.class);
+  private final MetricsProvider metricsProvider;
+  private final JobProvider jobProvider;
+  private final MetricsEvaluator metricsEvaluator;
+  private final List<MonitorCallback> callbacks;
+  private final CloseableGroup closeableGroup = new CloseableGroup();
+
+  /**
+   * Create a monitor by loading and initializing monitor providers, evaluator, and callbacks from
+   * the supplied optimizer environment.
+   *
+   * <p>If loading or initializing any component throws, the components registered so far are
+   * closed before the exception is rethrown; the caller never receives an instance in that case
+   * and therefore could not call {@link #close()} itself.
+   *
+   * @param optimizerEnv shared optimizer environment and configuration
+   * @throws IllegalArgumentException if {@code optimizerEnv} is null
+   */
+  public Monitor(OptimizerEnv optimizerEnv) {
+    Preconditions.checkArgument(optimizerEnv != null, "optimizerEnv must not be null");
+    try {
+      // Each component is registered before initialize() so that a failure part-way through its
+      // own initialization still results in close() being called on it.
+      this.metricsProvider = loadMetricsProvider(optimizerEnv.config());
+      closeableGroup.register(metricsProvider, MetricsProvider.class.getSimpleName());
+      metricsProvider.initialize(optimizerEnv);
+
+      this.jobProvider = loadJobProvider(optimizerEnv.config());
+      closeableGroup.register(jobProvider, JobProvider.class.getSimpleName());
+      jobProvider.initialize(optimizerEnv);
+
+      this.metricsEvaluator = loadMetricsEvaluator(optimizerEnv.config());
+      this.callbacks = loadCallbacks(optimizerEnv.config());
+      for (MonitorCallback callback : callbacks) {
+        closeableGroup.register(callback, callback.name());
+        callback.initialize(optimizerEnv);
+      }
+    } catch (RuntimeException e) {
+      releaseAfterFailedConstruction(e);
+      throw e;
+    }
+  }
+
+  /**
+   * Evaluate table metrics and related job metrics around an action timestamp.
+   *
+   * @param tableIdentifier target table identifier
+   * @param actionTimeSeconds action timestamp in epoch seconds
+   * @param rangeSeconds half-window range in seconds, used to build [actionTimeSeconds-range,
+   *     actionTimeSeconds+range]
+   * @param partitionPath optional partition scope for table metrics
+   * @return evaluation results, including one table/partition result and zero or more job results
+   * @throws IllegalArgumentException if an argument is null or negative, if the time window
+   *     overflows {@code long}, or if a collaborator throws {@link IllegalArgumentException}
+   * @throws IllegalStateException if any other runtime failure occurs while evaluating
+   */
+  public List<EvaluationResult> evaluateMetrics(
+      NameIdentifier tableIdentifier,
+      long actionTimeSeconds,
+      long rangeSeconds,
+      Optional<PartitionPath> partitionPath) {
+    Preconditions.checkArgument(tableIdentifier != null, "tableIdentifier must not be null");
+    Preconditions.checkArgument(partitionPath != null, "partitionPath must not be null");
+    Preconditions.checkArgument(actionTimeSeconds >= 0, "actionTimeSeconds must be >= 0");
+    Preconditions.checkArgument(rangeSeconds >= 0, "rangeSeconds must be >= 0");
+    // The window is identical for the table and every job, so resolve (and overflow-check) it
+    // once, before any provider is contacted.
+    Pair<Long, Long> window = timeRange(actionTimeSeconds, rangeSeconds);
+    try {
+      List<EvaluationResult> results = new ArrayList<>();
+      results.add(
+          evaluateTableMetrics(
+              tableIdentifier, actionTimeSeconds, rangeSeconds, window, partitionPath));
+      List<NameIdentifier> jobs = jobProvider.jobIdentifiers(tableIdentifier);
+      // A provider returning null is treated the same as "no related jobs".
+      if (jobs != null) {
+        for (NameIdentifier jobIdentifier : jobs) {
+          results.add(evaluateJobMetrics(jobIdentifier, actionTimeSeconds, rangeSeconds, window));
+        }
+      }
+      return results;
+    } catch (IllegalArgumentException e) {
+      // Argument problems are reported to the caller as-is rather than wrapped.
+      throw e;
+    } catch (RuntimeException e) {
+      throw new IllegalStateException(
+          String.format(
+              "Failed to evaluate metrics for table=%s, actionTimeSeconds=%d, "
+                  + "rangeSeconds=%d, partition=%s",
+              tableIdentifier,
+              actionTimeSeconds,
+              rangeSeconds,
+              partitionPath.map(PartitionPath::toString).orElse("<table-scope>")),
+          e);
+    }
+  }
+
+  /** Fetches table or partition metrics for the window and evaluates them. */
+  private EvaluationResult evaluateTableMetrics(
+      NameIdentifier tableIdentifier,
+      long actionTimeSeconds,
+      long rangeSeconds,
+      Pair<Long, Long> window,
+      Optional<PartitionPath> partitionPath) {
+    Map<String, List<MetricSample>> metrics =
+        partitionPath
+            .map(
+                path ->
+                    metricsProvider.partitionMetrics(
+                        tableIdentifier, path, window.getLeft(), window.getRight()))
+            .orElseGet(
+                () ->
+                    metricsProvider.tableMetrics(
+                        tableIdentifier, window.getLeft(), window.getRight()));
+    MetricScope scope =
+        partitionPath
+            .map(path -> MetricScope.forPartition(tableIdentifier, path))
+            .orElseGet(() -> MetricScope.forTable(tableIdentifier));
+    return evaluateScope(scope, metrics, actionTimeSeconds, rangeSeconds);
+  }
+
+  /** Fetches one job's metrics for the window and evaluates them. */
+  private EvaluationResult evaluateJobMetrics(
+      NameIdentifier jobIdentifier,
+      long actionTimeSeconds,
+      long rangeSeconds,
+      Pair<Long, Long> window) {
+    Map<String, List<MetricSample>> metrics =
+        metricsProvider.jobMetrics(jobIdentifier, window.getLeft(), window.getRight());
+    return evaluateScope(
+        MetricScope.forJob(jobIdentifier), metrics, actionTimeSeconds, rangeSeconds);
+  }
+
+  /**
+   * Splits the fetched metrics at the action time, runs the evaluator, and publishes the result to
+   * the callbacks. Shared by the table/partition and job paths.
+   */
+  private EvaluationResult evaluateScope(
+      MetricScope scope,
+      Map<String, List<MetricSample>> metrics,
+      long actionTimeSeconds,
+      long rangeSeconds) {
+    Pair<Map<String, List<MetricSample>>, Map<String, List<MetricSample>>> split =
+        splitMetrics(metrics, actionTimeSeconds);
+    boolean evaluation =
+        metricsEvaluator.evaluateMetrics(scope, split.getLeft(), split.getRight());
+    EvaluationResult result =
+        new EvaluationResult(
+            scope,
+            evaluation,
+            split.getLeft(),
+            split.getRight(),
+            actionTimeSeconds,
+            rangeSeconds,
+            metricsEvaluator.name());
+    return notifyCallbacks(result);
+  }
+
+  /**
+   * Partitions every metric series into samples strictly before the action time (left) and samples
+   * at or after it (right). A null map or a null series is treated as empty; every metric name
+   * present in the input appears in both output maps.
+   */
+  private Pair<Map<String, List<MetricSample>>, Map<String, List<MetricSample>>> splitMetrics(
+      Map<String, List<MetricSample>> metrics, long actionTimeInSeconds) {
+    Map<String, List<MetricSample>> beforeMetrics = new HashMap<>();
+    Map<String, List<MetricSample>> afterMetrics = new HashMap<>();
+    Map<String, List<MetricSample>> source = metrics == null ? Collections.emptyMap() : metrics;
+    for (Map.Entry<String, List<MetricSample>> entry : source.entrySet()) {
+      String metricName = entry.getKey();
+      List<MetricSample> metricList = entry.getValue() == null ? List.of() : entry.getValue();
+      beforeMetrics.put(
+          metricName,
+          metricList.stream().filter(m -> m.timestamp() < actionTimeInSeconds).toList());
+      afterMetrics.put(
+          metricName,
+          metricList.stream().filter(m -> m.timestamp() >= actionTimeInSeconds).toList());
+    }
+    return Pair.of(beforeMetrics, afterMetrics);
+  }
+
+  /**
+   * Builds the inclusive window {@code [actionTimeSeconds - rangeSeconds, actionTimeSeconds +
+   * rangeSeconds]}.
+   *
+   * @throws IllegalArgumentException if either bound overflows {@code long}
+   */
+  private Pair<Long, Long> timeRange(long actionTimeSeconds, long rangeSeconds) {
+    try {
+      long startTime = Math.subtractExact(actionTimeSeconds, rangeSeconds);
+      long endTime = Math.addExact(actionTimeSeconds, rangeSeconds);
+      return Pair.of(startTime, endTime);
+    } catch (ArithmeticException e) {
+      throw new IllegalArgumentException(
+          String.format(
+              "time range overflow: actionTimeSeconds=%d, rangeSeconds=%d",
+              actionTimeSeconds, rangeSeconds),
+          e);
+    }
+  }
+
+  private MetricsProvider loadMetricsProvider(OptimizerConfig optimizerConfig) {
+    return ProviderUtils.createMetricsProviderInstance(
+        optimizerConfig.get(OptimizerConfig.METRICS_PROVIDER_CONFIG));
+  }
+
+  private JobProvider loadJobProvider(OptimizerConfig optimizerConfig) {
+    return ProviderUtils.createJobProviderInstance(
+        optimizerConfig.get(OptimizerConfig.JOB_PROVIDER_CONFIG));
+  }
+
+  private MetricsEvaluator loadMetricsEvaluator(OptimizerConfig optimizerConfig) {
+    return InstanceLoaderUtils.createMetricsEvaluatorInstance(
+        optimizerConfig.get(OptimizerConfig.METRICS_EVALUATOR_CONFIG));
+  }
+
+  /** Instantiates the configured callbacks in order; no configuration yields an empty list. */
+  private List<MonitorCallback> loadCallbacks(OptimizerConfig optimizerConfig) {
+    List<String> callbackNames = optimizerConfig.get(OptimizerConfig.MONITOR_CALLBACKS_CONFIG);
+    if (callbackNames == null || callbackNames.isEmpty()) {
+      return Collections.emptyList();
+    }
+    List<MonitorCallback> result = new ArrayList<>();
+    for (String callbackName : callbackNames) {
+      result.add(ProviderUtils.createMonitorCallbackInstance(callbackName));
+    }
+    return List.copyOf(result);
+  }
+
+  /**
+   * Hands the result to every callback. Delivery is best-effort: a failing callback is logged and
+   * does not prevent the remaining callbacks from running or the result from being returned.
+   */
+  private EvaluationResult notifyCallbacks(EvaluationResult result) {
+    for (MonitorCallback callback : callbacks) {
+      try {
+        callback.onEvaluation(result);
+      } catch (Exception e) {
+        LOG.warn(
+            "Monitor callback {} failed for scope {}",
+            callback.name(),
+            result.scope().identifier(),
+            e);
+      }
+    }
+    return result;
+  }
+
+  /**
+   * Closes whatever was registered before construction failed. A failure while closing is attached
+   * to the original exception as suppressed so the root cause stays primary.
+   */
+  private void releaseAfterFailedConstruction(RuntimeException failure) {
+    try {
+      closeableGroup.close();
+    } catch (Exception closeFailure) {
+      failure.addSuppressed(closeFailure);
+    }
+  }
+
+  /** Close all initialized monitor providers and callbacks. */
+  @Override
+  public void close() throws Exception {
+    closeableGroup.close();
+  }
+}
diff --git
a/maintenance/optimizer/src/test/java/org/apache/gravitino/maintenance/optimizer/common/util/TestInstanceLoaderUtils.java
b/maintenance/optimizer/src/test/java/org/apache/gravitino/maintenance/optimizer/common/util/TestInstanceLoaderUtils.java
new file mode 100644
index 0000000000..eede9861f8
--- /dev/null
+++
b/maintenance/optimizer/src/test/java/org/apache/gravitino/maintenance/optimizer/common/util/TestInstanceLoaderUtils.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.gravitino.maintenance.optimizer.common.util;
+
+import org.apache.gravitino.maintenance.optimizer.api.monitor.MetricsEvaluator;
+import
org.apache.gravitino.maintenance.optimizer.api.updater.StatisticsCalculator;
+import
org.apache.gravitino.maintenance.optimizer.monitor.evaluator.MetricsEvaluatorForTest;
+import
org.apache.gravitino.maintenance.optimizer.updater.StatisticsCalculatorForTest;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+/** Unit tests for name-based instance creation in {@link InstanceLoaderUtils}. */
+public class TestInstanceLoaderUtils {
+
+  @Test
+  public void testCreateMetricsEvaluatorInstance() {
+    MetricsEvaluator loaded =
+        InstanceLoaderUtils.createMetricsEvaluatorInstance(MetricsEvaluatorForTest.NAME);
+    Assertions.assertNotNull(loaded);
+    Assertions.assertTrue(loaded instanceof MetricsEvaluatorForTest);
+  }
+
+  @Test
+  public void testCreateMetricsEvaluatorInstanceWithNullName() {
+    assertEvaluatorNameRejected(null);
+  }
+
+  @Test
+  public void testCreateMetricsEvaluatorInstanceWithBlankName() {
+    assertEvaluatorNameRejected(" ");
+  }
+
+  @Test
+  public void testCreateStatisticsCalculatorInstance() {
+    StatisticsCalculator loaded =
+        InstanceLoaderUtils.createStatisticsCalculatorInstance(StatisticsCalculatorForTest.NAME);
+    Assertions.assertNotNull(loaded);
+    Assertions.assertTrue(loaded instanceof StatisticsCalculatorForTest);
+  }
+
+  @Test
+  public void testCreateStatisticsCalculatorInstanceWithUnknownName() {
+    IllegalArgumentException thrown =
+        Assertions.assertThrows(
+            IllegalArgumentException.class,
+            () -> InstanceLoaderUtils.createStatisticsCalculatorInstance("unknown-calculator"));
+    String actualMessage = thrown.getMessage();
+    Assertions.assertEquals(
+        "No StatisticsCalculator class found for: unknown-calculator", actualMessage);
+  }
+
+  /** Asserts that the evaluator loader rejects {@code evaluatorName} with the expected message. */
+  private static void assertEvaluatorNameRejected(String evaluatorName) {
+    IllegalArgumentException thrown =
+        Assertions.assertThrows(
+            IllegalArgumentException.class,
+            () -> InstanceLoaderUtils.createMetricsEvaluatorInstance(evaluatorName));
+    Assertions.assertEquals(
+        "metrics evaluator name must not be null or blank", thrown.getMessage());
+  }
+}
diff --git
a/maintenance/optimizer/src/test/java/org/apache/gravitino/maintenance/optimizer/common/util/TestProviderUtils.java
b/maintenance/optimizer/src/test/java/org/apache/gravitino/maintenance/optimizer/common/util/TestProviderUtils.java
index bc90dd9e35..9801b9f727 100644
---
a/maintenance/optimizer/src/test/java/org/apache/gravitino/maintenance/optimizer/common/util/TestProviderUtils.java
+++
b/maintenance/optimizer/src/test/java/org/apache/gravitino/maintenance/optimizer/common/util/TestProviderUtils.java
@@ -23,6 +23,9 @@ import
org.apache.gravitino.maintenance.optimizer.api.recommender.JobSubmitter;
import
org.apache.gravitino.maintenance.optimizer.api.recommender.StatisticsProvider;
import
org.apache.gravitino.maintenance.optimizer.api.recommender.StrategyProvider;
import
org.apache.gravitino.maintenance.optimizer.api.recommender.TableMetadataProvider;
+import
org.apache.gravitino.maintenance.optimizer.monitor.callback.MonitorCallbackForTest;
+import
org.apache.gravitino.maintenance.optimizer.monitor.job.JobProviderForTest;
+import
org.apache.gravitino.maintenance.optimizer.monitor.metrics.MetricsProviderForTest;
import
org.apache.gravitino.maintenance.optimizer.recommender.job.GravitinoJobSubmitter;
import
org.apache.gravitino.maintenance.optimizer.recommender.job.NoopJobSubmitter;
import
org.apache.gravitino.maintenance.optimizer.recommender.statistics.GravitinoStatisticsProvider;
@@ -68,4 +71,17 @@ public class TestProviderUtils {
Assertions.assertNotNull(tableMetadataProvider);
Assertions.assertTrue(tableMetadataProvider instanceof
GravitinoTableMetadataProvider);
}
+
+  /** Each monitor SPI factory must resolve its test implementation by name. */
+  @Test
+  public void testCreateMonitorProviders() {
+    Object metricsProvider =
+        ProviderUtils.createMetricsProviderInstance(MetricsProviderForTest.NAME);
+    Assertions.assertTrue(metricsProvider instanceof MetricsProviderForTest);
+
+    Object jobProvider = ProviderUtils.createJobProviderInstance(JobProviderForTest.NAME);
+    Assertions.assertTrue(jobProvider instanceof JobProviderForTest);
+
+    Object monitorCallback =
+        ProviderUtils.createMonitorCallbackInstance(MonitorCallbackForTest.NAME);
+    Assertions.assertTrue(monitorCallback instanceof MonitorCallbackForTest);
+  }
}
diff --git
a/maintenance/optimizer/src/test/java/org/apache/gravitino/maintenance/optimizer/monitor/TestMonitor.java
b/maintenance/optimizer/src/test/java/org/apache/gravitino/maintenance/optimizer/monitor/TestMonitor.java
new file mode 100644
index 0000000000..5185edf7ce
--- /dev/null
+++
b/maintenance/optimizer/src/test/java/org/apache/gravitino/maintenance/optimizer/monitor/TestMonitor.java
@@ -0,0 +1,192 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.gravitino.maintenance.optimizer.monitor;
+
+import com.google.common.collect.ImmutableMap;
+import java.util.List;
+import java.util.Optional;
+import org.apache.gravitino.NameIdentifier;
+import org.apache.gravitino.maintenance.optimizer.api.common.PartitionPath;
+import org.apache.gravitino.maintenance.optimizer.api.monitor.EvaluationResult;
+import org.apache.gravitino.maintenance.optimizer.api.monitor.MetricScope;
+import org.apache.gravitino.maintenance.optimizer.common.OptimizerEnv;
+import org.apache.gravitino.maintenance.optimizer.common.PartitionEntryImpl;
+import org.apache.gravitino.maintenance.optimizer.common.conf.OptimizerConfig;
+import
org.apache.gravitino.maintenance.optimizer.monitor.callback.MonitorCallbackForTest;
+import
org.apache.gravitino.maintenance.optimizer.monitor.evaluator.MetricsEvaluatorForTest;
+import
org.apache.gravitino.maintenance.optimizer.monitor.job.JobProviderForTest;
+import
org.apache.gravitino.maintenance.optimizer.monitor.metrics.MetricsProviderForTest;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+/** Exercises {@link Monitor} end to end against the test provider, evaluator, and callback. */
+public class TestMonitor {
+
+  @Test
+  public void testEvaluateMetrics() throws Exception {
+    OptimizerEnv env = newMonitorEnv();
+
+    MonitorCallbackForTest.reset();
+    MetricsEvaluatorForTest.reset();
+    MetricsEvaluatorForTest.failJob2(true);
+    MetricsProviderForTest.reset();
+
+    NameIdentifier tableIdentifier = NameIdentifier.parse("test.db.table");
+    List<EvaluationResult> results;
+    try (Monitor monitor = new Monitor(env)) {
+      results = monitor.evaluateMetrics(tableIdentifier, 100L, 10L, Optional.empty());
+    }
+
+    // One result for the table plus one per job, each evaluated and published exactly once.
+    Assertions.assertEquals(3, results.size());
+    Assertions.assertEquals(3, MetricsEvaluatorForTest.INVOCATIONS.get());
+    Assertions.assertEquals(3, MonitorCallbackForTest.INVOCATIONS.get());
+    Assertions.assertEquals(3, MonitorCallbackForTest.RESULTS.size());
+
+    EvaluationResult tableResult = results.get(0);
+    EvaluationResult jobResult1 = results.get(1);
+    EvaluationResult jobResult2 = results.get(2);
+
+    Assertions.assertEquals(MetricScope.Type.TABLE, tableResult.scope().type());
+    Assertions.assertEquals(tableIdentifier, tableResult.scope().identifier());
+    Assertions.assertTrue(tableResult.evaluation());
+    Assertions.assertEquals(100L, tableResult.actionTimeSeconds());
+    Assertions.assertEquals(10L, tableResult.rangeSeconds());
+    Assertions.assertEquals(MetricsEvaluatorForTest.NAME, tableResult.evaluatorName());
+    Assertions.assertEquals(1, tableResult.beforeMetrics().get("row_count").size());
+    Assertions.assertEquals(1, tableResult.afterMetrics().get("row_count").size());
+    assertFirstSamples(tableResult, "row_count", 95L, 100L, 100L, 200L);
+
+    Assertions.assertEquals(MetricScope.Type.JOB, jobResult1.scope().type());
+    Assertions.assertEquals(JobProviderForTest.JOB1, jobResult1.scope().identifier());
+    Assertions.assertTrue(jobResult1.evaluation());
+    assertFirstSamples(jobResult1, "duration", 99L, 102L, 10L, 20L);
+
+    Assertions.assertEquals(MetricScope.Type.JOB, jobResult2.scope().type());
+    Assertions.assertEquals(JobProviderForTest.JOB2, jobResult2.scope().identifier());
+    Assertions.assertFalse(jobResult2.evaluation());
+    assertFirstSamples(jobResult2, "duration", 98L, 104L, 30L, 40L);
+  }
+
+  @Test
+  public void testEvaluateMetricsForPartition() throws Exception {
+    OptimizerEnv env = newMonitorEnv();
+
+    MonitorCallbackForTest.reset();
+    MetricsEvaluatorForTest.reset();
+    MetricsEvaluatorForTest.failJob2(false);
+    MetricsProviderForTest.reset();
+
+    NameIdentifier tableIdentifier = NameIdentifier.parse("test.db.table");
+    PartitionPath partitionPath =
+        PartitionPath.of(List.of(new PartitionEntryImpl("dt", "2026-02-12")));
+    List<EvaluationResult> results;
+    try (Monitor monitor = new Monitor(env)) {
+      results = monitor.evaluateMetrics(tableIdentifier, 100L, 10L, Optional.of(partitionPath));
+    }
+
+    Assertions.assertEquals(1, MetricsProviderForTest.PARTITION_METRICS_CALLS.get());
+    Assertions.assertEquals(partitionPath, MetricsProviderForTest.LAST_PARTITION_PATH);
+    Assertions.assertEquals(3, results.size());
+
+    EvaluationResult tableResult = results.get(0);
+    Assertions.assertEquals(MetricScope.Type.PARTITION, tableResult.scope().type());
+    Assertions.assertEquals(partitionPath, tableResult.scope().partition().orElseThrow());
+    assertFirstSamples(tableResult, "row_count", 97L, 101L, 110L, 210L);
+  }
+
+  @Test
+  public void testEvaluateMetricsOverflowRange() throws Exception {
+    OptimizerEnv env = newMonitorEnv();
+
+    try (Monitor monitor = new Monitor(env)) {
+      NameIdentifier tableIdentifier = NameIdentifier.parse("test.db.table");
+      IllegalArgumentException thrown =
+          Assertions.assertThrows(
+              IllegalArgumentException.class,
+              () -> monitor.evaluateMetrics(tableIdentifier, Long.MAX_VALUE, 1L, Optional.empty()));
+      Assertions.assertTrue(thrown.getMessage().contains("time range overflow"));
+    }
+  }
+
+  /** Builds an environment that selects the test provider, evaluator, and callback by name. */
+  private static OptimizerEnv newMonitorEnv() {
+    ImmutableMap<String, String> settings =
+        ImmutableMap.<String, String>builder()
+            .put(OptimizerConfig.METRICS_PROVIDER_CONFIG.getKey(), MetricsProviderForTest.NAME)
+            .put(OptimizerConfig.JOB_PROVIDER_CONFIG.getKey(), JobProviderForTest.NAME)
+            .put(OptimizerConfig.METRICS_EVALUATOR_CONFIG.getKey(), MetricsEvaluatorForTest.NAME)
+            .put(OptimizerConfig.MONITOR_CALLBACKS_CONFIG.getKey(), MonitorCallbackForTest.NAME)
+            .build();
+    return new OptimizerEnv(new OptimizerConfig(settings));
+  }
+
+  /**
+   * Checks the first before/after sample of one metric: both timestamps first, then both values,
+   * which is the order the assertions are expected to fail in.
+   */
+  private static void assertFirstSamples(
+      EvaluationResult result,
+      String metricName,
+      long expectedBeforeTimestamp,
+      long expectedAfterTimestamp,
+      long expectedBeforeValue,
+      long expectedAfterValue) {
+    Assertions.assertEquals(
+        expectedBeforeTimestamp, result.beforeMetrics().get(metricName).get(0).timestamp());
+    Assertions.assertEquals(
+        expectedAfterTimestamp, result.afterMetrics().get(metricName).get(0).timestamp());
+    Assertions.assertEquals(
+        expectedBeforeValue,
+        ((Number) result.beforeMetrics().get(metricName).get(0).statistic().value().value())
+            .longValue());
+    Assertions.assertEquals(
+        expectedAfterValue,
+        ((Number) result.afterMetrics().get(metricName).get(0).statistic().value().value())
+            .longValue());
+  }
+}
diff --git
a/maintenance/optimizer/src/test/java/org/apache/gravitino/maintenance/optimizer/monitor/callback/MonitorCallbackForTest.java
b/maintenance/optimizer/src/test/java/org/apache/gravitino/maintenance/optimizer/monitor/callback/MonitorCallbackForTest.java
new file mode 100644
index 0000000000..596c7d1f91
--- /dev/null
+++
b/maintenance/optimizer/src/test/java/org/apache/gravitino/maintenance/optimizer/monitor/callback/MonitorCallbackForTest.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.gravitino.maintenance.optimizer.monitor.callback;
+
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.gravitino.maintenance.optimizer.api.monitor.EvaluationResult;
+import org.apache.gravitino.maintenance.optimizer.api.monitor.MonitorCallback;
+import org.apache.gravitino.maintenance.optimizer.common.OptimizerEnv;
+
+/**
+ * Test-only {@link MonitorCallback} that records every evaluation it receives in static state so
+ * tests can assert on the number and content of callback invocations.
+ */
+public class MonitorCallbackForTest implements MonitorCallback {
+
+ // Name returned by name(); tests pass it to the callback factory and monitor configuration.
+ public static final String NAME = "monitor-callback-for-test";
+ // Number of onEvaluation calls since the last reset().
+ public static final AtomicInteger INVOCATIONS = new AtomicInteger();
+ // Every result passed to onEvaluation since the last reset(), in call order.
+ public static final CopyOnWriteArrayList<EvaluationResult> RESULTS = new
CopyOnWriteArrayList<>();
+
+ @Override
+ public String name() {
+ return NAME;
+ }
+
+ /** No-op: this callback needs nothing from the environment. */
+ @Override
+ public void initialize(OptimizerEnv optimizerEnv) {}
+
+ /** Increments {@link #INVOCATIONS} and appends {@code result} to {@link #RESULTS}. */
+ @Override
+ public void onEvaluation(EvaluationResult result) {
+ INVOCATIONS.incrementAndGet();
+ RESULTS.add(result);
+ }
+
+ /** No-op: there is nothing to release. */
+ @Override
+ public void close() throws Exception {}
+
+ /** Clears the recorded static state; the counters are static and persist until reset. */
+ public static void reset() {
+ INVOCATIONS.set(0);
+ RESULTS.clear();
+ }
+}
diff --git
a/maintenance/optimizer/src/test/java/org/apache/gravitino/maintenance/optimizer/monitor/evaluator/MetricsEvaluatorForTest.java
b/maintenance/optimizer/src/test/java/org/apache/gravitino/maintenance/optimizer/monitor/evaluator/MetricsEvaluatorForTest.java
new file mode 100644
index 0000000000..9d9a994d04
--- /dev/null
+++
b/maintenance/optimizer/src/test/java/org/apache/gravitino/maintenance/optimizer/monitor/evaluator/MetricsEvaluatorForTest.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.gravitino.maintenance.optimizer.monitor.evaluator;
+
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.gravitino.maintenance.optimizer.api.common.MetricSample;
+import org.apache.gravitino.maintenance.optimizer.api.monitor.MetricScope;
+import org.apache.gravitino.maintenance.optimizer.api.monitor.MetricsEvaluator;
+import
org.apache.gravitino.maintenance.optimizer.monitor.job.JobProviderForTest;
+
+/**
+ * Test-only {@link MetricsEvaluator} that counts its invocations and accepts every scope, except
+ * that it can be switched to reject the {@link JobProviderForTest#JOB2} job scope.
+ */
+public class MetricsEvaluatorForTest implements MetricsEvaluator {
+
+  public static final String NAME = "test-metrics-evaluator";
+  public static final AtomicInteger INVOCATIONS = new AtomicInteger();
+
+  // Mutable switch, not a constant; volatile so a change is visible to whichever thread evaluates.
+  private static volatile boolean rejectJob2 = false;
+
+  /** Restores the initial state: zero invocations and no rejected scope. */
+  public static void reset() {
+    INVOCATIONS.set(0);
+    rejectJob2 = false;
+  }
+
+  /** Chooses whether the {@code JOB2} job scope evaluates to {@code false}. */
+  public static void failJob2(boolean failJob2) {
+    rejectJob2 = failJob2;
+  }
+
+  @Override
+  public String name() {
+    return NAME;
+  }
+
+  /**
+   * Counts the call, then returns {@code false} only when rejection is enabled and the scope is
+   * the {@code JOB2} job scope; the metric maps themselves are ignored.
+   */
+  @Override
+  public boolean evaluateMetrics(
+      MetricScope scope,
+      Map<String, List<MetricSample>> beforeMetrics,
+      Map<String, List<MetricSample>> afterMetrics) {
+    INVOCATIONS.incrementAndGet();
+    return !(rejectJob2
+        && scope.type() == MetricScope.Type.JOB
+        && JobProviderForTest.JOB2.equals(scope.identifier()));
+  }
+}
diff --git
a/maintenance/optimizer/src/test/java/org/apache/gravitino/maintenance/optimizer/monitor/job/JobProviderForTest.java
b/maintenance/optimizer/src/test/java/org/apache/gravitino/maintenance/optimizer/monitor/job/JobProviderForTest.java
new file mode 100644
index 0000000000..5ab2cebd61
--- /dev/null
+++
b/maintenance/optimizer/src/test/java/org/apache/gravitino/maintenance/optimizer/monitor/job/JobProviderForTest.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.gravitino.maintenance.optimizer.monitor.job;
+
+import java.util.List;
+import org.apache.gravitino.NameIdentifier;
+import org.apache.gravitino.maintenance.optimizer.api.monitor.JobProvider;
+import org.apache.gravitino.maintenance.optimizer.common.OptimizerEnv;
+
+/**
+ * Test-only {@link JobProvider} that reports the same two jobs, {@link #JOB1} and {@link #JOB2},
+ * for every table.
+ */
+public class JobProviderForTest implements JobProvider {
+
+  /** Name reported by {@link #name()}. */
+  public static final String NAME = "job-provider-for-test";
+
+  /** First job returned by {@link #jobIdentifiers(NameIdentifier)}. */
+  public static final NameIdentifier JOB1 = NameIdentifier.parse("test.db.job1");
+
+  /** Second job returned by {@link #jobIdentifiers(NameIdentifier)}. */
+  public static final NameIdentifier JOB2 = NameIdentifier.parse("test.db.job2");
+
+  @Override
+  public String name() {
+    return NAME;
+  }
+
+  /** No-op: this provider needs nothing from the environment. */
+  @Override
+  public void initialize(OptimizerEnv optimizerEnv) {}
+
+  /** No-op: this provider holds no resources. */
+  @Override
+  public void close() throws Exception {}
+
+  /**
+   * Returns the fixed job list.
+   *
+   * @param tableIdentifier ignored; the result does not depend on the table
+   * @return an unmodifiable list containing {@link #JOB1} followed by {@link #JOB2}
+   */
+  @Override
+  public List<NameIdentifier> jobIdentifiers(NameIdentifier tableIdentifier) {
+    return List.of(JOB1, JOB2);
+  }
+}
diff --git
a/maintenance/optimizer/src/test/java/org/apache/gravitino/maintenance/optimizer/monitor/metrics/MetricsProviderForTest.java
b/maintenance/optimizer/src/test/java/org/apache/gravitino/maintenance/optimizer/monitor/metrics/MetricsProviderForTest.java
new file mode 100644
index 0000000000..3f371c24c1
--- /dev/null
+++
b/maintenance/optimizer/src/test/java/org/apache/gravitino/maintenance/optimizer/monitor/metrics/MetricsProviderForTest.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.gravitino.maintenance.optimizer.monitor.metrics;
+
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.gravitino.NameIdentifier;
+import org.apache.gravitino.maintenance.optimizer.api.common.MetricSample;
+import org.apache.gravitino.maintenance.optimizer.api.common.PartitionPath;
+import org.apache.gravitino.maintenance.optimizer.api.common.StatisticEntry;
+import org.apache.gravitino.maintenance.optimizer.api.monitor.MetricsProvider;
+import org.apache.gravitino.maintenance.optimizer.common.OptimizerEnv;
+import
org.apache.gravitino.maintenance.optimizer.monitor.job.JobProviderForTest;
+import org.apache.gravitino.stats.StatisticValue;
+import org.apache.gravitino.stats.StatisticValues;
+
+/**
+ * Test-only {@link MetricsProvider} that serves hard-coded metric samples and records how the
+ * partition lookup was used.
+ *
+ * <p>The recorded usage lives in static fields; {@link #reset()} clears it.
+ */
+public class MetricsProviderForTest implements MetricsProvider {
+
+  /** Name reported by {@link #name()}. */
+  public static final String NAME = "metrics-provider-for-test";
+
+  /** Number of {@link #partitionMetrics} calls since the last {@link #reset()}. */
+  public static final AtomicInteger PARTITION_METRICS_CALLS = new AtomicInteger();
+
+  /**
+   * Partition path handed to the most recent {@link #partitionMetrics} call, or {@code null} if
+   * there has been none since the last {@link #reset()}.
+   */
+  public static volatile PartitionPath LAST_PARTITION_PATH = null;
+
+  @Override
+  public String name() {
+    return NAME;
+  }
+
+  /** No-op: this provider needs nothing from the environment. */
+  @Override
+  public void initialize(OptimizerEnv optimizerEnv) {}
+
+  /**
+   * Returns two fixed {@code duration} samples for {@link JobProviderForTest#JOB1} or {@link
+   * JobProviderForTest#JOB2}, and an empty map for any other job.
+   *
+   * @param jobIdentifier job whose samples are requested
+   * @param startTime ignored by this test implementation
+   * @param endTime ignored by this test implementation
+   * @return an unmodifiable map keyed by metric name
+   */
+  @Override
+  public Map<String, List<MetricSample>> jobMetrics(
+      NameIdentifier jobIdentifier, long startTime, long endTime) {
+    if (JobProviderForTest.JOB1.equals(jobIdentifier)) {
+      return samplePair("duration", 99, 10L, 102, 20L);
+    }
+    if (JobProviderForTest.JOB2.equals(jobIdentifier)) {
+      return samplePair("duration", 98, 30L, 104, 40L);
+    }
+    return Map.of();
+  }
+
+  /**
+   * Returns two fixed {@code row_count} samples regardless of the arguments.
+   *
+   * @param tableIdentifier ignored by this test implementation
+   * @param startTime ignored by this test implementation
+   * @param endTime ignored by this test implementation
+   * @return an unmodifiable map keyed by metric name
+   */
+  @Override
+  public Map<String, List<MetricSample>> tableMetrics(
+      NameIdentifier tableIdentifier, long startTime, long endTime) {
+    return samplePair("row_count", 95, 100L, 100, 200L);
+  }
+
+  /**
+   * Records the call in {@link #PARTITION_METRICS_CALLS} and {@link #LAST_PARTITION_PATH}, then
+   * returns two fixed {@code row_count} samples.
+   *
+   * @param tableIdentifier ignored by this test implementation
+   * @param partitionPath remembered in {@link #LAST_PARTITION_PATH}; otherwise unused
+   * @param startTime ignored by this test implementation
+   * @param endTime ignored by this test implementation
+   * @return an unmodifiable map keyed by metric name
+   */
+  @Override
+  public Map<String, List<MetricSample>> partitionMetrics(
+      NameIdentifier tableIdentifier, PartitionPath partitionPath, long startTime, long endTime) {
+    PARTITION_METRICS_CALLS.incrementAndGet();
+    LAST_PARTITION_PATH = partitionPath;
+    return samplePair("row_count", 97, 110L, 101, 210L);
+  }
+
+  /** No-op: this provider holds no resources. */
+  @Override
+  public void close() throws Exception {}
+
+  /** Clears the recorded partition-lookup usage. */
+  public static void reset() {
+    PARTITION_METRICS_CALLS.set(0);
+    LAST_PARTITION_PATH = null;
+  }
+
+  /** Builds a single-entry map from {@code metricName} to two long-valued samples, in order. */
+  private static Map<String, List<MetricSample>> samplePair(
+      String metricName, long firstTime, long firstValue, long secondTime, long secondValue) {
+    MetricSample first = metric(firstTime, metricName, StatisticValues.longValue(firstValue));
+    MetricSample second = metric(secondTime, metricName, StatisticValues.longValue(secondValue));
+    return Map.of(metricName, List.of(first, second));
+  }
+
+  /** Wraps a timestamp, a metric name and a value into an immutable {@link MetricSample}. */
+  private static <T> MetricSample metric(long sampleTime, String statName, StatisticValue<T> held) {
+    return new MetricSample() {
+      @Override
+      public long timestamp() {
+        return sampleTime;
+      }
+
+      // A new entry view is created on every call; it only exposes the captured name and value.
+      @Override
+      public StatisticEntry<?> statistic() {
+        return new StatisticEntry<T>() {
+          @Override
+          public String name() {
+            return statName;
+          }
+
+          @Override
+          public StatisticValue<T> value() {
+            return held;
+          }
+        };
+      }
+    };
+  }
+}
diff --git
a/maintenance/optimizer/src/test/resources/META-INF/services/org.apache.gravitino.maintenance.optimizer.api.common.Provider
b/maintenance/optimizer/src/test/resources/META-INF/services/org.apache.gravitino.maintenance.optimizer.api.common.Provider
index 1948a23487..4cc9447c73 100644
---
a/maintenance/optimizer/src/test/resources/META-INF/services/org.apache.gravitino.maintenance.optimizer.api.common.Provider
+++
b/maintenance/optimizer/src/test/resources/META-INF/services/org.apache.gravitino.maintenance.optimizer.api.common.Provider
@@ -19,3 +19,6 @@
org.apache.gravitino.maintenance.optimizer.updater.StatisticsUpdaterForTest
org.apache.gravitino.maintenance.optimizer.updater.MetricsUpdaterForTest
+org.apache.gravitino.maintenance.optimizer.monitor.job.JobProviderForTest
+org.apache.gravitino.maintenance.optimizer.monitor.metrics.MetricsProviderForTest
+org.apache.gravitino.maintenance.optimizer.monitor.callback.MonitorCallbackForTest
diff --git
a/maintenance/optimizer/src/test/resources/META-INF/services/org.apache.gravitino.maintenance.optimizer.api.common.Provider
b/maintenance/optimizer/src/test/resources/META-INF/services/org.apache.gravitino.maintenance.optimizer.api.monitor.MetricsEvaluator
similarity index 84%
copy from
maintenance/optimizer/src/test/resources/META-INF/services/org.apache.gravitino.maintenance.optimizer.api.common.Provider
copy to
maintenance/optimizer/src/test/resources/META-INF/services/org.apache.gravitino.maintenance.optimizer.api.monitor.MetricsEvaluator
index 1948a23487..1e8cd59e16 100644
---
a/maintenance/optimizer/src/test/resources/META-INF/services/org.apache.gravitino.maintenance.optimizer.api.common.Provider
+++
b/maintenance/optimizer/src/test/resources/META-INF/services/org.apache.gravitino.maintenance.optimizer.api.monitor.MetricsEvaluator
@@ -17,5 +17,4 @@
# under the License.
#
-org.apache.gravitino.maintenance.optimizer.updater.StatisticsUpdaterForTest
-org.apache.gravitino.maintenance.optimizer.updater.MetricsUpdaterForTest
+org.apache.gravitino.maintenance.optimizer.monitor.evaluator.MetricsEvaluatorForTest