danielhumanmod commented on code in PR #312:
URL: https://github.com/apache/polaris/pull/312#discussion_r1841643389
##########
polaris-service/src/main/java/org/apache/polaris/service/task/TableCleanupTaskHandler.java:
##########
@@ -154,15 +135,118 @@ public boolean handleTask(TaskEntity cleanupTask) {
.addKeyValue("tableIdentifier", tableEntity.getTableIdentifier())
.addKeyValue("metadataLocation", tableEntity.getMetadataLocation())
.addKeyValue("taskCount", taskEntities.size())
- .log("Successfully queued tasks to delete manifests - deleting table metadata file");
+ .log(
+ "Successfully queued tasks to delete manifests, previous metadata, and statistics files - deleting table metadata file");
for (PolarisBaseEntity createdTask : createdTasks) {
taskExecutor.addTaskHandlerContext(createdTask.getId(),
CallContext.getCurrentContext());
}
+
fileIO.deleteFile(tableEntity.getMetadataLocation());
return true;
}
}
return false;
}
+
+ private Stream<TaskEntity> getManifestTaskStream(
+ TaskEntity cleanupTask,
+ TableMetadata tableMetadata,
+ FileIO fileIO,
+ TableLikeEntity tableEntity,
+ PolarisMetaStoreManager metaStoreManager,
+ PolarisCallContext polarisCallContext) {
+ // read the manifest list for each snapshot. dedupe the manifest files and schedule a
+ // cleanupTask
+ // for each manifest file and its data files to be deleted
+ return tableMetadata.snapshots().stream()
+ .flatMap(sn -> sn.allManifests(fileIO).stream())
+ // distinct by manifest path, since multiple snapshots will contain the same
+ // manifest
+ .collect(Collectors.toMap(ManifestFile::path, Function.identity(), (mf1, mf2) -> mf1))
+ .values()
+ .stream()
+ .filter(mf -> TaskUtils.exists(mf.path(), fileIO))
+ .map(
+ mf -> {
+ // append a random uuid to the task name to avoid any potential conflict
+ // when
+ // storing the task entity. It's better to have duplicate tasks than to risk
+ // not storing the rest of the task entities. If a duplicate deletion task
+ // is
+ // queued, it will check for the manifest file's existence and simply exit
+ // if
+ // the task has already been handled.
+ String taskName = cleanupTask.getName() + "_" + mf.path() + "_" + UUID.randomUUID();
+ LOGGER
+ .atDebug()
+ .addKeyValue("taskName", taskName)
+ .addKeyValue("tableIdentifier", tableEntity.getTableIdentifier())
+ .addKeyValue("metadataLocation", tableEntity.getMetadataLocation())
+ .addKeyValue("manifestFile", mf.path())
+ .log("Queueing task to delete manifest file");
+ return new TaskEntity.Builder()
+ .setName(taskName)
+ .setId(metaStoreManager.generateNewEntityId(polarisCallContext).getId())
+ .setCreateTimestamp(polarisCallContext.getClock().millis())
+ .withTaskType(AsyncTaskType.FILE_CLEANUP)
+ .withData(
+ new ManifestFileCleanupTaskHandler.ManifestCleanupTask(
+ tableEntity.getTableIdentifier(), TaskUtils.encodeManifestFile(mf)))
+ .setId(metaStoreManager.generateNewEntityId(polarisCallContext).getId())
+ // copy the internal properties, which will have storage info
+ .setInternalProperties(cleanupTask.getInternalPropertiesAsMap())
+ .build();
+ });
+ }
+
Review Comment:
The code below this line contains the new changes.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]