This is an automated email from the ASF dual-hosted git repository.
singhpk234 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/polaris.git
The following commit(s) were added to refs/heads/main by this push:
new d03c7174c Add Events for Iceberg REST APIs (#2480)
d03c7174c is described below
commit d03c7174ccf6771a0673e02b5e6d691d6cca9419
Author: Adnan Hemani <[email protected]>
AuthorDate: Wed Sep 17 14:44:19 2025 -0700
Add Events for Iceberg REST APIs (#2480)
---
.../polaris/service/admin/PolarisServiceImpl.java | 2 -
.../service/catalog/iceberg/IcebergCatalog.java | 45 ++-
.../catalog/iceberg/IcebergCatalogHandler.java | 8 +-
.../IcebergRestCatalogEventServiceDelegator.java | 336 ++++++++++++++++++---
...bergRestConfigurationEventServiceDelegator.java | 11 +-
...emptedEvent.java => AfterAttemptTaskEvent.java} | 2 +-
.../service/events/AfterCatalogCreatedEvent.java | 23 --
.../service/events/AfterTableCommitedEvent.java | 35 ---
.../service/events/AfterTableCreatedEvent.java | 28 --
.../service/events/AfterTableRefreshedEvent.java | 30 --
.../service/events/AfterViewCommitedEvent.java | 35 ---
.../service/events/AfterViewRefreshedEvent.java | 30 --
...mptedEvent.java => BeforeAttemptTaskEvent.java} | 2 +-
...Event.java => BeforeLimitRequestRateEvent.java} | 2 +-
.../service/events/BeforeTableCommitedEvent.java | 35 ---
.../service/events/BeforeTableRefreshedEvent.java | 31 --
.../service/events/BeforeViewCommitedEvent.java | 36 ---
.../service/events/BeforeViewRefreshedEvent.java | 31 --
.../service/events/IcebergRestCatalogEvents.java | 251 +++++++++++++++
.../PropertyMapEventListener.java | 4 +-
.../events/listeners/PolarisEventListener.java | 229 +++++++++++---
.../listeners/PolarisPersistenceEventListener.java | 68 ++---
.../events/listeners/TestPolarisEventListener.java | 37 +--
.../service/ratelimiter/RateLimiterFilter.java | 6 +-
.../polaris/service/task/TaskExecutorImpl.java | 10 +-
.../iceberg/AbstractIcebergCatalogTest.java | 23 +-
.../iceberg/AbstractIcebergCatalogViewTest.java | 28 +-
.../cloudwatch/AwsCloudWatchEventListenerTest.java | 17 +-
.../service/ratelimiter/RateLimiterFilterTest.java | 6 +-
.../polaris/service/task/TaskExecutorImplTest.java | 9 +-
30 files changed, 861 insertions(+), 549 deletions(-)
diff --git a/runtime/service/src/main/java/org/apache/polaris/service/admin/PolarisServiceImpl.java b/runtime/service/src/main/java/org/apache/polaris/service/admin/PolarisServiceImpl.java
index e8a1c5d99..48ad7027e 100644
--- a/runtime/service/src/main/java/org/apache/polaris/service/admin/PolarisServiceImpl.java
+++ b/runtime/service/src/main/java/org/apache/polaris/service/admin/PolarisServiceImpl.java
@@ -76,7 +76,6 @@ import
org.apache.polaris.service.admin.api.PolarisCatalogsApiService;
import org.apache.polaris.service.admin.api.PolarisPrincipalRolesApiService;
import org.apache.polaris.service.admin.api.PolarisPrincipalsApiService;
import org.apache.polaris.service.config.ReservedProperties;
-import org.apache.polaris.service.events.AfterCatalogCreatedEvent;
import org.apache.polaris.service.events.listeners.PolarisEventListener;
import org.apache.polaris.service.types.PolicyIdentifier;
import org.slf4j.Logger;
@@ -131,7 +130,6 @@ public class PolarisServiceImpl
validateExternalCatalog(catalog);
Catalog newCatalog =
CatalogEntity.of(adminService.createCatalog(request)).asCatalog();
LOGGER.info("Created new catalog {}", newCatalog);
- polarisEventListener.onAfterCatalogCreated(new
AfterCatalogCreatedEvent(newCatalog.getName()));
return Response.status(Response.Status.CREATED).build();
}
diff --git a/runtime/service/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalog.java b/runtime/service/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalog.java
index d7dc69b26..101fe3330 100644
--- a/runtime/service/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalog.java
+++ b/runtime/service/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalog.java
@@ -132,14 +132,7 @@ import
org.apache.polaris.service.catalog.common.LocationUtils;
import org.apache.polaris.service.catalog.io.FileIOFactory;
import org.apache.polaris.service.catalog.io.FileIOUtil;
import
org.apache.polaris.service.catalog.validation.IcebergPropertiesValidation;
-import org.apache.polaris.service.events.AfterTableCommitedEvent;
-import org.apache.polaris.service.events.AfterTableRefreshedEvent;
-import org.apache.polaris.service.events.AfterViewCommitedEvent;
-import org.apache.polaris.service.events.AfterViewRefreshedEvent;
-import org.apache.polaris.service.events.BeforeTableCommitedEvent;
-import org.apache.polaris.service.events.BeforeTableRefreshedEvent;
-import org.apache.polaris.service.events.BeforeViewCommitedEvent;
-import org.apache.polaris.service.events.BeforeViewRefreshedEvent;
+import org.apache.polaris.service.events.IcebergRestCatalogEvents;
import org.apache.polaris.service.events.listeners.PolarisEventListener;
import org.apache.polaris.service.task.TaskExecutor;
import org.apache.polaris.service.types.NotificationRequest;
@@ -1446,8 +1439,8 @@ public class IcebergCatalog extends
BaseMetastoreViewCatalog
if (latestLocation == null) {
disableRefresh();
} else {
- polarisEventListener.onBeforeTableRefreshed(
- new BeforeTableRefreshedEvent(catalogName, tableIdentifier));
+ polarisEventListener.onBeforeRefreshTable(
+ new IcebergRestCatalogEvents.BeforeRefreshTableEvent(catalogName,
tableIdentifier));
refreshFromMetadataLocation(
latestLocation,
SHOULD_RETRY_REFRESH_PREDICATE,
@@ -1467,14 +1460,15 @@ public class IcebergCatalog extends
BaseMetastoreViewCatalog
Set.of(PolarisStorageActions.READ,
PolarisStorageActions.LIST));
return TableMetadataParser.read(fileIO, metadataLocation);
});
- polarisEventListener.onAfterTableRefreshed(
- new AfterTableRefreshedEvent(catalogName, tableIdentifier));
+ polarisEventListener.onAfterRefreshTable(
+ new IcebergRestCatalogEvents.AfterRefreshTableEvent(catalogName,
tableIdentifier));
}
}
public void doCommit(TableMetadata base, TableMetadata metadata) {
- polarisEventListener.onBeforeTableCommited(
- new BeforeTableCommitedEvent(tableIdentifier, base, metadata));
+ polarisEventListener.onBeforeCommitTable(
+ new IcebergRestCatalogEvents.BeforeCommitTableEvent(
+ catalogName, tableIdentifier, base, metadata));
LOGGER.debug(
"doCommit for table {} with metadataBefore {}, metadataAfter {}",
@@ -1618,8 +1612,9 @@ public class IcebergCatalog extends
BaseMetastoreViewCatalog
updateTableLike(tableIdentifier, entity);
}
- polarisEventListener.onAfterTableCommited(
- new AfterTableCommitedEvent(catalogName, tableIdentifier, base,
metadata));
+ polarisEventListener.onAfterCommitTable(
+ new IcebergRestCatalogEvents.AfterCommitTableEvent(
+ catalogName, tableIdentifier, base, metadata));
}
@Override
@@ -1810,8 +1805,8 @@ public class IcebergCatalog extends
BaseMetastoreViewCatalog
if (latestLocation == null) {
disableRefresh();
} else {
- polarisEventListener.onBeforeViewRefreshed(
- new BeforeViewRefreshedEvent(catalogName, identifier));
+ polarisEventListener.onBeforeRefreshView(
+ new IcebergRestCatalogEvents.BeforeRefreshViewEvent(catalogName,
identifier));
refreshFromMetadataLocation(
latestLocation,
SHOULD_RETRY_REFRESH_PREDICATE,
@@ -1833,14 +1828,15 @@ public class IcebergCatalog extends
BaseMetastoreViewCatalog
return
ViewMetadataParser.read(fileIO.newInputFile(metadataLocation));
});
- polarisEventListener.onAfterViewRefreshed(
- new AfterViewRefreshedEvent(catalogName, identifier));
+ polarisEventListener.onAfterRefreshView(
+ new IcebergRestCatalogEvents.AfterRefreshViewEvent(catalogName,
identifier));
}
}
public void doCommit(ViewMetadata base, ViewMetadata metadata) {
- polarisEventListener.onBeforeViewCommited(
- new BeforeViewCommitedEvent(catalogName, identifier, base,
metadata));
+ polarisEventListener.onBeforeCommitView(
+ new IcebergRestCatalogEvents.BeforeCommitViewEvent(
+ catalogName, identifier, base, metadata));
// TODO: Maybe avoid writing metadata if there's definitely a
transaction conflict
LOGGER.debug(
@@ -1940,8 +1936,9 @@ public class IcebergCatalog extends
BaseMetastoreViewCatalog
updateTableLike(identifier, entity);
}
- polarisEventListener.onAfterViewCommited(
- new AfterViewCommitedEvent(catalogName, identifier, base, metadata));
+ polarisEventListener.onAfterCommitView(
+ new IcebergRestCatalogEvents.AfterCommitViewEvent(
+ catalogName, identifier, base, metadata));
}
protected String writeNewMetadataIfRequired(ViewMetadata metadata) {
diff --git a/runtime/service/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalogHandler.java b/runtime/service/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalogHandler.java
index 863e1aef5..b9c99ac3a 100644
--- a/runtime/service/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalogHandler.java
+++ b/runtime/service/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalogHandler.java
@@ -102,7 +102,6 @@ import
org.apache.polaris.service.catalog.SupportsNotifications;
import org.apache.polaris.service.catalog.common.CatalogHandler;
import org.apache.polaris.service.config.ReservedProperties;
import org.apache.polaris.service.context.catalog.CallContextCatalogFactory;
-import org.apache.polaris.service.events.AfterTableCreatedEvent;
import org.apache.polaris.service.events.listeners.PolarisEventListener;
import org.apache.polaris.service.http.IcebergHttpUtil;
import org.apache.polaris.service.http.IfNoneMatch;
@@ -392,11 +391,8 @@ public class IcebergCatalogHandler extends CatalogHandler
implements AutoCloseab
.withWriteOrder(request.writeOrder())
.setProperties(reservedProperties.removeReservedProperties(request.properties()))
.build();
- LoadTableResponse resp =
- catalogHandlerUtils.createTable(baseCatalog, namespace,
requestWithoutReservedProperties);
- polarisEventListener.onAfterTableCreated(
- new AfterTableCreatedEvent(catalogName, identifier,
resp.tableMetadata()));
- return resp;
+ return catalogHandlerUtils.createTable(
+ baseCatalog, namespace, requestWithoutReservedProperties);
}
/**
diff --git a/runtime/service/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergRestCatalogEventServiceDelegator.java b/runtime/service/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergRestCatalogEventServiceDelegator.java
index 43d1c1f00..edf15d484 100644
--- a/runtime/service/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergRestCatalogEventServiceDelegator.java
+++ b/runtime/service/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergRestCatalogEventServiceDelegator.java
@@ -25,6 +25,7 @@ import jakarta.decorator.Delegate;
import jakarta.inject.Inject;
import jakarta.ws.rs.core.Response;
import jakarta.ws.rs.core.SecurityContext;
+import org.apache.iceberg.catalog.Namespace;
import org.apache.iceberg.rest.requests.CommitTransactionRequest;
import org.apache.iceberg.rest.requests.CreateNamespaceRequest;
import org.apache.iceberg.rest.requests.CreateTableRequest;
@@ -33,17 +34,75 @@ import
org.apache.iceberg.rest.requests.RegisterTableRequest;
import org.apache.iceberg.rest.requests.RenameTableRequest;
import org.apache.iceberg.rest.requests.ReportMetricsRequest;
import org.apache.iceberg.rest.requests.UpdateNamespacePropertiesRequest;
+import org.apache.iceberg.rest.responses.CreateNamespaceResponse;
+import org.apache.iceberg.rest.responses.GetNamespaceResponse;
+import org.apache.iceberg.rest.responses.LoadTableResponse;
+import org.apache.iceberg.rest.responses.LoadViewResponse;
+import org.apache.iceberg.rest.responses.UpdateNamespacePropertiesResponse;
import org.apache.polaris.core.context.RealmContext;
+import org.apache.polaris.service.catalog.CatalogPrefixParser;
import org.apache.polaris.service.catalog.api.IcebergRestCatalogApiService;
+import org.apache.polaris.service.catalog.common.CatalogAdapter;
+import org.apache.polaris.service.events.IcebergRestCatalogEvents;
+import
org.apache.polaris.service.events.IcebergRestCatalogEvents.AfterCheckExistsNamespaceEvent;
+import
org.apache.polaris.service.events.IcebergRestCatalogEvents.AfterCheckExistsTableEvent;
+import
org.apache.polaris.service.events.IcebergRestCatalogEvents.AfterCheckExistsViewEvent;
+import
org.apache.polaris.service.events.IcebergRestCatalogEvents.AfterCreateNamespaceEvent;
+import
org.apache.polaris.service.events.IcebergRestCatalogEvents.AfterCreateTableEvent;
+import
org.apache.polaris.service.events.IcebergRestCatalogEvents.AfterCreateViewEvent;
+import
org.apache.polaris.service.events.IcebergRestCatalogEvents.AfterDropNamespaceEvent;
+import
org.apache.polaris.service.events.IcebergRestCatalogEvents.AfterDropTableEvent;
+import
org.apache.polaris.service.events.IcebergRestCatalogEvents.AfterDropViewEvent;
+import
org.apache.polaris.service.events.IcebergRestCatalogEvents.AfterListNamespacesEvent;
+import
org.apache.polaris.service.events.IcebergRestCatalogEvents.AfterListTablesEvent;
+import
org.apache.polaris.service.events.IcebergRestCatalogEvents.AfterListViewsEvent;
+import
org.apache.polaris.service.events.IcebergRestCatalogEvents.AfterLoadCredentialsEvent;
+import
org.apache.polaris.service.events.IcebergRestCatalogEvents.AfterLoadNamespaceMetadataEvent;
+import
org.apache.polaris.service.events.IcebergRestCatalogEvents.AfterLoadTableEvent;
+import
org.apache.polaris.service.events.IcebergRestCatalogEvents.AfterLoadViewEvent;
+import
org.apache.polaris.service.events.IcebergRestCatalogEvents.AfterRegisterTableEvent;
+import
org.apache.polaris.service.events.IcebergRestCatalogEvents.AfterRenameTableEvent;
+import
org.apache.polaris.service.events.IcebergRestCatalogEvents.AfterRenameViewEvent;
+import
org.apache.polaris.service.events.IcebergRestCatalogEvents.AfterReplaceViewEvent;
+import
org.apache.polaris.service.events.IcebergRestCatalogEvents.AfterSendNotificationEvent;
+import
org.apache.polaris.service.events.IcebergRestCatalogEvents.AfterUpdateNamespacePropertiesEvent;
+import
org.apache.polaris.service.events.IcebergRestCatalogEvents.AfterUpdateTableEvent;
+import
org.apache.polaris.service.events.IcebergRestCatalogEvents.BeforeCheckExistsNamespaceEvent;
+import
org.apache.polaris.service.events.IcebergRestCatalogEvents.BeforeCheckExistsTableEvent;
+import
org.apache.polaris.service.events.IcebergRestCatalogEvents.BeforeCheckExistsViewEvent;
+import
org.apache.polaris.service.events.IcebergRestCatalogEvents.BeforeCreateNamespaceEvent;
+import
org.apache.polaris.service.events.IcebergRestCatalogEvents.BeforeCreateTableEvent;
+import
org.apache.polaris.service.events.IcebergRestCatalogEvents.BeforeCreateViewEvent;
+import
org.apache.polaris.service.events.IcebergRestCatalogEvents.BeforeDropNamespaceEvent;
+import
org.apache.polaris.service.events.IcebergRestCatalogEvents.BeforeDropTableEvent;
+import
org.apache.polaris.service.events.IcebergRestCatalogEvents.BeforeDropViewEvent;
+import
org.apache.polaris.service.events.IcebergRestCatalogEvents.BeforeListNamespacesEvent;
+import
org.apache.polaris.service.events.IcebergRestCatalogEvents.BeforeListTablesEvent;
+import
org.apache.polaris.service.events.IcebergRestCatalogEvents.BeforeListViewsEvent;
+import
org.apache.polaris.service.events.IcebergRestCatalogEvents.BeforeLoadCredentialsEvent;
+import
org.apache.polaris.service.events.IcebergRestCatalogEvents.BeforeLoadNamespaceMetadataEvent;
+import
org.apache.polaris.service.events.IcebergRestCatalogEvents.BeforeLoadTableEvent;
+import
org.apache.polaris.service.events.IcebergRestCatalogEvents.BeforeLoadViewEvent;
+import
org.apache.polaris.service.events.IcebergRestCatalogEvents.BeforeRegisterTableEvent;
+import
org.apache.polaris.service.events.IcebergRestCatalogEvents.BeforeRenameTableEvent;
+import
org.apache.polaris.service.events.IcebergRestCatalogEvents.BeforeRenameViewEvent;
+import
org.apache.polaris.service.events.IcebergRestCatalogEvents.BeforeReplaceViewEvent;
+import
org.apache.polaris.service.events.IcebergRestCatalogEvents.BeforeSendNotificationEvent;
+import
org.apache.polaris.service.events.IcebergRestCatalogEvents.BeforeUpdateNamespacePropertiesEvent;
+import
org.apache.polaris.service.events.IcebergRestCatalogEvents.BeforeUpdateTableEvent;
+import org.apache.polaris.service.events.listeners.PolarisEventListener;
import org.apache.polaris.service.types.CommitTableRequest;
import org.apache.polaris.service.types.CommitViewRequest;
import org.apache.polaris.service.types.NotificationRequest;
@Decorator
@Priority(1000)
-public class IcebergRestCatalogEventServiceDelegator implements
IcebergRestCatalogApiService {
+public class IcebergRestCatalogEventServiceDelegator
+ implements IcebergRestCatalogApiService, CatalogAdapter {
@Inject @Delegate IcebergCatalogAdapter delegate;
+ @Inject PolarisEventListener polarisEventListener;
+ @Inject CatalogPrefixParser prefixParser;
@Override
public Response createNamespace(
@@ -51,7 +110,18 @@ public class IcebergRestCatalogEventServiceDelegator
implements IcebergRestCatal
CreateNamespaceRequest createNamespaceRequest,
RealmContext realmContext,
SecurityContext securityContext) {
- return delegate.createNamespace(prefix, createNamespaceRequest,
realmContext, securityContext);
+ String catalogName = prefixParser.prefixToCatalogName(realmContext,
prefix);
+ polarisEventListener.onBeforeCreateNamespace(
+ new BeforeCreateNamespaceEvent(catalogName, createNamespaceRequest));
+ Response resp =
+ delegate.createNamespace(prefix, createNamespaceRequest, realmContext,
securityContext);
+ CreateNamespaceResponse createNamespaceResponse =
(CreateNamespaceResponse) resp.getEntity();
+ polarisEventListener.onAfterCreateNamespace(
+ new AfterCreateNamespaceEvent(
+ catalogName,
+ createNamespaceResponse.namespace(),
+ createNamespaceResponse.properties()));
+ return resp;
}
@Override
@@ -62,26 +132,51 @@ public class IcebergRestCatalogEventServiceDelegator
implements IcebergRestCatal
String parent,
RealmContext realmContext,
SecurityContext securityContext) {
- return delegate.listNamespaces(
- prefix, pageToken, pageSize, parent, realmContext, securityContext);
+ String catalogName = prefixParser.prefixToCatalogName(realmContext,
prefix);
+ polarisEventListener.onBeforeListNamespaces(new
BeforeListNamespacesEvent(catalogName, parent));
+ Response resp =
+ delegate.listNamespaces(prefix, pageToken, pageSize, parent,
realmContext, securityContext);
+ polarisEventListener.onAfterListNamespaces(new
AfterListNamespacesEvent(catalogName, parent));
+ return resp;
}
@Override
public Response loadNamespaceMetadata(
String prefix, String namespace, RealmContext realmContext,
SecurityContext securityContext) {
- return delegate.loadNamespaceMetadata(prefix, namespace, realmContext,
securityContext);
+ String catalogName = prefixParser.prefixToCatalogName(realmContext,
prefix);
+ polarisEventListener.onBeforeLoadNamespaceMetadata(
+ new BeforeLoadNamespaceMetadataEvent(catalogName,
decodeNamespace(namespace)));
+ Response resp =
+ delegate.loadNamespaceMetadata(prefix, namespace, realmContext,
securityContext);
+ GetNamespaceResponse getNamespaceResponse = (GetNamespaceResponse)
resp.getEntity();
+ polarisEventListener.onAfterLoadNamespaceMetadata(
+ new AfterLoadNamespaceMetadataEvent(
+ catalogName, getNamespaceResponse.namespace(),
getNamespaceResponse.properties()));
+ return resp;
}
@Override
public Response namespaceExists(
String prefix, String namespace, RealmContext realmContext,
SecurityContext securityContext) {
- return delegate.namespaceExists(prefix, namespace, realmContext,
securityContext);
+ String catalogName = prefixParser.prefixToCatalogName(realmContext,
prefix);
+ Namespace namespaceObj = decodeNamespace(namespace);
+ polarisEventListener.onBeforeCheckExistsNamespace(
+ new BeforeCheckExistsNamespaceEvent(catalogName, namespaceObj));
+ Response resp = delegate.namespaceExists(prefix, namespace, realmContext,
securityContext);
+ polarisEventListener.onAfterCheckExistsNamespace(
+ new AfterCheckExistsNamespaceEvent(catalogName, namespaceObj));
+ return resp;
}
@Override
public Response dropNamespace(
String prefix, String namespace, RealmContext realmContext,
SecurityContext securityContext) {
- return delegate.dropNamespace(prefix, namespace, realmContext,
securityContext);
+ String catalogName = prefixParser.prefixToCatalogName(realmContext,
prefix);
+ polarisEventListener.onBeforeDropNamespace(
+ new BeforeDropNamespaceEvent(catalogName, decodeNamespace(namespace)));
+ Response resp = delegate.dropNamespace(prefix, namespace, realmContext,
securityContext);
+ polarisEventListener.onAfterDropNamespace(new
AfterDropNamespaceEvent(catalogName, namespace));
+ return resp;
}
@Override
@@ -91,8 +186,18 @@ public class IcebergRestCatalogEventServiceDelegator
implements IcebergRestCatal
UpdateNamespacePropertiesRequest updateNamespacePropertiesRequest,
RealmContext realmContext,
SecurityContext securityContext) {
- return delegate.updateProperties(
- prefix, namespace, updateNamespacePropertiesRequest, realmContext,
securityContext);
+ String catalogName = prefixParser.prefixToCatalogName(realmContext,
prefix);
+ Namespace namespaceObj = decodeNamespace(namespace);
+ polarisEventListener.onBeforeUpdateNamespaceProperties(
+ new BeforeUpdateNamespacePropertiesEvent(
+ catalogName, namespaceObj, updateNamespacePropertiesRequest));
+ Response resp =
+ delegate.updateProperties(
+ prefix, namespace, updateNamespacePropertiesRequest, realmContext,
securityContext);
+ polarisEventListener.onAfterUpdateNamespaceProperties(
+ new AfterUpdateNamespacePropertiesEvent(
+ catalogName, namespaceObj, (UpdateNamespacePropertiesResponse)
resp.getEntity()));
+ return resp;
}
@Override
@@ -103,8 +208,28 @@ public class IcebergRestCatalogEventServiceDelegator
implements IcebergRestCatal
String accessDelegationMode,
RealmContext realmContext,
SecurityContext securityContext) {
- return delegate.createTable(
- prefix, namespace, createTableRequest, accessDelegationMode,
realmContext, securityContext);
+ String catalogName = prefixParser.prefixToCatalogName(realmContext,
prefix);
+ Namespace namespaceObj = decodeNamespace(namespace);
+ polarisEventListener.onBeforeCreateTable(
+ new BeforeCreateTableEvent(
+ catalogName, namespaceObj, createTableRequest,
accessDelegationMode));
+ Response resp =
+ delegate.createTable(
+ prefix,
+ namespace,
+ createTableRequest,
+ accessDelegationMode,
+ realmContext,
+ securityContext);
+ if (!createTableRequest.stageCreate()) {
+ polarisEventListener.onAfterCreateTable(
+ new AfterCreateTableEvent(
+ catalogName,
+ namespaceObj,
+ createTableRequest.name(),
+ (LoadTableResponse) resp.getEntity()));
+ }
+ return resp;
}
@Override
@@ -115,8 +240,13 @@ public class IcebergRestCatalogEventServiceDelegator
implements IcebergRestCatal
Integer pageSize,
RealmContext realmContext,
SecurityContext securityContext) {
- return delegate.listTables(
- prefix, namespace, pageToken, pageSize, realmContext, securityContext);
+ String catalogName = prefixParser.prefixToCatalogName(realmContext,
prefix);
+ Namespace namespaceObj = decodeNamespace(namespace);
+ polarisEventListener.onBeforeListTables(new
BeforeListTablesEvent(catalogName, namespaceObj));
+ Response resp =
+ delegate.listTables(prefix, namespace, pageToken, pageSize,
realmContext, securityContext);
+ polarisEventListener.onAfterListTables(new
AfterListTablesEvent(catalogName, namespaceObj));
+ return resp;
}
@Override
@@ -129,15 +259,24 @@ public class IcebergRestCatalogEventServiceDelegator
implements IcebergRestCatal
String snapshots,
RealmContext realmContext,
SecurityContext securityContext) {
- return delegate.loadTable(
- prefix,
- namespace,
- table,
- accessDelegationMode,
- ifNoneMatchString,
- snapshots,
- realmContext,
- securityContext);
+ String catalogName = prefixParser.prefixToCatalogName(realmContext,
prefix);
+ Namespace namespaceObj = decodeNamespace(namespace);
+ polarisEventListener.onBeforeLoadTable(
+ new BeforeLoadTableEvent(
+ catalogName, namespaceObj, table, accessDelegationMode,
ifNoneMatchString, snapshots));
+ Response resp =
+ delegate.loadTable(
+ prefix,
+ namespace,
+ table,
+ accessDelegationMode,
+ ifNoneMatchString,
+ snapshots,
+ realmContext,
+ securityContext);
+ polarisEventListener.onAfterLoadTable(
+ new AfterLoadTableEvent(catalogName, namespaceObj, (LoadTableResponse)
resp.getEntity()));
+ return resp;
}
@Override
@@ -147,7 +286,14 @@ public class IcebergRestCatalogEventServiceDelegator
implements IcebergRestCatal
String table,
RealmContext realmContext,
SecurityContext securityContext) {
- return delegate.tableExists(prefix, namespace, table, realmContext,
securityContext);
+ String catalogName = prefixParser.prefixToCatalogName(realmContext,
prefix);
+ Namespace namespaceObj = decodeNamespace(namespace);
+ polarisEventListener.onBeforeCheckExistsTable(
+ new BeforeCheckExistsTableEvent(catalogName, namespaceObj, table));
+ Response resp = delegate.tableExists(prefix, namespace, table,
realmContext, securityContext);
+ polarisEventListener.onAfterCheckExistsTable(
+ new AfterCheckExistsTableEvent(catalogName, namespaceObj, table));
+ return resp;
}
@Override
@@ -158,8 +304,15 @@ public class IcebergRestCatalogEventServiceDelegator
implements IcebergRestCatal
Boolean purgeRequested,
RealmContext realmContext,
SecurityContext securityContext) {
- return delegate.dropTable(
- prefix, namespace, table, purgeRequested, realmContext,
securityContext);
+ String catalogName = prefixParser.prefixToCatalogName(realmContext,
prefix);
+ Namespace namespaceObj = decodeNamespace(namespace);
+ polarisEventListener.onBeforeDropTable(
+ new BeforeDropTableEvent(catalogName, namespaceObj, table,
purgeRequested));
+ Response resp =
+ delegate.dropTable(prefix, namespace, table, purgeRequested,
realmContext, securityContext);
+ polarisEventListener.onAfterDropTable(
+ new AfterDropTableEvent(catalogName, namespaceObj, table,
purgeRequested));
+ return resp;
}
@Override
@@ -169,8 +322,17 @@ public class IcebergRestCatalogEventServiceDelegator
implements IcebergRestCatal
RegisterTableRequest registerTableRequest,
RealmContext realmContext,
SecurityContext securityContext) {
- return delegate.registerTable(
- prefix, namespace, registerTableRequest, realmContext,
securityContext);
+ String catalogName = prefixParser.prefixToCatalogName(realmContext,
prefix);
+ Namespace namespaceObj = decodeNamespace(namespace);
+ polarisEventListener.onBeforeRegisterTable(
+ new BeforeRegisterTableEvent(catalogName, namespaceObj,
registerTableRequest));
+ Response resp =
+ delegate.registerTable(
+ prefix, namespace, registerTableRequest, realmContext,
securityContext);
+ polarisEventListener.onAfterRegisterTable(
+ new AfterRegisterTableEvent(
+ catalogName, namespaceObj, (LoadTableResponse) resp.getEntity()));
+ return resp;
}
@Override
@@ -179,7 +341,13 @@ public class IcebergRestCatalogEventServiceDelegator
implements IcebergRestCatal
RenameTableRequest renameTableRequest,
RealmContext realmContext,
SecurityContext securityContext) {
- return delegate.renameTable(prefix, renameTableRequest, realmContext,
securityContext);
+ String catalogName = prefixParser.prefixToCatalogName(realmContext,
prefix);
+ polarisEventListener.onBeforeRenameTable(
+ new BeforeRenameTableEvent(catalogName, renameTableRequest));
+ Response resp = delegate.renameTable(prefix, renameTableRequest,
realmContext, securityContext);
+ polarisEventListener.onAfterRenameTable(
+ new AfterRenameTableEvent(catalogName, renameTableRequest));
+ return resp;
}
@Override
@@ -190,8 +358,17 @@ public class IcebergRestCatalogEventServiceDelegator
implements IcebergRestCatal
CommitTableRequest commitTableRequest,
RealmContext realmContext,
SecurityContext securityContext) {
- return delegate.updateTable(
- prefix, namespace, table, commitTableRequest, realmContext,
securityContext);
+ String catalogName = prefixParser.prefixToCatalogName(realmContext,
prefix);
+ Namespace namespaceObj = decodeNamespace(namespace);
+ polarisEventListener.onBeforeUpdateTable(
+ new BeforeUpdateTableEvent(catalogName, namespaceObj, table,
commitTableRequest));
+ Response resp =
+ delegate.updateTable(
+ prefix, namespace, table, commitTableRequest, realmContext,
securityContext);
+ polarisEventListener.onAfterUpdateTable(
+ new AfterUpdateTableEvent(
+ catalogName, namespaceObj, table, (LoadTableResponse)
resp.getEntity()));
+ return resp;
}
@Override
@@ -201,7 +378,15 @@ public class IcebergRestCatalogEventServiceDelegator
implements IcebergRestCatal
CreateViewRequest createViewRequest,
RealmContext realmContext,
SecurityContext securityContext) {
- return delegate.createView(prefix, namespace, createViewRequest,
realmContext, securityContext);
+ String catalogName = prefixParser.prefixToCatalogName(realmContext,
prefix);
+ Namespace namespaceObj = decodeNamespace(namespace);
+ polarisEventListener.onBeforeCreateView(
+ new BeforeCreateViewEvent(catalogName, namespaceObj,
createViewRequest));
+ Response resp =
+ delegate.createView(prefix, namespace, createViewRequest,
realmContext, securityContext);
+ polarisEventListener.onAfterCreateView(
+ new AfterCreateViewEvent(catalogName, namespaceObj, (LoadViewResponse)
resp.getEntity()));
+ return resp;
}
@Override
@@ -212,8 +397,13 @@ public class IcebergRestCatalogEventServiceDelegator
implements IcebergRestCatal
Integer pageSize,
RealmContext realmContext,
SecurityContext securityContext) {
- return delegate.listViews(
- prefix, namespace, pageToken, pageSize, realmContext, securityContext);
+ String catalogName = prefixParser.prefixToCatalogName(realmContext,
prefix);
+ Namespace namespaceObj = decodeNamespace(namespace);
+ polarisEventListener.onBeforeListViews(new
BeforeListViewsEvent(catalogName, namespaceObj));
+ Response resp =
+ delegate.listViews(prefix, namespace, pageToken, pageSize,
realmContext, securityContext);
+ polarisEventListener.onAfterListViews(new AfterListViewsEvent(catalogName,
namespaceObj));
+ return resp;
}
@Override
@@ -223,7 +413,15 @@ public class IcebergRestCatalogEventServiceDelegator
implements IcebergRestCatal
String table,
RealmContext realmContext,
SecurityContext securityContext) {
- return delegate.loadCredentials(prefix, namespace, table, realmContext,
securityContext);
+ String catalogName = prefixParser.prefixToCatalogName(realmContext,
prefix);
+ Namespace namespaceObj = decodeNamespace(namespace);
+ polarisEventListener.onBeforeLoadCredentials(
+ new BeforeLoadCredentialsEvent(catalogName, namespaceObj, table));
+ Response resp =
+ delegate.loadCredentials(prefix, namespace, table, realmContext,
securityContext);
+ polarisEventListener.onAfterLoadCredentials(
+ new AfterLoadCredentialsEvent(catalogName, namespaceObj, table));
+ return resp;
}
@Override
@@ -233,7 +431,13 @@ public class IcebergRestCatalogEventServiceDelegator
implements IcebergRestCatal
String view,
RealmContext realmContext,
SecurityContext securityContext) {
- return delegate.loadView(prefix, namespace, view, realmContext,
securityContext);
+ String catalogName = prefixParser.prefixToCatalogName(realmContext,
prefix);
+ Namespace namespaceObj = decodeNamespace(namespace);
+ polarisEventListener.onBeforeLoadView(new BeforeLoadViewEvent(catalogName,
namespaceObj, view));
+ Response resp = delegate.loadView(prefix, namespace, view, realmContext,
securityContext);
+ polarisEventListener.onAfterLoadView(
+ new AfterLoadViewEvent(catalogName, namespaceObj, (LoadViewResponse)
resp.getEntity()));
+ return resp;
}
@Override
@@ -243,7 +447,14 @@ public class IcebergRestCatalogEventServiceDelegator
implements IcebergRestCatal
String view,
RealmContext realmContext,
SecurityContext securityContext) {
- return delegate.viewExists(prefix, namespace, view, realmContext,
securityContext);
+ String catalogName = prefixParser.prefixToCatalogName(realmContext,
prefix);
+ Namespace namespaceObj = decodeNamespace(namespace);
+ polarisEventListener.onBeforeCheckExistsView(
+ new BeforeCheckExistsViewEvent(catalogName, namespaceObj, view));
+ Response resp = delegate.viewExists(prefix, namespace, view, realmContext,
securityContext);
+ polarisEventListener.onAfterCheckExistsView(
+ new AfterCheckExistsViewEvent(catalogName, namespaceObj, view));
+ return resp;
}
@Override
@@ -253,7 +464,12 @@ public class IcebergRestCatalogEventServiceDelegator
implements IcebergRestCatal
String view,
RealmContext realmContext,
SecurityContext securityContext) {
- return delegate.dropView(prefix, namespace, view, realmContext,
securityContext);
+ String catalogName = prefixParser.prefixToCatalogName(realmContext,
prefix);
+ Namespace namespaceObj = decodeNamespace(namespace);
+ polarisEventListener.onBeforeDropView(new BeforeDropViewEvent(catalogName,
namespaceObj, view));
+ Response resp = delegate.dropView(prefix, namespace, view, realmContext,
securityContext);
+ polarisEventListener.onAfterDropView(new AfterDropViewEvent(catalogName,
namespaceObj, view));
+ return resp;
}
@Override
@@ -262,7 +478,13 @@ public class IcebergRestCatalogEventServiceDelegator
implements IcebergRestCatal
RenameTableRequest renameTableRequest,
RealmContext realmContext,
SecurityContext securityContext) {
- return delegate.renameView(prefix, renameTableRequest, realmContext,
securityContext);
+ String catalogName = prefixParser.prefixToCatalogName(realmContext,
prefix);
+ polarisEventListener.onBeforeRenameView(
+ new BeforeRenameViewEvent(catalogName, renameTableRequest));
+ Response resp = delegate.renameView(prefix, renameTableRequest,
realmContext, securityContext);
+ polarisEventListener.onAfterRenameView(
+ new AfterRenameViewEvent(catalogName, renameTableRequest));
+ return resp;
}
@Override
@@ -273,8 +495,17 @@ public class IcebergRestCatalogEventServiceDelegator
implements IcebergRestCatal
CommitViewRequest commitViewRequest,
RealmContext realmContext,
SecurityContext securityContext) {
- return delegate.replaceView(
- prefix, namespace, view, commitViewRequest, realmContext,
securityContext);
+ String catalogName = prefixParser.prefixToCatalogName(realmContext,
prefix);
+ Namespace namespaceObj = decodeNamespace(namespace);
+ polarisEventListener.onBeforeReplaceView(
+ new BeforeReplaceViewEvent(catalogName, namespaceObj, view,
commitViewRequest));
+ Response resp =
+ delegate.replaceView(
+ prefix, namespace, view, commitViewRequest, realmContext,
securityContext);
+ polarisEventListener.onAfterReplaceView(
+ new AfterReplaceViewEvent(
+ catalogName, namespaceObj, view, (LoadViewResponse)
resp.getEntity()));
+ return resp;
}
@Override
@@ -283,10 +514,19 @@ public class IcebergRestCatalogEventServiceDelegator
implements IcebergRestCatal
CommitTransactionRequest commitTransactionRequest,
RealmContext realmContext,
SecurityContext securityContext) {
- return delegate.commitTransaction(
- prefix, commitTransactionRequest, realmContext, securityContext);
+ String catalogName = prefixParser.prefixToCatalogName(realmContext,
prefix);
+ polarisEventListener.onBeforeCommitTransaction(
+ new IcebergRestCatalogEvents.BeforeCommitTransactionEvent(
+ catalogName, commitTransactionRequest));
+ Response resp =
+ delegate.commitTransaction(prefix, commitTransactionRequest,
realmContext, securityContext);
+ polarisEventListener.onAfterCommitTransaction(
+ new IcebergRestCatalogEvents.AfterCommitTransactionEvent(
+ catalogName, commitTransactionRequest));
+ return resp;
}
+ /** This API is currently a no-op in Polaris. */
@Override
public Response reportMetrics(
String prefix,
@@ -307,7 +547,15 @@ public class IcebergRestCatalogEventServiceDelegator
implements IcebergRestCatal
NotificationRequest notificationRequest,
RealmContext realmContext,
SecurityContext securityContext) {
- return delegate.sendNotification(
- prefix, namespace, table, notificationRequest, realmContext,
securityContext);
+ String catalogName = prefixParser.prefixToCatalogName(realmContext,
prefix);
+ Namespace namespaceObj = decodeNamespace(namespace);
+ polarisEventListener.onBeforeSendNotification(
+ new BeforeSendNotificationEvent(catalogName, namespaceObj, table,
notificationRequest));
+ Response resp =
+ delegate.sendNotification(
+ prefix, namespace, table, notificationRequest, realmContext,
securityContext);
+ polarisEventListener.onAfterSendNotification(
+ new AfterSendNotificationEvent(catalogName, namespaceObj, table));
+ return resp;
}
}
diff --git
a/runtime/service/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergRestConfigurationEventServiceDelegator.java
b/runtime/service/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergRestConfigurationEventServiceDelegator.java
index 6e018c9c2..55b563e3c 100644
---
a/runtime/service/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergRestConfigurationEventServiceDelegator.java
+++
b/runtime/service/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergRestConfigurationEventServiceDelegator.java
@@ -25,8 +25,11 @@ import jakarta.decorator.Delegate;
import jakarta.inject.Inject;
import jakarta.ws.rs.core.Response;
import jakarta.ws.rs.core.SecurityContext;
+import org.apache.iceberg.rest.responses.ConfigResponse;
import org.apache.polaris.core.context.RealmContext;
import
org.apache.polaris.service.catalog.api.IcebergRestConfigurationApiService;
+import org.apache.polaris.service.events.IcebergRestCatalogEvents;
+import org.apache.polaris.service.events.listeners.PolarisEventListener;
@Decorator
@Priority(1000)
@@ -34,10 +37,16 @@ public class IcebergRestConfigurationEventServiceDelegator
implements IcebergRestConfigurationApiService {
@Inject @Delegate IcebergCatalogAdapter delegate;
+ @Inject PolarisEventListener polarisEventListener;
@Override
public Response getConfig(
String warehouse, RealmContext realmContext, SecurityContext
securityContext) {
- return delegate.getConfig(warehouse, realmContext, securityContext);
+ polarisEventListener.onBeforeGetConfig(
+ new IcebergRestCatalogEvents.BeforeGetConfigEvent(warehouse));
+ Response resp = delegate.getConfig(warehouse, realmContext,
securityContext);
+ polarisEventListener.onAfterGetConfig( // server-built entity: getEntity(), not readEntity()
+ new
IcebergRestCatalogEvents.AfterGetConfigEvent((ConfigResponse) resp.getEntity()));
+ return resp;
}
}
diff --git
a/runtime/service/src/main/java/org/apache/polaris/service/events/AfterTaskAttemptedEvent.java
b/runtime/service/src/main/java/org/apache/polaris/service/events/AfterAttemptTaskEvent.java
similarity index 93%
rename from
runtime/service/src/main/java/org/apache/polaris/service/events/AfterTaskAttemptedEvent.java
rename to
runtime/service/src/main/java/org/apache/polaris/service/events/AfterAttemptTaskEvent.java
index d1ccfb196..0eb3ebf01 100644
---
a/runtime/service/src/main/java/org/apache/polaris/service/events/AfterTaskAttemptedEvent.java
+++
b/runtime/service/src/main/java/org/apache/polaris/service/events/AfterAttemptTaskEvent.java
@@ -26,5 +26,5 @@ package org.apache.polaris.service.events;
* initial (non-retried) attempt starts counting from 1.
* @param success Whether the attempt succeeded.
*/
-public record AfterTaskAttemptedEvent(long taskEntityId, int attempt, boolean
success)
+public record AfterAttemptTaskEvent(long taskEntityId, int attempt, boolean
success)
implements PolarisEvent {}
diff --git
a/runtime/service/src/main/java/org/apache/polaris/service/events/AfterCatalogCreatedEvent.java
b/runtime/service/src/main/java/org/apache/polaris/service/events/AfterCatalogCreatedEvent.java
deleted file mode 100644
index 0ed7cef48..000000000
---
a/runtime/service/src/main/java/org/apache/polaris/service/events/AfterCatalogCreatedEvent.java
+++ /dev/null
@@ -1,23 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.polaris.service.events;
-
-/** Emitted Polaris creates a catalog. */
-public record AfterCatalogCreatedEvent(String catalogName) implements
PolarisEvent {}
diff --git
a/runtime/service/src/main/java/org/apache/polaris/service/events/AfterTableCommitedEvent.java
b/runtime/service/src/main/java/org/apache/polaris/service/events/AfterTableCommitedEvent.java
deleted file mode 100644
index a69d30448..000000000
---
a/runtime/service/src/main/java/org/apache/polaris/service/events/AfterTableCommitedEvent.java
+++ /dev/null
@@ -1,35 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.polaris.service.events;
-
-import org.apache.iceberg.TableMetadata;
-import org.apache.iceberg.catalog.TableIdentifier;
-
-/**
- * Emitted after Polaris performs a commit to a table. This is not emitted if
there's an exception
- * while committing.
- *
- * @param catalogName The catalog name where this table exists.
- * @param identifier The identifier.
- * @param base The old metadata.
- * @param metadata The new metadata.
- */
-public record AfterTableCommitedEvent(
- String catalogName, TableIdentifier identifier, TableMetadata base,
TableMetadata metadata)
- implements PolarisEvent {}
diff --git
a/runtime/service/src/main/java/org/apache/polaris/service/events/AfterTableCreatedEvent.java
b/runtime/service/src/main/java/org/apache/polaris/service/events/AfterTableCreatedEvent.java
deleted file mode 100644
index dd8939a35..000000000
---
a/runtime/service/src/main/java/org/apache/polaris/service/events/AfterTableCreatedEvent.java
+++ /dev/null
@@ -1,28 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.polaris.service.events;
-
-import org.apache.iceberg.TableMetadata;
-import org.apache.iceberg.catalog.TableIdentifier;
-
-/** Emitted when Polaris creates a table. */
-public record AfterTableCreatedEvent(
- String catalogName, TableIdentifier identifier, TableMetadata metadata)
- implements PolarisEvent {}
diff --git
a/runtime/service/src/main/java/org/apache/polaris/service/events/AfterTableRefreshedEvent.java
b/runtime/service/src/main/java/org/apache/polaris/service/events/AfterTableRefreshedEvent.java
deleted file mode 100644
index caef8e3c5..000000000
---
a/runtime/service/src/main/java/org/apache/polaris/service/events/AfterTableRefreshedEvent.java
+++ /dev/null
@@ -1,30 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.polaris.service.events;
-
-import org.apache.iceberg.catalog.TableIdentifier;
-
-/**
- * Emitted after Polaris refreshes its known version of a table's metadata by
fetching the latest.
- *
- * @param catalogName The name of the catalog where the table is located
- * @param tableIdentifier The identifier of the table that was refreshed.
- */
-public record AfterTableRefreshedEvent(String catalogName, TableIdentifier
tableIdentifier)
- implements PolarisEvent {}
diff --git
a/runtime/service/src/main/java/org/apache/polaris/service/events/AfterViewCommitedEvent.java
b/runtime/service/src/main/java/org/apache/polaris/service/events/AfterViewCommitedEvent.java
deleted file mode 100644
index a7d01ecc2..000000000
---
a/runtime/service/src/main/java/org/apache/polaris/service/events/AfterViewCommitedEvent.java
+++ /dev/null
@@ -1,35 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.polaris.service.events;
-
-import org.apache.iceberg.catalog.TableIdentifier;
-import org.apache.iceberg.view.ViewMetadata;
-
-/**
- * Emitted after Polaris performs a commit to a view. This is not emitted if
there's an exception
- * while committing.
- *
- * @param catalogName The catalog name where the view is located
- * @param identifier The identifier.
- * @param base The old metadata.
- * @param metadata The new metadata.
- */
-public record AfterViewCommitedEvent(
- String catalogName, TableIdentifier identifier, ViewMetadata base,
ViewMetadata metadata)
- implements PolarisEvent {}
diff --git
a/runtime/service/src/main/java/org/apache/polaris/service/events/AfterViewRefreshedEvent.java
b/runtime/service/src/main/java/org/apache/polaris/service/events/AfterViewRefreshedEvent.java
deleted file mode 100644
index b44fa2a51..000000000
---
a/runtime/service/src/main/java/org/apache/polaris/service/events/AfterViewRefreshedEvent.java
+++ /dev/null
@@ -1,30 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.polaris.service.events;
-
-import org.apache.iceberg.catalog.TableIdentifier;
-
-/**
- * Emitted after Polaris refreshes its known version of a view's metadata by
fetching the latest.
- *
- * @param catalogName
- * @param viewIdentifier The identifier of the view that was refreshed.
- */
-public record AfterViewRefreshedEvent(String catalogName, TableIdentifier
viewIdentifier)
- implements PolarisEvent {}
diff --git
a/runtime/service/src/main/java/org/apache/polaris/service/events/BeforeTaskAttemptedEvent.java
b/runtime/service/src/main/java/org/apache/polaris/service/events/BeforeAttemptTaskEvent.java
similarity index 92%
rename from
runtime/service/src/main/java/org/apache/polaris/service/events/BeforeTaskAttemptedEvent.java
rename to
runtime/service/src/main/java/org/apache/polaris/service/events/BeforeAttemptTaskEvent.java
index fcfa4400a..0a5fad641 100644
---
a/runtime/service/src/main/java/org/apache/polaris/service/events/BeforeTaskAttemptedEvent.java
+++
b/runtime/service/src/main/java/org/apache/polaris/service/events/BeforeAttemptTaskEvent.java
@@ -25,4 +25,4 @@ package org.apache.polaris.service.events;
* @param attempt The attempt number. Each retry of the task will have its own
attempt number. The
* initial (non-retried) attempt starts counting from 1.
*/
-public record BeforeTaskAttemptedEvent(long taskEntityId, int attempt)
implements PolarisEvent {}
+public record BeforeAttemptTaskEvent(long taskEntityId, int attempt)
implements PolarisEvent {}
diff --git
a/runtime/service/src/main/java/org/apache/polaris/service/events/BeforeRequestRateLimitedEvent.java
b/runtime/service/src/main/java/org/apache/polaris/service/events/BeforeLimitRequestRateEvent.java
similarity index 93%
rename from
runtime/service/src/main/java/org/apache/polaris/service/events/BeforeRequestRateLimitedEvent.java
rename to
runtime/service/src/main/java/org/apache/polaris/service/events/BeforeLimitRequestRateEvent.java
index 1d9780ebe..e20d83202 100644
---
a/runtime/service/src/main/java/org/apache/polaris/service/events/BeforeRequestRateLimitedEvent.java
+++
b/runtime/service/src/main/java/org/apache/polaris/service/events/BeforeLimitRequestRateEvent.java
@@ -24,5 +24,5 @@ package org.apache.polaris.service.events;
* @param method The request's HTTP method
* @param absolutePath The request's absolute path
*/
-public record BeforeRequestRateLimitedEvent(String method, String absolutePath)
+public record BeforeLimitRequestRateEvent(String method, String absolutePath)
implements PolarisEvent {}
diff --git
a/runtime/service/src/main/java/org/apache/polaris/service/events/BeforeTableCommitedEvent.java
b/runtime/service/src/main/java/org/apache/polaris/service/events/BeforeTableCommitedEvent.java
deleted file mode 100644
index 511f6f097..000000000
---
a/runtime/service/src/main/java/org/apache/polaris/service/events/BeforeTableCommitedEvent.java
+++ /dev/null
@@ -1,35 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.polaris.service.events;
-
-import org.apache.iceberg.TableMetadata;
-import org.apache.iceberg.catalog.TableIdentifier;
-
-/**
- * Emitted when Polaris intends to perform a commit to a table. There is no
guarantee on the order
- * of this event relative to the validation checks we've performed, which
means the commit may still
- * fail Polaris-side validation checks.
- *
- * @param tableIdentifier The identifier.
- * @param metadataBefore The old metadata.
- * @param metadataAfter The new metadata.
- */
-public record BeforeTableCommitedEvent(
- TableIdentifier tableIdentifier, TableMetadata metadataBefore,
TableMetadata metadataAfter)
- implements PolarisEvent {}
diff --git
a/runtime/service/src/main/java/org/apache/polaris/service/events/BeforeTableRefreshedEvent.java
b/runtime/service/src/main/java/org/apache/polaris/service/events/BeforeTableRefreshedEvent.java
deleted file mode 100644
index f5daade51..000000000
---
a/runtime/service/src/main/java/org/apache/polaris/service/events/BeforeTableRefreshedEvent.java
+++ /dev/null
@@ -1,31 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.polaris.service.events;
-
-import org.apache.iceberg.catalog.TableIdentifier;
-
-/**
- * Emitted when Polaris intends to refresh its known version of a table's
metadata by fetching the
- * latest.
- *
- * @param catalogName The name of the catalog where the view is located.
- * @param tableIdentifier The identifier of the table being refreshed.
- */
-public record BeforeTableRefreshedEvent(String catalogName, TableIdentifier
tableIdentifier)
- implements PolarisEvent {}
diff --git
a/runtime/service/src/main/java/org/apache/polaris/service/events/BeforeViewCommitedEvent.java
b/runtime/service/src/main/java/org/apache/polaris/service/events/BeforeViewCommitedEvent.java
deleted file mode 100644
index c9303de71..000000000
---
a/runtime/service/src/main/java/org/apache/polaris/service/events/BeforeViewCommitedEvent.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.polaris.service.events;
-
-import org.apache.iceberg.catalog.TableIdentifier;
-import org.apache.iceberg.view.ViewMetadata;
-
-/**
- * Emitted when Polaris intends to perform a commit to a view. There is no
guarantee on the order of
- * this event relative to the validation checks we've performed, which means
the commit may still
- * fail Polaris-side validation checks.
- *
- * @param catalogName The name of the catalog where the view is located.
- * @param identifier The identifier.
- * @param base The old metadata.
- * @param metadata The new metadata.
- */
-public record BeforeViewCommitedEvent(
- String catalogName, TableIdentifier identifier, ViewMetadata base,
ViewMetadata metadata)
- implements PolarisEvent {}
diff --git
a/runtime/service/src/main/java/org/apache/polaris/service/events/BeforeViewRefreshedEvent.java
b/runtime/service/src/main/java/org/apache/polaris/service/events/BeforeViewRefreshedEvent.java
deleted file mode 100644
index 32b3250c7..000000000
---
a/runtime/service/src/main/java/org/apache/polaris/service/events/BeforeViewRefreshedEvent.java
+++ /dev/null
@@ -1,31 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.polaris.service.events;
-
-import org.apache.iceberg.catalog.TableIdentifier;
-
-/**
- * Emitted when Polaris intends to refresh its known version of a view's
metadata by fetching the
- * latest.
- *
- * @param catalogName The name of the catalog where the view is located.
- * @param viewIdentifier The identifier of the view being refreshed.
- */
-public record BeforeViewRefreshedEvent(String catalogName, TableIdentifier
viewIdentifier)
- implements PolarisEvent {}
diff --git
a/runtime/service/src/main/java/org/apache/polaris/service/events/IcebergRestCatalogEvents.java
b/runtime/service/src/main/java/org/apache/polaris/service/events/IcebergRestCatalogEvents.java
new file mode 100644
index 000000000..76d3cf298
--- /dev/null
+++
b/runtime/service/src/main/java/org/apache/polaris/service/events/IcebergRestCatalogEvents.java
@@ -0,0 +1,251 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.polaris.service.events;
+
+import java.util.Map;
+import org.apache.iceberg.TableMetadata;
+import org.apache.iceberg.catalog.Namespace;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.rest.requests.CommitTransactionRequest;
+import org.apache.iceberg.rest.requests.CreateNamespaceRequest;
+import org.apache.iceberg.rest.requests.CreateTableRequest;
+import org.apache.iceberg.rest.requests.CreateViewRequest;
+import org.apache.iceberg.rest.requests.RegisterTableRequest;
+import org.apache.iceberg.rest.requests.RenameTableRequest;
+import org.apache.iceberg.rest.requests.UpdateNamespacePropertiesRequest;
+import org.apache.iceberg.rest.responses.ConfigResponse;
+import org.apache.iceberg.rest.responses.LoadTableResponse;
+import org.apache.iceberg.rest.responses.LoadViewResponse;
+import org.apache.iceberg.rest.responses.UpdateNamespacePropertiesResponse;
+import org.apache.iceberg.view.ViewMetadata;
+import org.apache.polaris.service.types.CommitTableRequest;
+import org.apache.polaris.service.types.CommitViewRequest;
+import org.apache.polaris.service.types.NotificationRequest;
+
+/**
+ * Event records for Iceberg REST Catalog operations. Each operation has
corresponding "Before" and
+ * "After" event records.
+ */
+public class IcebergRestCatalogEvents {
+
+ // Namespace Events
+ public record BeforeCreateNamespaceEvent(
+ String catalogName, CreateNamespaceRequest createNamespaceRequest) {}
+
+ public record AfterCreateNamespaceEvent(
+ String catalogName, Namespace namespace, Map<String, String>
namespaceProperties) {}
+
+ public record BeforeListNamespacesEvent(String catalogName, String parent) {}
+
+ public record AfterListNamespacesEvent(String catalogName, String parent) {}
+
+ public record BeforeLoadNamespaceMetadataEvent(String catalogName, Namespace
namespace) {}
+
+ public record AfterLoadNamespaceMetadataEvent(
+ String catalogName, Namespace namespace, Map<String, String>
namespaceProperties) {}
+
+ public record BeforeCheckExistsNamespaceEvent(String catalogName, Namespace
namespace) {}
+
+ public record AfterCheckExistsNamespaceEvent(String catalogName, Namespace
namespace) {}
+
+ public record BeforeDropNamespaceEvent(String catalogName, Namespace
namespace) {}
+
+ public record AfterDropNamespaceEvent(String catalogName, String namespace)
{}
+
+ public record BeforeUpdateNamespacePropertiesEvent(
+ String catalogName,
+ Namespace namespace,
+ UpdateNamespacePropertiesRequest updateNamespacePropertiesRequest) {}
+
+ public record AfterUpdateNamespacePropertiesEvent(
+ String catalogName,
+ Namespace namespace,
+ UpdateNamespacePropertiesResponse updateNamespacePropertiesResponse) {}
+
+ // Table Events
+ public record BeforeCreateTableEvent(
+ String catalogName,
+ Namespace namespace,
+ CreateTableRequest createTableRequest,
+ String accessDelegationMode) {}
+
+ public record AfterCreateTableEvent(
+ String catalogName,
+ Namespace namespace,
+ String tableName,
+ LoadTableResponse loadTableResponse) {}
+
+ public record BeforeListTablesEvent(String catalogName, Namespace namespace)
{}
+
+ public record AfterListTablesEvent(String catalogName, Namespace namespace)
{}
+
+ public record BeforeLoadTableEvent(
+ String catalogName,
+ Namespace namespace,
+ String table,
+ String accessDelegationMode,
+ String ifNoneMatchString,
+ String snapshots) {}
+
+ public record AfterLoadTableEvent(
+ String catalogName, Namespace namespace, LoadTableResponse
loadTableResponse) {}
+
+ public record BeforeCheckExistsTableEvent(
+ String catalogName, Namespace namespace, String table) {}
+
+ public record AfterCheckExistsTableEvent(String catalogName, Namespace
namespace, String table) {}
+
+ public record BeforeDropTableEvent(
+ String catalogName, Namespace namespace, String table, Boolean
purgeRequested) {}
+
+ public record AfterDropTableEvent(
+ String catalogName, Namespace namespace, String table, Boolean
purgeRequested) {}
+
+ public record BeforeRegisterTableEvent(
+ String catalogName, Namespace namespace, RegisterTableRequest
registerTableRequest) {}
+
+ public record AfterRegisterTableEvent(
+ String catalogName, Namespace namespace, LoadTableResponse
loadTableResponse) {}
+
+ public record BeforeRenameTableEvent(String catalogName, RenameTableRequest
renameTableRequest) {}
+
+ public record AfterRenameTableEvent(String catalogName, RenameTableRequest
renameTableRequest) {}
+
+ public record BeforeUpdateTableEvent(
+ String catalogName,
+ Namespace namespace,
+ String sourceTable,
+ CommitTableRequest commitTableRequest) {}
+
+ public record AfterUpdateTableEvent(
+ String catalogName,
+ Namespace namespace,
+ String sourceTable,
+ LoadTableResponse loadTableResponse) {}
+
+ // View Events
+ public record BeforeCreateViewEvent(
+ String catalogName, Namespace namespace, CreateViewRequest
createViewRequest) {}
+
+ public record AfterCreateViewEvent(
+ String catalogName, Namespace namespace, LoadViewResponse
loadViewResponse) {}
+
+ public record BeforeListViewsEvent(String catalogName, Namespace namespace)
{}
+
+ public record AfterListViewsEvent(String catalogName, Namespace namespace) {}
+
+ public record BeforeLoadViewEvent(String catalogName, Namespace namespace,
String view) {}
+
+ public record AfterLoadViewEvent(
+ String catalogName, Namespace namespace, LoadViewResponse
loadViewResponse) {}
+
+ public record BeforeCheckExistsViewEvent(String catalogName, Namespace
namespace, String view) {}
+
+ public record AfterCheckExistsViewEvent(String catalogName, Namespace
namespace, String view) {}
+
+ public record BeforeDropViewEvent(String catalogName, Namespace namespace,
String view) {}
+
+ public record AfterDropViewEvent(String catalogName, Namespace namespace,
String view) {}
+
+ public record BeforeRenameViewEvent(String catalogName, RenameTableRequest
renameTableRequest) {}
+
+ public record AfterRenameViewEvent(String catalogName, RenameTableRequest
renameTableRequest) {}
+
+ public record BeforeReplaceViewEvent(
+ String catalogName,
+ Namespace namespace,
+ String sourceView,
+ CommitViewRequest commitViewRequest) {}
+
+ public record AfterReplaceViewEvent(
+ String catalogName,
+ Namespace namespace,
+ String sourceView,
+ LoadViewResponse loadViewResponse) {}
+
+ // Credential Events
+ public record BeforeLoadCredentialsEvent(String catalogName, Namespace
namespace, String table) {}
+
+ public record AfterLoadCredentialsEvent(String catalogName, Namespace
namespace, String table) {}
+
+ // Transaction Events
+ public record BeforeCommitTransactionEvent(
+ String catalogName, CommitTransactionRequest commitTransactionRequest) {}
+
+ // TODO: Add all PolarisEntities that were modified with this transaction.
+ public record AfterCommitTransactionEvent(
+ String catalogName, CommitTransactionRequest commitTransactionRequest) {}
+
+ // Notification Events
+ public record BeforeSendNotificationEvent(
+ String catalogName,
+ Namespace namespace,
+ String table,
+ NotificationRequest notificationRequest) {}
+
+ // TODO: Add result once SendNotification API changes are confirmed to return the result.
+ public record AfterSendNotificationEvent(String catalogName, Namespace
namespace, String table) {}
+
+ // Configuration Events
+ public record BeforeGetConfigEvent(String warehouse) {}
+
+ public record AfterGetConfigEvent(ConfigResponse configResponse) {}
+
+ // Legacy events
+ public record BeforeCommitTableEvent(
+ String catalogName,
+ TableIdentifier identifier,
+ TableMetadata metadataBefore,
+ TableMetadata metadataAfter)
+ implements PolarisEvent {}
+
+ public record AfterCommitTableEvent(
+ String catalogName,
+ TableIdentifier identifier,
+ TableMetadata metadataBefore,
+ TableMetadata metadataAfter)
+ implements PolarisEvent {}
+
+ public record BeforeCommitViewEvent(
+ String catalogName,
+ TableIdentifier identifier,
+ ViewMetadata metadataBefore,
+ ViewMetadata metadataAfter)
+ implements PolarisEvent {}
+
+ public record AfterCommitViewEvent(
+ String catalogName,
+ TableIdentifier identifier,
+ ViewMetadata metadataBefore,
+ ViewMetadata metadataAfter)
+ implements PolarisEvent {}
+
+ public record BeforeRefreshTableEvent(String catalogName, TableIdentifier
tableIdentifier)
+ implements PolarisEvent {}
+
+ public record AfterRefreshTableEvent(String catalogName, TableIdentifier
tableIdentifier)
+ implements PolarisEvent {}
+
+ public record BeforeRefreshViewEvent(String catalogName, TableIdentifier
viewIdentifier)
+ implements PolarisEvent {}
+
+ public record AfterRefreshViewEvent(String catalogName, TableIdentifier
viewIdentifier)
+ implements PolarisEvent {}
+}
diff --git a/runtime/service/src/main/java/org/apache/polaris/service/events/jsonEventListener/PropertyMapEventListener.java b/runtime/service/src/main/java/org/apache/polaris/service/events/jsonEventListener/PropertyMapEventListener.java
index 93eec9926..3d23129e6 100644
--- a/runtime/service/src/main/java/org/apache/polaris/service/events/jsonEventListener/PropertyMapEventListener.java
+++ b/runtime/service/src/main/java/org/apache/polaris/service/events/jsonEventListener/PropertyMapEventListener.java
@@ -20,7 +20,7 @@
package org.apache.polaris.service.events.jsonEventListener;
import java.util.HashMap;
-import org.apache.polaris.service.events.AfterTableRefreshedEvent;
+import org.apache.polaris.service.events.IcebergRestCatalogEvents;
import org.apache.polaris.service.events.listeners.PolarisEventListener;
/**
@@ -34,7 +34,7 @@ public abstract class PropertyMapEventListener extends
PolarisEventListener {
protected abstract void transformAndSendEvent(HashMap<String, Object>
properties);
@Override
- public void onAfterTableRefreshed(AfterTableRefreshedEvent event) {
+ public void
onAfterRefreshTable(IcebergRestCatalogEvents.AfterRefreshTableEvent event) {
HashMap<String, Object> properties = new HashMap<>();
properties.put("event_type", event.getClass().getSimpleName());
properties.put("table_identifier", event.tableIdentifier().toString());
diff --git a/runtime/service/src/main/java/org/apache/polaris/service/events/listeners/PolarisEventListener.java b/runtime/service/src/main/java/org/apache/polaris/service/events/listeners/PolarisEventListener.java
index 5e1122d25..20dc304d0 100644
--- a/runtime/service/src/main/java/org/apache/polaris/service/events/listeners/PolarisEventListener.java
+++ b/runtime/service/src/main/java/org/apache/polaris/service/events/listeners/PolarisEventListener.java
@@ -18,66 +18,217 @@
*/
package org.apache.polaris.service.events.listeners;
-import org.apache.polaris.service.events.AfterCatalogCreatedEvent;
-import org.apache.polaris.service.events.AfterTableCommitedEvent;
-import org.apache.polaris.service.events.AfterTableCreatedEvent;
-import org.apache.polaris.service.events.AfterTableRefreshedEvent;
-import org.apache.polaris.service.events.AfterTaskAttemptedEvent;
-import org.apache.polaris.service.events.AfterViewCommitedEvent;
-import org.apache.polaris.service.events.AfterViewRefreshedEvent;
-import org.apache.polaris.service.events.BeforeRequestRateLimitedEvent;
-import org.apache.polaris.service.events.BeforeTableCommitedEvent;
-import org.apache.polaris.service.events.BeforeTableRefreshedEvent;
-import org.apache.polaris.service.events.BeforeTaskAttemptedEvent;
-import org.apache.polaris.service.events.BeforeViewCommitedEvent;
-import org.apache.polaris.service.events.BeforeViewRefreshedEvent;
+import org.apache.polaris.service.events.AfterAttemptTaskEvent;
+import org.apache.polaris.service.events.BeforeAttemptTaskEvent;
+import org.apache.polaris.service.events.BeforeLimitRequestRateEvent;
import org.apache.polaris.service.events.CatalogGenericTableServiceEvents;
import org.apache.polaris.service.events.CatalogPolicyServiceEvents;
+import org.apache.polaris.service.events.IcebergRestCatalogEvents;
/**
* Represents an event listener that can respond to notable moments during Polaris's execution.
* Event details are documented under the event objects themselves.
*/
public abstract class PolarisEventListener {
+ /** {@link BeforeLimitRequestRateEvent} */
+ public void onBeforeLimitRequestRate(BeforeLimitRequestRateEvent event) {}
- /** {@link BeforeRequestRateLimitedEvent} */
- public void onBeforeRequestRateLimited(BeforeRequestRateLimitedEvent event)
{}
+ /** {@link IcebergRestCatalogEvents.BeforeCommitTableEvent} */
+ public void
onBeforeCommitTable(IcebergRestCatalogEvents.BeforeCommitTableEvent event) {}
- /** {@link BeforeTableCommitedEvent} */
- public void onBeforeTableCommited(BeforeTableCommitedEvent event) {}
+ /** {@link IcebergRestCatalogEvents.AfterCommitTableEvent} */
+ public void
onAfterCommitTable(IcebergRestCatalogEvents.AfterCommitTableEvent event) {}
- /** {@link AfterTableCommitedEvent} */
- public void onAfterTableCommited(AfterTableCommitedEvent event) {}
+ /** {@link IcebergRestCatalogEvents.BeforeCommitViewEvent} */
+ public void
onBeforeCommitView(IcebergRestCatalogEvents.BeforeCommitViewEvent event) {}
- /** {@link BeforeViewCommitedEvent} */
- public void onBeforeViewCommited(BeforeViewCommitedEvent event) {}
+ /** {@link IcebergRestCatalogEvents.AfterCommitViewEvent} */
+ public void onAfterCommitView(IcebergRestCatalogEvents.AfterCommitViewEvent
event) {}
- /** {@link AfterViewCommitedEvent} */
- public void onAfterViewCommited(AfterViewCommitedEvent event) {}
+ /** {@link IcebergRestCatalogEvents.BeforeRefreshTableEvent} */
+ public void
onBeforeRefreshTable(IcebergRestCatalogEvents.BeforeRefreshTableEvent event) {}
- /** {@link BeforeTableRefreshedEvent} */
- public void onBeforeTableRefreshed(BeforeTableRefreshedEvent event) {}
+ /** {@link IcebergRestCatalogEvents.AfterRefreshTableEvent} */
+ public void
onAfterRefreshTable(IcebergRestCatalogEvents.AfterRefreshTableEvent event) {}
- /** {@link AfterTableRefreshedEvent} */
- public void onAfterTableRefreshed(AfterTableRefreshedEvent event) {}
+ /** {@link IcebergRestCatalogEvents.BeforeRefreshViewEvent} */
+ public void
onBeforeRefreshView(IcebergRestCatalogEvents.BeforeRefreshViewEvent event) {}
- /** {@link BeforeViewRefreshedEvent} */
- public void onBeforeViewRefreshed(BeforeViewRefreshedEvent event) {}
+ /** {@link IcebergRestCatalogEvents.AfterRefreshViewEvent} */
+ public void
onAfterRefreshView(IcebergRestCatalogEvents.AfterRefreshViewEvent event) {}
- /** {@link AfterViewRefreshedEvent} */
- public void onAfterViewRefreshed(AfterViewRefreshedEvent event) {}
+ /** {@link BeforeAttemptTaskEvent} */
+ public void onBeforeAttemptTask(BeforeAttemptTaskEvent event) {}
- /** {@link BeforeTaskAttemptedEvent} */
- public void onBeforeTaskAttempted(BeforeTaskAttemptedEvent event) {}
+ /** {@link AfterAttemptTaskEvent} */
+ public void onAfterAttemptTask(AfterAttemptTaskEvent event) {}
- /** {@link AfterTaskAttemptedEvent} */
- public void onAfterTaskAttempted(AfterTaskAttemptedEvent event) {}
+ // Iceberg REST Catalog Namespace Events
+ /** {@link IcebergRestCatalogEvents.BeforeCreateNamespaceEvent} */
+ public void
onBeforeCreateNamespace(IcebergRestCatalogEvents.BeforeCreateNamespaceEvent
event) {}
- /** {@link AfterTableCreatedEvent} */
- public void onAfterTableCreated(AfterTableCreatedEvent event) {}
+ /** {@link IcebergRestCatalogEvents.AfterCreateNamespaceEvent} */
+ public void
onAfterCreateNamespace(IcebergRestCatalogEvents.AfterCreateNamespaceEvent
event) {}
- /** {@link AfterCatalogCreatedEvent} */
- public void onAfterCatalogCreated(AfterCatalogCreatedEvent event) {}
+ /** {@link IcebergRestCatalogEvents.BeforeListNamespacesEvent} */
+ public void
onBeforeListNamespaces(IcebergRestCatalogEvents.BeforeListNamespacesEvent
event) {}
+
+ /** {@link IcebergRestCatalogEvents.AfterListNamespacesEvent} */
+ public void
onAfterListNamespaces(IcebergRestCatalogEvents.AfterListNamespacesEvent event)
{}
+
+ /** {@link IcebergRestCatalogEvents.BeforeLoadNamespaceMetadataEvent} */
+ public void onBeforeLoadNamespaceMetadata(
+ IcebergRestCatalogEvents.BeforeLoadNamespaceMetadataEvent event) {}
+
+ /** {@link IcebergRestCatalogEvents.AfterLoadNamespaceMetadataEvent} */
+ public void onAfterLoadNamespaceMetadata(
+ IcebergRestCatalogEvents.AfterLoadNamespaceMetadataEvent event) {}
+
+ /** {@link IcebergRestCatalogEvents.BeforeCheckExistsNamespaceEvent} */
+ public void onBeforeCheckExistsNamespace(
+ IcebergRestCatalogEvents.BeforeCheckExistsNamespaceEvent event) {}
+
+ /** {@link IcebergRestCatalogEvents.AfterCheckExistsNamespaceEvent} */
+ public void onAfterCheckExistsNamespace(
+ IcebergRestCatalogEvents.AfterCheckExistsNamespaceEvent event) {}
+
+ /** {@link IcebergRestCatalogEvents.BeforeDropNamespaceEvent} */
+ public void
onBeforeDropNamespace(IcebergRestCatalogEvents.BeforeDropNamespaceEvent event)
{}
+
+ /** {@link IcebergRestCatalogEvents.AfterDropNamespaceEvent} */
+ public void
onAfterDropNamespace(IcebergRestCatalogEvents.AfterDropNamespaceEvent event) {}
+
+ /** {@link IcebergRestCatalogEvents.BeforeUpdateNamespacePropertiesEvent} */
+ public void onBeforeUpdateNamespaceProperties(
+ IcebergRestCatalogEvents.BeforeUpdateNamespacePropertiesEvent event) {}
+
+ /** {@link IcebergRestCatalogEvents.AfterUpdateNamespacePropertiesEvent} */
+ public void onAfterUpdateNamespaceProperties(
+ IcebergRestCatalogEvents.AfterUpdateNamespacePropertiesEvent event) {}
+
+ // Iceberg REST Catalog Table Events
+ /** {@link IcebergRestCatalogEvents.BeforeCreateTableEvent} */
+ public void
onBeforeCreateTable(IcebergRestCatalogEvents.BeforeCreateTableEvent event) {}
+
+ /** {@link IcebergRestCatalogEvents.AfterCreateTableEvent} */
+ public void
onAfterCreateTable(IcebergRestCatalogEvents.AfterCreateTableEvent event) {}
+
+ /** {@link IcebergRestCatalogEvents.BeforeListTablesEvent} */
+ public void
onBeforeListTables(IcebergRestCatalogEvents.BeforeListTablesEvent event) {}
+
+ /** {@link IcebergRestCatalogEvents.AfterListTablesEvent} */
+ public void onAfterListTables(IcebergRestCatalogEvents.AfterListTablesEvent
event) {}
+
+ /** {@link IcebergRestCatalogEvents.BeforeLoadTableEvent} */
+ public void onBeforeLoadTable(IcebergRestCatalogEvents.BeforeLoadTableEvent
event) {}
+
+ /** {@link IcebergRestCatalogEvents.AfterLoadTableEvent} */
+ public void onAfterLoadTable(IcebergRestCatalogEvents.AfterLoadTableEvent
event) {}
+
+ /** {@link IcebergRestCatalogEvents.BeforeCheckExistsTableEvent} */
+ public void onBeforeCheckExistsTable(
+ IcebergRestCatalogEvents.BeforeCheckExistsTableEvent event) {}
+
+ /** {@link IcebergRestCatalogEvents.AfterCheckExistsTableEvent} */
+ public void
onAfterCheckExistsTable(IcebergRestCatalogEvents.AfterCheckExistsTableEvent
event) {}
+
+ /** {@link IcebergRestCatalogEvents.BeforeDropTableEvent} */
+ public void onBeforeDropTable(IcebergRestCatalogEvents.BeforeDropTableEvent
event) {}
+
+ /** {@link IcebergRestCatalogEvents.AfterDropTableEvent} */
+ public void onAfterDropTable(IcebergRestCatalogEvents.AfterDropTableEvent
event) {}
+
+ /** {@link IcebergRestCatalogEvents.BeforeRegisterTableEvent} */
+ public void
onBeforeRegisterTable(IcebergRestCatalogEvents.BeforeRegisterTableEvent event)
{}
+
+ /** {@link IcebergRestCatalogEvents.AfterRegisterTableEvent} */
+ public void
onAfterRegisterTable(IcebergRestCatalogEvents.AfterRegisterTableEvent event) {}
+
+ /** {@link IcebergRestCatalogEvents.BeforeRenameTableEvent} */
+ public void
onBeforeRenameTable(IcebergRestCatalogEvents.BeforeRenameTableEvent event) {}
+
+ /** {@link IcebergRestCatalogEvents.AfterRenameTableEvent} */
+ public void
onAfterRenameTable(IcebergRestCatalogEvents.AfterRenameTableEvent event) {}
+
+ /** {@link IcebergRestCatalogEvents.BeforeUpdateTableEvent} */
+ public void
onBeforeUpdateTable(IcebergRestCatalogEvents.BeforeUpdateTableEvent event) {}
+
+ /** {@link IcebergRestCatalogEvents.AfterUpdateTableEvent} */
+ public void
onAfterUpdateTable(IcebergRestCatalogEvents.AfterUpdateTableEvent event) {}
+
+ // Iceberg REST Catalog View Events
+ /** {@link IcebergRestCatalogEvents.BeforeCreateViewEvent} */
+ public void
onBeforeCreateView(IcebergRestCatalogEvents.BeforeCreateViewEvent event) {}
+
+ /** {@link IcebergRestCatalogEvents.AfterCreateViewEvent} */
+ public void onAfterCreateView(IcebergRestCatalogEvents.AfterCreateViewEvent
event) {}
+
+ /** {@link IcebergRestCatalogEvents.BeforeListViewsEvent} */
+ public void onBeforeListViews(IcebergRestCatalogEvents.BeforeListViewsEvent
event) {}
+
+ /** {@link IcebergRestCatalogEvents.AfterListViewsEvent} */
+ public void onAfterListViews(IcebergRestCatalogEvents.AfterListViewsEvent
event) {}
+
+ /** {@link IcebergRestCatalogEvents.BeforeLoadViewEvent} */
+ public void onBeforeLoadView(IcebergRestCatalogEvents.BeforeLoadViewEvent
event) {}
+
+ /** {@link IcebergRestCatalogEvents.AfterLoadViewEvent} */
+ public void onAfterLoadView(IcebergRestCatalogEvents.AfterLoadViewEvent
event) {}
+
+ /** {@link IcebergRestCatalogEvents.BeforeCheckExistsViewEvent} */
+ public void
onBeforeCheckExistsView(IcebergRestCatalogEvents.BeforeCheckExistsViewEvent
event) {}
+
+ /** {@link IcebergRestCatalogEvents.AfterCheckExistsViewEvent} */
+ public void
onAfterCheckExistsView(IcebergRestCatalogEvents.AfterCheckExistsViewEvent
event) {}
+
+ /** {@link IcebergRestCatalogEvents.BeforeDropViewEvent} */
+ public void onBeforeDropView(IcebergRestCatalogEvents.BeforeDropViewEvent
event) {}
+
+ /** {@link IcebergRestCatalogEvents.AfterDropViewEvent} */
+ public void onAfterDropView(IcebergRestCatalogEvents.AfterDropViewEvent
event) {}
+
+ /** {@link IcebergRestCatalogEvents.BeforeRenameViewEvent} */
+ public void
onBeforeRenameView(IcebergRestCatalogEvents.BeforeRenameViewEvent event) {}
+
+ /** {@link IcebergRestCatalogEvents.AfterRenameViewEvent} */
+ public void onAfterRenameView(IcebergRestCatalogEvents.AfterRenameViewEvent
event) {}
+
+ /** {@link IcebergRestCatalogEvents.BeforeReplaceViewEvent} */
+ public void
onBeforeReplaceView(IcebergRestCatalogEvents.BeforeReplaceViewEvent event) {}
+
+ /** {@link IcebergRestCatalogEvents.AfterReplaceViewEvent} */
+ public void
onAfterReplaceView(IcebergRestCatalogEvents.AfterReplaceViewEvent event) {}
+
+ // Iceberg REST Catalog Credential Events
+ /** {@link IcebergRestCatalogEvents.BeforeLoadCredentialsEvent} */
+ public void
onBeforeLoadCredentials(IcebergRestCatalogEvents.BeforeLoadCredentialsEvent
event) {}
+
+ /** {@link IcebergRestCatalogEvents.AfterLoadCredentialsEvent} */
+ public void
onAfterLoadCredentials(IcebergRestCatalogEvents.AfterLoadCredentialsEvent
event) {}
+
+ // Iceberg REST Catalog Transactions Events
+ /** {@link IcebergRestCatalogEvents.BeforeCommitTransactionEvent} */
+ public void onBeforeCommitTransaction(
+ IcebergRestCatalogEvents.BeforeCommitTransactionEvent event) {}
+
+ /** {@link IcebergRestCatalogEvents.AfterCommitTransactionEvent} */
+ public void onAfterCommitTransaction(
+ IcebergRestCatalogEvents.AfterCommitTransactionEvent event) {}
+
+ // Iceberg REST Catalog Notification Events
+ /** {@link IcebergRestCatalogEvents.BeforeSendNotificationEvent} */
+ public void onBeforeSendNotification(
+ IcebergRestCatalogEvents.BeforeSendNotificationEvent event) {}
+
+ /** {@link IcebergRestCatalogEvents.AfterSendNotificationEvent} */
+ public void
onAfterSendNotification(IcebergRestCatalogEvents.AfterSendNotificationEvent
event) {}
+
+ // Iceberg REST Catalog Configuration Events
+ /** {@link IcebergRestCatalogEvents.BeforeGetConfigEvent} */
+ public void onBeforeGetConfig(IcebergRestCatalogEvents.BeforeGetConfigEvent
event) {}
+
+ /** {@link IcebergRestCatalogEvents.AfterGetConfigEvent} */
+ public void onAfterGetConfig(IcebergRestCatalogEvents.AfterGetConfigEvent
event) {}
// Catalog Policy Service Events
/** {@link CatalogPolicyServiceEvents.BeforeCreatePolicyEvent} */
diff --git a/runtime/service/src/main/java/org/apache/polaris/service/events/listeners/PolarisPersistenceEventListener.java b/runtime/service/src/main/java/org/apache/polaris/service/events/listeners/PolarisPersistenceEventListener.java
index 4b30e8fa3..e9d43f003 100644
--- a/runtime/service/src/main/java/org/apache/polaris/service/events/listeners/PolarisPersistenceEventListener.java
+++ b/runtime/service/src/main/java/org/apache/polaris/service/events/listeners/PolarisPersistenceEventListener.java
@@ -21,61 +21,55 @@ package org.apache.polaris.service.events.listeners;
import jakarta.annotation.Nullable;
import java.util.Map;
+import org.apache.iceberg.TableMetadata;
import org.apache.iceberg.TableMetadataParser;
+import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.polaris.core.entity.PolarisEvent;
-import org.apache.polaris.service.events.AfterCatalogCreatedEvent;
-import org.apache.polaris.service.events.AfterTableCommitedEvent;
-import org.apache.polaris.service.events.AfterTableCreatedEvent;
-import org.apache.polaris.service.events.AfterTableRefreshedEvent;
-import org.apache.polaris.service.events.AfterTaskAttemptedEvent;
-import org.apache.polaris.service.events.AfterViewCommitedEvent;
-import org.apache.polaris.service.events.AfterViewRefreshedEvent;
-import org.apache.polaris.service.events.BeforeRequestRateLimitedEvent;
-import org.apache.polaris.service.events.BeforeTableCommitedEvent;
-import org.apache.polaris.service.events.BeforeTableRefreshedEvent;
-import org.apache.polaris.service.events.BeforeTaskAttemptedEvent;
-import org.apache.polaris.service.events.BeforeViewCommitedEvent;
-import org.apache.polaris.service.events.BeforeViewRefreshedEvent;
+import org.apache.polaris.service.events.AfterAttemptTaskEvent;
+import org.apache.polaris.service.events.BeforeAttemptTaskEvent;
+import org.apache.polaris.service.events.BeforeLimitRequestRateEvent;
+import org.apache.polaris.service.events.IcebergRestCatalogEvents;
public abstract class PolarisPersistenceEventListener extends
PolarisEventListener {
- // TODO: Ensure all events (except RateLimiter ones) call `addToBuffer`
+ // TODO: Ensure all events (except RateLimiter ones) call `processEvent`
@Override
- public final void onBeforeRequestRateLimited(BeforeRequestRateLimitedEvent
event) {}
+ public final void onBeforeLimitRequestRate(BeforeLimitRequestRateEvent
event) {}
@Override
- public void onBeforeTableCommited(BeforeTableCommitedEvent event) {}
+ public void
onBeforeCommitTable(IcebergRestCatalogEvents.BeforeCommitTableEvent event) {}
@Override
- public void onAfterTableCommited(AfterTableCommitedEvent event) {}
+ public void
onAfterCommitTable(IcebergRestCatalogEvents.AfterCommitTableEvent event) {}
@Override
- public void onBeforeViewCommited(BeforeViewCommitedEvent event) {}
+ public void
onBeforeCommitView(IcebergRestCatalogEvents.BeforeCommitViewEvent event) {}
@Override
- public void onAfterViewCommited(AfterViewCommitedEvent event) {}
+ public void onAfterCommitView(IcebergRestCatalogEvents.AfterCommitViewEvent
event) {}
@Override
- public void onBeforeTableRefreshed(BeforeTableRefreshedEvent event) {}
+ public void
onBeforeRefreshTable(IcebergRestCatalogEvents.BeforeRefreshTableEvent event) {}
@Override
- public void onAfterTableRefreshed(AfterTableRefreshedEvent event) {}
+ public void
onAfterRefreshTable(IcebergRestCatalogEvents.AfterRefreshTableEvent event) {}
@Override
- public void onBeforeViewRefreshed(BeforeViewRefreshedEvent event) {}
+ public void
onBeforeRefreshView(IcebergRestCatalogEvents.BeforeRefreshViewEvent event) {}
@Override
- public void onAfterViewRefreshed(AfterViewRefreshedEvent event) {}
+ public void
onAfterRefreshView(IcebergRestCatalogEvents.AfterRefreshViewEvent event) {}
@Override
- public void onBeforeTaskAttempted(BeforeTaskAttemptedEvent event) {}
+ public void onBeforeAttemptTask(BeforeAttemptTaskEvent event) {}
@Override
- public void onAfterTaskAttempted(AfterTaskAttemptedEvent event) {}
+ public void onAfterAttemptTask(AfterAttemptTaskEvent event) {}
@Override
- public void onAfterTableCreated(AfterTableCreatedEvent event) {
+ public void
onAfterCreateTable(IcebergRestCatalogEvents.AfterCreateTableEvent event) {
ContextSpecificInformation contextSpecificInformation =
getContextSpecificInformation();
+ TableMetadata tableMetadata = event.loadTableResponse().tableMetadata();
PolarisEvent polarisEvent =
new PolarisEvent(
event.catalogName(),
@@ -85,33 +79,17 @@ public abstract class PolarisPersistenceEventListener
extends PolarisEventListen
contextSpecificInformation.timestamp(),
contextSpecificInformation.principalName(),
PolarisEvent.ResourceType.TABLE,
- event.identifier().toString());
+ TableIdentifier.of(event.namespace(),
event.tableName()).toString());
Map<String, String> additionalParameters =
Map.of(
"table-uuid",
- event.metadata().uuid(),
+ tableMetadata.uuid(),
"metadata",
- TableMetadataParser.toJson(event.metadata()));
+ TableMetadataParser.toJson(tableMetadata));
polarisEvent.setAdditionalProperties(additionalParameters);
processEvent(polarisEvent);
}
- @Override
- public void onAfterCatalogCreated(AfterCatalogCreatedEvent event) {
- ContextSpecificInformation contextSpecificInformation =
getContextSpecificInformation();
- PolarisEvent polarisEvent =
- new PolarisEvent(
- event.catalogName(),
- org.apache.polaris.service.events.PolarisEvent.createEventId(),
- getRequestId(),
- event.getClass().getSimpleName(),
- contextSpecificInformation.timestamp(),
- contextSpecificInformation.principalName(),
- PolarisEvent.ResourceType.CATALOG,
- event.catalogName());
- processEvent(polarisEvent);
- }
-
protected record ContextSpecificInformation(long timestamp, @Nullable String
principalName) {}
abstract ContextSpecificInformation getContextSpecificInformation();
diff --git a/runtime/service/src/main/java/org/apache/polaris/service/events/listeners/TestPolarisEventListener.java b/runtime/service/src/main/java/org/apache/polaris/service/events/listeners/TestPolarisEventListener.java
index cf95452de..d0b9b5f92 100644
--- a/runtime/service/src/main/java/org/apache/polaris/service/events/listeners/TestPolarisEventListener.java
+++ b/runtime/service/src/main/java/org/apache/polaris/service/events/listeners/TestPolarisEventListener.java
@@ -23,17 +23,10 @@ import io.smallrye.common.annotation.Identifier;
import jakarta.enterprise.context.ApplicationScoped;
import java.util.ArrayList;
import java.util.List;
-import org.apache.polaris.service.events.AfterTableCommitedEvent;
-import org.apache.polaris.service.events.AfterTableRefreshedEvent;
-import org.apache.polaris.service.events.AfterTaskAttemptedEvent;
-import org.apache.polaris.service.events.AfterViewCommitedEvent;
-import org.apache.polaris.service.events.AfterViewRefreshedEvent;
-import org.apache.polaris.service.events.BeforeRequestRateLimitedEvent;
-import org.apache.polaris.service.events.BeforeTableCommitedEvent;
-import org.apache.polaris.service.events.BeforeTableRefreshedEvent;
-import org.apache.polaris.service.events.BeforeTaskAttemptedEvent;
-import org.apache.polaris.service.events.BeforeViewCommitedEvent;
-import org.apache.polaris.service.events.BeforeViewRefreshedEvent;
+import org.apache.polaris.service.events.AfterAttemptTaskEvent;
+import org.apache.polaris.service.events.BeforeAttemptTaskEvent;
+import org.apache.polaris.service.events.BeforeLimitRequestRateEvent;
+import org.apache.polaris.service.events.IcebergRestCatalogEvents;
import org.apache.polaris.service.events.PolarisEvent;
/** Event listener that stores all emitted events forever. Not recommended for use in production. */
@@ -48,57 +41,57 @@ public class TestPolarisEventListener extends
PolarisEventListener {
}
@Override
- public void onBeforeRequestRateLimited(BeforeRequestRateLimitedEvent event) {
+ public void onBeforeLimitRequestRate(BeforeLimitRequestRateEvent event) {
history.add(event);
}
@Override
- public void onBeforeTableCommited(BeforeTableCommitedEvent event) {
+ public void
onBeforeCommitTable(IcebergRestCatalogEvents.BeforeCommitTableEvent event) {
history.add(event);
}
@Override
- public void onAfterTableCommited(AfterTableCommitedEvent event) {
+ public void
onAfterCommitTable(IcebergRestCatalogEvents.AfterCommitTableEvent event) {
history.add(event);
}
@Override
- public void onBeforeViewCommited(BeforeViewCommitedEvent event) {
+ public void
onBeforeCommitView(IcebergRestCatalogEvents.BeforeCommitViewEvent event) {
history.add(event);
}
@Override
- public void onAfterViewCommited(AfterViewCommitedEvent event) {
+ public void onAfterCommitView(IcebergRestCatalogEvents.AfterCommitViewEvent
event) {
history.add(event);
}
@Override
- public void onBeforeTableRefreshed(BeforeTableRefreshedEvent event) {
+ public void
onBeforeRefreshTable(IcebergRestCatalogEvents.BeforeRefreshTableEvent event) {
history.add(event);
}
@Override
- public void onAfterTableRefreshed(AfterTableRefreshedEvent event) {
+ public void
onAfterRefreshTable(IcebergRestCatalogEvents.AfterRefreshTableEvent event) {
history.add(event);
}
@Override
- public void onBeforeViewRefreshed(BeforeViewRefreshedEvent event) {
+ public void
onBeforeRefreshView(IcebergRestCatalogEvents.BeforeRefreshViewEvent event) {
history.add(event);
}
@Override
- public void onAfterViewRefreshed(AfterViewRefreshedEvent event) {
+ public void
onAfterRefreshView(IcebergRestCatalogEvents.AfterRefreshViewEvent event) {
history.add(event);
}
@Override
- public void onBeforeTaskAttempted(BeforeTaskAttemptedEvent event) {
+ public void onBeforeAttemptTask(BeforeAttemptTaskEvent event) {
history.add(event);
}
@Override
- public void onAfterTaskAttempted(AfterTaskAttemptedEvent event) {
+ public void onAfterAttemptTask(AfterAttemptTaskEvent event) {
history.add(event);
}
}
diff --git a/runtime/service/src/main/java/org/apache/polaris/service/ratelimiter/RateLimiterFilter.java b/runtime/service/src/main/java/org/apache/polaris/service/ratelimiter/RateLimiterFilter.java
index 61d27a040..6ea95db4e 100644
--- a/runtime/service/src/main/java/org/apache/polaris/service/ratelimiter/RateLimiterFilter.java
+++ b/runtime/service/src/main/java/org/apache/polaris/service/ratelimiter/RateLimiterFilter.java
@@ -28,7 +28,7 @@ import jakarta.ws.rs.core.Response;
import jakarta.ws.rs.ext.Provider;
import java.io.IOException;
import org.apache.polaris.service.config.FilterPriorities;
-import org.apache.polaris.service.events.BeforeRequestRateLimitedEvent;
+import org.apache.polaris.service.events.BeforeLimitRequestRateEvent;
import org.apache.polaris.service.events.listeners.PolarisEventListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -54,8 +54,8 @@ public class RateLimiterFilter implements
ContainerRequestFilter {
@Override
public void filter(ContainerRequestContext ctx) throws IOException {
if (!rateLimiter.canProceed()) {
- polarisEventListener.onBeforeRequestRateLimited(
- new BeforeRequestRateLimitedEvent(
+ polarisEventListener.onBeforeLimitRequestRate(
+ new BeforeLimitRequestRateEvent(
ctx.getMethod(), ctx.getUriInfo().getAbsolutePath().toString()));
ctx.abortWith(Response.status(Response.Status.TOO_MANY_REQUESTS).build());
LOGGER.atDebug().log("Rate limiting request");
diff --git a/runtime/service/src/main/java/org/apache/polaris/service/task/TaskExecutorImpl.java b/runtime/service/src/main/java/org/apache/polaris/service/task/TaskExecutorImpl.java
index d425c1675..55847b8eb 100644
--- a/runtime/service/src/main/java/org/apache/polaris/service/task/TaskExecutorImpl.java
+++ b/runtime/service/src/main/java/org/apache/polaris/service/task/TaskExecutorImpl.java
@@ -43,8 +43,8 @@ import org.apache.polaris.core.entity.PolarisEntityType;
import org.apache.polaris.core.entity.TaskEntity;
import org.apache.polaris.core.persistence.MetaStoreManagerFactory;
import org.apache.polaris.core.persistence.PolarisMetaStoreManager;
-import org.apache.polaris.service.events.AfterTaskAttemptedEvent;
-import org.apache.polaris.service.events.BeforeTaskAttemptedEvent;
+import org.apache.polaris.service.events.AfterAttemptTaskEvent;
+import org.apache.polaris.service.events.BeforeAttemptTaskEvent;
import org.apache.polaris.service.events.listeners.PolarisEventListener;
import org.apache.polaris.service.tracing.TracingFilter;
import org.slf4j.Logger;
@@ -143,7 +143,7 @@ public class TaskExecutorImpl implements TaskExecutor {
}
protected void handleTask(long taskEntityId, CallContext ctx, int attempt) {
- polarisEventListener.onBeforeTaskAttempted(new
BeforeTaskAttemptedEvent(taskEntityId, attempt));
+ polarisEventListener.onBeforeAttemptTask(new
BeforeAttemptTaskEvent(taskEntityId, attempt));
boolean success = false;
try {
@@ -186,8 +186,8 @@ public class TaskExecutorImpl implements TaskExecutor {
.log("Unable to execute async task");
}
} finally {
- polarisEventListener.onAfterTaskAttempted(
- new AfterTaskAttemptedEvent(taskEntityId, attempt, success));
+ polarisEventListener.onAfterAttemptTask(
+ new AfterAttemptTaskEvent(taskEntityId, attempt, success));
}
}
diff --git
a/runtime/service/src/test/java/org/apache/polaris/service/catalog/iceberg/AbstractIcebergCatalogTest.java
b/runtime/service/src/test/java/org/apache/polaris/service/catalog/iceberg/AbstractIcebergCatalogTest.java
index 1ec9d87ba..3d66c4858 100644
---
a/runtime/service/src/test/java/org/apache/polaris/service/catalog/iceberg/AbstractIcebergCatalogTest.java
+++
b/runtime/service/src/test/java/org/apache/polaris/service/catalog/iceberg/AbstractIcebergCatalogTest.java
@@ -137,10 +137,7 @@ import
org.apache.polaris.service.catalog.io.ExceptionMappingFileIO;
import org.apache.polaris.service.catalog.io.FileIOFactory;
import org.apache.polaris.service.catalog.io.MeasuredFileIOFactory;
import org.apache.polaris.service.config.ReservedProperties;
-import org.apache.polaris.service.events.AfterTableCommitedEvent;
-import org.apache.polaris.service.events.AfterTableRefreshedEvent;
-import org.apache.polaris.service.events.BeforeTableCommitedEvent;
-import org.apache.polaris.service.events.BeforeTableRefreshedEvent;
+import org.apache.polaris.service.events.IcebergRestCatalogEvents;
import org.apache.polaris.service.events.listeners.PolarisEventListener;
import org.apache.polaris.service.events.listeners.TestPolarisEventListener;
import org.apache.polaris.service.exception.FakeAzureHttpResponse;
@@ -2229,22 +2226,26 @@ public abstract class AbstractIcebergCatalogTest
extends CatalogTests<IcebergCat
table.updateProperties().set(key, valOld).commit();
table.updateProperties().set(key, valNew).commit();
- var beforeRefreshEvent =
testPolarisEventListener.getLatest(BeforeTableRefreshedEvent.class);
+ var beforeRefreshEvent =
+
testPolarisEventListener.getLatest(IcebergRestCatalogEvents.BeforeRefreshTableEvent.class);
Assertions.assertThat(beforeRefreshEvent.tableIdentifier()).isEqualTo(TestData.TABLE);
- var afterRefreshEvent =
testPolarisEventListener.getLatest(AfterTableRefreshedEvent.class);
+ var afterRefreshEvent =
+
testPolarisEventListener.getLatest(IcebergRestCatalogEvents.AfterRefreshTableEvent.class);
Assertions.assertThat(afterRefreshEvent.tableIdentifier()).isEqualTo(TestData.TABLE);
- var beforeTableEvent =
testPolarisEventListener.getLatest(BeforeTableCommitedEvent.class);
-
Assertions.assertThat(beforeTableEvent.tableIdentifier()).isEqualTo(TestData.TABLE);
+ var beforeTableEvent =
+
testPolarisEventListener.getLatest(IcebergRestCatalogEvents.BeforeCommitTableEvent.class);
+
Assertions.assertThat(beforeTableEvent.identifier()).isEqualTo(TestData.TABLE);
Assertions.assertThat(beforeTableEvent.metadataBefore().properties().get(key))
.isEqualTo(valOld);
Assertions.assertThat(beforeTableEvent.metadataAfter().properties().get(key)).isEqualTo(valNew);
- var afterTableEvent =
testPolarisEventListener.getLatest(AfterTableCommitedEvent.class);
+ var afterTableEvent =
+
testPolarisEventListener.getLatest(IcebergRestCatalogEvents.AfterCommitTableEvent.class);
Assertions.assertThat(afterTableEvent.identifier()).isEqualTo(TestData.TABLE);
-
Assertions.assertThat(afterTableEvent.base().properties().get(key)).isEqualTo(valOld);
-
Assertions.assertThat(afterTableEvent.metadata().properties().get(key)).isEqualTo(valNew);
+
Assertions.assertThat(afterTableEvent.metadataBefore().properties().get(key)).isEqualTo(valOld);
+
Assertions.assertThat(afterTableEvent.metadataAfter().properties().get(key)).isEqualTo(valNew);
}
private static PageToken nextRequest(Page<?> previousPage) {
diff --git
a/runtime/service/src/test/java/org/apache/polaris/service/catalog/iceberg/AbstractIcebergCatalogViewTest.java
b/runtime/service/src/test/java/org/apache/polaris/service/catalog/iceberg/AbstractIcebergCatalogViewTest.java
index f0e54ea3d..2514dc664 100644
---
a/runtime/service/src/test/java/org/apache/polaris/service/catalog/iceberg/AbstractIcebergCatalogViewTest.java
+++
b/runtime/service/src/test/java/org/apache/polaris/service/catalog/iceberg/AbstractIcebergCatalogViewTest.java
@@ -62,10 +62,7 @@ import org.apache.polaris.service.catalog.Profiles;
import org.apache.polaris.service.catalog.io.DefaultFileIOFactory;
import org.apache.polaris.service.catalog.io.FileIOFactory;
import org.apache.polaris.service.config.ReservedProperties;
-import org.apache.polaris.service.events.AfterViewCommitedEvent;
-import org.apache.polaris.service.events.AfterViewRefreshedEvent;
-import org.apache.polaris.service.events.BeforeViewCommitedEvent;
-import org.apache.polaris.service.events.BeforeViewRefreshedEvent;
+import org.apache.polaris.service.events.IcebergRestCatalogEvents;
import org.apache.polaris.service.events.listeners.PolarisEventListener;
import org.apache.polaris.service.events.listeners.TestPolarisEventListener;
import
org.apache.polaris.service.storage.PolarisStorageIntegrationProviderImpl;
@@ -270,20 +267,27 @@ public abstract class AbstractIcebergCatalogViewTest
extends ViewCatalogTests<Ic
view.updateProperties().set(key, valOld).commit();
view.updateProperties().set(key, valNew).commit();
- var beforeRefreshEvent =
testPolarisEventListener.getLatest(BeforeViewRefreshedEvent.class);
+ var beforeRefreshEvent =
+
testPolarisEventListener.getLatest(IcebergRestCatalogEvents.BeforeRefreshViewEvent.class);
Assertions.assertThat(beforeRefreshEvent.viewIdentifier()).isEqualTo(TestData.TABLE);
- var afterRefreshEvent =
testPolarisEventListener.getLatest(AfterViewRefreshedEvent.class);
+ var afterRefreshEvent =
+
testPolarisEventListener.getLatest(IcebergRestCatalogEvents.AfterRefreshViewEvent.class);
Assertions.assertThat(afterRefreshEvent.viewIdentifier()).isEqualTo(TestData.TABLE);
- var beforeCommitEvent =
testPolarisEventListener.getLatest(BeforeViewCommitedEvent.class);
+ var beforeCommitEvent =
+
testPolarisEventListener.getLatest(IcebergRestCatalogEvents.BeforeCommitViewEvent.class);
Assertions.assertThat(beforeCommitEvent.identifier()).isEqualTo(TestData.TABLE);
-
Assertions.assertThat(beforeCommitEvent.base().properties().get(key)).isEqualTo(valOld);
-
Assertions.assertThat(beforeCommitEvent.metadata().properties().get(key)).isEqualTo(valNew);
+
Assertions.assertThat(beforeCommitEvent.metadataBefore().properties().get(key))
+ .isEqualTo(valOld);
+
Assertions.assertThat(beforeCommitEvent.metadataAfter().properties().get(key))
+ .isEqualTo(valNew);
- var afterCommitEvent =
testPolarisEventListener.getLatest(AfterViewCommitedEvent.class);
+ var afterCommitEvent =
+
testPolarisEventListener.getLatest(IcebergRestCatalogEvents.AfterCommitViewEvent.class);
Assertions.assertThat(afterCommitEvent.identifier()).isEqualTo(TestData.TABLE);
-
Assertions.assertThat(afterCommitEvent.base().properties().get(key)).isEqualTo(valOld);
-
Assertions.assertThat(afterCommitEvent.metadata().properties().get(key)).isEqualTo(valNew);
+
Assertions.assertThat(afterCommitEvent.metadataBefore().properties().get(key))
+ .isEqualTo(valOld);
+
Assertions.assertThat(afterCommitEvent.metadataAfter().properties().get(key)).isEqualTo(valNew);
}
}
diff --git
a/runtime/service/src/test/java/org/apache/polaris/service/events/jsonEventListener/aws/cloudwatch/AwsCloudWatchEventListenerTest.java
b/runtime/service/src/test/java/org/apache/polaris/service/events/jsonEventListener/aws/cloudwatch/AwsCloudWatchEventListenerTest.java
index 88a8ad44b..ffc157b9d 100644
---
a/runtime/service/src/test/java/org/apache/polaris/service/events/jsonEventListener/aws/cloudwatch/AwsCloudWatchEventListenerTest.java
+++
b/runtime/service/src/test/java/org/apache/polaris/service/events/jsonEventListener/aws/cloudwatch/AwsCloudWatchEventListenerTest.java
@@ -40,7 +40,7 @@ import org.apache.polaris.core.auth.PolarisPrincipal;
import org.apache.polaris.core.context.CallContext;
import org.apache.polaris.core.context.RealmContext;
import org.apache.polaris.service.config.PolarisIcebergObjectMapperCustomizer;
-import org.apache.polaris.service.events.AfterTableRefreshedEvent;
+import org.apache.polaris.service.events.IcebergRestCatalogEvents;
import org.awaitility.Awaitility;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
@@ -203,8 +203,8 @@ class AwsCloudWatchEventListenerTest {
try {
// Create and send a test event
TableIdentifier testTable = TableIdentifier.of("test_namespace",
"test_table");
- AfterTableRefreshedEvent event = new
AfterTableRefreshedEvent("test_catalog", testTable);
- listener.onAfterTableRefreshed(event);
+ listener.onAfterRefreshTable(
+ new IcebergRestCatalogEvents.AfterRefreshTableEvent("test_catalog",
testTable));
Awaitility.await("expected amount of records should be sent to
CloudWatch")
.atMost(Duration.ofSeconds(30))
@@ -238,7 +238,9 @@ class AwsCloudWatchEventListenerTest {
logEvent -> {
String message = logEvent.message();
assertThat(message).contains(REALM);
-
assertThat(message).contains(AfterTableRefreshedEvent.class.getSimpleName());
+ assertThat(message)
+ .contains(
+
IcebergRestCatalogEvents.AfterRefreshTableEvent.class.getSimpleName());
assertThat(message).contains(TEST_USER);
assertThat(message).contains(testTable.toString());
});
@@ -260,9 +262,8 @@ class AwsCloudWatchEventListenerTest {
try {
// Create and send a test event synchronously
TableIdentifier syncTestTable = TableIdentifier.of("test_namespace",
"test_table_sync");
- AfterTableRefreshedEvent syncEvent =
- new AfterTableRefreshedEvent("test_catalog", syncTestTable);
- syncListener.onAfterTableRefreshed(syncEvent);
+ syncListener.onAfterRefreshTable(
+ new IcebergRestCatalogEvents.AfterRefreshTableEvent("test_catalog",
syncTestTable));
Awaitility.await("expected amount of records should be sent to
CloudWatch")
.atMost(Duration.ofSeconds(30))
@@ -298,7 +299,7 @@ class AwsCloudWatchEventListenerTest {
logEvent -> {
String message = logEvent.message();
assertThat(message).contains("test_table_sync");
- assertThat(message).contains("AfterTableRefreshedEvent");
+ assertThat(message).contains("AfterRefreshTableEvent");
});
} finally {
// Clean up
diff --git
a/runtime/service/src/test/java/org/apache/polaris/service/ratelimiter/RateLimiterFilterTest.java
b/runtime/service/src/test/java/org/apache/polaris/service/ratelimiter/RateLimiterFilterTest.java
index d0c2a34f1..4d5c92559 100644
---
a/runtime/service/src/test/java/org/apache/polaris/service/ratelimiter/RateLimiterFilterTest.java
+++
b/runtime/service/src/test/java/org/apache/polaris/service/ratelimiter/RateLimiterFilterTest.java
@@ -32,7 +32,7 @@ import java.time.Duration;
import java.util.Map;
import java.util.Set;
import java.util.function.Consumer;
-import org.apache.polaris.service.events.BeforeRequestRateLimitedEvent;
+import org.apache.polaris.service.events.BeforeLimitRequestRateEvent;
import org.apache.polaris.service.events.listeners.PolarisEventListener;
import org.apache.polaris.service.events.listeners.TestPolarisEventListener;
import org.apache.polaris.service.ratelimiter.RateLimiterFilterTest.Profile;
@@ -148,9 +148,9 @@ public class RateLimiterFilterTest {
}
requestAsserter.accept(Status.TOO_MANY_REQUESTS);
- BeforeRequestRateLimitedEvent event =
+ BeforeLimitRequestRateEvent event =
((TestPolarisEventListener) polarisEventListener)
- .getLatest(BeforeRequestRateLimitedEvent.class);
+ .getLatest(BeforeLimitRequestRateEvent.class);
assertThat(event.method()).isEqualTo("GET");
// Examples of expected metrics:
diff --git
a/runtime/service/src/test/java/org/apache/polaris/service/task/TaskExecutorImplTest.java
b/runtime/service/src/test/java/org/apache/polaris/service/task/TaskExecutorImplTest.java
index 56cb22650..3d59c481a 100644
---
a/runtime/service/src/test/java/org/apache/polaris/service/task/TaskExecutorImplTest.java
+++
b/runtime/service/src/test/java/org/apache/polaris/service/task/TaskExecutorImplTest.java
@@ -24,8 +24,8 @@ import org.apache.polaris.core.context.RealmContext;
import org.apache.polaris.core.entity.TaskEntity;
import org.apache.polaris.core.persistence.PolarisMetaStoreManager;
import org.apache.polaris.service.TestServices;
-import org.apache.polaris.service.events.AfterTaskAttemptedEvent;
-import org.apache.polaris.service.events.BeforeTaskAttemptedEvent;
+import org.apache.polaris.service.events.AfterAttemptTaskEvent;
+import org.apache.polaris.service.events.BeforeAttemptTaskEvent;
import org.apache.polaris.service.events.listeners.TestPolarisEventListener;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
@@ -77,7 +77,7 @@ public class TaskExecutorImplTest {
@Override
public boolean handleTask(TaskEntity task, CallContext callContext) {
var beforeTaskAttemptedEvent =
-
testPolarisEventListener.getLatest(BeforeTaskAttemptedEvent.class);
+
testPolarisEventListener.getLatest(BeforeAttemptTaskEvent.class);
Assertions.assertEquals(taskEntity.getId(),
beforeTaskAttemptedEvent.taskEntityId());
Assertions.assertEquals(attempt,
beforeTaskAttemptedEvent.attempt());
return true;
@@ -86,8 +86,7 @@ public class TaskExecutorImplTest {
executor.handleTask(taskEntity.getId(), polarisCallCtx, attempt);
- var afterAttemptTaskEvent =
testPolarisEventListener.getLatest(AfterTaskAttemptedEvent.class);
-
+ var afterAttemptTaskEvent =
testPolarisEventListener.getLatest(AfterAttemptTaskEvent.class);
Assertions.assertEquals(taskEntity.getId(),
afterAttemptTaskEvent.taskEntityId());
Assertions.assertEquals(attempt, afterAttemptTaskEvent.attempt());
Assertions.assertTrue(afterAttemptTaskEvent.success());