This is an automated email from the ASF dual-hosted git repository.
snazy pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/polaris.git
The following commit(s) were added to refs/heads/main by this push:
new df8442296 NoSQL: Add maintenance implementation (#3077)
df8442296 is described below
commit df84422960d2ee61231c6870bcc3d805db3305a1
Author: Robert Stupp <[email protected]>
AuthorDate: Thu Nov 20 16:10:35 2025 +0100
NoSQL: Add maintenance implementation (#3077)
---
bom/build.gradle.kts | 1 +
gradle/projects.main.properties | 1 +
.../persistence/maintenance/impl/build.gradle.kts | 83 +++
.../impl/AbstractScanItemStatsCollector.java | 113 ++++
.../nosql/maintenance/impl/AllRetained.java | 105 ++++
.../nosql/maintenance/impl/MaintenanceRunObj.java | 67 +++
.../nosql/maintenance/impl/MaintenanceRunsObj.java | 65 +++
.../maintenance/impl/MaintenanceServiceImpl.java | 617 +++++++++++++++++++++
.../MaintenanceServiceRealmRetainedIdentifier.java | 57 ++
.../nosql/maintenance/impl/RateLimit.java | 53 ++
.../maintenance/impl/RetainedCollectorImpl.java | 363 ++++++++++++
.../nosql/maintenance/impl/ScanHandler.java | 201 +++++++
.../nosql/maintenance/impl/ScanItemCallback.java | 26 +
.../nosql/maintenance/impl/ScanItemOutcome.java | 34 ++
.../nosql/maintenance/impl/package-info.java | 28 +
.../impl/src/main/resources/META-INF/beans.xml | 24 +
...pache.polaris.persistence.nosql.api.obj.ObjType | 21 +
.../nosql/maintenance/impl/TestMaintenance.java | 393 +++++++++++++
.../impl/src/test/resources/weld.properties | 21 +
.../impl/MaintenanceConfigurationProducer.java | 33 ++
.../maintenance/impl/MutableMaintenanceConfig.java | 102 ++++
.../persistence/nosql/maintenance/impl/ObjOne.java | 53 ++
.../persistence/nosql/maintenance/impl/ObjTwo.java | 53 ++
.../nosql/maintenance/impl/ObjTypeIdentOne.java | 51 ++
.../nosql/maintenance/impl/ObjTypeIdentTwo.java | 50 ++
.../nosql/maintenance/impl/RealmIdentOne.java | 40 ++
.../nosql/maintenance/impl/RealmIdentTwo.java | 40 ++
.../nosql/maintenance/impl/package-info.java | 20 +
.../src/testFixtures/resources/META-INF/beans.xml | 24 +
...pache.polaris.persistence.nosql.api.obj.ObjType | 21 +
30 files changed, 2760 insertions(+)
diff --git a/bom/build.gradle.kts b/bom/build.gradle.kts
index ae4ff4378..83faf336b 100644
--- a/bom/build.gradle.kts
+++ b/bom/build.gradle.kts
@@ -67,6 +67,7 @@ dependencies {
api(project(":polaris-persistence-nosql-mongodb"))
api(project(":polaris-persistence-nosql-maintenance-api"))
+ api(project(":polaris-persistence-nosql-maintenance-impl"))
api(project(":polaris-persistence-nosql-maintenance-cel"))
api(project(":polaris-persistence-nosql-maintenance-spi"))
diff --git a/gradle/projects.main.properties b/gradle/projects.main.properties
index 244e647cd..d6ecdb830 100644
--- a/gradle/projects.main.properties
+++ b/gradle/projects.main.properties
@@ -80,6 +80,7 @@
polaris-persistence-nosql-testextension=persistence/nosql/persistence/testextens
polaris-persistence-nosql-varint=persistence/nosql/persistence/varint
# persistence / maintenance
polaris-persistence-nosql-maintenance-api=persistence/nosql/persistence/maintenance/api
+polaris-persistence-nosql-maintenance-impl=persistence/nosql/persistence/maintenance/impl
polaris-persistence-nosql-maintenance-cel=persistence/nosql/persistence/maintenance/retain-cel
polaris-persistence-nosql-maintenance-spi=persistence/nosql/persistence/maintenance/spi
# persistence / database specific implementations
diff --git a/persistence/nosql/persistence/maintenance/impl/build.gradle.kts
b/persistence/nosql/persistence/maintenance/impl/build.gradle.kts
new file mode 100644
index 000000000..058a0483d
--- /dev/null
+++ b/persistence/nosql/persistence/maintenance/impl/build.gradle.kts
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+plugins {
+ // Jandex class-index generation (presumably for CDI bean discovery, cf. META-INF/beans.xml)
+ id("org.kordamp.gradle.jandex")
+ // Polaris build-convention plugin for server-side modules (defined in the build logic)
+ id("polaris-server")
+}
+
+description = "Polaris NoSQL persistence maintenance - service implementation"
+
+dependencies {
+ // NoSQL persistence, maintenance (API + SPI), realms and ID-generation APIs used by the service
+ implementation(project(":polaris-persistence-nosql-api"))
+ implementation(project(":polaris-persistence-nosql-maintenance-api"))
+ implementation(project(":polaris-persistence-nosql-maintenance-spi"))
+ implementation(project(":polaris-persistence-nosql-realms-api"))
+ implementation(project(":polaris-idgen-api"))
+ // The realms implementation and its NoSQL store are only needed at runtime, not to compile
+ runtimeOnly(project(":polaris-persistence-nosql-realms-impl"))
+ runtimeOnly(project(":polaris-persistence-nosql-realms-store-nosql"))
+
+ implementation(libs.guava)
+ implementation(libs.slf4j.api)
+
+ // Jackson is compile-only here; presumably the runtime provides it - confirm with consumers
+ compileOnly(platform(libs.jackson.bom))
+ compileOnly("com.fasterxml.jackson.core:jackson-annotations")
+ compileOnly("com.fasterxml.jackson.core:jackson-databind")
+
+ // Jakarta annotation/validation/inject/CDI APIs: compile-only
+ compileOnly(libs.jakarta.annotation.api)
+ compileOnly(libs.jakarta.validation.api)
+ compileOnly(libs.jakarta.inject.api)
+ compileOnly(libs.jakarta.enterprise.cdi.api)
+
+ // Immutables annotations + annotation processor (generates the Immutable* classes)
+ compileOnly(project(":polaris-immutables"))
+ annotationProcessor(project(":polaris-immutables", configuration =
"processor"))
+
+ // Test fixtures: APIs exposed to consumers of this module's test fixtures
+ testFixturesApi(project(":polaris-persistence-nosql-api"))
+ testFixturesApi(project(":polaris-persistence-nosql-maintenance-api"))
+ testFixturesApi(project(":polaris-persistence-nosql-maintenance-spi"))
+ testFixturesApi(project(":polaris-persistence-nosql-testextension"))
+
+ testFixturesCompileOnly(project(":polaris-immutables"))
+ testFixturesAnnotationProcessor(project(":polaris-immutables", configuration
= "processor"))
+
+ testFixturesCompileOnly(platform(libs.jackson.bom))
+ testFixturesCompileOnly("com.fasterxml.jackson.core:jackson-annotations")
+ testFixturesCompileOnly("com.fasterxml.jackson.core:jackson-databind")
+
+ testFixturesImplementation(libs.jakarta.annotation.api)
+ testFixturesImplementation(libs.jakarta.validation.api)
+ testFixturesCompileOnly(libs.jakarta.enterprise.cdi.api)
+
+ // Tests: Jackson annotations at compile time, Logback as the SLF4J backend at runtime
+ testCompileOnly(platform(libs.jackson.bom))
+ testCompileOnly("com.fasterxml.jackson.core:jackson-annotations")
+ testCompileOnly("com.fasterxml.jackson.core:jackson-databind")
+
+ testRuntimeOnly(libs.logback.classic)
+
+ // Tests run inside Weld (CDI SE) via weld-junit5
+ testImplementation(project(":polaris-idgen-mocks"))
+ testRuntimeOnly(testFixtures(project(":polaris-persistence-nosql-cdi-weld")))
+ testImplementation(libs.weld.se.core)
+ testImplementation(libs.weld.junit5)
+ testRuntimeOnly(libs.smallrye.jandex)
+
+ // Realms implementation/store and the in-memory NoSQL backend for tests
+ testRuntimeOnly(project(":polaris-persistence-nosql-realms-impl"))
+ testRuntimeOnly(project(":polaris-persistence-nosql-realms-store-nosql"))
+ testRuntimeOnly(project(":polaris-persistence-nosql-inmemory"))
+
testImplementation(testFixtures(project(":polaris-persistence-nosql-inmemory")))
+}
diff --git
a/persistence/nosql/persistence/maintenance/impl/src/main/java/org/apache/polaris/persistence/nosql/maintenance/impl/AbstractScanItemStatsCollector.java
b/persistence/nosql/persistence/maintenance/impl/src/main/java/org/apache/polaris/persistence/nosql/maintenance/impl/AbstractScanItemStatsCollector.java
new file mode 100644
index 000000000..5fde8273b
--- /dev/null
+++
b/persistence/nosql/persistence/maintenance/impl/src/main/java/org/apache/polaris/persistence/nosql/maintenance/impl/AbstractScanItemStatsCollector.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.polaris.persistence.nosql.maintenance.impl;
+
+import jakarta.annotation.Nonnull;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.stream.Collectors;
+import org.apache.polaris.persistence.nosql.api.obj.ObjRef;
+import
org.apache.polaris.persistence.nosql.maintenance.api.MaintenanceRunInformation.MaintenanceStats;
+
+abstract class AbstractScanItemStatsCollector<I> implements ScanItemCallback<I> {
+  /** Stats accumulated over every item passed to {@code itemOutcome}, regardless of realm. */
+  final StatsHolder stats = new StatsHolder();
+
+  /**
+   * Converts every {@link StatsHolder} value of {@code holders} into its {@link MaintenanceStats}
+   * representation, keeping the keys.
+   */
+  private static Map<String, MaintenanceStats> toMaintenanceStatsMap(
+      Map<String, StatsHolder> holders) {
+    return holders.entrySet().stream()
+        .collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue().toMaintenanceStats()));
+  }
+
+  /** Collects maintenance-run stats for <em>references</em>, aggregated per realm. */
+  static final class ScanRefStatsCollector extends AbstractScanItemStatsCollector<String> {
+    /** Per-realm reference stats, keyed by realm ID. */
+    final Map<String, StatsHolder> perRealm = new HashMap<>();
+
+    /** Records the maintenance-run outcome of reference {@code ref} in {@code realm}. */
+    @Override
+    public void itemOutcome(
+        @Nonnull String realm, @Nonnull String ref, @Nonnull ScanItemOutcome outcome) {
+      stats.add(outcome);
+      var realmStats = perRealm.computeIfAbsent(realm, r -> new StatsHolder());
+      realmStats.add(outcome);
+    }
+
+    /**
+     * Returns the collected reference stats as {@link MaintenanceStats}, keyed by realm ID.
+     *
+     * <p>Despite the method name, there is no per-object-type breakdown for references.
+     */
+    Map<String, ? extends MaintenanceStats> toRealmObjTypeStatsMap() {
+      return toMaintenanceStatsMap(perRealm);
+    }
+  }
+
+  /**
+   * Collects maintenance-run stats for <em>objects</em>, aggregated per realm and per {@linkplain
+   * org.apache.polaris.persistence.nosql.api.obj.ObjType object type}.
+   */
+  static final class ScanObjStatsCollector extends AbstractScanItemStatsCollector<ObjRef> {
+    /** Object stats keyed by realm ID, then by object-type ID. */
+    final Map<String, Map<String, StatsHolder>> perRealmAndObjType = new HashMap<>();
+
+    /** Records the maintenance-run outcome of object {@code id} in {@code realm}. */
+    @Override
+    public void itemOutcome(
+        @Nonnull String realm, @Nonnull ObjRef id, @Nonnull ScanItemOutcome outcome) {
+      stats.add(outcome);
+      var byObjType = perRealmAndObjType.computeIfAbsent(realm, r -> new HashMap<>());
+      byObjType.computeIfAbsent(id.type(), t -> new StatsHolder()).add(outcome);
+    }
+
+    /**
+     * Returns the collected object stats as {@link MaintenanceStats}, keyed by realm ID and then by
+     * object-type ID.
+     */
+    Map<String, ? extends Map<String, MaintenanceStats>> toRealmObjTypeStatsMap() {
+      return perRealmAndObjType.entrySet().stream()
+          .collect(
+              Collectors.toMap(Map.Entry::getKey, e -> toMaintenanceStatsMap(e.getValue())));
+    }
+  }
+
+  /** Mutable counters for one scope: overall, per realm, or per realm and object type. */
+  static final class StatsHolder {
+    long scanned;
+    long newer;
+    long retained;
+    long purged;
+
+    /**
+     * Counts one scanned item and attributes it to exactly one of the purged, newer or retained
+     * buckets.
+     *
+     * @throws IllegalStateException if {@code outcome} is not handled by this method
+     */
+    void add(ScanItemOutcome outcome) {
+      scanned++;
+      switch (outcome) {
+        case RETAINED, UNHANDLED_RETAINED -> retained++;
+        case TOO_NEW_RETAINED -> newer++;
+        case REALM_PURGE, PURGED -> purged++;
+        default -> throw new IllegalStateException("Unknown outcome " + outcome);
+      }
+    }
+
+    /** Builds the serializable {@link MaintenanceStats} from the current counter values. */
+    MaintenanceStats toMaintenanceStats() {
+      return MaintenanceStats.builder()
+          .purged(purged)
+          .retained(retained)
+          .newer(newer)
+          .scanned(scanned)
+          .build();
+    }
+  }
+}
diff --git
a/persistence/nosql/persistence/maintenance/impl/src/main/java/org/apache/polaris/persistence/nosql/maintenance/impl/AllRetained.java
b/persistence/nosql/persistence/maintenance/impl/src/main/java/org/apache/polaris/persistence/nosql/maintenance/impl/AllRetained.java
new file mode 100644
index 000000000..f49cf06cb
--- /dev/null
+++
b/persistence/nosql/persistence/maintenance/impl/src/main/java/org/apache/polaris/persistence/nosql/maintenance/impl/AllRetained.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.polaris.persistence.nosql.maintenance.impl;
+
+import static java.util.Map.entry;
+
+import com.google.common.hash.BloomFilter;
+import com.google.common.hash.PrimitiveSink;
+import jakarta.annotation.Nonnull;
+import java.util.Map;
+import java.util.function.Predicate;
+import org.apache.polaris.persistence.nosql.api.obj.ObjRef;
+import
org.apache.polaris.persistence.nosql.maintenance.impl.ScanHandler.RetainCheck;
+import org.jspecify.annotations.NonNull;
+
+/**
+ * Collects reference names and objects to retain.
+ *
+ * <p>The implementation uses bloom-filters to limit the heap usage when a
huge number of
+ * references/objects is being used.
+ */
+@SuppressWarnings("UnstableApiUsage")
+final class AllRetained {
+
+ /**
+ * Some "salt" to make the bloom filters non-deterministic, in case there
are false-positives, to
+ * reduce the number of false positives over time.
+ */
+ private final int salt;
+
+ // @NonNull is the jspecify variant, which allows type-usage.
+ // Jakarta's @Nonnull does not allow type-usage.
+ private final BloomFilter<Map.@NonNull Entry<String, String>> refsFilter;
+ private final BloomFilter<Map.@NonNull Entry<String, Long>> objsFilter;
+ private long refAdds;
+ private long objAdds;
+
+ AllRetained(long expectedReferenceCount, long expectedObjCount, double fpp,
int salt) {
+ this.salt = salt;
+ this.refsFilter = BloomFilter.create(this::refFunnel,
expectedReferenceCount, fpp);
+ this.objsFilter = BloomFilter.create(this::objFunnel, expectedObjCount,
fpp);
+ }
+
+ private void refFunnel(Map.Entry<String, String> realmRef, @Nonnull
PrimitiveSink primitiveSink) {
+ primitiveSink.putInt(salt);
+ primitiveSink.putUnencodedChars(realmRef.getKey());
+ primitiveSink.putUnencodedChars(realmRef.getValue());
+ }
+
+ private void objFunnel(Map.Entry<String, Long> realmObj, @Nonnull
PrimitiveSink primitiveSink) {
+ primitiveSink.putInt(salt);
+ primitiveSink.putUnencodedChars(realmObj.getKey());
+ var id = realmObj.getValue();
+ primitiveSink.putLong(id);
+ }
+
+ void addRetainedRef(String realm, String ref) {
+ refsFilter.put(entry(realm, ref));
+ refAdds++;
+ }
+
+ void addRetainedObj(String realm, long id) {
+ objsFilter.put(entry(realm, id));
+ objAdds++;
+ }
+
+ /** The number of {@link #addRetainedRef(String, String)} invocations. */
+ long refAdds() {
+ return refAdds;
+ }
+
+ /** The number of {@link #addRetainedObj(String, long)} invocations. */
+ long objAdds() {
+ return objAdds;
+ }
+
+ boolean withinExpectedFpp(double expectedFpp) {
+ return refsFilter.expectedFpp() < expectedFpp && objsFilter.expectedFpp()
< expectedFpp;
+ }
+
+ RetainCheck<String> referenceRetainCheck() {
+ return (realm, ref) -> refsFilter.mightContain(entry(realm, ref));
+ }
+
+ RetainCheck<ObjRef> objRetainCheck(Predicate<String> objTypeIdPredicate) {
+ return (realm, id) ->
+ objTypeIdPredicate.test(id.type()) ||
objsFilter.mightContain(entry(realm, id.id()));
+ }
+}
diff --git
a/persistence/nosql/persistence/maintenance/impl/src/main/java/org/apache/polaris/persistence/nosql/maintenance/impl/MaintenanceRunObj.java
b/persistence/nosql/persistence/maintenance/impl/src/main/java/org/apache/polaris/persistence/nosql/maintenance/impl/MaintenanceRunObj.java
new file mode 100644
index 000000000..6ce50e348
--- /dev/null
+++
b/persistence/nosql/persistence/maintenance/impl/src/main/java/org/apache/polaris/persistence/nosql/maintenance/impl/MaintenanceRunObj.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.polaris.persistence.nosql.maintenance.impl;
+
+import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
+import com.fasterxml.jackson.databind.annotation.JsonSerialize;
+import java.util.function.LongSupplier;
+import org.apache.polaris.immutables.PolarisImmutable;
+import org.apache.polaris.persistence.nosql.api.obj.AbstractObjType;
+import org.apache.polaris.persistence.nosql.api.obj.Obj;
+import org.apache.polaris.persistence.nosql.api.obj.ObjType;
+import
org.apache.polaris.persistence.nosql.maintenance.api.MaintenanceRunInformation;
+
+/**
+ * Persisted record of a single maintenance run.
+ *
+ * <p>The object is first written while the run is still in progress. It is <em>eventually
+ * overwritten</em> with the final result of the maintenance run.
+ */
+@PolarisImmutable
+@JsonSerialize(as = ImmutableMaintenanceRunObj.class)
+@JsonDeserialize(as = ImmutableMaintenanceRunObj.class)
+public interface MaintenanceRunObj extends Obj {
+
+  /** The object type shared by all {@link MaintenanceRunObj} instances. */
+  ObjType TYPE = new MaintenanceRunObjType();
+
+  @Override
+  default ObjType type() {
+    return TYPE;
+  }
+
+  /** State and outcome of the run; {@code finished()} is present once the run has ended. */
+  MaintenanceRunInformation runInformation();
+
+  /** Returns a new builder for {@link MaintenanceRunObj}. */
+  static ImmutableMaintenanceRunObj.Builder builder() {
+    return ImmutableMaintenanceRunObj.builder();
+  }
+
+  /** The {@link ObjType} implementation for {@link MaintenanceRunObj}. */
+  final class MaintenanceRunObjType extends AbstractObjType<MaintenanceRunObj> {
+    public MaintenanceRunObjType() {
+      super("mtr", "Maintenance Run", MaintenanceRunObj.class);
+    }
+
+    /**
+     * Finished runs may be cached without expiry. Runs still in progress are not cached, because
+     * they are going to be overwritten with their final result.
+     */
+    @Override
+    public long cachedObjectExpiresAtMicros(Obj obj, LongSupplier clockMicros) {
+      var runFinished = ((MaintenanceRunObj) obj).runInformation().finished().isPresent();
+      return runFinished ? CACHE_UNLIMITED : NOT_CACHED;
+    }
+  }
+}
diff --git
a/persistence/nosql/persistence/maintenance/impl/src/main/java/org/apache/polaris/persistence/nosql/maintenance/impl/MaintenanceRunsObj.java
b/persistence/nosql/persistence/maintenance/impl/src/main/java/org/apache/polaris/persistence/nosql/maintenance/impl/MaintenanceRunsObj.java
new file mode 100644
index 000000000..ca476351d
--- /dev/null
+++
b/persistence/nosql/persistence/maintenance/impl/src/main/java/org/apache/polaris/persistence/nosql/maintenance/impl/MaintenanceRunsObj.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.polaris.persistence.nosql.maintenance.impl;
+
+import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
+import com.fasterxml.jackson.databind.annotation.JsonSerialize;
+import org.apache.polaris.immutables.PolarisImmutable;
+import org.apache.polaris.persistence.nosql.api.obj.AbstractObjType;
+import org.apache.polaris.persistence.nosql.api.obj.BaseCommitObj;
+import org.apache.polaris.persistence.nosql.api.obj.ObjRef;
+import org.apache.polaris.persistence.nosql.api.obj.ObjType;
+
+/**
+ * Commit object of the {@value #MAINTENANCE_RUNS_REF_NAME} reference in the system realm.
+ *
+ * <p>Each commit records one maintenance run by pointing to the {@link MaintenanceRunObj} that
+ * holds the run's information.
+ */
+@PolarisImmutable
+@JsonSerialize(as = ImmutableMaintenanceRunsObj.class)
+@JsonDeserialize(as = ImmutableMaintenanceRunsObj.class)
+public interface MaintenanceRunsObj extends BaseCommitObj {
+
+ /** Name of the reference whose commit log records the maintenance runs. */
+ String MAINTENANCE_RUNS_REF_NAME = "maintenance-runs";
+
+ /** The object type shared by all {@link MaintenanceRunsObj} instances. */
+ ObjType TYPE = new MaintenanceRunsObjType();
+
+ /** Returns a new builder for {@link MaintenanceRunsObj}. */
+ static ImmutableMaintenanceRunsObj.Builder builder() {
+ return ImmutableMaintenanceRunsObj.builder();
+ }
+
+ /**
+ * The ID of the object holding the maintenance run information.
+ *
+ * <p>The {@linkplain MaintenanceRunObj#runInformation() maintenance run information} is
+ * <em>not</em> included in this object. The {@link MaintenanceRunObj} is initially written as
+ * "currently running" and later updated with the final state of the maintenance run. Updating a
+ * {@link MaintenanceRunObj} is acceptable, whereas updating a {@link BaseCommitObj} is an
+ * absolute no-go.
+ */
+ ObjRef maintenanceRunId();
+
+ @Override
+ default ObjType type() {
+ return TYPE;
+ }
+
+ /** The {@link ObjType} implementation for {@link MaintenanceRunsObj}. */
+ final class MaintenanceRunsObjType extends
AbstractObjType<MaintenanceRunsObj> {
+ public MaintenanceRunsObjType() {
+ super("mtrs", "Maintenance Runs", MaintenanceRunsObj.class);
+ }
+ }
+
+ /** Builder contract shared with other {@link BaseCommitObj} types. */
+ interface Builder extends BaseCommitObj.Builder<MaintenanceRunsObj, Builder>
{}
+}
diff --git
a/persistence/nosql/persistence/maintenance/impl/src/main/java/org/apache/polaris/persistence/nosql/maintenance/impl/MaintenanceServiceImpl.java
b/persistence/nosql/persistence/maintenance/impl/src/main/java/org/apache/polaris/persistence/nosql/maintenance/impl/MaintenanceServiceImpl.java
new file mode 100644
index 000000000..897156b18
--- /dev/null
+++
b/persistence/nosql/persistence/maintenance/impl/src/main/java/org/apache/polaris/persistence/nosql/maintenance/impl/MaintenanceServiceImpl.java
@@ -0,0 +1,617 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.polaris.persistence.nosql.maintenance.impl;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static java.lang.String.format;
+import static java.util.concurrent.TimeUnit.SECONDS;
+import static org.apache.polaris.persistence.nosql.api.Realms.SYSTEM_REALM_ID;
+import static
org.apache.polaris.persistence.nosql.api.Realms.SYSTEM_REALM_PREFIX;
+import static
org.apache.polaris.persistence.nosql.api.backend.PersistId.persistId;
+import static org.apache.polaris.persistence.nosql.api.obj.ObjRef.objRef;
+import static
org.apache.polaris.persistence.nosql.api.obj.ObjTypes.objTypeById;
+import static
org.apache.polaris.persistence.nosql.maintenance.api.MaintenanceConfig.DEFAULT_COUNT_FROM_LAST_RUN_MULTIPLIER;
+import static
org.apache.polaris.persistence.nosql.maintenance.api.MaintenanceConfig.DEFAULT_DELETE_BATCH_SIZE;
+import static
org.apache.polaris.persistence.nosql.maintenance.api.MaintenanceConfig.DEFAULT_INITIALIZED_FPP;
+import static
org.apache.polaris.persistence.nosql.maintenance.api.MaintenanceConfig.DEFAULT_MAX_ACCEPTABLE_FPP;
+import static
org.apache.polaris.persistence.nosql.maintenance.impl.MaintenanceRunsObj.MAINTENANCE_RUNS_REF_NAME;
+import static
org.apache.polaris.persistence.nosql.realms.api.RealmDefinition.RealmStatus.ACTIVE;
+import static
org.apache.polaris.persistence.nosql.realms.api.RealmDefinition.RealmStatus.INACTIVE;
+import static
org.apache.polaris.persistence.nosql.realms.api.RealmDefinition.RealmStatus.PURGED;
+import static
org.apache.polaris.persistence.nosql.realms.api.RealmDefinition.RealmStatus.PURGING;
+
+import com.google.common.collect.Streams;
+import com.google.common.math.LongMath;
+import jakarta.annotation.Nonnull;
+import jakarta.annotation.PostConstruct;
+import jakarta.enterprise.context.ApplicationScoped;
+import jakarta.enterprise.inject.Instance;
+import jakarta.inject.Inject;
+import java.io.PrintWriter;
+import java.io.StringWriter;
+import java.math.RoundingMode;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.OptionalLong;
+import java.util.Set;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Predicate;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+import org.apache.polaris.ids.api.MonotonicClock;
+import org.apache.polaris.persistence.nosql.api.Persistence;
+import org.apache.polaris.persistence.nosql.api.RealmPersistenceFactory;
+import org.apache.polaris.persistence.nosql.api.SystemPersistence;
+import org.apache.polaris.persistence.nosql.api.backend.Backend;
+import org.apache.polaris.persistence.nosql.api.commit.Committer;
+import org.apache.polaris.persistence.nosql.api.obj.ObjRef;
+import org.apache.polaris.persistence.nosql.api.obj.ObjTypes;
+import org.apache.polaris.persistence.nosql.maintenance.api.MaintenanceConfig;
+import
org.apache.polaris.persistence.nosql.maintenance.api.MaintenanceRunInformation;
+import org.apache.polaris.persistence.nosql.maintenance.api.MaintenanceRunSpec;
+import org.apache.polaris.persistence.nosql.maintenance.api.MaintenanceService;
+import
org.apache.polaris.persistence.nosql.maintenance.spi.ObjTypeRetainedIdentifier;
+import
org.apache.polaris.persistence.nosql.maintenance.spi.PerRealmRetainedIdentifier;
+import org.apache.polaris.persistence.nosql.realms.api.RealmDefinition;
+import
org.apache.polaris.persistence.nosql.realms.api.RealmExpectedStateMismatchException;
+import org.apache.polaris.persistence.nosql.realms.api.RealmManagement;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Retrieve an instance of this class by adding the {@code
+ * polaris-persistence-nosql-maintenance-impl} artifact to the runtime class
path and then use CDI
+ * via {@code @Inject MaintenanceService} to access it.
+ */
+@ApplicationScoped
+class MaintenanceServiceImpl implements MaintenanceService {
+ private static final Logger LOGGER =
LoggerFactory.getLogger(MaintenanceServiceImpl.class);
+
+ // Minimum grace time in minutes. NOTE(review): presumably a lower bound for the grace time
+ // applied to object creation timestamps (see calcMaxCreatedAtMicros) - confirm.
+ static final int MIN_GRACE_TIME_MINUTES = 5;
+
+ // MIN_GRACE_TIME_MINUTES converted to microseconds.
+ private static final long MIN_GRACE_TIME_MICROS =
+ TimeUnit.MINUTES.toMicros(MIN_GRACE_TIME_MINUTES);
+
+ private final Backend backend;
+ // Persistence of the system realm; holds the maintenance-runs reference.
+ private final Persistence systemPersistence;
+ // Commits MaintenanceRunsObj entries to the maintenance-runs reference.
+ private final Committer<MaintenanceRunsObj, MaintenanceRunObj> committer;
+ private final RealmPersistenceFactory realmPersistenceFactory;
+ private final RealmManagement realmManagement;
+ // All PerRealmRetainedIdentifier beans.
+ private final List<PerRealmRetainedIdentifier> perRealmRetainedIdentifiers;
+ // ObjTypeRetainedIdentifier beans, grouped by the ID of the object type they handle.
+ private final Map<String, List<ObjTypeRetainedIdentifier>>
objTypeRetainedIdentifiers;
+ private final MaintenanceConfig maintenanceConfig;
+ private final MonotonicClock monotonicClock;
+
+  /**
+   * Creates the maintenance service.
+   *
+   * <p>Only I/O-free setup happens here. Creating the maintenance-runs reference is deferred to
+   * {@link #init()}.
+   *
+   * @param backend the backend; among other things, used to delete purged realms directly when the
+   *     backend supports realm deletion
+   * @param systemPersistence persistence of the system realm, which holds the maintenance-runs
+   *     reference
+   * @param realmPersistenceFactory factory for realm-specific persistence
+   * @param realmManagement access to the realm definitions
+   * @param realmRetainedIdentifiers all {@link PerRealmRetainedIdentifier} beans
+   * @param objTypeRetainedIdentifiers all {@link ObjTypeRetainedIdentifier} beans, grouped
+   *     internally by the ID of the object type each one handles
+   * @param maintenanceConfig the maintenance configuration
+   * @param monotonicClock clock source
+   * @throws IllegalArgumentException if {@code systemPersistence} is not bound to the system realm
+   */
+  @SuppressWarnings("CdiInjectionPointsInspection")
+  @Inject
+  MaintenanceServiceImpl(
+      Backend backend,
+      @SystemPersistence Persistence systemPersistence,
+      RealmPersistenceFactory realmPersistenceFactory,
+      RealmManagement realmManagement,
+      Instance<PerRealmRetainedIdentifier> realmRetainedIdentifiers,
+      Instance<ObjTypeRetainedIdentifier> objTypeRetainedIdentifiers,
+      MaintenanceConfig maintenanceConfig,
+      MonotonicClock monotonicClock) {
+    // The maintenance-runs reference lives in the system realm.
+    checkArgument(
+        SYSTEM_REALM_ID.equals(systemPersistence.realmId()),
+        "Maintenance service requires the persistence of the %s realm",
+        SYSTEM_REALM_ID);
+
+    this.backend = backend;
+    this.systemPersistence = systemPersistence;
+    this.realmManagement = realmManagement;
+    this.realmPersistenceFactory = realmPersistenceFactory;
+    this.maintenanceConfig = maintenanceConfig;
+    this.monotonicClock = monotonicClock;
+
+    this.perRealmRetainedIdentifiers = realmRetainedIdentifiers.stream().toList();
+    // Grouped by object-type ID for per-object-type lookups.
+    this.objTypeRetainedIdentifiers =
+        objTypeRetainedIdentifiers.stream()
+            .collect(Collectors.groupingBy(oti -> oti.handledObjType().id()));
+    this.committer =
+        systemPersistence.createCommitter(
+            MAINTENANCE_RUNS_REF_NAME, MaintenanceRunsObj.class, MaintenanceRunObj.class);
+
+    maintenanceServiceReport();
+  }
+
+  /**
+   * Creates the maintenance-runs reference in the system realm.
+   *
+   * <p>This runs as a {@code @PostConstruct} callback instead of in the constructor because it
+   * performs I/O, which should be avoided in (CDI) constructors. The "silent" variant presumably
+   * tolerates an already existing reference - confirm against {@code Persistence}.
+   */
+  @PostConstruct
+  void init() {
+    systemPersistence.createReferenceSilent(MAINTENANCE_RUNS_REF_NAME);
+  }
+
+  /**
+   * Returns the information of the most recent maintenance runs.
+   *
+   * <p>Walks the commit log of the maintenance-runs reference. At most the configured number of
+   * retained runs is taken (or {@link MaintenanceConfig#DEFAULT_RETAINED_RUNS} if unset), and the
+   * referenced {@link MaintenanceRunObj}s are fetched. Entries that could not be fetched are
+   * skipped.
+   */
+  @Override
+  @Nonnull
+  public List<MaintenanceRunInformation> maintenanceRunLog() {
+    var commitLog =
+        systemPersistence
+            .commits()
+            .commitLog(MAINTENANCE_RUNS_REF_NAME, OptionalLong.empty(), MaintenanceRunsObj.class);
+    var maxRuns = maintenanceConfig.retainedRuns().orElse(MaintenanceConfig.DEFAULT_RETAINED_RUNS);
+
+    var runObjRefs =
+        Streams.stream(commitLog)
+            .filter(Objects::nonNull)
+            .limit(maxRuns)
+            .map(MaintenanceRunsObj::maintenanceRunId)
+            .toArray(ObjRef[]::new);
+    var runObjs = systemPersistence.fetchMany(MaintenanceRunObj.class, runObjRefs);
+
+    return Stream.of(runObjs)
+        .filter(Objects::nonNull)
+        .map(MaintenanceRunObj::runInformation)
+        .toList();
+  }
+
+  /**
+   * Builds a maintenance-run specification from the current realm definitions.
+   *
+   * <p>{@code ACTIVE} and {@code INACTIVE} realms are processed, and {@code PURGING} realms are
+   * purged. Realms in any other known state are skipped.
+   *
+   * @throws IllegalStateException if a realm has a status that is not handled here
+   */
+  @Nonnull
+  @Override
+  public MaintenanceRunSpec buildMaintenanceRunSpec() {
+    var spec = MaintenanceRunSpec.builder();
+    try (var allRealms = realmManagement.list()) {
+      allRealms.forEach(
+          realmDef -> {
+            var status = realmDef.status();
+            switch (status) {
+              case ACTIVE, INACTIVE -> spec.addRealmsToProces(realmDef.id());
+              case PURGING -> spec.addRealmsToPurge(realmDef.id());
+              case CREATED, INITIALIZING, LOADING, PURGED -> {
+                // Skipped on purpose: these states are either final or may contain inconsistent
+                // data.
+              }
+              default ->
+                  throw new IllegalStateException(
+                      "Unexpected realm status " + status + " for realm " + realmDef.id());
+            }
+          });
+      return spec.build();
+    }
+  }
+
+ @Override
+ @Nonnull
+ public MaintenanceRunInformation performMaintenance(
+ @Nonnull MaintenanceRunSpec maintenanceRunSpec) {
+ LOGGER.info(
+ "Triggering maintenance run with {} realms to purge and {} realms to
process",
+ maintenanceRunSpec.realmsToPurge().size(),
+ maintenanceRunSpec.realmsToProcess().size());
+
+ checkArgument(
+ maintenanceRunSpec.realmsToPurge().stream()
+ .noneMatch(maintenanceRunSpec.realmsToProcess()::contains)
+ && maintenanceRunSpec.realmsToProcess().stream()
+ .noneMatch(maintenanceRunSpec.realmsToPurge()::contains),
+ "No realm ID must be included in both the set of realms to process and
the set of realms to purge");
+ checkArgument(
+ Stream.concat(
+ maintenanceRunSpec.realmsToPurge().stream(),
+ maintenanceRunSpec.realmsToProcess().stream())
+ .noneMatch(id -> id.startsWith(SYSTEM_REALM_PREFIX)),
+ "System realm IDs must not be present in the maintenance run
specification");
+
+ var config = maintenanceConfig;
+ checkConfig(config);
+
+ // TODO follow-up: some safeguard that checks the run-log for an
unfinished run, outside of this
+ // function!
+
+ var allRetained = constructAllRetained(config);
+
+ var runObj = initMaintenanceRunObj();
+ var runInfo =
MaintenanceRunInformation.builder().from(runObj.runInformation());
+
+ var maxCreatedAtMicros = calcMaxCreatedAtMicros(config);
+
+ var description = new StringWriter();
+ var descriptionWriter = new PrintWriter(description);
+
+ var info = (MaintenanceRunInformation) null;
+ try {
+ try {
+ var realmsToProcess = processRealms(maintenanceRunSpec, allRetained,
descriptionWriter);
+ var realmsToPurge = purgeRealms(maintenanceRunSpec, descriptionWriter);
+
+ if (maintenanceRunSpec.includeSystemRealm()) {
+ realmsToProcess.add(SYSTEM_REALM_ID);
+ identifyAgainstRealm(SYSTEM_REALM_ID, allRetained);
+ }
+
+ if (!maintenanceRunSpec.realmsToPurge().isEmpty() &&
backend.supportsRealmDeletion()) {
+ LOGGER.info(
+ "Purging realms {} directly against the backend database...",
+ String.join(", ", maintenanceRunSpec.realmsToPurge()));
+ backend.deleteRealms(maintenanceRunSpec.realmsToPurge());
+ runInfo.purgedRealms(maintenanceRunSpec.realmsToPurge().size());
+ } else {
+ runInfo.purgedRealms(0);
+ }
+
+ var seenRealmsToPurge = new HashSet<String>();
+ var expectFpp =
config.maxAcceptableFilterFpp().orElse(DEFAULT_MAX_ACCEPTABLE_FPP);
+
+ var refStats = new
AbstractScanItemStatsCollector.ScanRefStatsCollector();
+ var objStats = new
AbstractScanItemStatsCollector.ScanObjStatsCollector();
+
+ // Ensures that objects with unknown obj-types do not get purged.
+ // The assumption here is that if the obj-type is not known, there's
also no
+ // ObjTypeRetainedIdentifier/RealmRetainedIdentifier, which could
handle these object types.
+ // As a follow-up, it might be appropriate to add an advanced
configuration option to define
+ // the object types that shall be purged.
+ // Even if the obj-type is unknown, to clean up after an
extension/plugin that used those
+ // object-types is no longer being used.
+ var nonGenericObjTypeIds = ObjTypes.nonGenericObjTypes().keySet();
+ var objTypeIdPredicate =
+ (Predicate<String>) objTypeId ->
!nonGenericObjTypeIds.contains(objTypeId);
+
+ var canDelete = allRetained.withinExpectedFpp(expectFpp);
+ try (var refHandler =
+ new ScanHandler<>(
+ "reference",
+ config.referenceScanRateLimitPerSecond(),
+ maxCreatedAtMicros,
+ realmsToProcess,
+ realmsToPurge,
+ seenRealmsToPurge::add,
+ allRetained.referenceRetainCheck(),
+ config.deleteBatchSize().orElse(DEFAULT_DELETE_BATCH_SIZE),
+ realmRefs -> {
+ if (canDelete) {
+ backend.batchDeleteRefs(realmRefs);
+ }
+ },
+ refStats);
+ var objHandler =
+ new ScanHandler<>(
+ "object",
+ config.objectScanRateLimitPerSecond(),
+ maxCreatedAtMicros,
+ realmsToProcess,
+ realmsToPurge,
+ seenRealmsToPurge::add,
+ allRetained.objRetainCheck(objTypeIdPredicate),
+ config.deleteBatchSize().orElse(DEFAULT_DELETE_BATCH_SIZE),
+ realmObjs -> {
+ if (canDelete) {
+ backend.batchDeleteObjs(
+ realmObjs.entrySet().stream()
+ .collect(
+ Collectors.toMap(
+ Map.Entry::getKey,
+ e ->
+ e.getValue().stream()
+ .map(oid ->
persistId(oid.id(), oid.numParts()))
+
.collect(Collectors.toSet()))));
+ }
+ },
+ objStats)) {
+
+ LOGGER.info("Start scanning backend database, too-new: {} ...",
maxCreatedAtMicros);
+ backend.scanBackend(
+
refHandler.asReferenceScanCallback(monotonicClock::currentTimeMillis),
+ objHandler.asObjScanCallback(monotonicClock::currentTimeMillis));
+
+ LOGGER.info("Finished scanning backend database");
+
+ runInfo
+ .referenceStats(refStats.stats.toMaintenanceStats())
+ .objStats(objStats.stats.toMaintenanceStats())
+ .perRealmPerObjTypeStats(objStats.toRealmObjTypeStatsMap())
+ .perRealmReferenceStats(refStats.toRealmObjTypeStatsMap())
+ .identifiedObjs(allRetained.objAdds())
+ .identifiedReferences(allRetained.refAdds());
+
+ if (!canDelete) {
+ var warn =
+ "Maintenance run finished but did not purge all unreferenced
objects, "
+ + "because the probabilistic filter was not properly
sized. "
+ + "The next maintenance run will be sized according to
current run.";
+ if (!seenRealmsToPurge.isEmpty()) {
+ warn +=
+ format("\nRealms %s NOT marked as purged.", String.join(",
", seenRealmsToPurge));
+ }
+ descriptionWriter.println(warn);
+ LOGGER.warn(warn);
+ }
+ }
+
+ if (canDelete) {
+ updateRealmsAsPurged(maintenanceRunSpec, seenRealmsToPurge);
+ }
+
+ runInfo.success(true);
+
+ LOGGER.info("Maintenance run completed successfully");
+ } catch (Exception e) {
+ LOGGER.info("Maintenance run failed", e);
+
+ runInfo.success(false).statusMessage("Maintenance run did not finish
successfully.");
+
+ // Add stack trace as detailed information
+ descriptionWriter.printf("FAILURE:%n");
+ e.printStackTrace(descriptionWriter);
+ } finally {
+ descriptionWriter.flush();
+ runInfo
+ .finished(monotonicClock.currentInstant())
+ .detailedInformation(description.toString());
+ info = runInfo.build();
+ }
+ } finally {
+ LOGGER.info("Persisting maintenance result {}", info);
+ systemPersistence.write(
+
MaintenanceRunObj.builder().from(runObj).runInformation(info).build(),
+ MaintenanceRunObj.class);
+ }
+
+ return info;
+ }
+
+ private void updateRealmsAsPurged(
+ MaintenanceRunSpec maintenanceRunSpec, HashSet<String>
seenRealmsToPurge) {
+ // Update the realm status of the realms that were specified to be purged
as `PURGED` if no
+ // data for those realms has been seen.
+ maintenanceRunSpec.realmsToPurge().stream()
+ .filter(r -> !seenRealmsToPurge.contains(r))
+ .map(realmManagement::get)
+ .filter(Optional::isPresent)
+ .map(Optional::get)
+ .forEach(
+ purgedRealm -> {
+ try {
+ realmManagement.update(
+ purgedRealm,
+
RealmDefinition.builder().from(purgedRealm).status(PURGED).build());
+ } catch (RealmExpectedStateMismatchException e) {
+ // ignore, do the state transition during the next maintenance
run
+ }
+ });
+ }
+
+ private void maintenanceServiceReport() {
+ LOGGER.info("Using {} realm retained identifiers",
perRealmRetainedIdentifiers.size());
+ perRealmRetainedIdentifiers.forEach(
+ realmRetainedIdentifier ->
+ LOGGER.info("Realm retained identifier: {}",
realmRetainedIdentifier.name()));
+ LOGGER.info(
+ "Using {} object type identifiers:",
+
objTypeRetainedIdentifiers.values().stream().mapToInt(List::size).sum());
+ objTypeRetainedIdentifiers.forEach(
+ (type, idents) -> {
+ LOGGER.info(
+ "Using {} identifiers for object type '{}' {}",
+ idents.size(),
+ type,
+ objTypeById(type).name());
+ idents.forEach(
+ objTypeRetainedIdentifier ->
+ LOGGER.info(
+ "Object type '{}' identifier: {}", type,
objTypeRetainedIdentifier.name()));
+ });
+ }
+
+ private Set<String> processRealms(
+ MaintenanceRunSpec maintenanceRunSpec,
+ AllRetained allRetained,
+ PrintWriter descriptionWriter) {
+ var realmsToProcess = new HashSet<String>();
+ for (var realmId : maintenanceRunSpec.realmsToProcess()) {
+ var currentRealmStatus =
+
realmManagement.get(realmId).map(RealmDefinition::status).orElse(ACTIVE);
+ if ((currentRealmStatus == ACTIVE || currentRealmStatus == INACTIVE)
+ && identifyAgainstRealm(realmId, allRetained)) {
+ realmsToProcess.add(realmId);
+ }
+ }
+ maintenanceRunSpec.realmsToProcess().stream()
+ .filter(r -> !realmsToProcess.contains(r))
+ .forEach(
+ r -> {
+ var msg =
+ format(
+ "No realm retained identifier was able to handle the
realm '%s' or the realm is not in status ACTIVE or INACTIVE, no references or
objects will be purged from this realm.",
+ r);
+ descriptionWriter.println(msg);
+ LOGGER.warn(msg);
+ });
+ return realmsToProcess;
+ }
+
+ private Set<String> purgeRealms(
+ MaintenanceRunSpec maintenanceRunSpec, PrintWriter descriptionWriter) {
+ var realmsToPurge = new HashSet<String>();
+ for (var realmId : maintenanceRunSpec.realmsToPurge()) {
+ var currentRealmStatus =
+
realmManagement.get(realmId).map(RealmDefinition::status).orElse(PURGED);
+ if (currentRealmStatus == PURGING || currentRealmStatus == PURGED) {
+ realmsToPurge.add(realmId);
+ }
+ }
+ maintenanceRunSpec.realmsToPurge().stream()
+ .filter(r -> !realmsToPurge.contains(r))
+ .forEach(
+ r -> {
+ var msg =
+ format(
+ "The realm '%s' is not in state PURGING, will therefore
not be purged.", r);
+ descriptionWriter.println(msg);
+ LOGGER.warn(msg);
+ });
+ return realmsToPurge;
+ }
+
+ private long calcMaxCreatedAtMicros(MaintenanceConfig effectiveConfig) {
+ var now = monotonicClock.currentTimeMicros();
+ var grace =
+ effectiveConfig
+ .createdAtGraceTime()
+ .map(
+ d -> {
+ var micros = SECONDS.toMicros(d.toSeconds());
+ micros += TimeUnit.NANOSECONDS.toMicros(d.toNanosPart());
+ return Math.max(micros, MIN_GRACE_TIME_MICROS);
+ })
+ .orElse(MIN_GRACE_TIME_MICROS);
+ return now - grace;
+ }
+
+ private AllRetained constructAllRetained(MaintenanceConfig effectiveConfig) {
+ var expectedReferenceCount =
+ effectiveConfig
+ .expectedReferenceCount()
+ .orElse(MaintenanceConfig.DEFAULT_EXPECTED_REFERENCE_COUNT);
+ var expectedObjCount =
+
effectiveConfig.expectedObjCount().orElse(MaintenanceConfig.DEFAULT_EXPECTED_OBJ_COUNT);
+
+ for (var lastRunIter =
+ Streams.stream(
+ systemPersistence
+ .commits()
+ .commitLog(
+ MAINTENANCE_RUNS_REF_NAME,
+ OptionalLong.empty(),
+ MaintenanceRunsObj.class))
+ .filter(Objects::nonNull)
+ .map(r -> systemPersistence.fetch(r.maintenanceRunId(),
MaintenanceRunObj.class))
+ .filter(Objects::nonNull)
+ .map(MaintenanceRunObj::runInformation)
+ .iterator();
+ lastRunIter.hasNext(); ) {
+ var ri = lastRunIter.next();
+ var refs =
+ Math.max(
+ ri.referenceStats().map(st ->
st.scanned().orElse(0L)).orElse(0L),
+ ri.identifiedReferences().orElse(0L));
+ var objs =
+ Math.max(
+ ri.objStats().map(st -> st.scanned().orElse(0L)).orElse(0L),
+ ri.identifiedObjs().orElse(0L));
+ if (refs == 0L || objs == 0L) {
+ continue;
+ }
+ if (refs > expectedReferenceCount) {
+ // Add 10% to account for newly created references
+ expectedReferenceCount =
+ (long)
+ (refs
+ * effectiveConfig
+ .countFromLastRunMultiplier()
+ .orElse(DEFAULT_COUNT_FROM_LAST_RUN_MULTIPLIER));
+ }
+ if (objs > expectedObjCount) {
+ // Add 10% to account for newly created objects
+ expectedObjCount =
+ (long)
+ (objs
+ * effectiveConfig
+ .countFromLastRunMultiplier()
+ .orElse(DEFAULT_COUNT_FROM_LAST_RUN_MULTIPLIER));
+ }
+ }
+
+ expectedReferenceCount = Math.max(expectedReferenceCount, 1_000);
+ expectedObjCount = Math.max(expectedObjCount, 100_000);
+ var configFpp =
effectiveConfig.filterInitializedFpp().orElse(DEFAULT_INITIALIZED_FPP);
+ LOGGER.info(
+ "Sized retained collector for {} references and {} objects with an fpp
of {}, approximate bloom filter heap sizes: {} and {} bytes",
+ expectedReferenceCount,
+ expectedObjCount,
+ configFpp,
+ bloomFilterBytes(expectedReferenceCount, configFpp),
+ bloomFilterBytes(expectedObjCount, configFpp));
+
+ return new AllRetained(
+ expectedReferenceCount, expectedObjCount, configFpp,
ThreadLocalRandom.current().nextInt());
+ }
+
+ private static final double LOG2_SQUARED = Math.log(2) * Math.log(2);
+
+ static long bloomFilterBytes(long elements, double fpp) {
+ var bits = (long) (-elements * Math.log(fpp) / LOG2_SQUARED);
+ return LongMath.divide(bits, 64, RoundingMode.CEILING);
+ }
+
+ private boolean identifyAgainstRealm(String realmId, AllRetained
allRetained) {
+ LOGGER.info("Identifying referenced data in realm '{}'", realmId);
+
+ var pers = realmPersistenceFactory.newBuilder().realmId(realmId).build();
+ var collector = new RetainedCollectorImpl(pers, allRetained,
objTypeRetainedIdentifiers);
+
+ boolean any = false;
+ for (var realmRetainedIdentifier : perRealmRetainedIdentifiers) {
+ LOGGER.info(
+ "Running maintenance for realm '{}' via '{}'", realmId,
realmRetainedIdentifier.name());
+ var handled = realmRetainedIdentifier.identifyRetained(collector);
+ LOGGER.info(
+ "Realm identifier '{}' {} {}",
+ realmRetainedIdentifier.name(),
+ handled ? "handled" : "did not handle",
+ realmId);
+ any |= handled;
+ }
+ return any;
+ }
+
+ private MaintenanceRunObj initMaintenanceRunObj() {
+ return committer
+ .commitRuntimeException(
+ (state, refObjSupplier) -> {
+ var refObj = refObjSupplier.get();
+ var res = MaintenanceRunsObj.builder();
+
+ var ro =
+ MaintenanceRunObj.builder()
+ .id(systemPersistence.generateId())
+ .runInformation(
+ MaintenanceRunInformation.builder()
+ .started(monotonicClock.currentInstant())
+ .build())
+ .build();
+ res.maintenanceRunId(objRef(ro));
+
+ state.writeIfNew("ro", ro);
+
+ return state.commitResult(ro, res, refObj);
+ })
+ .orElseThrow();
+ }
+
+ private static void checkConfig(MaintenanceConfig config) {
+ config
+ .retainedRuns()
+ .ifPresent(v -> checkArgument(v > 1, "Number of maintenance runs must
be at least 2"));
+ config
+ .expectedReferenceCount()
+ .ifPresent(
+ v ->
+ checkArgument(
+ v >= MaintenanceConfig.DEFAULT_EXPECTED_REFERENCE_COUNT,
+ "Expected reference count runs must be greater than or
equal to %s",
+ MaintenanceConfig.DEFAULT_EXPECTED_REFERENCE_COUNT));
+ config
+ .expectedObjCount()
+ .ifPresent(
+ v ->
+ checkArgument(
+ v >= MaintenanceConfig.DEFAULT_EXPECTED_OBJ_COUNT,
+ "Expected object count runs must be greater than or equal
to %s",
+ MaintenanceConfig.DEFAULT_EXPECTED_OBJ_COUNT));
+ }
+}
diff --git
a/persistence/nosql/persistence/maintenance/impl/src/main/java/org/apache/polaris/persistence/nosql/maintenance/impl/MaintenanceServiceRealmRetainedIdentifier.java
b/persistence/nosql/persistence/maintenance/impl/src/main/java/org/apache/polaris/persistence/nosql/maintenance/impl/MaintenanceServiceRealmRetainedIdentifier.java
new file mode 100644
index 000000000..568e0ed86
--- /dev/null
+++
b/persistence/nosql/persistence/maintenance/impl/src/main/java/org/apache/polaris/persistence/nosql/maintenance/impl/MaintenanceServiceRealmRetainedIdentifier.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.polaris.persistence.nosql.maintenance.impl;
+
+import static
org.apache.polaris.persistence.nosql.maintenance.api.MaintenanceConfig.DEFAULT_RETAINED_RUNS;
+import static
org.apache.polaris.persistence.nosql.maintenance.impl.MaintenanceRunsObj.MAINTENANCE_RUNS_REF_NAME;
+
+import jakarta.annotation.Nonnull;
+import jakarta.enterprise.context.ApplicationScoped;
+import jakarta.inject.Inject;
+import org.apache.polaris.persistence.nosql.maintenance.api.MaintenanceConfig;
+import org.apache.polaris.persistence.nosql.maintenance.spi.CountDownPredicate;
+import
org.apache.polaris.persistence.nosql.maintenance.spi.PerRealmRetainedIdentifier;
+import org.apache.polaris.persistence.nosql.maintenance.spi.RetainedCollector;
+
+/** Retained-identifier for the maintenance service's own reference and
objects. */
+@SuppressWarnings("CdiInjectionPointsInspection")
+@ApplicationScoped
+class MaintenanceServiceRealmRetainedIdentifier implements
PerRealmRetainedIdentifier {
+ @Inject MaintenanceConfig maintenanceConfig;
+
+ @Override
+ public String name() {
+ return "Maintenance service";
+ }
+
+ @Override
+ public boolean identifyRetained(@Nonnull RetainedCollector collector) {
+ if (!collector.isSystemRealm()) {
+ return false;
+ }
+
+ collector.refRetain(
+ MAINTENANCE_RUNS_REF_NAME,
+ MaintenanceRunsObj.class,
+ new
CountDownPredicate<>(maintenanceConfig.retainedRuns().orElse(DEFAULT_RETAINED_RUNS)),
+ maintenance -> collector.retainObject(maintenance.maintenanceRunId()));
+
+ return true;
+ }
+}
diff --git
a/persistence/nosql/persistence/maintenance/impl/src/main/java/org/apache/polaris/persistence/nosql/maintenance/impl/RateLimit.java
b/persistence/nosql/persistence/maintenance/impl/src/main/java/org/apache/polaris/persistence/nosql/maintenance/impl/RateLimit.java
new file mode 100644
index 000000000..bd525f6b7
--- /dev/null
+++
b/persistence/nosql/persistence/maintenance/impl/src/main/java/org/apache/polaris/persistence/nosql/maintenance/impl/RateLimit.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.polaris.persistence.nosql.maintenance.impl;
+
+import com.google.common.util.concurrent.RateLimiter;
+
+interface RateLimit {
+ void acquire();
+
+ @SuppressWarnings("UnstableApiUsage")
+ static RateLimit create(int ratePerSecond) {
+ if (ratePerSecond <= 0 || ratePerSecond == Integer.MAX_VALUE) {
+ return new RateLimit() {
+ @Override
+ public void acquire() {}
+
+ @Override
+ public String toString() {
+ return "unlimited";
+ }
+ };
+ }
+ return new RateLimit() {
+ final RateLimiter limiter = RateLimiter.create(ratePerSecond);
+
+ @Override
+ public void acquire() {
+ limiter.acquire();
+ }
+
+ @Override
+ public String toString() {
+ return "up to " + ratePerSecond;
+ }
+ };
+ }
+}
diff --git
a/persistence/nosql/persistence/maintenance/impl/src/main/java/org/apache/polaris/persistence/nosql/maintenance/impl/RetainedCollectorImpl.java
b/persistence/nosql/persistence/maintenance/impl/src/main/java/org/apache/polaris/persistence/nosql/maintenance/impl/RetainedCollectorImpl.java
new file mode 100644
index 000000000..6590de77f
--- /dev/null
+++
b/persistence/nosql/persistence/maintenance/impl/src/main/java/org/apache/polaris/persistence/nosql/maintenance/impl/RetainedCollectorImpl.java
@@ -0,0 +1,363 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.polaris.persistence.nosql.maintenance.impl;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static java.util.Collections.emptyIterator;
+import static org.apache.polaris.persistence.nosql.api.obj.ObjRef.objRef;
+
+import com.google.common.collect.AbstractIterator;
+import jakarta.annotation.Nonnull;
+import jakarta.annotation.Nullable;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.OptionalLong;
+import java.util.Set;
+import java.util.function.Supplier;
+import org.apache.polaris.ids.api.IdGenerator;
+import org.apache.polaris.ids.api.MonotonicClock;
+import org.apache.polaris.persistence.nosql.api.Persistence;
+import org.apache.polaris.persistence.nosql.api.PersistenceParams;
+import org.apache.polaris.persistence.nosql.api.commit.Commits;
+import org.apache.polaris.persistence.nosql.api.commit.Committer;
+import
org.apache.polaris.persistence.nosql.api.exceptions.ReferenceAlreadyExistsException;
+import
org.apache.polaris.persistence.nosql.api.exceptions.ReferenceNotFoundException;
+import org.apache.polaris.persistence.nosql.api.index.Index;
+import org.apache.polaris.persistence.nosql.api.index.IndexContainer;
+import org.apache.polaris.persistence.nosql.api.index.IndexValueSerializer;
+import org.apache.polaris.persistence.nosql.api.index.UpdatableIndex;
+import org.apache.polaris.persistence.nosql.api.obj.BaseCommitObj;
+import org.apache.polaris.persistence.nosql.api.obj.Obj;
+import org.apache.polaris.persistence.nosql.api.obj.ObjRef;
+import org.apache.polaris.persistence.nosql.api.obj.ObjType;
+import org.apache.polaris.persistence.nosql.api.ref.Reference;
+import
org.apache.polaris.persistence.nosql.maintenance.spi.ObjTypeRetainedIdentifier;
+import org.apache.polaris.persistence.nosql.maintenance.spi.RetainedCollector;
+
+/** {@link RetainedCollector} implementation, per realm. */
+final class RetainedCollectorImpl implements Persistence, RetainedCollector {
+ private final Persistence persistence;
+ private final AllRetained allRetained;
+ private final String realmId;
+ private final Map<String, List<ObjTypeRetainedIdentifier>>
objTypeRetainedIdentifiers;
+
+ private final Set<Long> currentNesting = new HashSet<>();
+
+ RetainedCollectorImpl(
+ Persistence persistence,
+ AllRetained allRetained,
+ Map<String, List<ObjTypeRetainedIdentifier>> objTypeRetainedIdentifiers)
{
+ this.persistence = persistence;
+ this.allRetained = allRetained;
+ this.realmId = persistence.realmId();
+ this.objTypeRetainedIdentifiers = objTypeRetainedIdentifiers;
+ }
+
+ @Nonnull
+ @Override
+ public String realm() {
+ return realmId;
+ }
+
+ @Nonnull
+ @Override
+ public Persistence realmPersistence() {
+ return this;
+ }
+
+ @Override
+ public void retainObject(@Nonnull ObjRef objRef) {
+ if (!currentNesting.add(objRef.id())) {
+ return;
+ }
+ try {
+ allRetained.addRetainedObj(realmId, objRef.id());
+
+ var otIdents = objTypeRetainedIdentifiers.get(objRef.type());
+ if (otIdents != null) {
+ for (var otIdent : otIdents) {
+ otIdent.identifyRelatedObj(this, objRef);
+ }
+ }
+ } finally {
+ currentNesting.remove(objRef.id());
+ }
+ }
+
+ @Override
+ public void retainReference(@Nonnull String name) {
+ allRetained.addRetainedRef(realmId, name);
+ }
+
+ // Persistence delegate
+
+ @Nonnull
+ @Override
+ public Reference createReference(@Nonnull String name, @Nonnull
Optional<ObjRef> pointer)
+ throws ReferenceAlreadyExistsException {
+ retainReference(name);
+ pointer.ifPresent(this::retainObject);
+ return persistence.createReference(name, pointer);
+ }
+
+ @Override
+ public void createReferenceSilent(@Nonnull String name) {
+ retainReference(name);
+ persistence.createReferenceSilent(name);
+ }
+
+ @Override
+ public void createReferencesSilent(Set<String> referenceNames) {
+ referenceNames.forEach(this::retainReference);
+ persistence.createReferencesSilent(referenceNames);
+ }
+
+ @Nonnull
+ @Override
+ public Reference fetchOrCreateReference(
+ @Nonnull String name, @Nonnull Supplier<Optional<ObjRef>>
pointerForCreate) {
+ try {
+ return fetchReference(name);
+ } catch (ReferenceNotFoundException e) {
+ try {
+ var objRef = pointerForCreate.get();
+ objRef.ifPresent(this::retainObject);
+ return createReference(name, objRef);
+ } catch (ReferenceAlreadyExistsException x) {
+ // Unlikely that we ever get here (ref does not exist (but then
concurrently created)
+ return fetchReference(name);
+ }
+ }
+ }
+
+ @Nonnull
+ @Override
+ public Optional<Reference> updateReferencePointer(
+ @Nonnull Reference reference, @Nonnull ObjRef newPointer) throws
ReferenceNotFoundException {
+ retainReference(reference.name());
+ retainObject(newPointer);
+ return persistence.updateReferencePointer(reference, newPointer);
+ }
+
+ @Nonnull
+ @Override
+ public Reference fetchReference(@Nonnull String name) throws
ReferenceNotFoundException {
+ retainReference(name);
+ var ref = persistence.fetchReference(name);
+ ref.pointer().ifPresent(this::retainObject);
+ return ref;
+ }
+
+ @Nonnull
+ @Override
+ public Reference fetchReferenceForUpdate(@Nonnull String name) throws
ReferenceNotFoundException {
+ retainReference(name);
+ var ref = persistence.fetchReferenceForUpdate(name);
+ ref.pointer().ifPresent(this::retainObject);
+ return ref;
+ }
+
+ @Override
+ public <T extends Obj> Optional<T> fetchReferenceHead(
+ @Nonnull String name, @Nonnull Class<T> clazz) throws
ReferenceNotFoundException {
+ retainReference(name);
+ var ref = persistence.fetchReferenceHead(name, clazz);
+ ref.ifPresent(head -> retainObject(objRef(head)));
+ return ref;
+ }
+
+ @Nullable
+ @Override
+ public <T extends Obj> T fetch(@Nonnull ObjRef id, @Nonnull Class<T> clazz) {
+ retainObject(id);
+ return persistence.fetch(id, clazz);
+ }
+
+ @Nonnull
+ @Override
+ public <T extends Obj> T[] fetchMany(@Nonnull Class<T> clazz, @Nonnull
ObjRef... ids) {
+ for (var id : ids) {
+ if (id != null) {
+ retainObject(id);
+ }
+ }
+ return persistence.fetchMany(clazz, ids);
+ }
+
+ @Nonnull
+ @Override
+ public <T extends Obj> T write(@Nonnull T obj, @Nonnull Class<T> clazz) {
+ retainObject(objRef(obj));
+ return persistence.write(obj, clazz);
+ }
+
+ @SafeVarargs
+ @Nonnull
+ @Override
+ public final <T extends Obj> T[] writeMany(@Nonnull Class<T> clazz, @Nonnull
T... objs) {
+ for (var obj : objs) {
+ if (obj != null) {
+ retainObject(objRef(obj));
+ }
+ }
+ return persistence.writeMany(clazz, objs);
+ }
+
+ @Override
+ public void delete(@Nonnull ObjRef id) {
+ persistence.delete(id);
+ }
+
+ @Override
+ public void deleteMany(@Nonnull ObjRef... ids) {
+ persistence.deleteMany(ids);
+ }
+
+ @Nullable
+ @Override
+ public <T extends Obj> T conditionalInsert(@Nonnull T obj, @Nonnull Class<T>
clazz) {
+ retainObject(objRef(obj));
+ return persistence.conditionalInsert(obj, clazz);
+ }
+
+ @Nullable
+ @Override
+ public <T extends Obj> T conditionalUpdate(
+ @Nonnull T expected, @Nonnull T update, @Nonnull Class<T> clazz) {
+ retainObject(objRef(update));
+ return persistence.conditionalUpdate(expected, update, clazz);
+ }
+
+ @Override
+ public <T extends Obj> boolean conditionalDelete(@Nonnull T expected,
Class<T> clazz) {
+ retainObject(objRef(expected));
+ return persistence.conditionalDelete(expected, clazz);
+ }
+
+ @Override
+ public PersistenceParams params() {
+ return persistence.params();
+ }
+
+ @Override
+ public int maxSerializedValueSize() {
+ return persistence.maxSerializedValueSize();
+ }
+
+ @Override
+ public long generateId() {
+ return persistence.generateId();
+ }
+
+ @Override
+ public ObjRef generateObjId(ObjType type) {
+ return persistence.generateObjId(type);
+ }
+
+ @Nullable
+ @Override
+ public <T extends Obj> T getImmediate(@Nonnull ObjRef id, @Nonnull Class<T>
clazz) {
+ retainObject(id);
+ return persistence.getImmediate(id, clazz);
+ }
+
+ @Override
+ public String realmId() {
+ return persistence.realmId();
+ }
+
+ @Override
+ public MonotonicClock monotonicClock() {
+ return persistence.monotonicClock();
+ }
+
+ @Override
+ public IdGenerator idGenerator() {
+ return persistence.idGenerator();
+ }
+
+ @Override
+ public <V> UpdatableIndex<V> buildWriteIndex(
+ @Nullable IndexContainer<V> indexContainer,
+ @Nonnull IndexValueSerializer<V> indexValueSerializer) {
+ return persistence.buildWriteIndex(indexContainer, indexValueSerializer);
+ }
+
+ @Override
+ public <V> Index<V> buildReadIndex(
+ @Nullable IndexContainer<V> indexContainer,
+ @Nonnull IndexValueSerializer<V> indexValueSerializer) {
+ return persistence.buildReadIndex(indexContainer, indexValueSerializer);
+ }
+
+ @Override
+ public <REF_OBJ extends BaseCommitObj, RESULT> Committer<REF_OBJ, RESULT>
createCommitter(
+ @Nonnull String refName,
+ @Nonnull Class<REF_OBJ> referencedObjType,
+ @Nonnull Class<RESULT> resultType) {
+ throw new UnsupportedOperationException(
+ "Committing operations not supported during retained-objects
identification");
+ }
+
+ @Override
+ public Commits commits() {
+ return new Commits() {
+ @Override
+ public <C extends BaseCommitObj> Iterator<C> commitLog(
+ String refName, OptionalLong offset, Class<C> clazz) {
+ checkArgument(
+ offset.isEmpty(), "Commit offset must be empty during
retained-objects identification");
+
+ var ref = fetchReference(refName);
+
+ return ref.pointer()
+ .map(
+ head ->
+ (Iterator<C>)
+ new AbstractIterator<C>() {
+ private ObjRef next = head;
+
+ @Override
+ protected C computeNext() {
+ if (next == null) {
+ return endOfData();
+ }
+ var r = fetch(next, clazz);
+ if (r == null) {
+ return endOfData();
+ }
+ next = r.directParent().orElse(null);
+ return r;
+ }
+ })
+ .orElse(emptyIterator());
+ }
+
+ @Override
+ public <C extends BaseCommitObj> Iterator<C> commitLogReversed(
+ String refName, long offset, Class<C> clazz) {
+ throw new UnsupportedOperationException(
+ "Reversed commit scanning not supported during retained-objects
identification");
+ }
+ };
+ }
+}
diff --git
a/persistence/nosql/persistence/maintenance/impl/src/main/java/org/apache/polaris/persistence/nosql/maintenance/impl/ScanHandler.java
b/persistence/nosql/persistence/maintenance/impl/src/main/java/org/apache/polaris/persistence/nosql/maintenance/impl/ScanHandler.java
new file mode 100644
index 000000000..502ee3492
--- /dev/null
+++
b/persistence/nosql/persistence/maintenance/impl/src/main/java/org/apache/polaris/persistence/nosql/maintenance/impl/ScanHandler.java
@@ -0,0 +1,201 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.polaris.persistence.nosql.maintenance.impl;
+
+import static
org.apache.polaris.persistence.nosql.api.Realms.SYSTEM_REALM_PREFIX;
+import static org.apache.polaris.persistence.nosql.api.obj.ObjRef.objRef;
+
+import jakarta.annotation.Nonnull;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.OptionalInt;
+import java.util.Set;
+import java.util.function.Consumer;
+import java.util.function.LongSupplier;
+import org.apache.polaris.persistence.nosql.api.backend.Backend;
+import org.apache.polaris.persistence.nosql.api.backend.PersistId;
+import org.apache.polaris.persistence.nosql.api.obj.ObjRef;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+final class ScanHandler<I> implements AutoCloseable {
+ private static final Logger LOGGER =
LoggerFactory.getLogger(ScanHandler.class);
+
+ final String name;
+ final RateLimit rateLimit;
+ final long maxCreatedAtMicros;
+ final Set<String> realmsToRetain;
+ final Set<String> realmsToPurge;
+ final Consumer<String> seenRealmsToPurge;
+ final RetainCheck<I> retainCheck;
+ final int deleteBatchSize;
+ final Consumer<Map<String, Set<I>>> batchDelete;
+ final ScanItemCallback<I> itemCallback;
+
+ final Map<String, Set<I>> deletions = new HashMap<>();
+ int numDeletes;
+
+ ScanHandler(
+ String name,
+ OptionalInt rateLimit,
+ long maxCreatedAtMicros,
+ Set<String> realmsToRetain,
+ Set<String> realmsToPurge,
+ Consumer<String> seenRealmsToPurge,
+ RetainCheck<I> retainCheck,
+ int deleteBatchSize,
+ Consumer<Map<String, Set<I>>> batchDelete,
+ ScanItemCallback<I> itemCallback) {
+ this.name = name;
+ this.rateLimit = RateLimit.create(rateLimit.orElse(-1));
+ this.maxCreatedAtMicros = maxCreatedAtMicros;
+ this.realmsToRetain = realmsToRetain;
+ this.realmsToPurge = realmsToPurge;
+ this.seenRealmsToPurge = seenRealmsToPurge;
+ this.retainCheck = retainCheck;
+ this.deleteBatchSize = deleteBatchSize;
+ this.batchDelete = batchDelete;
+ this.itemCallback = itemCallback;
+ }
+
+ void scanned(String realmId, I id, long createdAtMicros) {
+ if (realmId.startsWith(SYSTEM_REALM_PREFIX) &&
!realmsToRetain.contains(realmId)) {
+ // some system realm, ignore
+ return;
+ }
+
+ rateLimit.acquire();
+ ScanItemOutcome outcome;
+ if (realmsToPurge.contains(realmId)) {
+ outcome = ScanItemOutcome.REALM_PURGE;
+ purge(realmId, id);
+ seenRealmsToPurge.accept(realmId);
+ } else if (createdAtMicros > maxCreatedAtMicros) {
+ outcome = ScanItemOutcome.TOO_NEW_RETAINED;
+ } else if (realmsToRetain.contains(realmId)) {
+ if (retainCheck.check(realmId, id)) {
+ outcome = ScanItemOutcome.RETAINED;
+ } else {
+ outcome = ScanItemOutcome.PURGED;
+ purge(realmId, id);
+ }
+ } else {
+ outcome = ScanItemOutcome.UNHANDLED_RETAINED;
+ }
+ itemCallback.itemOutcome(realmId, id, outcome);
+ LOGGER.debug(
+ "Got '{}' {} {} -> {}, createdAtMicros = {}",
+ realmId,
+ name,
+ id,
+ outcome.message,
+ createdAtMicros);
+ }
+
+ private void purge(String realmId, I id) {
+ LOGGER.debug("Enqueuing delete for '{}' {}", realmId, id);
+ deletions.computeIfAbsent(realmId, k -> new HashSet<>()).add(id);
+ numDeletes++;
+ if (numDeletes == deleteBatchSize) {
+ flushDeletes();
+ }
+ }
+
+ private void flushDeletes() {
+ LOGGER.debug("Flushing {} {} deletions", numDeletes, name);
+ batchDelete.accept(deletions);
+ deletions.clear();
+ numDeletes = 0;
+ }
+
+ @Override
+ public void close() {
+ if (numDeletes > 0) {
+ flushDeletes();
+ }
+ }
+
+ public Backend.ObjScanCallback asObjScanCallback(LongSupplier clock) {
+ return new ProgressObjScanCallback(clock);
+ }
+
+ public Backend.ReferenceScanCallback asReferenceScanCallback(LongSupplier
clock) {
+ return new ProgressReferenceScanCallback(clock);
+ }
+
+ @FunctionalInterface
+ interface RetainCheck<I> {
+ boolean check(String realm, I id);
+ }
+
+ private abstract static class ProgressCallback {
+ private final LongSupplier clock;
+ private long nextLog;
+ private long scanned;
+
+ ProgressCallback(LongSupplier clock) {
+ this.clock = clock;
+ nextLog = clock.getAsLong() + 2_000L;
+ }
+
+ protected void called(String what) {
+ var s = scanned++;
+ var now = clock.getAsLong();
+ if (now >= nextLog) {
+ LOGGER.info("... scanned {} {} so far", s, what);
+ nextLog = now + 2_000L;
+ }
+ }
+ }
+
+ private class ProgressReferenceScanCallback extends ProgressCallback
+ implements Backend.ReferenceScanCallback {
+
+ ProgressReferenceScanCallback(LongSupplier clock) {
+ super(clock);
+ }
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public void call(@Nonnull String realmId, @Nonnull String refName, long
createdAtMicros) {
+ called("references");
+ ((ScanHandler<String>) ScanHandler.this).scanned(realmId, refName,
createdAtMicros);
+ }
+ }
+
+ private class ProgressObjScanCallback extends ProgressCallback
+ implements Backend.ObjScanCallback {
+ ProgressObjScanCallback(LongSupplier clock) {
+ super(clock);
+ }
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public void call(
+ @Nonnull String realmId,
+ @Nonnull String type,
+ @Nonnull PersistId id,
+ long createdAtMicros) {
+ called("objects");
+ ((ScanHandler<ObjRef>) ScanHandler.this)
+ .scanned(realmId, objRef(type, id.id(), id.part()), createdAtMicros);
+ }
+ }
+}
diff --git
a/persistence/nosql/persistence/maintenance/impl/src/main/java/org/apache/polaris/persistence/nosql/maintenance/impl/ScanItemCallback.java
b/persistence/nosql/persistence/maintenance/impl/src/main/java/org/apache/polaris/persistence/nosql/maintenance/impl/ScanItemCallback.java
new file mode 100644
index 000000000..8d197cbb5
--- /dev/null
+++
b/persistence/nosql/persistence/maintenance/impl/src/main/java/org/apache/polaris/persistence/nosql/maintenance/impl/ScanItemCallback.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.polaris.persistence.nosql.maintenance.impl;
+
+import jakarta.annotation.Nonnull;
+
+@FunctionalInterface
+interface ScanItemCallback<I> {
+ void itemOutcome(@Nonnull String realm, @Nonnull I id, @Nonnull
ScanItemOutcome outcome);
+}
diff --git
a/persistence/nosql/persistence/maintenance/impl/src/main/java/org/apache/polaris/persistence/nosql/maintenance/impl/ScanItemOutcome.java
b/persistence/nosql/persistence/maintenance/impl/src/main/java/org/apache/polaris/persistence/nosql/maintenance/impl/ScanItemOutcome.java
new file mode 100644
index 000000000..f1a1565a2
--- /dev/null
+++
b/persistence/nosql/persistence/maintenance/impl/src/main/java/org/apache/polaris/persistence/nosql/maintenance/impl/ScanItemOutcome.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.polaris.persistence.nosql.maintenance.impl;
+
+/** Outcome of processing one scanned reference or object. */
+enum ScanItemOutcome {
+  /** The item's realm is purged as a whole; the item is queued for deletion. */
+  REALM_PURGE("realm purge"),
+  /** The item was created after the cut-off timestamp and is retained. */
+  TOO_NEW_RETAINED("too new"),
+  /** The retain check identified the item as live; it is retained. */
+  RETAINED("retained"),
+  /** The retain check did not identify the item; it is queued for deletion. */
+  PURGED("purged"),
+  /** The item's realm is neither checked nor purged; the item is retained. */
+  UNHANDLED_RETAINED("unhandled/retained");
+
+  /** Human-readable description, used in log messages. */
+  final String message;
+
+  ScanItemOutcome(String description) {
+    message = description;
+  }
+}
diff --git
a/persistence/nosql/persistence/maintenance/impl/src/main/java/org/apache/polaris/persistence/nosql/maintenance/impl/package-info.java
b/persistence/nosql/persistence/maintenance/impl/src/main/java/org/apache/polaris/persistence/nosql/maintenance/impl/package-info.java
new file mode 100644
index 000000000..9247f45ba
--- /dev/null
+++
b/persistence/nosql/persistence/maintenance/impl/src/main/java/org/apache/polaris/persistence/nosql/maintenance/impl/package-info.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * Maintenance service implementation: do not use the types in this package directly.
+ *
+ * <p>Uses bloom filters to "collect" the references and objects to retain. Both filters are sized
+ * from the numbers of references/objects scanned by the last <em>successful</em> maintenance run,
+ * plus 10%. If no successful maintenance run exists yet, the corresponding values from the
+ * maintenance configuration are used.
+ */
+package org.apache.polaris.persistence.nosql.maintenance.impl;
diff --git
a/persistence/nosql/persistence/maintenance/impl/src/main/resources/META-INF/beans.xml
b/persistence/nosql/persistence/maintenance/impl/src/main/resources/META-INF/beans.xml
new file mode 100644
index 000000000..a297f1aa5
--- /dev/null
+++
b/persistence/nosql/persistence/maintenance/impl/src/main/resources/META-INF/beans.xml
@@ -0,0 +1,24 @@
+<!--
+ ~ Licensed to the Apache Software Foundation (ASF) under one
+ ~ or more contributor license agreements. See the NOTICE file
+ ~ distributed with this work for additional information
+ ~ regarding copyright ownership. The ASF licenses this file
+ ~ to you under the Apache License, Version 2.0 (the
+ ~ "License"); you may not use this file except in compliance
+ ~ with the License. You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing,
+ ~ software distributed under the License is distributed on an
+ ~ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ ~ KIND, either express or implied. See the License for the
+ ~ specific language governing permissions and limitations
+ ~ under the License.
+ -->
+
+<beans xmlns="https://jakarta.ee/xml/ns/jakartaee"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="https://jakarta.ee/xml/ns/jakartaee
https://jakarta.ee/xml/ns/jakartaee/beans_4_0.xsd">
+ <!-- File required by Weld (used for testing), not by Quarkus -->
+</beans>
\ No newline at end of file
diff --git
a/persistence/nosql/persistence/maintenance/impl/src/main/resources/META-INF/services/org.apache.polaris.persistence.nosql.api.obj.ObjType
b/persistence/nosql/persistence/maintenance/impl/src/main/resources/META-INF/services/org.apache.polaris.persistence.nosql.api.obj.ObjType
new file mode 100644
index 000000000..e1050243b
--- /dev/null
+++
b/persistence/nosql/persistence/maintenance/impl/src/main/resources/META-INF/services/org.apache.polaris.persistence.nosql.api.obj.ObjType
@@ -0,0 +1,21 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+org.apache.polaris.persistence.nosql.maintenance.impl.MaintenanceRunsObj$MaintenanceRunsObjType
+org.apache.polaris.persistence.nosql.maintenance.impl.MaintenanceRunObj$MaintenanceRunObjType
diff --git
a/persistence/nosql/persistence/maintenance/impl/src/test/java/org/apache/polaris/persistence/nosql/maintenance/impl/TestMaintenance.java
b/persistence/nosql/persistence/maintenance/impl/src/test/java/org/apache/polaris/persistence/nosql/maintenance/impl/TestMaintenance.java
new file mode 100644
index 000000000..131065817
--- /dev/null
+++
b/persistence/nosql/persistence/maintenance/impl/src/test/java/org/apache/polaris/persistence/nosql/maintenance/impl/TestMaintenance.java
@@ -0,0 +1,393 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.polaris.persistence.nosql.maintenance.impl;
+
+import static org.apache.polaris.persistence.nosql.api.obj.ObjRef.objRef;
+import static
org.apache.polaris.persistence.nosql.maintenance.impl.MutableMaintenanceConfig.GRACE_TIME;
+
+import jakarta.inject.Inject;
+import java.util.Optional;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.polaris.ids.api.SnowflakeIdGenerator;
+import org.apache.polaris.ids.mocks.MutableMonotonicClock;
+import org.apache.polaris.persistence.nosql.api.Persistence;
+import org.apache.polaris.persistence.nosql.api.RealmPersistenceFactory;
+import
org.apache.polaris.persistence.nosql.api.exceptions.ReferenceNotFoundException;
+import org.apache.polaris.persistence.nosql.maintenance.api.MaintenanceConfig;
+import
org.apache.polaris.persistence.nosql.maintenance.api.MaintenanceRunInformation.MaintenanceStats;
+import org.apache.polaris.persistence.nosql.maintenance.api.MaintenanceRunSpec;
+import org.apache.polaris.persistence.nosql.maintenance.api.MaintenanceService;
+import org.assertj.core.api.SoftAssertions;
+import org.assertj.core.api.junit.jupiter.InjectSoftAssertions;
+import org.assertj.core.api.junit.jupiter.SoftAssertionsExtension;
+import org.jboss.weld.junit5.EnableWeld;
+import org.jboss.weld.junit5.WeldInitiator;
+import org.jboss.weld.junit5.WeldSetup;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+
+/**
+ * Tests of {@link MaintenanceService} using the test realm and object-type identifiers ({@code
+ * RealmIdentOne}, {@code RealmIdentTwo}, {@code ObjTypeIdentOne}, {@code ObjTypeIdentTwo}). Each
+ * test configures their behavior through their static {@code testCallback} fields.
+ */
+@SuppressWarnings("CdiInjectionPointsInspection")
+@ExtendWith(SoftAssertionsExtension.class)
+@EnableWeld
+public class TestMaintenance {
+  @InjectSoftAssertions protected SoftAssertions soft;
+
+  @WeldSetup WeldInitiator weld = WeldInitiator.performDefaultDiscovery();
+
+  String realmOne;
+  String realmTwo;
+  Persistence persOne;
+  Persistence persTwo;
+
+  @Inject MaintenanceService maintenance;
+  @Inject RealmPersistenceFactory realmPersistenceFactory;
+  @Inject MutableMonotonicClock mutableMonotonicClock;
+
+  @BeforeEach
+  protected void setup() {
+    // Reset the identifier callbacks: realm identifiers handle every realm without retaining
+    // anything, and obj-type identifiers do nothing.
+    RealmIdentOne.testCallback = c -> true;
+    RealmIdentTwo.testCallback = c -> true;
+    ObjTypeIdentOne.testCallback = (c, id) -> {};
+    ObjTypeIdentTwo.testCallback = (c, id) -> {};
+
+    // Use GRACE_TIME as the "created-at grace time". Tests advance the clock by GRACE_TIME after
+    // writing refs+objs, so those are no longer "too new" and can be purged.
+    MutableMaintenanceConfig.setCurrent(
+        MaintenanceConfig.builder().createdAtGraceTime(GRACE_TIME).build());
+
+    realmOne = UUID.randomUUID().toString();
+    realmTwo = UUID.randomUUID().toString();
+
+    // 'skipDecorators' is used to bypass the cache, which cannot be consistent after maintenance
+    // purged some references/objects
+    persOne = realmPersistenceFactory.newBuilder().realmId(realmOne).skipDecorators().build();
+    persTwo = realmPersistenceFactory.newBuilder().realmId(realmTwo).skipDecorators().build();
+  }
+
+  @AfterEach
+  protected void cleanup() {
+    mutableMonotonicClock.advanceBoth(GRACE_TIME);
+
+    // Use maintenance to clean the backend for the next test
+    maintenance.performMaintenance(
+        MaintenanceRunSpec.builder()
+            .includeSystemRealm(false)
+            .realmsToPurge(Set.of(realmOne, realmTwo))
+            .build());
+  }
+
+  @Test
+  public void noRealmsSpecified() {
+    persOne.write(ObjOne.builder().text("foo").id(persOne.generateId()).build(), ObjOne.class);
+    persTwo.write(ObjTwo.builder().text("bar").id(persTwo.generateId()).build(), ObjTwo.class);
+
+    persOne.createReference("ref1", Optional.empty());
+    persTwo.createReference("ref1", Optional.empty());
+
+    mutableMonotonicClock.advanceBoth(GRACE_TIME);
+
+    // Run maintenance, no realm given to retain or purge, must not purge anything
+    var runInfo =
+        maintenance.performMaintenance(
+            MaintenanceRunSpec.builder().includeSystemRealm(false).build());
+
+    soft.assertThat(runInfo.referenceStats()).contains(stats(2L, 0L, 2L, 0L));
+    soft.assertThat(runInfo.objStats()).contains(stats(2L, 0L, 2L, 0L));
+    soft.assertThat(runInfo.identifiedReferences().orElse(-1)).isEqualTo(0L);
+    soft.assertThat(runInfo.identifiedObjs().orElse(-1)).isEqualTo(0L);
+  }
+
+  @Test
+  public void simple() {
+    var rOneObj1 =
+        persOne.write(ObjOne.builder().text("foo").id(persOne.generateId()).build(), ObjOne.class);
+    var rTwoObj2 =
+        persTwo.write(ObjTwo.builder().text("bar").id(persTwo.generateId()).build(), ObjTwo.class);
+
+    persOne.createReference("ref1", Optional.empty());
+    persOne.createReference("ref2", Optional.empty());
+    persTwo.createReference("ref1", Optional.empty());
+    persTwo.createReference("ref2", Optional.empty());
+
+    mutableMonotonicClock.advanceBoth(GRACE_TIME);
+
+    // Run maintenance, provide realms, must purge unidentified
+    var runInfo =
+        maintenance.performMaintenance(
+            MaintenanceRunSpec.builder()
+                .includeSystemRealm(false)
+                .realmsToProcess(Set.of(realmOne, realmTwo))
+                .build());
+
+    soft.assertThat(runInfo.referenceStats()).contains(stats(4L, 4L, 0L, 0L));
+    soft.assertThat(runInfo.objStats()).contains(stats(2L, 2L, 0L, 0L));
+    soft.assertThat(runInfo.identifiedReferences().orElse(-1)).isEqualTo(0L);
+    soft.assertThat(runInfo.identifiedObjs().orElse(-1)).isEqualTo(0L);
+
+    assertReferencePurged(persOne, "ref1");
+    assertReferencePurged(persOne, "ref2");
+    assertReferencePurged(persTwo, "ref1");
+    assertReferencePurged(persTwo, "ref2");
+
+    soft.assertThat(persOne.fetch(objRef(rOneObj1), ObjOne.class)).isNull();
+    soft.assertThat(persTwo.fetch(objRef(rTwoObj2), ObjTwo.class)).isNull();
+  }
+
+  @Test
+  public void systemRealm() {
+    var systemRealmCalled = new AtomicInteger();
+    RealmIdentOne.testCallback =
+        collector -> {
+          if (collector.isSystemRealm()) {
+            systemRealmCalled.incrementAndGet();
+          }
+          return true;
+        };
+
+    mutableMonotonicClock.advanceBoth(GRACE_TIME);
+
+    // Run maintenance including the system realm, no other realm given
+    var runInfo =
+        maintenance.performMaintenance(
+            MaintenanceRunSpec.builder()
+                .includeSystemRealm(true) // default
+                .build());
+
+    soft.assertThat(systemRealmCalled).hasValue(1);
+
+    soft.assertThat(runInfo.referenceStats()).contains(stats(1L, 0L, 0L, 1L));
+    soft.assertThat(runInfo.objStats()).contains(stats(4L, 0L, 0L, 4L));
+    soft.assertThat(runInfo.identifiedReferences().orElse(-1)).isEqualTo(2L);
+    soft.assertThat(runInfo.identifiedObjs().orElse(-1))
+        .isEqualTo((1 << SnowflakeIdGenerator.DEFAULT_NODE_ID_BITS) + 4L);
+  }
+
+  @Test
+  public void simpleRetainViaRealmIdentifier() {
+    var rOneObj1 =
+        persOne.write(ObjOne.builder().text("foo").id(persOne.generateId()).build(), ObjOne.class);
+    var rTwoObj2 =
+        persTwo.write(ObjTwo.builder().text("bar").id(persTwo.generateId()).build(), ObjTwo.class);
+
+    persOne.createReference("ref1", Optional.empty());
+    persOne.createReference("ref2", Optional.empty());
+    persTwo.createReference("ref1", Optional.empty());
+    persTwo.createReference("ref2", Optional.empty());
+
+    // identify rOneObj1 and realmOne's ref1 as "live" via the collector
+    RealmIdentOne.testCallback =
+        c -> {
+          if (c.realm().equals(realmOne)) {
+            c.retainObject(objRef(rOneObj1));
+            c.retainReference("ref1");
+          }
+          return true;
+        };
+
+    mutableMonotonicClock.advanceBoth(GRACE_TIME);
+
+    // Run maintenance, provide realms, must purge unidentified
+    var runInfo =
+        maintenance.performMaintenance(
+            MaintenanceRunSpec.builder()
+                .includeSystemRealm(false)
+                .realmsToProcess(Set.of(realmOne, realmTwo))
+                .build());
+
+    soft.assertThat(runInfo.referenceStats()).contains(stats(4L, 3L, 1L, 0L));
+    soft.assertThat(runInfo.objStats()).contains(stats(2L, 1L, 1L, 0L));
+    soft.assertThat(runInfo.identifiedReferences().orElse(-1)).isEqualTo(1L);
+    soft.assertThat(runInfo.identifiedObjs().orElse(-1)).isEqualTo(1L);
+
+    soft.assertThatCode(() -> persOne.fetchReference("ref1")).doesNotThrowAnyException();
+    assertReferencePurged(persOne, "ref2");
+    assertReferencePurged(persTwo, "ref1");
+    assertReferencePurged(persTwo, "ref2");
+
+    soft.assertThat(persOne.fetch(objRef(rOneObj1), ObjOne.class)).isEqualTo(rOneObj1);
+    soft.assertThat(persTwo.fetch(objRef(rTwoObj2), ObjTwo.class)).isNull();
+  }
+
+  @Test
+  public void simpleRetainViaRealmIdentifierPersistence() {
+    var rOneObj1 =
+        persOne.write(ObjOne.builder().text("foo").id(persOne.generateId()).build(), ObjOne.class);
+    var rTwoObj2 =
+        persTwo.write(ObjTwo.builder().text("bar").id(persTwo.generateId()).build(), ObjTwo.class);
+
+    persOne.createReference("ref1", Optional.empty());
+    persOne.createReference("ref2", Optional.empty());
+    persTwo.createReference("ref1", Optional.empty());
+    persTwo.createReference("ref2", Optional.empty());
+
+    // identify rOneObj1 and realmOne's ref1 as "live" by reading them via the realm persistence
+    RealmIdentOne.testCallback =
+        c -> {
+          if (c.realm().equals(realmOne)) {
+            c.realmPersistence().fetch(objRef(rOneObj1), ObjOne.class);
+            c.realmPersistence().fetchReference("ref1");
+          }
+          return true;
+        };
+
+    mutableMonotonicClock.advanceBoth(GRACE_TIME);
+
+    // Run maintenance, provide realms, must purge unidentified
+    var runInfo =
+        maintenance.performMaintenance(
+            MaintenanceRunSpec.builder()
+                .includeSystemRealm(false)
+                .realmsToProcess(Set.of(realmOne, realmTwo))
+                .build());
+
+    soft.assertThat(runInfo.referenceStats()).contains(stats(4L, 3L, 1L, 0L));
+    soft.assertThat(runInfo.objStats()).contains(stats(2L, 1L, 1L, 0L));
+    soft.assertThat(runInfo.identifiedReferences().orElse(-1)).isEqualTo(1L);
+    soft.assertThat(runInfo.identifiedObjs().orElse(-1)).isEqualTo(1L);
+
+    soft.assertThatCode(() -> persOne.fetchReference("ref1")).doesNotThrowAnyException();
+    assertReferencePurged(persOne, "ref2");
+    assertReferencePurged(persTwo, "ref1");
+    assertReferencePurged(persTwo, "ref2");
+
+    soft.assertThat(persOne.fetch(objRef(rOneObj1), ObjOne.class)).isEqualTo(rOneObj1);
+    soft.assertThat(persTwo.fetch(objRef(rTwoObj2), ObjTwo.class)).isNull();
+  }
+
+  @Test
+  public void simpleRetainViaObjTypeIdentifier() {
+    var rOneObj1 =
+        persOne.write(ObjOne.builder().text("foo1").id(persOne.generateId()).build(), ObjOne.class);
+    var rOneObj2 =
+        persOne.write(ObjTwo.builder().text("foo2").id(persOne.generateId()).build(), ObjTwo.class);
+    var rTwoObj1 =
+        persTwo.write(ObjOne.builder().text("bar1").id(persTwo.generateId()).build(), ObjOne.class);
+    var rTwoObj2 =
+        persTwo.write(ObjTwo.builder().text("bar2").id(persTwo.generateId()).build(), ObjTwo.class);
+
+    // identify rOneObj1 as "live"
+    RealmIdentOne.testCallback =
+        c -> {
+          c.retainObject(objRef(rOneObj1));
+          return true;
+        };
+    // identify rOneObj2 via the obj-type callback when it is called for rOneObj1
+    ObjTypeIdentOne.testCallback =
+        (c, id) -> {
+          if (id.equals(objRef(rOneObj1))) {
+            c.retainObject(objRef(rOneObj2));
+          }
+        };
+    // identify rTwoObj1
+    RealmIdentTwo.testCallback =
+        c -> {
+          c.retainObject(objRef(rTwoObj1));
+          return true;
+        };
+
+    mutableMonotonicClock.advanceBoth(GRACE_TIME);
+
+    // Run maintenance, provide realms, must purge unidentified
+    var runInfo =
+        maintenance.performMaintenance(
+            MaintenanceRunSpec.builder()
+                .includeSystemRealm(false)
+                .realmsToProcess(Set.of(realmOne, realmTwo))
+                .build());
+
+    soft.assertThat(runInfo.referenceStats()).contains(stats(0L, 0L, 0L, 0L));
+    soft.assertThat(runInfo.objStats()).contains(stats(4L, 1L, 3L, 0L));
+    soft.assertThat(runInfo.identifiedReferences().orElse(-1)).isEqualTo(0L);
+    soft.assertThat(runInfo.identifiedObjs().orElse(-1)).isEqualTo(6L);
+
+    soft.assertThat(persOne.fetch(objRef(rOneObj1), ObjOne.class)).isEqualTo(rOneObj1);
+    soft.assertThat(persOne.fetch(objRef(rOneObj2), ObjTwo.class)).isEqualTo(rOneObj2);
+    soft.assertThat(persTwo.fetch(objRef(rTwoObj1), ObjOne.class)).isEqualTo(rTwoObj1);
+    soft.assertThat(persTwo.fetch(objRef(rTwoObj2), ObjTwo.class)).isNull();
+  }
+
+  @Test
+  public void noRealmIdentifierHandlesRealms() {
+    var rOneObj1 =
+        persOne.write(ObjOne.builder().text("foo").id(persOne.generateId()).build(), ObjOne.class);
+    var rTwoObj2 =
+        persTwo.write(ObjTwo.builder().text("bar").id(persTwo.generateId()).build(), ObjTwo.class);
+    persOne.createReference("ref1", Optional.empty());
+    persOne.createReference("ref2", Optional.empty());
+    persTwo.createReference("ref1", Optional.empty());
+    persTwo.createReference("ref2", Optional.empty());
+
+    RealmIdentOne.testCallback = c -> false;
+    RealmIdentTwo.testCallback = c -> false;
+
+    mutableMonotonicClock.advanceBoth(GRACE_TIME);
+
+    // Run maintenance, provide realms, no realm-identifier handles realm, must NOT purge
+    var runInfo =
+        maintenance.performMaintenance(
+            MaintenanceRunSpec.builder()
+                .includeSystemRealm(false)
+                .realmsToProcess(Set.of(realmOne, realmTwo))
+                .build());
+
+    soft.assertThat(runInfo.referenceStats()).contains(stats(4L, 0L, 4L, 0L));
+    soft.assertThat(runInfo.objStats()).contains(stats(2L, 0L, 2L, 0L));
+    soft.assertThat(runInfo.identifiedReferences().orElse(-1)).isEqualTo(0L);
+    soft.assertThat(runInfo.identifiedObjs().orElse(-1)).isEqualTo(0L);
+
+    soft.assertThat(persOne.fetch(objRef(rOneObj1), ObjOne.class)).isEqualTo(rOneObj1);
+    soft.assertThat(persTwo.fetch(objRef(rTwoObj2), ObjTwo.class)).isEqualTo(rTwoObj2);
+  }
+
+  /** Builds the expected {@link MaintenanceStats} of a reference or object scan. */
+  private static MaintenanceStats stats(long scanned, long purged, long retained, long newer) {
+    return MaintenanceStats.builder()
+        .scanned(scanned)
+        .purged(purged)
+        .retained(retained)
+        .newer(newer)
+        .build();
+  }
+
+  /** Soft-asserts that {@code refName} no longer exists in {@code persistence}. */
+  private void assertReferencePurged(Persistence persistence, String refName) {
+    soft.assertThatExceptionOfType(ReferenceNotFoundException.class)
+        .isThrownBy(() -> persistence.fetchReference(refName));
+  }
+}
diff --git
a/persistence/nosql/persistence/maintenance/impl/src/test/resources/weld.properties
b/persistence/nosql/persistence/maintenance/impl/src/test/resources/weld.properties
new file mode 100644
index 000000000..c26169e0e
--- /dev/null
+++
b/persistence/nosql/persistence/maintenance/impl/src/test/resources/weld.properties
@@ -0,0 +1,21 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+# Disable Weld's concurrent deployment to work around https://bugs.openjdk.org/browse/JDK-8349545
+org.jboss.weld.bootstrap.concurrentDeployment=false
diff --git
a/persistence/nosql/persistence/maintenance/impl/src/testFixtures/java/org/apache/polaris/persistence/nosql/maintenance/impl/MaintenanceConfigurationProducer.java
b/persistence/nosql/persistence/maintenance/impl/src/testFixtures/java/org/apache/polaris/persistence/nosql/maintenance/impl/MaintenanceConfigurationProducer.java
new file mode 100644
index 000000000..3e283e955
--- /dev/null
+++
b/persistence/nosql/persistence/maintenance/impl/src/testFixtures/java/org/apache/polaris/persistence/nosql/maintenance/impl/MaintenanceConfigurationProducer.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.polaris.persistence.nosql.maintenance.impl;
+
+import jakarta.enterprise.context.ApplicationScoped;
+import jakarta.enterprise.inject.Produces;
+import org.apache.polaris.persistence.nosql.maintenance.api.MaintenanceConfig;
+
+@ApplicationScoped
+public class MaintenanceConfigurationProducer {
+ public static MutableMaintenanceConfig config = new
MutableMaintenanceConfig();
+
+ @Produces
+ MaintenanceConfig produceMaintenanceConfig() {
+ return config;
+ }
+}
diff --git
a/persistence/nosql/persistence/maintenance/impl/src/testFixtures/java/org/apache/polaris/persistence/nosql/maintenance/impl/MutableMaintenanceConfig.java
b/persistence/nosql/persistence/maintenance/impl/src/testFixtures/java/org/apache/polaris/persistence/nosql/maintenance/impl/MutableMaintenanceConfig.java
new file mode 100644
index 000000000..d2bbc31ad
--- /dev/null
+++
b/persistence/nosql/persistence/maintenance/impl/src/testFixtures/java/org/apache/polaris/persistence/nosql/maintenance/impl/MutableMaintenanceConfig.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.polaris.persistence.nosql.maintenance.impl;
+
+import static
org.apache.polaris.persistence.nosql.maintenance.impl.MaintenanceServiceImpl.MIN_GRACE_TIME_MINUTES;
+
+import com.fasterxml.jackson.annotation.JsonFormat;
+import com.fasterxml.jackson.annotation.JsonInclude;
+import java.time.Duration;
+import java.util.Optional;
+import java.util.OptionalDouble;
+import java.util.OptionalInt;
+import java.util.OptionalLong;
+import org.apache.polaris.persistence.nosql.maintenance.api.MaintenanceConfig;
+
+/**
+ * Test {@link MaintenanceConfig} whose effective values can be swapped at runtime via {@link
+ * #setCurrent(MaintenanceConfig)}. Every accessor delegates to the configuration most recently
+ * passed to {@code setCurrent}; initially that is {@code MaintenanceConfig.builder().build()}.
+ * The delegate is held in a static field, so all instances of this class share it.
+ */
+public class MutableMaintenanceConfig implements MaintenanceConfig {
+  /** Minimum allowed by MaintenanceServiceImpl. */
+  public static final Duration GRACE_TIME = Duration.ofMinutes(MIN_GRACE_TIME_MINUTES);
+
+  private static MaintenanceConfig current = MaintenanceConfig.builder().build();
+
+  /**
+   * Replaces the configuration that all instances of this class delegate to.
+   *
+   * @param config the new configuration, must not be {@code null}
+   * @throws NullPointerException if {@code config} is {@code null}
+   */
+  public static void setCurrent(MaintenanceConfig config) {
+    // Fail fast here rather than on the next accessor call, far away from the culprit.
+    if (config == null) {
+      throw new NullPointerException("config must not be null");
+    }
+    current = config;
+  }
+
+  @JsonInclude(JsonInclude.Include.NON_ABSENT)
+  @Override
+  public OptionalLong expectedReferenceCount() {
+    return current.expectedReferenceCount();
+  }
+
+  @JsonInclude(JsonInclude.Include.NON_ABSENT)
+  @Override
+  public OptionalLong expectedObjCount() {
+    return current.expectedObjCount();
+  }
+
+  @JsonInclude(JsonInclude.Include.NON_ABSENT)
+  @Override
+  public OptionalDouble countFromLastRunMultiplier() {
+    return current.countFromLastRunMultiplier();
+  }
+
+  @JsonInclude(JsonInclude.Include.NON_ABSENT)
+  @Override
+  public OptionalDouble filterInitializedFpp() {
+    return current.filterInitializedFpp();
+  }
+
+  @JsonInclude(JsonInclude.Include.NON_ABSENT)
+  @Override
+  public OptionalDouble maxAcceptableFilterFpp() {
+    return current.maxAcceptableFilterFpp();
+  }
+
+  @JsonInclude(JsonInclude.Include.NON_ABSENT)
+  @Override
+  public OptionalInt retainedRuns() {
+    return current.retainedRuns();
+  }
+
+  @JsonInclude(JsonInclude.Include.NON_ABSENT)
+  @JsonFormat(shape = JsonFormat.Shape.STRING)
+  @Override
+  public Optional<Duration> createdAtGraceTime() {
+    return current.createdAtGraceTime();
+  }
+
+  @JsonInclude(JsonInclude.Include.NON_ABSENT)
+  @Override
+  public OptionalInt objectScanRateLimitPerSecond() {
+    return current.objectScanRateLimitPerSecond();
+  }
+
+  @JsonInclude(JsonInclude.Include.NON_ABSENT)
+  @Override
+  public OptionalInt referenceScanRateLimitPerSecond() {
+    return current.referenceScanRateLimitPerSecond();
+  }
+
+  @JsonInclude(JsonInclude.Include.NON_ABSENT)
+  @Override
+  public OptionalInt deleteBatchSize() {
+    return current.deleteBatchSize();
+  }
+}
diff --git
a/persistence/nosql/persistence/maintenance/impl/src/testFixtures/java/org/apache/polaris/persistence/nosql/maintenance/impl/ObjOne.java
b/persistence/nosql/persistence/maintenance/impl/src/testFixtures/java/org/apache/polaris/persistence/nosql/maintenance/impl/ObjOne.java
new file mode 100644
index 000000000..72afcc076
--- /dev/null
+++
b/persistence/nosql/persistence/maintenance/impl/src/testFixtures/java/org/apache/polaris/persistence/nosql/maintenance/impl/ObjOne.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.polaris.persistence.nosql.maintenance.impl;
+
+import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
+import com.fasterxml.jackson.databind.annotation.JsonSerialize;
+import jakarta.annotation.Nullable;
+import org.apache.polaris.immutables.PolarisImmutable;
+import org.apache.polaris.persistence.nosql.api.obj.AbstractObjType;
+import org.apache.polaris.persistence.nosql.api.obj.Obj;
+import org.apache.polaris.persistence.nosql.api.obj.ObjType;
+
+/** Minimal test object type "maint-test-one" used by the maintenance tests. */
+@PolarisImmutable
+@JsonSerialize(as = ImmutableObjOne.class)
+@JsonDeserialize(as = ImmutableObjOne.class)
+public interface ObjOne extends Obj {
+
+  /** Type descriptor returned by {@link #type()}. */
+  ObjType TYPE = new ObjOneType();
+
+  /** Returns a fresh builder for {@link ObjOne} instances. */
+  static ImmutableObjOne.Builder builder() {
+    return ImmutableObjOne.builder();
+  }
+
+  /** Free-form payload; may be {@code null}. */
+  @Nullable
+  String text();
+
+  @Override
+  default ObjType type() {
+    return TYPE;
+  }
+
+  /** Object type for {@link ObjOne}; listed in {@code META-INF/services} for discovery. */
+  final class ObjOneType extends AbstractObjType<ObjOne> {
+    public ObjOneType() {
+      super("maint-test-one", "maint-one", ObjOne.class);
+    }
+  }
+}
diff --git
a/persistence/nosql/persistence/maintenance/impl/src/testFixtures/java/org/apache/polaris/persistence/nosql/maintenance/impl/ObjTwo.java
b/persistence/nosql/persistence/maintenance/impl/src/testFixtures/java/org/apache/polaris/persistence/nosql/maintenance/impl/ObjTwo.java
new file mode 100644
index 000000000..571e5d20c
--- /dev/null
+++
b/persistence/nosql/persistence/maintenance/impl/src/testFixtures/java/org/apache/polaris/persistence/nosql/maintenance/impl/ObjTwo.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.polaris.persistence.nosql.maintenance.impl;
+
+import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
+import com.fasterxml.jackson.databind.annotation.JsonSerialize;
+import jakarta.annotation.Nullable;
+import org.apache.polaris.immutables.PolarisImmutable;
+import org.apache.polaris.persistence.nosql.api.obj.AbstractObjType;
+import org.apache.polaris.persistence.nosql.api.obj.Obj;
+import org.apache.polaris.persistence.nosql.api.obj.ObjType;
+
+/** Minimal test object type "maint-test-two" used by the maintenance tests. */
+@PolarisImmutable
+@JsonSerialize(as = ImmutableObjTwo.class)
+@JsonDeserialize(as = ImmutableObjTwo.class)
+public interface ObjTwo extends Obj {
+
+  /** Type descriptor returned by {@link #type()}. */
+  ObjType TYPE = new ObjTwoType();
+
+  /** Returns a fresh builder for {@link ObjTwo} instances. */
+  static ImmutableObjTwo.Builder builder() {
+    return ImmutableObjTwo.builder();
+  }
+
+  /** Free-form payload; may be {@code null}. */
+  @Nullable
+  String text();
+
+  @Override
+  default ObjType type() {
+    return TYPE;
+  }
+
+  /** Object type for {@link ObjTwo}; listed in {@code META-INF/services} for discovery. */
+  final class ObjTwoType extends AbstractObjType<ObjTwo> {
+    public ObjTwoType() {
+      super("maint-test-two", "maint-two", ObjTwo.class);
+    }
+  }
+}
diff --git
a/persistence/nosql/persistence/maintenance/impl/src/testFixtures/java/org/apache/polaris/persistence/nosql/maintenance/impl/ObjTypeIdentOne.java
b/persistence/nosql/persistence/maintenance/impl/src/testFixtures/java/org/apache/polaris/persistence/nosql/maintenance/impl/ObjTypeIdentOne.java
new file mode 100644
index 000000000..be9ef1cd3
--- /dev/null
+++
b/persistence/nosql/persistence/maintenance/impl/src/testFixtures/java/org/apache/polaris/persistence/nosql/maintenance/impl/ObjTypeIdentOne.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.polaris.persistence.nosql.maintenance.impl;
+
+import jakarta.annotation.Nonnull;
+import jakarta.enterprise.context.ApplicationScoped;
+import java.util.function.BiConsumer;
+import org.apache.polaris.persistence.nosql.api.obj.ObjRef;
+import org.apache.polaris.persistence.nosql.api.obj.ObjType;
+import
org.apache.polaris.persistence.nosql.maintenance.spi.ObjTypeRetainedIdentifier;
+import org.apache.polaris.persistence.nosql.maintenance.spi.RetainedCollector;
+
+@ApplicationScoped
+public class ObjTypeIdentOne implements ObjTypeRetainedIdentifier {
+
+ static BiConsumer<RetainedCollector, ObjRef> testCallback;
+
+ @Override
+ public String name() {
+ return "TEST ObjTypeRetainedIdentifier ONE";
+ }
+
+ @Nonnull
+ @Override
+ public ObjType handledObjType() {
+ return ObjOne.TYPE;
+ }
+
+ @Override
+ public void identifyRelatedObj(@Nonnull RetainedCollector collector,
@Nonnull ObjRef objRef) {
+ if (testCallback != null) {
+ testCallback.accept(collector, objRef);
+ }
+ }
+}
diff --git
a/persistence/nosql/persistence/maintenance/impl/src/testFixtures/java/org/apache/polaris/persistence/nosql/maintenance/impl/ObjTypeIdentTwo.java
b/persistence/nosql/persistence/maintenance/impl/src/testFixtures/java/org/apache/polaris/persistence/nosql/maintenance/impl/ObjTypeIdentTwo.java
new file mode 100644
index 000000000..9052a3882
--- /dev/null
+++
b/persistence/nosql/persistence/maintenance/impl/src/testFixtures/java/org/apache/polaris/persistence/nosql/maintenance/impl/ObjTypeIdentTwo.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.polaris.persistence.nosql.maintenance.impl;
+
+import jakarta.annotation.Nonnull;
+import jakarta.enterprise.context.ApplicationScoped;
+import java.util.function.BiConsumer;
+import org.apache.polaris.persistence.nosql.api.obj.ObjRef;
+import org.apache.polaris.persistence.nosql.api.obj.ObjType;
+import
org.apache.polaris.persistence.nosql.maintenance.spi.ObjTypeRetainedIdentifier;
+import org.apache.polaris.persistence.nosql.maintenance.spi.RetainedCollector;
+
+@ApplicationScoped
+public class ObjTypeIdentTwo implements ObjTypeRetainedIdentifier {
+ static BiConsumer<RetainedCollector, ObjRef> testCallback;
+
+ @Override
+ public String name() {
+ return "TEST ObjTypeRetainedIdentifier TWO";
+ }
+
+ @Nonnull
+ @Override
+ public ObjType handledObjType() {
+ return ObjTwo.TYPE;
+ }
+
+ @Override
+ public void identifyRelatedObj(@Nonnull RetainedCollector collector,
@Nonnull ObjRef objRef) {
+ if (testCallback != null) {
+ testCallback.accept(collector, objRef);
+ }
+ }
+}
diff --git
a/persistence/nosql/persistence/maintenance/impl/src/testFixtures/java/org/apache/polaris/persistence/nosql/maintenance/impl/RealmIdentOne.java
b/persistence/nosql/persistence/maintenance/impl/src/testFixtures/java/org/apache/polaris/persistence/nosql/maintenance/impl/RealmIdentOne.java
new file mode 100644
index 000000000..229aa2ac4
--- /dev/null
+++
b/persistence/nosql/persistence/maintenance/impl/src/testFixtures/java/org/apache/polaris/persistence/nosql/maintenance/impl/RealmIdentOne.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.polaris.persistence.nosql.maintenance.impl;
+
+import jakarta.annotation.Nonnull;
+import jakarta.enterprise.context.ApplicationScoped;
+import java.util.function.Function;
+import
org.apache.polaris.persistence.nosql.maintenance.spi.PerRealmRetainedIdentifier;
+import org.apache.polaris.persistence.nosql.maintenance.spi.RetainedCollector;
+
+@ApplicationScoped
+public class RealmIdentOne implements PerRealmRetainedIdentifier {
+ static Function<RetainedCollector, Boolean> testCallback;
+
+ @Override
+ public String name() {
+ return "TEST RealmRetainedIdentifier ONE";
+ }
+
+ @Override
+ public boolean identifyRetained(@Nonnull RetainedCollector collector) {
+ return testCallback != null ? testCallback.apply(collector) : false;
+ }
+}
diff --git
a/persistence/nosql/persistence/maintenance/impl/src/testFixtures/java/org/apache/polaris/persistence/nosql/maintenance/impl/RealmIdentTwo.java
b/persistence/nosql/persistence/maintenance/impl/src/testFixtures/java/org/apache/polaris/persistence/nosql/maintenance/impl/RealmIdentTwo.java
new file mode 100644
index 000000000..22f2a61db
--- /dev/null
+++
b/persistence/nosql/persistence/maintenance/impl/src/testFixtures/java/org/apache/polaris/persistence/nosql/maintenance/impl/RealmIdentTwo.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.polaris.persistence.nosql.maintenance.impl;
+
+import jakarta.annotation.Nonnull;
+import jakarta.enterprise.context.ApplicationScoped;
+import java.util.function.Function;
+import
org.apache.polaris.persistence.nosql.maintenance.spi.PerRealmRetainedIdentifier;
+import org.apache.polaris.persistence.nosql.maintenance.spi.RetainedCollector;
+
+@ApplicationScoped
+public class RealmIdentTwo implements PerRealmRetainedIdentifier {
+ static Function<RetainedCollector, Boolean> testCallback;
+
+ @Override
+ public String name() {
+ return "TEST RealmRetainedIdentifier TWO";
+ }
+
+ @Override
+ public boolean identifyRetained(@Nonnull RetainedCollector collector) {
+ return testCallback != null ? testCallback.apply(collector) : false;
+ }
+}
diff --git
a/persistence/nosql/persistence/maintenance/impl/src/testFixtures/java/org/apache/polaris/persistence/nosql/maintenance/impl/package-info.java
b/persistence/nosql/persistence/maintenance/impl/src/testFixtures/java/org/apache/polaris/persistence/nosql/maintenance/impl/package-info.java
new file mode 100644
index 000000000..b486d52fa
--- /dev/null
+++
b/persistence/nosql/persistence/maintenance/impl/src/testFixtures/java/org/apache/polaris/persistence/nosql/maintenance/impl/package-info.java
@@ -0,0 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.polaris.persistence.nosql.maintenance.impl;
diff --git
a/persistence/nosql/persistence/maintenance/impl/src/testFixtures/resources/META-INF/beans.xml
b/persistence/nosql/persistence/maintenance/impl/src/testFixtures/resources/META-INF/beans.xml
new file mode 100644
index 000000000..a297f1aa5
--- /dev/null
+++
b/persistence/nosql/persistence/maintenance/impl/src/testFixtures/resources/META-INF/beans.xml
@@ -0,0 +1,24 @@
+<!--
+ ~ Licensed to the Apache Software Foundation (ASF) under one
+ ~ or more contributor license agreements. See the NOTICE file
+ ~ distributed with this work for additional information
+ ~ regarding copyright ownership. The ASF licenses this file
+ ~ to you under the Apache License, Version 2.0 (the
+ ~ "License"); you may not use this file except in compliance
+ ~ with the License. You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing,
+ ~ software distributed under the License is distributed on an
+ ~ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ ~ KIND, either express or implied. See the License for the
+ ~ specific language governing permissions and limitations
+ ~ under the License.
+ -->
+
+<beans xmlns="https://jakarta.ee/xml/ns/jakartaee"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="https://jakarta.ee/xml/ns/jakartaee
https://jakarta.ee/xml/ns/jakartaee/beans_4_0.xsd">
+ <!-- File required by Weld (used for testing), not by Quarkus -->
+</beans>
\ No newline at end of file
diff --git
a/persistence/nosql/persistence/maintenance/impl/src/testFixtures/resources/META-INF/services/org.apache.polaris.persistence.nosql.api.obj.ObjType
b/persistence/nosql/persistence/maintenance/impl/src/testFixtures/resources/META-INF/services/org.apache.polaris.persistence.nosql.api.obj.ObjType
new file mode 100644
index 000000000..c2c5d5f95
--- /dev/null
+++
b/persistence/nosql/persistence/maintenance/impl/src/testFixtures/resources/META-INF/services/org.apache.polaris.persistence.nosql.api.obj.ObjType
@@ -0,0 +1,21 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+org.apache.polaris.persistence.nosql.maintenance.impl.ObjOne$ObjOneType
+org.apache.polaris.persistence.nosql.maintenance.impl.ObjTwo$ObjTwoType