This is an automated email from the ASF dual-hosted git repository. joscorbe pushed a commit to branch old-revisions-cleanup in repository https://gitbox.apache.org/repos/asf/jackrabbit-oak.git
commit c0c98b7908979393a90ceb8afe9cc6f547f4cbaa Author: Jose Cordero <corde...@adobe.com> AuthorDate: Fri Jun 30 18:58:40 2023 +0200 Old revisions cleanup (WIP) --- .../jackrabbit/oak/console/GroovyConsole.groovy | 1 + .../oak/console/commands/RevisionCommand.groovy | 289 ++++++++++++++++ .../console/commands/RevisionCommand.properties | 21 ++ .../jackrabbit/oak/run/RevisionsCommand.java | 150 ++++++++- oak-store-document/pom.xml | 6 + .../document/DocumentRevisionCleanupHelper.java | 276 ++++++++++++++++ .../DocumentRevisionCleanupHelperTest.java | 366 +++++++++++++++++++++ 7 files changed, 1108 insertions(+), 1 deletion(-) diff --git a/oak-run/src/main/groovy/org/apache/jackrabbit/oak/console/GroovyConsole.groovy b/oak-run/src/main/groovy/org/apache/jackrabbit/oak/console/GroovyConsole.groovy index 904ab15a60..6124cc8a16 100644 --- a/oak-run/src/main/groovy/org/apache/jackrabbit/oak/console/GroovyConsole.groovy +++ b/oak-run/src/main/groovy/org/apache/jackrabbit/oak/console/GroovyConsole.groovy @@ -124,6 +124,7 @@ class GroovyConsole { new CheckpointCommand(shell), new LsCommand(shell), new PnCommand(shell), + new RevisionCommand(shell), new RefreshCommand(shell), new RetrieveCommand(shell), new LuceneCommand(shell), diff --git a/oak-run/src/main/groovy/org/apache/jackrabbit/oak/console/commands/RevisionCommand.groovy b/oak-run/src/main/groovy/org/apache/jackrabbit/oak/console/commands/RevisionCommand.groovy new file mode 100644 index 0000000000..8e055d17a9 --- /dev/null +++ b/oak-run/src/main/groovy/org/apache/jackrabbit/oak/console/commands/RevisionCommand.groovy @@ -0,0 +1,289 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.jackrabbit.oak.console.commands + + +import groovy.transform.CompileStatic +import org.apache.jackrabbit.oak.console.ConsoleSession +import org.apache.jackrabbit.oak.plugins.document.DocumentNodeStore +import org.apache.jackrabbit.oak.plugins.document.DocumentRevisionCleanupHelper +import org.apache.jackrabbit.oak.plugins.document.DocumentStore +import org.apache.jackrabbit.oak.plugins.document.Revision +import org.apache.jackrabbit.oak.spi.state.NodeStore +import org.codehaus.groovy.tools.shell.CommandSupport +import org.codehaus.groovy.tools.shell.Groovysh + +import java.util.stream.Collectors + +import static java.lang.Integer.getInteger + +@CompileStatic +class RevisionCommand extends CommandSupport { + public static final String COMMAND_NAME = 'revision' + + private static final int REVISION_CAP = getInteger("oak.revision.cap", 250) + + private static final String ARGS_SUBCOMMAND_LIST = "list" + private static final String ARGS_SUBCOMMAND_INIT = "init" + private static final String ARGS_SUBCOMMAND_SHOW = "show" + private static final String ARGS_SUBCOMMAND_CLEANUP = "cleanup" + + private static final String ARGS_NO_CAP_RESULTS = "--nocap" + private static final String ARGS_NO_CAP_RESULTS_SHORT = "-n" + private static final String ARGS_ORDER_DESC = "--desc" + private static final String ARGS_ORDER_DESC_SHORT = "-d" + + private static final int ORDER_ASC = 0 
+ private static final int ORDER_DESC = 1 + + private static int order = ORDER_ASC + private static boolean capResults = true + private static boolean showPropertiesUse = false + + private DocumentRevisionCleanupHelper cleanupHelper; + + RevisionCommand(Groovysh shell) { + super(shell, COMMAND_NAME, 'rev') + } + + @Override + Object execute(List<String> args) { + if (args.isEmpty()) { + io.err.println("Missing subcommand: [<list>|<init>|<show>|<cleanup>]") + return null + } + + parseArgs(args) + switch(args[0].toLowerCase()) { + case ARGS_SUBCOMMAND_LIST: + list() + break + case ARGS_SUBCOMMAND_INIT: + initialize() + break + case ARGS_SUBCOMMAND_SHOW: + show(args.contains("candidates"), args.contains("blocked")) + break + case ARGS_SUBCOMMAND_CLEANUP: + io.err.println("This command is read-only. To execute the cleanup, use 'oak-run revisions' command:") + io.err.println(" oak-run revisions {<jdbc-uri> | <mongodb-uri>} cleanup [options]") + io.err.println(" Options:") + io.err.println(" --path <path> Path to the node to cleanup. This is required.") + io.err.println(" --number <number> Number of revisions to cleanup") + io.err.println(" --clusterId <clusterId> ClusterId to cleanup") + io.err.println(" Example:") + io.err.println(" oak-run revisions {<jdbc-uri> | <mongodb-uri>} cleanup --path /content --number 10 --clusterId 1") + io.err.println() + break + default: + io.err.println("Unrecognized subcommand: " + args[0]) + } + + io.out.flush() + return null + } + + /** + * List command. + * Prints the list of revisions for the working document. + */ + void list() { + int count = 0 + NavigableMap<Revision, String> allRevisions = cleanupHelper.getAllRevisions(); + + for (Map.Entry<Revision, String> revisionEntry : order == ORDER_ASC ? 
allRevisions.entrySet() : allRevisions.descendingMap().entrySet()) { + Revision revision = revisionEntry.getKey() + String value = revisionEntry.getValue() + io.out.println(revision.toReadableString() + " " + value) + count++ + if (capResults && count >= REVISION_CAP) { + io.out.printf("-- Reached revision cap of %d elements. This document has %d revisions --%n", REVISION_CAP, allRevisions.size()) + break + } + } + } + + /** + * Outputs the candidate and blocked revision sets. These sets are populated during a previous execution of + * the init command. + * @param candidates whether to print the candidate revision set + * @param blocked whether to print the blocked revision set + */ + void show(boolean candidates, boolean blocked) { + if (candidates) { + SortedMap<Integer, TreeSet<Revision>> candidateRevisionsToClean = cleanupHelper.getCandidateRevisionsToClean() + if (candidateRevisionsToClean.isEmpty()) { + io.out.println("-- Candidates revision list is empty. Run 'rev init' before --") + } + for (Map.Entry<Integer, TreeSet<Revision>> clusterEntry : candidateRevisionsToClean) { + int count = 0 + io.out.println("ClusterId " + clusterEntry.key) + for (Revision revision : order == ORDER_ASC ? clusterEntry.value : clusterEntry.value.descendingSet()) { + io.out.println(" " + revision.toReadableString()) + count++ + if (capResults && count >= REVISION_CAP) { + io.out.printf("-- Reached revision cap of %d elements. There are %d revisions in the list --%n", REVISION_CAP, clusterEntry.value.size()) + break + } + } + } + } + + if (blocked) { + SortedMap<Integer, TreeSet<Revision>> usedRevisionsToKeep = cleanupHelper.getBlockedRevisionsToKeep() + if (cleanupHelper.getBlockedRevisionsToKeep().isEmpty()) { + io.out.println("-- Blocked revision list is empty. Run 'rev init' before --") + } + for (Map.Entry<Integer, TreeSet<Revision>> clusterEntry : usedRevisionsToKeep) { + int count = 0 + io.out.println("ClusterId " + clusterEntry.key) + for (Revision revision : order == ORDER_ASC ?
clusterEntry.value : clusterEntry.value.descendingSet()) { + io.out.println(revision.toReadableString()) + count++ + if (capResults && count >= REVISION_CAP) { + io.out.printf("-- Reached revision cap of %d elements. There are %d revisions in the list --%n", REVISION_CAP, clusterEntry.value.size()) + break + } + } + } + } + } + + /** + * Performs the cleanup of a certain number of revisions for the specified clusterId, starting from oldest. + * @param numberToCleanup + * @param clusterToCleanup + */ + void cleanup(int numberToCleanup, int clusterToCleanup) { + io.out.println("This will delete the following revisions and the property values permanently:") + SortedMap<Integer, TreeSet<Revision>> candidateRevisionsToClean = cleanupHelper.getCandidateRevisionsToClean() + SortedMap<Revision, TreeSet<String>> propertiesModifiedByRevision = cleanupHelper.getPropertiesModifiedByRevision() + TreeSet<Revision> revisions = candidateRevisionsToClean.get(clusterToCleanup) + if (revisions != null) { + int count = 0 + for (Revision revision : revisions) { + io.out.println(revision.toReadableString() + " => " + propertiesModifiedByRevision.get(revision)) + count++ + if (count >= numberToCleanup) { + break + } + } + } else { + io.err.println("Invalid clusterId") + return + } + + io.out.println("Are you sure to proceed? 
[y/N]") + int confirmation = io.in.read() + io.in.readLine() + if (confirmation == (int)('y' as char)) { + // Start the cleanup + int count = 0 + for (Revision revision : candidateRevisionsToClean.get(clusterToCleanup)) { + // Remove the revision from _revisions and all the properties + /*UpdateOp update = new UpdateOp(workingDocument.path.toString(), false) + update.removeMapEntry("_revisions", revision) + for (String property : propertiesModifiedByRevision.get(revision)) { + update.removeMapEntry(property, revision) + } + io.out.println("Executing UpdateOp: " + update) + try { + documentStore.findAndUpdate(NODES, update) + } catch (DocumentStoreException ex) { + io.out.println("Operation failed: " + ex) + }*/ + + count++ + if (count >= numberToCleanup) { + break + } + } + io.out.println("-- Executed " + count + " operations --") + } + } + + void initialize() { + NodeStore nodeStore = getSession().getStore() + assert nodeStore instanceof DocumentNodeStore + DocumentStore documentStore = nodeStore.getDocumentStore() + DocumentNodeStore documentNodeStore = (DocumentNodeStore) nodeStore + + cleanupHelper = new DocumentRevisionCleanupHelper(documentStore, documentNodeStore, session.getWorkingPath()) + + cleanupHelper.initializeCleanupProcess() + + SortedMap<Revision, String> allRevisions = cleanupHelper.getAllRevisions() + TreeMap<Integer, Integer> revisionsByClusterId = allRevisions.keySet().groupBy { it.clusterId } + .collectEntries { clusterId, revisions -> + [clusterId, revisions.size()] + } as TreeMap<Integer, Integer> + + io.out.println("=== Last Revision by clusterId ===") + for (Map.Entry<Integer, Revision> entry : cleanupHelper.getLastRev()) { + io.out.printf(" [%d] -> %s%n", entry.key, entry.value.toReadableString()) + } + io.out.println() + + io.out.println("=== Sweep Revision by clusterId ===") + for (Map.Entry<Integer, Revision> entry : cleanupHelper.getSweepRev()) { + io.out.printf(" [%d] -> %s%n", entry.key, entry.value.toReadableString()) + } + 
io.out.println() + + io.out.println("=== Total Revisions by clusterId ===") + for (Map.Entry<Integer, Integer> entry : revisionsByClusterId) { + io.out.printf(" [%d] -> %d revisions%n", entry.key, entry.value) + } + io.out.println() + + io.out.println("=== Candidates to cleanup ===") + cleanupHelper.getCandidateRevisionsToClean().each { clusterId, revisions -> + int blocked = cleanupHelper.getBlockedRevisionsToKeep().get(clusterId)?.size() ?: 0 + io.out.printf("ClusterId [%d] has %d Candidates and %d Blocked%n", clusterId, revisions.size(), blocked) + } + + io.out.println("The lists are stored temporarily. You can print them using:") + io.out.println(" > rev show candidates") + io.out.println(" > rev show blocked") + io.out.println() + } + + void parseArgs(List<String> args) { + // Defaults + order = ORDER_ASC + capResults = true + showPropertiesUse = false + + for (String arg : args) { + switch(arg) { + case ARGS_NO_CAP_RESULTS: + case ARGS_NO_CAP_RESULTS_SHORT: + capResults = false + break + case ARGS_ORDER_DESC: + case ARGS_ORDER_DESC_SHORT: + order = ORDER_DESC + break + } + } + } + + ConsoleSession getSession(){ + return (ConsoleSession)variables.session + } +} diff --git a/oak-run/src/main/groovy/org/apache/jackrabbit/oak/console/commands/RevisionCommand.properties b/oak-run/src/main/groovy/org/apache/jackrabbit/oak/console/commands/RevisionCommand.properties new file mode 100644 index 0000000000..5bc67c2261 --- /dev/null +++ b/oak-run/src/main/groovy/org/apache/jackrabbit/oak/console/commands/RevisionCommand.properties @@ -0,0 +1,21 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# +command.description=Revision management. +command.usage=[<list>|<show>|<init>|<cleanup>] +command.help=Revision management. diff --git a/oak-run/src/main/java/org/apache/jackrabbit/oak/run/RevisionsCommand.java b/oak-run/src/main/java/org/apache/jackrabbit/oak/run/RevisionsCommand.java index 21e5aff706..9775dacc63 100644 --- a/oak-run/src/main/java/org/apache/jackrabbit/oak/run/RevisionsCommand.java +++ b/oak-run/src/main/java/org/apache/jackrabbit/oak/run/RevisionsCommand.java @@ -23,6 +23,11 @@ import com.google.common.io.Closer; import java.io.IOException; import java.util.List; import java.util.Locale; +import java.util.Map; +import java.util.Scanner; +import java.util.SortedMap; +import java.util.TreeMap; +import java.util.TreeSet; import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; @@ -32,6 +37,7 @@ import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.stream.Collectors; import ch.qos.logback.classic.Level; import ch.qos.logback.classic.LoggerContext; @@ -41,9 +47,11 @@ import org.apache.jackrabbit.oak.commons.TimeDurationFormatter; import org.apache.jackrabbit.oak.plugins.document.ClusterNodeInfoDocument; import org.apache.jackrabbit.oak.plugins.document.DocumentNodeStore; import org.apache.jackrabbit.oak.plugins.document.DocumentNodeStoreBuilder; +import org.apache.jackrabbit.oak.plugins.document.DocumentRevisionCleanupHelper; import 
org.apache.jackrabbit.oak.plugins.document.DocumentStore; import org.apache.jackrabbit.oak.plugins.document.FormatVersion; import org.apache.jackrabbit.oak.plugins.document.MissingLastRevSeeker; +import org.apache.jackrabbit.oak.plugins.document.Revision; import org.apache.jackrabbit.oak.plugins.document.RevisionContextWrapper; import org.apache.jackrabbit.oak.plugins.document.SweepHelper; import org.apache.jackrabbit.oak.plugins.document.VersionGCSupport; @@ -56,6 +64,7 @@ import org.apache.jackrabbit.oak.spi.blob.MemoryBlobStore; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import static java.lang.Integer.getInteger; import static java.util.concurrent.TimeUnit.SECONDS; import static org.apache.jackrabbit.oak.plugins.document.DocumentNodeStoreHelper.createVersionGC; import static org.apache.jackrabbit.oak.plugins.document.FormatVersion.versionOf; @@ -71,6 +80,8 @@ public class RevisionsCommand implements Command { private static final Logger LOG = LoggerFactory.getLogger(RevisionsCommand.class); + private static final int REVISION_CAP = getInteger("oak.revision.cap", 250); + private static final String USAGE = Joiner.on(System.lineSeparator()).join( "revisions {<jdbc-uri> | <mongodb-uri>} <sub-command> [options]", "where sub-command is one of", @@ -78,7 +89,8 @@ public class RevisionsCommand implements Command { " any modifications", " collect perform garbage collection", " reset clear all persisted metadata", - " sweep clean up uncommitted changes" + " sweep clean up uncommitted changes", + " cleanup clean up old/unused revisions" ); private static final ImmutableList<String> LOGGER_NAMES = ImmutableList.of( @@ -92,6 +104,7 @@ public class RevisionsCommand implements Command { static final String CMD_COLLECT = "collect"; static final String CMD_RESET = "reset"; static final String CMD_SWEEP = "sweep"; + static final String CMD_CLEANUP = "cleanup"; final OptionSpec<?> once; final OptionSpec<Integer> limit; @@ -100,6 +113,8 @@ public class 
RevisionsCommand implements Command { final OptionSpec<Double> delay; final OptionSpec<?> continuous; final OptionSpec<?> verbose; + final OptionSpec<String> path; + final OptionSpec<Integer> number; RevisionsOptions(String usage) { super(usage); @@ -120,6 +135,11 @@ public class RevisionsCommand implements Command { .accepts("continuous", "run continuously (collect only)"); verbose = parser .accepts("verbose", "print INFO messages to the console"); + path = parser + .accepts("path", "path to the document to be cleaned up").withRequiredArg(); + number = parser + .accepts("number", "number of revisions to clean").withRequiredArg() + .ofType(Integer.class).defaultsTo(0); } public RevisionsOptions parse(String[] args) { @@ -162,6 +182,14 @@ public class RevisionsCommand implements Command { boolean isVerbose() { return options.has(verbose); } + + String getPath() { + return path.value(options); + } + + int getNumber() { + return number.value(options); + } } @Override @@ -180,6 +208,8 @@ public class RevisionsCommand implements Command { reset(options, closer); } else if (RevisionsOptions.CMD_SWEEP.equals(subCmd)) { sweep(options, closer); + } else if (RevisionsOptions.CMD_CLEANUP.equals(subCmd)) { + cleanup(options, closer); } else { System.err.println("unknown revisions command: " + subCmd); } @@ -394,6 +424,124 @@ public class RevisionsCommand implements Command { SweepHelper.sweep(store, new RevisionContextWrapper(ns, clusterId), seeker); } + private void cleanup(RevisionsOptions options, Closer closer) throws IOException { + String path = options.getPath(); + if (path == null || path.isEmpty()) { + System.err.println("path option is required for " + RevisionsOptions.CMD_CLEANUP + " command"); + return; + } + Integer clusterToCleanup = options.getClusterId(); + if (clusterToCleanup == null) { + System.err.println("clusterId option is required for " + RevisionsOptions.CMD_CLEANUP + " command"); + return; + } + Integer numberToCleanup = options.getNumber(); + if 
(numberToCleanup == null) { + System.err.println("number option is required for " + RevisionsOptions.CMD_CLEANUP + " command"); + return; + } + + DocumentNodeStoreBuilder<?> builder = createDocumentMKBuilder(options, closer); + if (builder == null) { + System.err.println("revisions mode only available for DocumentNodeStore"); + return; + } + + DocumentStore documentStore = builder.getDocumentStore(); + builder.setReadOnlyMode(); + useMemoryBlobStore(builder); + DocumentNodeStore documentNodeStore = builder.build(); + DocumentRevisionCleanupHelper cleanupHelper = new DocumentRevisionCleanupHelper(documentStore, documentNodeStore, path); + cleanupHelper.initializeCleanupProcess(); + + SortedMap<Revision, String> allRevisions = cleanupHelper.getAllRevisions(); + SortedMap<Integer, Integer> revisionsByClusterId = new TreeMap<>(); + for (Revision revision : allRevisions.keySet()) { + Integer cid = revision.getClusterId(); + Integer count = revisionsByClusterId.get(cid); + if (count == null) { + count = 0; + } + revisionsByClusterId.put(cid, count + 1); + } + + System.out.println("Last Revision by clusterId"); + for (Map.Entry<Integer, Revision> entry : cleanupHelper.getLastRev().entrySet()) { + System.out.printf(" [%d] -> %s%n", entry.getKey(), entry.getValue().toReadableString()); + } + + System.out.println("Sweep Revision by clusterId"); + for (Map.Entry<Integer, Revision> entry : cleanupHelper.getSweepRev().entrySet()) { + System.out.printf(" [%d] -> %s%n", entry.getKey(), entry.getValue().toReadableString()); + } + System.out.println(); + + int count = 0; + System.out.println("=== Total Revisions by clusterId ==="); + for (Map.Entry<Integer, Integer> entry : revisionsByClusterId.entrySet()) { + System.out.printf(" [%d] -> %d revisions%n", entry.getKey(), entry.getValue()); + count++; + if (count >= REVISION_CAP) { + System.out.printf(" ...%n"); + break; + } + } + + for (Map.Entry<Integer, TreeSet<Revision>> entry : 
cleanupHelper.getCandidateRevisionsToClean().entrySet()) { + Integer cid = entry.getKey(); + TreeSet<Revision> revisions = entry.getValue(); + int blocked = cleanupHelper.getBlockedRevisionsToKeep().get(cid) != null ? cleanupHelper.getBlockedRevisionsToKeep().get(cid).size() : 0; + System.out.printf("ClusterId [%d] has %d Candidates and %d Blocked%n", cid, revisions.size(), blocked); + } + + System.out.println("=== Revisions to be cleaned for clusterId " + clusterToCleanup + " ==="); + TreeSet<Revision> revisionsToClean = cleanupHelper.getCandidateRevisionsToClean().get(clusterToCleanup) + .stream().limit(numberToCleanup).collect(Collectors.toCollection(TreeSet::new)); + if (revisionsToClean.isEmpty()) { + System.out.println("No revisions to clean"); + return; + } else { + count = 0; + for (Revision revision : revisionsToClean) { + System.out.printf(" %s%n", revision.toReadableString()); + count++; + if (count >= REVISION_CAP) { + System.out.print(" ...%n"); + System.out.printf(" %s%n", revisionsToClean.last().toReadableString()); + break; + } + } + } + + Scanner scanner = new Scanner(System.in); + System.out.println("The revisions will be deleted permanently. Are you sure to proceed? 
[y/N]"); + String confirmation = scanner.nextLine().trim().toLowerCase(); + if (confirmation.equals("y") || confirmation.equals("yes")) { + // Start the cleanup + count = 0; + for (Revision revision : cleanupHelper.getCandidateRevisionsToClean().get(clusterToCleanup)) { + // Remove the revision from _revisions and all the properties + /*UpdateOp update = new UpdateOp(workingDocument.path.toString(), false) + update.removeMapEntry("_revisions", revision) + for (String property : propertiesModifiedByRevision.get(revision)) { + update.removeMapEntry(property, revision) + } + io.out.println("Executing UpdateOp: " + update) + try { + documentStore.findAndUpdate(NODES, update) + } catch (DocumentStoreException ex) { + io.out.println("Operation failed: " + ex) + }*/ + + count++; + if (count >= numberToCleanup) { + break; + } + } + System.out.println("-- Executed " + count + " operations --"); + } + } + private String fmtTimestamp(long ts) { return timestampToString(ts); } diff --git a/oak-store-document/pom.xml b/oak-store-document/pom.xml index 5063896026..5b582f457b 100644 --- a/oak-store-document/pom.xml +++ b/oak-store-document/pom.xml @@ -346,5 +346,11 @@ <version>${testcontainers.version}</version> <scope>test</scope> </dependency> + <dependency> + <groupId>com.fasterxml.jackson.core</groupId> + <artifactId>jackson-databind</artifactId> + <version>${jackson.version}</version> + <scope>test</scope> + </dependency> </dependencies> </project> diff --git a/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentRevisionCleanupHelper.java b/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentRevisionCleanupHelper.java new file mode 100644 index 0000000000..1cefd6f8f1 --- /dev/null +++ b/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentRevisionCleanupHelper.java @@ -0,0 +1,276 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license 
agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.jackrabbit.oak.plugins.document; + +import com.google.common.collect.Maps; +import org.apache.jackrabbit.oak.plugins.document.util.Utils; + +import java.time.Instant; +import java.time.temporal.ChronoUnit; +import java.util.Map; +import java.util.NavigableMap; +import java.util.SortedMap; +import java.util.TreeMap; +import java.util.TreeSet; + +import static org.apache.jackrabbit.oak.plugins.document.Collection.NODES; +import static org.apache.jackrabbit.oak.plugins.document.NodeDocument.isDeletedEntry; + +public class DocumentRevisionCleanupHelper { + + private final DocumentStore documentStore; + private final DocumentNodeStore documentNodeStore; + private final NodeDocument rootDoc; + private final NodeDocument workingDocument; + + private SortedMap<String, SortedMap<Integer, TreeSet<Revision>>> revisionsModifyingProperty; + private SortedMap<Revision, TreeSet<String>> propertiesModifiedByRevision; + private SortedMap<Integer, TreeSet<Revision>> blockedRevisionsToKeep; + private SortedMap<Integer, TreeSet<Revision>> candidateRevisionsToClean; + + /** + * Constructor for DocumentRevisionCleanupHelper. + * @param documentStore The DocumentStore instance. Must be writable to perform cleanup. 
+ * @param documentNodeStore The DocumentNodeStore instance. + * @param path The path of the document to clean up. + */ + public DocumentRevisionCleanupHelper(DocumentStore documentStore, DocumentNodeStore documentNodeStore, String path) { + this.candidateRevisionsToClean = new TreeMap<>(); + this.blockedRevisionsToKeep = new TreeMap<>(); + this.revisionsModifyingProperty = new TreeMap<>(); + this.propertiesModifiedByRevision = new TreeMap<>(StableRevisionComparator.INSTANCE); + + this.rootDoc = documentStore.find(NODES, Utils.getIdFromPath("/")); + String id = Utils.getIdFromPath(path); + this.workingDocument = documentStore.find(NODES, id); + this.documentStore = documentStore; + this.documentNodeStore = documentNodeStore; + } + + /** + * Initializes the revision cleanup process for the given document: classifies its revisions and computes + * the candidate and blocked revision sets for each clusterId. It does not delete anything itself. + */ + public void initializeCleanupProcess() { + classifyAndMapRevisionsAndProperties(); + markRevisionsNewerThanThresholdToPreserve(24, ChronoUnit.HOURS); + markLastRevisionForEachProperty(); + markCheckpointRevisionsToPreserve(); + removeCandidatesInList(blockedRevisionsToKeep); + } + + public int executeCleanupProcess(int numberToCleanup, int clusterToCleanup) { + return -99; + } + + /** + * Step 1: + * This method processes the revisions of the working document, classifying them into two categories: + * candidate revisions that can be cleaned up and used revisions that should be kept. It also creates maps to + * track the relationships between revisions and properties modified by them.
+ */ + protected void classifyAndMapRevisionsAndProperties() { + candidateRevisionsToClean = new TreeMap<>(); + blockedRevisionsToKeep = new TreeMap<>(); + revisionsModifyingProperty = new TreeMap<>(); + propertiesModifiedByRevision = new TreeMap<>(StableRevisionComparator.INSTANCE); + + // The first entry in "_deleted" has to be kept, as is when the document was created + NavigableMap<Revision, String> deletedRevisions = ((NavigableMap<Revision, String>) workingDocument.get("_deleted")); + if (deletedRevisions != null && !deletedRevisions.isEmpty()) { + Revision createdRevision = deletedRevisions.descendingMap().lastKey(); + addCandidateRevisionToClean(createdRevision); + addBlockedRevisionToKeep(createdRevision); + } + + SortedMap<Revision, String> documentRevisions = getAllRevisions(); + for (Map.Entry<Revision, String> revisionEntry : documentRevisions.entrySet()) { + Revision revision = revisionEntry.getKey(); + String revisionValue = revisionEntry.getValue(); + + // Only check committed revisions (ignore branch commits starting with "c-") + if ("c".equals(revisionValue)) { + // Candidate to clean up + addCandidateRevisionToClean(revision); + // Store properties usage + mapPropertiesModifiedByThisRevision(revision); + } + } + } + + /** + * Step 2: + * This method filters out revisions that are newer than a specified time threshold (specified by amount and unit) + * from the candidate revisions to be cleaned up, marking them as used revisions that should be kept. 
+ * @param amount the amount of time + * @param unit the unit of time + */ + protected void markRevisionsNewerThanThresholdToPreserve(long amount, ChronoUnit unit) { + long twentyFiveHoursAgoMillis = Instant.now().minus(amount, unit).toEpochMilli(); + for (TreeSet<Revision> revisionSet : candidateRevisionsToClean.values()) { + for (Revision revision : revisionSet) { + if (revision.getTimestamp() > twentyFiveHoursAgoMillis) { + addBlockedRevisionToKeep(revision); + } + } + } + } + + /** + * Step 3: + * This method processes a set of revisions that modified certain properties and keeps the last revision that + * modified each property. This means, the current status of the node will be preserved. + */ + protected void markLastRevisionForEachProperty() { + for (SortedMap<Integer, TreeSet<Revision>> revisionsByCluster : revisionsModifyingProperty.values()) { + for (TreeSet<Revision> revisions : revisionsByCluster.values()) { + Revision lastRevision = revisions.last(); + addBlockedRevisionToKeep(lastRevision); + } + } + } + + /** + * Step 4: + * Process a set of revisions that modified certain properties and determine which revisions should be + * kept based on the checkpoints. + */ + protected void markCheckpointRevisionsToPreserve() { + SortedMap<Revision, Checkpoints.Info> checkpoints = documentNodeStore.getCheckpoints().getCheckpoints(); + checkpoints.forEach((revision, info) -> { + // For each checkpoint, keep the last revision that modified a property prior to checkpoint + revisionsModifyingProperty.forEach((propertyName, revisionsByCluster) -> { + // Traverse the revisionVector of the checkpoint and find the last revision that modified the property + info.getCheckpoint().forEach(revisionToFind -> { + TreeSet<Revision> listOfRevisionsForProperty = revisionsByCluster.get(revisionToFind.getClusterId()); + if (listOfRevisionsForProperty != null) { + // If the exact revision exists, keep it. 
If not, find the previous one that modified that property + if (listOfRevisionsForProperty.contains(revisionToFind)) { + addBlockedRevisionToKeep(revisionToFind); + } else { + Revision previousRevision = listOfRevisionsForProperty.descendingSet().ceiling(revisionToFind); + if (previousRevision != null) { + addBlockedRevisionToKeep(previousRevision); + } + } + } + }); + }); + }); + } + + /** + * Step 5: + * Removes for each clusterId the revisions in the Map, that were blocked in the methods above. + */ + private void removeCandidatesInList(SortedMap<Integer, TreeSet<Revision>> revisions) { + revisions.forEach((key, value) -> { + if (candidateRevisionsToClean.containsKey(key)) { + candidateRevisionsToClean.get(key).removeAll(value); + } + }); + } + + /** + * This method processes a given revision and identify the properties modified by it within the working document. + * It maintains two data structures, propertiesModifiedByRevision and revisionsModifyingProperty, to store + * the properties and their associated revisions. + * @param revision + */ + private void mapPropertiesModifiedByThisRevision(Revision revision) { + for (Map.Entry<String, Object> propertyEntry : workingDocument.entrySet()) { + if (Utils.isPropertyName(propertyEntry.getKey()) || isDeletedEntry(propertyEntry.getKey())) { + Map<Revision, String> valueMap = (Map) propertyEntry.getValue(); + if (valueMap.containsKey(revision)) { + propertiesModifiedByRevision.computeIfAbsent(revision, key -> + new TreeSet<>()).add(propertyEntry.getKey() + ); + + revisionsModifyingProperty.computeIfAbsent(propertyEntry.getKey(), key -> + new TreeMap<>() + ).computeIfAbsent(revision.getClusterId(), key -> + new TreeSet<>(StableRevisionComparator.INSTANCE) + ).add(revision); + } + } + } + } + + /** + * Adds a revision to the list of candidates to delete. 
+ * @param revision + */ + private void addCandidateRevisionToClean(Revision revision) { + candidateRevisionsToClean.computeIfAbsent(revision.getClusterId(), key -> + new TreeSet<>(StableRevisionComparator.INSTANCE) + ).add(revision); + } + + /** + * Adds a revision to the list of revisions to keep. + * @param revision + */ + private void addBlockedRevisionToKeep(Revision revision) { + blockedRevisionsToKeep.computeIfAbsent(revision.getClusterId(), key -> + new TreeSet<>(StableRevisionComparator.INSTANCE) + ).add(revision); + } + + /** + * Returns the LastRev map from the root document. + * @return + */ + public Map<Integer, Revision> getLastRev() { + return rootDoc.getLastRev(); + } + + /** + * Returns the SweepRev map from the root document. + * @return + */ + public Map<Integer, Revision> getSweepRev() { + Map<Integer, Revision> map = Maps.newHashMap(); + Map<Revision, String> valueMap = (SortedMap<Revision, String>) rootDoc.get("_sweepRev"); + for (Map.Entry<Revision, String> e : valueMap.entrySet()) { + int clusterId = e.getKey().getClusterId(); + Revision rev = Revision.fromString(e.getValue()); + map.put(clusterId, rev); + } + return map; + } + + public NavigableMap<Revision, String> getAllRevisions() { + return (NavigableMap<Revision, String>) workingDocument.get("_revisions"); + } + + public SortedMap<String, SortedMap<Integer, TreeSet<Revision>>> getRevisionsModifyingProperty() { + return revisionsModifyingProperty; + } + + public SortedMap<Revision, TreeSet<String>> getPropertiesModifiedByRevision() { + return propertiesModifiedByRevision; + } + + public SortedMap<Integer, TreeSet<Revision>> getBlockedRevisionsToKeep() { + return blockedRevisionsToKeep; + } + + public SortedMap<Integer, TreeSet<Revision>> getCandidateRevisionsToClean() { + return candidateRevisionsToClean; + } +} diff --git a/oak-store-document/src/test/java/org/apache/jackrabbit/oak/plugins/document/DocumentRevisionCleanupHelperTest.java 
b/oak-store-document/src/test/java/org/apache/jackrabbit/oak/plugins/document/DocumentRevisionCleanupHelperTest.java new file mode 100644 index 0000000000..01daf58358 --- /dev/null +++ b/oak-store-document/src/test/java/org/apache/jackrabbit/oak/plugins/document/DocumentRevisionCleanupHelperTest.java @@ -0,0 +1,366 @@ +package org.apache.jackrabbit.oak.plugins.document; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.core.type.TypeReference; +import com.google.common.collect.Maps; +import org.mockito.Mockito; +import org.junit.Before; +import org.junit.Test; +import org.mockito.Mock; +import org.mockito.MockitoAnnotations; + +import java.io.IOException; +import java.util.Map; +import java.util.Set; +import java.util.SortedMap; +import java.util.TreeMap; +import java.util.TreeSet; + +import com.fasterxml.jackson.databind.ObjectMapper; + +import static org.apache.jackrabbit.oak.plugins.document.Collection.NODES; +import static org.apache.jackrabbit.oak.plugins.document.Collection.SETTINGS; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; + +public class DocumentRevisionCleanupHelperTest { + + @Mock + DocumentStore documentStore; + + @Mock + DocumentNodeStore documentNodeStore; + + @Mock + Checkpoints checkpoints; + + @Mock + NodeDocument workingDocument; + + DocumentRevisionCleanupHelper documentRevisionCleanupHelper; + + @Before + public void setUp() { + MockitoAnnotations.initMocks(this); + workingDocument = Mockito.mock(NodeDocument.class); + + Mockito.when(documentStore.find(Mockito.eq(NODES), Mockito.anyString())).thenReturn(workingDocument); + documentRevisionCleanupHelper = new DocumentRevisionCleanupHelper(documentStore, documentNodeStore, "/"); + } + + @Test + public void testMarkCheckpointRevisionsToPreserveOnePropertyOneCluster() throws IOException { + Revision revisionA = 
Revision.fromString("r100000000-0-1"); + Revision revisionB = Revision.fromString("r105000000-0-1"); + Revision revisionC = Revision.fromString("r110000000-0-1"); + Revision revisionD = Revision.fromString("r115000000-0-1"); + Revision revisionE = Revision.fromString("r120000000-0-1"); + Revision revisionF = Revision.fromString("r125000000-0-1"); + + Revision checkpoint1 = Revision.fromString("r109000000-0-1"); + Revision checkpoint2 = Revision.fromString("r119000000-0-1"); + + String jsonProperties = "{" + + "'prop1': {'" + revisionA + "': 'value1', '" + revisionB + "': 'value2', '" + revisionC + "': 'value3', '" + revisionD + "': 'value4', '" + revisionE + "': 'value5', '" + revisionF + "': 'value6'}, " + + "'_revisions': {'" + revisionA + "': 'c', '" + revisionB + "': 'c', '" + revisionC + "': 'c', '" + revisionC + "': 'c', '" + revisionD + "': 'c', '" + revisionE + "': 'c', '" + revisionF + "': 'c'}" + + "}"; + String jsonCheckpoints = "{" + + "'" + checkpoint1 + "': {'expires':'200000000','rv':'r109000000-0-1'}," + + "'" + checkpoint2 + "': {'expires':'200000000','rv':'r119000000-0-1'}" + + "}"; + + + prepareDocumentMock(jsonProperties); + prepareCheckpointsMock(jsonCheckpoints); + + documentRevisionCleanupHelper.classifyAndMapRevisionsAndProperties(); + documentRevisionCleanupHelper.markCheckpointRevisionsToPreserve(); + + // The revisions blocked should be: + // - r105000000-0-1 (blocked by checkpoint r109000000-0-1) + // - r115000000-0-1 (blocked by checkpoint r119000000-0-1) + + assertEquals(Set.of(revisionB, revisionD), documentRevisionCleanupHelper.getBlockedRevisionsToKeep().get(1)); + + + /*assertEquals(Set.of(revisionC), documentRevisionCleanupHelper.getBlockedRevisionsToKeep().get(1)); + assertEquals(Set.of(revisionA, revisionB, revisionC), documentRevisionCleanupHelper.getCandidateRevisionsToClean().get(1));*/ + } + + @Test + public void testInitializeCleanupProcessMultipleClusters() throws IOException { + Revision revisionA = 
Revision.fromString("r100000000-0-1"); + Revision revisionB = Revision.fromString("r105000000-0-2"); + Revision revisionC = Revision.fromString("r110000000-0-3"); + Revision revisionD = Revision.fromString("r115000000-0-1"); + Revision revisionE = Revision.fromString("r120000000-0-2"); + Revision revisionF = Revision.fromString("r125000000-0-3"); + + Revision checkpoint1 = Revision.fromString("r109000000-0-1"); + Revision checkpoint2 = Revision.fromString("r119000000-0-1"); + + String jsonProperties = "{" + + "'prop1': {'" + revisionA + "': 'value1', '" + revisionB + "': 'value2', '" + revisionC + "': 'value3', '" + revisionD + "': 'value4', '" + revisionE + "': 'value5', '" + revisionF + "': 'value6'}, " + + "'_revisions': {'" + revisionA + "': 'c', '" + revisionB + "': 'c', '" + revisionC + "': 'c', '" + revisionC + "': 'c', '" + revisionD + "': 'c', '" + revisionE + "': 'c', '" + revisionF + "': 'c'}" + + "}"; + String jsonCheckpoints = "{" + + "'" + checkpoint1 + "': {'expires':'200000000','rv':'r109000000-0-1,r109000000-0-1'}," + + "'" + checkpoint2 + "': {'expires':'200000000','rv':'r119000000-0-1,r109000000-0-1'}" + + "}"; + + + prepareDocumentMock(jsonProperties); + prepareCheckpointsMock(jsonCheckpoints); + + documentRevisionCleanupHelper.initializeCleanupProcess(); + + // The revisions blocked should be: + // + + assertEquals(Set.of(revisionC), documentRevisionCleanupHelper.getBlockedRevisionsToKeep().get(1)); + assertEquals(Set.of(revisionA, revisionB, revisionC), documentRevisionCleanupHelper.getCandidateRevisionsToClean().get(1)); + } + + @Test + public void testLastRevisionIsBlocked() throws IOException { + Revision revisionA = new Revision(111111111L, 0, 1); + Revision revisionB = new Revision(222222222L, 0, 1); + Revision revisionC = new Revision(333333333L, 0, 1); + + String jsonProperties = "{" + + "'prop1': {'" + revisionA + "': 'value1', '" + revisionB + "': 'value2', '" + revisionC + "': 'value3'}, " + + "'_revisions': {'" + revisionA + "': 
'c', '" + revisionB + "': 'c', '" + revisionC + "': 'c'}" + + "}"; + prepareDocumentMock(jsonProperties); + + documentRevisionCleanupHelper.classifyAndMapRevisionsAndProperties(); + documentRevisionCleanupHelper.markLastRevisionForEachProperty(); + + assertFalse(documentRevisionCleanupHelper.getBlockedRevisionsToKeep().get(1).contains(revisionA)); + assertFalse(documentRevisionCleanupHelper.getBlockedRevisionsToKeep().get(1).contains(revisionB)); + assertEquals(Set.of(revisionC), documentRevisionCleanupHelper.getBlockedRevisionsToKeep().get(1)); + assertEquals(Set.of(revisionA, revisionB, revisionC), documentRevisionCleanupHelper.getCandidateRevisionsToClean().get(1)); + } + + @Test + public void testFirstDeletedRevisionIsBlocked() throws IOException { + Revision revisionA = new Revision(111111111L, 0, 1); + Revision revisionB = new Revision(222222222L, 0, 1); + Revision revisionC = new Revision(333333333L, 0, 1); + + String jsonProperties = "{" + + "'_deleted': {'" + revisionA + "': 'false', '" + revisionB + "': 'true', '" + revisionC + "': 'false'}," + + "'_revisions': {'" + revisionA + "': 'c', '" + revisionB + "': 'c', '" + revisionC + "': 'c'}" + + "}"; + prepareDocumentMock(jsonProperties); + + documentRevisionCleanupHelper.classifyAndMapRevisionsAndProperties(); + + assertTrue(documentRevisionCleanupHelper.getBlockedRevisionsToKeep().get(1).contains(revisionA)); + assertEquals(Set.of(revisionA, revisionB, revisionC), documentRevisionCleanupHelper.getCandidateRevisionsToClean().get(1)); + } + + @Test + public void testClassifyAndMapRevisionsAndPropertiesWithDeleted() throws IOException { + Revision revisionA = new Revision(111111111L, 0, 1); + Revision revisionB = new Revision(222222222L, 0, 1); + Revision revisionC = new Revision(333333333L, 0, 1); + + String jsonProperties = "{" + + "'prop1': {'" + revisionA + "': 'value1', '" + revisionB + "': 'value2', '" + revisionC + "': 'value3'}, " + + "'prop2': {'" + revisionB + "': 'value4'}, " + + "'_deleted': {'" 
+ revisionA + "': 'false'}," + + "'_revisions': {'" + revisionA + "': 'c', '" + revisionB + "': 'c', '" + revisionC + "': 'c'}" + + "}"; + prepareDocumentMock(jsonProperties); + + documentRevisionCleanupHelper.classifyAndMapRevisionsAndProperties(); + + SortedMap<Revision, TreeSet<String>> propertiesModifiedByRevision = documentRevisionCleanupHelper.getPropertiesModifiedByRevision(); + assertEquals(Set.of("prop1", "_deleted"), propertiesModifiedByRevision.get(revisionA)); + assertEquals(Set.of("prop1", "prop2"), propertiesModifiedByRevision.get(revisionB)); + assertEquals(Set.of("prop1"), propertiesModifiedByRevision.get(revisionC)); + + SortedMap<String, SortedMap<Integer, TreeSet<Revision>>> revisionsModifyingProperty = documentRevisionCleanupHelper.getRevisionsModifyingProperty(); + assertEquals(Set.of(revisionA, revisionB, revisionC), revisionsModifyingProperty.get("prop1").get(1)); + assertEquals(Set.of(revisionB), revisionsModifyingProperty.get("prop2").get(1)); + assertEquals(Set.of(revisionA), revisionsModifyingProperty.get("_deleted").get(1)); + } + + @Test + public void testClassifyAndMapRevisionsMultipleDeleted() throws IOException { + Revision revisionA = new Revision(111111111L, 0, 1); + Revision revisionB = new Revision(222222222L, 0, 1); + Revision revisionC = new Revision(333333333L, 0, 1); + + String jsonProperties = "{" + + "'_deleted': {'" + revisionA + "': 'false', '" + revisionB + "': 'true', '" + revisionC + "': 'false'}," + + "'_revisions': {'" + revisionA + "': 'c', '" + revisionB + "': 'c', '" + revisionC + "': 'c'}" + + "}"; + prepareDocumentMock(jsonProperties); + + documentRevisionCleanupHelper.classifyAndMapRevisionsAndProperties(); + + SortedMap<Revision, TreeSet<String>> propertiesModifiedByRevision = documentRevisionCleanupHelper.getPropertiesModifiedByRevision(); + assertEquals(Set.of("_deleted"), propertiesModifiedByRevision.get(revisionA)); + assertEquals(Set.of("_deleted"), propertiesModifiedByRevision.get(revisionB)); + 
assertEquals(Set.of("_deleted"), propertiesModifiedByRevision.get(revisionC)); + + SortedMap<String, SortedMap<Integer, TreeSet<Revision>>> revisionsModifyingProperty = documentRevisionCleanupHelper.getRevisionsModifyingProperty(); + assertEquals(Set.of(revisionA, revisionB, revisionC), revisionsModifyingProperty.get("_deleted").get(1)); + } + + @Test + public void testClassifyAndMapRevisionsAndPropertiesWithoutDeleted() throws IOException { + Revision revisionA = new Revision(111111111L, 0, 1); + Revision revisionB = new Revision(222222222L, 0, 1); + Revision revisionC = new Revision(333333333L, 0, 1); + + String jsonProperties = "{" + + "'prop1': {'" + revisionA + "': 'value1', '" + revisionB + "': 'value2', '" + revisionC + "': 'value3'}, " + + "'prop2': {'" + revisionB + "': 'value4'}, " + + "'_revisions': {'" + revisionA + "': 'c', '" + revisionB + "': 'c', '" + revisionC + "': 'c'}" + + "}"; + prepareDocumentMock(jsonProperties); + + documentRevisionCleanupHelper.classifyAndMapRevisionsAndProperties(); + + SortedMap<Revision, TreeSet<String>> propertiesModifiedByRevision = documentRevisionCleanupHelper.getPropertiesModifiedByRevision(); + assertEquals(Set.of("prop1"), propertiesModifiedByRevision.get(revisionA)); + assertEquals(Set.of("prop1", "prop2"), propertiesModifiedByRevision.get(revisionB)); + assertEquals(Set.of("prop1"), propertiesModifiedByRevision.get(revisionC)); + + SortedMap<String, SortedMap<Integer, TreeSet<Revision>>> revisionsModifyingProperty = documentRevisionCleanupHelper.getRevisionsModifyingProperty(); + assertEquals(Set.of(revisionA, revisionB, revisionC), revisionsModifyingProperty.get("prop1").get(1)); + assertEquals(Set.of(revisionB), revisionsModifyingProperty.get("prop2").get(1)); + } + + @Test + public void testClassifyAndMapRevisionsAndPropertiesNotCommitted() throws IOException { + Revision revisionA = new Revision(111111111L, 0, 1); + Revision revisionB = new Revision(222222222L, 0, 1); + Revision revisionC = new 
Revision(333333333L, 0, 1); + Revision revisionD = new Revision(444444444L, 0, 1); + + String jsonProperties = "{" + + "'prop1': {'" + revisionA + "': 'value1', '" + revisionB + "': 'value2', '" + revisionC + "': 'value3', '" + revisionD + "': 'value5'}, " + + "'prop2': {'" + revisionB + "': 'value4', '" + revisionD + "': 'value5'}, " + + "'_revisions': {'" + revisionA + "': 'c', '" + revisionB + "': 'c', '" + revisionC + "': 'c', '" + revisionD + "': 'nc'}" + + "}"; + prepareDocumentMock(jsonProperties); + + documentRevisionCleanupHelper.classifyAndMapRevisionsAndProperties(); + + // Modifications done in revisionD should be ignored, as it is not committed + SortedMap<Revision, TreeSet<String>> propertiesModifiedByRevision = documentRevisionCleanupHelper.getPropertiesModifiedByRevision(); + assertEquals(Set.of("prop1"), propertiesModifiedByRevision.get(revisionA)); + assertEquals(Set.of("prop1", "prop2"), propertiesModifiedByRevision.get(revisionB)); + assertEquals(Set.of("prop1"), propertiesModifiedByRevision.get(revisionC)); + assertNull(propertiesModifiedByRevision.get(revisionD)); + + SortedMap<String, SortedMap<Integer, TreeSet<Revision>>> revisionsModifyingProperty = documentRevisionCleanupHelper.getRevisionsModifyingProperty(); + assertEquals(Set.of(revisionA, revisionB, revisionC), revisionsModifyingProperty.get("prop1").get(1)); + assertEquals(Set.of(revisionB), revisionsModifyingProperty.get("prop2").get(1)); + } + + @Test + public void testClassifyAndMapRevisionsAndPropertiesDifferentClusters() throws IOException { + Revision revisionA = new Revision(111111111L, 0, 1); + Revision revisionB = new Revision(222222222L, 0, 2); + Revision revisionC = new Revision(333333333L, 0, 3); + Revision revisionD = new Revision(444444444L, 0, 1); + + String jsonProperties = "{" + + "'prop1': {'" + revisionA + "': 'value1', '" + revisionB + "': 'value2', '" + revisionC + "': 'value3', '" + revisionD + "': 'value5'}, " + + "'prop2': {'" + revisionB + "': 'value4', '" + 
revisionD + "': 'value5'}, " + + "'_revisions': {'" + revisionA + "': 'c', '" + revisionB + "': 'c', '" + revisionC + "': 'c', '" + revisionD + "': 'c'}" + + "}"; + prepareDocumentMock(jsonProperties); + + documentRevisionCleanupHelper.classifyAndMapRevisionsAndProperties(); + + SortedMap<Revision, TreeSet<String>> propertiesModifiedByRevision = documentRevisionCleanupHelper.getPropertiesModifiedByRevision(); + assertEquals(Set.of("prop1"), propertiesModifiedByRevision.get(revisionA)); + assertEquals(Set.of("prop1", "prop2"), propertiesModifiedByRevision.get(revisionB)); + assertEquals(Set.of("prop1"), propertiesModifiedByRevision.get(revisionC)); + assertEquals(Set.of("prop1", "prop2"), propertiesModifiedByRevision.get(revisionD)); + + SortedMap<String, SortedMap<Integer, TreeSet<Revision>>> revisionsModifyingProperty = documentRevisionCleanupHelper.getRevisionsModifyingProperty(); + assertEquals(Set.of(revisionA, revisionD), revisionsModifyingProperty.get("prop1").get(1)); + assertEquals(Set.of(revisionB), revisionsModifyingProperty.get("prop1").get(2)); + assertEquals(Set.of(revisionC), revisionsModifyingProperty.get("prop1").get(3)); + assertEquals(Set.of(revisionD), revisionsModifyingProperty.get("prop2").get(1)); + assertEquals(Set.of(revisionB), revisionsModifyingProperty.get("prop2").get(2)); + assertNull(revisionsModifyingProperty.get("prop2").get(3)); + } + + private void prepareDocumentMock(String jsonProperties) throws IOException { + String json = jsonProperties.replace("'", "\""); + + ObjectMapper objectMapper = new ObjectMapper(); + Map<String, Map<String, String>> data = objectMapper.readValue(json, new TypeReference<>() {}); + + SortedMap<String, Object> entries = new TreeMap<>(); + SortedMap<Revision, String> allRevisions = new TreeMap<>(StableRevisionComparator.INSTANCE); + for (Map.Entry<String, Map<String, String>> entry : data.entrySet()) { + String property = entry.getKey(); + Map<String, String> revisions = entry.getValue(); + + 
SortedMap<Revision, String> sortedRevisions = new TreeMap<>(StableRevisionComparator.INSTANCE);
            for (Map.Entry<String, String> revisionEntry : revisions.entrySet()) {
                String revisionStr = revisionEntry.getKey();
                String value = revisionEntry.getValue();

                // Revision string format: r<timestampHex>-<counter>-<clusterId>;
                // the timestamp (first part, 'r' stripped) is hexadecimal.
                String[] parts = revisionStr.split("-");
                long timestamp = Long.parseLong(parts[0].substring(1), 16);
                int counter = Integer.parseInt(parts[1]);
                int clusterId = Integer.parseInt(parts[2]);

                Revision revision = new Revision(timestamp, counter, clusterId);
                sortedRevisions.put(revision, value);

                // Every parsed revision also feeds the aggregated "_revisions" map.
                allRevisions.put(revision, value);
            }
            entries.put(property, sortedRevisions);
        }

        Mockito.when(workingDocument.entrySet()).thenReturn(entries.entrySet());
        Mockito.when(workingDocument.get("_deleted")).thenReturn(entries.get("_deleted"));
        Mockito.when(workingDocument.get("_revisions")).thenReturn(allRevisions);
    }

    /**
     * Builds the checkpoints map from a JSON fixture and stubs the
     * DocumentNodeStore/Checkpoints mocks to return it.
     */
    private void prepareCheckpointsMock(String jsonCheckpoints) throws IOException {
        String json = jsonCheckpoints.replace("'", "\"");

        ObjectMapper objectMapper = new ObjectMapper();
        Map<String, Map<String, Object>> data = objectMapper.readValue(json, new TypeReference<>() {});

        SortedMap<Revision, Checkpoints.Info> checkpointMap = new TreeMap<>(StableRevisionComparator.REVERSE);
        for (Map.Entry<String, Map<String, Object>> entry : data.entrySet()) {
            String checkpointStr = entry.getKey();
            Map<String, Object> checkpointData = entry.getValue();

            String[] parts = checkpointStr.split("-");
            long timestamp = Long.parseLong(parts[0].substring(1), 16);
            int counter = Integer.parseInt(parts[1]);
            int clusterId = Integer.parseInt(parts[2]);

            Revision checkpoint = new Revision(timestamp, counter, clusterId);
            String checkpointDataJson = objectMapper.writeValueAsString(checkpointData);

            Checkpoints.Info info = Checkpoints.Info.fromString(checkpointDataJson);
            checkpointMap.put(checkpoint, info);
        }

        // Stub each mock directly. The original second stub was written as
        // Mockito.when(documentNodeStore.getCheckpoints().getCheckpoints()),
        // i.e. stubbing through a chained call on another mock — a documented
        // Mockito anti-pattern that only works by accident of stubbing order.
        Mockito.when(documentNodeStore.getCheckpoints()).thenReturn(this.checkpoints);
        Mockito.when(this.checkpoints.getCheckpoints()).thenReturn(checkpointMap);
    }
}