This is an automated email from the ASF dual-hosted git repository.

dimas pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/polaris.git


The following commit(s) were added to refs/heads/main by this push:
     new d36e88e6f feat(metrics): Evolve PolarisMetricsReporter interface with 
timestamp parameter and comprehensive documentation (#3468)
d36e88e6f is described below

commit d36e88e6fcff2137eff26f5e3bf2c0632bef3d7b
Author: Anand K Sankaran <[email protected]>
AuthorDate: Fri Jan 23 08:11:52 2026 -0800

    feat(metrics): Evolve PolarisMetricsReporter interface with timestamp 
parameter and comprehensive documentation (#3468)
    
    Enhance the `PolarisMetricsReporter` SPI interface by adding a timestamp 
parameter to the `reportMetric()` method, enabling accurate time-series metrics 
reporting to external systems.
---
 CHANGELOG.md                                       |  1 +
 .../catalog/iceberg/IcebergCatalogAdapter.java     |  9 +++--
 .../service/reporting/DefaultMetricsReporter.java  | 38 ++++++++++++++++++----
 .../service/reporting/PolarisMetricsReporter.java  | 32 +++++++++++++++++-
 .../reporting/DefaultMetricsReporterTest.java      | 10 +++---
 .../org/apache/polaris/service/TestServices.java   |  3 +-
 6 files changed, 78 insertions(+), 15 deletions(-)

diff --git a/CHANGELOG.md b/CHANGELOG.md
index 8aa4ed3dc..1eaec3be4 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -34,6 +34,7 @@ request adding CHANGELOG notes for breaking (!) changes and 
possibly other secti
 ### Breaking changes
 
 - The (Before/After)CommitTableEvent has been removed.
+- The `PolarisMetricsReporter.reportMetric()` method signature has been 
extended to include a `receivedTimestamp` parameter of type `java.time.Instant`.
 
 ### New Features
 
diff --git 
a/runtime/service/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalogAdapter.java
 
b/runtime/service/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalogAdapter.java
index 6c30afb9e..fb54b5572 100644
--- 
a/runtime/service/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalogAdapter.java
+++ 
b/runtime/service/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalogAdapter.java
@@ -30,6 +30,7 @@ import jakarta.inject.Inject;
 import jakarta.ws.rs.core.HttpHeaders;
 import jakarta.ws.rs.core.Response;
 import jakarta.ws.rs.core.SecurityContext;
+import java.time.Clock;
 import java.util.EnumSet;
 import java.util.Optional;
 import java.util.function.Function;
@@ -105,6 +106,7 @@ public class IcebergCatalogAdapter
   private final Instance<ExternalCatalogFactory> externalCatalogFactories;
   private final StorageAccessConfigProvider storageAccessConfigProvider;
   private final PolarisMetricsReporter metricsReporter;
+  private final Clock clock;
 
   @Inject
   public IcebergCatalogAdapter(
@@ -122,7 +124,8 @@ public class IcebergCatalogAdapter
       CatalogHandlerUtils catalogHandlerUtils,
       @Any Instance<ExternalCatalogFactory> externalCatalogFactories,
       StorageAccessConfigProvider storageAccessConfigProvider,
-      PolarisMetricsReporter metricsReporter) {
+      PolarisMetricsReporter metricsReporter,
+      Clock clock) {
     this.diagnostics = diagnostics;
     this.realmContext = realmContext;
     this.callContext = callContext;
@@ -139,6 +142,7 @@ public class IcebergCatalogAdapter
     this.externalCatalogFactories = externalCatalogFactories;
     this.storageAccessConfigProvider = storageAccessConfigProvider;
     this.metricsReporter = metricsReporter;
+    this.clock = clock;
   }
 
   /**
@@ -722,7 +726,8 @@ public class IcebergCatalogAdapter
     Namespace ns = decodeNamespace(namespace);
     TableIdentifier tableIdentifier = TableIdentifier.of(ns, 
RESTUtil.decodeString(table));
 
-    metricsReporter.reportMetric(catalogName, tableIdentifier, 
reportMetricsRequest.report());
+    metricsReporter.reportMetric(
+        catalogName, tableIdentifier, reportMetricsRequest.report(), 
clock.instant());
     return Response.status(Response.Status.NO_CONTENT).build();
   }
 
diff --git 
a/runtime/service/src/main/java/org/apache/polaris/service/reporting/DefaultMetricsReporter.java
 
b/runtime/service/src/main/java/org/apache/polaris/service/reporting/DefaultMetricsReporter.java
index 5c7b4934a..eaff0219b 100644
--- 
a/runtime/service/src/main/java/org/apache/polaris/service/reporting/DefaultMetricsReporter.java
+++ 
b/runtime/service/src/main/java/org/apache/polaris/service/reporting/DefaultMetricsReporter.java
@@ -21,32 +21,56 @@ package org.apache.polaris.service.reporting;
 import com.google.common.annotations.VisibleForTesting;
 import io.smallrye.common.annotation.Identifier;
 import jakarta.enterprise.context.ApplicationScoped;
-import org.apache.commons.lang3.function.TriConsumer;
+import java.time.Instant;
 import org.apache.iceberg.catalog.TableIdentifier;
 import org.apache.iceberg.metrics.MetricsReport;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+/**
+ * Default implementation of {@link PolarisMetricsReporter} that logs metrics 
to the configured
+ * logger.
+ *
+ * <p>This implementation is selected when {@code 
polaris.iceberg-metrics.reporting.type} is set to
+ * {@code "default"} (the default value).
+ *
+ * <p>By default, logging is disabled. To enable metrics logging, set the 
logger level for {@code
+ * org.apache.polaris.service.reporting} to {@code INFO} in your logging 
configuration.
+ *
+ * @see PolarisMetricsReporter
+ */
 @ApplicationScoped
 @Identifier("default")
 public class DefaultMetricsReporter implements PolarisMetricsReporter {
   private static final Logger LOGGER = 
LoggerFactory.getLogger(DefaultMetricsReporter.class);
 
-  private final TriConsumer<String, TableIdentifier, MetricsReport> 
reportConsumer;
+  private final QuadConsumer<String, TableIdentifier, MetricsReport, Instant> 
reportConsumer;
+
+  /** Functional interface for consuming metrics reports with timestamp. */
+  @FunctionalInterface
+  interface QuadConsumer<T1, T2, T3, T4> {
+    void accept(T1 t1, T2 t2, T3 t3, T4 t4);
+  }
 
+  /** Creates a new DefaultMetricsReporter that logs metrics to the class 
logger. */
   public DefaultMetricsReporter() {
     this(
-        (catalogName, table, metricsReport) ->
-            LOGGER.info("{}.{}: {}", catalogName, table, metricsReport));
+        (catalogName, table, metricsReport, receivedTimestamp) ->
+            LOGGER.info("{}.{} (ts={}): {}", catalogName, table, 
receivedTimestamp, metricsReport));
   }
 
   @VisibleForTesting
-  DefaultMetricsReporter(TriConsumer<String, TableIdentifier, MetricsReport> 
reportConsumer) {
+  DefaultMetricsReporter(
+      QuadConsumer<String, TableIdentifier, MetricsReport, Instant> 
reportConsumer) {
     this.reportConsumer = reportConsumer;
   }
 
   @Override
-  public void reportMetric(String catalogName, TableIdentifier table, 
MetricsReport metricsReport) {
-    reportConsumer.accept(catalogName, table, metricsReport);
+  public void reportMetric(
+      String catalogName,
+      TableIdentifier table,
+      MetricsReport metricsReport,
+      Instant receivedTimestamp) {
+    reportConsumer.accept(catalogName, table, metricsReport, 
receivedTimestamp);
   }
 }
diff --git 
a/runtime/service/src/main/java/org/apache/polaris/service/reporting/PolarisMetricsReporter.java
 
b/runtime/service/src/main/java/org/apache/polaris/service/reporting/PolarisMetricsReporter.java
index 7ffd84f4d..b27184d55 100644
--- 
a/runtime/service/src/main/java/org/apache/polaris/service/reporting/PolarisMetricsReporter.java
+++ 
b/runtime/service/src/main/java/org/apache/polaris/service/reporting/PolarisMetricsReporter.java
@@ -18,9 +18,39 @@
  */
 package org.apache.polaris.service.reporting;
 
+import java.time.Instant;
 import org.apache.iceberg.catalog.TableIdentifier;
 import org.apache.iceberg.metrics.MetricsReport;
 
+/**
+ * SPI interface for reporting Iceberg metrics received by Polaris.
+ *
+ * <p>Implementations can be used to send metrics to external systems for 
analysis and monitoring.
+ * Custom implementations can be annotated with an appropriate {@code Quarkus} 
scope and {@link
+ * io.smallrye.common.annotation.Identifier @Identifier("my-reporter-type")} 
for CDI discovery.
+ *
+ * <p>The implementation to use is selected via the {@code 
polaris.iceberg-metrics.reporting.type}
+ * configuration property, which defaults to {@code "default"}.
+ *
+ * <p>Implementations can inject other CDI beans for context.
+ *
+ * @see DefaultMetricsReporter
+ * @see MetricsReportingConfiguration
+ */
 public interface PolarisMetricsReporter {
-  public void reportMetric(String catalogName, TableIdentifier table, 
MetricsReport metricsReport);
+
+  /**
+   * Reports an Iceberg metrics report for a specific table.
+   *
+   * @param catalogName the name of the catalog containing the table
+   * @param table the identifier of the table the metrics are for
+   * @param metricsReport the Iceberg metrics report (e.g., {@link
+   *     org.apache.iceberg.metrics.ScanReport} or {@link 
org.apache.iceberg.metrics.CommitReport})
+   * @param receivedTimestamp the timestamp when the metrics were received by 
Polaris
+   */
+  void reportMetric(
+      String catalogName,
+      TableIdentifier table,
+      MetricsReport metricsReport,
+      Instant receivedTimestamp);
 }
diff --git 
a/runtime/service/src/test/java/org/apache/polaris/service/reporting/DefaultMetricsReporterTest.java
 
b/runtime/service/src/test/java/org/apache/polaris/service/reporting/DefaultMetricsReporterTest.java
index dfdde0f3e..8762c3ed7 100644
--- 
a/runtime/service/src/test/java/org/apache/polaris/service/reporting/DefaultMetricsReporterTest.java
+++ 
b/runtime/service/src/test/java/org/apache/polaris/service/reporting/DefaultMetricsReporterTest.java
@@ -21,7 +21,7 @@ package org.apache.polaris.service.reporting;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.verify;
 
-import org.apache.commons.lang3.function.TriConsumer;
+import java.time.Instant;
 import org.apache.iceberg.catalog.TableIdentifier;
 import org.apache.iceberg.metrics.MetricsReport;
 import org.junit.jupiter.api.Test;
@@ -31,14 +31,16 @@ public class DefaultMetricsReporterTest {
   @Test
   void testLogging() {
     @SuppressWarnings("unchecked")
-    TriConsumer<String, TableIdentifier, MetricsReport> mockConsumer = 
mock(TriConsumer.class);
+    DefaultMetricsReporter.QuadConsumer<String, TableIdentifier, 
MetricsReport, Instant>
+        mockConsumer = mock(DefaultMetricsReporter.QuadConsumer.class);
     DefaultMetricsReporter reporter = new DefaultMetricsReporter(mockConsumer);
     String warehouse = "testWarehouse";
     TableIdentifier table = TableIdentifier.of("testNamespace", "testTable");
     MetricsReport metricsReport = mock(MetricsReport.class);
+    Instant receivedTimestamp = Instant.ofEpochMilli(1234567890L);
 
-    reporter.reportMetric(warehouse, table, metricsReport);
+    reporter.reportMetric(warehouse, table, metricsReport, receivedTimestamp);
 
-    verify(mockConsumer).accept(warehouse, table, metricsReport);
+    verify(mockConsumer).accept(warehouse, table, metricsReport, 
receivedTimestamp);
   }
 }
diff --git 
a/runtime/service/src/testFixtures/java/org/apache/polaris/service/TestServices.java
 
b/runtime/service/src/testFixtures/java/org/apache/polaris/service/TestServices.java
index 59af4b5a6..30303121e 100644
--- 
a/runtime/service/src/testFixtures/java/org/apache/polaris/service/TestServices.java
+++ 
b/runtime/service/src/testFixtures/java/org/apache/polaris/service/TestServices.java
@@ -350,7 +350,8 @@ public record TestServices(
               catalogHandlerUtils,
               externalCatalogFactory,
               storageAccessConfigProvider,
-              new DefaultMetricsReporter());
+              new DefaultMetricsReporter(),
+              Clock.systemUTC());
 
       // Optionally wrap with event delegator
       IcebergRestCatalogApiService finalRestCatalogService = catalogService;

Reply via email to