ibessonov commented on code in PR #7880:
URL: https://github.com/apache/ignite-3/pull/7880#discussion_r3020077046


##########
modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/RaftLogGarbageCollector.java:
##########
@@ -71,11 +86,66 @@ class RaftLogGarbageCollector {
 
     private final IndexFileManager indexFileManager;
 
+    private final long softLimitBytes;
+
+    private final SegmentFileCompactionStrategy strategy;
+
+    private final FailureProcessor failureProcessor;
+
     private final AtomicLong logSizeBytes = new AtomicLong();
 
-    RaftLogGarbageCollector(Path segmentFilesDir, IndexFileManager indexFileManager) {
+    private final Thread gcThread;
+
+    RaftLogGarbageCollector(
+            String nodeName,
+            Path segmentFilesDir,
+            IndexFileManager indexFileManager,
+            long softLimitBytes,
+            SegmentFileCompactionStrategy strategy,
+            FailureProcessor failureProcessor
+    ) {
         this.segmentFilesDir = segmentFilesDir;
         this.indexFileManager = indexFileManager;
+        this.softLimitBytes = softLimitBytes;
+        this.strategy = strategy;
+        this.failureProcessor = failureProcessor;
+
+        gcThread = new IgniteThread(nodeName, "segstore-gc", new GcTask());
+    }
+
+    void start() throws IOException {
+        initLogSizeFromDisk();
+
+        gcThread.start();
+    }
+
+    void stop() {
+        gcThread.interrupt();

Review Comment:
   When exactly do we stop this component? Are you sure that you won't 
interrupt any file channel that's still in use?
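   A minimal sketch of the risk behind this question, assuming standard JDK semantics for `java.nio.channels.FileChannel` (an `InterruptibleChannel`): if a thread's interrupt flag is set when it starts an I/O operation on the channel, the JDK closes the channel and throws `ClosedByInterruptException`. The class name and the temp file are illustrative only. The point is that the channel is then closed for every user, not just for the interrupted GC thread.

   ```java
   import java.io.IOException;
   import java.io.UncheckedIOException;
   import java.nio.ByteBuffer;
   import java.nio.channels.ClosedByInterruptException;
   import java.nio.channels.FileChannel;
   import java.nio.file.Files;
   import java.nio.file.Path;
   import java.nio.file.StandardOpenOption;

   /** Illustrative demo (hypothetical class): an interrupt closes a FileChannel that may be shared. */
   class InterruptClosesChannelDemo {
       /** Returns true if reading with the interrupt flag set threw and closed the channel. */
       static boolean channelClosedByInterrupt() {
           try {
               Path file = Files.createTempFile("segment", ".bin");
               Files.write(file, new byte[16]);

               try (FileChannel channel = FileChannel.open(file, StandardOpenOption.READ)) {
                   // Simulate stop() interrupting the GC thread while it works with a segment file.
                   Thread.currentThread().interrupt();

                   boolean thrown = false;
                   try {
                       channel.read(ByteBuffer.allocate(16), 0);
                   } catch (ClosedByInterruptException e) {
                       thrown = true;
                   }

                   // Any other thread sharing this channel would now get ClosedChannelException.
                   return thrown && !channel.isOpen();
               } finally {
                   Thread.interrupted(); // Clear the flag so the calling thread is not left interrupted.
                   Files.deleteIfExists(file);
               }
           } catch (IOException e) {
               throw new UncheckedIOException(e);
           }
       }

       public static void main(String[] args) {
           System.out.println(channelClosedByInterrupt());
       }
   }
   ```

   A flag plus `LockSupport.unpark(gcThread)` is one way to wake and stop the loop without touching the interrupt status, but whether that fits the shutdown order of this component is exactly what the question above is about.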



##########
modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/RaftLogGarbageCollector.java:
##########
@@ -137,29 +207,43 @@ void cleanupLeftoverFiles() throws IOException {
     }
 
     @VisibleForTesting
-    void runCompaction(SegmentFile segmentFile) throws IOException {
-        LOG.info("Compacting segment file [path = {}].", segmentFile.path());
+    long logSizeBytes() {
+        return logSizeBytes.get();
+    }
+
+    @VisibleForTesting
+    void runCompaction(FileProperties segmentFileProperties) throws IOException {
+        Path segmentFilePath = segmentFilesDir.resolve(SegmentFile.fileName(segmentFileProperties));
+
+        LOG.info("Compacting segment file [path = {}].", segmentFilePath);
 
+        // TODO: Skip non-compactible files, see https://issues.apache.org/jira/browse/IGNITE-28417.
         Long2ObjectMap<IndexFileMeta> segmentFileDescription
-                = indexFileManager.describeSegmentFile(segmentFile.fileProperties().ordinal());
+                = indexFileManager.describeSegmentFile(segmentFileProperties.ordinal());
 
         boolean canRemoveSegmentFile = segmentFileDescription.isEmpty();
 
-        Path indexFilePath = indexFileManager.indexFilePath(segmentFile.fileProperties());
+        Path indexFilePath = indexFileManager.indexFilePath(segmentFileProperties);
 
         long logSizeDelta;
 
         if (canRemoveSegmentFile) {
-            indexFileManager.onIndexFileRemoved(segmentFile.fileProperties());
+            indexFileManager.onIndexFileRemoved(segmentFileProperties);
 
-            logSizeDelta = Files.size(segmentFile.path()) + Files.size(indexFilePath);
+            logSizeDelta = Files.size(segmentFilePath) + Files.size(indexFilePath);
         } else {
-            logSizeDelta = compactSegmentFile(segmentFile, indexFilePath, segmentFileDescription);
+            SegmentFile segmentFile = SegmentFile.openExisting(segmentFilePath, false);
+
+            try {
+                logSizeDelta = compactSegmentFile(segmentFile, indexFilePath, segmentFileDescription);
+            } finally {
+                segmentFile.close();
+            }
         }
 
         // Remove the previous generation of the segment file and its index. This is safe to do, because we rely on the file system
         // guarantees that other threads reading from the segment file will still be able to do that even if the file is deleted.
-        Files.delete(segmentFile.path());
+        Files.delete(segmentFilePath);

Review Comment:
   > because we rely on the file system guarantees that other threads reading from the segment file will still be able to do that even if the file is deleted.
   
   That's not true in general. As far as I know, NTFS would not do that.
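   One possible direction, sketched under assumptions: instead of calling `Files.delete` while readers may still hold the file open, the GC could only mark the file as garbage and let the last reader perform the deletion. `DeferredDeleteFile` and its methods are hypothetical and not part of this PR; the sketch only shows the reference-counting idea, which does not depend on POSIX unlink semantics.

   ```java
   import java.io.IOException;
   import java.io.UncheckedIOException;
   import java.nio.file.Files;
   import java.nio.file.Path;

   /** Hypothetical helper: the file is deleted only once no reader holds it anymore. */
   class DeferredDeleteFile {
       private final Path path;

       private int readers;           // Number of readers currently using the file.
       private boolean deletePending; // Set once the GC has decided the file is garbage.
       private boolean deleted;

       DeferredDeleteFile(Path path) {
           this.path = path;
       }

       /** Registers a reader. Returns false if the file is already scheduled for deletion. */
       synchronized boolean acquire() {
           if (deletePending) {
               return false;
           }
           readers++;
           return true;
       }

       /** Unregisters a reader; the last reader deletes the file if deletion is pending. */
       synchronized void release() {
           if (readers == 0) {
               throw new IllegalStateException("release() without acquire()");
           }
           readers--;
           maybeDelete();
       }

       /** Called by the GC instead of Files.delete(): deletes now, or when the last reader leaves. */
       synchronized void markForDeletion() {
           deletePending = true;
           maybeDelete();
       }

       synchronized boolean isDeleted() {
           return deleted;
       }

       private void maybeDelete() {
           if (deletePending && readers == 0 && !deleted) {
               try {
                   Files.deleteIfExists(path);
               } catch (IOException e) {
                   throw new UncheckedIOException(e);
               }
               deleted = true;
           }
       }
   }
   ```

   A real implementation would also need a recovery story for files that were marked but not yet removed when the node stopped.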



##########
modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/IndexFileManager.java:
##########
@@ -504,6 +505,21 @@ private static byte[] payload(SegmentInfo segmentInfo) {
         return payloadBuffer.array();
     }
 
+    /**
+     * Computes the size in bytes that the index file for the given {@code indexMemTable} will occupy on disk.
+     */
+    static long computeIndexFileSize(ReadModeIndexMemTable indexMemTable) {
+        long total = headerSize(indexMemTable.numGroups());
+
+        Iterator<Entry<Long, SegmentInfo>> it = indexMemTable.iterator();
+
+        while (it.hasNext()) {
+            total += payloadSize(it.next().getValue());
+        }
+
+        return total;

Review Comment:
   Can we make `ReadModeIndexMemTable` implement `Iterable<Entry<Long, SegmentInfo>>`? This way you could just use a `foreach` loop.
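   A minimal sketch of the suggestion, using a hypothetical stand-in class instead of the real `ReadModeIndexMemTable`, whose internals are not shown here. Group IDs map to plain payload sizes in place of `SegmentInfo`, and `headerSize` uses a made-up layout; the point is only that implementing `Iterable` lets `computeIndexFileSize` use a for-each loop.

   ```java
   import java.util.Collections;
   import java.util.Iterator;
   import java.util.Map;
   import java.util.Map.Entry;
   import java.util.TreeMap;

   /** Hypothetical stand-in for ReadModeIndexMemTable that implements Iterable. */
   class IterableMemTableSketch implements Iterable<Entry<Long, Integer>> {
       // Group ID -> payload size in bytes (stands in for SegmentInfo).
       private final Map<Long, Integer> segmentInfos = new TreeMap<>();

       void put(long groupId, int payloadSizeBytes) {
           segmentInfos.put(groupId, payloadSizeBytes);
       }

       int numGroups() {
           return segmentInfos.size();
       }

       @Override
       public Iterator<Entry<Long, Integer>> iterator() {
           // Read-only view, so callers cannot modify the table through the iterator.
           return Collections.unmodifiableMap(segmentInfos).entrySet().iterator();
       }

       /** Made-up header layout for the sketch: 16 fixed bytes plus 8 bytes per group. */
       static long headerSize(int numGroups) {
           return 16L + 8L * numGroups;
       }

       static long computeIndexFileSize(IterableMemTableSketch indexMemTable) {
           long total = headerSize(indexMemTable.numGroups());

           for (Entry<Long, Integer> entry : indexMemTable) {
               total += entry.getValue();
           }

           return total;
       }
   }
   ```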



##########
modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/RaftLogGarbageCollector.java:
##########
@@ -286,6 +370,69 @@ private static boolean isLogIndexInRange(long index, IndexFileMeta indexFileMeta
         return index >= indexFileMeta.firstLogIndexInclusive() && index < indexFileMeta.lastLogIndexExclusive();
     }
 
+    private void initLogSizeFromDisk() throws IOException {
+        Path indexFilesDir = indexFileManager.indexFilesDir();
+
+        try (Stream<Path> files = Stream.concat(Files.list(segmentFilesDir), Files.list(indexFilesDir))) {
+            long logSizeOnDisk = files
+                    .mapToLong(path -> {
+                        try {
+                            return Files.size(path);
+                        } catch (IOException e) {
+                            throw new UncheckedIOException(e);
+                        }
+                    })
+                    .sum();
+
+            logSizeBytes.addAndGet(logSizeOnDisk);
+        }
+    }
+
+    private class GcTask implements Runnable {
+        @Override
+        public void run() {
+            while (!Thread.currentThread().isInterrupted()) {
+                try {
+                    runGcCycle();
+                } catch (ClosedByInterruptException e) {
+                    return;
+                } catch (IOException e) {
+                    failureProcessor.process(new FailureContext(CRITICAL_ERROR, e));
+                }
+
+                LockSupport.park();
+            }
+        }
+
+        private void runGcCycle() throws IOException {
+            if (logSizeBytes.get() < softLimitBytes) {
+                return;
+            }
+
+            LOG.info("Starting Log Storage GC cycle.");
+
+            try (Stream<FileProperties> candidates = strategy.selectCandidates()) {
+                Iterator<FileProperties> it = candidates.iterator();
+
+                do {
+                    if (!it.hasNext()) {
+                        LOG.warn(
+                                "Log size is above the soft limit but there are no files to compact "
+                                        + "[current log size = {} bytes, soft limit = {} bytes].",

Review Comment:
   Please follow the guidelines: parameter names here should not contain spaces.



##########
modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/RaftLogCheckpointer.java:
##########
@@ -62,14 +63,18 @@ class RaftLogCheckpointer {
 
     private final FailureProcessor failureProcessor;
 
+    private final LongConsumer beforeIndexFileCreated;
+
     RaftLogCheckpointer(
             String nodeName,
             IndexFileManager indexFileManager,
             FailureProcessor failureProcessor,
-            int maxQueueSize
+            int maxQueueSize,
+            LongConsumer beforeIndexFileCreated

Review Comment:
   Was there any other way of doing this? Currently I don't even understand the purpose of this closure; in my opinion, such patterns make the code harder to follow. Maybe we should use an explicit interface instead of a generic `LongConsumer`. What do you think?
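   A sketch of the explicit-interface alternative. The interface name, the method name, and the assumption that the `long` argument is the size of the index file about to be written (for example, as computed by `computeIndexFileSize`) are guesses for illustration; the real purpose of the callback is exactly what this comment asks about. The idea is that a named interface documents that purpose at the type level, which a bare `LongConsumer` cannot.

   ```java
   import java.util.concurrent.atomic.AtomicLong;

   /** Hypothetical named callback replacing the generic LongConsumer in RaftLogCheckpointer. */
   @FunctionalInterface
   interface IndexFileCreationListener {
       /**
        * Called right before an index file is written to disk.
        *
        * @param indexFileSizeBytes Size of the index file that is about to be created (assumed semantics).
        */
       void beforeIndexFileCreated(long indexFileSizeBytes);
   }

   /** Minimal stand-in for the checkpointer side: it depends only on the named interface. */
   class CheckpointerSketch {
       private final IndexFileCreationListener listener;

       CheckpointerSketch(IndexFileCreationListener listener) {
           this.listener = listener;
       }

       void checkpoint(long indexFileSizeBytes) {
           listener.beforeIndexFileCreated(indexFileSizeBytes);
           // Writing the index file itself is omitted in this sketch.
       }
   }

   /** Minimal stand-in for the GC side: keeps the total log size up to date. */
   class LogSizeTracker implements IndexFileCreationListener {
       final AtomicLong logSizeBytes = new AtomicLong();

       @Override
       public void beforeIndexFileCreated(long indexFileSizeBytes) {
           logSizeBytes.addAndGet(indexFileSizeBytes);
       }
   }
   ```

   The interface stays a `@FunctionalInterface`, so tests can still pass a lambda such as `size -> { }`.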



##########
modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/MostGarbageFirstCompactionStrategy.java:
##########
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.raft.storage.segstore;
+
+import it.unimi.dsi.fastutil.longs.Long2ObjectMap;
+import it.unimi.dsi.fastutil.objects.Object2LongOpenHashMap;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.Comparator;
+import java.util.stream.Stream;
+
+/**
+ * Compaction strategy that prioritizes files with the most dead entries, fully-deletable files first.
+ */
+class MostGarbageFirstCompactionStrategy implements SegmentFileCompactionStrategy {
+    private final Path segmentFilesDir;
+
+    private final IndexFileManager indexFileManager;
+
+    MostGarbageFirstCompactionStrategy(Path segmentFilesDir, IndexFileManager indexFileManager) {
+        this.segmentFilesDir = segmentFilesDir;
+        this.indexFileManager = indexFileManager;
+    }
+
+    @Override
+    public Stream<FileProperties> selectCandidates() throws IOException {
+        var scores = new Object2LongOpenHashMap<FileProperties>();
+
+        Comparator<FileProperties> comparator =
+                Comparator.<FileProperties>comparingLong(props -> scores.computeIfAbsent(props, this::score))
+                        .thenComparing(Comparator.naturalOrder());
+
+        return Files.list(segmentFilesDir)
+                .filter(p -> !p.getFileName().toString().endsWith(".tmp"))
+                .map(SegmentFile::fileProperties)
+                .filter(props -> Files.exists(indexFileManager.indexFilePath(props)))
+                .sorted(comparator);
+    }
+
+    private long score(FileProperties props) {
+        Long2ObjectMap<IndexFileMeta> description = indexFileManager.describeSegmentFile(props.ordinal());
+
+        if (description.isEmpty()) {
+            return -1; // fully deletable — highest priority

Review Comment:
   ```suggestion
               return -1; // Fully deletable — highest priority.
   ```



##########
modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/RaftLogGarbageCollector.java:
##########
@@ -286,6 +370,69 @@ private static boolean isLogIndexInRange(long index, IndexFileMeta indexFileMeta
         return index >= indexFileMeta.firstLogIndexInclusive() && index < indexFileMeta.lastLogIndexExclusive();
     }
 
+    private void initLogSizeFromDisk() throws IOException {
+        Path indexFilesDir = indexFileManager.indexFilesDir();
+
+        try (Stream<Path> files = Stream.concat(Files.list(segmentFilesDir), Files.list(indexFilesDir))) {
+            long logSizeOnDisk = files
+                    .mapToLong(path -> {
+                        try {
+                            return Files.size(path);
+                        } catch (IOException e) {
+                            throw new UncheckedIOException(e);
+                        }
+                    })
+                    .sum();
+
+            logSizeBytes.addAndGet(logSizeOnDisk);
+        }
+    }
+
+    private class GcTask implements Runnable {
+        @Override
+        public void run() {
+            while (!Thread.currentThread().isInterrupted()) {
+                try {
+                    runGcCycle();
+                } catch (ClosedByInterruptException e) {
+                    return;
+                } catch (IOException e) {
+                    failureProcessor.process(new FailureContext(CRITICAL_ERROR, e));
+                }
+
+                LockSupport.park();
+            }
+        }
+
+        private void runGcCycle() throws IOException {
+            if (logSizeBytes.get() < softLimitBytes) {
+                return;
+            }
+
+            LOG.info("Starting Log Storage GC cycle.");

Review Comment:
   I would also include the log storage name. I suspect that the metastorage, CMG and distribution zones will have their own independent threads. Or maybe we should identify them by thread name rather than by the message; could you please make sure that they're distinguishable? Thank you!
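   A minimal sketch of making the GC threads distinguishable, assuming each log storage (for example metastorage, CMG, and distribution zones) gets its own collector and a storage name that can be passed in. It uses a plain `java.lang.Thread`; `gcThreadName`, its format, and the storage names are hypothetical. In the PR the name would go through `IgniteThread` instead, whose naming rules are not shown here.

   ```java
   /** Hypothetical helpers that put the storage name into both the thread name and the log message. */
   class GcThreadNamingSketch {
       static String gcThreadName(String nodeName, String storageName) {
           return nodeName + "-segstore-gc-" + storageName;
       }

       static Thread newGcThread(String nodeName, String storageName, Runnable task) {
           Thread thread = new Thread(task, gcThreadName(nodeName, storageName));
           thread.setDaemon(true); // A GC thread should not keep the JVM alive.
           return thread;
       }

       static String gcCycleStartMessage(String storageName) {
           return "Starting Log Storage GC cycle [storage=" + storageName + "].";
       }

       public static void main(String[] args) {
           for (String storage : new String[] {"metastorage", "cmg", "zones"}) {
               System.out.println(newGcThread("node1", storage, () -> { }).getName());
           }
       }
   }
   ```

   With a distinct thread name per storage, the log line itself does not strictly need the storage name, as long as the logging pattern prints the thread name.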



##########
modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/RaftLogGarbageCollector.java:
##########
@@ -286,6 +370,69 @@ private static boolean isLogIndexInRange(long index, IndexFileMeta indexFileMeta
         return index >= indexFileMeta.firstLogIndexInclusive() && index < indexFileMeta.lastLogIndexExclusive();
     }
 
+    private void initLogSizeFromDisk() throws IOException {

Review Comment:
   Are we sure that all `tmp` files have already been deleted at this point?
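   If the startup order cannot guarantee that, a defensive variant could skip leftover temporary files explicitly. A sketch, assuming temporary files use the `.tmp` suffix (the same suffix `MostGarbageFirstCompactionStrategy` filters on); `LogSizeScanSketch`, `sizeOnDisk`, and the file names in `demo()` are hypothetical. It also lists each directory in its own try-with-resources block, so no directory stream is left open if listing the second directory throws.

   ```java
   import java.io.IOException;
   import java.io.UncheckedIOException;
   import java.nio.file.Files;
   import java.nio.file.Path;
   import java.util.stream.Stream;

   /** Hypothetical variant of initLogSizeFromDisk() that ignores leftover "*.tmp" files. */
   class LogSizeScanSketch {
       static long sizeOnDisk(Path... dirs) {
           long total = 0;

           for (Path dir : dirs) {
               // One try-with-resources per directory, so every listing stream gets closed.
               try (Stream<Path> files = Files.list(dir)) {
                   total += files
                           .filter(Files::isRegularFile)
                           .filter(path -> !path.getFileName().toString().endsWith(".tmp"))
                           .mapToLong(LogSizeScanSketch::sizeOf)
                           .sum();
               } catch (IOException e) {
                   throw new UncheckedIOException(e);
               }
           }

           return total;
       }

       private static long sizeOf(Path path) {
           try {
               return Files.size(path);
           } catch (IOException e) {
               throw new UncheckedIOException(e);
           }
       }

       /** Builds two throwaway directories and returns the size the scan reports for them. */
       static long demo() {
           try {
               Path segmentFilesDir = Files.createTempDirectory("segments");
               Path indexFilesDir = Files.createTempDirectory("indexes");

               Files.write(segmentFilesDir.resolve("segment-1.bin"), new byte[10]);
               Files.write(segmentFilesDir.resolve("segment-2.bin.tmp"), new byte[5]); // Leftover, ignored.
               Files.write(indexFilesDir.resolve("index-1.bin"), new byte[3]);

               return sizeOnDisk(segmentFilesDir, indexFilesDir);
           } catch (IOException e) {
               throw new UncheckedIOException(e);
           }
       }
   }
   ```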



##########
modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/RaftLogGarbageCollector.java:
##########
@@ -286,6 +370,69 @@ private static boolean isLogIndexInRange(long index, IndexFileMeta indexFileMeta
         return index >= indexFileMeta.firstLogIndexInclusive() && index < indexFileMeta.lastLogIndexExclusive();
     }
 
+    private void initLogSizeFromDisk() throws IOException {
+        Path indexFilesDir = indexFileManager.indexFilesDir();
+
+        try (Stream<Path> files = Stream.concat(Files.list(segmentFilesDir), Files.list(indexFilesDir))) {
+            long logSizeOnDisk = files
+                    .mapToLong(path -> {
+                        try {
+                            return Files.size(path);
+                        } catch (IOException e) {
+                            throw new UncheckedIOException(e);
+                        }
+                    })
+                    .sum();
+
+            logSizeBytes.addAndGet(logSizeOnDisk);
+        }
+    }
+
+    private class GcTask implements Runnable {
+        @Override
+        public void run() {
+            while (!Thread.currentThread().isInterrupted()) {
+                try {
+                    runGcCycle();
+                } catch (ClosedByInterruptException e) {
+                    return;
+                } catch (IOException e) {
+                    failureProcessor.process(new FailureContext(CRITICAL_ERROR, e));
+                }
+
+                LockSupport.park();
+            }
+        }
+
+        private void runGcCycle() throws IOException {
+            if (logSizeBytes.get() < softLimitBytes) {
+                return;
+            }
+
+            LOG.info("Starting Log Storage GC cycle.");
+
+            try (Stream<FileProperties> candidates = strategy.selectCandidates()) {
+                Iterator<FileProperties> it = candidates.iterator();
+
+                do {
+                    if (!it.hasNext()) {
+                        LOG.warn(
+                                "Log size is above the soft limit but there are no files to compact "
+                                        + "[current log size = {} bytes, soft limit = {} bytes].",

Review Comment:
   ```suggestion
                                           + "[currentLogSize={}, softLimit={}]",
   ```



##########
modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/MostGarbageFirstCompactionStrategy.java:
##########
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.raft.storage.segstore;
+
+import it.unimi.dsi.fastutil.longs.Long2ObjectMap;
+import it.unimi.dsi.fastutil.objects.Object2LongOpenHashMap;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.Comparator;
+import java.util.stream.Stream;
+
+/**
+ * Compaction strategy that prioritizes files with the most dead entries, fully-deletable files first.
+ */
+class MostGarbageFirstCompactionStrategy implements SegmentFileCompactionStrategy {
+    private final Path segmentFilesDir;
+
+    private final IndexFileManager indexFileManager;
+
+    MostGarbageFirstCompactionStrategy(Path segmentFilesDir, IndexFileManager indexFileManager) {
+        this.segmentFilesDir = segmentFilesDir;
+        this.indexFileManager = indexFileManager;
+    }
+
+    @Override
+    public Stream<FileProperties> selectCandidates() throws IOException {
+        var scores = new Object2LongOpenHashMap<FileProperties>();
+
+        Comparator<FileProperties> comparator =
+                Comparator.<FileProperties>comparingLong(props -> scores.computeIfAbsent(props, this::score))
+                        .thenComparing(Comparator.naturalOrder());
+
+        return Files.list(segmentFilesDir)
+                .filter(p -> !p.getFileName().toString().endsWith(".tmp"))
+                .map(SegmentFile::fileProperties)
+                .filter(props -> Files.exists(indexFileManager.indexFilePath(props)))
+                .sorted(comparator);
+    }
+
+    private long score(FileProperties props) {
+        Long2ObjectMap<IndexFileMeta> description = indexFileManager.describeSegmentFile(props.ordinal());
+
+        if (description.isEmpty()) {
+            return -1; // fully deletable — highest priority

Review Comment:
   What if we replace it with something like `props.ordinal() - Long.MAX_VALUE`? This way, sorting would order all "empty" files from the lowest to the highest ordinal, ideally producing a GC process without any gaps in ordinals.
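   A sketch of the proposed scoring, using a hypothetical `Candidate` class in place of `FileProperties` and assuming that non-empty files are scored by their number of live entries (lower score means compacted earlier, as with the current `-1`; the real scoring of non-empty files is cut off in the hunk). Because `ordinal - Long.MAX_VALUE` is negative for every non-negative ordinal and cannot overflow, fully deletable files come first and stay sorted by ordinal among themselves.

   ```java
   import java.util.Comparator;
   import java.util.List;
   import java.util.stream.Collectors;

   /** Hypothetical illustration of the "ordinal - Long.MAX_VALUE" score for fully deletable files. */
   class GarbageFirstOrderingSketch {
       /** Stand-in for FileProperties: file ordinal plus the number of entries still referenced. */
       static final class Candidate {
           final int ordinal;
           final long liveEntries;

           Candidate(int ordinal, long liveEntries) {
               this.ordinal = ordinal;
               this.liveEntries = liveEntries;
           }
       }

       static long score(Candidate candidate) {
           if (candidate.liveEntries == 0) {
               // Fully deletable: always negative, and ordered by ordinal among themselves.
               return candidate.ordinal - Long.MAX_VALUE;
           }
           return candidate.liveEntries; // Assumption: fewer live entries means more garbage.
       }

       /** Returns candidate ordinals in the order they would be compacted. */
       static List<Integer> compactionOrder(List<Candidate> candidates) {
           return candidates.stream()
                   .sorted(Comparator.comparingLong(GarbageFirstOrderingSketch::score)
                           .thenComparingInt(c -> c.ordinal))
                   .map(c -> c.ordinal)
                   .collect(Collectors.toList());
       }
   }
   ```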



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
