kfaraz commented on code in PR #18844:
URL: https://github.com/apache/druid/pull/18844#discussion_r2660241828


##########
server/src/main/java/org/apache/druid/segment/metadata/CompactionStateCache.java:
##########
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment.metadata;
+
+import org.apache.druid.guice.LazySingleton;
+import org.apache.druid.java.util.common.logger.Logger;
+import org.apache.druid.timeline.CompactionState;
+
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * In-memory cache of compaction states used by {@link org.apache.druid.metadata.segment.cache.HeapMemorySegmentMetadataCache}.
+ * <p>
+ * This cache stores compaction states for published segments polled from the metadata store.
+ * It is the PRIMARY way to read compaction states in production.
+ * <p>
+ * The cache is populated during segment metadata cache sync operations and provides fast lookups
+ * without hitting the database.
+ */
+@LazySingleton
+public class CompactionStateCache
+{
+  private static final Logger log = new Logger(CompactionStateCache.class);
+
+  /**
+   * Atomically updated reference to published compaction states.
+   */
+  private final AtomicReference<PublishedCompactionStates> publishedCompactionStates
+      = new AtomicReference<>(PublishedCompactionStates.EMPTY);
+
+  private final AtomicInteger cacheMissCount = new AtomicInteger(0);
+  private final AtomicInteger cacheHitCount = new AtomicInteger(0);
+
+  public boolean isEnabled()
+  {
+    // Always enabled when this implementation is bound
+    return true;
+  }
+
+  /**
+   * Resets the cache with compaction states polled from the metadata store.
+   * Called after each successful poll in HeapMemorySegmentMetadataCache.
+   *
+   * @param fingerprintToStateMap Complete map of all active compaction state fingerprints
+   */
+  public void resetCompactionStatesForPublishedSegments(
+      Map<String, CompactionState> fingerprintToStateMap
+  )
+  {
+    this.publishedCompactionStates.set(
+        new PublishedCompactionStates(fingerprintToStateMap)
+    );
+    log.debug("Reset compaction state cache with [%d] fingerprints", fingerprintToStateMap.size());
+  }
+
+  /**
+   * Retrieves a compaction state by its fingerprint.
+   * This is the PRIMARY method for reading compaction states.
+   *
+   * @param fingerprint The fingerprint to look up
+   * @return The compaction state, or Optional.empty() if not cached
+   */
+  public Optional<CompactionState> getCompactionStateByFingerprint(String fingerprint)
+  {
+    if (fingerprint == null) {
+      return Optional.empty();
+    }
+
+    CompactionState state = publishedCompactionStates.get()
+                                                     .fingerprintToStateMap
+                                                     .get(fingerprint);
+    if (state != null) {
+      cacheHitCount.incrementAndGet();
+      return Optional.of(state);
+    }
+
+    cacheMissCount.incrementAndGet();
+    return Optional.empty();

Review Comment:
   Please do this in an if-else instead, preferably with the null check inverted.
   
   
   ```suggestion
       if (state == null) {
         cacheMissCount.incrementAndGet();
         return Optional.empty();
       } else {
         cacheHitCount.incrementAndGet();
         return Optional.of(state);
       }
   ```



##########
server/src/main/java/org/apache/druid/segment/metadata/CompactionStateCache.java:
##########
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment.metadata;
+
+import org.apache.druid.guice.LazySingleton;
+import org.apache.druid.java.util.common.logger.Logger;
+import org.apache.druid.timeline.CompactionState;
+
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * In-memory cache of compaction states used by {@link org.apache.druid.metadata.segment.cache.HeapMemorySegmentMetadataCache}.
+ * <p>
+ * This cache stores compaction states for published segments polled from the metadata store.
+ * It is the PRIMARY way to read compaction states in production.
+ * <p>
+ * The cache is populated during segment metadata cache sync operations and provides fast lookups
+ * without hitting the database.
+ */
+@LazySingleton
+public class CompactionStateCache
+{
+  private static final Logger log = new Logger(CompactionStateCache.class);
+
+  /**
+   * Atomically updated reference to published compaction states.
+   */
+  private final AtomicReference<PublishedCompactionStates> publishedCompactionStates
+      = new AtomicReference<>(PublishedCompactionStates.EMPTY);
+
+  private final AtomicInteger cacheMissCount = new AtomicInteger(0);
+  private final AtomicInteger cacheHitCount = new AtomicInteger(0);
+
+  public boolean isEnabled()
+  {
+    // Always enabled when this implementation is bound
+    return true;
+  }
+
+  /**
+   * Resets the cache with compaction states polled from the metadata store.
+   * Called after each successful poll in HeapMemorySegmentMetadataCache.
+   *
+   * @param fingerprintToStateMap Complete map of all active compaction state fingerprints
+   */
+  public void resetCompactionStatesForPublishedSegments(
+      Map<String, CompactionState> fingerprintToStateMap
+  )
+  {
+    this.publishedCompactionStates.set(
+        new PublishedCompactionStates(fingerprintToStateMap)
+    );
+    log.debug("Reset compaction state cache with [%d] fingerprints", fingerprintToStateMap.size());
+  }
+
+  /**
+   * Retrieves a compaction state by its fingerprint.
+   * This is the PRIMARY method for reading compaction states.
+   *
+   * @param fingerprint The fingerprint to look up
+   * @return The compaction state, or Optional.empty() if not cached
+   */
+  public Optional<CompactionState> getCompactionStateByFingerprint(String fingerprint)
+  {
+    if (fingerprint == null) {
+      return Optional.empty();
+    }
+
+    CompactionState state = publishedCompactionStates.get()
+                                                     .fingerprintToStateMap
+                                                     .get(fingerprint);
+    if (state != null) {
+      cacheHitCount.incrementAndGet();
+      return Optional.of(state);
+    }
+
+    cacheMissCount.incrementAndGet();
+    return Optional.empty();
+  }
+
+  /**
+   * Gets the full cached map (immutable copy).
+   * Used by HeapMemorySegmentMetadataCache for delta sync calculations.
+   */
+  public Map<String, CompactionState> getPublishedCompactionStateMap()
+  {
+    return publishedCompactionStates.get().fingerprintToStateMap;
+  }
+
+  /**
+   * Clears the cache. Called when node stops being leader.
+   */
+  public void clear()
+  {
+    publishedCompactionStates.set(PublishedCompactionStates.EMPTY);

Review Comment:
   Should we also reset the hit and miss counts here?
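   For illustration, a minimal standalone sketch (not from this PR; class and method names are hypothetical) of a `clear()` that also zeroes the counters via `AtomicInteger.set(0)`:
   
   ```java
   import java.util.Collections;
   import java.util.Map;
   import java.util.concurrent.atomic.AtomicInteger;
   import java.util.concurrent.atomic.AtomicReference;
   
   // Hypothetical sketch; field names mirror those in CompactionStateCache.
   class CacheClearSketch
   {
     private final AtomicReference<Map<String, String>> publishedStates =
         new AtomicReference<>(Collections.emptyMap());
     private final AtomicInteger cacheHitCount = new AtomicInteger(0);
     private final AtomicInteger cacheMissCount = new AtomicInteger(0);
   
     void recordHit()
     {
       cacheHitCount.incrementAndGet();
     }
   
     void clear()
     {
       publishedStates.set(Collections.emptyMap());
       // Also reset counters so stats from the previous leader term are not carried over
       cacheHitCount.set(0);
       cacheMissCount.set(0);
     }
   
     int hitCount()
     {
       return cacheHitCount.get();
     }
   }
   ```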



##########
server/src/main/java/org/apache/druid/segment/metadata/CompactionStateCache.java:
##########
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment.metadata;
+
+import org.apache.druid.guice.LazySingleton;
+import org.apache.druid.java.util.common.logger.Logger;
+import org.apache.druid.timeline.CompactionState;
+
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * In-memory cache of compaction states used by {@link org.apache.druid.metadata.segment.cache.HeapMemorySegmentMetadataCache}.
+ * <p>
+ * This cache stores compaction states for published segments polled from the metadata store.
+ * It is the PRIMARY way to read compaction states in production.
+ * <p>
+ * The cache is populated during segment metadata cache sync operations and provides fast lookups
+ * without hitting the database.
+ */
+@LazySingleton
+public class CompactionStateCache
+{
+  private static final Logger log = new Logger(CompactionStateCache.class);
+
+  /**
+   * Atomically updated reference to published compaction states.
+   */
+  private final AtomicReference<PublishedCompactionStates> publishedCompactionStates
+      = new AtomicReference<>(PublishedCompactionStates.EMPTY);
+
+  private final AtomicInteger cacheMissCount = new AtomicInteger(0);
+  private final AtomicInteger cacheHitCount = new AtomicInteger(0);
+
+  public boolean isEnabled()
+  {
+    // Always enabled when this implementation is bound
+    return true;
+  }
+
+  /**
+   * Resets the cache with compaction states polled from the metadata store.
+   * Called after each successful poll in HeapMemorySegmentMetadataCache.
+   *
+   * @param fingerprintToStateMap Complete map of all active compaction state fingerprints
+   */
+  public void resetCompactionStatesForPublishedSegments(
+      Map<String, CompactionState> fingerprintToStateMap
+  )
+  {
+    this.publishedCompactionStates.set(
+        new PublishedCompactionStates(fingerprintToStateMap)
+    );
+    log.debug("Reset compaction state cache with [%d] fingerprints", fingerprintToStateMap.size());
+  }
+
+  /**
+   * Retrieves a compaction state by its fingerprint.
+   * This is the PRIMARY method for reading compaction states.
+   *
+   * @param fingerprint The fingerprint to look up
+   * @return The compaction state, or Optional.empty() if not cached
+   */
+  public Optional<CompactionState> getCompactionStateByFingerprint(String fingerprint)
+  {
+    if (fingerprint == null) {
+      return Optional.empty();
+    }
+
+    CompactionState state = publishedCompactionStates.get()
+                                                     .fingerprintToStateMap
+                                                     .get(fingerprint);
+    if (state != null) {
+      cacheHitCount.incrementAndGet();
+      return Optional.of(state);
+    }
+
+    cacheMissCount.incrementAndGet();
+    return Optional.empty();
+  }
+
+  /**
+   * Gets the full cached map (immutable copy).
+   * Used by HeapMemorySegmentMetadataCache for delta sync calculations.
+   */
+  public Map<String, CompactionState> getPublishedCompactionStateMap()
+  {
+    return publishedCompactionStates.get().fingerprintToStateMap;
+  }
+
+  /**
+   * Clears the cache. Called when node stops being leader.
+   */
+  public void clear()
+  {
+    publishedCompactionStates.set(PublishedCompactionStates.EMPTY);
+    log.info("Cleared compaction state cache");

Review Comment:
   I think we can omit this log line assuming that `clear()` is called only on service shutdown.



##########
docs/configuration/index.md:
##########
@@ -389,6 +389,7 @@ These properties specify the JDBC connection and other configuration around the
 |`druid.metadata.storage.tables.segments`|The table to use to look for segments.|`druid_segments`|
 |`druid.metadata.storage.tables.rules`|The table to use to look for segment load/drop rules.|`druid_rules`|
 |`druid.metadata.storage.tables.config`|The table to use to look for configs.|`druid_config`|
+|`druid.metadata.storage.tables.compactionStates`|The table to use to store compaction state fingerprints.|`druid_compactionStates`|

Review Comment:
   ```suggestion
   |`druid.metadata.storage.tables.compactionStates`|The table to use to store compaction state payloads and fingerprints.|`druid_compactionStates`|
   ```



##########
server/src/test/java/org/apache/druid/server/coordinator/duty/CompactSegmentsTest.java:
##########
@@ -273,6 +276,7 @@ public void testSerde() throws Exception
             .addValue(DruidCoordinatorConfig.class, COORDINATOR_CONFIG)
             .addValue(OverlordClient.class, overlordClient)
             .addValue(CompactionStatusTracker.class, statusTracker)
+            .addValue(CompactionStateManager.class, compactionStateManager)

Review Comment:
   I think this can be removed now since `CompactSegments` doesn't use this anymore.



##########
server/src/test/java/org/apache/druid/segment/metadata/HeapMemoryCompactionStateManager.java:
##########
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment.metadata;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.MapperFeature;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.SerializationFeature;
+import com.google.common.hash.Hasher;
+import com.google.common.hash.Hashing;
+import com.google.common.io.BaseEncoding;
+import org.apache.druid.jackson.DefaultObjectMapper;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.timeline.CompactionState;
+import org.joda.time.DateTime;
+
+import javax.annotation.Nullable;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+/**
+ * In-memory implementation of {@link CompactionStateManager} that stores
+ * compaction state fingerprints in heap memory without requiring a database.
+ * <p>
+ * Useful for simulations and unit tests where database persistence is not needed.
+ * Database-specific operations (cleanup, unused marking) are no-ops in this implementation.
+ */
+public class HeapMemoryCompactionStateManager implements CompactionStateManager
+{
+  private final ConcurrentMap<String, CompactionState> fingerprintToStateMap = new ConcurrentHashMap<>();
+  private final ObjectMapper deterministicMapper;
+
+  /**
+   * Creates an in-memory compaction state manager with a default deterministic mapper.
+   * This is a convenience constructor for tests and simulations.
+   */
+  public HeapMemoryCompactionStateManager()
+  {
+    this(createDeterministicMapper());
+  }
+
+  /**
+   * Creates an in-memory compaction state manager with the provided deterministic mapper
+   * for fingerprint generation.
+   *
+   * @param deterministicMapper ObjectMapper configured for deterministic serialization
+   */
+  public HeapMemoryCompactionStateManager(ObjectMapper deterministicMapper)
+  {
+    this.deterministicMapper = deterministicMapper;
+  }
+
+  /**
+   * Creates an ObjectMapper configured for deterministic serialization.
+   * Used for generating consistent fingerprints.
+   */
+  private static ObjectMapper createDeterministicMapper()
+  {
+    ObjectMapper mapper = new DefaultObjectMapper();
+    mapper.configure(SerializationFeature.ORDER_MAP_ENTRIES_BY_KEYS, true);
+    mapper.configure(MapperFeature.SORT_PROPERTIES_ALPHABETICALLY, true);
+    return mapper;
+  }
+
+  @Override
+  @SuppressWarnings("UnstableApiUsage")
+  public String generateCompactionStateFingerprint(
+      final CompactionState compactionState,
+      final String dataSource
+  )
+  {
+    final Hasher hasher = Hashing.sha256().newHasher();

Review Comment:
   Instead of duplicating this logic, would it make sense to have this class extend the concrete `PersistedCompactionStateManager` and not override just this one method?
   Alternatively, the `generateFingerprint` could be a static method in `PersistedCompactionStateManager` which accepts datasource, state and the object mapper.
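   
   As a sketch of the static-helper alternative (hypothetical; the real code uses Guava's `Hashing` and hashes the deterministic JSON of a `CompactionState`, here stood in for by a plain `String` and JDK `MessageDigest` to keep the example self-contained):
   
   ```java
   import java.nio.charset.StandardCharsets;
   import java.security.MessageDigest;
   import java.security.NoSuchAlgorithmException;
   
   final class FingerprintSketch
   {
     // Hypothetical shared static helper: both the persisted and the in-memory
     // manager could call this instead of duplicating the hashing logic.
     static String generateFingerprint(String dataSource, String stateJson)
     {
       try {
         MessageDigest digest = MessageDigest.getInstance("SHA-256");
         digest.update(dataSource.getBytes(StandardCharsets.UTF_8));
         digest.update(stateJson.getBytes(StandardCharsets.UTF_8));
         // Hex-encode the 32-byte digest into a 64-character fingerprint
         StringBuilder hex = new StringBuilder();
         for (byte b : digest.digest()) {
           hex.append(String.format("%02X", b));
         }
         return hex.toString();
       }
       catch (NoSuchAlgorithmException e) {
         throw new IllegalStateException(e);
       }
     }
   }
   ```
   
   The same inputs always produce the same fingerprint, which is the property the deterministic mapper exists to guarantee.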



##########
server/src/main/java/org/apache/druid/server/compaction/CompactionStatus.java:
##########
@@ -351,40 +388,143 @@ private CompactionStatus evaluate()
         return inputBytesCheck;
       }
 
-      final List<String> reasonsForCompaction =
+      List<String> reasonsForCompaction = new ArrayList<>();
+      CompactionStatus compactedOnceCheck = segmentsHaveBeenCompactedAtLeastOnce();
+      if (!compactedOnceCheck.isComplete()) {
+        reasonsForCompaction.add(compactedOnceCheck.getReason());
+      }
+
+      if (compactionStateCache != null && targetFingerprint != null) {
+        // First try fingerprint-based evaluation (fast path)
+        CompactionStatus fingerprintStatus = FINGERPRINT_CHECKS.stream()
+                                                               .map(f -> f.apply(this))
+                                                               .filter(status -> !status.isComplete())
+                                                               .findFirst().orElse(COMPLETE);
+
+        if (!fingerprintStatus.isComplete()) {
+          reasonsForCompaction.add(fingerprintStatus.getReason());
+        }
+      }
+
+      reasonsForCompaction.addAll(
           CHECKS.stream()
                 .map(f -> f.apply(this))
                 .filter(status -> !status.isComplete())
                 .map(CompactionStatus::getReason)
-                .collect(Collectors.toList());
+                .collect(Collectors.toList())
+      );
 
       // Consider segments which have passed all checks to be compacted
-      final List<DataSegment> compactedSegments = unknownStateToSegments
-          .values()
-          .stream()
-          .flatMap(List::stream)
-          .collect(Collectors.toList());
+      // Includes segments with correct fingerprints and segments that passed all state checks
+      final List<DataSegment> allCompactedSegments = new ArrayList<>(this.compactedSegments);
+      allCompactedSegments.addAll(
+          unknownStateToSegments
+              .values()
+              .stream()
+              .flatMap(List::stream)
+              .collect(Collectors.toList())
+      );
 
       if (reasonsForCompaction.isEmpty()) {
         return COMPLETE;
       } else {
         return CompactionStatus.pending(
-            createStats(compactedSegments),
+            createStats(allCompactedSegments),
             createStats(uncompactedSegments),
             reasonsForCompaction.get(0)
         );
       }
     }
 
+    /**
+     * Evaluates the fingerprints of all fingerprinted candidate segments against the expected fingerprint.
+     * <p>
+     * If all fingerprinted segments have the expected fingerprint, the check can quickly pass as COMPLETE. However,
+     * if any fingerprinted segment has a mismatched fingerprint, we need to investigate further by adding them to
+     * {@link #unknownStateToSegments} where their compaction states will be analyzed.
+     * </p>
+     */
+    private CompactionStatus allFingerprintedCandidatesHaveExpectedFingerprint()
+    {
+      Map<String, List<DataSegment>> mismatchedFingerprintToSegmentMap = new HashMap<>();
+      for (DataSegment segment : fingerprintedSegments) {
+        String fingerprint = segment.getCompactionStateFingerprint();
+        if (fingerprint != null && !fingerprint.equals(targetFingerprint)) {
+          mismatchedFingerprintToSegmentMap
+              .computeIfAbsent(fingerprint, k -> new ArrayList<>())
+              .add(segment);
+        } else if (fingerprint != null && fingerprint.equals(targetFingerprint)) {
+          // Segment has correct fingerprint - add to compacted segments
+          compactedSegments.add(segment);
+        }
+      }
+
+      if (mismatchedFingerprintToSegmentMap.isEmpty()) {
+        return COMPLETE;
+      }
+
+      boolean fingerprintedSegmentNeedingCompactionFound = false;
+
+      if (compactionStateCache != null) {

Review Comment:
   Please simplify this condition to have a return early syntax.
   
   e.g.
   
   ```java
   if (cache == null) {
      // move all mismatching segments to unknown state
      return CompactionStatus.pending("Segments have a mismatching fingerprint");
   }
   
   // The rest of the logic goes here. Do not put in an else block to avoid unnecessary indentation
   ```
   
   Just to confirm, the `compactionStateCache` can be null only when running the legacy `CompactSegments` duty on the Coordinator, right? It is never expected to be null when running supervisors on the Overlord?



##########
server/src/main/java/org/apache/druid/server/compaction/CompactionStatus.java:
##########
@@ -351,40 +388,143 @@ private CompactionStatus evaluate()
         return inputBytesCheck;
       }
 
-      final List<String> reasonsForCompaction =
+      List<String> reasonsForCompaction = new ArrayList<>();
+      CompactionStatus compactedOnceCheck = segmentsHaveBeenCompactedAtLeastOnce();
+      if (!compactedOnceCheck.isComplete()) {
+        reasonsForCompaction.add(compactedOnceCheck.getReason());
+      }
+
+      if (compactionStateCache != null && targetFingerprint != null) {
+        // First try fingerprint-based evaluation (fast path)
+        CompactionStatus fingerprintStatus = FINGERPRINT_CHECKS.stream()
+                                                               .map(f -> f.apply(this))
+                                                               .filter(status -> !status.isComplete())
+                                                               .findFirst().orElse(COMPLETE);
+
+        if (!fingerprintStatus.isComplete()) {
+          reasonsForCompaction.add(fingerprintStatus.getReason());
+        }
+      }
+
+      reasonsForCompaction.addAll(
           CHECKS.stream()
                 .map(f -> f.apply(this))
                 .filter(status -> !status.isComplete())
                 .map(CompactionStatus::getReason)
-                .collect(Collectors.toList());
+                .collect(Collectors.toList())
+      );
 
       // Consider segments which have passed all checks to be compacted
-      final List<DataSegment> compactedSegments = unknownStateToSegments
-          .values()
-          .stream()
-          .flatMap(List::stream)
-          .collect(Collectors.toList());
+      // Includes segments with correct fingerprints and segments that passed all state checks
+      final List<DataSegment> allCompactedSegments = new ArrayList<>(this.compactedSegments);
+      allCompactedSegments.addAll(
+          unknownStateToSegments
+              .values()
+              .stream()
+              .flatMap(List::stream)
+              .collect(Collectors.toList())
+      );
 
       if (reasonsForCompaction.isEmpty()) {
         return COMPLETE;
       } else {
         return CompactionStatus.pending(
-            createStats(compactedSegments),
+            createStats(allCompactedSegments),
             createStats(uncompactedSegments),
             reasonsForCompaction.get(0)
         );
       }
     }
 
+    /**
+     * Evaluates the fingerprints of all fingerprinted candidate segments against the expected fingerprint.
+     * <p>
+     * If all fingerprinted segments have the expected fingerprint, the check can quickly pass as COMPLETE. However,
+     * if any fingerprinted segment has a mismatched fingerprint, we need to investigate further by adding them to
+     * {@link #unknownStateToSegments} where their compaction states will be analyzed.
+     * </p>
+     */
+    private CompactionStatus allFingerprintedCandidatesHaveExpectedFingerprint()
+    {
+      Map<String, List<DataSegment>> mismatchedFingerprintToSegmentMap = new HashMap<>();
+      for (DataSegment segment : fingerprintedSegments) {
+        String fingerprint = segment.getCompactionStateFingerprint();
+        if (fingerprint != null && !fingerprint.equals(targetFingerprint)) {
+          mismatchedFingerprintToSegmentMap
+              .computeIfAbsent(fingerprint, k -> new ArrayList<>())
+              .add(segment);
+        } else if (fingerprint != null && fingerprint.equals(targetFingerprint)) {
+          // Segment has correct fingerprint - add to compacted segments
+          compactedSegments.add(segment);
+        }

Review Comment:
   Please simplify the if-else:
   ```java
   if (fingerprint == null) {
     // Should never happen since these are all fingerprintedSegments
   } else if (fingerprint.equals(target)) {
     // compacted
   } else {
     // mismatched
   }
   ```


##########
server/src/main/java/org/apache/druid/segment/metadata/CompactionStateCache.java:
##########
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment.metadata;
+
+import org.apache.druid.guice.LazySingleton;
+import org.apache.druid.java.util.common.logger.Logger;
+import org.apache.druid.timeline.CompactionState;
+
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * In-memory cache of compaction states used by {@link org.apache.druid.metadata.segment.cache.HeapMemorySegmentMetadataCache}.
+ * <p>
+ * This cache stores compaction states for published segments polled from the metadata store.
+ * It is the PRIMARY way to read compaction states in production.
+ * <p>
+ * The cache is populated during segment metadata cache sync operations and provides fast lookups
+ * without hitting the database.
+ */
+@LazySingleton
+public class CompactionStateCache
+{
+  private static final Logger log = new Logger(CompactionStateCache.class);
+
+  /**
+   * Atomically updated reference to published compaction states.
+   */
+  private final AtomicReference<PublishedCompactionStates> publishedCompactionStates
+      = new AtomicReference<>(PublishedCompactionStates.EMPTY);
+
+  private final AtomicInteger cacheMissCount = new AtomicInteger(0);
+  private final AtomicInteger cacheHitCount = new AtomicInteger(0);
+
+  public boolean isEnabled()
+  {
+    // Always enabled when this implementation is bound
+    return true;
+  }
+
+  /**
+   * Resets the cache with compaction states polled from the metadata store.
+   * Called after each successful poll in HeapMemorySegmentMetadataCache.
+   *
+   * @param fingerprintToStateMap Complete map of all active compaction state fingerprints
+   */
+  public void resetCompactionStatesForPublishedSegments(
+      Map<String, CompactionState> fingerprintToStateMap
+  )
+  {
+    this.publishedCompactionStates.set(
+        new PublishedCompactionStates(fingerprintToStateMap)
+    );
+    log.debug("Reset compaction state cache with [%d] fingerprints", fingerprintToStateMap.size());
+  }
+
+  /**
+   * Retrieves a compaction state by its fingerprint.
+   * This is the PRIMARY method for reading compaction states.
+   *
+   * @param fingerprint The fingerprint to look up
+   * @return The compaction state, or Optional.empty() if not cached
+   */
+  public Optional<CompactionState> getCompactionStateByFingerprint(String fingerprint)
+  {
+    if (fingerprint == null) {
+      return Optional.empty();
+    }
+
+    CompactionState state = publishedCompactionStates.get()
+                                                     .fingerprintToStateMap
+                                                     .get(fingerprint);
+    if (state != null) {
+      cacheHitCount.incrementAndGet();
+      return Optional.of(state);
+    }
+
+    cacheMissCount.incrementAndGet();
+    return Optional.empty();
+  }
+
+  /**
+   * Gets the full cached map (immutable copy).
+   * Used by HeapMemorySegmentMetadataCache for delta sync calculations.
+   */
+  public Map<String, CompactionState> getPublishedCompactionStateMap()
+  {
+    return publishedCompactionStates.get().fingerprintToStateMap;
+  }
+
+  /**
+   * Clears the cache. Called when node stops being leader.
+   */
+  public void clear()
+  {
+    publishedCompactionStates.set(PublishedCompactionStates.EMPTY);
+    log.info("Cleared compaction state cache");
+  }
+
+  /**
+   * @return Summary stats for metric emission
+   */
+  public Map<String, Integer> getStats()

Review Comment:
   ```suggestion
     public Map<String, Integer> getAndResetStats()
   ```
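   
   For illustration, the rename could pair with `AtomicInteger.getAndSet(0)` so each counter is read and reset in one atomic step (hypothetical sketch, not from the PR; the stat keys are made up):
   
   ```java
   import java.util.HashMap;
   import java.util.Map;
   import java.util.concurrent.atomic.AtomicInteger;
   
   // Hypothetical sketch; field names mirror those in CompactionStateCache.
   class StatsSketch
   {
     private final AtomicInteger cacheHitCount = new AtomicInteger(0);
     private final AtomicInteger cacheMissCount = new AtomicInteger(0);
   
     void hit()
     {
       cacheHitCount.incrementAndGet();
     }
   
     Map<String, Integer> getAndResetStats()
     {
       Map<String, Integer> stats = new HashMap<>();
       // getAndSet(0) returns the current value and zeroes the counter atomically,
       // so no increments are lost between the read and the reset
       stats.put("hitCount", cacheHitCount.getAndSet(0));
       stats.put("missCount", cacheMissCount.getAndSet(0));
       return stats;
     }
   }
   ```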



##########
server/src/main/java/org/apache/druid/segment/metadata/PersistedCompactionStateManager.java:
##########
@@ -0,0 +1,401 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment.metadata;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.Lists;
+import com.google.common.hash.Hasher;
+import com.google.common.hash.Hashing;
+import com.google.common.io.BaseEncoding;
+import com.google.common.util.concurrent.Striped;
+import com.google.inject.Inject;
+import org.apache.druid.error.InternalServerError;
+import org.apache.druid.guice.ManageLifecycle;
+import org.apache.druid.guice.annotations.Deterministic;
+import org.apache.druid.java.util.common.DateTimes;
+import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.common.lifecycle.LifecycleStart;
+import org.apache.druid.java.util.common.lifecycle.LifecycleStop;
+import org.apache.druid.java.util.emitter.EmittingLogger;
+import org.apache.druid.metadata.MetadataStorageTablesConfig;
+import org.apache.druid.metadata.SQLMetadataConnector;
+import org.apache.druid.timeline.CompactionState;
+import org.joda.time.DateTime;
+import org.skife.jdbi.v2.Handle;
+import org.skife.jdbi.v2.PreparedBatch;
+import org.skife.jdbi.v2.Query;
+import org.skife.jdbi.v2.SQLStatement;
+import org.skife.jdbi.v2.Update;
+
+import javax.annotation.Nonnull;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.locks.Lock;
+
+/**
+ * Database-backed implementation of {@link CompactionStateManager}.
+ * <p>
+ * Manages the persistence and retrieval of {@link CompactionState} objects in the metadata storage.
+ * Compaction states are uniquely identified by their fingerprints, which are SHA-256 hashes of their content.
+ * A cache of compaction states using the fingerprints as keys is maintained in memory to optimize retrieval performance.
+ * </p>
+ * <p>
+ * A striped locking mechanism is used to ensure thread-safe persistence of compaction states on a per-datasource basis.
+ * </p>
+ */
+@ManageLifecycle
+public class PersistedCompactionStateManager implements CompactionStateManager
+{
+  private static final EmittingLogger log = new EmittingLogger(PersistedCompactionStateManager.class);
+  private static final int DB_ACTION_PARTITION_SIZE = 100;
+
+  private final MetadataStorageTablesConfig dbTables;
+  private final ObjectMapper jsonMapper;
+  private final ObjectMapper deterministicMapper;
+  private final SQLMetadataConnector connector;
+  private final Striped<Lock> datasourceLocks = Striped.lock(128);
+
+  @Inject
+  public PersistedCompactionStateManager(
+      @Nonnull MetadataStorageTablesConfig dbTables,
+      @Nonnull ObjectMapper jsonMapper,
+      @Deterministic @Nonnull ObjectMapper deterministicMapper,
+      @Nonnull SQLMetadataConnector connector
+  )
+  {
+    this.dbTables = dbTables;
+    this.jsonMapper = jsonMapper;
+    this.deterministicMapper = deterministicMapper;
+    this.connector = connector;
+  }
+
+  @LifecycleStart
+  public void start()
+  {
+  }
+
+  @LifecycleStop
+  public void stop()
+  {
+  }
+
+  @VisibleForTesting
+  PersistedCompactionStateManager()
+  {
+    this.dbTables = null;
+    this.jsonMapper = null;
+    this.deterministicMapper = null;
+    this.connector = null;
+  }
+
+  @Override
+  public void persistCompactionState(
+      final String dataSource,
+      final Map<String, CompactionState> fingerprintToStateMap,
+      final DateTime updateTime
+  )
+  {
+    if (fingerprintToStateMap.isEmpty()) {
+      return;
+    }
+
+    final Lock lock = datasourceLocks.get(dataSource);

Review Comment:
   IIUC, we are using the lock so that the same fingerprint is not 
updated/inserted in the metadata store by another thread while we are working 
here.
   
   I think we should do the following instead:
   - We can remove the locking, since the persistence will happen from only a 
single thread anyway (via `CompactionJobQueue.runReadyJobs()`).
   - We can just log an error if there are conflicts, since this method should be idempotent anyway.
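
   The idempotency claim holds because the fingerprint is a deterministic SHA-256 of the datasource and serialized state: two writers racing on the same fingerprint are, by construction, writing identical rows, so a key conflict is benign and can just be logged. A self-contained illustration of that determinism (this sketch uses `java.security.MessageDigest` directly; the PR itself uses Guava's `Hashing` with a `@Deterministic` `ObjectMapper`, so the exact hash inputs here are assumptions):

   ```java
   import java.nio.charset.StandardCharsets;
   import java.security.MessageDigest;
   import java.security.NoSuchAlgorithmException;

   // Demonstrates that a SHA-256 fingerprint is a pure function of its inputs:
   // equal (dataSource, serialized state) pairs always map to the same key, so
   // concurrent INSERTs of the same fingerprint carry identical payloads.
   class FingerprintDemo
   {
     public static String fingerprint(String dataSource, String serializedState)
     {
       try {
         MessageDigest digest = MessageDigest.getInstance("SHA-256");
         digest.update(dataSource.getBytes(StandardCharsets.UTF_8));
         digest.update((byte) 0); // separator so ("ab","c") differs from ("a","bc")
         digest.update(serializedState.getBytes(StandardCharsets.UTF_8));
         StringBuilder hex = new StringBuilder();
         for (byte b : digest.digest()) {
           hex.append(String.format("%02x", b));
         }
         return hex.toString();
       }
       catch (NoSuchAlgorithmException e) {
         throw new IllegalStateException(e);
       }
     }
   }
   ```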



##########
server/src/main/java/org/apache/druid/segment/metadata/CompactionStateManager.java:
##########
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment.metadata;
+
+import org.apache.druid.timeline.CompactionState;
+import org.joda.time.DateTime;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Manages compaction state persistence and fingerprint generation.
+ * <p>
+ * Implementations may be backed by a database (like {@link PersistedCompactionStateManager})
+ * or use in-memory storage (like {@link HeapMemoryCompactionStateManager}).
+ */
+public interface CompactionStateManager
+{
+  /**
+   * Generates a deterministic fingerprint for the given compaction state and datasource.
+   * The fingerprint is a SHA-256 hash of the datasource name and serialized compaction state.
+   *
+   * @param compactionState The compaction configuration to fingerprint
+   * @param dataSource The datasource name
+   * @return A hex-encoded SHA-256 fingerprint string
+   */
+  String generateCompactionStateFingerprint(CompactionState compactionState, String dataSource);
+
+  /**
+   * Persists compaction states to storage.
+   *
+   * @param dataSource The datasource name
+   * @param fingerprintToStateMap Map of fingerprints to their compaction states
+   * @param updateTime The timestamp for this update
+   */
+  void persistCompactionState(

Review Comment:
   The only usage of this method seems to pass in a single fingerprint right 
now. Should we just keep that for the time being? We can add the batch upsert 
when needed later.
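
   Narrowed to the single-entry form the current caller needs, the signature might look like this (an illustrative shape, not code from the PR):

   ```suggestion
     void persistCompactionState(
         String dataSource,
         String fingerprint,
         CompactionState compactionState,
         DateTime updateTime
     );
   ```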



##########
server/src/main/java/org/apache/druid/segment/metadata/CompactionStateManager.java:
##########
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment.metadata;
+
+import org.apache.druid.timeline.CompactionState;
+import org.joda.time.DateTime;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Manages compaction state persistence and fingerprint generation.
+ * <p>
+ * Implementations may be backed by a database (like {@link PersistedCompactionStateManager})
+ * or use in-memory storage (like {@link HeapMemoryCompactionStateManager}).
+ */
+public interface CompactionStateManager
+{
+  /**
+   * Generates a deterministic fingerprint for the given compaction state and datasource.
+   * The fingerprint is a SHA-256 hash of the datasource name and serialized compaction state.
+   *
+   * @param compactionState The compaction configuration to fingerprint
+   * @param dataSource The datasource name
+   * @return A hex-encoded SHA-256 fingerprint string
+   */
+  String generateCompactionStateFingerprint(CompactionState compactionState, String dataSource);
+
+  /**
+   * Persists compaction states to storage.
+   *
+   * @param dataSource The datasource name
+   * @param fingerprintToStateMap Map of fingerprints to their compaction states
+   * @param updateTime The timestamp for this update
+   */
+  void persistCompactionState(

Review Comment:
   Maybe rename to `upsertCompactionState` instead.
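
   As a concrete suggestion on the anchored declaration:

   ```suggestion
     void upsertCompactionState(
   ```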



##########
server/src/main/java/org/apache/druid/segment/metadata/PersistedCompactionStateManager.java:
##########
@@ -0,0 +1,401 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment.metadata;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.Lists;
+import com.google.common.hash.Hasher;
+import com.google.common.hash.Hashing;
+import com.google.common.io.BaseEncoding;
+import com.google.common.util.concurrent.Striped;
+import com.google.inject.Inject;
+import org.apache.druid.error.InternalServerError;
+import org.apache.druid.guice.ManageLifecycle;
+import org.apache.druid.guice.annotations.Deterministic;
+import org.apache.druid.java.util.common.DateTimes;
+import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.common.lifecycle.LifecycleStart;
+import org.apache.druid.java.util.common.lifecycle.LifecycleStop;
+import org.apache.druid.java.util.emitter.EmittingLogger;
+import org.apache.druid.metadata.MetadataStorageTablesConfig;
+import org.apache.druid.metadata.SQLMetadataConnector;
+import org.apache.druid.timeline.CompactionState;
+import org.joda.time.DateTime;
+import org.skife.jdbi.v2.Handle;
+import org.skife.jdbi.v2.PreparedBatch;
+import org.skife.jdbi.v2.Query;
+import org.skife.jdbi.v2.SQLStatement;
+import org.skife.jdbi.v2.Update;
+
+import javax.annotation.Nonnull;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.locks.Lock;
+
+/**
+ * Database-backed implementation of {@link CompactionStateManager}.
+ * <p>
+ * Manages the persistence and retrieval of {@link CompactionState} objects in the metadata storage.
+ * Compaction states are uniquely identified by their fingerprints, which are SHA-256 hashes of their content.
+ * A cache of compaction states using the fingerprints as keys is maintained in memory to optimize retrieval performance.
+ * </p>
+ * <p>
+ * A striped locking mechanism is used to ensure thread-safe persistence of compaction states on a per-datasource basis.
+ * </p>
+ */
+@ManageLifecycle
+public class PersistedCompactionStateManager implements CompactionStateManager
+{
+  private static final EmittingLogger log = new EmittingLogger(PersistedCompactionStateManager.class);
+  private static final int DB_ACTION_PARTITION_SIZE = 100;
+
+  private final MetadataStorageTablesConfig dbTables;
+  private final ObjectMapper jsonMapper;
+  private final ObjectMapper deterministicMapper;
+  private final SQLMetadataConnector connector;
+  private final Striped<Lock> datasourceLocks = Striped.lock(128);
+
+  @Inject
+  public PersistedCompactionStateManager(
+      @Nonnull MetadataStorageTablesConfig dbTables,
+      @Nonnull ObjectMapper jsonMapper,
+      @Deterministic @Nonnull ObjectMapper deterministicMapper,
+      @Nonnull SQLMetadataConnector connector
+  )
+  {
+    this.dbTables = dbTables;
+    this.jsonMapper = jsonMapper;
+    this.deterministicMapper = deterministicMapper;
+    this.connector = connector;
+  }
+
+  @LifecycleStart
+  public void start()
+  {
+  }
+
+  @LifecycleStop
+  public void stop()

Review Comment:
   If these methods are going to be no-op, let's remove them and bind this 
class as a `LazySingleton` instead of a managed one.



##########
server/src/main/java/org/apache/druid/segment/metadata/CompactionStateManager.java:
##########
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment.metadata;
+
+import org.apache.druid.timeline.CompactionState;
+import org.joda.time.DateTime;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Manages compaction state persistence and fingerprint generation.
+ * <p>
+ * Implementations may be backed by a database (like {@link PersistedCompactionStateManager})
+ * or use in-memory storage (like {@link HeapMemoryCompactionStateManager}).
+ */
+public interface CompactionStateManager
+{
+  /**
+   * Generates a deterministic fingerprint for the given compaction state and datasource.
+   * The fingerprint is a SHA-256 hash of the datasource name and serialized compaction state.
+   *
+   * @param compactionState The compaction configuration to fingerprint
+   * @param dataSource The datasource name
+   * @return A hex-encoded SHA-256 fingerprint string
+   */
+  String generateCompactionStateFingerprint(CompactionState compactionState, String dataSource);
+
+  /**
+   * Persists compaction states to storage.
+   *
+   * @param dataSource The datasource name
+   * @param fingerprintToStateMap Map of fingerprints to their compaction states
+   * @param updateTime The timestamp for this update
+   */
+  void persistCompactionState(
+      String dataSource,
+      Map<String, CompactionState> fingerprintToStateMap,
+      DateTime updateTime
+  );
+
+  /**
+   * Marks compaction states as unused if they are not referenced by any used segments.
+   * This is used for cleanup operations. Implementations may choose to no-op this.

Review Comment:
   Why is it okay to no-op this method and the other methods?
   I think it is better to require the concrete classes to provide an 
implementation, since the only non-test impl right now is the 
`PersistedCompactionStateManager`.



##########
server/src/main/java/org/apache/druid/metadata/segment/cache/HeapMemorySegmentMetadataCache.java:
##########
@@ -1106,6 +1122,89 @@ private void emitMetric(String datasource, String metric, long value)
     );
   }
 
+  /**
+   * Retrieves required used compaction states from the metadata store and resets
+   * them in the {@link CompactionStateCache}. If this is the first sync, all used
+   * compaction states are retrieved from the metadata store. If this is a delta sync,
+   * only the fingerprints of all used compaction states are retrieved first; payloads
+   * are then fetched only for the fingerprints which are not present in the cache.
+   */
+  private void retrieveAndResetUsedCompactionStates()
+  {
+    final Stopwatch compactionStateSyncDuration = Stopwatch.createStarted();
+
+    // Reset the CompactionStateCache with latest compaction states
+    final Map<String, CompactionState> fingerprintToStateMap;
+    if (syncFinishTime.get() == null) {
+      fingerprintToStateMap = buildFingerprintToStateMapForFullSync();
+    } else {
+      fingerprintToStateMap = buildFingerprintToStateMapForDeltaSync();
+    }
+
+    compactionStateCache.resetCompactionStatesForPublishedSegments(fingerprintToStateMap);
+
+    // Emit metrics for the current contents of the cache
+    compactionStateCache.getStats().forEach(this::emitMetric);
+    emitMetric(Metric.RETRIEVE_COMPACTION_STATES_DURATION_MILLIS, compactionStateSyncDuration.millisElapsed());
+  }
+
+  /**
+   * Retrieves all used compaction states from the metadata store and builds a
+   * fresh map from compaction state fingerprint to state.
+   */
+  private Map<String, CompactionState> buildFingerprintToStateMapForFullSync()
+  {
+    final List<CompactionStateRecord> records = query(
+        SqlSegmentsMetadataQuery::retrieveAllUsedCompactionStates
+    );
+
+    return records.stream().collect(
+        Collectors.toMap(
+            CompactionStateRecord::getFingerprint,
+            CompactionStateRecord::getState
+        )
+    );
+  }
+
+  /**
+   * Retrieves compaction states from the metadata store if they are not present
+   * in the cache or have been recently updated in the metadata store. These
+   * compaction states, along with those already present in the cache, are used to
+   * build a complete updated map from compaction state fingerprint to state.
+   *
+   * @return Complete updated map from compaction state fingerprint to state for all
+   * used compaction states currently persisted in the metadata store.
+   */
+  private Map<String, CompactionState> buildFingerprintToStateMapForDeltaSync()
+  {
+    // Identify fingerprints in the cache and in the metadata store
+    final Map<String, CompactionState> fingerprintToStateMap = new HashMap<>(
+        compactionStateCache.getPublishedCompactionStateMap()
+    );
+    final Set<String> cachedFingerprints = Set.copyOf(fingerprintToStateMap.keySet());
+    final Set<String> persistedFingerprints = query(
+        SqlSegmentsMetadataQuery::retrieveAllUsedCompactionStateFingerprints
+    );
+
+    // Remove entries for compaction states that have been deleted from the metadata store
+    final Set<String> deletedFingerprints = Sets.difference(cachedFingerprints, persistedFingerprints);
+    deletedFingerprints.forEach(fingerprintToStateMap::remove);
+
+    // Retrieve and add entries for compaction states that have been added to the metadata store
+    final Set<String> addedFingerprints = Sets.difference(persistedFingerprints, cachedFingerprints);
+    final List<CompactionStateRecord> addedCompactionStateRecords = query(
+        sql -> sql.retrieveCompactionStatesForFingerprints(addedFingerprints)
+    );
+    if (addedCompactionStateRecords.size() < addedFingerprints.size()) {
+      emitMetric(Metric.SKIPPED_COMPACTION_STATES, addedFingerprints.size() - addedCompactionStateRecords.size());

Review Comment:
   Instead of this, should we just emit a metric for newly added fingerprints 
and a metric for newly deleted fingerprints?
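
   A sketch of the bookkeeping those two metrics would capture, using plain `HashSet` differences (the metric keys below are placeholders, not the PR's `Metric` constants):

   ```java
   import java.util.HashSet;
   import java.util.Map;
   import java.util.Set;

   // Computes the two deltas a sync could emit as metrics: fingerprints newly
   // added to the metadata store and fingerprints deleted from it.
   class FingerprintDelta
   {
     public static Map<String, Integer> deltaCounts(Set<String> cached, Set<String> persisted)
     {
       Set<String> added = new HashSet<>(persisted);
       added.removeAll(cached);      // in the store, not yet in the cache
       Set<String> deleted = new HashSet<>(cached);
       deleted.removeAll(persisted); // in the cache, gone from the store
       return Map.of(
           "compactionState/sync/added", added.size(),
           "compactionState/sync/deleted", deleted.size()
       );
     }
   }
   ```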



##########
server/src/main/java/org/apache/druid/segment/metadata/CompactionStateManager.java:
##########
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment.metadata;
+
+import org.apache.druid.timeline.CompactionState;
+import org.joda.time.DateTime;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Manages compaction state persistence and fingerprint generation.
+ * <p>
+ * Implementations may be backed by a database (like {@link PersistedCompactionStateManager})
+ * or use in-memory storage (like {@link HeapMemoryCompactionStateManager}).
+ */
+public interface CompactionStateManager

Review Comment:
   To avoid confusion with classes like `CompactionStatusTracker`, should we 
rename this to something like `CompactionStateStorage` and impl could be 
`CompactionStateSqlStorage`?



##########
server/src/main/java/org/apache/druid/segment/metadata/PersistedCompactionStateManager.java:
##########
@@ -0,0 +1,401 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment.metadata;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.Lists;
+import com.google.common.hash.Hasher;
+import com.google.common.hash.Hashing;
+import com.google.common.io.BaseEncoding;
+import com.google.common.util.concurrent.Striped;
+import com.google.inject.Inject;
+import org.apache.druid.error.InternalServerError;
+import org.apache.druid.guice.ManageLifecycle;
+import org.apache.druid.guice.annotations.Deterministic;
+import org.apache.druid.java.util.common.DateTimes;
+import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.common.lifecycle.LifecycleStart;
+import org.apache.druid.java.util.common.lifecycle.LifecycleStop;
+import org.apache.druid.java.util.emitter.EmittingLogger;
+import org.apache.druid.metadata.MetadataStorageTablesConfig;
+import org.apache.druid.metadata.SQLMetadataConnector;
+import org.apache.druid.timeline.CompactionState;
+import org.joda.time.DateTime;
+import org.skife.jdbi.v2.Handle;
+import org.skife.jdbi.v2.PreparedBatch;
+import org.skife.jdbi.v2.Query;
+import org.skife.jdbi.v2.SQLStatement;
+import org.skife.jdbi.v2.Update;
+
+import javax.annotation.Nonnull;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.locks.Lock;
+
+/**
+ * Database-backed implementation of {@link CompactionStateManager}.
+ * <p>
+ * Manages the persistence and retrieval of {@link CompactionState} objects in the metadata storage.
+ * Compaction states are uniquely identified by their fingerprints, which are SHA-256 hashes of their content.
+ * A cache of compaction states using the fingerprints as keys is maintained in memory to optimize retrieval performance.
+ * </p>
+ * <p>
+ * A striped locking mechanism is used to ensure thread-safe persistence of compaction states on a per-datasource basis.
+ * </p>
+ */
+@ManageLifecycle
+public class PersistedCompactionStateManager implements CompactionStateManager

Review Comment:
   Nit: We may rename this to `SqlCompactionStateManager` to align with other 
similar classes that handle persistence of any metadata in the DB.



##########
server/src/main/java/org/apache/druid/metadata/SqlSegmentsMetadataQuery.java:
##########
@@ -1701,6 +1705,132 @@ private SegmentSchemaRecord mapToSchemaRecord(ResultSet resultSet)
     }
   }
 
+  /**
+   * Retrieves all unique compaction state fingerprints currently referenced by used segments.
+   * This is used for delta syncs to determine which fingerprints are still active.
+   *
+   * @return Set of compaction state fingerprints
+   */
+  public Set<String> retrieveAllUsedCompactionStateFingerprints()
+  {
+    final String sql = StringUtils.format(
+        "SELECT DISTINCT compaction_state_fingerprint FROM %s "
+        + "WHERE used = true AND compaction_state_fingerprint IS NOT NULL",
+        dbTables.getSegmentsTable()

Review Comment:
   Instead of querying the segments table, please hit the compaction states 
table instead, since it is expected to be much smaller. As an added benefit, 
the compaction states table will never have fingerprint as null, so the SQL 
just becomes `SELECT compaction_state_fingerprint FROM druid_compactionStates`.
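
   With the compaction states table as the source of truth, the query collapses to something like the following (illustrative only: `getCompactionStatesTable()` is a hypothetical accessor on `MetadataStorageTablesConfig`, not an existing method):

   ```suggestion
       final String sql = StringUtils.format(
           "SELECT compaction_state_fingerprint FROM %s",
           dbTables.getCompactionStatesTable()
       );
   ```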



##########
server/src/main/java/org/apache/druid/metadata/segment/cache/HeapMemorySegmentMetadataCache.java:
##########
@@ -785,13 +800,13 @@ private void retrieveAllUsedSegments(
     final String sql;
     if (useSchemaCache) {
       sql = StringUtils.format(
-          "SELECT id, payload, created_date, used_status_last_updated, schema_fingerprint, num_rows"
+          "SELECT id, payload, created_date, used_status_last_updated, compaction_state_fingerprint, schema_fingerprint, num_rows"

Review Comment:
   Super nit: Please put the `compaction_state_fingerprint` at the end so that 
the existing column indexes in the result set don't need to change.
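
   i.e. appending the new column so positional reads of the existing columns are untouched:

   ```suggestion
             "SELECT id, payload, created_date, used_status_last_updated, schema_fingerprint, num_rows, compaction_state_fingerprint"
   ```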



##########
server/src/main/java/org/apache/druid/segment/metadata/PersistedCompactionStateManager.java:
##########
@@ -0,0 +1,401 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment.metadata;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.Lists;
+import com.google.common.hash.Hasher;
+import com.google.common.hash.Hashing;
+import com.google.common.io.BaseEncoding;
+import com.google.common.util.concurrent.Striped;
+import com.google.inject.Inject;
+import org.apache.druid.error.InternalServerError;
+import org.apache.druid.guice.ManageLifecycle;
+import org.apache.druid.guice.annotations.Deterministic;
+import org.apache.druid.java.util.common.DateTimes;
+import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.common.lifecycle.LifecycleStart;
+import org.apache.druid.java.util.common.lifecycle.LifecycleStop;
+import org.apache.druid.java.util.emitter.EmittingLogger;
+import org.apache.druid.metadata.MetadataStorageTablesConfig;
+import org.apache.druid.metadata.SQLMetadataConnector;
+import org.apache.druid.timeline.CompactionState;
+import org.joda.time.DateTime;
+import org.skife.jdbi.v2.Handle;
+import org.skife.jdbi.v2.PreparedBatch;
+import org.skife.jdbi.v2.Query;
+import org.skife.jdbi.v2.SQLStatement;
+import org.skife.jdbi.v2.Update;
+
+import javax.annotation.Nonnull;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.locks.Lock;
+
+/**
+ * Database-backed implementation of {@link CompactionStateManager}.
+ * <p>
+ * Manages the persistence and retrieval of {@link CompactionState} objects in the metadata storage.
+ * Compaction states are uniquely identified by their fingerprints, which are SHA-256 hashes of their content.
+ * A cache of compaction states using the fingerprints as keys is maintained in memory to optimize retrieval performance.
+ * </p>
+ * <p>
+ * A striped locking mechanism is used to ensure thread-safe persistence of compaction states on a per-datasource basis.
+ * </p>
+ */
+@ManageLifecycle
+public class PersistedCompactionStateManager implements CompactionStateManager
+{
+  private static final EmittingLogger log = new EmittingLogger(PersistedCompactionStateManager.class);
+  private static final int DB_ACTION_PARTITION_SIZE = 100;
+
+  private final MetadataStorageTablesConfig dbTables;
+  private final ObjectMapper jsonMapper;
+  private final ObjectMapper deterministicMapper;
+  private final SQLMetadataConnector connector;
+  private final Striped<Lock> datasourceLocks = Striped.lock(128);
+
+  @Inject
+  public PersistedCompactionStateManager(
+      @Nonnull MetadataStorageTablesConfig dbTables,
+      @Nonnull ObjectMapper jsonMapper,
+      @Deterministic @Nonnull ObjectMapper deterministicMapper,
+      @Nonnull SQLMetadataConnector connector
+  )
+  {
+    this.dbTables = dbTables;
+    this.jsonMapper = jsonMapper;
+    this.deterministicMapper = deterministicMapper;
+    this.connector = connector;
+  }
+
+  @LifecycleStart
+  public void start()
+  {
+  }
+
+  @LifecycleStop
+  public void stop()
+  {
+  }
+
+  @VisibleForTesting
+  PersistedCompactionStateManager()

Review Comment:
   Can we use the actual constructor instead and just pass nulls?
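
   One way to realize this is to delegate to the injected constructor (the `@Nonnull` annotations on that constructor are advisory, not enforced at runtime, so the nulls are fine for a test-only instance):

   ```suggestion
     PersistedCompactionStateManager()
     {
       this(null, null, null, null);
     }
   ```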



##########
server/src/main/java/org/apache/druid/server/coordinator/duty/KillUnreferencedCompactionState.java:
##########
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.server.coordinator.duty;
+
+import org.apache.druid.java.util.common.logger.Logger;
+import org.apache.druid.segment.metadata.CompactionStateManager;
+import org.apache.druid.server.coordinator.config.MetadataCleanupConfig;
+import org.apache.druid.server.coordinator.stats.Stats;
+import org.joda.time.DateTime;
+
+import java.util.List;
+
+/**
+ * Coordinator duty that cleans up old, unused compaction state entries from the database.
+ * <p>
+ * This duty performs a three-step cleanup process:
+ * <ol>
+ *   <li>Marks compaction states not referenced by any segments as unused</li>
+ *   <li>Repairs any incorrectly marked unused states that are still referenced by used segments</li>
+ *   <li>Deletes unused compaction states older than the configured retention period</li>
+ * </ol>
+ * <p>
+ * This prevents unbounded growth of the compaction states table while ensuring that
+ * states referenced by active segments are preserved.
+ */
+public class KillUnreferencedCompactionState extends MetadataCleanupDuty
+{
+  private static final Logger log = new Logger(KillUnreferencedCompactionState.class);
+  private final CompactionStateManager compactionStateManager;
+
+  public KillUnreferencedCompactionState(
+      MetadataCleanupConfig config,
+      CompactionStateManager compactionStateManager
+  )
+  {
+    super("compactionState", config, Stats.Kill.COMPACTION_STATE);
+    this.compactionStateManager = compactionStateManager;
+  }
+
+  @Override
+  protected int cleanupEntriesCreatedBefore(DateTime minCreatedTime)
+  {
+    // 1: Mark unreferenced states as unused
+    int unused = compactionStateManager.markUnreferencedCompactionStatesAsUnused();

Review Comment:
   It might be more desirable in the long term to have the Overlord perform all 
CRUD on the compaction states rather than the Coordinator, as the read 
operations could use the cache and the write operations could update the cache 
directly, thus keeping it fresh.
   
   I think there is a segment schema duty which does the same.
   
   One way to do it now would be to convert this duty into an `OverlordDuty`. 
It would pave the way for the future migration of other cleanup duties to the Overlord.
   
   What do you think?



##########
server/src/test/java/org/apache/druid/server/compaction/NewestSegmentFirstPolicyTest.java:
##########
@@ -2052,8 +2064,10 @@ TestDataSource.KOALA, configBuilder().forDataSource(TestDataSource.KOALA).build(
             TestDataSource.WIKI, SegmentTimeline.forSegments(wikiSegments),
             TestDataSource.KOALA, SegmentTimeline.forSegments(koalaSegments)
         ),
-        Collections.emptyMap()
-    );
+        Collections.emptyMap(),
+        compactionStateManager,
+        compactionStateCache
+        );

Review Comment:
   Nit: please fix the indentation here and in other places in this test.



##########
server/src/main/java/org/apache/druid/metadata/SqlSegmentsMetadataQuery.java:
##########
@@ -1701,6 +1705,132 @@ private SegmentSchemaRecord mapToSchemaRecord(ResultSet resultSet)
     }
   }
 
+  /**
+   * Retrieves all unique compaction state fingerprints currently referenced by used segments.
+   * This is used for delta syncs to determine which fingerprints are still active.
+   *
+   * @return Set of compaction state fingerprints
+   */
+  public Set<String> retrieveAllUsedCompactionStateFingerprints()
+  {
+    final String sql = StringUtils.format(
+        "SELECT DISTINCT compaction_state_fingerprint FROM %s "
+        + "WHERE used = true AND compaction_state_fingerprint IS NOT NULL",
+        dbTables.getSegmentsTable()
+    );
+
+    return Set.copyOf(
+        handle.createQuery(sql)
+              .setFetchSize(connector.getStreamingFetchSize())
+              .mapTo(String.class)
+              .list()
+    );
+  }
+
+  /**
+   * Retrieves all compaction states for used segments (full sync).
+   * Fetches from compaction_states table where the fingerprint is referenced by used segments.
+   *
+   * @return List of CompactionStateRecord objects
+   */
+  public List<CompactionStateRecord> retrieveAllUsedCompactionStates()
+  {
+    final String sql = StringUtils.format(
+        "SELECT cs.fingerprint, cs.payload FROM %s cs "
+        + "WHERE cs.used = true "
+        + "AND cs.fingerprint IN ("
+        + "  SELECT DISTINCT compaction_state_fingerprint FROM %s "
+        + "  WHERE used = true AND compaction_state_fingerprint IS NOT NULL"
+        + ")",
+        dbTables.getCompactionStatesTable(),
+        dbTables.getSegmentsTable()

Review Comment:
   Let's just ignore the segments table and query the compaction states table 
directly.
   It is okay to retrieve some compaction states even if they are not currently 
referenced. The background cleanup threads will take care of the references, 
and subsequent sync of the cache will add/remove the compaction state as needed.
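For illustration, the reviewer's suggestion amounts to dropping the subquery against the segments table entirely. A minimal sketch of the simplified retrieval, assuming the `fingerprint`, `payload`, and `used` column names from this PR's schema (the helper class is hypothetical):

```java
// Hypothetical sketch: build the simplified query that reads all used
// compaction states directly, without joining against the segments table.
// Column names are taken from this PR's proposed schema and may change.
public class UsedCompactionStatesQuery
{
  public static String buildSql(String compactionStatesTable)
  {
    return "SELECT fingerprint, payload FROM " + compactionStatesTable
           + " WHERE used = true";
  }

  public static void main(String[] args)
  {
    System.out.println(buildSql("druid_compactionStates"));
  }
}
```

Any states retrieved this way that are no longer referenced would simply be dropped again on a later sync, as the comment above notes.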



##########
server/src/main/java/org/apache/druid/metadata/SQLMetadataConnector.java:
##########
@@ -1097,6 +1100,55 @@ public void createSegmentSchemasTable()
     }
   }
 
+  /**
+   * Creates the compaction states table for storing fingerprinted compaction states
+   * <p>
+   * This table stores unique compaction states that are referenced by
+   * segments via fingerprints.
+   */
+  public void createCompactionStatesTable(final String tableName)
+  {
+    createTable(
+        tableName,
+        ImmutableList.of(
+            StringUtils.format(
+                "CREATE TABLE %1$s (\n"
+                + "  id %2$s NOT NULL,\n"
+                + "  created_date VARCHAR(255) NOT NULL,\n"
+                + "  datasource VARCHAR(255) NOT NULL,\n"
+                + "  fingerprint VARCHAR(255) NOT NULL,\n"
+                + "  payload %3$s NOT NULL,\n"
+                + "  used BOOLEAN NOT NULL,\n"
+                + "  used_status_last_updated VARCHAR(255) NOT NULL,\n"
+                + "  PRIMARY KEY (id),\n"
+                + "  UNIQUE (fingerprint)\n"

Review Comment:
   If the fingerprint is unique anyway, do we still need an `id` column?
   I don't remember why the schemas table has an `id` column either.



##########
server/src/main/java/org/apache/druid/metadata/SQLMetadataConnector.java:
##########
@@ -355,6 +355,7 @@ public void createSegmentTable(final String tableName)
     columns.add("used BOOLEAN NOT NULL");
     columns.add("payload %2$s NOT NULL");
     columns.add("used_status_last_updated VARCHAR(255) NOT NULL");
+    columns.add("compaction_state_fingerprint VARCHAR(255)");

Review Comment:
   Could you please also add the `upgraded_from_segment_id` column here? I 
think we had missed adding it, and it is present only in the 
`alterSegmentTable` method.



##########
processing/src/main/java/org/apache/druid/metadata/MetadataStorageConnector.java:
##########
@@ -95,4 +95,12 @@ default void exportTable(
   * SegmentSchema table is created only when CentralizedDatasourceSchema feature is enabled.
    */
   void createSegmentSchemasTable();
+
+  /**
+   * CompactionStates table is centralized store for {@link org.apache.druid.timeline.CompactionState} objects.
+   * <p>
+   * N segments can refer to the same compaction state via its unique fingerprint

Review Comment:
   ```suggestion
   * This table stores {@link org.apache.druid.timeline.CompactionState} objects.
   * <p>
   * Multiple segments can refer to the same compaction state via its unique fingerprint.
   ```



##########
server/src/main/java/org/apache/druid/metadata/segment/cache/CompactionStateRecord.java:
##########
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.metadata.segment.cache;
+
+import org.apache.druid.timeline.CompactionState;
+
+/**
+ * Represents a single record in the compaction_states table.

Review Comment:
   ```suggestion
    * Represents a single record in the druid_compactionStates table.
   ```



##########
server/src/test/java/org/apache/druid/segment/metadata/HeapMemoryCompactionStateManager.java:
##########
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment.metadata;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.MapperFeature;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.SerializationFeature;
+import com.google.common.hash.Hasher;
+import com.google.common.hash.Hashing;
+import com.google.common.io.BaseEncoding;
+import org.apache.druid.jackson.DefaultObjectMapper;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.timeline.CompactionState;
+import org.joda.time.DateTime;
+
+import javax.annotation.Nullable;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+/**
+ * In-memory implementation of {@link CompactionStateManager} that stores
+ * compaction state fingerprints in heap memory without requiring a database.
+ * <p>
+ * Useful for simulations and unit tests where database persistence is not needed.
+ * Database-specific operations (cleanup, unused marking) are no-ops in this implementation.
+ */
+public class HeapMemoryCompactionStateManager implements CompactionStateManager
+{
+  private final ConcurrentMap<String, CompactionState> fingerprintToStateMap = new ConcurrentHashMap<>();
+  private final ObjectMapper deterministicMapper;
+
+  /**
+   * Creates an in-memory compaction state manager with a default deterministic mapper.
+   * This is a convenience constructor for tests and simulations.
+   */
+  public HeapMemoryCompactionStateManager()
+  {
+    this(createDeterministicMapper());
+  }
+
+  /**
+   * Creates an in-memory compaction state manager with the provided deterministic mapper
+   * for fingerprint generation.
+   *
+   * @param deterministicMapper ObjectMapper configured for deterministic serialization
+   */
+  public HeapMemoryCompactionStateManager(ObjectMapper deterministicMapper)
+  {
+    this.deterministicMapper = deterministicMapper;
+  }
+
+  /**
+   * Creates an ObjectMapper configured for deterministic serialization.
+   * Used for generating consistent fingerprints.
+   */
+  private static ObjectMapper createDeterministicMapper()

Review Comment:
   Maybe make this a static utility method in `TestUtils` so that it may be 
used elsewhere too.
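For illustration, such a shared factory might look like the following. The class name `DeterministicMappers` and its location are hypothetical (the reviewer suggests `TestUtils`); the two Jackson features shown are the standard way to get stable property and map-key ordering, matching the imports already used in this test class:

```java
import com.fasterxml.jackson.databind.MapperFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.SerializationFeature;

// Hypothetical shared test utility: builds a mapper whose output is
// byte-stable, so fingerprints computed from it are reproducible.
public final class DeterministicMappers
{
  private DeterministicMappers()
  {
  }

  public static ObjectMapper createDeterministicMapper()
  {
    final ObjectMapper mapper = new ObjectMapper();
    // Sort bean properties and map entries so equivalent objects always
    // serialize to identical JSON strings.
    mapper.configure(MapperFeature.SORT_PROPERTIES_ALPHABETICALLY, true);
    mapper.configure(SerializationFeature.ORDER_MAP_ENTRIES_BY_KEYS, true);
    return mapper;
  }
}
```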



##########
server/src/main/java/org/apache/druid/metadata/SQLMetadataConnector.java:
##########
@@ -1097,6 +1100,55 @@ public void createSegmentSchemasTable()
     }
   }
 
+  /**
+   * Creates the compaction states table for storing fingerprinted compaction states
+   * <p>
+   * This table stores unique compaction states that are referenced by
+   * segments via fingerprints.
+   */
+  public void createCompactionStatesTable(final String tableName)
+  {
+    createTable(
+        tableName,
+        ImmutableList.of(
+            StringUtils.format(
+                "CREATE TABLE %1$s (\n"
+                + "  id %2$s NOT NULL,\n"
+                + "  created_date VARCHAR(255) NOT NULL,\n"
+                + "  datasource VARCHAR(255) NOT NULL,\n"

Review Comment:
   Other tables seem to use `dataSource` camel-cased.
   ```suggestion
                   + "  dataSource VARCHAR(255) NOT NULL,\n"
   ```



##########
server/src/main/java/org/apache/druid/segment/metadata/CompactionStateManager.java:
##########
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment.metadata;
+
+import org.apache.druid.timeline.CompactionState;
+import org.joda.time.DateTime;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Manages compaction state persistence and fingerprint generation.
+ * <p>
+ * Implementations may be backed by a database (like {@link PersistedCompactionStateManager}) or
+ * use in-memory storage (like {@link HeapMemoryCompactionStateManager}).
+ */
+public interface CompactionStateManager
+{
+  /**
+   * Generates a deterministic fingerprint for the given compaction state and datasource.
+   * The fingerprint is a SHA-256 hash of the datasource name and serialized compaction state.
+   *
+   * @param compactionState The compaction configuration to fingerprint
+   * @param dataSource The datasource name
+   * @return A hex-encoded SHA-256 fingerprint string
+   */
+  String generateCompactionStateFingerprint(CompactionState compactionState, String dataSource);

Review Comment:
   Might be worth calling out that a fingerprint is unique across the board and 
not just within a single datasource.
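   The uniqueness across datasources follows from the datasource name being part of the hash input. A standalone sketch of this kind of fingerprinting using the JDK's `MessageDigest` (the helper class and method name here are hypothetical; the PR hashes the JSON produced by its deterministic `ObjectMapper` rather than a raw string):

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;

// Illustrative only: SHA-256 over the datasource name plus the
// deterministically serialized compaction state, hex-encoded.
public final class FingerprintSketch
{
  public static String fingerprint(String dataSource, String serializedState) throws Exception
  {
    final MessageDigest digest = MessageDigest.getInstance("SHA-256");
    digest.update(dataSource.getBytes(StandardCharsets.UTF_8));
    digest.update(serializedState.getBytes(StandardCharsets.UTF_8));

    // Hex-encode the 32-byte digest into a 64-character string.
    final StringBuilder hex = new StringBuilder();
    for (byte b : digest.digest()) {
      hex.append(String.format("%02x", b));
    }
    return hex.toString();
  }
}
```

   Because the datasource name is hashed in, two datasources with byte-identical compaction states still get distinct fingerprints.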



##########
server/src/main/java/org/apache/druid/server/compaction/DataSourceCompactibleSegmentIterator.java:
##########
@@ -84,12 +88,16 @@ public DataSourceCompactibleSegmentIterator(
       DataSourceCompactionConfig config,
       SegmentTimeline timeline,
       List<Interval> skipIntervals,
-      CompactionCandidateSearchPolicy searchPolicy
+      CompactionCandidateSearchPolicy searchPolicy,
+      CompactionStateManager compactionStateManager,
+      CompactionStateCache compactionStateCache

Review Comment:
   Instead of passing these 2 concrete classes, please pass in a new interface 
`CompactionFingerprintMapper` which has only 2 methods:
   - `generateFingerprint(dataSource, state)`
   - `getStateForFingerprint(fingerprint)`
   
   It would simplify the dependencies and testability of this class. We could 
use the same `CompactionFingerprintMapper` interface in `CompactionStatus`, 
`CompactionJobParams`, and `PriorityBasedCompactionSegmentIterator` too.
   
   The concrete impl of `CompactionFingerprintMapper` would simply delegate 
these operations to the respective cache and storage, as applicable.
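   A sketch of the proposed interface. The two method names come from the comment above; the signatures, the `CompactionState` stand-in, and the toy heap-backed implementation are assumptions for illustration:

```java
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;

// Stand-in for org.apache.druid.timeline.CompactionState, for this sketch only.
class CompactionState
{
}

// The narrow interface proposed in the review: callers need only these two
// operations, not the full CompactionStateManager/CompactionStateCache pair.
interface CompactionFingerprintMapper
{
  String generateFingerprint(String dataSource, CompactionState state);

  Optional<CompactionState> getStateForFingerprint(String fingerprint);
}

// Toy heap-backed implementation; the production one would delegate to the
// cache for reads and the manager/metadata store for fingerprint generation.
class HeapFingerprintMapper implements CompactionFingerprintMapper
{
  private final Map<String, CompactionState> byFingerprint = new ConcurrentHashMap<>();

  @Override
  public String generateFingerprint(String dataSource, CompactionState state)
  {
    // Placeholder fingerprint; the real impl hashes the serialized state.
    final String fingerprint = dataSource + ":" + Integer.toHexString(System.identityHashCode(state));
    byFingerprint.put(fingerprint, state);
    return fingerprint;
  }

  @Override
  public Optional<CompactionState> getStateForFingerprint(String fingerprint)
  {
    return Optional.ofNullable(byFingerprint.get(fingerprint));
  }
}
```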



##########
server/src/main/java/org/apache/druid/server/compaction/CompactionStatus.java:
##########
@@ -351,40 +388,143 @@ private CompactionStatus evaluate()
         return inputBytesCheck;
       }
 
-      final List<String> reasonsForCompaction =
+      List<String> reasonsForCompaction = new ArrayList<>();
+      CompactionStatus compactedOnceCheck = segmentsHaveBeenCompactedAtLeastOnce();
+      if (!compactedOnceCheck.isComplete()) {
+        reasonsForCompaction.add(compactedOnceCheck.getReason());
+      }
+
+      if (compactionStateCache != null && targetFingerprint != null) {
+        // First try fingerprint-based evaluation (fast path)
+        CompactionStatus fingerprintStatus = FINGERPRINT_CHECKS.stream()
+                                                               .map(f -> f.apply(this))
+                                                               .filter(status -> !status.isComplete())
+                                                               .findFirst().orElse(COMPLETE);
+
+        if (!fingerprintStatus.isComplete()) {
+          reasonsForCompaction.add(fingerprintStatus.getReason());
+        }
+      }
+
+      reasonsForCompaction.addAll(
           CHECKS.stream()
                 .map(f -> f.apply(this))
                 .filter(status -> !status.isComplete())
                 .map(CompactionStatus::getReason)
-                .collect(Collectors.toList());
+                .collect(Collectors.toList())
+      );
 
       // Consider segments which have passed all checks to be compacted
-      final List<DataSegment> compactedSegments = unknownStateToSegments
-          .values()
-          .stream()
-          .flatMap(List::stream)
-          .collect(Collectors.toList());
+      // Includes segments with correct fingerprints and segments that passed all state checks
+      final List<DataSegment> allCompactedSegments = new ArrayList<>(this.compactedSegments);
+      allCompactedSegments.addAll(
+          unknownStateToSegments
+              .values()
+              .stream()
+              .flatMap(List::stream)
+              .collect(Collectors.toList())
+      );
 
       if (reasonsForCompaction.isEmpty()) {
         return COMPLETE;
       } else {
         return CompactionStatus.pending(
-            createStats(compactedSegments),
+            createStats(allCompactedSegments),
             createStats(uncompactedSegments),
             reasonsForCompaction.get(0)
         );
       }
     }
 
+    /**
+     * Evaluates the fingerprints of all fingerprinted candidate segments against the expected fingerprint.
+     * <p>
+     * If all fingerprinted segments have the expected fingerprint, the check can quickly pass as COMPLETE. However,
+     * if any fingerprinted segment has a mismatched fingerprint, we need to investigate further by adding them to
+     * {@link #unknownStateToSegments} where their compaction states will be analyzed.
+     * </p>
+     */
+    private CompactionStatus allFingerprintedCandidatesHaveExpectedFingerprint()
+    {
+      Map<String, List<DataSegment>> mismatchedFingerprintToSegmentMap = new HashMap<>();
+      for (DataSegment segment : fingerprintedSegments) {
+        String fingerprint = segment.getCompactionStateFingerprint();
+        if (fingerprint != null && !fingerprint.equals(targetFingerprint)) {
+          mismatchedFingerprintToSegmentMap
+              .computeIfAbsent(fingerprint, k -> new ArrayList<>())
+              .add(segment);
+        } else if (fingerprint != null && fingerprint.equals(targetFingerprint)) {
+          // Segment has correct fingerprint - add to compacted segments
+          compactedSegments.add(segment);
+        }
+      }
+
+      if (mismatchedFingerprintToSegmentMap.isEmpty()) {
+        return COMPLETE;
+      }
+
+      boolean fingerprintedSegmentNeedingCompactionFound = false;
+
+      if (compactionStateCache != null) {
+        for (Map.Entry<String, List<DataSegment>> e : mismatchedFingerprintToSegmentMap.entrySet()) {
+          String fingerprint = e.getKey();
+          CompactionState stateToValidate = compactionStateCache.getCompactionStateByFingerprint(fingerprint).orElse(null);
+          if (stateToValidate == null) {
+            log.warn("No compaction state found for fingerprint[%s]", fingerprint);
+            fingerprintedSegmentNeedingCompactionFound = true;
+            uncompactedSegments.addAll(e.getValue());
+          } else {
+            // Note that this does not mean we need compaction yet - we need to validate the state further to determine this
+            unknownStateToSegments.compute(
+                stateToValidate,
+                (state, segments) -> {
+                  if (segments == null) {
+                    segments = new ArrayList<>();
+                  }
+                  segments.addAll(e.getValue());
+                  return segments;
+                }
+            );
+          }
+        }
+      } else {
+        for (Map.Entry<String, List<DataSegment>> e : mismatchedFingerprintToSegmentMap.entrySet()) {
+          uncompactedSegments.addAll(e.getValue());
+          fingerprintedSegmentNeedingCompactionFound = true;
+        }
+      }
+
+      if (fingerprintedSegmentNeedingCompactionFound) {
+        return CompactionStatus.pending("At least one segment has a mismatched fingerprint and needs compaction");
+      } else {
+        return COMPLETE;
+      }
+    }
+
+    /**
+     * Divvys up segments by certain characteristics and determines if any segments have never been compacted.
+     * <p>
+     * Segments are categorized into three groups:
+     * <ul>
+     *   <li>fingerprinted - segments that have a compaction state fingerprint and need more investigation before adding to {@link #unknownStateToSegments}</li>
+     *   <li>non-fingerprinted with a lastCompactionState - segments that have no fingerprint but have stored a lastCompactionState that needs to be analyzed</li>
+     *   <li>uncompacted - segments that have neither a fingerprint nor a lastCompactionState and thus definitely need compaction</li>
+     * </ul>
+     * </p>
+     */
     private CompactionStatus segmentsHaveBeenCompactedAtLeastOnce()
     {
-      // Identify the compaction states of all the segments
       for (DataSegment segment : candidateSegments.getSegments()) {
-        final CompactionState segmentState = segment.getLastCompactionState();
-        if (segmentState == null) {
-          uncompactedSegments.add(segment);
+        final String fingerprint = segment.getCompactionStateFingerprint();
+        if (fingerprint != null) {
+          fingerprintedSegments.add(segment);
         } else {
-          unknownStateToSegments.computeIfAbsent(segmentState, s -> new ArrayList<>()).add(segment);
+          final CompactionState segmentState = segment.getLastCompactionState();
+          if (segmentState == null) {
+            uncompactedSegments.add(segment);
+          } else {
+            unknownStateToSegments.computeIfAbsent(segmentState, k -> new ArrayList<>()).add(segment);
+          }

Review Comment:
   Nit: Please chain the if-else for better readability and a minimal diff.
   
   ```java
   final String fingerprint = segment.getCompactionStateFingerprint();
   final CompactionState segmentState = segment.getLastCompactionState();
   if (fingerprint != null) {
      // add to fingerprinted
   } else if (segmentState == null) {
       // add to uncompacted
   } else {
       // add to unknown state
   }
   ```



##########
server/src/main/java/org/apache/druid/server/compaction/CompactionStatus.java:
##########
@@ -351,40 +388,143 @@ private CompactionStatus evaluate()
         return inputBytesCheck;
       }
 
-      final List<String> reasonsForCompaction =
+      List<String> reasonsForCompaction = new ArrayList<>();
+      CompactionStatus compactedOnceCheck = segmentsHaveBeenCompactedAtLeastOnce();
+      if (!compactedOnceCheck.isComplete()) {
+        reasonsForCompaction.add(compactedOnceCheck.getReason());
+      }
+
+      if (compactionStateCache != null && targetFingerprint != null) {
+        // First try fingerprint-based evaluation (fast path)
+        CompactionStatus fingerprintStatus = FINGERPRINT_CHECKS.stream()
+                                                               .map(f -> f.apply(this))
+                                                               .filter(status -> !status.isComplete())
+                                                               .findFirst().orElse(COMPLETE);
+
+        if (!fingerprintStatus.isComplete()) {
+          reasonsForCompaction.add(fingerprintStatus.getReason());
+        }
+      }
+
+      reasonsForCompaction.addAll(
           CHECKS.stream()
                 .map(f -> f.apply(this))
                 .filter(status -> !status.isComplete())
                 .map(CompactionStatus::getReason)
-                .collect(Collectors.toList());
+                .collect(Collectors.toList())
+      );
 
       // Consider segments which have passed all checks to be compacted
-      final List<DataSegment> compactedSegments = unknownStateToSegments
-          .values()
-          .stream()
-          .flatMap(List::stream)
-          .collect(Collectors.toList());
+      // Includes segments with correct fingerprints and segments that passed all state checks
+      final List<DataSegment> allCompactedSegments = new ArrayList<>(this.compactedSegments);
+      allCompactedSegments.addAll(

Review Comment:
   Nit: It might be cognitively simpler to just add the unknown state segments 
directly to `this.compactedSegments` instead of using a local variable 
`allCompactedSegments`.



##########
server/src/main/java/org/apache/druid/server/coordinator/ClusterCompactionConfig.java:
##########
@@ -45,21 +45,34 @@ public class ClusterCompactionConfig
   private final boolean useSupervisors;
   private final CompactionEngine engine;
   private final CompactionCandidateSearchPolicy compactionPolicy;
+  /**
+   * Whether to persist last compaction state directly in segments for backwards compatibility.

Review Comment:
   Please move this javadoc to the getter instead.



##########
server/src/main/java/org/apache/druid/server/compaction/CompactionStatus.java:
##########
@@ -674,4 +814,59 @@ private static CompactionStatistics createStats(List<DataSegment> segments)
       return CompactionStatistics.create(totalBytes, segments.size(), 
segmentIntervals.size());
     }
   }
+
+  /**
+   * Given a {@link DataSourceCompactionConfig}, create a {@link CompactionState}
+   */
+  public static CompactionState createCompactionStateFromConfig(DataSourceCompactionConfig config)

Review Comment:
   Should we move this method to `DataSourceCompactionConfig` interface itself?
   
   ```java
   default CompactionState toCompactionState()
   {
      ...
   }
   ```



##########
server/src/main/java/org/apache/druid/server/coordinator/ClusterCompactionConfig.java:
##########
@@ -45,21 +45,34 @@ public class ClusterCompactionConfig
   private final boolean useSupervisors;
   private final CompactionEngine engine;
   private final CompactionCandidateSearchPolicy compactionPolicy;
+  /**
+   * Whether to persist last compaction state directly in segments for backwards compatibility.
+   * <p>
+   * In a future release this option will be removed and last compaction state will no longer be persisted in segments.
+   * Instead, it will only be stored in the metadata store with a fingerprint id that segments will reference. Some
+   * operators may want to disable this behavior early to begin saving space in segment metadata store table entries.
+   */
+  private final boolean legacyPersistLastCompactionStateInSegments;
 
   @JsonCreator
   public ClusterCompactionConfig(
       @JsonProperty("compactionTaskSlotRatio") @Nullable Double compactionTaskSlotRatio,
       @JsonProperty("maxCompactionTaskSlots") @Nullable Integer maxCompactionTaskSlots,
       @JsonProperty("compactionPolicy") @Nullable CompactionCandidateSearchPolicy compactionPolicy,
       @JsonProperty("useSupervisors") @Nullable Boolean useSupervisors,
-      @JsonProperty("engine") @Nullable CompactionEngine engine
+      @JsonProperty("engine") @Nullable CompactionEngine engine,
+      @JsonProperty("legacyPersistLastCompactionStateInSegments") Boolean legacyPersistLastCompactionStateInSegments

Review Comment:
   Can we call this field `storeCompactionStatePerSegment` instead?
   Then it would align with the task context parameter, `storeCompactionState`.



##########
multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java:
##########
@@ -1699,6 +1700,12 @@ private void handleQueryResults(
               Tasks.DEFAULT_STORE_COMPACTION_STATE
           );
 
+      final String compactionStateFingerprint = querySpec.getContext()
+          .getString(
+              Tasks.COMPACTION_STATE_FINGERPRINT_KEY,
+              null
+          );

Review Comment:
   Can this be simplified?
   ```suggestion
             .getString(Tasks.COMPACTION_STATE_FINGERPRINT_KEY);
   ```



##########
indexing-service/src/main/java/org/apache/druid/indexing/compact/CompactionJob.java:
##########
@@ -31,27 +32,37 @@ public class CompactionJob extends BatchIndexingJob
 {
   private final CompactionCandidate candidate;
   private final int maxRequiredTaskSlots;
+  private final String compactionStateFingerprint;
+  private final CompactionState compactionState;

Review Comment:
   Should we rename these to `targetCompactionState` and 
`targetCompactionStateFingerprint`?



##########
server/src/main/java/org/apache/druid/server/compaction/CompactionStatus.java:
##########
@@ -351,40 +388,143 @@ private CompactionStatus evaluate()
         return inputBytesCheck;
       }
 
-      final List<String> reasonsForCompaction =
+      List<String> reasonsForCompaction = new ArrayList<>();
+      CompactionStatus compactedOnceCheck = segmentsHaveBeenCompactedAtLeastOnce();
+      if (!compactedOnceCheck.isComplete()) {
+        reasonsForCompaction.add(compactedOnceCheck.getReason());
+      }
+
+      if (compactionStateCache != null && targetFingerprint != null) {
+        // First try fingerprint-based evaluation (fast path)
+        CompactionStatus fingerprintStatus = FINGERPRINT_CHECKS.stream()
+                                                               .map(f -> f.apply(this))
+                                                               .filter(status -> !status.isComplete())
+                                                               .findFirst().orElse(COMPLETE);
+
+        if (!fingerprintStatus.isComplete()) {
+          reasonsForCompaction.add(fingerprintStatus.getReason());
+        }
+      }
+
+      reasonsForCompaction.addAll(
           CHECKS.stream()
                 .map(f -> f.apply(this))
                 .filter(status -> !status.isComplete())
                 .map(CompactionStatus::getReason)
-                .collect(Collectors.toList());
+                .collect(Collectors.toList())
+      );
 
       // Consider segments which have passed all checks to be compacted
-      final List<DataSegment> compactedSegments = unknownStateToSegments
-          .values()
-          .stream()
-          .flatMap(List::stream)
-          .collect(Collectors.toList());
+      // Includes segments with correct fingerprints and segments that passed all state checks
+      final List<DataSegment> allCompactedSegments = new ArrayList<>(this.compactedSegments);
+      allCompactedSegments.addAll(
+          unknownStateToSegments
+              .values()
+              .stream()
+              .flatMap(List::stream)
+              .collect(Collectors.toList())
+      );
 
       if (reasonsForCompaction.isEmpty()) {
         return COMPLETE;
       } else {
         return CompactionStatus.pending(
-            createStats(compactedSegments),
+            createStats(allCompactedSegments),
             createStats(uncompactedSegments),
             reasonsForCompaction.get(0)
         );
       }
     }
 
+    /**
+     * Evaluates the fingerprints of all fingerprinted candidate segments against the expected fingerprint.
+     * <p>
+     * If all fingerprinted segments have the expected fingerprint, the check can quickly pass as COMPLETE. However,
+     * if any fingerprinted segment has a mismatched fingerprint, we need to investigate further by adding them to
+     * {@link #unknownStateToSegments} where their compaction states will be analyzed.
+     * </p>
+     */
+    private CompactionStatus allFingerprintedCandidatesHaveExpectedFingerprint()
+    {
+      Map<String, List<DataSegment>> mismatchedFingerprintToSegmentMap = new HashMap<>();
+      for (DataSegment segment : fingerprintedSegments) {
+        String fingerprint = segment.getCompactionStateFingerprint();
+        if (fingerprint != null && !fingerprint.equals(targetFingerprint)) {
+          mismatchedFingerprintToSegmentMap
+              .computeIfAbsent(fingerprint, k -> new ArrayList<>())
+              .add(segment);
+        } else if (fingerprint != null && fingerprint.equals(targetFingerprint)) {
+          // Segment has correct fingerprint - add to compacted segments
+          compactedSegments.add(segment);
+        }
+      }
+
+      if (mismatchedFingerprintToSegmentMap.isEmpty()) {
+        return COMPLETE;
+      }
+
+      boolean fingerprintedSegmentNeedingCompactionFound = false;
+
+      if (compactionStateCache != null) {
+        for (Map.Entry<String, List<DataSegment>> e : mismatchedFingerprintToSegmentMap.entrySet()) {
+          String fingerprint = e.getKey();
+          CompactionState stateToValidate = compactionStateCache.getCompactionStateByFingerprint(fingerprint).orElse(null);
+          if (stateToValidate == null) {
+            log.warn("No compaction state found for fingerprint[%s]", fingerprint);
+            fingerprintedSegmentNeedingCompactionFound = true;
+            uncompactedSegments.addAll(e.getValue());
+          } else {
+            // Note that this does not mean we need compaction yet - we need to validate the state further to determine this
+            unknownStateToSegments
+                .computeIfAbsent(stateToValidate, state -> new ArrayList<>())
+                .addAll(e.getValue());
+          }
+        }
+      } else {
+        for (Map.Entry<String, List<DataSegment>> e : mismatchedFingerprintToSegmentMap.entrySet()) {
+          uncompactedSegments.addAll(e.getValue());
+          fingerprintedSegmentNeedingCompactionFound = true;
+        }
+      }
+
+      if (fingerprintedSegmentNeedingCompactionFound) {
+        return CompactionStatus.pending("At least one segment has a mismatched fingerprint and needs compaction");
+      } else {
+        return COMPLETE;
+      }
+    }
+
+    /**
+     * Divvys up segments by certain characteristics and determines if any segments have never been compacted.
+     * <p>
+     * Segments are categorized into three groups:
+     * <ul>
+     *   <li>fingerprinted - segments that have a compaction state fingerprint and need more investigation before adding to {@link #unknownStateToSegments}</li>
+     *   <li>non-fingerprinted with a lastCompactionState - segments that have no fingerprint but have stored a lastCompactionState that needs to be analyzed</li>
+     *   <li>uncompacted - segments that have neither a fingerprint nor a lastCompactionState and thus definitely need compaction</li>
+     * </ul>
+     * </p>

Review Comment:
   Maybe we can shorten this a little, since it is only a private method and the categories are self-explanatory.
   
   ```suggestion
        * Checks if all the segments have been compacted at least once and groups them into
        * uncompacted, fingerprinted or non-fingerprinted.
   ```
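
   For illustration, the three-way grouping the javadoc (and the suggested wording) describes could be sketched with stand-in types; `Segment` below is a made-up record, not Druid's `DataSegment`:

   ```java
   import java.util.ArrayList;
   import java.util.List;
   import java.util.Map;

   public class SegmentGrouping
   {
     // Illustrative stand-in for DataSegment: id, nullable fingerprint, nullable last compaction state.
     record Segment(String id, String fingerprint, String lastCompactionState) {}

     // Groups candidates into the three buckets described in the javadoc.
     static Map<String, List<Segment>> group(List<Segment> candidates)
     {
       List<Segment> fingerprinted = new ArrayList<>();
       List<Segment> nonFingerprinted = new ArrayList<>();
       List<Segment> uncompacted = new ArrayList<>();
       for (Segment s : candidates) {
         if (s.fingerprint() != null) {
           fingerprinted.add(s);        // compare fingerprint against the target next
         } else if (s.lastCompactionState() != null) {
           nonFingerprinted.add(s);     // no fingerprint: full state analysis required
         } else {
           uncompacted.add(s);          // never compacted: definitely needs compaction
         }
       }
       return Map.of(
           "fingerprinted", fingerprinted,
           "nonFingerprinted", nonFingerprinted,
           "uncompacted", uncompacted
       );
     }

     public static void main(String[] args)
     {
       Map<String, List<Segment>> groups = group(List.of(
           new Segment("s1", "fp-abc", null),
           new Segment("s2", null, "{...}"),
           new Segment("s3", null, null)
       ));
       groups.forEach((name, segs) -> System.out.println(name + " -> " + segs.size()));
     }
   }
   ```

   The ordering of the branches matters: a fingerprint, when present, supersedes `lastCompactionState`, which is what lets the fingerprint comparison short-circuit the more expensive state analysis.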



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

