This is an automated email from the ASF dual-hosted git repository.
dimas pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/polaris.git
The following commit(s) were added to refs/heads/main by this push:
new a29f8006f Fix TableIdentifier in TaskFileIOSupplier (#2304)
a29f8006f is described below
commit a29f8006fe9d259df755d02ec6386c2bd6932610
Author: Christopher Lambert <[email protected]>
AuthorDate: Fri Aug 22 21:54:42 2025 +0200
Fix TableIdentifier in TaskFileIOSupplier (#2304)
We can't just convert a `TaskEntity` to an `IcebergTableLikeEntity`, because the
resulting entity's `getTableIdentifier()` method will not return a correct value:
it would be derived from the name of the task and its parent namespace (which
appears to be empty).
Task handlers instead need to pass in the `TableIdentifier` that they have
already inferred via `TaskEntity.readData`.
---
.../service/task/BatchFileCleanupTaskHandler.java | 2 +-
.../task/ManifestFileCleanupTaskHandler.java | 2 +-
.../service/task/TableCleanupTaskHandler.java | 25 ++++++++++++----------
.../polaris/service/task/TaskFileIOSupplier.java | 10 ++++-----
.../catalog/AbstractIcebergCatalogTest.java | 2 +-
.../service/catalog/io/FileIOFactoryTest.java | 2 +-
6 files changed, 22 insertions(+), 21 deletions(-)
diff --git a/runtime/service/src/main/java/org/apache/polaris/service/task/BatchFileCleanupTaskHandler.java b/runtime/service/src/main/java/org/apache/polaris/service/task/BatchFileCleanupTaskHandler.java
index d47725351..fdb4ef5e0 100644
--- a/runtime/service/src/main/java/org/apache/polaris/service/task/BatchFileCleanupTaskHandler.java
+++ b/runtime/service/src/main/java/org/apache/polaris/service/task/BatchFileCleanupTaskHandler.java
@@ -53,7 +53,7 @@ public class BatchFileCleanupTaskHandler extends FileCleanupTaskHandler {
BatchFileCleanupTask cleanupTask = task.readData(BatchFileCleanupTask.class);
TableIdentifier tableId = cleanupTask.tableId();
List<String> batchFiles = cleanupTask.batchFiles();
- try (FileIO authorizedFileIO = fileIOSupplier.apply(task, callContext)) {
+ try (FileIO authorizedFileIO = fileIOSupplier.apply(task, tableId, callContext)) {
List<String> validFiles =
batchFiles.stream().filter(file -> TaskUtils.exists(file, authorizedFileIO)).toList();
if (validFiles.isEmpty()) {
diff --git a/runtime/service/src/main/java/org/apache/polaris/service/task/ManifestFileCleanupTaskHandler.java b/runtime/service/src/main/java/org/apache/polaris/service/task/ManifestFileCleanupTaskHandler.java
index 3173dc25e..59e9be8dd 100644
--- a/runtime/service/src/main/java/org/apache/polaris/service/task/ManifestFileCleanupTaskHandler.java
+++ b/runtime/service/src/main/java/org/apache/polaris/service/task/ManifestFileCleanupTaskHandler.java
@@ -63,7 +63,7 @@ public class ManifestFileCleanupTaskHandler extends FileCleanupTaskHandler {
public boolean handleTask(TaskEntity task, CallContext callContext) {
ManifestCleanupTask cleanupTask = task.readData(ManifestCleanupTask.class);
TableIdentifier tableId = cleanupTask.tableId();
- try (FileIO authorizedFileIO = fileIOSupplier.apply(task, callContext)) {
+ try (FileIO authorizedFileIO = fileIOSupplier.apply(task, tableId, callContext)) {
ManifestFile manifestFile =
decodeManifestData(cleanupTask.manifestFileData());
return cleanUpManifestFile(manifestFile, authorizedFileIO, tableId);
}
diff --git a/runtime/service/src/main/java/org/apache/polaris/service/task/TableCleanupTaskHandler.java b/runtime/service/src/main/java/org/apache/polaris/service/task/TableCleanupTaskHandler.java
index 20bce48d4..db8b335ba 100644
--- a/runtime/service/src/main/java/org/apache/polaris/service/task/TableCleanupTaskHandler.java
+++ b/runtime/service/src/main/java/org/apache/polaris/service/task/TableCleanupTaskHandler.java
@@ -21,6 +21,7 @@ package org.apache.polaris.service.task;
import java.time.Clock;
import java.util.ArrayList;
import java.util.List;
+import java.util.Optional;
import java.util.UUID;
import java.util.function.Function;
import java.util.stream.Collectors;
@@ -36,7 +37,6 @@ import org.apache.polaris.core.PolarisCallContext;
import org.apache.polaris.core.context.CallContext;
import org.apache.polaris.core.entity.AsyncTaskType;
import org.apache.polaris.core.entity.PolarisBaseEntity;
-import org.apache.polaris.core.entity.PolarisEntity;
import org.apache.polaris.core.entity.PolarisEntityType;
import org.apache.polaris.core.entity.TaskEntity;
import org.apache.polaris.core.entity.table.IcebergTableLikeEntity;
@@ -71,21 +71,19 @@ public class TableCleanupTaskHandler implements TaskHandler {
@Override
public boolean canHandleTask(TaskEntity task) {
- return task.getTaskType() == AsyncTaskType.ENTITY_CLEANUP_SCHEDULER && taskEntityIsTable(task);
+ return task.getTaskType() == AsyncTaskType.ENTITY_CLEANUP_SCHEDULER
+ && tryGetTableEntity(task).isPresent();
}
- private boolean taskEntityIsTable(TaskEntity task) {
- PolarisEntity entity = PolarisEntity.of((task.readData(PolarisBaseEntity.class)));
- return entity.getType().equals(PolarisEntityType.TABLE_LIKE);
+ private Optional<IcebergTableLikeEntity> tryGetTableEntity(TaskEntity task) {
+ return Optional.ofNullable(task.readData(PolarisBaseEntity.class))
+ .filter(entity -> entity.getType().equals(PolarisEntityType.TABLE_LIKE))
+ .map(IcebergTableLikeEntity::of);
}
@Override
public boolean handleTask(TaskEntity cleanupTask, CallContext callContext) {
- PolarisBaseEntity entity = cleanupTask.readData(PolarisBaseEntity.class);
- PolarisMetaStoreManager metaStoreManager =
- metaStoreManagerFactory.getOrCreateMetaStoreManager(callContext.getRealmContext());
- IcebergTableLikeEntity tableEntity = IcebergTableLikeEntity.of(entity);
- PolarisCallContext polarisCallContext = callContext.getPolarisCallContext();
+ IcebergTableLikeEntity tableEntity = tryGetTableEntity(cleanupTask).orElseThrow();
LOGGER
.atInfo()
.addKeyValue("tableIdentifier", tableEntity.getTableIdentifier())
@@ -95,7 +93,8 @@ public class TableCleanupTaskHandler implements TaskHandler {
// It's likely the cleanupTask has already been completed, but wasn't dropped successfully.
// Log a
// warning and move on
- try (FileIO fileIO = fileIOSupplier.apply(cleanupTask, callContext)) {
+ try (FileIO fileIO =
+ fileIOSupplier.apply(cleanupTask, tableEntity.getTableIdentifier(), callContext)) {
if (!TaskUtils.exists(tableEntity.getMetadataLocation(), fileIO)) {
LOGGER
.atWarn()
@@ -108,6 +107,10 @@ public class TableCleanupTaskHandler implements TaskHandler {
TableMetadata tableMetadata =
TableMetadataParser.read(fileIO, tableEntity.getMetadataLocation());
+ PolarisMetaStoreManager metaStoreManager =
+ metaStoreManagerFactory.getOrCreateMetaStoreManager(callContext.getRealmContext());
+ PolarisCallContext polarisCallContext = callContext.getPolarisCallContext();
+
Stream<TaskEntity> manifestCleanupTasks =
getManifestTaskStream(
cleanupTask,
diff --git a/runtime/service/src/main/java/org/apache/polaris/service/task/TaskFileIOSupplier.java b/runtime/service/src/main/java/org/apache/polaris/service/task/TaskFileIOSupplier.java
index e3a1ddd48..44de36210 100644
--- a/runtime/service/src/main/java/org/apache/polaris/service/task/TaskFileIOSupplier.java
+++ b/runtime/service/src/main/java/org/apache/polaris/service/task/TaskFileIOSupplier.java
@@ -24,21 +24,21 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
-import java.util.function.BiFunction;
+import org.apache.commons.lang3.function.TriFunction;
import org.apache.iceberg.CatalogProperties;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.io.FileIO;
import org.apache.polaris.core.context.CallContext;
import org.apache.polaris.core.entity.PolarisTaskConstants;
import org.apache.polaris.core.entity.TaskEntity;
-import org.apache.polaris.core.entity.table.IcebergTableLikeEntity;
import org.apache.polaris.core.persistence.PolarisResolvedPathWrapper;
import org.apache.polaris.core.persistence.ResolvedPolarisEntity;
import org.apache.polaris.core.storage.PolarisStorageActions;
import org.apache.polaris.service.catalog.io.FileIOFactory;
@ApplicationScoped
-public class TaskFileIOSupplier implements BiFunction<TaskEntity, CallContext, FileIO> {
+public class TaskFileIOSupplier
+ implements TriFunction<TaskEntity, TableIdentifier, CallContext, FileIO> {
private final FileIOFactory fileIOFactory;
@Inject
@@ -47,12 +47,10 @@ public class TaskFileIOSupplier implements BiFunction<TaskEntity, CallContext, F
}
@Override
- public FileIO apply(TaskEntity task, CallContext callContext) {
+ public FileIO apply(TaskEntity task, TableIdentifier identifier, CallContext callContext) {
Map<String, String> internalProperties = task.getInternalPropertiesAsMap();
Map<String, String> properties = new HashMap<>(internalProperties);
- IcebergTableLikeEntity tableEntity = IcebergTableLikeEntity.of(task);
- TableIdentifier identifier = tableEntity.getTableIdentifier();
String location = properties.get(PolarisTaskConstants.STORAGE_LOCATION);
Set<String> locations = Set.of(location);
Set<PolarisStorageActions> storageActions = Set.of(PolarisStorageActions.ALL);
diff --git a/runtime/service/src/test/java/org/apache/polaris/service/catalog/AbstractIcebergCatalogTest.java b/runtime/service/src/test/java/org/apache/polaris/service/catalog/AbstractIcebergCatalogTest.java
index d10f82057..696bca432 100644
--- a/runtime/service/src/test/java/org/apache/polaris/service/catalog/AbstractIcebergCatalogTest.java
+++ b/runtime/service/src/test/java/org/apache/polaris/service/catalog/AbstractIcebergCatalogTest.java
@@ -1840,7 +1840,7 @@ public abstract class AbstractIcebergCatalogTest extends CatalogTests<IcebergCat
FileIO fileIO =
new TaskFileIOSupplier(
new DefaultFileIOFactory(storageCredentialCache, metaStoreManagerFactory))
- .apply(taskEntity, polarisContext);
+ .apply(taskEntity, TABLE, polarisContext);
Assertions.assertThat(fileIO).isNotNull().isInstanceOf(ExceptionMappingFileIO.class);
Assertions.assertThat(((ExceptionMappingFileIO) fileIO).getInnerIo())
.isInstanceOf(InMemoryFileIO.class);
diff --git a/runtime/service/src/test/java/org/apache/polaris/service/catalog/io/FileIOFactoryTest.java b/runtime/service/src/test/java/org/apache/polaris/service/catalog/io/FileIOFactoryTest.java
index fd6e79f12..5c72f1506 100644
--- a/runtime/service/src/test/java/org/apache/polaris/service/catalog/io/FileIOFactoryTest.java
+++ b/runtime/service/src/test/java/org/apache/polaris/service/catalog/io/FileIOFactoryTest.java
@@ -176,7 +176,7 @@ public class FileIOFactoryTest {
Assertions.assertThat(tasks).hasSize(1);
TaskEntity taskEntity = TaskEntity.of(tasks.get(0));
FileIO fileIO =
- new TaskFileIOSupplier(testServices.fileIOFactory()).apply(taskEntity, callContext);
+ new TaskFileIOSupplier(testServices.fileIOFactory()).apply(taskEntity, TABLE, callContext);
Assertions.assertThat(fileIO).isNotNull().isInstanceOf(ExceptionMappingFileIO.class);
Assertions.assertThat(((ExceptionMappingFileIO) fileIO).getInnerIo())
.isInstanceOf(InMemoryFileIO.class);