This is an automated email from the ASF dual-hosted git repository.

daim pushed a commit to branch OAK-10199
in repository https://gitbox.apache.org/repos/asf/jackrabbit-oak.git

commit f4784f3272438ca10489d7e1e08b1faaf58dd2fd
Author: Stefan Egli <stefane...@apache.org>
AuthorDate: Thu Apr 20 18:11:08 2023 +0200

    OAK-10199 : initial sketch of detail gc skeleton
---
 .../plugins/document/VersionGCRecommendations.java |   6 +-
 .../oak/plugins/document/VersionGCSupport.java     |  25 +++
 .../plugins/document/VersionGarbageCollector.java  | 183 ++++++++++++++++++++-
 .../oak/plugins/document/DetailGCHelper.java       |  42 +++++
 .../oak/plugins/document/VersionGCTest.java        |  18 ++
 5 files changed, 272 insertions(+), 2 deletions(-)

diff --git 
a/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/VersionGCRecommendations.java
 
b/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/VersionGCRecommendations.java
index 363c65789b..ac47cc69d8 100644
--- 
a/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/VersionGCRecommendations.java
+++ 
b/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/VersionGCRecommendations.java
@@ -46,6 +46,7 @@ public class VersionGCRecommendations {
     final long maxCollect;
     final long deleteCandidateCount;
     final long lastOldestTimestamp;
+    final long fullDetailGCTimestamp;
     final long originalCollectLimit;
 
     private final long precisionMs;
@@ -101,6 +102,8 @@ public class VersionGCRecommendations {
         TimeInterval scope = new TimeInterval(oldestPossible, Long.MAX_VALUE);
         scope = scope.notLaterThan(keep.fromMs);
 
+        fullDetailGCTimestamp = 
settings.get(VersionGarbageCollector.SETTINGS_COLLECTION_FULL_DETAILGC_TIMESTAMP_PROP);
+
         suggestedIntervalMs = 
settings.get(VersionGarbageCollector.SETTINGS_COLLECTION_REC_INTERVAL_PROP);
         if (suggestedIntervalMs > 0) {
             suggestedIntervalMs = Math.max(suggestedIntervalMs, 
options.precisionMs);
@@ -217,6 +220,7 @@ public class VersionGCRecommendations {
         // default values
         
settings.put(VersionGarbageCollector.SETTINGS_COLLECTION_OLDEST_TIMESTAMP_PROP, 
0L);
         
settings.put(VersionGarbageCollector.SETTINGS_COLLECTION_REC_INTERVAL_PROP, 0L);
+        
settings.put(VersionGarbageCollector.SETTINGS_COLLECTION_FULL_DETAILGC_TIMESTAMP_PROP,
 -1L);
         if (versionGCDoc != null) {
             for (String k : versionGCDoc.keySet()) {
                 Object value = versionGCDoc.get(k);
@@ -228,7 +232,7 @@ public class VersionGCRecommendations {
         return settings;
     }
 
-    private void setLongSetting(String propName, long val) {
+    void setLongSetting(String propName, long val) {
         UpdateOp updateOp = new 
UpdateOp(VersionGarbageCollector.SETTINGS_COLLECTION_ID, true);
         updateOp.set(propName, val);
         vgc.getDocumentStore().createOrUpdate(Collection.SETTINGS, updateOp);
diff --git 
a/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/VersionGCSupport.java
 
b/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/VersionGCSupport.java
index 13171b7fd5..0e5c26c83d 100644
--- 
a/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/VersionGCSupport.java
+++ 
b/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/VersionGCSupport.java
@@ -83,6 +83,31 @@ public class VersionGCSupport {
         });
     }
 
+    /**
+     * Returns the documents whose {@code _modified} value falls into the
+     * half-open range {@code [fromModified, toModified)}.
+     * <p>
+     * Both bounds are converted with {@code getModifiedInSecs} before they are
+     * compared with {@link NodeDocument#getModified()}. Documents without a
+     * {@code _modified} value are never returned.
+     *
+     * @param fromModified the lower bound (inclusive); presumably a timestamp
+     *                     in milliseconds - TODO confirm against the callers
+     * @param toModified the upper bound (exclusive), in the same unit as
+     *                   {@code fromModified}
+     * @return the selected documents that lie within the given range
+     */
+    public Iterable<NodeDocument> getModifiedDocs(final long fromModified, final long toModified) {
+        // the bounds do not change while iterating, so convert them only once
+        final long fromSecs = getModifiedInSecs(fromModified);
+        final long toSecs = getModifiedInSecs(toModified);
+        return filter(getSelectedDocuments(store, NodeDocument.MODIFIED_IN_SECS, fromModified), new Predicate<NodeDocument>() {
+            @Override
+            public boolean apply(NodeDocument input) {
+                Long modified = input.getModified();
+                return modified != null && modified >= fromSecs && modified < toSecs;
+            }
+        });
+    }
+
     /**
      * Returns the underlying document store.
      *
diff --git 
a/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/VersionGarbageCollector.java
 
b/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/VersionGarbageCollector.java
index ed0b333d44..e7442a7d15 100644
--- 
a/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/VersionGarbageCollector.java
+++ 
b/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/VersionGarbageCollector.java
@@ -70,6 +70,9 @@ import static org.slf4j.helpers.MessageFormatter.arrayFormat;
 
 public class VersionGarbageCollector {
 
+    /** TODO temporary global flag to enable 'detail gc' during prototyping. 
Should eventually become eg a system property */
+    public static boolean DETAIL_GC_ENABLED = false;
+
     //Kept less than MongoDocumentStore.IN_CLAUSE_BATCH_SIZE to avoid 
re-partitioning
     private static final int DELETE_BATCH_SIZE = 450;
     private static final int UPDATE_BATCH_SIZE = 450;
@@ -99,6 +102,17 @@ public class VersionGarbageCollector {
      */
     static final String SETTINGS_COLLECTION_REC_INTERVAL_PROP = 
"recommendedIntervalMs";
 
+    /**
+     * Name of the settings property holding the timestamp of the last full-detail-GC run, or -1 if not applicable/in-use.
+     * <p>
+     * <ul>
+     * <li>-1 : full repo scan is disabled</li>
+     * <li>0 : full repo scan is enabled and bound to start from zero == 
oldest _modified </li>
+     * <li>gt 0 : full repo scan is enabled, was already done up until this 
value</li>
+     * </ul>
+     */
+    static final String SETTINGS_COLLECTION_FULL_DETAILGC_TIMESTAMP_PROP = 
"fullDetailGCTimeStamp";
+
     private final DocumentNodeStore nodeStore;
     private final DocumentStore ds;
     private final VersionGCSupport versionStore;
@@ -260,13 +274,14 @@ public class VersionGarbageCollector {
         final Stopwatch active = Stopwatch.createUnstarted();
         final Stopwatch collectDeletedDocs = Stopwatch.createUnstarted();
         final Stopwatch checkDeletedDocs = Stopwatch.createUnstarted();
+        final Stopwatch detailGcDocs = Stopwatch.createUnstarted();
         final Stopwatch deleteDeletedDocs = Stopwatch.createUnstarted();
         final Stopwatch collectAndDeleteSplitDocs = 
Stopwatch.createUnstarted();
         final Stopwatch deleteSplitDocs = Stopwatch.createUnstarted();
         final Stopwatch sortDocIds = Stopwatch.createUnstarted();
         final Stopwatch updateResurrectedDocuments = 
Stopwatch.createUnstarted();
         long activeElapsed, collectDeletedDocsElapsed, 
checkDeletedDocsElapsed, deleteDeletedDocsElapsed, 
collectAndDeleteSplitDocsElapsed,
-                deleteSplitDocsElapsed, sortDocIdsElapsed, 
updateResurrectedDocumentsElapsed;
+                deleteSplitDocsElapsed, sortDocIdsElapsed, 
updateResurrectedDocumentsElapsed, detailGcDocsElapsed;
 
         @Override
         public String toString() {
@@ -335,6 +350,7 @@ public class VersionGarbageCollector {
                 this.deleteSplitDocsElapsed += run.deleteSplitDocsElapsed;
                 this.sortDocIdsElapsed += run.sortDocIdsElapsed;
                 this.updateResurrectedDocumentsElapsed += 
run.updateResurrectedDocumentsElapsed;
+                this.detailGcDocsElapsed += run.detailGcDocsElapsed;
             } else {
                 // single run -> read from stop watches
                 this.activeElapsed += run.active.elapsed(MICROSECONDS);
@@ -345,6 +361,7 @@ public class VersionGarbageCollector {
                 this.deleteSplitDocsElapsed += 
run.deleteSplitDocs.elapsed(MICROSECONDS);
                 this.sortDocIdsElapsed += run.sortDocIds.elapsed(MICROSECONDS);
                 this.updateResurrectedDocumentsElapsed += 
run.updateResurrectedDocuments.elapsed(MICROSECONDS);
+                this.detailGcDocsElapsed += 
run.detailGcDocs.elapsed(MICROSECONDS);
             }
         }
     }
@@ -353,6 +370,7 @@ public class VersionGarbageCollector {
         NONE,
         COLLECTING,
         CHECKING,
+        DETAILGC,
         DELETING,
         SORTING,
         SPLITS_CLEANUP,
@@ -380,6 +398,7 @@ public class VersionGarbageCollector {
             this.watches.put(GCPhase.NONE, Stopwatch.createStarted());
             this.watches.put(GCPhase.COLLECTING, stats.collectDeletedDocs);
             this.watches.put(GCPhase.CHECKING, stats.checkDeletedDocs);
+            this.watches.put(GCPhase.DETAILGC, stats.detailGcDocs);
             this.watches.put(GCPhase.DELETING, stats.deleteDeletedDocs);
             this.watches.put(GCPhase.SORTING, stats.sortDocIds);
             this.watches.put(GCPhase.SPLITS_CLEANUP, 
stats.collectAndDeleteSplitDocs);
@@ -506,6 +525,7 @@ public class VersionGarbageCollector {
 
                     collectDeletedDocuments(phases, headRevision, rec);
                     collectSplitDocuments(phases, sweepRevisions, rec);
+                    collectDetailGarbage(phases, headRevision, rec);
                 }
             } catch (LimitExceededException ex) {
                 stats.limitExceeded = true;
@@ -521,6 +541,112 @@ public class VersionGarbageCollector {
             return stats;
         }
 
+        /**
+         * Collects "detail garbage": the additional garbage identified as part of
+         * OAK-10199 et al, i.e. garbage that earlier versions of Oak ignored. This
+         * includes deleted properties, revision information within documents and
+         * branch commit related garbage.
+         * <p>
+         * TODO: limit this to run only on a singleton instance, eg the cluster leader
+         * <p>
+         * The "detail garbage" collector can be instructed to do a full repository scan
+         * - or to work incrementally based on where it last left off. When doing a full
+         * repository scan (but not limited to that), it executes in (small) batches
+         * followed by voluntary pauses (aka throttling) to avoid excessive load on the
+         * system. The full repository scan does not have to finish particularly fast,
+         * it is okay that it takes a considerable amount of time.
+         *
+         * @param phases the phase tracker of the current GC run
+         * @param headRevision the head revision handed over to the DetailGC helper
+         * @param rec the recommendations providing the scope, the collect limit
+         *            and the full-detail-GC timestamp
+         * @throws IOException if closing the DetailGC helper fails
+         * @throws LimitExceededException if more documents with detail garbage than
+         *             {@code rec.maxCollect} were found
+         */
+        private void collectDetailGarbage(GCPhases phases, RevisionVector headRevision, VersionGCRecommendations rec)
+                throws IOException, LimitExceededException {
+            if (!DETAIL_GC_ENABLED) {
+                // TODO: this toggling should be done nicer asap
+                return;
+            }
+            int docsTraversed = 0;
+            DetailGC gc = new DetailGC(headRevision, monitor);
+            try {
+                final long fromModified;
+                final long toModified;
+                if (rec.fullDetailGCTimestamp == -1) {
+                    // then full detail-gc is disabled or over - use regular scope then
+                    fromModified = rec.scope.fromMs;
+                    toModified = rec.scope.toMs;
+                } else {
+                    // then full detail-gc is enabled - use it then
+                    fromModified = rec.fullDetailGCTimestamp; // TODO: once we're past rec.scope.fromMs we should
+                                                              // disable fullgc
+                    toModified = rec.scope.toMs; // the 'to' here is the max. it will process only eg 1 batch
+                }
+                // _modified is kept in seconds while the boundaries above are in
+                // milliseconds: track the progress in the unit of _modified and
+                // convert back only when persisting it
+                long newestModifiedSecs = NodeDocument.getModifiedInSecs(fromModified);
+                boolean foundAnything = false;
+                if (phases.start(GCPhase.COLLECTING)) {
+                    Iterable<NodeDocument> itr = versionStore.getModifiedDocs(fromModified, toModified);
+                    final Stopwatch timer = Stopwatch.createUnstarted();
+                    timer.reset().start();
+                    try {
+                        for (NodeDocument doc : itr) {
+                            // continue with GC?
+                            if (cancel.get()) {
+                                break;
+                            }
+                            foundAnything = true;
+                            if (phases.start(GCPhase.DETAILGC)) {
+                                gc.detailGC(doc, phases);
+                                phases.stop(GCPhase.DETAILGC);
+                            }
+                            final Long modified = doc.getModified();
+                            if (modified == null) {
+                                monitor.warn("collectDetailGarbage : document has no _modified property : {}",
+                                        doc.getId());
+                            } else if (modified < newestModifiedSecs) {
+                                monitor.warn(
+                                        "collectDetailGarbage : document has older _modified than query boundary : {} (from: {}, to: {})",
+                                        modified, fromModified, toModified);
+                            } else {
+                                newestModifiedSecs = modified;
+                            }
+                            docsTraversed++;
+                            if (docsTraversed % PROGRESS_BATCH_SIZE == 0) {
+                                monitor.info("Iterated through {} documents so far. {} had detail garbage",
+                                        docsTraversed, gc.getNumDocuments());
+                            }
+                            if (rec.maxCollect > 0 && gc.getNumDocuments() > rec.maxCollect) {
+                                // TODO: how would we recover from this?
+                                throw new LimitExceededException();
+                            }
+                        }
+                    } finally {
+                        Utils.closeIfCloseable(itr);
+                        // throttle proportionally to the time spent iterating
+                        delayOnModifications(timer.stop().elapsed(TimeUnit.MILLISECONDS));
+                    }
+                    phases.stop(GCPhase.COLLECTING);
+                    if (!cancel.get() && foundAnything) {
+                        // TODO: move to evaluate()
+                        // NOTE(review): this is also written when full detail-gc was
+                        // disabled (-1) on entry - confirm that this is intended
+                        rec.setLongSetting(SETTINGS_COLLECTION_FULL_DETAILGC_TIMESTAMP_PROP,
+                                TimeUnit.SECONDS.toMillis(newestModifiedSecs) + 1);
+                    }
+                }
+            } finally {
+                gc.close();
+            }
+        }
+
+        /**
+         * Throttles the collector: waits for the given duration multiplied by
+         * {@code options.delayFactor}, unless the GC was canceled or the
+         * resulting delay is not positive.
+         *
+         * @param durationMs the time in milliseconds spent on the preceding work
+         */
+        private void delayOnModifications(long durationMs) {
+            long delayMs = Math.round(durationMs * options.delayFactor);
+            if (!cancel.get() && delayMs > 0) {
+                try {
+                    Clock clock = nodeStore.getClock();
+                    clock.waitUntil(clock.getTime() + delayMs);
+                } catch (InterruptedException ex) {
+                    // stop waiting, but keep the interruption visible to the caller
+                    Thread.currentThread().interrupt();
+                }
+            }
+        }
+
         private void collectSplitDocuments(GCPhases phases,
                                            RevisionVector sweepRevisions,
                                            VersionGCRecommendations rec) {
@@ -611,6 +737,61 @@ public class VersionGarbageCollector {
         }
     }
 
+    /**
+     * Skeleton of the per-document "detail garbage" collector. For every
+     * document handed to {@link #detailGC(NodeDocument, GCPhases)} it runs the
+     * individual cleanup steps and counts the documents that had garbage.
+     * Except for the demo step all cleanup steps are still unimplemented.
+     */
+    private class DetailGC implements Closeable {
+
+        private final RevisionVector headRevision;
+        private final GCMonitor monitor;
+        // number of documents for which (sample) garbage was found
+        private int count;
+
+        public DetailGC(@NotNull RevisionVector headRevision, @NotNull GCMonitor monitor) {
+            this.headRevision = checkNotNull(headRevision);
+            this.monitor = checkNotNull(monitor);
+        }
+
+        /**
+         * Runs all detail GC steps on the given document.
+         *
+         * @param doc the document to inspect
+         * @param phases the phase tracker of the current GC run
+         */
+        public void detailGC(NodeDocument doc, GCPhases phases) {
+            deleteSample(doc, phases);
+            deleteUnmergedBranchCommitDocument(doc, phases);
+            deleteDeletedProperties(doc, phases);
+            deleteOldRevisions(doc, phases);
+        }
+
+        /** TODO remove, this is just a skeleton sample */
+        private void deleteSample(NodeDocument doc, GCPhases phases) {
+            if (doc.getId().contains("should_delete")) {
+                if (phases.start(GCPhase.DELETING)) {
+                    // demo only: nothing is deleted, the document is merely counted
+                    monitor.info("deleteSample: should do the deletion now, but this is demo only. I'm still learning");
+                    count++;
+                    phases.stop(GCPhase.DELETING);
+                }
+            }
+        }
+
+        private void deleteUnmergedBranchCommitDocument(NodeDocument doc, GCPhases phases) {
+            // TODO: not implemented yet
+        }
+
+        private void deleteDeletedProperties(NodeDocument doc, GCPhases phases) {
+            // TODO: not implemented yet
+        }
+
+        private void deleteOldRevisions(NodeDocument doc, GCPhases phases) {
+            // TODO: not implemented yet
+        }
+
+        /**
+         * @return the number of documents counted by the cleanup steps so far
+         */
+        long getNumDocuments() {
+            return count;
+        }
+
+        @Override
+        public void close() throws IOException {
+            // nothing to release yet
+        }
+    }
+
     /**
      * A helper class to remove document for deleted nodes.
      */
diff --git 
a/oak-store-document/src/test/java/org/apache/jackrabbit/oak/plugins/document/DetailGCHelper.java
 
b/oak-store-document/src/test/java/org/apache/jackrabbit/oak/plugins/document/DetailGCHelper.java
new file mode 100644
index 0000000000..8a585c7dc0
--- /dev/null
+++ 
b/oak-store-document/src/test/java/org/apache/jackrabbit/oak/plugins/document/DetailGCHelper.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.jackrabbit.oak.plugins.document;
+
+/**
+ * Test helper to switch the prototype "detail gc" on and off. Besides the
+ * global {@link VersionGarbageCollector#DETAIL_GC_ENABLED} flag it also sets
+ * the full-detail-GC timestamp in the settings collection of the given store.
+ */
+public final class DetailGCHelper {
+
+    private DetailGCHelper() {
+        // static helpers only
+    }
+
+    /**
+     * Writes a long property to the version GC settings document.
+     *
+     * @param propName the name of the property to set
+     * @param val the value to set
+     * @param ns the node store whose document store is updated
+     */
+    public static void setLongSetting(String propName, long val, DocumentNodeStore ns) {
+        UpdateOp updateOp = new UpdateOp(VersionGarbageCollector.SETTINGS_COLLECTION_ID, true);
+        updateOp.set(propName, val);
+        ns.getDocumentStore().createOrUpdate(Collection.SETTINGS, updateOp);
+    }
+
+    /**
+     * Enables detail gc globally and, if a node store is given, sets its
+     * full-detail-GC timestamp to 0.
+     *
+     * @param ns the node store to update, may be {@code null}
+     */
+    public static void enableDetailGC(DocumentNodeStore ns) {
+        VersionGarbageCollector.DETAIL_GC_ENABLED = true;
+        if (ns != null) {
+            setLongSetting(VersionGarbageCollector.SETTINGS_COLLECTION_FULL_DETAILGC_TIMESTAMP_PROP, 0, ns);
+        }
+    }
+
+    /**
+     * Disables detail gc globally and, if a node store is given, sets its
+     * full-detail-GC timestamp to -1.
+     *
+     * @param ns the node store to update, may be {@code null}
+     */
+    public static void disableDetailGC(DocumentNodeStore ns) {
+        VersionGarbageCollector.DETAIL_GC_ENABLED = false;
+        if (ns != null) {
+            setLongSetting(VersionGarbageCollector.SETTINGS_COLLECTION_FULL_DETAILGC_TIMESTAMP_PROP, -1, ns);
+        }
+    }
+}
diff --git 
a/oak-store-document/src/test/java/org/apache/jackrabbit/oak/plugins/document/VersionGCTest.java
 
b/oak-store-document/src/test/java/org/apache/jackrabbit/oak/plugins/document/VersionGCTest.java
index a3e7a5e1e3..1bd81ce89c 100644
--- 
a/oak-store-document/src/test/java/org/apache/jackrabbit/oak/plugins/document/VersionGCTest.java
+++ 
b/oak-store-document/src/test/java/org/apache/jackrabbit/oak/plugins/document/VersionGCTest.java
@@ -51,6 +51,7 @@ import static java.util.concurrent.TimeUnit.HOURS;
 import static java.util.concurrent.TimeUnit.MINUTES;
 import static java.util.concurrent.TimeUnit.SECONDS;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
@@ -338,6 +339,23 @@ public class VersionGCTest {
         }
     }
 
+    // OAK-10199
+    @Test
+    public void testDetailGcDocumentRead_disabled() throws Exception {
+        // with detail gc switched off no time may be accounted to it
+        DetailGCHelper.disableDetailGC(ns);
+        final VersionGCStats result = gc.gc(30, TimeUnit.MINUTES);
+        assertNotNull(result);
+        assertEquals(0, result.detailGcDocsElapsed);
+    }
+
+    @Test
+    public void testDetailGcDocumentRead_enabled() throws Exception {
+        DetailGCHelper.enableDetailGC(ns);
+        try {
+            VersionGCStats stats = gc.gc(30, TimeUnit.MINUTES);
+            assertNotNull(stats);
+            assertNotEquals(0, stats.detailGcDocsElapsed);
+        } finally {
+            // DETAIL_GC_ENABLED is a JVM-wide static flag: switch it off again
+            // so that tests running afterwards are not affected
+            DetailGCHelper.disableDetailGC(ns);
+        }
+    }
+
     private Future<VersionGCStats> gc() {
         // run gc in a separate thread
         return execService.submit(new Callable<VersionGCStats>() {

Reply via email to