kfaraz commented on code in PR #18844: URL: https://github.com/apache/druid/pull/18844#discussion_r2660241828
########## server/src/main/java/org/apache/druid/segment/metadata/CompactionStateCache.java: ########## @@ -0,0 +1,147 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.segment.metadata; + +import org.apache.druid.guice.LazySingleton; +import org.apache.druid.java.util.common.logger.Logger; +import org.apache.druid.timeline.CompactionState; + +import java.util.Map; +import java.util.Optional; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; + +/** + * In-memory cache of compaction states used by {@link org.apache.druid.metadata.segment.cache.HeapMemorySegmentMetadataCache}. + * <p> + * This cache stores compaction states for published segments polled from the metadata store. + * It is the PRIMARY way to read compaction states in production. + * <p> + * The cache is populated during segment metadata cache sync operations and provides fast lookups + * without hitting the database. + */ +@LazySingleton +public class CompactionStateCache +{ + private static final Logger log = new Logger(CompactionStateCache.class); + + /** + * Atomically updated reference to published compaction states. 
+ */ + private final AtomicReference<PublishedCompactionStates> publishedCompactionStates + = new AtomicReference<>(PublishedCompactionStates.EMPTY); + + private final AtomicInteger cacheMissCount = new AtomicInteger(0); + private final AtomicInteger cacheHitCount = new AtomicInteger(0); + + public boolean isEnabled() + { + // Always enabled when this implementation is bound + return true; + } + + /** + * Resets the cache with compaction states polled from the metadata store. + * Called after each successful poll in HeapMemorySegmentMetadataCache. + * + * @param fingerprintToStateMap Complete map of all active compaction state fingerprints + */ + public void resetCompactionStatesForPublishedSegments( + Map<String, CompactionState> fingerprintToStateMap + ) + { + this.publishedCompactionStates.set( + new PublishedCompactionStates(fingerprintToStateMap) + ); + log.debug("Reset compaction state cache with [%d] fingerprints", fingerprintToStateMap.size()); + } + + /** + * Retrieves a compaction state by its fingerprint. + * This is the PRIMARY method for reading compaction states. + * + * @param fingerprint The fingerprint to look up + * @return The compaction state, or Optional.empty() if not cached + */ + public Optional<CompactionState> getCompactionStateByFingerprint(String fingerprint) + { + if (fingerprint == null) { + return Optional.empty(); + } + + CompactionState state = publishedCompactionStates.get() + .fingerprintToStateMap + .get(fingerprint); + if (state != null) { + cacheHitCount.incrementAndGet(); + return Optional.of(state); + } + + cacheMissCount.incrementAndGet(); + return Optional.empty(); Review Comment: Please do this in an if-else instead, preferably with the null check inverted. 
```suggestion if (state == null) { cacheMissCount.incrementAndGet(); return Optional.empty(); } else { cacheHitCount.incrementAndGet(); return Optional.of(state); } ``` ########## server/src/main/java/org/apache/druid/segment/metadata/CompactionStateCache.java: ########## @@ -0,0 +1,147 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.segment.metadata; + +import org.apache.druid.guice.LazySingleton; +import org.apache.druid.java.util.common.logger.Logger; +import org.apache.druid.timeline.CompactionState; + +import java.util.Map; +import java.util.Optional; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; + +/** + * In-memory cache of compaction states used by {@link org.apache.druid.metadata.segment.cache.HeapMemorySegmentMetadataCache}. + * <p> + * This cache stores compaction states for published segments polled from the metadata store. + * It is the PRIMARY way to read compaction states in production. + * <p> + * The cache is populated during segment metadata cache sync operations and provides fast lookups + * without hitting the database. 
+ */ +@LazySingleton +public class CompactionStateCache +{ + private static final Logger log = new Logger(CompactionStateCache.class); + + /** + * Atomically updated reference to published compaction states. + */ + private final AtomicReference<PublishedCompactionStates> publishedCompactionStates + = new AtomicReference<>(PublishedCompactionStates.EMPTY); + + private final AtomicInteger cacheMissCount = new AtomicInteger(0); + private final AtomicInteger cacheHitCount = new AtomicInteger(0); + + public boolean isEnabled() + { + // Always enabled when this implementation is bound + return true; + } + + /** + * Resets the cache with compaction states polled from the metadata store. + * Called after each successful poll in HeapMemorySegmentMetadataCache. + * + * @param fingerprintToStateMap Complete map of all active compaction state fingerprints + */ + public void resetCompactionStatesForPublishedSegments( + Map<String, CompactionState> fingerprintToStateMap + ) + { + this.publishedCompactionStates.set( + new PublishedCompactionStates(fingerprintToStateMap) + ); + log.debug("Reset compaction state cache with [%d] fingerprints", fingerprintToStateMap.size()); + } + + /** + * Retrieves a compaction state by its fingerprint. + * This is the PRIMARY method for reading compaction states. + * + * @param fingerprint The fingerprint to look up + * @return The compaction state, or Optional.empty() if not cached + */ + public Optional<CompactionState> getCompactionStateByFingerprint(String fingerprint) + { + if (fingerprint == null) { + return Optional.empty(); + } + + CompactionState state = publishedCompactionStates.get() + .fingerprintToStateMap + .get(fingerprint); + if (state != null) { + cacheHitCount.incrementAndGet(); + return Optional.of(state); + } + + cacheMissCount.incrementAndGet(); + return Optional.empty(); + } + + /** + * Gets the full cached map (immutable copy). + * Used by HeapMemorySegmentMetadataCache for delta sync calculations. 
+ */ + public Map<String, CompactionState> getPublishedCompactionStateMap() + { + return publishedCompactionStates.get().fingerprintToStateMap; + } + + /** + * Clears the cache. Called when node stops being leader. + */ + public void clear() + { + publishedCompactionStates.set(PublishedCompactionStates.EMPTY); Review Comment: Should we also reset the hit and miss counts here? ########## server/src/main/java/org/apache/druid/segment/metadata/CompactionStateCache.java: ########## @@ -0,0 +1,147 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.segment.metadata; + +import org.apache.druid.guice.LazySingleton; +import org.apache.druid.java.util.common.logger.Logger; +import org.apache.druid.timeline.CompactionState; + +import java.util.Map; +import java.util.Optional; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; + +/** + * In-memory cache of compaction states used by {@link org.apache.druid.metadata.segment.cache.HeapMemorySegmentMetadataCache}. + * <p> + * This cache stores compaction states for published segments polled from the metadata store. + * It is the PRIMARY way to read compaction states in production. 
+ * <p> + * The cache is populated during segment metadata cache sync operations and provides fast lookups + * without hitting the database. + */ +@LazySingleton +public class CompactionStateCache +{ + private static final Logger log = new Logger(CompactionStateCache.class); + + /** + * Atomically updated reference to published compaction states. + */ + private final AtomicReference<PublishedCompactionStates> publishedCompactionStates + = new AtomicReference<>(PublishedCompactionStates.EMPTY); + + private final AtomicInteger cacheMissCount = new AtomicInteger(0); + private final AtomicInteger cacheHitCount = new AtomicInteger(0); + + public boolean isEnabled() + { + // Always enabled when this implementation is bound + return true; + } + + /** + * Resets the cache with compaction states polled from the metadata store. + * Called after each successful poll in HeapMemorySegmentMetadataCache. + * + * @param fingerprintToStateMap Complete map of all active compaction state fingerprints + */ + public void resetCompactionStatesForPublishedSegments( + Map<String, CompactionState> fingerprintToStateMap + ) + { + this.publishedCompactionStates.set( + new PublishedCompactionStates(fingerprintToStateMap) + ); + log.debug("Reset compaction state cache with [%d] fingerprints", fingerprintToStateMap.size()); + } + + /** + * Retrieves a compaction state by its fingerprint. + * This is the PRIMARY method for reading compaction states. 
+ * + * @param fingerprint The fingerprint to look up + * @return The compaction state, or Optional.empty() if not cached + */ + public Optional<CompactionState> getCompactionStateByFingerprint(String fingerprint) + { + if (fingerprint == null) { + return Optional.empty(); + } + + CompactionState state = publishedCompactionStates.get() + .fingerprintToStateMap + .get(fingerprint); + if (state != null) { + cacheHitCount.incrementAndGet(); + return Optional.of(state); + } + + cacheMissCount.incrementAndGet(); + return Optional.empty(); + } + + /** + * Gets the full cached map (immutable copy). + * Used by HeapMemorySegmentMetadataCache for delta sync calculations. + */ + public Map<String, CompactionState> getPublishedCompactionStateMap() + { + return publishedCompactionStates.get().fingerprintToStateMap; + } + + /** + * Clears the cache. Called when node stops being leader. + */ + public void clear() + { + publishedCompactionStates.set(PublishedCompactionStates.EMPTY); + log.info("Cleared compaction state cache"); Review Comment: I think we can omit this log line assuming that `clear()` is called only on service shutdown. 
########## docs/configuration/index.md: ########## @@ -389,6 +389,7 @@ These properties specify the JDBC connection and other configuration around the |`druid.metadata.storage.tables.segments`|The table to use to look for segments.|`druid_segments`| |`druid.metadata.storage.tables.rules`|The table to use to look for segment load/drop rules.|`druid_rules`| |`druid.metadata.storage.tables.config`|The table to use to look for configs.|`druid_config`| +|`druid.metadata.storage.tables.compactionStates`|The table to use to store compaction state fingerprints.|`druid_compactionStates`| Review Comment: ```suggestion |`druid.metadata.storage.tables.compactionStates`|The table to use to store compaction state payloads and fingerprints.|`druid_compactionStates`| ``` ########## server/src/test/java/org/apache/druid/server/coordinator/duty/CompactSegmentsTest.java: ########## @@ -273,6 +276,7 @@ public void testSerde() throws Exception .addValue(DruidCoordinatorConfig.class, COORDINATOR_CONFIG) .addValue(OverlordClient.class, overlordClient) .addValue(CompactionStatusTracker.class, statusTracker) + .addValue(CompactionStateManager.class, compactionStateManager) Review Comment: I think this can be removed now since `CompactSegments` doesn't use this anymore. ########## server/src/test/java/org/apache/druid/segment/metadata/HeapMemoryCompactionStateManager.java: ########## @@ -0,0 +1,151 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.segment.metadata; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.MapperFeature; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.SerializationFeature; +import com.google.common.hash.Hasher; +import com.google.common.hash.Hashing; +import com.google.common.io.BaseEncoding; +import org.apache.druid.jackson.DefaultObjectMapper; +import org.apache.druid.java.util.common.StringUtils; +import org.apache.druid.timeline.CompactionState; +import org.joda.time.DateTime; + +import javax.annotation.Nullable; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; + +/** + * In-memory implementation of {@link CompactionStateManager} that stores + * compaction state fingerprints in heap memory without requiring a database. + * <p> + * Useful for simulations and unit tests where database persistence is not needed. + * Database-specific operations (cleanup, unused marking) are no-ops in this implementation. + */ +public class HeapMemoryCompactionStateManager implements CompactionStateManager +{ + private final ConcurrentMap<String, CompactionState> fingerprintToStateMap = new ConcurrentHashMap<>(); + private final ObjectMapper deterministicMapper; + + /** + * Creates an in-memory compaction state manager with a default deterministic mapper. + * This is a convenience constructor for tests and simulations. 
+ */ + public HeapMemoryCompactionStateManager() + { + this(createDeterministicMapper()); + } + + /** + * Creates an in-memory compaction state manager with the provided deterministic mapper + * for fingerprint generation. + * + * @param deterministicMapper ObjectMapper configured for deterministic serialization + */ + public HeapMemoryCompactionStateManager(ObjectMapper deterministicMapper) + { + this.deterministicMapper = deterministicMapper; + } + + /** + * Creates an ObjectMapper configured for deterministic serialization. + * Used for generating consistent fingerprints. + */ + private static ObjectMapper createDeterministicMapper() + { + ObjectMapper mapper = new DefaultObjectMapper(); + mapper.configure(SerializationFeature.ORDER_MAP_ENTRIES_BY_KEYS, true); + mapper.configure(MapperFeature.SORT_PROPERTIES_ALPHABETICALLY, true); + return mapper; + } + + @Override + @SuppressWarnings("UnstableApiUsage") + public String generateCompactionStateFingerprint( + final CompactionState compactionState, + final String dataSource + ) + { + final Hasher hasher = Hashing.sha256().newHasher(); Review Comment: Instead of duplicating this logic, would it make sense to have this class extend the concrete `PersistedCompactionStateManager` and not override just this one method? Alternatively, the `generateFingerprint` could be a static method in `PersistedCompactionStateManager` which accepts datasource, state and the object mapper. 
########## server/src/main/java/org/apache/druid/server/compaction/CompactionStatus.java: ########## @@ -351,40 +388,143 @@ private CompactionStatus evaluate() return inputBytesCheck; } - final List<String> reasonsForCompaction = + List<String> reasonsForCompaction = new ArrayList<>(); + CompactionStatus compactedOnceCheck = segmentsHaveBeenCompactedAtLeastOnce(); + if (!compactedOnceCheck.isComplete()) { + reasonsForCompaction.add(compactedOnceCheck.getReason()); + } + + if (compactionStateCache != null && targetFingerprint != null) { + // First try fingerprint-based evaluation (fast path) + CompactionStatus fingerprintStatus = FINGERPRINT_CHECKS.stream() + .map(f -> f.apply(this)) + .filter(status -> !status.isComplete()) + .findFirst().orElse(COMPLETE); + + if (!fingerprintStatus.isComplete()) { + reasonsForCompaction.add(fingerprintStatus.getReason()); + } + } + + reasonsForCompaction.addAll( CHECKS.stream() .map(f -> f.apply(this)) .filter(status -> !status.isComplete()) .map(CompactionStatus::getReason) - .collect(Collectors.toList()); + .collect(Collectors.toList()) + ); // Consider segments which have passed all checks to be compacted - final List<DataSegment> compactedSegments = unknownStateToSegments - .values() - .stream() - .flatMap(List::stream) - .collect(Collectors.toList()); + // Includes segments with correct fingerprints and segments that passed all state checks + final List<DataSegment> allCompactedSegments = new ArrayList<>(this.compactedSegments); + allCompactedSegments.addAll( + unknownStateToSegments + .values() + .stream() + .flatMap(List::stream) + .collect(Collectors.toList()) + ); if (reasonsForCompaction.isEmpty()) { return COMPLETE; } else { return CompactionStatus.pending( - createStats(compactedSegments), + createStats(allCompactedSegments), createStats(uncompactedSegments), reasonsForCompaction.get(0) ); } } + /** + * Evaluates the fingerprints of all fingerprinted candidate segments against the expected fingerprint. 
+ * <p> + * If all fingerprinted segments have the expected fingerprint, the check can quickly pass as COMPLETE. However, + * if any fingerprinted segment has a mismatched fingerprint, we need to investigate further by adding them to + * {@link #unknownStateToSegments} where their compaction states will be analyzed. + * </p> + */ + private CompactionStatus allFingerprintedCandidatesHaveExpectedFingerprint() + { + Map<String, List<DataSegment>> mismatchedFingerprintToSegmentMap = new HashMap<>(); + for (DataSegment segment : fingerprintedSegments) { + String fingerprint = segment.getCompactionStateFingerprint(); + if (fingerprint != null && !fingerprint.equals(targetFingerprint)) { + mismatchedFingerprintToSegmentMap + .computeIfAbsent(fingerprint, k -> new ArrayList<>()) + .add(segment); + } else if (fingerprint != null && fingerprint.equals(targetFingerprint)) { + // Segment has correct fingerprint - add to compacted segments + compactedSegments.add(segment); + } + } + + if (mismatchedFingerprintToSegmentMap.isEmpty()) { + return COMPLETE; + } + + boolean fingerprintedSegmentNeedingCompactionFound = false; + + if (compactionStateCache != null) { Review Comment: Please simplify this condition to have a return early syntax. e.g. ```java if (cache == null) { // move all mismatching segments to unknown state return CompactionStatus.pending("Segments have a mismatching fingerprint"); } // The rest of the logic goes here. Do not put in an else block to avoid unnecessary indentation ``` Just to confirm, the `compactionStateCache` can be null only when running the legacy `CompactSegments` duty on the Coordinator, right? It is never expected to be null when running supervisors on the Overlord? 
########## server/src/main/java/org/apache/druid/server/compaction/CompactionStatus.java: ########## @@ -351,40 +388,143 @@ private CompactionStatus evaluate() return inputBytesCheck; } - final List<String> reasonsForCompaction = + List<String> reasonsForCompaction = new ArrayList<>(); + CompactionStatus compactedOnceCheck = segmentsHaveBeenCompactedAtLeastOnce(); + if (!compactedOnceCheck.isComplete()) { + reasonsForCompaction.add(compactedOnceCheck.getReason()); + } + + if (compactionStateCache != null && targetFingerprint != null) { + // First try fingerprint-based evaluation (fast path) + CompactionStatus fingerprintStatus = FINGERPRINT_CHECKS.stream() + .map(f -> f.apply(this)) + .filter(status -> !status.isComplete()) + .findFirst().orElse(COMPLETE); + + if (!fingerprintStatus.isComplete()) { + reasonsForCompaction.add(fingerprintStatus.getReason()); + } + } + + reasonsForCompaction.addAll( CHECKS.stream() .map(f -> f.apply(this)) .filter(status -> !status.isComplete()) .map(CompactionStatus::getReason) - .collect(Collectors.toList()); + .collect(Collectors.toList()) + ); // Consider segments which have passed all checks to be compacted - final List<DataSegment> compactedSegments = unknownStateToSegments - .values() - .stream() - .flatMap(List::stream) - .collect(Collectors.toList()); + // Includes segments with correct fingerprints and segments that passed all state checks + final List<DataSegment> allCompactedSegments = new ArrayList<>(this.compactedSegments); + allCompactedSegments.addAll( + unknownStateToSegments + .values() + .stream() + .flatMap(List::stream) + .collect(Collectors.toList()) + ); if (reasonsForCompaction.isEmpty()) { return COMPLETE; } else { return CompactionStatus.pending( - createStats(compactedSegments), + createStats(allCompactedSegments), createStats(uncompactedSegments), reasonsForCompaction.get(0) ); } } + /** + * Evaluates the fingerprints of all fingerprinted candidate segments against the expected fingerprint. 
+ * <p> + * If all fingerprinted segments have the expected fingerprint, the check can quickly pass as COMPLETE. However, + * if any fingerprinted segment has a mismatched fingerprint, we need to investigate further by adding them to + * {@link #unknownStateToSegments} where their compaction states will be analyzed. + * </p> + */ + private CompactionStatus allFingerprintedCandidatesHaveExpectedFingerprint() + { + Map<String, List<DataSegment>> mismatchedFingerprintToSegmentMap = new HashMap<>(); + for (DataSegment segment : fingerprintedSegments) { + String fingerprint = segment.getCompactionStateFingerprint(); + if (fingerprint != null && !fingerprint.equals(targetFingerprint)) { + mismatchedFingerprintToSegmentMap + .computeIfAbsent(fingerprint, k -> new ArrayList<>()) + .add(segment); + } else if (fingerprint != null && fingerprint.equals(targetFingerprint)) { + // Segment has correct fingerprint - add to compacted segments + compactedSegments.add(segment); + } Review Comment: Please simplify the if-else: ```java if (fingerprint == null) { // Should never happen since these are all fingerprintedSegments } else if (fingerprint.equals(target)) { // compacted } else { // mismatched } ``` ########## server/src/main/java/org/apache/druid/segment/metadata/CompactionStateCache.java: ########## @@ -0,0 +1,147 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.segment.metadata; + +import org.apache.druid.guice.LazySingleton; +import org.apache.druid.java.util.common.logger.Logger; +import org.apache.druid.timeline.CompactionState; + +import java.util.Map; +import java.util.Optional; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; + +/** + * In-memory cache of compaction states used by {@link org.apache.druid.metadata.segment.cache.HeapMemorySegmentMetadataCache}. + * <p> + * This cache stores compaction states for published segments polled from the metadata store. + * It is the PRIMARY way to read compaction states in production. + * <p> + * The cache is populated during segment metadata cache sync operations and provides fast lookups + * without hitting the database. + */ +@LazySingleton +public class CompactionStateCache +{ + private static final Logger log = new Logger(CompactionStateCache.class); + + /** + * Atomically updated reference to published compaction states. + */ + private final AtomicReference<PublishedCompactionStates> publishedCompactionStates + = new AtomicReference<>(PublishedCompactionStates.EMPTY); + + private final AtomicInteger cacheMissCount = new AtomicInteger(0); + private final AtomicInteger cacheHitCount = new AtomicInteger(0); + + public boolean isEnabled() + { + // Always enabled when this implementation is bound + return true; + } + + /** + * Resets the cache with compaction states polled from the metadata store. + * Called after each successful poll in HeapMemorySegmentMetadataCache. 
+ * + * @param fingerprintToStateMap Complete map of all active compaction state fingerprints + */ + public void resetCompactionStatesForPublishedSegments( + Map<String, CompactionState> fingerprintToStateMap + ) + { + this.publishedCompactionStates.set( + new PublishedCompactionStates(fingerprintToStateMap) + ); + log.debug("Reset compaction state cache with [%d] fingerprints", fingerprintToStateMap.size()); + } + + /** + * Retrieves a compaction state by its fingerprint. + * This is the PRIMARY method for reading compaction states. + * + * @param fingerprint The fingerprint to look up + * @return The compaction state, or Optional.empty() if not cached + */ + public Optional<CompactionState> getCompactionStateByFingerprint(String fingerprint) + { + if (fingerprint == null) { + return Optional.empty(); + } + + CompactionState state = publishedCompactionStates.get() + .fingerprintToStateMap + .get(fingerprint); + if (state != null) { + cacheHitCount.incrementAndGet(); + return Optional.of(state); + } + + cacheMissCount.incrementAndGet(); + return Optional.empty(); + } + + /** + * Gets the full cached map (immutable copy). + * Used by HeapMemorySegmentMetadataCache for delta sync calculations. + */ + public Map<String, CompactionState> getPublishedCompactionStateMap() + { + return publishedCompactionStates.get().fingerprintToStateMap; + } + + /** + * Clears the cache. Called when node stops being leader. + */ + public void clear() + { + publishedCompactionStates.set(PublishedCompactionStates.EMPTY); + log.info("Cleared compaction state cache"); + } + + /** + * @return Summary stats for metric emission + */ + public Map<String, Integer> getStats() Review Comment: ```suggestion public Map<String, Integer> getAndResetStats() ``` ########## server/src/main/java/org/apache/druid/segment/metadata/PersistedCompactionStateManager.java: ########## @@ -0,0 +1,401 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.segment.metadata; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.Lists; +import com.google.common.hash.Hasher; +import com.google.common.hash.Hashing; +import com.google.common.io.BaseEncoding; +import com.google.common.util.concurrent.Striped; +import com.google.inject.Inject; +import org.apache.druid.error.InternalServerError; +import org.apache.druid.guice.ManageLifecycle; +import org.apache.druid.guice.annotations.Deterministic; +import org.apache.druid.java.util.common.DateTimes; +import org.apache.druid.java.util.common.ISE; +import org.apache.druid.java.util.common.StringUtils; +import org.apache.druid.java.util.common.lifecycle.LifecycleStart; +import org.apache.druid.java.util.common.lifecycle.LifecycleStop; +import org.apache.druid.java.util.emitter.EmittingLogger; +import org.apache.druid.metadata.MetadataStorageTablesConfig; +import org.apache.druid.metadata.SQLMetadataConnector; +import org.apache.druid.timeline.CompactionState; +import org.joda.time.DateTime; +import org.skife.jdbi.v2.Handle; +import org.skife.jdbi.v2.PreparedBatch; +import org.skife.jdbi.v2.Query; +import 
org.skife.jdbi.v2.SQLStatement; +import org.skife.jdbi.v2.Update; + +import javax.annotation.Nonnull; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.locks.Lock; + +/** + * Database-backed implementation of {@link CompactionStateManager}. + * <p> + * Manages the persistence and retrieval of {@link CompactionState} objects in the metadata storage. + * Compaction states are uniquely identified by their fingerprints, which are SHA-256 hashes of their content. A cache + * of compaction states using the fingerprints as keys is maintained in memory to optimize retrieval performance. + * </p> + * <p> + * A striped locking mechanism is used to ensure thread-safe persistence of compaction states on a per-datasource basis. + * </p> + */ +@ManageLifecycle +public class PersistedCompactionStateManager implements CompactionStateManager +{ + private static final EmittingLogger log = new EmittingLogger(PersistedCompactionStateManager.class); + private static final int DB_ACTION_PARTITION_SIZE = 100; + + private final MetadataStorageTablesConfig dbTables; + private final ObjectMapper jsonMapper; + private final ObjectMapper deterministicMapper; + private final SQLMetadataConnector connector; + private final Striped<Lock> datasourceLocks = Striped.lock(128); + + @Inject + public PersistedCompactionStateManager( + @Nonnull MetadataStorageTablesConfig dbTables, + @Nonnull ObjectMapper jsonMapper, + @Deterministic @Nonnull ObjectMapper deterministicMapper, + @Nonnull SQLMetadataConnector connector + ) + { + this.dbTables = dbTables; + this.jsonMapper = jsonMapper; + this.deterministicMapper = deterministicMapper; + this.connector = connector; + } + + @LifecycleStart + public void start() + { + } + + @LifecycleStop + public void stop() + { + } + + @VisibleForTesting + PersistedCompactionStateManager() + { + this.dbTables = 
null; + this.jsonMapper = null; + this.deterministicMapper = null; + this.connector = null; + } + + @Override + public void persistCompactionState( + final String dataSource, + final Map<String, CompactionState> fingerprintToStateMap, + final DateTime updateTime + ) + { + if (fingerprintToStateMap.isEmpty()) { + return; + } + + final Lock lock = datasourceLocks.get(dataSource); Review Comment: IIUC, we are using the lock so that the same fingerprint is not updated/inserted in the metadata store by another thread while we are working here. I think we should do the following instead: - We can remove the locking, since the persistence will happen from only a single thread anyway (via `CompactionJobQueue.runReadyJobs()`). - We can just log an error, if there are conflicts since this method should be idempotent anyway. ########## server/src/main/java/org/apache/druid/segment/metadata/CompactionStateManager.java: ########## @@ -0,0 +1,105 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.druid.segment.metadata; + +import org.apache.druid.timeline.CompactionState; +import org.joda.time.DateTime; + +import java.util.List; +import java.util.Map; + +/** + * Manages compaction state persistence and fingerprint generation. + * <p> + * Implementations may be backed by a database (like {@link PersistedCompactionStateManager}) or + * use in-memory storage (like {@link HeapMemoryCompactionStateManager}). + */ +public interface CompactionStateManager +{ + /** + * Generates a deterministic fingerprint for the given compaction state and datasource. + * The fingerprint is a SHA-256 hash of the datasource name and serialized compaction state. + * + * @param compactionState The compaction configuration to fingerprint + * @param dataSource The datasource name + * @return A hex-encoded SHA-256 fingerprint string + */ + String generateCompactionStateFingerprint(CompactionState compactionState, String dataSource); + + /** + * Persists compaction states to storage. + * + * @param dataSource The datasource name + * @param fingerprintToStateMap Map of fingerprints to their compaction states + * @param updateTime The timestamp for this update + */ + void persistCompactionState( Review Comment: The only usage of this method seems to pass in a single fingerprint right now. Should we just keep that for the time being? We can add the batch upsert when needed later. ########## server/src/main/java/org/apache/druid/segment/metadata/CompactionStateManager.java: ########## @@ -0,0 +1,105 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.segment.metadata; + +import org.apache.druid.timeline.CompactionState; +import org.joda.time.DateTime; + +import java.util.List; +import java.util.Map; + +/** + * Manages compaction state persistence and fingerprint generation. + * <p> + * Implementations may be backed by a database (like {@link PersistedCompactionStateManager}) or + * use in-memory storage (like {@link HeapMemoryCompactionStateManager}). + */ +public interface CompactionStateManager +{ + /** + * Generates a deterministic fingerprint for the given compaction state and datasource. + * The fingerprint is a SHA-256 hash of the datasource name and serialized compaction state. + * + * @param compactionState The compaction configuration to fingerprint + * @param dataSource The datasource name + * @return A hex-encoded SHA-256 fingerprint string + */ + String generateCompactionStateFingerprint(CompactionState compactionState, String dataSource); + + /** + * Persists compaction states to storage. + * + * @param dataSource The datasource name + * @param fingerprintToStateMap Map of fingerprints to their compaction states + * @param updateTime The timestamp for this update + */ + void persistCompactionState( Review Comment: Maybe rename to `upsertCompactionState` instead. ########## server/src/main/java/org/apache/druid/segment/metadata/PersistedCompactionStateManager.java: ########## @@ -0,0 +1,401 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.segment.metadata; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.Lists; +import com.google.common.hash.Hasher; +import com.google.common.hash.Hashing; +import com.google.common.io.BaseEncoding; +import com.google.common.util.concurrent.Striped; +import com.google.inject.Inject; +import org.apache.druid.error.InternalServerError; +import org.apache.druid.guice.ManageLifecycle; +import org.apache.druid.guice.annotations.Deterministic; +import org.apache.druid.java.util.common.DateTimes; +import org.apache.druid.java.util.common.ISE; +import org.apache.druid.java.util.common.StringUtils; +import org.apache.druid.java.util.common.lifecycle.LifecycleStart; +import org.apache.druid.java.util.common.lifecycle.LifecycleStop; +import org.apache.druid.java.util.emitter.EmittingLogger; +import org.apache.druid.metadata.MetadataStorageTablesConfig; +import org.apache.druid.metadata.SQLMetadataConnector; +import org.apache.druid.timeline.CompactionState; +import org.joda.time.DateTime; +import org.skife.jdbi.v2.Handle; +import org.skife.jdbi.v2.PreparedBatch; +import org.skife.jdbi.v2.Query; +import 
org.skife.jdbi.v2.SQLStatement; +import org.skife.jdbi.v2.Update; + +import javax.annotation.Nonnull; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.locks.Lock; + +/** + * Database-backed implementation of {@link CompactionStateManager}. + * <p> + * Manages the persistence and retrieval of {@link CompactionState} objects in the metadata storage. + * Compaction states are uniquely identified by their fingerprints, which are SHA-256 hashes of their content. A cache + * of compaction states using the fingerprints as keys is maintained in memory to optimize retrieval performance. + * </p> + * <p> + * A striped locking mechanism is used to ensure thread-safe persistence of compaction states on a per-datasource basis. + * </p> + */ +@ManageLifecycle +public class PersistedCompactionStateManager implements CompactionStateManager +{ + private static final EmittingLogger log = new EmittingLogger(PersistedCompactionStateManager.class); + private static final int DB_ACTION_PARTITION_SIZE = 100; + + private final MetadataStorageTablesConfig dbTables; + private final ObjectMapper jsonMapper; + private final ObjectMapper deterministicMapper; + private final SQLMetadataConnector connector; + private final Striped<Lock> datasourceLocks = Striped.lock(128); + + @Inject + public PersistedCompactionStateManager( + @Nonnull MetadataStorageTablesConfig dbTables, + @Nonnull ObjectMapper jsonMapper, + @Deterministic @Nonnull ObjectMapper deterministicMapper, + @Nonnull SQLMetadataConnector connector + ) + { + this.dbTables = dbTables; + this.jsonMapper = jsonMapper; + this.deterministicMapper = deterministicMapper; + this.connector = connector; + } + + @LifecycleStart + public void start() + { + } + + @LifecycleStop + public void stop() Review Comment: If these methods are going to be no-op, let's remove them and bind this 
class as a `LazySingleton` instead of a managed one. ########## server/src/main/java/org/apache/druid/segment/metadata/CompactionStateManager.java: ########## @@ -0,0 +1,105 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.segment.metadata; + +import org.apache.druid.timeline.CompactionState; +import org.joda.time.DateTime; + +import java.util.List; +import java.util.Map; + +/** + * Manages compaction state persistence and fingerprint generation. + * <p> + * Implementations may be backed by a database (like {@link PersistedCompactionStateManager}) or + * use in-memory storage (like {@link HeapMemoryCompactionStateManager}). + */ +public interface CompactionStateManager +{ + /** + * Generates a deterministic fingerprint for the given compaction state and datasource. + * The fingerprint is a SHA-256 hash of the datasource name and serialized compaction state. + * + * @param compactionState The compaction configuration to fingerprint + * @param dataSource The datasource name + * @return A hex-encoded SHA-256 fingerprint string + */ + String generateCompactionStateFingerprint(CompactionState compactionState, String dataSource); + + /** + * Persists compaction states to storage. 
+ * + * @param dataSource The datasource name + * @param fingerprintToStateMap Map of fingerprints to their compaction states + * @param updateTime The timestamp for this update + */ + void persistCompactionState( + String dataSource, + Map<String, CompactionState> fingerprintToStateMap, + DateTime updateTime + ); + + /** + * Marks compaction states as unused if they are not referenced by any used segments. + * This is used for cleanup operations. Implementations may choose to no-op this. Review Comment: Why is it okay to no-op this method and the other methods? I think it is better to require the concrete classes to provide an implementation, since the only non-test impl right now is the `PersistedCompactionStateManager`. ########## server/src/main/java/org/apache/druid/metadata/segment/cache/HeapMemorySegmentMetadataCache.java: ########## @@ -1106,6 +1122,89 @@ private void emitMetric(String datasource, String metric, long value) ); } + /** + * Retrieves required used compaction states from the metadata store and resets + * them in the {@link CompactionStateCache}. If this is the first sync, all used + * compaction states are retrieved from the metadata store. If this is a delta sync, + * first only the fingerprints of all used compaction states are retrieved. Payloads are + * then fetched for only the fingerprints which are not present in the cache. 
+ */ + private void retrieveAndResetUsedCompactionStates() + { + final Stopwatch compactionStateSyncDuration = Stopwatch.createStarted(); + + // Reset the CompactionStateCache with latest compaction states + final Map<String, CompactionState> fingerprintToStateMap; + if (syncFinishTime.get() == null) { + fingerprintToStateMap = buildFingerprintToStateMapForFullSync(); + } else { + fingerprintToStateMap = buildFingerprintToStateMapForDeltaSync(); + } + + compactionStateCache.resetCompactionStatesForPublishedSegments(fingerprintToStateMap); + + // Emit metrics for the current contents of the cache + compactionStateCache.getStats().forEach(this::emitMetric); + emitMetric(Metric.RETRIEVE_COMPACTION_STATES_DURATION_MILLIS, compactionStateSyncDuration.millisElapsed()); + } + + /** + * Retrieves all used compaction states from the metadata store and builds a + * fresh map from compaction state fingerprint to state. + */ + private Map<String, CompactionState> buildFingerprintToStateMapForFullSync() + { + final List<CompactionStateRecord> records = query( + SqlSegmentsMetadataQuery::retrieveAllUsedCompactionStates + ); + + return records.stream().collect( + Collectors.toMap( + CompactionStateRecord::getFingerprint, + CompactionStateRecord::getState + ) + ); + } + + /** + * Retrieves compaction states from the metadata store if they are not present + * in the cache or have been recently updated in the metadata store. These + * compaction states along with those already present in the cache are used to + * build a complete updated map from compaction state fingerprint to state. + * + * @return Complete updated map from compaction state fingerprint to state for all + * used compaction states currently persisted in the metadata store. 
+ */ + private Map<String, CompactionState> buildFingerprintToStateMapForDeltaSync() + { + // Identify fingerprints in the cache and in the metadata store + final Map<String, CompactionState> fingerprintToStateMap = new HashMap<>( + compactionStateCache.getPublishedCompactionStateMap() + ); + final Set<String> cachedFingerprints = Set.copyOf(fingerprintToStateMap.keySet()); + final Set<String> persistedFingerprints = query( + SqlSegmentsMetadataQuery::retrieveAllUsedCompactionStateFingerprints + ); + + // Remove entry for compaction states that have been deleted from the metadata store + final Set<String> deletedFingerprints = Sets.difference(cachedFingerprints, persistedFingerprints); + deletedFingerprints.forEach(fingerprintToStateMap::remove); + + // Retrieve and add entry for compaction states that have been added to the metadata store + final Set<String> addedFingerprints = Sets.difference(persistedFingerprints, cachedFingerprints); + final List<CompactionStateRecord> addedCompactionStateRecords = query( + sql -> sql.retrieveCompactionStatesForFingerprints(addedFingerprints) + ); + if (addedCompactionStateRecords.size() < addedFingerprints.size()) { + emitMetric(Metric.SKIPPED_COMPACTION_STATES, addedFingerprints.size() - addedCompactionStateRecords.size()); Review Comment: Instead of this, should we just emit a metric for newly added fingerprints and a metric for newly deleted fingerprints? ########## server/src/main/java/org/apache/druid/segment/metadata/CompactionStateManager.java: ########## @@ -0,0 +1,105 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.segment.metadata; + +import org.apache.druid.timeline.CompactionState; +import org.joda.time.DateTime; + +import java.util.List; +import java.util.Map; + +/** + * Manages compaction state persistence and fingerprint generation. + * <p> + * Implementations may be backed by a database (like {@link PersistedCompactionStateManager}) or + * use in-memory storage (like {@link HeapMemoryCompactionStateManager}). + */ +public interface CompactionStateManager Review Comment: To avoid confusion with classes like `CompactionStatusTracker`, should we rename this to something like `CompactionStateStorage` and impl could be `CompactionStateSqlStorage`? ########## server/src/main/java/org/apache/druid/segment/metadata/PersistedCompactionStateManager.java: ########## @@ -0,0 +1,401 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.segment.metadata; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.Lists; +import com.google.common.hash.Hasher; +import com.google.common.hash.Hashing; +import com.google.common.io.BaseEncoding; +import com.google.common.util.concurrent.Striped; +import com.google.inject.Inject; +import org.apache.druid.error.InternalServerError; +import org.apache.druid.guice.ManageLifecycle; +import org.apache.druid.guice.annotations.Deterministic; +import org.apache.druid.java.util.common.DateTimes; +import org.apache.druid.java.util.common.ISE; +import org.apache.druid.java.util.common.StringUtils; +import org.apache.druid.java.util.common.lifecycle.LifecycleStart; +import org.apache.druid.java.util.common.lifecycle.LifecycleStop; +import org.apache.druid.java.util.emitter.EmittingLogger; +import org.apache.druid.metadata.MetadataStorageTablesConfig; +import org.apache.druid.metadata.SQLMetadataConnector; +import org.apache.druid.timeline.CompactionState; +import org.joda.time.DateTime; +import org.skife.jdbi.v2.Handle; +import org.skife.jdbi.v2.PreparedBatch; +import org.skife.jdbi.v2.Query; +import org.skife.jdbi.v2.SQLStatement; +import org.skife.jdbi.v2.Update; + +import javax.annotation.Nonnull; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.locks.Lock; + +/** + * Database-backed implementation of {@link CompactionStateManager}. + * <p> + * Manages the persistence and retrieval of {@link CompactionState} objects in the metadata storage. 
+ * Compaction states are uniquely identified by their fingerprints, which are SHA-256 hashes of their content. A cache + * of compaction states using the fingerprints as keys is maintained in memory to optimize retrieval performance. + * </p> + * <p> + * A striped locking mechanism is used to ensure thread-safe persistence of compaction states on a per-datasource basis. + * </p> + */ +@ManageLifecycle +public class PersistedCompactionStateManager implements CompactionStateManager Review Comment: Nit: We may rename this to `SqlCompactionStateManager` to align with other similar classes that handle persistence of any metadata in the DB. ########## server/src/main/java/org/apache/druid/metadata/SqlSegmentsMetadataQuery.java: ########## @@ -1701,6 +1705,132 @@ private SegmentSchemaRecord mapToSchemaRecord(ResultSet resultSet) } } + /** + * Retrieves all unique compaction state fingerprints currently referenced by used segments. + * This is used for delta syncs to determine which fingerprints are still active. + * + * @return Set of compaction state fingerprints + */ + public Set<String> retrieveAllUsedCompactionStateFingerprints() + { + final String sql = StringUtils.format( + "SELECT DISTINCT compaction_state_fingerprint FROM %s " + + "WHERE used = true AND compaction_state_fingerprint IS NOT NULL", + dbTables.getSegmentsTable() Review Comment: Instead of querying the segments table, please hit the compaction states table instead, since it is expected to be much smaller. As an added benefit, the compaction states table will never have fingerprint as null, so the SQL just becomes `SELECT compaction_state_fingerprint FROM druid_compactionStates`. 
########## server/src/main/java/org/apache/druid/metadata/segment/cache/HeapMemorySegmentMetadataCache.java: ########## @@ -785,13 +800,13 @@ private void retrieveAllUsedSegments( final String sql; if (useSchemaCache) { sql = StringUtils.format( - "SELECT id, payload, created_date, used_status_last_updated, schema_fingerprint, num_rows" + "SELECT id, payload, created_date, used_status_last_updated, compaction_state_fingerprint, schema_fingerprint, num_rows" Review Comment: Super nit: Please put the `compaction_state_fingerprint` at the end so that the existing column indexes in the result set don't need to change. ########## server/src/main/java/org/apache/druid/segment/metadata/PersistedCompactionStateManager.java: ########## @@ -0,0 +1,401 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.druid.segment.metadata; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.Lists; +import com.google.common.hash.Hasher; +import com.google.common.hash.Hashing; +import com.google.common.io.BaseEncoding; +import com.google.common.util.concurrent.Striped; +import com.google.inject.Inject; +import org.apache.druid.error.InternalServerError; +import org.apache.druid.guice.ManageLifecycle; +import org.apache.druid.guice.annotations.Deterministic; +import org.apache.druid.java.util.common.DateTimes; +import org.apache.druid.java.util.common.ISE; +import org.apache.druid.java.util.common.StringUtils; +import org.apache.druid.java.util.common.lifecycle.LifecycleStart; +import org.apache.druid.java.util.common.lifecycle.LifecycleStop; +import org.apache.druid.java.util.emitter.EmittingLogger; +import org.apache.druid.metadata.MetadataStorageTablesConfig; +import org.apache.druid.metadata.SQLMetadataConnector; +import org.apache.druid.timeline.CompactionState; +import org.joda.time.DateTime; +import org.skife.jdbi.v2.Handle; +import org.skife.jdbi.v2.PreparedBatch; +import org.skife.jdbi.v2.Query; +import org.skife.jdbi.v2.SQLStatement; +import org.skife.jdbi.v2.Update; + +import javax.annotation.Nonnull; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.locks.Lock; + +/** + * Database-backed implementation of {@link CompactionStateManager}. + * <p> + * Manages the persistence and retrieval of {@link CompactionState} objects in the metadata storage. + * Compaction states are uniquely identified by their fingerprints, which are SHA-256 hashes of their content. 
A cache + * of compaction states using the fingerprints as keys is maintained in memory to optimize retrieval performance. + * </p> + * <p> + * A striped locking mechanism is used to ensure thread-safe persistence of compaction states on a per-datasource basis. + * </p> + */ +@ManageLifecycle +public class PersistedCompactionStateManager implements CompactionStateManager +{ + private static final EmittingLogger log = new EmittingLogger(PersistedCompactionStateManager.class); + private static final int DB_ACTION_PARTITION_SIZE = 100; + + private final MetadataStorageTablesConfig dbTables; + private final ObjectMapper jsonMapper; + private final ObjectMapper deterministicMapper; + private final SQLMetadataConnector connector; + private final Striped<Lock> datasourceLocks = Striped.lock(128); + + @Inject + public PersistedCompactionStateManager( + @Nonnull MetadataStorageTablesConfig dbTables, + @Nonnull ObjectMapper jsonMapper, + @Deterministic @Nonnull ObjectMapper deterministicMapper, + @Nonnull SQLMetadataConnector connector + ) + { + this.dbTables = dbTables; + this.jsonMapper = jsonMapper; + this.deterministicMapper = deterministicMapper; + this.connector = connector; + } + + @LifecycleStart + public void start() + { + } + + @LifecycleStop + public void stop() + { + } + + @VisibleForTesting + PersistedCompactionStateManager() Review Comment: Can we use the actual constructor instead and just pass nulls? ########## server/src/main/java/org/apache/druid/server/coordinator/duty/KillUnreferencedCompactionState.java: ########## @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.server.coordinator.duty; + +import org.apache.druid.java.util.common.logger.Logger; +import org.apache.druid.segment.metadata.CompactionStateManager; +import org.apache.druid.server.coordinator.config.MetadataCleanupConfig; +import org.apache.druid.server.coordinator.stats.Stats; +import org.joda.time.DateTime; + +import java.util.List; + +/** + * Coordinator duty that cleans up old, unused compaction state entries from the database. + * <p> + * This duty performs a three-step cleanup process: + * <ol> + * <li>Marks compaction states not referenced by any segments as unused</li> + * <li>Repairs any incorrectly marked unused states that are still referenced by used segments</li> + * <li>Deletes unused compaction states older than the configured retention period</li> + * </ol> + * <p> + * This prevents unbounded growth of the compaction states table while ensuring that + * states referenced by active segments are preserved. 
+ */ +public class KillUnreferencedCompactionState extends MetadataCleanupDuty +{ + private static final Logger log = new Logger(KillUnreferencedCompactionState.class); + private final CompactionStateManager compactionStateManager; + + public KillUnreferencedCompactionState( + MetadataCleanupConfig config, + CompactionStateManager compactionStateManager + ) + { + super("compactionState", config, Stats.Kill.COMPACTION_STATE); + this.compactionStateManager = compactionStateManager; + } + + @Override + protected int cleanupEntriesCreatedBefore(DateTime minCreatedTime) + { + // 1: Mark unreferenced states as unused + int unused = compactionStateManager.markUnreferencedCompactionStatesAsUnused(); Review Comment: It might be more desirable in the long term to have the Overlord perform all CRUD on the compaction states rather than the Coordinator, as the read operations could use the cache and the write operations could update the cache directly, thus keeping it fresh. I think there is a segment schema duty which does the same. One way to do it now would be to convert this duty into an `OverlordDuty`. It would pave way for the future migration of other cleanup duties to the Overlord. What do you think? ########## server/src/test/java/org/apache/druid/server/compaction/NewestSegmentFirstPolicyTest.java: ########## @@ -2052,8 +2064,10 @@ TestDataSource.KOALA, configBuilder().forDataSource(TestDataSource.KOALA).build( TestDataSource.WIKI, SegmentTimeline.forSegments(wikiSegments), TestDataSource.KOALA, SegmentTimeline.forSegments(koalaSegments) ), - Collections.emptyMap() - ); + Collections.emptyMap(), + compactionStateManager, + compactionStateCache + ); Review Comment: Nit: please fix the indentation here and in other places in this test. 
########## server/src/main/java/org/apache/druid/metadata/SqlSegmentsMetadataQuery.java: ########## @@ -1701,6 +1705,132 @@ private SegmentSchemaRecord mapToSchemaRecord(ResultSet resultSet) } } + /** + * Retrieves all unique compaction state fingerprints currently referenced by used segments. + * This is used for delta syncs to determine which fingerprints are still active. + * + * @return Set of compaction state fingerprints + */ + public Set<String> retrieveAllUsedCompactionStateFingerprints() + { + final String sql = StringUtils.format( + "SELECT DISTINCT compaction_state_fingerprint FROM %s " + + "WHERE used = true AND compaction_state_fingerprint IS NOT NULL", + dbTables.getSegmentsTable() + ); + + return Set.copyOf( + handle.createQuery(sql) + .setFetchSize(connector.getStreamingFetchSize()) + .mapTo(String.class) + .list() + ); + } + + /** + * Retrieves all compaction states for used segments (full sync). + * Fetches from compaction_states table where the fingerprint is referenced by used segments. + * + * @return List of CompactionStateRecord objects + */ + public List<CompactionStateRecord> retrieveAllUsedCompactionStates() + { + final String sql = StringUtils.format( + "SELECT cs.fingerprint, cs.payload FROM %s cs " + + "WHERE cs.used = true " + + "AND cs.fingerprint IN (" + + " SELECT DISTINCT compaction_state_fingerprint FROM %s " + + " WHERE used = true AND compaction_state_fingerprint IS NOT NULL" + + ")", + dbTables.getCompactionStatesTable(), + dbTables.getSegmentsTable() Review Comment: Let's just ignore the segments table and query the compaction states table directly. It is okay to retrieve some compaction states even if they are not currently referenced. The background cleanup threads will take care of the references, and subsequent sync of the cache will add/remove the compaction state as needed. 
########## server/src/main/java/org/apache/druid/metadata/SQLMetadataConnector.java: ########## @@ -1097,6 +1100,55 @@ public void createSegmentSchemasTable() } } + /** + * Creates the compaction states table for storing fingerprinted compaction states + * <p> + * This table stores unique compaction states that are referenced by + * segments via fingerprints. + */ + public void createCompactionStatesTable(final String tableName) + { + createTable( + tableName, + ImmutableList.of( + StringUtils.format( + "CREATE TABLE %1$s (\n" + + " id %2$s NOT NULL,\n" + + " created_date VARCHAR(255) NOT NULL,\n" + + " datasource VARCHAR(255) NOT NULL,\n" + + " fingerprint VARCHAR(255) NOT NULL,\n" + + " payload %3$s NOT NULL,\n" + + " used BOOLEAN NOT NULL,\n" + + " used_status_last_updated VARCHAR(255) NOT NULL,\n" + + " PRIMARY KEY (id),\n" + + " UNIQUE (fingerprint)\n" Review Comment: If the fingerprint is unique anyway, do we still need an `id` column? I forget why the schemas table has an `id` column either. ########## server/src/main/java/org/apache/druid/metadata/SQLMetadataConnector.java: ########## @@ -355,6 +355,7 @@ public void createSegmentTable(final String tableName) columns.add("used BOOLEAN NOT NULL"); columns.add("payload %2$s NOT NULL"); columns.add("used_status_last_updated VARCHAR(255) NOT NULL"); + columns.add("compaction_state_fingerprint VARCHAR(255)"); Review Comment: Could you please also add the `upgraded_from_segment_id` column here? I think we had missed adding it, and it is present only in the `alterSegmentTable` method. ########## processing/src/main/java/org/apache/druid/metadata/MetadataStorageConnector.java: ########## @@ -95,4 +95,12 @@ default void exportTable( * SegmentSchema table is created only when CentralizedDatasourceSchema feature is enabled. */ void createSegmentSchemasTable(); + + /** + * CompactionStates table is centralized store for {@link org.apache.druid.timeline.CompactionState} objects. 
+ * <p> + * N segments can refer to the same compaction state via its unique fingerprint Review Comment: ```suggestion * This table stores {@link org.apache.druid.timeline.CompactionState} objects. * <p> * Multiple segments can refer to the same compaction state via its unique fingerprint. ``` ########## server/src/main/java/org/apache/druid/metadata/segment/cache/CompactionStateRecord.java: ########## @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.metadata.segment.cache; + +import org.apache.druid.timeline.CompactionState; + +/** + * Represents a single record in the compaction_states table. Review Comment: ```suggestion * Represents a single record in the druid_compactionStates table. ``` ########## server/src/test/java/org/apache/druid/segment/metadata/HeapMemoryCompactionStateManager.java: ########## @@ -0,0 +1,151 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.segment.metadata; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.MapperFeature; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.SerializationFeature; +import com.google.common.hash.Hasher; +import com.google.common.hash.Hashing; +import com.google.common.io.BaseEncoding; +import org.apache.druid.jackson.DefaultObjectMapper; +import org.apache.druid.java.util.common.StringUtils; +import org.apache.druid.timeline.CompactionState; +import org.joda.time.DateTime; + +import javax.annotation.Nullable; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; + +/** + * In-memory implementation of {@link CompactionStateManager} that stores + * compaction state fingerprints in heap memory without requiring a database. + * <p> + * Useful for simulations and unit tests where database persistence is not needed. + * Database-specific operations (cleanup, unused marking) are no-ops in this implementation. 
+ */ +public class HeapMemoryCompactionStateManager implements CompactionStateManager +{ + private final ConcurrentMap<String, CompactionState> fingerprintToStateMap = new ConcurrentHashMap<>(); + private final ObjectMapper deterministicMapper; + + /** + * Creates an in-memory compaction state manager with a default deterministic mapper. + * This is a convenience constructor for tests and simulations. + */ + public HeapMemoryCompactionStateManager() + { + this(createDeterministicMapper()); + } + + /** + * Creates an in-memory compaction state manager with the provided deterministic mapper + * for fingerprint generation. + * + * @param deterministicMapper ObjectMapper configured for deterministic serialization + */ + public HeapMemoryCompactionStateManager(ObjectMapper deterministicMapper) + { + this.deterministicMapper = deterministicMapper; + } + + /** + * Creates an ObjectMapper configured for deterministic serialization. + * Used for generating consistent fingerprints. + */ + private static ObjectMapper createDeterministicMapper() Review Comment: Maybe make this a static utility method in `TestUtils` so that it may be used elsewhere too. ########## server/src/main/java/org/apache/druid/metadata/SQLMetadataConnector.java: ########## @@ -1097,6 +1100,55 @@ public void createSegmentSchemasTable() } } + /** + * Creates the compaction states table for storing fingerprinted compaction states + * <p> + * This table stores unique compaction states that are referenced by + * segments via fingerprints. + */ + public void createCompactionStatesTable(final String tableName) + { + createTable( + tableName, + ImmutableList.of( + StringUtils.format( + "CREATE TABLE %1$s (\n" + + " id %2$s NOT NULL,\n" + + " created_date VARCHAR(255) NOT NULL,\n" + + " datasource VARCHAR(255) NOT NULL,\n" Review Comment: Other tables seem to use `dataSource` camel-cased. 
```suggestion + " dataSource VARCHAR(255) NOT NULL,\n" ``` ########## server/src/main/java/org/apache/druid/segment/metadata/CompactionStateManager.java: ########## @@ -0,0 +1,105 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.segment.metadata; + +import org.apache.druid.timeline.CompactionState; +import org.joda.time.DateTime; + +import java.util.List; +import java.util.Map; + +/** + * Manages compaction state persistence and fingerprint generation. + * <p> + * Implementations may be backed by a database (like {@link PersistedCompactionStateManager}) or + * use in-memory storage (like {@link HeapMemoryCompactionStateManager}). + */ +public interface CompactionStateManager +{ + /** + * Generates a deterministic fingerprint for the given compaction state and datasource. + * The fingerprint is a SHA-256 hash of the datasource name and serialized compaction state. 
+ * + * @param compactionState The compaction configuration to fingerprint + * @param dataSource The datasource name + * @return A hex-encoded SHA-256 fingerprint string + */ + String generateCompactionStateFingerprint(CompactionState compactionState, String dataSource); Review Comment: Might be worth calling out that a fingerprint is unique across the board and not just within a single datasource. ########## server/src/main/java/org/apache/druid/server/compaction/DataSourceCompactibleSegmentIterator.java: ########## @@ -84,12 +88,16 @@ public DataSourceCompactibleSegmentIterator( DataSourceCompactionConfig config, SegmentTimeline timeline, List<Interval> skipIntervals, - CompactionCandidateSearchPolicy searchPolicy + CompactionCandidateSearchPolicy searchPolicy, + CompactionStateManager compactionStateManager, + CompactionStateCache compactionStateCache Review Comment: Instead of passing these 2 concrete classes, please pass in a new interface `CompactionFingerprintMapper` which has only 2 methods: - `generateFingerprint(dataSource, state)` - `getStateForFingerprint(fingerprint)` It would simplify the dependencies and testability of this class. We could use the same `CompactionFingerprintMapper` interface in `CompactionStatus`, `CompactionJobParams`, and `PriorityBasedCompactionSegmentIterator` too. The concrete impl of `CompactionFingerprintMapper` would simply delegate these operations to the respective cache and storage, as applicable. 
########## server/src/main/java/org/apache/druid/server/compaction/CompactionStatus.java: ########## @@ -351,40 +388,143 @@ private CompactionStatus evaluate() return inputBytesCheck; } - final List<String> reasonsForCompaction = + List<String> reasonsForCompaction = new ArrayList<>(); + CompactionStatus compactedOnceCheck = segmentsHaveBeenCompactedAtLeastOnce(); + if (!compactedOnceCheck.isComplete()) { + reasonsForCompaction.add(compactedOnceCheck.getReason()); + } + + if (compactionStateCache != null && targetFingerprint != null) { + // First try fingerprint-based evaluation (fast path) + CompactionStatus fingerprintStatus = FINGERPRINT_CHECKS.stream() + .map(f -> f.apply(this)) + .filter(status -> !status.isComplete()) + .findFirst().orElse(COMPLETE); + + if (!fingerprintStatus.isComplete()) { + reasonsForCompaction.add(fingerprintStatus.getReason()); + } + } + + reasonsForCompaction.addAll( CHECKS.stream() .map(f -> f.apply(this)) .filter(status -> !status.isComplete()) .map(CompactionStatus::getReason) - .collect(Collectors.toList()); + .collect(Collectors.toList()) + ); // Consider segments which have passed all checks to be compacted - final List<DataSegment> compactedSegments = unknownStateToSegments - .values() - .stream() - .flatMap(List::stream) - .collect(Collectors.toList()); + // Includes segments with correct fingerprints and segments that passed all state checks + final List<DataSegment> allCompactedSegments = new ArrayList<>(this.compactedSegments); + allCompactedSegments.addAll( + unknownStateToSegments + .values() + .stream() + .flatMap(List::stream) + .collect(Collectors.toList()) + ); if (reasonsForCompaction.isEmpty()) { return COMPLETE; } else { return CompactionStatus.pending( - createStats(compactedSegments), + createStats(allCompactedSegments), createStats(uncompactedSegments), reasonsForCompaction.get(0) ); } } + /** + * Evaluates the fingerprints of all fingerprinted candidate segments against the expected fingerprint. 
+ * <p> + * If all fingerprinted segments have the expected fingerprint, the check can quickly pass as COMPLETE. However, + * if any fingerprinted segment has a mismatched fingerprint, we need to investigate further by adding them to + * {@link #unknownStateToSegments} where their compaction states will be analyzed. + * </p> + */ + private CompactionStatus allFingerprintedCandidatesHaveExpectedFingerprint() + { + Map<String, List<DataSegment>> mismatchedFingerprintToSegmentMap = new HashMap<>(); + for (DataSegment segment : fingerprintedSegments) { + String fingerprint = segment.getCompactionStateFingerprint(); + if (fingerprint != null && !fingerprint.equals(targetFingerprint)) { + mismatchedFingerprintToSegmentMap + .computeIfAbsent(fingerprint, k -> new ArrayList<>()) + .add(segment); + } else if (fingerprint != null && fingerprint.equals(targetFingerprint)) { + // Segment has correct fingerprint - add to compacted segments + compactedSegments.add(segment); + } + } + + if (mismatchedFingerprintToSegmentMap.isEmpty()) { + return COMPLETE; + } + + boolean fingerprintedSegmentNeedingCompactionFound = false; + + if (compactionStateCache != null) { + for (Map.Entry<String, List<DataSegment>> e : mismatchedFingerprintToSegmentMap.entrySet()) { + String fingerprint = e.getKey(); + CompactionState stateToValidate = compactionStateCache.getCompactionStateByFingerprint(fingerprint).orElse(null); + if (stateToValidate == null) { + log.warn("No compaction state found for fingerprint[%s]", fingerprint); + fingerprintedSegmentNeedingCompactionFound = true; + uncompactedSegments.addAll(e.getValue()); + } else { + // Note that this does not mean we need compaction yet - we need to validate the state further to determine this + unknownStateToSegments.compute( + stateToValidate, + (state, segments) -> { + if (segments == null) { + segments = new ArrayList<>(); + } + segments.addAll(e.getValue()); + return segments; + } + ); + } + } + } else { + for (Map.Entry<String, 
List<DataSegment>> e : mismatchedFingerprintToSegmentMap.entrySet()) { + uncompactedSegments.addAll(e.getValue()); + fingerprintedSegmentNeedingCompactionFound = true; + } + } + + if (fingerprintedSegmentNeedingCompactionFound) { + return CompactionStatus.pending("At least one segment has a mismatched fingerprint and needs compaction"); + } else { + return COMPLETE; + } + } + + /** + * Divvys up segments by certain characteristics and determines if any segments have never been compacted. + * <p> + * Segments are categorized into three groups: + * <ul> + * <li>fingerprinted - segments who have a compaction state fingerprint and need more investigation before adding to {@link #unknownStateToSegments}</li> + * <li>non-fingerprinted with a lastCompactionState - segments who have no fingerprint but have stored a lastCompactionState that needs to be analyzed</li> + * <li>uncompacted - segments who have neither a fingerprint nor a lastCompactionState and thus definitely need compaction</li> + * </ul> + * </p> + */ private CompactionStatus segmentsHaveBeenCompactedAtLeastOnce() { - // Identify the compaction states of all the segments for (DataSegment segment : candidateSegments.getSegments()) { - final CompactionState segmentState = segment.getLastCompactionState(); - if (segmentState == null) { - uncompactedSegments.add(segment); + final String fingerprint = segment.getCompactionStateFingerprint(); + if (fingerprint != null) { + fingerprintedSegments.add(segment); } else { - unknownStateToSegments.computeIfAbsent(segmentState, s -> new ArrayList<>()).add(segment); + final CompactionState segmentState = segment.getLastCompactionState(); + if (segmentState == null) { + uncompactedSegments.add(segment); + } else { + unknownStateToSegments.computeIfAbsent(segmentState, k -> new ArrayList<>()).add(segment); + } Review Comment: Nit: Please chain the if-else for better readability and a minimal diff. 
```java final String fingerprint = segment.getCompactionStateFingerprint(); final CompactionState segmentState = segment.getLastCompactionState(); if (fingerprint != null) { // add to fingerprinted } else if (segmentState == null) { // add to uncompacted } else { // add to unknown state } ``` ########## server/src/main/java/org/apache/druid/server/compaction/CompactionStatus.java: ########## @@ -351,40 +388,143 @@ private CompactionStatus evaluate() return inputBytesCheck; } - final List<String> reasonsForCompaction = + List<String> reasonsForCompaction = new ArrayList<>(); + CompactionStatus compactedOnceCheck = segmentsHaveBeenCompactedAtLeastOnce(); + if (!compactedOnceCheck.isComplete()) { + reasonsForCompaction.add(compactedOnceCheck.getReason()); + } + + if (compactionStateCache != null && targetFingerprint != null) { + // First try fingerprint-based evaluation (fast path) + CompactionStatus fingerprintStatus = FINGERPRINT_CHECKS.stream() + .map(f -> f.apply(this)) + .filter(status -> !status.isComplete()) + .findFirst().orElse(COMPLETE); + + if (!fingerprintStatus.isComplete()) { + reasonsForCompaction.add(fingerprintStatus.getReason()); + } + } + + reasonsForCompaction.addAll( CHECKS.stream() .map(f -> f.apply(this)) .filter(status -> !status.isComplete()) .map(CompactionStatus::getReason) - .collect(Collectors.toList()); + .collect(Collectors.toList()) + ); // Consider segments which have passed all checks to be compacted - final List<DataSegment> compactedSegments = unknownStateToSegments - .values() - .stream() - .flatMap(List::stream) - .collect(Collectors.toList()); + // Includes segments with correct fingerprints and segments that passed all state checks + final List<DataSegment> allCompactedSegments = new ArrayList<>(this.compactedSegments); + allCompactedSegments.addAll( Review Comment: Nit: It might be cognitively simpler to just add the unknown state segments directly to `this.compactedSegments` instead of using a local variable 
`allCompactedSegments`. ########## server/src/main/java/org/apache/druid/server/coordinator/ClusterCompactionConfig.java: ########## @@ -45,21 +45,34 @@ public class ClusterCompactionConfig private final boolean useSupervisors; private final CompactionEngine engine; private final CompactionCandidateSearchPolicy compactionPolicy; + /** + * Whether to persist last compaction state directly in segments for backwards compatibility. Review Comment: Please move this javadoc to the getter instead. ########## server/src/main/java/org/apache/druid/server/compaction/CompactionStatus.java: ########## @@ -674,4 +814,59 @@ private static CompactionStatistics createStats(List<DataSegment> segments) return CompactionStatistics.create(totalBytes, segments.size(), segmentIntervals.size()); } } + + /** + * Given a {@link DataSourceCompactionConfig}, create a {@link CompactionState} + */ + public static CompactionState createCompactionStateFromConfig(DataSourceCompactionConfig config) Review Comment: Should we move this method to `DataSourceCompactionConfig` interface itself? ```java default CompactionState toCompactionState() { ... } ``` ########## server/src/main/java/org/apache/druid/server/coordinator/ClusterCompactionConfig.java: ########## @@ -45,21 +45,34 @@ public class ClusterCompactionConfig private final boolean useSupervisors; private final CompactionEngine engine; private final CompactionCandidateSearchPolicy compactionPolicy; + /** + * Whether to persist last compaction state directly in segments for backwards compatibility. + * <p> + * In a future release this option will be removed and last compaction state will no longer be persisted in segments. + * Instead, it will only be stored in the metadata store with a fingerprint id that segments will reference. Some + * operators may want to disable this behavior early to begin saving space in segment metadatastore table entries. 
+ */ + private final boolean legacyPersistLastCompactionStateInSegments; @JsonCreator public ClusterCompactionConfig( @JsonProperty("compactionTaskSlotRatio") @Nullable Double compactionTaskSlotRatio, @JsonProperty("maxCompactionTaskSlots") @Nullable Integer maxCompactionTaskSlots, @JsonProperty("compactionPolicy") @Nullable CompactionCandidateSearchPolicy compactionPolicy, @JsonProperty("useSupervisors") @Nullable Boolean useSupervisors, - @JsonProperty("engine") @Nullable CompactionEngine engine + @JsonProperty("engine") @Nullable CompactionEngine engine, + @JsonProperty("legacyPersistLastCompactionStateInSegments") Boolean legacyPersistLastCompactionStateInSegments Review Comment: Can we call this field `storeCompactionStatePerSegment` instead? Then it would align with the task context parameter, `storeCompactionState`. ########## multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java: ########## @@ -1699,6 +1700,12 @@ private void handleQueryResults( Tasks.DEFAULT_STORE_COMPACTION_STATE ); + final String compactionStateFingerprint = querySpec.getContext() + .getString( + Tasks.COMPACTION_STATE_FINGERPRINT_KEY, + null + ); Review Comment: Can this be simplified? ```suggestion .getString(Tasks.COMPACTION_STATE_FINGERPRINT_KEY); ``` ########## indexing-service/src/main/java/org/apache/druid/indexing/compact/CompactionJob.java: ########## @@ -31,27 +32,37 @@ public class CompactionJob extends BatchIndexingJob { private final CompactionCandidate candidate; private final int maxRequiredTaskSlots; + private final String compactionStateFingerprint; + private final CompactionState compactionState; Review Comment: Should we rename these to `targetCompactionState` and `targetCompactionStateFingerprint`? 
########## server/src/main/java/org/apache/druid/server/compaction/CompactionStatus.java: ########## @@ -351,40 +388,143 @@ private CompactionStatus evaluate() return inputBytesCheck; } - final List<String> reasonsForCompaction = + List<String> reasonsForCompaction = new ArrayList<>(); + CompactionStatus compactedOnceCheck = segmentsHaveBeenCompactedAtLeastOnce(); + if (!compactedOnceCheck.isComplete()) { + reasonsForCompaction.add(compactedOnceCheck.getReason()); + } + + if (compactionStateCache != null && targetFingerprint != null) { + // First try fingerprint-based evaluation (fast path) + CompactionStatus fingerprintStatus = FINGERPRINT_CHECKS.stream() + .map(f -> f.apply(this)) + .filter(status -> !status.isComplete()) + .findFirst().orElse(COMPLETE); + + if (!fingerprintStatus.isComplete()) { + reasonsForCompaction.add(fingerprintStatus.getReason()); + } + } + + reasonsForCompaction.addAll( CHECKS.stream() .map(f -> f.apply(this)) .filter(status -> !status.isComplete()) .map(CompactionStatus::getReason) - .collect(Collectors.toList()); + .collect(Collectors.toList()) + ); // Consider segments which have passed all checks to be compacted - final List<DataSegment> compactedSegments = unknownStateToSegments - .values() - .stream() - .flatMap(List::stream) - .collect(Collectors.toList()); + // Includes segments with correct fingerprints and segments that passed all state checks + final List<DataSegment> allCompactedSegments = new ArrayList<>(this.compactedSegments); + allCompactedSegments.addAll( + unknownStateToSegments + .values() + .stream() + .flatMap(List::stream) + .collect(Collectors.toList()) + ); if (reasonsForCompaction.isEmpty()) { return COMPLETE; } else { return CompactionStatus.pending( - createStats(compactedSegments), + createStats(allCompactedSegments), createStats(uncompactedSegments), reasonsForCompaction.get(0) ); } } + /** + * Evaluates the fingerprints of all fingerprinted candidate segments against the expected fingerprint. 
+ * <p> + * If all fingerprinted segments have the expected fingerprint, the check can quickly pass as COMPLETE. However, + * if any fingerprinted segment has a mismatched fingerprint, we need to investigate further by adding them to + * {@link #unknownStateToSegments} where their compaction states will be analyzed. + * </p> + */ + private CompactionStatus allFingerprintedCandidatesHaveExpectedFingerprint() + { + Map<String, List<DataSegment>> mismatchedFingerprintToSegmentMap = new HashMap<>(); + for (DataSegment segment : fingerprintedSegments) { + String fingerprint = segment.getCompactionStateFingerprint(); + if (fingerprint != null && !fingerprint.equals(targetFingerprint)) { + mismatchedFingerprintToSegmentMap + .computeIfAbsent(fingerprint, k -> new ArrayList<>()) + .add(segment); + } else if (fingerprint != null && fingerprint.equals(targetFingerprint)) { + // Segment has correct fingerprint - add to compacted segments + compactedSegments.add(segment); + } + } + + if (mismatchedFingerprintToSegmentMap.isEmpty()) { + return COMPLETE; + } + + boolean fingerprintedSegmentNeedingCompactionFound = false; + + if (compactionStateCache != null) { + for (Map.Entry<String, List<DataSegment>> e : mismatchedFingerprintToSegmentMap.entrySet()) { + String fingerprint = e.getKey(); + CompactionState stateToValidate = compactionStateCache.getCompactionStateByFingerprint(fingerprint).orElse(null); + if (stateToValidate == null) { + log.warn("No compaction state found for fingerprint[%s]", fingerprint); + fingerprintedSegmentNeedingCompactionFound = true; + uncompactedSegments.addAll(e.getValue()); + } else { + // Note that this does not mean we need compaction yet - we need to validate the state further to determine this + unknownStateToSegments.compute( + stateToValidate, + (state, segments) -> { + if (segments == null) { + segments = new ArrayList<>(); + } + segments.addAll(e.getValue()); + return segments; + } + ); + } + } + } else { + for (Map.Entry<String, 
List<DataSegment>> e : mismatchedFingerprintToSegmentMap.entrySet()) { + uncompactedSegments.addAll(e.getValue()); + fingerprintedSegmentNeedingCompactionFound = true; + } + } + + if (fingerprintedSegmentNeedingCompactionFound) { + return CompactionStatus.pending("At least one segment has a mismatched fingerprint and needs compaction"); + } else { + return COMPLETE; + } + } + + /** + * Divvys up segments by certain characteristics and determines if any segments have never been compacted. + * <p> + * Segments are categorized into three groups: + * <ul> + * <li>fingerprinted - segments who have a compaction state fingerprint and need more investigation before adding to {@link #unknownStateToSegments}</li> + * <li>non-fingerprinted with a lastCompactionState - segments who have no fingerprint but have stored a lastCompactionState that needs to be analyzed</li> + * <li>uncompacted - segments who have neither a fingerprint nor a lastCompactionState and thus definitely need compaction</li> + * </ul> + * </p> Review Comment: Maybe we can shorten this a little, since it is only a private method and the categories are self-explanatory. ```suggestion * Checks if all the segments have been compacted at least once and groups them into * uncompacted, fingerprinted or non-fingerprinted. ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
