This is an automated email from the ASF dual-hosted git repository.
emaynard pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/polaris.git
The following commit(s) were added to refs/heads/main by this push:
new e5e0c0d9b Event Listeners (#922)
e5e0c0d9b is described below
commit e5e0c0d9b271550949abc3bb1cd6e0733204b16c
Author: Andrew Guterman <[email protected]>
AuthorDate: Wed May 7 10:28:29 2025 -0700
Event Listeners (#922)
Implementation of event listeners discussed
[here](https://lists.apache.org/thread/03yz5wolkvy8l7rbcwjnqdq1bl8p065v).
I decided to keep this implementation generic and not take a dependency on
Jakarta Events nor Vertx busses. It's easy to extend this, either within
Polaris or in an external PolarisEventListener, and handle events however one
wishes.
Some high level notes:
- PolarisEventListener is the main interface with all the event methods
such as `onBeforeRequestRateLimited`
- DefaultPolarisEventListener is an empty implementation which allows users
to only partially implement event handlers
- `polaris.events.type` is the config that lets you specify your event
listener implementation
---
.../src/main/resources/application.properties | 2 +
.../quarkus/config/ProductionReadinessChecks.java | 12 +++
.../service/quarkus/config/QuarkusProducers.java | 9 ++
.../QuarkusPolarisEventListenerConfiguration.java | 32 +++++++
.../quarkus/task/QuarkusTaskExecutorImpl.java | 8 +-
.../quarkus/admin/PolarisAuthzTestBase.java | 13 ++-
.../quarkus/catalog/GenericTableCatalogTest.java | 4 +-
.../catalog/IcebergCatalogHandlerAuthzTest.java | 3 +-
.../quarkus/catalog/IcebergCatalogTest.java | 83 +++++++++++++-----
.../quarkus/catalog/IcebergCatalogViewTest.java | 57 ++++++++++++-
.../service/quarkus/catalog/PolicyCatalogTest.java | 4 +-
.../quarkus/ratelimiter/RateLimiterFilterTest.java | 13 ++-
.../polaris/service/quarkus/test/TestData.java | 36 ++++++++
.../service/catalog/iceberg/IcebergCatalog.java | 31 ++++++-
.../context/PolarisCallContextCatalogFactory.java | 9 +-
.../service/events/AfterTableCommitedEvent.java | 34 ++++++++
.../service/events/AfterTableRefreshedEvent.java | 28 ++++++
.../service/events/AfterTaskAttemptedEvent.java | 34 ++++++++
.../service/events/AfterViewCommitedEvent.java | 33 ++++++++
.../service/events/AfterViewRefreshedEvent.java | 28 ++++++
.../events/BeforeRequestRateLimitedEvent.java | 28 ++++++
.../service/events/BeforeTableCommitedEvent.java | 35 ++++++++
.../service/events/BeforeTableRefreshedEvent.java | 29 +++++++
.../service/events/BeforeTaskAttemptedEvent.java | 32 +++++++
.../service/events/BeforeViewCommitedEvent.java | 34 ++++++++
.../service/events/BeforeViewRefreshedEvent.java | 29 +++++++
.../service/events/NoOpPolarisEventListener.java | 27 ++++++
.../polaris/service/events/PolarisEvent.java | 25 ++++++
.../service/events/PolarisEventListener.java | 58 +++++++++++++
.../service/events/TestPolarisEventListener.java | 91 ++++++++++++++++++++
.../service/ratelimiter/RateLimiterFilter.java | 9 +-
.../polaris/service/task/TaskExecutorImpl.java | 95 ++++++++++++---------
.../service/catalog/io/FileIOFactoryTest.java | 3 +-
.../polaris/service/task/TaskExecutorImplTest.java | 99 ++++++++++++++++++++++
.../org/apache/polaris/service/TestServices.java | 12 ++-
35 files changed, 999 insertions(+), 80 deletions(-)
diff --git a/quarkus/defaults/src/main/resources/application.properties
b/quarkus/defaults/src/main/resources/application.properties
index d9b82239e..0482f3de0 100644
--- a/quarkus/defaults/src/main/resources/application.properties
+++ b/quarkus/defaults/src/main/resources/application.properties
@@ -124,6 +124,8 @@ polaris.secrets-manager.type=in-memory
polaris.file-io.type=default
+polaris.event-listener.type=no-op
+
polaris.log.request-id-header-name=Polaris-Request-Id
# polaris.log.mdc.aid=polaris
# polaris.log.mdc.sid=polaris-service
diff --git
a/quarkus/service/src/main/java/org/apache/polaris/service/quarkus/config/ProductionReadinessChecks.java
b/quarkus/service/src/main/java/org/apache/polaris/service/quarkus/config/ProductionReadinessChecks.java
index 5d9524f4c..2389537f1 100644
---
a/quarkus/service/src/main/java/org/apache/polaris/service/quarkus/config/ProductionReadinessChecks.java
+++
b/quarkus/service/src/main/java/org/apache/polaris/service/quarkus/config/ProductionReadinessChecks.java
@@ -36,6 +36,8 @@ import org.apache.polaris.service.auth.AuthenticationType;
import org.apache.polaris.service.context.DefaultRealmContextResolver;
import org.apache.polaris.service.context.RealmContextResolver;
import org.apache.polaris.service.context.TestRealmContextResolver;
+import org.apache.polaris.service.events.PolarisEventListener;
+import org.apache.polaris.service.events.TestPolarisEventListener;
import
org.apache.polaris.service.persistence.InMemoryPolarisMetaStoreManagerFactory;
import
org.apache.polaris.service.quarkus.auth.QuarkusAuthenticationConfiguration;
import org.eclipse.microprofile.config.Config;
@@ -161,6 +163,16 @@ public class ProductionReadinessChecks {
return ProductionReadinessCheck.OK;
}
+ @Produces
+ public ProductionReadinessCheck checkPolarisEventListener(
+ PolarisEventListener polarisEventListener) {
+ if (polarisEventListener instanceof TestPolarisEventListener) {
+ return ProductionReadinessCheck.of(
+ Error.of("TestPolarisEventListener is intended for tests only.",
"polaris.events.type"));
+ }
+ return ProductionReadinessCheck.OK;
+ }
+
private static String authRealmSegment(String realm) {
return realm.equals(QuarkusAuthenticationConfiguration.DEFAULT_REALM_KEY)
? "" : realm + ".";
}
diff --git
a/quarkus/service/src/main/java/org/apache/polaris/service/quarkus/config/QuarkusProducers.java
b/quarkus/service/src/main/java/org/apache/polaris/service/quarkus/config/QuarkusProducers.java
index 122921e82..89bf2c6fb 100644
---
a/quarkus/service/src/main/java/org/apache/polaris/service/quarkus/config/QuarkusProducers.java
+++
b/quarkus/service/src/main/java/org/apache/polaris/service/quarkus/config/QuarkusProducers.java
@@ -62,11 +62,13 @@ import
org.apache.polaris.service.config.RealmEntityManagerFactory;
import org.apache.polaris.service.context.RealmContextConfiguration;
import org.apache.polaris.service.context.RealmContextFilter;
import org.apache.polaris.service.context.RealmContextResolver;
+import org.apache.polaris.service.events.PolarisEventListener;
import
org.apache.polaris.service.quarkus.auth.QuarkusAuthenticationConfiguration;
import
org.apache.polaris.service.quarkus.auth.QuarkusAuthenticationRealmConfiguration;
import
org.apache.polaris.service.quarkus.auth.external.tenant.OidcTenantResolver;
import
org.apache.polaris.service.quarkus.catalog.io.QuarkusFileIOConfiguration;
import
org.apache.polaris.service.quarkus.context.QuarkusRealmContextConfiguration;
+import
org.apache.polaris.service.quarkus.events.QuarkusPolarisEventListenerConfiguration;
import
org.apache.polaris.service.quarkus.persistence.QuarkusPersistenceConfiguration;
import
org.apache.polaris.service.quarkus.ratelimiter.QuarkusRateLimiterFilterConfiguration;
import
org.apache.polaris.service.quarkus.ratelimiter.QuarkusTokenBucketConfiguration;
@@ -155,6 +157,13 @@ public class QuarkusProducers {
return fileIOFactories.select(Identifier.Literal.of(config.type())).get();
}
+ @Produces
+ public PolarisEventListener polarisEventListener(
+ QuarkusPolarisEventListenerConfiguration config,
+ @Any Instance<PolarisEventListener> polarisEventListeners) {
+ return
polarisEventListeners.select(Identifier.Literal.of(config.type())).get();
+ }
+
@Produces
public MetaStoreManagerFactory metaStoreManagerFactory(
QuarkusPersistenceConfiguration config,
diff --git
a/quarkus/service/src/main/java/org/apache/polaris/service/quarkus/events/QuarkusPolarisEventListenerConfiguration.java
b/quarkus/service/src/main/java/org/apache/polaris/service/quarkus/events/QuarkusPolarisEventListenerConfiguration.java
new file mode 100644
index 000000000..8921c726c
--- /dev/null
+++
b/quarkus/service/src/main/java/org/apache/polaris/service/quarkus/events/QuarkusPolarisEventListenerConfiguration.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.polaris.service.quarkus.events;
+
+import io.quarkus.runtime.annotations.StaticInitSafe;
+import io.smallrye.config.ConfigMapping;
+
+@StaticInitSafe
+@ConfigMapping(prefix = "polaris.event-listener")
+public interface QuarkusPolarisEventListenerConfiguration {
+ /**
+ * The type of the event listener to use. Must be a registered {@link
+ * org.apache.polaris.service.events.PolarisEventListener} identifier.
+ */
+ String type();
+}
diff --git
a/quarkus/service/src/main/java/org/apache/polaris/service/quarkus/task/QuarkusTaskExecutorImpl.java
b/quarkus/service/src/main/java/org/apache/polaris/service/quarkus/task/QuarkusTaskExecutorImpl.java
index 8f38de648..3e16edb5a 100644
---
a/quarkus/service/src/main/java/org/apache/polaris/service/quarkus/task/QuarkusTaskExecutorImpl.java
+++
b/quarkus/service/src/main/java/org/apache/polaris/service/quarkus/task/QuarkusTaskExecutorImpl.java
@@ -29,6 +29,7 @@ import jakarta.inject.Inject;
import java.util.concurrent.ExecutorService;
import org.apache.polaris.core.context.CallContext;
import org.apache.polaris.core.persistence.MetaStoreManagerFactory;
+import org.apache.polaris.service.events.PolarisEventListener;
import org.apache.polaris.service.quarkus.tracing.QuarkusTracingFilter;
import org.apache.polaris.service.task.TaskExecutorImpl;
import org.apache.polaris.service.task.TaskFileIOSupplier;
@@ -39,7 +40,7 @@ public class QuarkusTaskExecutorImpl extends TaskExecutorImpl
{
private final Tracer tracer;
public QuarkusTaskExecutorImpl() {
- this(null, null, null, null);
+ this(null, null, null, null, null);
}
@Inject
@@ -47,8 +48,9 @@ public class QuarkusTaskExecutorImpl extends TaskExecutorImpl
{
@Identifier("task-executor") ExecutorService executorService,
MetaStoreManagerFactory metaStoreManagerFactory,
TaskFileIOSupplier fileIOSupplier,
- Tracer tracer) {
- super(executorService, metaStoreManagerFactory, fileIOSupplier);
+ Tracer tracer,
+ PolarisEventListener polarisEventListener) {
+ super(executorService, metaStoreManagerFactory, fileIOSupplier,
polarisEventListener);
this.tracer = tracer;
}
diff --git
a/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/admin/PolarisAuthzTestBase.java
b/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/admin/PolarisAuthzTestBase.java
index 6f3498d31..5761812f4 100644
---
a/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/admin/PolarisAuthzTestBase.java
+++
b/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/admin/PolarisAuthzTestBase.java
@@ -88,6 +88,7 @@ import
org.apache.polaris.service.config.DefaultConfigurationStore;
import org.apache.polaris.service.config.RealmEntityManagerFactory;
import org.apache.polaris.service.context.CallContextCatalogFactory;
import org.apache.polaris.service.context.PolarisCallContextCatalogFactory;
+import org.apache.polaris.service.events.PolarisEventListener;
import
org.apache.polaris.service.storage.PolarisStorageIntegrationProviderImpl;
import org.apache.polaris.service.task.TaskExecutor;
import org.apache.polaris.service.types.PolicyIdentifier;
@@ -188,6 +189,7 @@ public abstract class PolarisAuthzTestBase {
@Inject protected PolarisDiagnostics diagServices;
@Inject protected Clock clock;
@Inject protected FileIOFactory fileIOFactory;
+ @Inject protected PolarisEventListener polarisEventListener;
protected IcebergCatalog baseCatalog;
protected GenericTableCatalog genericTableCatalog;
@@ -469,7 +471,8 @@ public abstract class PolarisAuthzTestBase {
passthroughView,
securityContext,
Mockito.mock(),
- fileIOFactory);
+ fileIOFactory,
+ polarisEventListener);
this.baseCatalog.initialize(
CATALOG_NAME,
ImmutableMap.of(
@@ -485,7 +488,7 @@ public abstract class PolarisAuthzTestBase {
extends PolarisCallContextCatalogFactory {
public TestPolarisCallContextCatalogFactory() {
- super(null, null, null, null, null);
+ super(null, null, null, null, null, null);
}
@Inject
@@ -494,13 +497,15 @@ public abstract class PolarisAuthzTestBase {
MetaStoreManagerFactory metaStoreManagerFactory,
UserSecretsManagerFactory userSecretsManagerFactory,
TaskExecutor taskExecutor,
- FileIOFactory fileIOFactory) {
+ FileIOFactory fileIOFactory,
+ PolarisEventListener polarisEventListener) {
super(
entityManagerFactory,
metaStoreManagerFactory,
userSecretsManagerFactory,
taskExecutor,
- fileIOFactory);
+ fileIOFactory,
+ polarisEventListener);
}
@Override
diff --git
a/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/catalog/GenericTableCatalogTest.java
b/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/catalog/GenericTableCatalogTest.java
index 96b1010b4..0b6af6e8f 100644
---
a/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/catalog/GenericTableCatalogTest.java
+++
b/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/catalog/GenericTableCatalogTest.java
@@ -81,6 +81,7 @@ import
org.apache.polaris.service.catalog.iceberg.IcebergCatalog;
import org.apache.polaris.service.catalog.io.DefaultFileIOFactory;
import org.apache.polaris.service.catalog.io.FileIOFactory;
import org.apache.polaris.service.config.RealmEntityManagerFactory;
+import org.apache.polaris.service.events.NoOpPolarisEventListener;
import
org.apache.polaris.service.storage.PolarisStorageIntegrationProviderImpl;
import org.apache.polaris.service.task.TaskExecutor;
import org.assertj.core.api.Assertions;
@@ -264,7 +265,8 @@ public class GenericTableCatalogTest {
passthroughView,
securityContext,
taskExecutor,
- fileIOFactory);
+ fileIOFactory,
+ new NoOpPolarisEventListener());
this.icebergCatalog.initialize(
CATALOG_NAME,
ImmutableMap.of(
diff --git
a/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/catalog/IcebergCatalogHandlerAuthzTest.java
b/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/catalog/IcebergCatalogHandlerAuthzTest.java
index 11a938f31..e74b7d641 100644
---
a/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/catalog/IcebergCatalogHandlerAuthzTest.java
+++
b/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/catalog/IcebergCatalogHandlerAuthzTest.java
@@ -1808,7 +1808,8 @@ public class IcebergCatalogHandlerAuthzTest extends
PolarisAuthzTestBase {
userSecretsManagerFactory,
Mockito.mock(),
new DefaultFileIOFactory(
- realmEntityManagerFactory, managerFactory, new
PolarisConfigurationStore() {})) {
+ realmEntityManagerFactory, managerFactory, new
PolarisConfigurationStore() {}),
+ polarisEventListener) {
@Override
public Catalog createCallContextCatalog(
CallContext context,
diff --git
a/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/catalog/IcebergCatalogTest.java
b/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/catalog/IcebergCatalogTest.java
index 05ad0b54b..d81edd1d4 100644
---
a/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/catalog/IcebergCatalogTest.java
+++
b/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/catalog/IcebergCatalogTest.java
@@ -19,7 +19,6 @@
package org.apache.polaris.service.quarkus.catalog;
import static java.nio.charset.StandardCharsets.UTF_8;
-import static org.apache.iceberg.types.Types.NestedField.required;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.isA;
import static org.mockito.Mockito.doReturn;
@@ -116,8 +115,15 @@ import
org.apache.polaris.service.catalog.io.ExceptionMappingFileIO;
import org.apache.polaris.service.catalog.io.FileIOFactory;
import org.apache.polaris.service.catalog.io.MeasuredFileIOFactory;
import org.apache.polaris.service.config.RealmEntityManagerFactory;
+import org.apache.polaris.service.events.AfterTableCommitedEvent;
+import org.apache.polaris.service.events.AfterTableRefreshedEvent;
+import org.apache.polaris.service.events.BeforeTableCommitedEvent;
+import org.apache.polaris.service.events.BeforeTableRefreshedEvent;
+import org.apache.polaris.service.events.PolarisEventListener;
+import org.apache.polaris.service.events.TestPolarisEventListener;
import org.apache.polaris.service.exception.FakeAzureHttpResponse;
import org.apache.polaris.service.exception.IcebergExceptionMapper;
+import org.apache.polaris.service.quarkus.test.TestData;
import
org.apache.polaris.service.storage.PolarisStorageIntegrationProviderImpl;
import org.apache.polaris.service.task.TableCleanupTaskHandler;
import org.apache.polaris.service.task.TaskExecutor;
@@ -164,17 +170,14 @@ public abstract class IcebergCatalogTest extends
CatalogTests<IcebergCatalog> {
"polaris.features.defaults.\"INITIALIZE_DEFAULT_CATALOG_FILEIO_FOR_TEST\"",
"true",
"polaris.features.defaults.\"SUPPORTED_CATALOG_STORAGE_TYPES\"",
- "[\"FILE\"]");
+ "[\"FILE\"]",
+ "polaris.event-listener.type",
+ "test");
}
}
- protected static final Namespace NS = Namespace.of("newdb");
- protected static final TableIdentifier TABLE = TableIdentifier.of(NS,
"table");
- protected static final Schema SCHEMA =
- new Schema(
- required(3, "id", Types.IntegerType.get(), "unique ID 🤪"),
- required(4, "data", Types.StringType.get()));
private static final String VIEW_QUERY = "select * from ns1.layer1_table";
+
public static final String CATALOG_NAME = "polaris-catalog";
public static final String TEST_ACCESS_KEY = "test_access_key";
public static final String SECRET_ACCESS_KEY = "secret_access_key";
@@ -198,6 +201,7 @@ public abstract class IcebergCatalogTest extends
CatalogTests<IcebergCatalog> {
@Inject PolarisStorageIntegrationProvider storageIntegrationProvider;
@Inject UserSecretsManagerFactory userSecretsManagerFactory;
@Inject PolarisDiagnostics diagServices;
+ @Inject PolarisEventListener polarisEventListener;
private IcebergCatalog catalog;
private CallContext callContext;
@@ -210,6 +214,7 @@ public abstract class IcebergCatalogTest extends
CatalogTests<IcebergCatalog> {
private FileIOFactory fileIOFactory;
private PolarisEntity catalogEntity;
private SecurityContext securityContext;
+ private TestPolarisEventListener testPolarisEventListener;
@BeforeAll
public static void setUpMocks() {
@@ -319,6 +324,7 @@ public abstract class IcebergCatalogTest extends
CatalogTests<IcebergCatalog> {
.thenReturn((PolarisStorageIntegration) storageIntegration);
this.catalog = initCatalog("my-catalog", ImmutableMap.of());
+ testPolarisEventListener = (TestPolarisEventListener) polarisEventListener;
}
@AfterEach
@@ -354,7 +360,8 @@ public abstract class IcebergCatalogTest extends
CatalogTests<IcebergCatalog> {
passthroughView,
securityContext,
taskExecutor,
- fileIOFactory);
+ fileIOFactory,
+ polarisEventListener);
ImmutableMap.Builder<String, String> propertiesBuilder =
ImmutableMap.<String, String>builder()
.put(CatalogProperties.FILE_IO_IMPL,
"org.apache.iceberg.inmemory.InMemoryFileIO")
@@ -649,7 +656,8 @@ public abstract class IcebergCatalogTest extends
CatalogTests<IcebergCatalog> {
passthroughView,
securityContext,
Mockito.mock(TaskExecutor.class),
- fileIOFactory);
+ fileIOFactory,
+ polarisEventListener);
catalog.initialize(
CATALOG_NAME,
ImmutableMap.of(
@@ -984,7 +992,8 @@ public abstract class IcebergCatalogTest extends
CatalogTests<IcebergCatalog> {
passthroughView,
securityContext,
taskExecutor,
- fileIOFactory);
+ fileIOFactory,
+ polarisEventListener);
catalog.initialize(
catalogWithoutStorage,
ImmutableMap.of(
@@ -1050,7 +1059,8 @@ public abstract class IcebergCatalogTest extends
CatalogTests<IcebergCatalog> {
passthroughView,
securityContext,
taskExecutor,
- fileIOFactory);
+ fileIOFactory,
+ polarisEventListener);
catalog.initialize(
catalogName,
ImmutableMap.of(
@@ -1596,7 +1606,8 @@ public abstract class IcebergCatalogTest extends
CatalogTests<IcebergCatalog> {
passthroughView,
securityContext,
Mockito.mock(),
- fileIOFactory);
+ fileIOFactory,
+ polarisEventListener);
noPurgeCatalog.initialize(
noPurgeCatalogName,
ImmutableMap.of(
@@ -1704,7 +1715,8 @@ public abstract class IcebergCatalogTest extends
CatalogTests<IcebergCatalog> {
passthroughView,
securityContext,
Mockito.mock(),
- measured);
+ measured,
+ polarisEventListener);
catalog.initialize(
CATALOG_NAME,
ImmutableMap.of(
@@ -1714,8 +1726,8 @@ public abstract class IcebergCatalogTest extends
CatalogTests<IcebergCatalog> {
.as("Nothing was created yet")
.isEqualTo(0);
- catalog.createNamespace(NS);
- Table table = catalog.buildTable(TABLE, SCHEMA).create();
+ catalog.createNamespace(TestData.NAMESPACE);
+ Table table = catalog.buildTable(TestData.TABLE, TestData.SCHEMA).create();
// Asserting greaterThan 0 is sufficient for validating that the wrapper
works without making
// assumptions about the
@@ -1727,7 +1739,9 @@ public abstract class IcebergCatalogTest extends
CatalogTests<IcebergCatalog> {
.as("A table was read and written, but a trip to storage was made")
.isEqualTo(0);
- Assertions.assertThat(catalog.dropTable(TABLE)).as("Table deletion should
succeed").isTrue();
+ Assertions.assertThat(catalog.dropTable(TestData.TABLE))
+ .as("Table deletion should succeed")
+ .isTrue();
TaskEntity taskEntity =
TaskEntity.of(
metaStoreManager
@@ -1801,7 +1815,8 @@ public abstract class IcebergCatalogTest extends
CatalogTests<IcebergCatalog> {
passthroughView,
securityContext,
Mockito.mock(TaskExecutor.class),
- fileIOFactory);
+ fileIOFactory,
+ polarisEventListener);
catalog.initialize(
CATALOG_NAME,
ImmutableMap.of(
@@ -1849,7 +1864,8 @@ public abstract class IcebergCatalogTest extends
CatalogTests<IcebergCatalog> {
passthroughView,
securityContext,
Mockito.mock(TaskExecutor.class),
- fileIOFactory);
+ fileIOFactory,
+ polarisEventListener);
catalog.initialize(
CATALOG_NAME,
ImmutableMap.of(
@@ -1924,6 +1940,35 @@ public abstract class IcebergCatalogTest extends
CatalogTests<IcebergCatalog> {
}
}
+ @Test
+ public void testEventsAreEmitted() {
+ IcebergCatalog catalog = catalog();
+ catalog.createNamespace(TestData.NAMESPACE);
+ Table table = catalog.buildTable(TestData.TABLE, TestData.SCHEMA).create();
+
+ String key = "foo";
+ String valOld = "bar1";
+ String valNew = "bar2";
+ table.updateProperties().set(key, valOld).commit();
+ table.updateProperties().set(key, valNew).commit();
+
+ var beforeRefreshEvent =
testPolarisEventListener.getLatest(BeforeTableRefreshedEvent.class);
+
Assertions.assertThat(beforeRefreshEvent.tableIdentifier()).isEqualTo(TestData.TABLE);
+
+ var afterRefreshEvent =
testPolarisEventListener.getLatest(AfterTableRefreshedEvent.class);
+
Assertions.assertThat(afterRefreshEvent.tableIdentifier()).isEqualTo(TestData.TABLE);
+
+ var beforeTableEvent =
testPolarisEventListener.getLatest(BeforeTableCommitedEvent.class);
+
Assertions.assertThat(beforeTableEvent.identifier()).isEqualTo(TestData.TABLE);
+
Assertions.assertThat(beforeTableEvent.base().properties().get(key)).isEqualTo(valOld);
+
Assertions.assertThat(beforeTableEvent.metadata().properties().get(key)).isEqualTo(valNew);
+
+ var afterTableEvent =
testPolarisEventListener.getLatest(AfterTableCommitedEvent.class);
+
Assertions.assertThat(afterTableEvent.identifier()).isEqualTo(TestData.TABLE);
+
Assertions.assertThat(afterTableEvent.base().properties().get(key)).isEqualTo(valOld);
+
Assertions.assertThat(afterTableEvent.metadata().properties().get(key)).isEqualTo(valNew);
+ }
+
private static InMemoryFileIO getInMemoryIo(IcebergCatalog catalog) {
return (InMemoryFileIO) ((ExceptionMappingFileIO)
catalog.getIo()).getInnerIo();
}
diff --git
a/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/catalog/IcebergCatalogViewTest.java
b/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/catalog/IcebergCatalogViewTest.java
index 25089d5c4..c66e88b37 100644
---
a/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/catalog/IcebergCatalogViewTest.java
+++
b/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/catalog/IcebergCatalogViewTest.java
@@ -37,6 +37,7 @@ import java.util.Map;
import java.util.Set;
import org.apache.iceberg.CatalogProperties;
import org.apache.iceberg.catalog.Catalog;
+import org.apache.iceberg.view.View;
import org.apache.iceberg.view.ViewCatalogTests;
import org.apache.polaris.core.PolarisCallContext;
import org.apache.polaris.core.PolarisDiagnostics;
@@ -67,12 +68,21 @@ import
org.apache.polaris.service.catalog.iceberg.IcebergCatalog;
import org.apache.polaris.service.catalog.io.DefaultFileIOFactory;
import org.apache.polaris.service.catalog.io.FileIOFactory;
import org.apache.polaris.service.config.RealmEntityManagerFactory;
+import org.apache.polaris.service.events.AfterViewCommitedEvent;
+import org.apache.polaris.service.events.AfterViewRefreshedEvent;
+import org.apache.polaris.service.events.BeforeViewCommitedEvent;
+import org.apache.polaris.service.events.BeforeViewRefreshedEvent;
+import org.apache.polaris.service.events.PolarisEventListener;
+import org.apache.polaris.service.events.TestPolarisEventListener;
+import org.apache.polaris.service.quarkus.test.TestData;
import
org.apache.polaris.service.storage.PolarisStorageIntegrationProviderImpl;
+import org.assertj.core.api.Assertions;
import org.assertj.core.api.Assumptions;
import org.assertj.core.configuration.PreferredAssumptionException;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInfo;
import org.junit.jupiter.api.io.TempDir;
import org.mockito.Mockito;
@@ -98,7 +108,9 @@ public class IcebergCatalogViewTest extends
ViewCatalogTests<IcebergCatalog> {
"polaris.features.defaults.\"INITIALIZE_DEFAULT_CATALOG_FILEIO_FOR_TEST\"",
"true",
"polaris.features.defaults.\"SUPPORTED_CATALOG_STORAGE_TYPES\"",
- "[\"FILE\"]");
+ "[\"FILE\"]",
+ "polaris.event-listener.type",
+ "test");
}
}
@@ -116,6 +128,7 @@ public class IcebergCatalogViewTest extends
ViewCatalogTests<IcebergCatalog> {
@Inject UserSecretsManagerFactory userSecretsManagerFactory;
@Inject PolarisConfigurationStore configurationStore;
@Inject PolarisDiagnostics diagServices;
+ @Inject PolarisEventListener polarisEventListener;
private IcebergCatalog catalog;
@@ -124,6 +137,8 @@ public class IcebergCatalogViewTest extends
ViewCatalogTests<IcebergCatalog> {
private UserSecretsManager userSecretsManager;
private PolarisCallContext polarisContext;
+ private TestPolarisEventListener testPolarisEventListener;
+
@BeforeAll
public static void setUpMocks() {
PolarisStorageIntegrationProviderImpl mock =
@@ -212,6 +227,8 @@ public class IcebergCatalogViewTest extends
ViewCatalogTests<IcebergCatalog> {
FileIOFactory fileIOFactory =
new DefaultFileIOFactory(
new RealmEntityManagerFactory(managerFactory), managerFactory,
configurationStore);
+
+ testPolarisEventListener = (TestPolarisEventListener) polarisEventListener;
this.catalog =
new IcebergCatalog(
entityManager,
@@ -220,7 +237,8 @@ public class IcebergCatalogViewTest extends
ViewCatalogTests<IcebergCatalog> {
passthroughView,
securityContext,
Mockito.mock(),
- fileIOFactory);
+ fileIOFactory,
+ polarisEventListener);
Map<String, String> properties =
ImmutableMap.<String, String>builder()
.put(CatalogProperties.FILE_IO_IMPL,
"org.apache.iceberg.inmemory.InMemoryFileIO")
@@ -249,4 +267,39 @@ public class IcebergCatalogViewTest extends
ViewCatalogTests<IcebergCatalog> {
protected boolean requiresNamespaceCreate() {
return true;
}
+
+ @Test
+ public void testEventsAreEmitted() {
+ IcebergCatalog catalog = catalog();
+ catalog.createNamespace(TestData.NAMESPACE);
+ View view =
+ catalog
+ .buildView(TestData.TABLE)
+ .withDefaultNamespace(TestData.NAMESPACE)
+ .withSchema(TestData.SCHEMA)
+ .withQuery("a", "b")
+ .create();
+
+ String key = "foo";
+ String valOld = "bar1";
+ String valNew = "bar2";
+ view.updateProperties().set(key, valOld).commit();
+ view.updateProperties().set(key, valNew).commit();
+
+ var beforeRefreshEvent =
testPolarisEventListener.getLatest(BeforeViewRefreshedEvent.class);
+
Assertions.assertThat(beforeRefreshEvent.viewIdentifier()).isEqualTo(TestData.TABLE);
+
+ var afterRefreshEvent =
testPolarisEventListener.getLatest(AfterViewRefreshedEvent.class);
+
Assertions.assertThat(afterRefreshEvent.viewIdentifier()).isEqualTo(TestData.TABLE);
+
+ var beforeCommitEvent =
testPolarisEventListener.getLatest(BeforeViewCommitedEvent.class);
+
Assertions.assertThat(beforeCommitEvent.identifier()).isEqualTo(TestData.TABLE);
+
Assertions.assertThat(beforeCommitEvent.base().properties().get(key)).isEqualTo(valOld);
+
Assertions.assertThat(beforeCommitEvent.metadata().properties().get(key)).isEqualTo(valNew);
+
+ var afterCommitEvent =
testPolarisEventListener.getLatest(AfterViewCommitedEvent.class);
+
Assertions.assertThat(afterCommitEvent.identifier()).isEqualTo(TestData.TABLE);
+
Assertions.assertThat(afterCommitEvent.base().properties().get(key)).isEqualTo(valOld);
+
Assertions.assertThat(afterCommitEvent.metadata().properties().get(key)).isEqualTo(valNew);
+ }
}
diff --git
a/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/catalog/PolicyCatalogTest.java
b/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/catalog/PolicyCatalogTest.java
index a5bf98ba9..68e2c35e0 100644
---
a/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/catalog/PolicyCatalogTest.java
+++
b/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/catalog/PolicyCatalogTest.java
@@ -94,6 +94,7 @@ import
org.apache.polaris.service.catalog.io.DefaultFileIOFactory;
import org.apache.polaris.service.catalog.io.FileIOFactory;
import org.apache.polaris.service.catalog.policy.PolicyCatalog;
import org.apache.polaris.service.config.RealmEntityManagerFactory;
+import org.apache.polaris.service.events.NoOpPolarisEventListener;
import
org.apache.polaris.service.storage.PolarisStorageIntegrationProviderImpl;
import org.apache.polaris.service.task.TaskExecutor;
import org.apache.polaris.service.types.ApplicablePolicy;
@@ -287,7 +288,8 @@ public class PolicyCatalogTest {
passthroughView,
securityContext,
taskExecutor,
- fileIOFactory);
+ fileIOFactory,
+ new NoOpPolarisEventListener());
this.icebergCatalog.initialize(
CATALOG_NAME,
ImmutableMap.of(
diff --git
a/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/ratelimiter/RateLimiterFilterTest.java
b/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/ratelimiter/RateLimiterFilterTest.java
index a5f66ec5b..48aa45001 100644
---
a/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/ratelimiter/RateLimiterFilterTest.java
+++
b/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/ratelimiter/RateLimiterFilterTest.java
@@ -31,6 +31,9 @@ import java.time.Duration;
import java.util.Map;
import java.util.Set;
import java.util.function.Consumer;
+import org.apache.polaris.service.events.BeforeRequestRateLimitedEvent;
+import org.apache.polaris.service.events.PolarisEventListener;
+import org.apache.polaris.service.events.TestPolarisEventListener;
import
org.apache.polaris.service.quarkus.ratelimiter.RateLimiterFilterTest.Profile;
import org.apache.polaris.service.quarkus.test.PolarisIntegrationTestFixture;
import org.apache.polaris.service.quarkus.test.PolarisIntegrationTestHelper;
@@ -81,7 +84,9 @@ public class RateLimiterFilterTest {
"polaris.authentication.token-broker.type",
"symmetric-key",
"polaris.authentication.token-broker.symmetric-key.secret",
- "secret");
+ "secret",
+ "polaris.event-listener.type",
+ "test");
}
}
@@ -90,6 +95,7 @@ public class RateLimiterFilterTest {
@Inject PolarisIntegrationTestHelper helper;
@Inject MeterRegistry meterRegistry;
+ @Inject PolarisEventListener polarisEventListener;
private TestEnvironment testEnv;
private PolarisIntegrationTestFixture fixture;
@@ -145,6 +151,11 @@ public class RateLimiterFilterTest {
}
requestAsserter.accept(Status.TOO_MANY_REQUESTS);
+ BeforeRequestRateLimitedEvent event =
+ ((TestPolarisEventListener) polarisEventListener)
+ .getLatest(BeforeRequestRateLimitedEvent.class);
+ assertThat(event.method()).isEqualTo("GET");
+
// Examples of expected metrics:
//
http_server_requests_seconds_count{application="Polaris",environment="prod",method="GET",outcome="CLIENT_ERROR",realm_id="org_apache_polaris_service_ratelimiter_RateLimiterFilterTest",status="429",uri="/api/management/v1/principal-roles"}
1.0
//
polaris_principal_roles_listPrincipalRoles_seconds_count{application="Polaris",class="org.apache.polaris.service.admin.api.PolarisPrincipalRolesApi",environment="prod",exception="none",method="listPrincipalRoles"}
50.0
diff --git
a/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/test/TestData.java
b/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/test/TestData.java
new file mode 100644
index 000000000..8c061f201
--- /dev/null
+++
b/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/test/TestData.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.polaris.service.quarkus.test;
+
+import static org.apache.iceberg.types.Types.NestedField.required;
+
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.catalog.Namespace;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.types.Types;
+
+/** Contains test data that can be reused across tests */
+public class TestData {
+ public static final Namespace NAMESPACE = Namespace.of("newdb");
+ public static final TableIdentifier TABLE = TableIdentifier.of(NAMESPACE,
"table");
+ public static final Schema SCHEMA =
+ new Schema(
+ required(3, "id", Types.IntegerType.get(), "unique ID 🤪"),
+ required(4, "data", Types.StringType.get()));
+}
diff --git
a/service/common/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalog.java
b/service/common/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalog.java
index afa7d1477..c66def885 100644
---
a/service/common/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalog.java
+++
b/service/common/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalog.java
@@ -123,6 +123,15 @@ import org.apache.polaris.core.storage.StorageLocation;
import org.apache.polaris.service.catalog.SupportsNotifications;
import org.apache.polaris.service.catalog.io.FileIOFactory;
import org.apache.polaris.service.catalog.io.FileIOUtil;
+import org.apache.polaris.service.events.AfterTableCommitedEvent;
+import org.apache.polaris.service.events.AfterTableRefreshedEvent;
+import org.apache.polaris.service.events.AfterViewCommitedEvent;
+import org.apache.polaris.service.events.AfterViewRefreshedEvent;
+import org.apache.polaris.service.events.BeforeTableCommitedEvent;
+import org.apache.polaris.service.events.BeforeTableRefreshedEvent;
+import org.apache.polaris.service.events.BeforeViewCommitedEvent;
+import org.apache.polaris.service.events.BeforeViewRefreshedEvent;
+import org.apache.polaris.service.events.PolarisEventListener;
import org.apache.polaris.service.task.TaskExecutor;
import org.apache.polaris.service.types.NotificationRequest;
import org.apache.polaris.service.types.NotificationType;
@@ -171,6 +180,8 @@ public class IcebergCatalog extends BaseMetastoreViewCatalog
private final CatalogEntity catalogEntity;
private final TaskExecutor taskExecutor;
private final SecurityContext securityContext;
+ private final PolarisEventListener polarisEventListener;
+
private String ioImplClassName;
private FileIO catalogFileIO;
private final String catalogName;
@@ -197,7 +208,8 @@ public class IcebergCatalog extends BaseMetastoreViewCatalog
PolarisResolutionManifestCatalogView resolvedEntityView,
SecurityContext securityContext,
TaskExecutor taskExecutor,
- FileIOFactory fileIOFactory) {
+ FileIOFactory fileIOFactory,
+ PolarisEventListener polarisEventListener) {
this.entityManager = entityManager;
this.callContext = callContext;
this.resolvedEntityView = resolvedEntityView;
@@ -209,6 +221,7 @@ public class IcebergCatalog extends BaseMetastoreViewCatalog
this.catalogName = catalogEntity.getName();
this.fileIOFactory = fileIOFactory;
this.metaStoreManager = metaStoreManager;
+ this.polarisEventListener = polarisEventListener;
}
@Override
@@ -1319,6 +1332,7 @@ public class IcebergCatalog extends
BaseMetastoreViewCatalog
if (latestLocation == null) {
disableRefresh();
} else {
+ polarisEventListener.onBeforeTableRefreshed(new
BeforeTableRefreshedEvent(tableIdentifier));
refreshFromMetadataLocation(
latestLocation,
SHOULD_RETRY_REFRESH_PREDICATE,
@@ -1338,10 +1352,14 @@ public class IcebergCatalog extends
BaseMetastoreViewCatalog
Set.of(PolarisStorageActions.READ));
return TableMetadataParser.read(fileIO, metadataLocation);
});
+ polarisEventListener.onAfterTableRefreshed(new
AfterTableRefreshedEvent(tableIdentifier));
}
}
public void doCommit(TableMetadata base, TableMetadata metadata) {
+ polarisEventListener.onBeforeTableCommited(
+ new BeforeTableCommitedEvent(tableIdentifier, base, metadata));
+
LOGGER.debug(
"doCommit for table {} with base {}, metadata {}", tableIdentifier,
base, metadata);
// TODO: Maybe avoid writing metadata if there's definitely a
transaction conflict
@@ -1494,6 +1512,9 @@ public class IcebergCatalog extends
BaseMetastoreViewCatalog
} else {
updateTableLike(tableIdentifier, entity);
}
+
+ polarisEventListener.onAfterTableCommited(
+ new AfterTableCommitedEvent(tableIdentifier, base, metadata));
}
@Override
@@ -1684,6 +1705,7 @@ public class IcebergCatalog extends
BaseMetastoreViewCatalog
if (latestLocation == null) {
disableRefresh();
} else {
+ polarisEventListener.onBeforeViewRefreshed(new
BeforeViewRefreshedEvent(identifier));
refreshFromMetadataLocation(
latestLocation,
SHOULD_RETRY_REFRESH_PREDICATE,
@@ -1705,10 +1727,14 @@ public class IcebergCatalog extends
BaseMetastoreViewCatalog
return
ViewMetadataParser.read(fileIO.newInputFile(metadataLocation));
});
+ polarisEventListener.onAfterViewRefreshed(new
AfterViewRefreshedEvent(identifier));
}
}
public void doCommit(ViewMetadata base, ViewMetadata metadata) {
+ polarisEventListener.onBeforeViewCommited(
+ new BeforeViewCommitedEvent(identifier, base, metadata));
+
// TODO: Maybe avoid writing metadata if there's definitely a
transaction conflict
LOGGER.debug("doCommit for view {} with base {}, metadata {}",
identifier, base, metadata);
if (null == base && !namespaceExists(identifier.namespace())) {
@@ -1802,6 +1828,9 @@ public class IcebergCatalog extends
BaseMetastoreViewCatalog
} else {
updateTableLike(identifier, entity);
}
+
+ polarisEventListener.onAfterViewCommited(
+ new AfterViewCommitedEvent(identifier, base, metadata));
}
protected String writeNewMetadataIfRequired(ViewMetadata metadata) {
diff --git
a/service/common/src/main/java/org/apache/polaris/service/context/PolarisCallContextCatalogFactory.java
b/service/common/src/main/java/org/apache/polaris/service/context/PolarisCallContextCatalogFactory.java
index 94fe19760..b1d28ce89 100644
---
a/service/common/src/main/java/org/apache/polaris/service/context/PolarisCallContextCatalogFactory.java
+++
b/service/common/src/main/java/org/apache/polaris/service/context/PolarisCallContextCatalogFactory.java
@@ -38,6 +38,7 @@ import
org.apache.polaris.core.secrets.UserSecretsManagerFactory;
import org.apache.polaris.service.catalog.iceberg.IcebergCatalog;
import org.apache.polaris.service.catalog.io.FileIOFactory;
import org.apache.polaris.service.config.RealmEntityManagerFactory;
+import org.apache.polaris.service.events.PolarisEventListener;
import org.apache.polaris.service.task.TaskExecutor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -55,6 +56,7 @@ public class PolarisCallContextCatalogFactory implements
CallContextCatalogFacto
private final FileIOFactory fileIOFactory;
private final MetaStoreManagerFactory metaStoreManagerFactory;
private final UserSecretsManagerFactory userSecretsManagerFactory;
+ private final PolarisEventListener polarisEventListener;
@Inject
public PolarisCallContextCatalogFactory(
@@ -62,12 +64,14 @@ public class PolarisCallContextCatalogFactory implements
CallContextCatalogFacto
MetaStoreManagerFactory metaStoreManagerFactory,
UserSecretsManagerFactory userSecretsManagerFactory,
TaskExecutor taskExecutor,
- FileIOFactory fileIOFactory) {
+ FileIOFactory fileIOFactory,
+ PolarisEventListener polarisEventListener) {
this.entityManagerFactory = entityManagerFactory;
this.metaStoreManagerFactory = metaStoreManagerFactory;
this.userSecretsManagerFactory = userSecretsManagerFactory;
this.taskExecutor = taskExecutor;
this.fileIOFactory = fileIOFactory;
+ this.polarisEventListener = polarisEventListener;
}
@Override
@@ -95,7 +99,8 @@ public class PolarisCallContextCatalogFactory implements
CallContextCatalogFacto
resolvedManifest,
securityContext,
taskExecutor,
- fileIOFactory);
+ fileIOFactory,
+ polarisEventListener);
context.contextVariables().put(CallContext.REQUEST_PATH_CATALOG_INSTANCE_KEY,
catalogInstance);
diff --git
a/service/common/src/main/java/org/apache/polaris/service/events/AfterTableCommitedEvent.java
b/service/common/src/main/java/org/apache/polaris/service/events/AfterTableCommitedEvent.java
new file mode 100644
index 000000000..c952997df
--- /dev/null
+++
b/service/common/src/main/java/org/apache/polaris/service/events/AfterTableCommitedEvent.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.polaris.service.events;
+
+import org.apache.iceberg.TableMetadata;
+import org.apache.iceberg.catalog.TableIdentifier;
+
+/**
+ * Emitted after Polaris performs a commit to a table. This is not emitted if
there's an exception
+ * while committing.
+ *
+ * @param identifier The identifier.
+ * @param base The old metadata.
+ * @param metadata The new metadata.
+ */
+public record AfterTableCommitedEvent(
+ TableIdentifier identifier, TableMetadata base, TableMetadata metadata)
+ implements PolarisEvent {}
diff --git
a/service/common/src/main/java/org/apache/polaris/service/events/AfterTableRefreshedEvent.java
b/service/common/src/main/java/org/apache/polaris/service/events/AfterTableRefreshedEvent.java
new file mode 100644
index 000000000..be38a8baa
--- /dev/null
+++
b/service/common/src/main/java/org/apache/polaris/service/events/AfterTableRefreshedEvent.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.polaris.service.events;
+
+import org.apache.iceberg.catalog.TableIdentifier;
+
+/**
+ * Emitted after Polaris refreshes its known version of a table's metadata by
fetching the latest.
+ *
+ * @param tableIdentifier The identifier of the table that was refreshed.
+ */
+public record AfterTableRefreshedEvent(TableIdentifier tableIdentifier)
implements PolarisEvent {}
diff --git
a/service/common/src/main/java/org/apache/polaris/service/events/AfterTaskAttemptedEvent.java
b/service/common/src/main/java/org/apache/polaris/service/events/AfterTaskAttemptedEvent.java
new file mode 100644
index 000000000..638ba84fb
--- /dev/null
+++
b/service/common/src/main/java/org/apache/polaris/service/events/AfterTaskAttemptedEvent.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.polaris.service.events;
+
+import org.apache.polaris.core.context.CallContext;
+
+/**
+ * Emitted after an attempt of an async task, such as manifest file cleanup,
finishes.
+ *
+ * @param taskEntityId The ID of the TaskEntity.
+ * @param callContext The CallContext the task is being executed under.
+ * @param attempt The attempt number. Each retry of the task will have its own
attempt number. The
+ * initial (non-retried) attempt starts counting from 1.
+ * @param success Whether or not the attempt succeeded.
+ */
+public record AfterTaskAttemptedEvent(
+ long taskEntityId, CallContext callContext, int attempt, boolean success)
+ implements PolarisEvent {}
diff --git
a/service/common/src/main/java/org/apache/polaris/service/events/AfterViewCommitedEvent.java
b/service/common/src/main/java/org/apache/polaris/service/events/AfterViewCommitedEvent.java
new file mode 100644
index 000000000..eb2ca2414
--- /dev/null
+++
b/service/common/src/main/java/org/apache/polaris/service/events/AfterViewCommitedEvent.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.polaris.service.events;
+
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.view.ViewMetadata;
+
+/**
+ * Emitted after Polaris performs a commit to a view. This is not emitted if
there's an exception
+ * while committing.
+ *
+ * @param identifier The identifier.
+ * @param base The old metadata.
+ * @param metadata The new metadata.
+ */
+public record AfterViewCommitedEvent(
+ TableIdentifier identifier, ViewMetadata base, ViewMetadata metadata)
implements PolarisEvent {}
diff --git
a/service/common/src/main/java/org/apache/polaris/service/events/AfterViewRefreshedEvent.java
b/service/common/src/main/java/org/apache/polaris/service/events/AfterViewRefreshedEvent.java
new file mode 100644
index 000000000..249220ddd
--- /dev/null
+++
b/service/common/src/main/java/org/apache/polaris/service/events/AfterViewRefreshedEvent.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.polaris.service.events;
+
+import org.apache.iceberg.catalog.TableIdentifier;
+
+/**
+ * Emitted after Polaris refreshes its known version of a view's metadata by
fetching the latest.
+ *
+ * @param viewIdentifier The identifier of the view that was refreshed.
+ */
+public record AfterViewRefreshedEvent(TableIdentifier viewIdentifier)
implements PolarisEvent {}
diff --git
a/service/common/src/main/java/org/apache/polaris/service/events/BeforeRequestRateLimitedEvent.java
b/service/common/src/main/java/org/apache/polaris/service/events/BeforeRequestRateLimitedEvent.java
new file mode 100644
index 000000000..1d9780ebe
--- /dev/null
+++
b/service/common/src/main/java/org/apache/polaris/service/events/BeforeRequestRateLimitedEvent.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.polaris.service.events;
+
+/**
+ * Emitted before the RateLimiterFilter rejects a request due to exceeding the
rate limit.
+ *
+ * @param method The request's HTTP method
+ * @param absolutePath The request's absolute path
+ */
+public record BeforeRequestRateLimitedEvent(String method, String absolutePath)
+ implements PolarisEvent {}
diff --git
a/service/common/src/main/java/org/apache/polaris/service/events/BeforeTableCommitedEvent.java
b/service/common/src/main/java/org/apache/polaris/service/events/BeforeTableCommitedEvent.java
new file mode 100644
index 000000000..2bcc49ab6
--- /dev/null
+++
b/service/common/src/main/java/org/apache/polaris/service/events/BeforeTableCommitedEvent.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.polaris.service.events;
+
+import org.apache.iceberg.TableMetadata;
+import org.apache.iceberg.catalog.TableIdentifier;
+
+/**
+ * Emitted when Polaris intends to perform a commit to a table. There is no
guarantee on the order
+ * of this event relative to the validation checks we've performed, which
means the commit may still
+ * fail Polaris-side validation checks.
+ *
+ * @param identifier The identifier.
+ * @param base The old metadata.
+ * @param metadata The new metadata.
+ */
+public record BeforeTableCommitedEvent(
+ TableIdentifier identifier, TableMetadata base, TableMetadata metadata)
+ implements PolarisEvent {}
diff --git
a/service/common/src/main/java/org/apache/polaris/service/events/BeforeTableRefreshedEvent.java
b/service/common/src/main/java/org/apache/polaris/service/events/BeforeTableRefreshedEvent.java
new file mode 100644
index 000000000..f319298f5
--- /dev/null
+++
b/service/common/src/main/java/org/apache/polaris/service/events/BeforeTableRefreshedEvent.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.polaris.service.events;
+
+import org.apache.iceberg.catalog.TableIdentifier;
+
+/**
+ * Emitted when Polaris intends to refresh its known version of a table's
metadata by fetching the
+ * latest.
+ *
+ * @param tableIdentifier The identifier of the table being refreshed.
+ */
+public record BeforeTableRefreshedEvent(TableIdentifier tableIdentifier)
implements PolarisEvent {}
diff --git
a/service/common/src/main/java/org/apache/polaris/service/events/BeforeTaskAttemptedEvent.java
b/service/common/src/main/java/org/apache/polaris/service/events/BeforeTaskAttemptedEvent.java
new file mode 100644
index 000000000..a7fa7231e
--- /dev/null
+++
b/service/common/src/main/java/org/apache/polaris/service/events/BeforeTaskAttemptedEvent.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.polaris.service.events;
+
+import org.apache.polaris.core.context.CallContext;
+
+/**
+ * Emitted before an attempt of an async task, such as manifest file cleanup,
begins.
+ *
+ * @param taskEntityId The ID of the TaskEntity
+ * @param callContext The CallContext the task is being executed under.
+ * @param attempt The attempt number. Each retry of the task will have its own
attempt number. The
+ * initial (non-retried) attempt starts counting from 1.
+ */
+public record BeforeTaskAttemptedEvent(long taskEntityId, CallContext
callContext, int attempt)
+ implements PolarisEvent {}
diff --git
a/service/common/src/main/java/org/apache/polaris/service/events/BeforeViewCommitedEvent.java
b/service/common/src/main/java/org/apache/polaris/service/events/BeforeViewCommitedEvent.java
new file mode 100644
index 000000000..16e460d80
--- /dev/null
+++
b/service/common/src/main/java/org/apache/polaris/service/events/BeforeViewCommitedEvent.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.polaris.service.events;
+
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.view.ViewMetadata;
+
+/**
+ * Emitted when Polaris intends to perform a commit to a view. There is no
guarantee on the order of
+ * this event relative to the validation checks we've performed, which means
the commit may still
+ * fail Polaris-side validation checks.
+ *
+ * @param identifier The identifier.
+ * @param base The old metadata.
+ * @param metadata The new metadata.
+ */
+public record BeforeViewCommitedEvent(
+ TableIdentifier identifier, ViewMetadata base, ViewMetadata metadata)
implements PolarisEvent {}
diff --git
a/service/common/src/main/java/org/apache/polaris/service/events/BeforeViewRefreshedEvent.java
b/service/common/src/main/java/org/apache/polaris/service/events/BeforeViewRefreshedEvent.java
new file mode 100644
index 000000000..6f58d2ca2
--- /dev/null
+++
b/service/common/src/main/java/org/apache/polaris/service/events/BeforeViewRefreshedEvent.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.polaris.service.events;
+
+import org.apache.iceberg.catalog.TableIdentifier;
+
+/**
+ * Emitted when Polaris intends to refresh its known version of a view's
metadata by fetching the
+ * latest.
+ *
+ * @param viewIdentifier The identifier of the view being refreshed.
+ */
+public record BeforeViewRefreshedEvent(TableIdentifier viewIdentifier)
implements PolarisEvent {}
diff --git
a/service/common/src/main/java/org/apache/polaris/service/events/NoOpPolarisEventListener.java
b/service/common/src/main/java/org/apache/polaris/service/events/NoOpPolarisEventListener.java
new file mode 100644
index 000000000..f31fbcef5
--- /dev/null
+++
b/service/common/src/main/java/org/apache/polaris/service/events/NoOpPolarisEventListener.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.polaris.service.events;
+
+import io.smallrye.common.annotation.Identifier;
+import jakarta.enterprise.context.ApplicationScoped;
+
+/** Event listener that does nothing. */
+@ApplicationScoped
+@Identifier("no-op")
+public class NoOpPolarisEventListener extends PolarisEventListener {}
diff --git
a/service/common/src/main/java/org/apache/polaris/service/events/PolarisEvent.java
b/service/common/src/main/java/org/apache/polaris/service/events/PolarisEvent.java
new file mode 100644
index 000000000..4922c02f4
--- /dev/null
+++
b/service/common/src/main/java/org/apache/polaris/service/events/PolarisEvent.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.polaris.service.events;
+
+/**
+ * Represents an event emitted by Polaris. Currently there's no common data
across events so this is
+ * just a marker interface. *
+ */
+public interface PolarisEvent {}
diff --git
a/service/common/src/main/java/org/apache/polaris/service/events/PolarisEventListener.java
b/service/common/src/main/java/org/apache/polaris/service/events/PolarisEventListener.java
new file mode 100644
index 000000000..485766bb2
--- /dev/null
+++
b/service/common/src/main/java/org/apache/polaris/service/events/PolarisEventListener.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.polaris.service.events;
+
+/**
+ * Represents an event listener that can respond to notable moments during
Polaris's execution.
+ * Event details are documented under the event objects themselves.
+ */
+public abstract class PolarisEventListener {
+ /** {@link BeforeRequestRateLimitedEvent} */
+ public void onBeforeRequestRateLimited(BeforeRequestRateLimitedEvent event)
{}
+
+ /** {@link BeforeTableCommitedEvent} */
+ public void onBeforeTableCommited(BeforeTableCommitedEvent event) {}
+
+ /** {@link AfterTableCommitedEvent} */
+ public void onAfterTableCommited(AfterTableCommitedEvent event) {}
+
+ /** {@link BeforeViewCommitedEvent} */
+ public void onBeforeViewCommited(BeforeViewCommitedEvent event) {}
+
+ /** {@link AfterViewCommitedEvent} */
+ public void onAfterViewCommited(AfterViewCommitedEvent event) {}
+
+ /** {@link BeforeTableRefreshedEvent} */
+ public void onBeforeTableRefreshed(BeforeTableRefreshedEvent event) {}
+
+ /** {@link AfterTableRefreshedEvent} */
+ public void onAfterTableRefreshed(AfterTableRefreshedEvent event) {}
+
+ /** {@link BeforeViewRefreshedEvent} */
+ public void onBeforeViewRefreshed(BeforeViewRefreshedEvent event) {}
+
+ /** {@link AfterViewRefreshedEvent} */
+ public void onAfterViewRefreshed(AfterViewRefreshedEvent event) {}
+
+ /** {@link BeforeTaskAttemptedEvent} */
+ public void onBeforeTaskAttempted(BeforeTaskAttemptedEvent event) {}
+
+ /** {@link AfterTaskAttemptedEvent} */
+ public void onAfterTaskAttempted(AfterTaskAttemptedEvent event) {}
+}
diff --git
a/service/common/src/main/java/org/apache/polaris/service/events/TestPolarisEventListener.java
b/service/common/src/main/java/org/apache/polaris/service/events/TestPolarisEventListener.java
new file mode 100644
index 000000000..668edc7fa
--- /dev/null
+++
b/service/common/src/main/java/org/apache/polaris/service/events/TestPolarisEventListener.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.polaris.service.events;
+
+import com.google.common.collect.Streams;
+import io.smallrye.common.annotation.Identifier;
+import jakarta.enterprise.context.ApplicationScoped;
+import java.util.ArrayList;
+import java.util.List;
+
+/** Event listener that stores all emitted events forever. Not recommended for
use in production. */
+@ApplicationScoped
+@Identifier("test")
+public class TestPolarisEventListener extends PolarisEventListener {
+ private final List<PolarisEvent> history = new ArrayList<>();
+
+ public <T> T getLatest(Class<T> type) {
+ return (T)
Streams.findLast(history.stream().filter(type::isInstance)).orElseThrow();
+ }
+
+ @Override
+ public void onBeforeRequestRateLimited(BeforeRequestRateLimitedEvent event) {
+ history.add(event);
+ }
+
+ @Override
+ public void onBeforeTableCommited(BeforeTableCommitedEvent event) {
+ history.add(event);
+ }
+
+ @Override
+ public void onAfterTableCommited(AfterTableCommitedEvent event) {
+ history.add(event);
+ }
+
+ @Override
+ public void onBeforeViewCommited(BeforeViewCommitedEvent event) {
+ history.add(event);
+ }
+
+ @Override
+ public void onAfterViewCommited(AfterViewCommitedEvent event) {
+ history.add(event);
+ }
+
+ @Override
+ public void onBeforeTableRefreshed(BeforeTableRefreshedEvent event) {
+ history.add(event);
+ }
+
+ @Override
+ public void onAfterTableRefreshed(AfterTableRefreshedEvent event) {
+ history.add(event);
+ }
+
+ @Override
+ public void onBeforeViewRefreshed(BeforeViewRefreshedEvent event) {
+ history.add(event);
+ }
+
+ @Override
+ public void onAfterViewRefreshed(AfterViewRefreshedEvent event) {
+ history.add(event);
+ }
+
+ @Override
+ public void onBeforeTaskAttempted(BeforeTaskAttemptedEvent event) {
+ history.add(event);
+ }
+
+ @Override
+ public void onAfterTaskAttempted(AfterTaskAttemptedEvent event) {
+ history.add(event);
+ }
+}
diff --git
a/service/common/src/main/java/org/apache/polaris/service/ratelimiter/RateLimiterFilter.java
b/service/common/src/main/java/org/apache/polaris/service/ratelimiter/RateLimiterFilter.java
index 28bb60589..7b0fda93f 100644
---
a/service/common/src/main/java/org/apache/polaris/service/ratelimiter/RateLimiterFilter.java
+++
b/service/common/src/main/java/org/apache/polaris/service/ratelimiter/RateLimiterFilter.java
@@ -28,6 +28,8 @@ import jakarta.ws.rs.core.Response;
import jakarta.ws.rs.ext.Provider;
import java.io.IOException;
import org.apache.polaris.service.config.PolarisFilterPriorities;
+import org.apache.polaris.service.events.BeforeRequestRateLimitedEvent;
+import org.apache.polaris.service.events.PolarisEventListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -40,16 +42,21 @@ public class RateLimiterFilter implements
ContainerRequestFilter {
private static final Logger LOGGER =
LoggerFactory.getLogger(RateLimiterFilter.class);
private final RateLimiter rateLimiter;
+ private final PolarisEventListener polarisEventListener;
@Inject
- public RateLimiterFilter(RateLimiter rateLimiter) {
+ public RateLimiterFilter(RateLimiter rateLimiter, PolarisEventListener
polarisEventListener) {
this.rateLimiter = rateLimiter;
+ this.polarisEventListener = polarisEventListener;
}
/** Returns a 429 if the rate limiter says so. Otherwise, forwards the
request along. */
@Override
public void filter(ContainerRequestContext ctx) throws IOException {
if (!rateLimiter.canProceed()) {
+ polarisEventListener.onBeforeRequestRateLimited(
+ new BeforeRequestRateLimitedEvent(
+ ctx.getMethod(), ctx.getUriInfo().getAbsolutePath().toString()));
ctx.abortWith(Response.status(Response.Status.TOO_MANY_REQUESTS).build());
LOGGER.atDebug().log("Rate limiting request");
}
diff --git
a/service/common/src/main/java/org/apache/polaris/service/task/TaskExecutorImpl.java
b/service/common/src/main/java/org/apache/polaris/service/task/TaskExecutorImpl.java
index 6b42c3fc8..140931031 100644
---
a/service/common/src/main/java/org/apache/polaris/service/task/TaskExecutorImpl.java
+++
b/service/common/src/main/java/org/apache/polaris/service/task/TaskExecutorImpl.java
@@ -33,6 +33,9 @@ import org.apache.polaris.core.entity.PolarisEntityType;
import org.apache.polaris.core.entity.TaskEntity;
import org.apache.polaris.core.persistence.MetaStoreManagerFactory;
import org.apache.polaris.core.persistence.PolarisMetaStoreManager;
+import org.apache.polaris.service.events.AfterTaskAttemptedEvent;
+import org.apache.polaris.service.events.BeforeTaskAttemptedEvent;
+import org.apache.polaris.service.events.PolarisEventListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -48,14 +51,17 @@ public class TaskExecutorImpl implements TaskExecutor {
private final MetaStoreManagerFactory metaStoreManagerFactory;
private final TaskFileIOSupplier fileIOSupplier;
private final List<TaskHandler> taskHandlers = new CopyOnWriteArrayList<>();
+ private final PolarisEventListener polarisEventListener;
public TaskExecutorImpl(
Executor executor,
MetaStoreManagerFactory metaStoreManagerFactory,
- TaskFileIOSupplier fileIOSupplier) {
+ TaskFileIOSupplier fileIOSupplier,
+ PolarisEventListener polarisEventListener) {
this.executor = executor;
this.metaStoreManagerFactory = metaStoreManagerFactory;
this.fileIOSupplier = fileIOSupplier;
+ this.polarisEventListener = polarisEventListener;
}
public void init() {
@@ -111,45 +117,54 @@ public class TaskExecutorImpl implements TaskExecutor {
}
protected void handleTask(long taskEntityId, CallContext ctx, int attempt) {
- // set the call context INSIDE the async task
- CallContext.setCurrentContext(ctx);
- LOGGER.info("Handling task entity id {}", taskEntityId);
- PolarisMetaStoreManager metaStoreManager =
-
metaStoreManagerFactory.getOrCreateMetaStoreManager(ctx.getRealmContext());
- PolarisBaseEntity taskEntity =
- metaStoreManager
- .loadEntity(ctx.getPolarisCallContext(), 0L, taskEntityId,
PolarisEntityType.TASK)
- .getEntity();
- if (!PolarisEntityType.TASK.equals(taskEntity.getType())) {
- throw new IllegalArgumentException("Provided taskId must be a task
entity type");
- }
- TaskEntity task = TaskEntity.of(taskEntity);
- Optional<TaskHandler> handlerOpt =
- taskHandlers.stream().filter(th -> th.canHandleTask(task)).findFirst();
- if (handlerOpt.isEmpty()) {
- LOGGER
- .atWarn()
- .addKeyValue("taskEntityId", taskEntityId)
- .addKeyValue("taskType", task.getTaskType())
- .log("Unable to find handler for task type");
- return;
- }
- TaskHandler handler = handlerOpt.get();
- boolean success = handler.handleTask(task, ctx);
- if (success) {
- LOGGER
- .atInfo()
- .addKeyValue("taskEntityId", taskEntityId)
- .addKeyValue("handlerClass", handler.getClass())
- .log("Task successfully handled");
- metaStoreManager.dropEntityIfExists(
- ctx.getPolarisCallContext(), null, taskEntity, Map.of(), false);
- } else {
- LOGGER
- .atWarn()
- .addKeyValue("taskEntityId", taskEntityId)
- .addKeyValue("taskEntityName", taskEntity.getName())
- .log("Unable to execute async task");
+ polarisEventListener.onBeforeTaskAttempted(
+ new BeforeTaskAttemptedEvent(taskEntityId, ctx, attempt));
+
+ boolean success = false;
+ try {
+ // set the call context INSIDE the async task
+ CallContext.setCurrentContext(ctx);
+ LOGGER.info("Handling task entity id {}", taskEntityId);
+ PolarisMetaStoreManager metaStoreManager =
+
metaStoreManagerFactory.getOrCreateMetaStoreManager(ctx.getRealmContext());
+ PolarisBaseEntity taskEntity =
+ metaStoreManager
+ .loadEntity(ctx.getPolarisCallContext(), 0L, taskEntityId,
PolarisEntityType.TASK)
+ .getEntity();
+ if (!PolarisEntityType.TASK.equals(taskEntity.getType())) {
+ throw new IllegalArgumentException("Provided taskId must be a task
entity type");
+ }
+ TaskEntity task = TaskEntity.of(taskEntity);
+ Optional<TaskHandler> handlerOpt =
+ taskHandlers.stream().filter(th ->
th.canHandleTask(task)).findFirst();
+ if (handlerOpt.isEmpty()) {
+ LOGGER
+ .atWarn()
+ .addKeyValue("taskEntityId", taskEntityId)
+ .addKeyValue("taskType", task.getTaskType())
+ .log("Unable to find handler for task type");
+ return;
+ }
+ TaskHandler handler = handlerOpt.get();
+ success = handler.handleTask(task, ctx);
+ if (success) {
+ LOGGER
+ .atInfo()
+ .addKeyValue("taskEntityId", taskEntityId)
+ .addKeyValue("handlerClass", handler.getClass())
+ .log("Task successfully handled");
+ metaStoreManager.dropEntityIfExists(
+ ctx.getPolarisCallContext(), null, taskEntity, Map.of(), false);
+ } else {
+ LOGGER
+ .atWarn()
+ .addKeyValue("taskEntityId", taskEntityId)
+ .addKeyValue("taskEntityName", taskEntity.getName())
+ .log("Unable to execute async task");
+ }
+ } finally {
+ polarisEventListener.onAfterTaskAttempted(
+ new AfterTaskAttemptedEvent(taskEntityId, ctx, attempt, success));
}
}
}
diff --git
a/service/common/src/test/java/org/apache/polaris/service/catalog/io/FileIOFactoryTest.java
b/service/common/src/test/java/org/apache/polaris/service/catalog/io/FileIOFactoryTest.java
index 9a070273b..a6854703d 100644
---
a/service/common/src/test/java/org/apache/polaris/service/catalog/io/FileIOFactoryTest.java
+++
b/service/common/src/test/java/org/apache/polaris/service/catalog/io/FileIOFactoryTest.java
@@ -244,7 +244,8 @@ public class FileIOFactoryTest {
passthroughView,
services.securityContext(),
services.taskExecutor(),
- services.fileIOFactory());
+ services.fileIOFactory(),
+ services.polarisEventListener());
polarisCatalog.initialize(
CATALOG_NAME,
ImmutableMap.of(
diff --git
a/service/common/src/test/java/org/apache/polaris/service/task/TaskExecutorImplTest.java
b/service/common/src/test/java/org/apache/polaris/service/task/TaskExecutorImplTest.java
new file mode 100644
index 000000000..1cc88fc10
--- /dev/null
+++
b/service/common/src/test/java/org/apache/polaris/service/task/TaskExecutorImplTest.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.polaris.service.task;
+
+import org.apache.polaris.core.PolarisCallContext;
+import org.apache.polaris.core.context.CallContext;
+import org.apache.polaris.core.context.RealmContext;
+import org.apache.polaris.core.entity.TaskEntity;
+import org.apache.polaris.core.persistence.BasePersistence;
+import org.apache.polaris.core.persistence.MetaStoreManagerFactory;
+import org.apache.polaris.core.persistence.PolarisMetaStoreManager;
+import org.apache.polaris.service.TestServices;
+import org.apache.polaris.service.events.AfterTaskAttemptedEvent;
+import org.apache.polaris.service.events.BeforeTaskAttemptedEvent;
+import org.apache.polaris.service.events.TestPolarisEventListener;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+/** Unit tests for TaskExecutorImpl */
+public class TaskExecutorImplTest {
+ @Test
+ void testEventsAreEmitted() {
+ String realm = "myrealm";
+ RealmContext realmContext = () -> realm;
+
+ TestServices testServices =
TestServices.builder().realmContext(realmContext).build();
+
+ TestPolarisEventListener testPolarisEventListener =
+ (TestPolarisEventListener) testServices.polarisEventListener();
+
+ MetaStoreManagerFactory metaStoreManagerFactory =
testServices.metaStoreManagerFactory();
+ PolarisMetaStoreManager metaStoreManager =
+ metaStoreManagerFactory.getOrCreateMetaStoreManager(realmContext);
+ BasePersistence bp =
metaStoreManagerFactory.getOrCreateSessionSupplier(realmContext).get();
+
+ PolarisCallContext polarisCallCtx =
+ new PolarisCallContext(bp, testServices.polarisDiagnostics());
+ CallContext callContext = CallContext.of(realmContext, polarisCallCtx);
+
+ // This task doesn't have a type so it won't be handle-able by a real
handler. We register a
+ // test TaskHandler below that can handle any task.
+ TaskEntity taskEntity =
+ new TaskEntity.Builder()
+ .setName("mytask")
+
.setId(metaStoreManager.generateNewEntityId(polarisCallCtx).getId())
+ .build();
+ metaStoreManager.createEntityIfNotExists(polarisCallCtx, null, taskEntity);
+
+ int attempt = 1;
+
+ TaskExecutorImpl executor =
+ new TaskExecutorImpl(
+ Runnable::run,
+ testServices.metaStoreManagerFactory(),
+ new TaskFileIOSupplier(testServices.fileIOFactory()),
+ testServices.polarisEventListener());
+ executor.addTaskHandler(
+ new TaskHandler() {
+ @Override
+ public boolean canHandleTask(TaskEntity task) {
+ return true;
+ }
+
+ @Override
+ public boolean handleTask(TaskEntity task, CallContext callContext) {
+ var beforeTaskAttemptedEvent =
+
testPolarisEventListener.getLatest(BeforeTaskAttemptedEvent.class);
+ Assertions.assertEquals(taskEntity.getId(),
beforeTaskAttemptedEvent.taskEntityId());
+ Assertions.assertEquals(callContext,
beforeTaskAttemptedEvent.callContext());
+ Assertions.assertEquals(attempt,
beforeTaskAttemptedEvent.attempt());
+ return true;
+ }
+ });
+
+ executor.handleTask(taskEntity.getId(), callContext, attempt);
+
+ var afterAttemptTaskEvent =
testPolarisEventListener.getLatest(AfterTaskAttemptedEvent.class);
+ Assertions.assertEquals(taskEntity.getId(),
afterAttemptTaskEvent.taskEntityId());
+ Assertions.assertEquals(callContext, afterAttemptTaskEvent.callContext());
+ Assertions.assertEquals(attempt, afterAttemptTaskEvent.attempt());
+ Assertions.assertTrue(afterAttemptTaskEvent.success());
+ }
+}
diff --git
a/service/common/src/testFixtures/java/org/apache/polaris/service/TestServices.java
b/service/common/src/testFixtures/java/org/apache/polaris/service/TestServices.java
index 88406b895..cb48ad0a1 100644
---
a/service/common/src/testFixtures/java/org/apache/polaris/service/TestServices.java
+++
b/service/common/src/testFixtures/java/org/apache/polaris/service/TestServices.java
@@ -56,6 +56,8 @@ import
org.apache.polaris.service.config.DefaultConfigurationStore;
import org.apache.polaris.service.config.RealmEntityManagerFactory;
import org.apache.polaris.service.context.CallContextCatalogFactory;
import org.apache.polaris.service.context.PolarisCallContextCatalogFactory;
+import org.apache.polaris.service.events.PolarisEventListener;
+import org.apache.polaris.service.events.TestPolarisEventListener;
import
org.apache.polaris.service.persistence.InMemoryPolarisMetaStoreManagerFactory;
import org.apache.polaris.service.secrets.UnsafeInMemorySecretsManagerFactory;
import
org.apache.polaris.service.storage.PolarisStorageIntegrationProviderImpl;
@@ -75,7 +77,8 @@ public record TestServices(
RealmContext realmContext,
SecurityContext securityContext,
FileIOFactory fileIOFactory,
- TaskExecutor taskExecutor) {
+ TaskExecutor taskExecutor,
+ PolarisEventListener polarisEventListener) {
private static final RealmContext TEST_REALM = () -> "test-realm";
private static final String GCP_ACCESS_TOKEN = "abc";
@@ -175,13 +178,15 @@ public record TestServices(
TaskExecutor taskExecutor = Mockito.mock(TaskExecutor.class);
+ PolarisEventListener polarisEventListener = new
TestPolarisEventListener();
CallContextCatalogFactory callContextFactory =
new PolarisCallContextCatalogFactory(
realmEntityManagerFactory,
metaStoreManagerFactory,
userSecretsManagerFactory,
taskExecutor,
- fileIOFactory);
+ fileIOFactory,
+ polarisEventListener);
IcebergCatalogAdapter service =
new IcebergCatalogAdapter(
@@ -252,7 +257,8 @@ public record TestServices(
realmContext,
securityContext,
fileIOFactory,
- taskExecutor);
+ taskExecutor,
+ polarisEventListener);
}
}
}