This is an automated email from the ASF dual-hosted git repository.

jshao pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/gravitino.git


The following commit(s) were added to refs/heads/main by this push:
     new 252475bc5e [#9983] feat(optimizer): support monitor code skeleton (#10001)
252475bc5e is described below

commit 252475bc5e7a5d96c4ce5e31454ae099b3bd216f
Author: FANNG <[email protected]>
AuthorDate: Wed Feb 25 12:06:22 2026 +0900

    [#9983] feat(optimizer): support monitor code skeleton (#10001)
    
    ## What changes were proposed in this pull request?
    
    - Add monitor skeleton APIs under `api/monitor`, including evaluator/provider/callback
      contracts and evaluation result model.
    - Add `Monitor` orchestration skeleton to evaluate table and related job metrics around an
      action timestamp.
    - Add monitor-related config keys and SPI loaders in `OptimizerConfig`, `ProviderUtils`,
      and `InstanceLoaderUtils`.
    - Add unit tests and test SPI registrations for monitor provider/evaluator/callback
      loading and monitor execution flow.
    - Keep scope focused on skeleton only: no `OptimizerCmd` monitor CLI/service changes and
      no monitor service implementation classes.
    
      ## Why are the changes needed?
    
    Fixes: #9983
    
    - This PR introduces the monitor abstraction layer and minimal execution skeleton so
      follow-up PRs can add concrete monitor implementations incrementally.
    
      ## Does this PR introduce any user-facing change?
    
      - No.
    
      ## How was this patch tested?
    
      - `./gradlew :maintenance:optimizer:spotlessApply`
      - `./gradlew :maintenance:optimizer:test -PskipITs`
    
    Co-authored-by: Qi Yu <[email protected]>
---
 .../optimizer/api/monitor/EvaluationResult.java    | 136 +++++++++
 .../optimizer/api/monitor/JobProvider.java         |  37 +++
 .../optimizer/api/monitor/MetricScope.java         | 102 +++++++
 .../optimizer/api/monitor/MetricsEvaluator.java    |  47 +++
 .../optimizer/api/monitor/MetricsProvider.java     |  66 +++++
 .../optimizer/api/monitor/MonitorCallback.java     |  35 +++
 .../optimizer/common/conf/OptimizerConfig.java     |  52 +++-
 .../optimizer/common/util/InstanceLoaderUtils.java |  37 ++-
 .../optimizer/common/util/ProviderUtils.java       |  15 +
 .../maintenance/optimizer/monitor/Monitor.java     | 317 +++++++++++++++++++++
 .../common/util/TestInstanceLoaderUtils.java       |  76 +++++
 .../optimizer/common/util/TestProviderUtils.java   |  16 ++
 .../maintenance/optimizer/monitor/TestMonitor.java | 192 +++++++++++++
 .../monitor/callback/MonitorCallbackForTest.java   |  55 ++++
 .../monitor/evaluator/MetricsEvaluatorForTest.java |  63 ++++
 .../optimizer/monitor/job/JobProviderForTest.java  |  48 ++++
 .../monitor/metrics/MetricsProviderForTest.java    | 123 ++++++++
 ...itino.maintenance.optimizer.api.common.Provider |   3 +
 ...tenance.optimizer.api.monitor.MetricsEvaluator} |   3 +-
 19 files changed, 1405 insertions(+), 18 deletions(-)

diff --git 
a/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/api/monitor/EvaluationResult.java
 
b/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/api/monitor/EvaluationResult.java
new file mode 100644
index 0000000000..2f3f2acdd4
--- /dev/null
+++ 
b/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/api/monitor/EvaluationResult.java
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.gravitino.maintenance.optimizer.api.monitor;
+
+import com.google.common.base.Preconditions;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.gravitino.annotation.DeveloperApi;
+import org.apache.gravitino.maintenance.optimizer.api.common.MetricSample;
+
+/**
+ * Immutable evaluation result passed to monitor callbacks.
+ *
+ * <p>The metric maps handed to the constructor are defensively copied: the outer maps and every
+ * per-metric sample list are stored as unmodifiable copies, so later changes to the caller's
+ * collections are not visible here. The {@link MetricSample} elements themselves are shared, not
+ * copied.
+ */
+@DeveloperApi
+public class EvaluationResult {
+
+  private final MetricScope scope;
+  private final boolean evaluation;
+  private final Map<String, List<MetricSample>> beforeMetrics;
+  private final Map<String, List<MetricSample>> afterMetrics;
+  private final long actionTimeSeconds;
+  private final long rangeSeconds;
+  private final String evaluatorName;
+
+  /**
+   * Create an immutable evaluation result snapshot.
+   *
+   * @param scope scope of the evaluated metrics; must not be null
+   * @param evaluation evaluation outcome from the evaluator
+   * @param beforeMetrics metrics collected before the action timestamp; null is treated as empty
+   * @param afterMetrics metrics collected at/after the action timestamp; null is treated as empty
+   * @param actionTimeSeconds action timestamp in epoch seconds
+   * @param rangeSeconds evaluation half-window in seconds
+   * @param evaluatorName evaluator implementation name; must not be null
+   * @throws IllegalArgumentException if {@code scope} or {@code evaluatorName} is null
+   */
+  public EvaluationResult(
+      MetricScope scope,
+      boolean evaluation,
+      Map<String, List<MetricSample>> beforeMetrics,
+      Map<String, List<MetricSample>> afterMetrics,
+      long actionTimeSeconds,
+      long rangeSeconds,
+      String evaluatorName) {
+    Preconditions.checkArgument(scope != null, "scope must not be null");
+    Preconditions.checkArgument(evaluatorName != null, "evaluatorName must not be null");
+    this.scope = scope;
+    this.evaluation = evaluation;
+    this.beforeMetrics = immutableMetrics(beforeMetrics);
+    this.afterMetrics = immutableMetrics(afterMetrics);
+    this.actionTimeSeconds = actionTimeSeconds;
+    this.rangeSeconds = rangeSeconds;
+    this.evaluatorName = evaluatorName;
+  }
+
+  /**
+   * @return evaluated scope (table/partition/job).
+   */
+  public MetricScope scope() {
+    return scope;
+  }
+
+  /**
+   * @return true if evaluator considers this scope successful.
+   */
+  public boolean evaluation() {
+    return evaluation;
+  }
+
+  /**
+   * @return immutable metrics map for samples before the action timestamp; never null.
+   */
+  public Map<String, List<MetricSample>> beforeMetrics() {
+    return beforeMetrics;
+  }
+
+  /**
+   * @return immutable metrics map for samples at/after the action timestamp; never null.
+   */
+  public Map<String, List<MetricSample>> afterMetrics() {
+    return afterMetrics;
+  }
+
+  /**
+   * @return action timestamp in epoch seconds.
+   */
+  public long actionTimeSeconds() {
+    return actionTimeSeconds;
+  }
+
+  /**
+   * @return evaluation half-window in seconds.
+   */
+  public long rangeSeconds() {
+    return rangeSeconds;
+  }
+
+  /**
+   * @return evaluator implementation name.
+   */
+  public String evaluatorName() {
+    return evaluatorName;
+  }
+
+  /**
+   * Copy a metrics map into an unmodifiable map of unmodifiable sample lists.
+   *
+   * <p>A null or empty input yields an empty map, and a null sample list is replaced by an empty
+   * list, so readers of the result never have to null-check.
+   *
+   * @param metrics metrics keyed by metric name; may be null
+   * @return unmodifiable copy of {@code metrics}
+   */
+  private static Map<String, List<MetricSample>> immutableMetrics(
+      Map<String, List<MetricSample>> metrics) {
+    if (metrics == null || metrics.isEmpty()) {
+      return Collections.emptyMap();
+    }
+
+    Map<String, List<MetricSample>> copied = new HashMap<>();
+    for (Map.Entry<String, List<MetricSample>> entry : metrics.entrySet()) {
+      // Copy each list so later mutation of the caller's list cannot leak into this snapshot.
+      List<MetricSample> samples =
+          entry.getValue() == null ? List.of() : new ArrayList<>(entry.getValue());
+      copied.put(entry.getKey(), Collections.unmodifiableList(samples));
+    }
+    return Collections.unmodifiableMap(copied);
+  }
+}
diff --git 
a/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/api/monitor/JobProvider.java
 
b/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/api/monitor/JobProvider.java
new file mode 100644
index 0000000000..8b4e825582
--- /dev/null
+++ 
b/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/api/monitor/JobProvider.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.gravitino.maintenance.optimizer.api.monitor;
+
+import java.util.List;
+import org.apache.gravitino.NameIdentifier;
+import org.apache.gravitino.annotation.DeveloperApi;
+import org.apache.gravitino.maintenance.optimizer.api.common.Provider;
+
+/** Represents a provider that provides upstream and downstream jobs for a table. */
+@DeveloperApi
+public interface JobProvider extends Provider {
+  /**
+   * List jobs related to the provided table.
+   *
+   * <p>NOTE(review): the contract for a table with no related jobs (empty list vs. null) is not
+   * stated here; confirm before relying on either.
+   *
+   * @param tableIdentifier catalog/schema/table identifier
+   * @return identifiers for jobs touching this table
+   */
+  List<NameIdentifier> jobIdentifiers(NameIdentifier tableIdentifier);
+}
diff --git 
a/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/api/monitor/MetricScope.java
 
b/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/api/monitor/MetricScope.java
new file mode 100644
index 0000000000..558daf58cd
--- /dev/null
+++ 
b/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/api/monitor/MetricScope.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.gravitino.maintenance.optimizer.api.monitor;
+
+import com.google.common.base.Preconditions;
+import java.util.Optional;
+import org.apache.gravitino.NameIdentifier;
+import org.apache.gravitino.annotation.DeveloperApi;
+import org.apache.gravitino.maintenance.optimizer.api.common.PartitionPath;
+
+/**
+ * Scope of metrics being evaluated (table, partition, or job).
+ *
+ * <p>Instances are created through the {@code forTable}, {@code forPartition} and {@code forJob}
+ * factories; only partition scopes carry a partition path.
+ */
+@DeveloperApi
+public final class MetricScope {
+  /** Supported scope kinds for monitor evaluation. */
+  public enum Type {
+    TABLE,
+    PARTITION,
+    JOB
+  }
+
+  private final NameIdentifier identifier;
+  private final Type type;
+  private final Optional<PartitionPath> partition;
+
+  /**
+   * Create a scope; all arguments must be non-null.
+   *
+   * @param identifier table or job identifier
+   * @param type scope kind
+   * @param partition partition path wrapper, empty for non-partition scopes
+   * @throws IllegalArgumentException if any argument is null
+   */
+  private MetricScope(NameIdentifier identifier, Type type, Optional<PartitionPath> partition) {
+    Preconditions.checkArgument(identifier != null, "identifier must not be null");
+    Preconditions.checkArgument(type != null, "type must not be null");
+    Preconditions.checkArgument(partition != null, "partition must not be null");
+    this.identifier = identifier;
+    this.type = type;
+    this.partition = partition;
+  }
+
+  /**
+   * Create a table scope.
+   *
+   * @param identifier table identifier
+   * @return table metric scope
+   * @throws IllegalArgumentException if {@code identifier} is null
+   */
+  public static MetricScope forTable(NameIdentifier identifier) {
+    return new MetricScope(identifier, Type.TABLE, Optional.empty());
+  }
+
+  /**
+   * Create a partition scope.
+   *
+   * @param identifier table identifier
+   * @param partition partition path under the table
+   * @return partition metric scope
+   * @throws IllegalArgumentException if {@code identifier} or {@code partition} is null
+   */
+  public static MetricScope forPartition(NameIdentifier identifier, PartitionPath partition) {
+    // Check before Optional.of so a null partition fails with a labelled
+    // IllegalArgumentException, like every other argument of this class.
+    Preconditions.checkArgument(partition != null, "partition must not be null");
+    return new MetricScope(identifier, Type.PARTITION, Optional.of(partition));
+  }
+
+  /**
+   * Create a job scope.
+   *
+   * @param identifier job identifier
+   * @return job metric scope
+   * @throws IllegalArgumentException if {@code identifier} is null
+   */
+  public static MetricScope forJob(NameIdentifier identifier) {
+    return new MetricScope(identifier, Type.JOB, Optional.empty());
+  }
+
+  /**
+   * @return evaluated identifier (table or job).
+   */
+  public NameIdentifier identifier() {
+    return identifier;
+  }
+
+  /**
+   * @return scope type.
+   */
+  public Type type() {
+    return type;
+  }
+
+  /**
+   * @return partition path when {@link #type()} is {@link Type#PARTITION}; otherwise empty.
+   */
+  public Optional<PartitionPath> partition() {
+    return partition;
+  }
+}
diff --git 
a/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/api/monitor/MetricsEvaluator.java
 
b/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/api/monitor/MetricsEvaluator.java
new file mode 100644
index 0000000000..f383ef6169
--- /dev/null
+++ 
b/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/api/monitor/MetricsEvaluator.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.gravitino.maintenance.optimizer.api.monitor;
+
+import java.util.List;
+import java.util.Map;
+import org.apache.gravitino.annotation.DeveloperApi;
+import org.apache.gravitino.maintenance.optimizer.api.common.MetricSample;
+
+/**
+ * Evaluator interface for the table and related job metrics before and after optimization
+ * actions.
+ */
+@DeveloperApi
+public interface MetricsEvaluator {
+  /**
+   * Human-readable evaluator name, primarily for logging and selection.
+   *
+   * @return the evaluator name
+   */
+  String name();
+
+  /**
+   * Evaluate metrics before/after optimization to decide success/failure.
+   *
+   * @param scope context of the metrics being evaluated (table/partition/job)
+   * @param beforeMetrics metrics collected before the action timestamp, keyed by metric name
+   * @param afterMetrics metrics collected at/after the action timestamp, keyed by metric name
+   * @return true when metrics meet expectations
+   */
+  boolean evaluateMetrics(
+      MetricScope scope,
+      Map<String, List<MetricSample>> beforeMetrics,
+      Map<String, List<MetricSample>> afterMetrics);
+}
diff --git 
a/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/api/monitor/MetricsProvider.java
 
b/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/api/monitor/MetricsProvider.java
new file mode 100644
index 0000000000..a92cbcd54d
--- /dev/null
+++ 
b/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/api/monitor/MetricsProvider.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.gravitino.maintenance.optimizer.api.monitor;
+
+import java.util.List;
+import java.util.Map;
+import org.apache.gravitino.NameIdentifier;
+import org.apache.gravitino.annotation.DeveloperApi;
+import org.apache.gravitino.maintenance.optimizer.api.common.MetricSample;
+import org.apache.gravitino.maintenance.optimizer.api.common.PartitionPath;
+import org.apache.gravitino.maintenance.optimizer.api.common.Provider;
+
+/**
+ * Represents a provider that provides table and job related metrics.
+ *
+ * <p>Every lookup takes a {@code startTime}/{@code endTime} pair in seconds. NOTE(review): whether
+ * the bounds are inclusive, and whether they are epoch seconds, is not stated on this interface;
+ * confirm against the caller.
+ */
+@DeveloperApi
+public interface MetricsProvider extends Provider {
+  /**
+   * Retrieve metrics for a specific job within a time range.
+   *
+   * @param jobIdentifier catalog/schema/job identifier
+   * @param startTime start timestamp (seconds)
+   * @param endTime end timestamp (seconds)
+   * @return map keyed by metric name, each containing a metric sample series
+   */
+  Map<String, List<MetricSample>> jobMetrics(
+      NameIdentifier jobIdentifier, long startTime, long endTime);
+
+  /**
+   * Retrieve metrics for a table within a time range.
+   *
+   * @param tableIdentifier catalog/schema/table identifier
+   * @param startTime start timestamp (seconds)
+   * @param endTime end timestamp (seconds)
+   * @return map keyed by metric name, each containing a metric sample series
+   */
+  Map<String, List<MetricSample>> tableMetrics(
+      NameIdentifier tableIdentifier, long startTime, long endTime);
+
+  /**
+   * Retrieve metrics for a specific partition of a table within a time range.
+   *
+   * @param tableIdentifier catalog/schema/table identifier
+   * @param partitionPath partition path
+   * @param startTime start timestamp (seconds)
+   * @param endTime end timestamp (seconds)
+   * @return map keyed by metric name, each containing a metric sample series
+   */
+  Map<String, List<MetricSample>> partitionMetrics(
+      NameIdentifier tableIdentifier, PartitionPath partitionPath, long startTime, long endTime);
+}
diff --git 
a/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/api/monitor/MonitorCallback.java
 
b/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/api/monitor/MonitorCallback.java
new file mode 100644
index 0000000000..82e11203ce
--- /dev/null
+++ 
b/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/api/monitor/MonitorCallback.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.gravitino.maintenance.optimizer.api.monitor;
+
+import org.apache.gravitino.annotation.DeveloperApi;
+import org.apache.gravitino.maintenance.optimizer.api.common.Provider;
+
+/**
+ * Callback SPI invoked after metrics evaluation completes.
+ *
+ * <p>Implementations are {@link Provider}s; {@code ProviderUtils.createMonitorCallbackInstance}
+ * creates one from a provider name.
+ */
+@DeveloperApi
+public interface MonitorCallback extends Provider {
+
+  /**
+   * Handle a metrics evaluation result.
+   *
+   * @param result evaluation result snapshot
+   */
+  void onEvaluation(EvaluationResult result);
+}
diff --git 
a/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/common/conf/OptimizerConfig.java
 
b/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/common/conf/OptimizerConfig.java
index 8207b3716f..d19eb5a220 100644
--- 
a/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/common/conf/OptimizerConfig.java
+++ 
b/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/common/conf/OptimizerConfig.java
@@ -19,6 +19,7 @@
 
 package org.apache.gravitino.maintenance.optimizer.common.conf;
 
+import java.util.List;
 import java.util.Map;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.gravitino.Config;
@@ -31,10 +32,9 @@ import 
org.apache.gravitino.maintenance.optimizer.recommender.strategy.Gravitino
 import 
org.apache.gravitino.maintenance.optimizer.recommender.table.GravitinoTableMetadataProvider;
 
 /**
- * Central configuration holder for the optimizer/recommender runtime. Keys 
are grouped under the
- * {@code gravitino.optimizer.*} prefix and capture both core connectivity 
(URI, metalake, default
- * catalog) and pluggable implementation wiring (statistics provider, strategy 
provider, table
- * metadata provider, job submitter).
+ * Central configuration holder for the optimizer runtime. Keys are grouped 
under the {@code
+ * gravitino.optimizer.*} prefix and capture both core connectivity (URI, 
metalake, default catalog)
+ * and pluggable implementation wiring for recommender, updater, and monitor 
components.
  */
 public class OptimizerConfig extends Config {
 
@@ -56,6 +56,11 @@ public class OptimizerConfig extends Config {
   private static final String UPDATER_PREFIX = OPTIMIZER_PREFIX + "updater.";
   private static final String STATISTICS_UPDATER = UPDATER_PREFIX + 
"statisticsUpdater";
   private static final String METRICS_UPDATER = UPDATER_PREFIX + 
"metricsUpdater";
+  private static final String MONITOR_PREFIX = OPTIMIZER_PREFIX + "monitor.";
+  private static final String METRICS_PROVIDER = MONITOR_PREFIX + 
"metricsProvider";
+  private static final String JOB_PROVIDER = MONITOR_PREFIX + "jobProvider";
+  private static final String METRICS_EVALUATOR = MONITOR_PREFIX + 
"metricsEvaluator";
+  private static final String MONITOR_CALLBACKS = MONITOR_PREFIX + "callbacks";
 
   public static final ConfigEntry<String> STATISTICS_PROVIDER_CONFIG =
       new ConfigBuilder(STATISTICS_PROVIDER)
@@ -115,6 +120,45 @@ public class OptimizerConfig extends Config {
           .stringConf()
           .create();
 
+  /** Name of the monitor metrics provider implementation, registered under the monitor prefix. */
+  public static final ConfigEntry<String> METRICS_PROVIDER_CONFIG =
+      new ConfigBuilder(METRICS_PROVIDER)
+          .doc(
+              "Monitor metrics provider implementation name (matches Provider.name()) "
+                  + "discoverable via ServiceLoader. Example: 'metrics-provider'.")
+          .version(ConfigConstants.VERSION_1_2_0)
+          .stringConf()
+          .create();
+
+  /** Name of the monitor job provider implementation, registered under the monitor prefix. */
+  public static final ConfigEntry<String> JOB_PROVIDER_CONFIG =
+      new ConfigBuilder(JOB_PROVIDER)
+          .doc(
+              "Monitor job provider implementation name (matches Provider.name()) discoverable "
+                  + "via ServiceLoader. Example: 'job-provider'.")
+          .version(ConfigConstants.VERSION_1_2_0)
+          .stringConf()
+          .create();
+
+  /** Name of the monitor metrics evaluator implementation, registered under the monitor prefix. */
+  public static final ConfigEntry<String> METRICS_EVALUATOR_CONFIG =
+      new ConfigBuilder(METRICS_EVALUATOR)
+          .doc(
+              "Monitor metrics evaluator implementation name discoverable via ServiceLoader. "
+                  + "The evaluator name must match MetricsEvaluator.name(). Example: "
+                  + "'metrics-evaluator'.")
+          .version(ConfigConstants.VERSION_1_2_0)
+          .stringConf()
+          .create();
+
+  /** Names of the monitor callback implementations to load; defaults to an empty list. */
+  public static final ConfigEntry<List<String>> MONITOR_CALLBACKS_CONFIG =
+      new ConfigBuilder(MONITOR_CALLBACKS)
+          .doc(
+              "Comma-separated monitor callback implementation names (each matches "
+                  + "Provider.name()) discoverable via ServiceLoader. Example: "
+                  + "'monitor-callback-a,monitor-callback-b'.")
+          .version(ConfigConstants.VERSION_1_2_0)
+          .stringConf()
+          .toSequence()
+          .createWithDefault(List.of());
+
   public static final ConfigEntry<String> GRAVITINO_URI_CONFIG =
       new ConfigBuilder(GRAVITINO_URI)
           .doc("The URI of the Gravitino server.")
diff --git 
a/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/common/util/InstanceLoaderUtils.java
 
b/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/common/util/InstanceLoaderUtils.java
index 2ccdd294f2..0836143d6f 100644
--- 
a/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/common/util/InstanceLoaderUtils.java
+++ 
b/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/common/util/InstanceLoaderUtils.java
@@ -19,44 +19,57 @@
 
 package org.apache.gravitino.maintenance.optimizer.common.util;
 
+import com.google.common.base.Preconditions;
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Streams;
 import java.util.List;
 import java.util.ServiceLoader;
+import java.util.function.Function;
 import java.util.stream.Collectors;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.gravitino.maintenance.optimizer.api.monitor.MetricsEvaluator;
 import 
org.apache.gravitino.maintenance.optimizer.api.updater.StatisticsCalculator;
 
 public class InstanceLoaderUtils {
 
   public static <T extends StatisticsCalculator> T 
createStatisticsCalculatorInstance(
       String calculatorName) {
-    ServiceLoader<StatisticsCalculator> loader = 
ServiceLoader.load(StatisticsCalculator.class);
+    return createInstanceByName(
+        StatisticsCalculator.class, calculatorName, 
StatisticsCalculator::name);
+  }
+
+  /**
+   * Create the {@link MetricsEvaluator} registered under the given name.
+   *
+   * <p>The lookup is delegated to {@code createInstanceByName}, which compares
+   * {@link MetricsEvaluator#name()} case-insensitively and rejects zero or multiple matches.
+   *
+   * @param evaluatorName evaluator name to look up; must not be null or blank
+   * @param <T> evaluator type expected by the caller
+   * @return a new evaluator instance
+   * @throws IllegalArgumentException if {@code evaluatorName} is null or blank, or if it does not
+   *     match exactly one registered evaluator
+   */
+  public static <T extends MetricsEvaluator> T createMetricsEvaluatorInstance(
+      String evaluatorName) {
+    Preconditions.checkArgument(
+        StringUtils.isNotBlank(evaluatorName), "metrics evaluator name must not be null or blank");
+    return createInstanceByName(MetricsEvaluator.class, evaluatorName, MetricsEvaluator::name);
+  }
+
+  private static <B, T extends B> T createInstanceByName(
+      Class<B> typeClass, String instanceName, Function<B, String> 
nameFunction) {
+    ServiceLoader<B> loader = ServiceLoader.load(typeClass);
     List<Class<? extends T>> providers =
         Streams.stream(loader.iterator())
-            .filter(p -> p.name().equalsIgnoreCase(calculatorName))
+            .filter(p -> nameFunction.apply(p).equalsIgnoreCase(instanceName))
             .map(p -> (Class<? extends T>) p.getClass())
             .collect(Collectors.toList());
 
     if (providers.isEmpty()) {
       throw new IllegalArgumentException(
-          "No "
-              + StatisticsCalculator.class.getSimpleName()
-              + " class found for: "
-              + calculatorName);
+          "No " + typeClass.getSimpleName() + " class found for: " + 
instanceName);
     } else if (providers.size() > 1) {
       throw new IllegalArgumentException(
-          "Multiple "
-              + StatisticsCalculator.class.getSimpleName()
-              + " found for: "
-              + calculatorName);
+          "Multiple " + typeClass.getSimpleName() + " found for: " + 
instanceName);
     } else {
       Class<? extends T> providerClz = Iterables.getOnlyElement(providers);
       try {
         return providerClz.getDeclaredConstructor().newInstance();
       } catch (Exception e) {
         throw new RuntimeException(
-            "Failed to instantiate StatisticsCalculator: "
-                + calculatorName
+            "Failed to instantiate "
+                + typeClass.getSimpleName()
+                + ": "
+                + instanceName
                 + ", class: "
                 + providerClz.getName(),
             e);
diff --git 
a/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/common/util/ProviderUtils.java
 
b/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/common/util/ProviderUtils.java
index a60b75257d..bd63e389b0 100644
--- 
a/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/common/util/ProviderUtils.java
+++ 
b/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/common/util/ProviderUtils.java
@@ -25,6 +25,9 @@ import java.util.List;
 import java.util.ServiceLoader;
 import java.util.stream.Collectors;
 import org.apache.gravitino.maintenance.optimizer.api.common.Provider;
+import org.apache.gravitino.maintenance.optimizer.api.monitor.JobProvider;
+import org.apache.gravitino.maintenance.optimizer.api.monitor.MetricsProvider;
+import org.apache.gravitino.maintenance.optimizer.api.monitor.MonitorCallback;
 import org.apache.gravitino.maintenance.optimizer.api.recommender.JobSubmitter;
 import 
org.apache.gravitino.maintenance.optimizer.api.recommender.StatisticsProvider;
 import 
org.apache.gravitino.maintenance.optimizer.api.recommender.StrategyProvider;
@@ -99,4 +102,16 @@ public class ProviderUtils {
   public static MetricsUpdater createMetricsUpdaterInstance(String provider) {
     return createProviderInstance(MetricsUpdater.class, provider);
   }
+
+  /**
+   * Create the monitor {@link MetricsProvider} for the given provider name.
+   *
+   * @param provider provider name to resolve
+   * @return the matching metrics provider instance
+   */
+  public static MetricsProvider createMetricsProviderInstance(String provider) {
+    return createProviderInstance(MetricsProvider.class, provider);
+  }
+
+  /**
+   * Create the monitor {@link JobProvider} for the given provider name.
+   *
+   * @param provider provider name to resolve
+   * @return the matching job provider instance
+   */
+  public static JobProvider createJobProviderInstance(String provider) {
+    return createProviderInstance(JobProvider.class, provider);
+  }
+
+  /**
+   * Create the {@link MonitorCallback} for the given provider name.
+   *
+   * @param provider provider name to resolve
+   * @return the matching monitor callback instance
+   */
+  public static MonitorCallback createMonitorCallbackInstance(String provider) {
+    return createProviderInstance(MonitorCallback.class, provider);
+  }
 }
diff --git 
a/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/monitor/Monitor.java
 
b/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/monitor/Monitor.java
new file mode 100644
index 0000000000..ebf88358f0
--- /dev/null
+++ 
b/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/monitor/Monitor.java
@@ -0,0 +1,317 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.gravitino.maintenance.optimizer.monitor;
+
+import com.google.common.base.Preconditions;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.gravitino.NameIdentifier;
+import org.apache.gravitino.maintenance.optimizer.api.common.MetricSample;
+import org.apache.gravitino.maintenance.optimizer.api.common.PartitionPath;
+import org.apache.gravitino.maintenance.optimizer.api.monitor.EvaluationResult;
+import org.apache.gravitino.maintenance.optimizer.api.monitor.JobProvider;
+import org.apache.gravitino.maintenance.optimizer.api.monitor.MetricScope;
+import org.apache.gravitino.maintenance.optimizer.api.monitor.MetricsEvaluator;
+import org.apache.gravitino.maintenance.optimizer.api.monitor.MetricsProvider;
+import org.apache.gravitino.maintenance.optimizer.api.monitor.MonitorCallback;
+import org.apache.gravitino.maintenance.optimizer.common.CloseableGroup;
+import org.apache.gravitino.maintenance.optimizer.common.OptimizerEnv;
+import org.apache.gravitino.maintenance.optimizer.common.conf.OptimizerConfig;
+import 
org.apache.gravitino.maintenance.optimizer.common.util.InstanceLoaderUtils;
+import org.apache.gravitino.maintenance.optimizer.common.util.ProviderUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Entry point that wires monitor providers, evaluator, and callbacks to assess optimization
+ * outcomes from time-series metrics.
+ *
+ * <p>Purpose:
+ *
+ * <ul>
+ *   <li>Fetch table (or partition) metrics in a window around an action timestamp.
+ *   <li>Fetch related job metrics for the same time window.
+ *   <li>Split metrics into before/after groups and evaluate them with {@link MetricsEvaluator}.
+ *   <li>Publish each {@link EvaluationResult} to configured {@link MonitorCallback} instances.
+ * </ul>
+ *
+ * <p>Configuration:
+ *
+ * <ul>
+ *   <li>{@link OptimizerConfig#METRICS_PROVIDER_CONFIG} for {@link MetricsProvider}.
+ *   <li>{@link OptimizerConfig#JOB_PROVIDER_CONFIG} for {@link JobProvider}.
+ *   <li>{@link OptimizerConfig#METRICS_EVALUATOR_CONFIG} for {@link MetricsEvaluator}.
+ *   <li>{@link OptimizerConfig#MONITOR_CALLBACKS_CONFIG} for callback list.
+ * </ul>
+ *
+ * <p>Lifecycle:
+ *
+ * <ol>
+ *   <li>Create a {@link Monitor} with an initialized {@link OptimizerEnv}.
+ *   <li>The constructor resolves implementations via ServiceLoader and initializes them. If any
+ *       step fails, the components registered so far are closed before the failure propagates.
+ *   <li>Call {@link #evaluateMetrics(NameIdentifier, long, long, Optional)} when needed.
+ *   <li>Consume returned {@link EvaluationResult} list and callback side effects.
+ *   <li>Call {@link #close()} to release provider/callback resources.
+ * </ol>
+ *
+ * <p>Workflow:
+ *
+ * <ol>
+ *   <li>Resolve a time range: {@code [actionTimeSeconds - rangeSeconds, actionTimeSeconds +
+ *       rangeSeconds]}.
+ *   <li>Read table/partition metrics and evaluate them.
+ *   <li>Resolve related jobs from {@link JobProvider} and evaluate each job's metrics.
+ *   <li>Return ordered results (table first, then jobs).
+ * </ol>
+ */
+public class Monitor implements AutoCloseable {
+
+  private static final Logger LOG = LoggerFactory.getLogger(Monitor.class);
+  private final MetricsProvider metricsProvider;
+  private final JobProvider jobProvider;
+  private final MetricsEvaluator metricsEvaluator;
+  private final List<MonitorCallback> callbacks;
+  private final CloseableGroup closeableGroup = new CloseableGroup();
+
+  /**
+   * Create a monitor by loading and initializing monitor providers, evaluator, and callbacks from
+   * the supplied optimizer environment.
+   *
+   * <p>Each provider/callback is registered for closing only after its {@code initialize} call
+   * returns. When any load or initialize step throws, everything registered up to that point is
+   * closed here, because the caller never receives an instance it could close itself.
+   *
+   * @param optimizerEnv shared optimizer environment and configuration
+   * @throws IllegalArgumentException if {@code optimizerEnv} is null
+   */
+  public Monitor(OptimizerEnv optimizerEnv) {
+    Preconditions.checkArgument(optimizerEnv != null, "optimizerEnv must not be null");
+    try {
+      this.metricsProvider = loadMetricsProvider(optimizerEnv.config());
+      metricsProvider.initialize(optimizerEnv);
+      closeableGroup.register(metricsProvider, MetricsProvider.class.getSimpleName());
+
+      this.jobProvider = loadJobProvider(optimizerEnv.config());
+      jobProvider.initialize(optimizerEnv);
+      closeableGroup.register(jobProvider, JobProvider.class.getSimpleName());
+
+      this.metricsEvaluator = loadMetricsEvaluator(optimizerEnv.config());
+      this.callbacks = loadCallbacks(optimizerEnv.config());
+      for (MonitorCallback callback : callbacks) {
+        callback.initialize(optimizerEnv);
+        closeableGroup.register(callback, callback.name());
+      }
+    } catch (RuntimeException | Error e) {
+      releaseAfterFailedInit(e);
+      throw e;
+    }
+  }
+
+  /**
+   * Evaluate table metrics and related job metrics around an action timestamp.
+   *
+   * @param tableIdentifier target table identifier
+   * @param actionTimeSeconds action timestamp in epoch seconds
+   * @param rangeSeconds half-window range in seconds, used to build [actionTimeSeconds-range,
+   *     actionTimeSeconds+range]
+   * @param partitionPath optional partition scope for table metrics
+   * @return evaluation results, including one table/partition result and zero or more job results
+   * @throws IllegalArgumentException if an argument is invalid, the time window overflows, or a
+   *     collaborator itself throws {@link IllegalArgumentException}
+   * @throws IllegalStateException if any other runtime failure occurs while evaluating; the
+   *     original exception is kept as the cause
+   */
+  public List<EvaluationResult> evaluateMetrics(
+      NameIdentifier tableIdentifier,
+      long actionTimeSeconds,
+      long rangeSeconds,
+      Optional<PartitionPath> partitionPath) {
+    Preconditions.checkArgument(tableIdentifier != null, "tableIdentifier must not be null");
+    Preconditions.checkArgument(partitionPath != null, "partitionPath must not be null");
+    Preconditions.checkArgument(actionTimeSeconds >= 0, "actionTimeSeconds must be >= 0");
+    Preconditions.checkArgument(rangeSeconds >= 0, "rangeSeconds must be >= 0");
+    try {
+      List<EvaluationResult> results = new ArrayList<>();
+      results.add(
+          evaluateTableMetrics(tableIdentifier, actionTimeSeconds, rangeSeconds, partitionPath));
+      List<NameIdentifier> jobs = jobProvider.jobIdentifiers(tableIdentifier);
+      // A null job list is treated the same as "no related jobs".
+      if (jobs != null) {
+        for (NameIdentifier jobIdentifier : jobs) {
+          results.add(evaluateJobMetrics(jobIdentifier, actionTimeSeconds, rangeSeconds));
+        }
+      }
+      return results;
+    } catch (IllegalArgumentException e) {
+      // Argument problems (including time range overflow) are surfaced unchanged.
+      throw e;
+    } catch (RuntimeException e) {
+      throw new IllegalStateException(
+          String.format(
+              "Failed to evaluate metrics for table=%s, actionTimeSeconds=%d, rangeSeconds=%d, partition=%s",
+              tableIdentifier,
+              actionTimeSeconds,
+              rangeSeconds,
+              partitionPath.map(PartitionPath::toString).orElse("<table-scope>")),
+          e);
+    }
+  }
+
+  /**
+   * Fetch and evaluate metrics for the table, or for one partition when a path is present.
+   *
+   * <p>The partition branch is an explicit {@code if} rather than {@code Optional.map(...)
+   * .orElseGet(...)}: {@code Optional.map} turns a null mapping result into an empty Optional, so
+   * a provider returning null partition metrics would silently fall back to table-level metrics
+   * while the result is still reported under the partition scope.
+   */
+  private EvaluationResult evaluateTableMetrics(
+      NameIdentifier tableIdentifier,
+      long actionTimeSeconds,
+      long rangeSeconds,
+      Optional<PartitionPath> partitionPath) {
+    Pair<Long, Long> window = timeRange(actionTimeSeconds, rangeSeconds);
+    Map<String, List<MetricSample>> metrics;
+    MetricScope scope;
+    if (partitionPath.isPresent()) {
+      PartitionPath path = partitionPath.get();
+      metrics =
+          metricsProvider.partitionMetrics(
+              tableIdentifier, path, window.getLeft(), window.getRight());
+      scope = MetricScope.forPartition(tableIdentifier, path);
+    } else {
+      metrics = metricsProvider.tableMetrics(tableIdentifier, window.getLeft(), window.getRight());
+      scope = MetricScope.forTable(tableIdentifier);
+    }
+    return evaluateAndNotify(scope, metrics, actionTimeSeconds, rangeSeconds);
+  }
+
+  /** Fetch and evaluate metrics for a single job in the window around the action time. */
+  private EvaluationResult evaluateJobMetrics(
+      NameIdentifier jobIdentifier, long actionTimeSeconds, long rangeSeconds) {
+    Pair<Long, Long> window = timeRange(actionTimeSeconds, rangeSeconds);
+    Map<String, List<MetricSample>> metrics =
+        metricsProvider.jobMetrics(jobIdentifier, window.getLeft(), window.getRight());
+    return evaluateAndNotify(
+        MetricScope.forJob(jobIdentifier), metrics, actionTimeSeconds, rangeSeconds);
+  }
+
+  /**
+   * Shared tail of table and job evaluation: split the samples at the action time, run the
+   * evaluator, build the result, and publish it to the callbacks.
+   */
+  private EvaluationResult evaluateAndNotify(
+      MetricScope scope,
+      Map<String, List<MetricSample>> metrics,
+      long actionTimeSeconds,
+      long rangeSeconds) {
+    Pair<Map<String, List<MetricSample>>, Map<String, List<MetricSample>>> split =
+        splitMetrics(metrics, actionTimeSeconds);
+    boolean evaluation =
+        metricsEvaluator.evaluateMetrics(scope, split.getLeft(), split.getRight());
+    EvaluationResult result =
+        new EvaluationResult(
+            scope,
+            evaluation,
+            split.getLeft(),
+            split.getRight(),
+            actionTimeSeconds,
+            rangeSeconds,
+            metricsEvaluator.name());
+    return notifyCallbacks(result);
+  }
+
+  /**
+   * Split every metric series into samples strictly before the action time (left) and samples at
+   * or after it (right). A null map or a null series is treated as empty; every metric name in the
+   * input appears in both output maps.
+   */
+  private Pair<Map<String, List<MetricSample>>, Map<String, List<MetricSample>>> splitMetrics(
+      Map<String, List<MetricSample>> metrics, long actionTimeInSeconds) {
+    Map<String, List<MetricSample>> beforeMetrics = new HashMap<>();
+    Map<String, List<MetricSample>> afterMetrics = new HashMap<>();
+    Map<String, List<MetricSample>> source = metrics == null ? Collections.emptyMap() : metrics;
+    for (Map.Entry<String, List<MetricSample>> entry : source.entrySet()) {
+      String metricName = entry.getKey();
+      List<MetricSample> samples = entry.getValue() == null ? List.of() : entry.getValue();
+      beforeMetrics.put(
+          metricName, samples.stream().filter(m -> m.timestamp() < actionTimeInSeconds).toList());
+      afterMetrics.put(
+          metricName, samples.stream().filter(m -> m.timestamp() >= actionTimeInSeconds).toList());
+    }
+    return Pair.of(beforeMetrics, afterMetrics);
+  }
+
+  /**
+   * Compute the inclusive window {@code [action - range, action + range]} with overflow checks.
+   *
+   * @throws IllegalArgumentException if either bound overflows a {@code long}
+   */
+  private Pair<Long, Long> timeRange(long actionTimeSeconds, long rangeSeconds) {
+    try {
+      long startTime = Math.subtractExact(actionTimeSeconds, rangeSeconds);
+      long endTime = Math.addExact(actionTimeSeconds, rangeSeconds);
+      return Pair.of(startTime, endTime);
+    } catch (ArithmeticException e) {
+      throw new IllegalArgumentException(
+          String.format(
+              "time range overflow: actionTimeSeconds=%d, rangeSeconds=%d",
+              actionTimeSeconds, rangeSeconds),
+          e);
+    }
+  }
+
+  private MetricsProvider loadMetricsProvider(OptimizerConfig optimizerConfig) {
+    return ProviderUtils.createMetricsProviderInstance(
+        optimizerConfig.get(OptimizerConfig.METRICS_PROVIDER_CONFIG));
+  }
+
+  private JobProvider loadJobProvider(OptimizerConfig optimizerConfig) {
+    return ProviderUtils.createJobProviderInstance(
+        optimizerConfig.get(OptimizerConfig.JOB_PROVIDER_CONFIG));
+  }
+
+  private MetricsEvaluator loadMetricsEvaluator(OptimizerConfig optimizerConfig) {
+    return InstanceLoaderUtils.createMetricsEvaluatorInstance(
+        optimizerConfig.get(OptimizerConfig.METRICS_EVALUATOR_CONFIG));
+  }
+
+  /** Instantiate the configured callbacks in configuration order; none configured yields empty. */
+  private List<MonitorCallback> loadCallbacks(OptimizerConfig optimizerConfig) {
+    List<String> callbackNames = optimizerConfig.get(OptimizerConfig.MONITOR_CALLBACKS_CONFIG);
+    if (callbackNames == null || callbackNames.isEmpty()) {
+      return Collections.emptyList();
+    }
+    List<MonitorCallback> result = new ArrayList<>();
+    for (String callbackName : callbackNames) {
+      result.add(ProviderUtils.createMonitorCallbackInstance(callbackName));
+    }
+    return List.copyOf(result);
+  }
+
+  /**
+   * Publish the result to every callback. Delivery is best-effort: a failing callback is logged
+   * and does not prevent the remaining callbacks from being notified.
+   */
+  private EvaluationResult notifyCallbacks(EvaluationResult result) {
+    for (MonitorCallback callback : callbacks) {
+      try {
+        callback.onEvaluation(result);
+      } catch (Exception e) {
+        LOG.warn(
+            "Monitor callback {} failed for scope {}",
+            callback.name(),
+            result.scope().identifier(),
+            e);
+      }
+    }
+    return result;
+  }
+
+  /**
+   * Close whatever was registered before construction failed, attaching any close failure to the
+   * original error as a suppressed exception so the root cause stays primary.
+   */
+  private void releaseAfterFailedInit(Throwable failure) {
+    try {
+      closeableGroup.close();
+    } catch (Exception closeFailure) {
+      // Throwable#addSuppressed rejects self-suppression.
+      if (closeFailure != failure) {
+        failure.addSuppressed(closeFailure);
+      }
+    }
+  }
+
+  /** Close all initialized monitor providers and callbacks. */
+  @Override
+  public void close() throws Exception {
+    closeableGroup.close();
+  }
+}
diff --git 
a/maintenance/optimizer/src/test/java/org/apache/gravitino/maintenance/optimizer/common/util/TestInstanceLoaderUtils.java
 
b/maintenance/optimizer/src/test/java/org/apache/gravitino/maintenance/optimizer/common/util/TestInstanceLoaderUtils.java
new file mode 100644
index 0000000000..eede9861f8
--- /dev/null
+++ 
b/maintenance/optimizer/src/test/java/org/apache/gravitino/maintenance/optimizer/common/util/TestInstanceLoaderUtils.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.gravitino.maintenance.optimizer.common.util;
+
+import org.apache.gravitino.maintenance.optimizer.api.monitor.MetricsEvaluator;
+import 
org.apache.gravitino.maintenance.optimizer.api.updater.StatisticsCalculator;
+import 
org.apache.gravitino.maintenance.optimizer.monitor.evaluator.MetricsEvaluatorForTest;
+import 
org.apache.gravitino.maintenance.optimizer.updater.StatisticsCalculatorForTest;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+/** Tests for name-based instance creation in {@link InstanceLoaderUtils}. */
+public class TestInstanceLoaderUtils {
+
+  private static final String INVALID_EVALUATOR_NAME_MESSAGE =
+      "metrics evaluator name must not be null or blank";
+
+  /** A registered evaluator name resolves to the test evaluator implementation. */
+  @Test
+  public void testCreateMetricsEvaluatorInstance() {
+    MetricsEvaluator loaded =
+        InstanceLoaderUtils.createMetricsEvaluatorInstance(MetricsEvaluatorForTest.NAME);
+    Assertions.assertNotNull(loaded);
+    Assertions.assertTrue(loaded instanceof MetricsEvaluatorForTest);
+  }
+
+  /** A null evaluator name is rejected with the documented message. */
+  @Test
+  public void testCreateMetricsEvaluatorInstanceWithNullName() {
+    assertEvaluatorNameRejected(null);
+  }
+
+  /** A whitespace-only evaluator name is rejected with the documented message. */
+  @Test
+  public void testCreateMetricsEvaluatorInstanceWithBlankName() {
+    assertEvaluatorNameRejected("   ");
+  }
+
+  /** A registered calculator name resolves to the test calculator implementation. */
+  @Test
+  public void testCreateStatisticsCalculatorInstance() {
+    StatisticsCalculator loaded =
+        InstanceLoaderUtils.createStatisticsCalculatorInstance(StatisticsCalculatorForTest.NAME);
+    Assertions.assertNotNull(loaded);
+    Assertions.assertTrue(loaded instanceof StatisticsCalculatorForTest);
+  }
+
+  /** An unregistered calculator name fails with a message naming the unknown calculator. */
+  @Test
+  public void testCreateStatisticsCalculatorInstanceWithUnknownName() {
+    IllegalArgumentException thrown =
+        Assertions.assertThrows(
+            IllegalArgumentException.class,
+            () -> InstanceLoaderUtils.createStatisticsCalculatorInstance("unknown-calculator"));
+    Assertions.assertEquals(
+        "No StatisticsCalculator class found for: unknown-calculator", thrown.getMessage());
+  }
+
+  /** Asserts that the given evaluator name is rejected with the null-or-blank message. */
+  private static void assertEvaluatorNameRejected(String evaluatorName) {
+    IllegalArgumentException thrown =
+        Assertions.assertThrows(
+            IllegalArgumentException.class,
+            () -> InstanceLoaderUtils.createMetricsEvaluatorInstance(evaluatorName));
+    Assertions.assertEquals(INVALID_EVALUATOR_NAME_MESSAGE, thrown.getMessage());
+  }
+}
diff --git 
a/maintenance/optimizer/src/test/java/org/apache/gravitino/maintenance/optimizer/common/util/TestProviderUtils.java
 
b/maintenance/optimizer/src/test/java/org/apache/gravitino/maintenance/optimizer/common/util/TestProviderUtils.java
index bc90dd9e35..9801b9f727 100644
--- 
a/maintenance/optimizer/src/test/java/org/apache/gravitino/maintenance/optimizer/common/util/TestProviderUtils.java
+++ 
b/maintenance/optimizer/src/test/java/org/apache/gravitino/maintenance/optimizer/common/util/TestProviderUtils.java
@@ -23,6 +23,9 @@ import 
org.apache.gravitino.maintenance.optimizer.api.recommender.JobSubmitter;
 import 
org.apache.gravitino.maintenance.optimizer.api.recommender.StatisticsProvider;
 import 
org.apache.gravitino.maintenance.optimizer.api.recommender.StrategyProvider;
 import 
org.apache.gravitino.maintenance.optimizer.api.recommender.TableMetadataProvider;
+import 
org.apache.gravitino.maintenance.optimizer.monitor.callback.MonitorCallbackForTest;
+import 
org.apache.gravitino.maintenance.optimizer.monitor.job.JobProviderForTest;
+import 
org.apache.gravitino.maintenance.optimizer.monitor.metrics.MetricsProviderForTest;
 import 
org.apache.gravitino.maintenance.optimizer.recommender.job.GravitinoJobSubmitter;
 import 
org.apache.gravitino.maintenance.optimizer.recommender.job.NoopJobSubmitter;
 import 
org.apache.gravitino.maintenance.optimizer.recommender.statistics.GravitinoStatisticsProvider;
@@ -68,4 +71,17 @@ public class TestProviderUtils {
     Assertions.assertNotNull(tableMetadataProvider);
     Assertions.assertTrue(tableMetadataProvider instanceof 
GravitinoTableMetadataProvider);
   }
+
+  /** Each monitor SPI name registered for tests resolves to its test implementation. */
+  @Test
+  public void testCreateMonitorProviders() {
+    Object metricsProvider =
+        ProviderUtils.createMetricsProviderInstance(MetricsProviderForTest.NAME);
+    Assertions.assertTrue(metricsProvider instanceof MetricsProviderForTest);
+
+    Object jobProvider = ProviderUtils.createJobProviderInstance(JobProviderForTest.NAME);
+    Assertions.assertTrue(jobProvider instanceof JobProviderForTest);
+
+    Object monitorCallback =
+        ProviderUtils.createMonitorCallbackInstance(MonitorCallbackForTest.NAME);
+    Assertions.assertTrue(monitorCallback instanceof MonitorCallbackForTest);
+  }
 }
diff --git 
a/maintenance/optimizer/src/test/java/org/apache/gravitino/maintenance/optimizer/monitor/TestMonitor.java
 
b/maintenance/optimizer/src/test/java/org/apache/gravitino/maintenance/optimizer/monitor/TestMonitor.java
new file mode 100644
index 0000000000..5185edf7ce
--- /dev/null
+++ 
b/maintenance/optimizer/src/test/java/org/apache/gravitino/maintenance/optimizer/monitor/TestMonitor.java
@@ -0,0 +1,192 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.gravitino.maintenance.optimizer.monitor;
+
+import com.google.common.collect.ImmutableMap;
+import java.util.List;
+import java.util.Optional;
+import org.apache.gravitino.NameIdentifier;
+import org.apache.gravitino.maintenance.optimizer.api.common.PartitionPath;
+import org.apache.gravitino.maintenance.optimizer.api.monitor.EvaluationResult;
+import org.apache.gravitino.maintenance.optimizer.api.monitor.MetricScope;
+import org.apache.gravitino.maintenance.optimizer.common.OptimizerEnv;
+import org.apache.gravitino.maintenance.optimizer.common.PartitionEntryImpl;
+import org.apache.gravitino.maintenance.optimizer.common.conf.OptimizerConfig;
+import 
org.apache.gravitino.maintenance.optimizer.monitor.callback.MonitorCallbackForTest;
+import 
org.apache.gravitino.maintenance.optimizer.monitor.evaluator.MetricsEvaluatorForTest;
+import 
org.apache.gravitino.maintenance.optimizer.monitor.job.JobProviderForTest;
+import 
org.apache.gravitino.maintenance.optimizer.monitor.metrics.MetricsProviderForTest;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+/** Tests the {@link Monitor} evaluation flow against the test provider/evaluator/callback SPIs. */
+public class TestMonitor {
+
+  private static final String TABLE_NAME = "test.db.table";
+
+  /** Table-scope run: one table result followed by two job results, the second job failing. */
+  @Test
+  public void testEvaluateMetrics() throws Exception {
+    OptimizerEnv env = newMonitorEnv();
+    resetTestDoubles(true);
+
+    NameIdentifier tableIdentifier = NameIdentifier.parse(TABLE_NAME);
+    List<EvaluationResult> results;
+    try (Monitor monitor = new Monitor(env)) {
+      results = monitor.evaluateMetrics(tableIdentifier, 100L, 10L, Optional.empty());
+    }
+
+    Assertions.assertEquals(3, results.size());
+    Assertions.assertEquals(3, MetricsEvaluatorForTest.INVOCATIONS.get());
+    Assertions.assertEquals(3, MonitorCallbackForTest.INVOCATIONS.get());
+    Assertions.assertEquals(3, MonitorCallbackForTest.RESULTS.size());
+
+    EvaluationResult tableResult = results.get(0);
+    EvaluationResult firstJob = results.get(1);
+    EvaluationResult secondJob = results.get(2);
+
+    Assertions.assertEquals(MetricScope.Type.TABLE, tableResult.scope().type());
+    Assertions.assertEquals(tableIdentifier, tableResult.scope().identifier());
+    Assertions.assertTrue(tableResult.evaluation());
+    Assertions.assertEquals(100L, tableResult.actionTimeSeconds());
+    Assertions.assertEquals(10L, tableResult.rangeSeconds());
+    Assertions.assertEquals(MetricsEvaluatorForTest.NAME, tableResult.evaluatorName());
+    Assertions.assertEquals(1, tableResult.beforeMetrics().get("row_count").size());
+    Assertions.assertEquals(1, tableResult.afterMetrics().get("row_count").size());
+    assertFirstSamples(tableResult, "row_count", 95L, 100L, 100L, 200L);
+
+    Assertions.assertEquals(MetricScope.Type.JOB, firstJob.scope().type());
+    Assertions.assertEquals(JobProviderForTest.JOB1, firstJob.scope().identifier());
+    Assertions.assertTrue(firstJob.evaluation());
+    assertFirstSamples(firstJob, "duration", 99L, 102L, 10L, 20L);
+
+    Assertions.assertEquals(MetricScope.Type.JOB, secondJob.scope().type());
+    Assertions.assertEquals(JobProviderForTest.JOB2, secondJob.scope().identifier());
+    Assertions.assertFalse(secondJob.evaluation());
+    assertFirstSamples(secondJob, "duration", 98L, 104L, 30L, 40L);
+  }
+
+  /** Partition-scope run: the provider's partition path is used and reported in the scope. */
+  @Test
+  public void testEvaluateMetricsForPartition() throws Exception {
+    OptimizerEnv env = newMonitorEnv();
+    resetTestDoubles(false);
+
+    NameIdentifier tableIdentifier = NameIdentifier.parse(TABLE_NAME);
+    PartitionPath partitionPath =
+        PartitionPath.of(List.of(new PartitionEntryImpl("dt", "2026-02-12")));
+    List<EvaluationResult> results;
+    try (Monitor monitor = new Monitor(env)) {
+      results = monitor.evaluateMetrics(tableIdentifier, 100L, 10L, Optional.of(partitionPath));
+    }
+
+    Assertions.assertEquals(1, MetricsProviderForTest.PARTITION_METRICS_CALLS.get());
+    Assertions.assertEquals(partitionPath, MetricsProviderForTest.LAST_PARTITION_PATH);
+    Assertions.assertEquals(3, results.size());
+
+    EvaluationResult tableResult = results.get(0);
+    Assertions.assertEquals(MetricScope.Type.PARTITION, tableResult.scope().type());
+    Assertions.assertEquals(partitionPath, tableResult.scope().partition().orElseThrow());
+    assertFirstSamples(tableResult, "row_count", 97L, 101L, 110L, 210L);
+  }
+
+  /** A window whose upper bound overflows a long is rejected as an invalid argument. */
+  @Test
+  public void testEvaluateMetricsOverflowRange() throws Exception {
+    OptimizerEnv env = newMonitorEnv();
+
+    try (Monitor monitor = new Monitor(env)) {
+      IllegalArgumentException thrown =
+          Assertions.assertThrows(
+              IllegalArgumentException.class,
+              () ->
+                  monitor.evaluateMetrics(
+                      NameIdentifier.parse(TABLE_NAME), Long.MAX_VALUE, 1L, Optional.empty()));
+      Assertions.assertTrue(thrown.getMessage().contains("time range overflow"));
+    }
+  }
+
+  /** Builds an environment that selects the test implementations of all four monitor SPIs. */
+  private static OptimizerEnv newMonitorEnv() {
+    OptimizerConfig config =
+        new OptimizerConfig(
+            ImmutableMap.of(
+                OptimizerConfig.METRICS_PROVIDER_CONFIG.getKey(),
+                MetricsProviderForTest.NAME,
+                OptimizerConfig.JOB_PROVIDER_CONFIG.getKey(),
+                JobProviderForTest.NAME,
+                OptimizerConfig.METRICS_EVALUATOR_CONFIG.getKey(),
+                MetricsEvaluatorForTest.NAME,
+                OptimizerConfig.MONITOR_CALLBACKS_CONFIG.getKey(),
+                MonitorCallbackForTest.NAME));
+    return new OptimizerEnv(config);
+  }
+
+  /** Clears the static state of the test doubles and sets whether evaluation of JOB2 fails. */
+  private static void resetTestDoubles(boolean failJob2) {
+    MonitorCallbackForTest.reset();
+    MetricsEvaluatorForTest.reset();
+    MetricsEvaluatorForTest.failJob2(failJob2);
+    MetricsProviderForTest.reset();
+  }
+
+  /**
+   * Asserts timestamp and numeric value of the first before/after sample of {@code metric}, in the
+   * order: before timestamp, after timestamp, before value, after value.
+   */
+  private static void assertFirstSamples(
+      EvaluationResult result,
+      String metric,
+      long beforeTimestamp,
+      long afterTimestamp,
+      long beforeValue,
+      long afterValue) {
+    Assertions.assertEquals(
+        beforeTimestamp, result.beforeMetrics().get(metric).get(0).timestamp());
+    Assertions.assertEquals(afterTimestamp, result.afterMetrics().get(metric).get(0).timestamp());
+    Assertions.assertEquals(
+        beforeValue,
+        ((Number) result.beforeMetrics().get(metric).get(0).statistic().value().value())
+            .longValue());
+    Assertions.assertEquals(
+        afterValue,
+        ((Number) result.afterMetrics().get(metric).get(0).statistic().value().value())
+            .longValue());
+  }
+}
diff --git 
a/maintenance/optimizer/src/test/java/org/apache/gravitino/maintenance/optimizer/monitor/callback/MonitorCallbackForTest.java
 
b/maintenance/optimizer/src/test/java/org/apache/gravitino/maintenance/optimizer/monitor/callback/MonitorCallbackForTest.java
new file mode 100644
index 0000000000..596c7d1f91
--- /dev/null
+++ 
b/maintenance/optimizer/src/test/java/org/apache/gravitino/maintenance/optimizer/monitor/callback/MonitorCallbackForTest.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.gravitino.maintenance.optimizer.monitor.callback;
+
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.gravitino.maintenance.optimizer.api.monitor.EvaluationResult;
+import org.apache.gravitino.maintenance.optimizer.api.monitor.MonitorCallback;
+import org.apache.gravitino.maintenance.optimizer.common.OptimizerEnv;
+
+/**
+ * Test-only {@link MonitorCallback} that records every evaluation result it receives in static
+ * state so tests can assert on delivery; call {@link #reset()} before each use.
+ */
+public class MonitorCallbackForTest implements MonitorCallback {
+
+  public static final String NAME = "monitor-callback-for-test";
+  public static final AtomicInteger INVOCATIONS = new AtomicInteger();
+  public static final CopyOnWriteArrayList<EvaluationResult> RESULTS = new CopyOnWriteArrayList<>();
+
+  /** Clears the recorded results and the invocation counter. */
+  public static void reset() {
+    RESULTS.clear();
+    INVOCATIONS.set(0);
+  }
+
+  @Override
+  public String name() {
+    return NAME;
+  }
+
+  /** No setup is needed for the recording callback. */
+  @Override
+  public void initialize(OptimizerEnv optimizerEnv) {}
+
+  /** Counts the invocation, then stores the delivered result. */
+  @Override
+  public void onEvaluation(EvaluationResult result) {
+    INVOCATIONS.incrementAndGet();
+    RESULTS.add(result);
+  }
+
+  /** Nothing to release. */
+  @Override
+  public void close() throws Exception {}
+}
diff --git 
a/maintenance/optimizer/src/test/java/org/apache/gravitino/maintenance/optimizer/monitor/evaluator/MetricsEvaluatorForTest.java
 
b/maintenance/optimizer/src/test/java/org/apache/gravitino/maintenance/optimizer/monitor/evaluator/MetricsEvaluatorForTest.java
new file mode 100644
index 0000000000..9d9a994d04
--- /dev/null
+++ 
b/maintenance/optimizer/src/test/java/org/apache/gravitino/maintenance/optimizer/monitor/evaluator/MetricsEvaluatorForTest.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.gravitino.maintenance.optimizer.monitor.evaluator;
+
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.gravitino.maintenance.optimizer.api.common.MetricSample;
+import org.apache.gravitino.maintenance.optimizer.api.monitor.MetricScope;
+import org.apache.gravitino.maintenance.optimizer.api.monitor.MetricsEvaluator;
+import org.apache.gravitino.maintenance.optimizer.monitor.job.JobProviderForTest;
+
+public class MetricsEvaluatorForTest implements MetricsEvaluator {
+
+  public static final String NAME = "test-metrics-evaluator";
+  public static final AtomicInteger INVOCATIONS = new AtomicInteger();
+  private static volatile boolean FAIL_JOB2 = false;
+
+  @Override
+  public String name() {
+    return NAME;
+  }
+
+  @Override
+  public boolean evaluateMetrics(
+      MetricScope scope,
+      Map<String, List<MetricSample>> beforeMetrics,
+      Map<String, List<MetricSample>> afterMetrics) {
+    INVOCATIONS.incrementAndGet();
+    if (FAIL_JOB2
+        && scope.type() == MetricScope.Type.JOB
+        && JobProviderForTest.JOB2.equals(scope.identifier())) {
+      return false;
+    }
+    return true;
+  }
+
+  public static void reset() {
+    INVOCATIONS.set(0);
+    FAIL_JOB2 = false;
+  }
+
+  public static void failJob2(boolean failJob2) {
+    FAIL_JOB2 = failJob2;
+  }
+}
diff --git a/maintenance/optimizer/src/test/java/org/apache/gravitino/maintenance/optimizer/monitor/job/JobProviderForTest.java b/maintenance/optimizer/src/test/java/org/apache/gravitino/maintenance/optimizer/monitor/job/JobProviderForTest.java
new file mode 100644
index 0000000000..5ab2cebd61
--- /dev/null
+++ b/maintenance/optimizer/src/test/java/org/apache/gravitino/maintenance/optimizer/monitor/job/JobProviderForTest.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.gravitino.maintenance.optimizer.monitor.job;
+
+import java.util.List;
+import org.apache.gravitino.NameIdentifier;
+import org.apache.gravitino.maintenance.optimizer.api.monitor.JobProvider;
+import org.apache.gravitino.maintenance.optimizer.common.OptimizerEnv;
+
+public class JobProviderForTest implements JobProvider {
+
+  public static final String NAME = "job-provider-for-test";
+  public static final NameIdentifier JOB1 = NameIdentifier.parse("test.db.job1");
+  public static final NameIdentifier JOB2 = NameIdentifier.parse("test.db.job2");
+
+  @Override
+  public String name() {
+    return NAME;
+  }
+
+  @Override
+  public void initialize(OptimizerEnv optimizerEnv) {}
+
+  @Override
+  public List<NameIdentifier> jobIdentifiers(NameIdentifier tableIdentifier) {
+    return List.of(JOB1, JOB2);
+  }
+
+  @Override
+  public void close() throws Exception {}
+}
diff --git a/maintenance/optimizer/src/test/java/org/apache/gravitino/maintenance/optimizer/monitor/metrics/MetricsProviderForTest.java b/maintenance/optimizer/src/test/java/org/apache/gravitino/maintenance/optimizer/monitor/metrics/MetricsProviderForTest.java
new file mode 100644
index 0000000000..3f371c24c1
--- /dev/null
+++ b/maintenance/optimizer/src/test/java/org/apache/gravitino/maintenance/optimizer/monitor/metrics/MetricsProviderForTest.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.gravitino.maintenance.optimizer.monitor.metrics;
+
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.gravitino.NameIdentifier;
+import org.apache.gravitino.maintenance.optimizer.api.common.MetricSample;
+import org.apache.gravitino.maintenance.optimizer.api.common.PartitionPath;
+import org.apache.gravitino.maintenance.optimizer.api.common.StatisticEntry;
+import org.apache.gravitino.maintenance.optimizer.api.monitor.MetricsProvider;
+import org.apache.gravitino.maintenance.optimizer.common.OptimizerEnv;
+import org.apache.gravitino.maintenance.optimizer.monitor.job.JobProviderForTest;
+import org.apache.gravitino.stats.StatisticValue;
+import org.apache.gravitino.stats.StatisticValues;
+
+public class MetricsProviderForTest implements MetricsProvider {
+
+  public static final String NAME = "metrics-provider-for-test";
+  public static final AtomicInteger PARTITION_METRICS_CALLS = new AtomicInteger();
+  public static volatile PartitionPath LAST_PARTITION_PATH = null;
+
+  @Override
+  public String name() {
+    return NAME;
+  }
+
+  @Override
+  public void initialize(OptimizerEnv optimizerEnv) {}
+
+  @Override
+  public Map<String, List<MetricSample>> jobMetrics(
+      NameIdentifier jobIdentifier, long startTime, long endTime) {
+    if (JobProviderForTest.JOB1.equals(jobIdentifier)) {
+      return Map.of(
+          "duration",
+          List.of(
+              metric(99, "duration", StatisticValues.longValue(10L)),
+              metric(102, "duration", StatisticValues.longValue(20L))));
+    }
+    if (JobProviderForTest.JOB2.equals(jobIdentifier)) {
+      return Map.of(
+          "duration",
+          List.of(
+              metric(98, "duration", StatisticValues.longValue(30L)),
+              metric(104, "duration", StatisticValues.longValue(40L))));
+    }
+    return Map.of();
+  }
+
+  @Override
+  public Map<String, List<MetricSample>> tableMetrics(
+      NameIdentifier tableIdentifier, long startTime, long endTime) {
+    return Map.of(
+        "row_count",
+        List.of(
+            metric(95, "row_count", StatisticValues.longValue(100L)),
+            metric(100, "row_count", StatisticValues.longValue(200L))));
+  }
+
+  @Override
+  public Map<String, List<MetricSample>> partitionMetrics(
+      NameIdentifier tableIdentifier, PartitionPath partitionPath, long startTime, long endTime) {
+    PARTITION_METRICS_CALLS.incrementAndGet();
+    LAST_PARTITION_PATH = partitionPath;
+    return Map.of(
+        "row_count",
+        List.of(
+            metric(97, "row_count", StatisticValues.longValue(110L)),
+            metric(101, "row_count", StatisticValues.longValue(210L))));
+  }
+
+  @Override
+  public void close() throws Exception {}
+
+  public static void reset() {
+    PARTITION_METRICS_CALLS.set(0);
+    LAST_PARTITION_PATH = null;
+  }
+
+  private static <T> MetricSample metric(
+      long timestamp, String metricName, StatisticValue<T> statisticValue) {
+    return new MetricSample() {
+      @Override
+      public long timestamp() {
+        return timestamp;
+      }
+
+      @Override
+      public StatisticEntry<?> statistic() {
+        return new StatisticEntry<T>() {
+          @Override
+          public String name() {
+            return metricName;
+          }
+
+          @Override
+          public StatisticValue<T> value() {
+            return statisticValue;
+          }
+        };
+      }
+    };
+  }
+}
diff --git a/maintenance/optimizer/src/test/resources/META-INF/services/org.apache.gravitino.maintenance.optimizer.api.common.Provider b/maintenance/optimizer/src/test/resources/META-INF/services/org.apache.gravitino.maintenance.optimizer.api.common.Provider
index 1948a23487..4cc9447c73 100644
--- a/maintenance/optimizer/src/test/resources/META-INF/services/org.apache.gravitino.maintenance.optimizer.api.common.Provider
+++ b/maintenance/optimizer/src/test/resources/META-INF/services/org.apache.gravitino.maintenance.optimizer.api.common.Provider
@@ -19,3 +19,6 @@
 
 org.apache.gravitino.maintenance.optimizer.updater.StatisticsUpdaterForTest
 org.apache.gravitino.maintenance.optimizer.updater.MetricsUpdaterForTest
+org.apache.gravitino.maintenance.optimizer.monitor.job.JobProviderForTest
+org.apache.gravitino.maintenance.optimizer.monitor.metrics.MetricsProviderForTest
+org.apache.gravitino.maintenance.optimizer.monitor.callback.MonitorCallbackForTest
diff --git a/maintenance/optimizer/src/test/resources/META-INF/services/org.apache.gravitino.maintenance.optimizer.api.common.Provider b/maintenance/optimizer/src/test/resources/META-INF/services/org.apache.gravitino.maintenance.optimizer.api.monitor.MetricsEvaluator
similarity index 84%
copy from maintenance/optimizer/src/test/resources/META-INF/services/org.apache.gravitino.maintenance.optimizer.api.common.Provider
copy to maintenance/optimizer/src/test/resources/META-INF/services/org.apache.gravitino.maintenance.optimizer.api.monitor.MetricsEvaluator
index 1948a23487..1e8cd59e16 100644
--- a/maintenance/optimizer/src/test/resources/META-INF/services/org.apache.gravitino.maintenance.optimizer.api.common.Provider
+++ b/maintenance/optimizer/src/test/resources/META-INF/services/org.apache.gravitino.maintenance.optimizer.api.monitor.MetricsEvaluator
@@ -17,5 +17,4 @@
 # under the License.
 #
 
-org.apache.gravitino.maintenance.optimizer.updater.StatisticsUpdaterForTest
-org.apache.gravitino.maintenance.optimizer.updater.MetricsUpdaterForTest
+org.apache.gravitino.maintenance.optimizer.monitor.evaluator.MetricsEvaluatorForTest

Reply via email to