kfaraz commented on code in PR #18844:
URL: https://github.com/apache/druid/pull/18844#discussion_r2623027988


##########
processing/src/main/java/org/apache/druid/timeline/DataSegment.java:
##########
@@ -245,6 +256,9 @@ public DataSegment(
     this.binaryVersion = binaryVersion;
     Preconditions.checkArgument(size >= 0);
     this.size = size;
+    this.compactionStateFingerprint = compactionStateFingerprint != null
+                                      ? STRING_INTERNER.intern(compactionStateFingerprint)
+                                      : null;

Review Comment:
   ```suggestion
       this.compactionStateFingerprint = Configs.valueOrDefault(
           compactionStateFingerprint,
           STRING_INTERNER.intern(compactionStateFingerprint)
       );
   ```
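   As an aside, here is a self-contained sketch of what interning buys in this constructor (the pool-based `FingerprintInterner` below is only illustrative, not Druid's actual `STRING_INTERNER`): identical fingerprint strings collapse to a single canonical instance, so thousands of segments sharing one compaction state reference one `String` object.

```java
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-in for STRING_INTERNER (not the actual Druid field):
// identical strings collapse to one canonical instance held in the pool.
public class FingerprintInterner
{
  private static final ConcurrentHashMap<String, String> POOL = new ConcurrentHashMap<>();

  public static String intern(String s)
  {
    if (s == null) {
      return null; // preserve the null-handling of the original constructor
    }
    final String prior = POOL.putIfAbsent(s, s);
    return prior != null ? prior : s;
  }
}
```

   Whichever helper ends up being used, the important part is that the null check happens before the intern call, since interners generally reject null.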



##########
server/src/main/java/org/apache/druid/segment/metadata/HeapMemoryCompactionStateManager.java:
##########
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment.metadata;
+
+import org.apache.druid.timeline.CompactionState;
+import org.joda.time.DateTime;
+
+import javax.annotation.Nullable;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * In-memory implementation of {@link CompactionStateManager} that stores
+ * compaction state fingerprints in heap memory without requiring a database.
+ * <p>
+ * Useful for simulations and unit tests where database persistence is not needed.
+ */
+public class HeapMemoryCompactionStateManager extends CompactionStateManager

Review Comment:
   Might be cleaner to let `CompactionStateManager` be an interface, and let both the heap-based and the concrete class implement it.
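   A rough sketch of that shape (method names and signatures below are assumptions for illustration, not the actual Druid API):

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Sketch of the suggested refactor: CompactionStateManager as an interface,
// with the heap-based variant as one implementation. The SQL-backed class
// would implement the same interface.
interface CompactionStateManager
{
  void persistCompactionState(String fingerprint, String stateJson);

  String findCompactionState(String fingerprint);
}

class HeapMemoryCompactionStateManager implements CompactionStateManager
{
  private final Map<String, String> states = new ConcurrentHashMap<>();

  @Override
  public void persistCompactionState(String fingerprint, String stateJson)
  {
    states.putIfAbsent(fingerprint, stateJson);
  }

  @Override
  public String findCompactionState(String fingerprint)
  {
    return states.get(fingerprint);
  }
}
```

   Callers would then depend only on the interface, which also makes the heap-based variant trivially injectable in tests.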



##########
docs/configuration/index.md:
##########
@@ -810,6 +811,8 @@ These Coordinator static configurations can be defined in the `coordinator/runti
 |`druid.manager.rules.pollDuration`|The duration between polls the Coordinator does for updates to the set of active rules. Generally defines the amount of lag time it can take for the Coordinator to notice rules.|`PT1M`|
 |`druid.manager.rules.defaultRule`|The default rule for the cluster|`_default`|
 |`druid.manager.rules.alertThreshold`|The duration after a failed poll upon which an alert should be emitted.|`PT10M`|
+|`druid.manager.compactionState.cacheSize`|The maximum number of compaction state fingerprints to cache in memory on the Coordinator and Overlord. Compaction state fingerprints are used to track the compaction configuration applied to segments. Consider increasing this value if you have a large number of datasources with compaction configurations.|`100`|
+|`druid.manager.compactionState.prewarmSize`|The number of most recently used compaction state fingerprints to load into the cache on Coordinator startup. This pre-warms the cache to improve performance immediately after startup.|`100`|

Review Comment:
   Both the Coordinator and the Overlord (with segment metadata caching enabled) already keep all used segments in memory, including the respective (interned) `CompactionState` objects.
   I don't think the number of distinct `CompactionState` objects that we keep in memory will increase after this patch.
   
   Do we still need to worry about the cache size of these objects?
   Does a cache miss trigger a fetch from the metadata store?



##########
processing/src/main/java/org/apache/druid/metadata/MetadataStorageConnector.java:
##########
@@ -95,4 +95,9 @@ default void exportTable(
   * SegmentSchema table is created only when CentralizedDatasourceSchema feature is enabled.
    */
   void createSegmentSchemasTable();
+
+  /**
+   *

Review Comment:
   Doc missing?



##########
processing/src/main/java/org/apache/druid/timeline/DataSegment.java:
##########
@@ -339,6 +353,14 @@ public boolean isTombstone()
     return getShardSpec().getType().equals(ShardSpec.Type.TOMBSTONE);
   }
 
+  @Nullable
+  @JsonProperty
+  @JsonInclude(JsonInclude.Include.NON_NULL)
+  public String getCompactionStateFingerprint()

Review Comment:
   Does a return value of null signify "unknown" compaction state or "never compacted"? Please add a 1-line javadoc to clarify this point.
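   Something along these lines could work (the exact wording must match what the implementation actually guarantees; this is only a placeholder):

   ```java
   /**
    * Fingerprint of the compaction state last applied to this segment, or null if no compaction
    * state is associated with it (never compacted, or compacted before fingerprints existed,
    * whichever the implementation guarantees).
    */
   ```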



##########
embedded-tests/src/test/java/org/apache/druid/testing/embedded/compact/CompactionSupervisorTest.java:
##########
@@ -165,11 +167,170 @@ public void test_ingestDayGranularity_andCompactToMonthGranularity_withInlineCon
             )
             .build();
 
-    runCompactionWithSpec(compactionConfig);
+    runCompactionWithSpec(monthGranConfig);
     waitForAllCompactionTasksToFinish();
 
     Assertions.assertEquals(0, getNumSegmentsWith(Granularities.DAY));
     Assertions.assertEquals(1, getNumSegmentsWith(Granularities.MONTH));
+
+    verifyCompactedSegmentsHaveFingerprints(monthGranConfig);
+
+    InlineSchemaDataSourceCompactionConfig yearGranConfig =
+        InlineSchemaDataSourceCompactionConfig
+            .builder()
+            .forDataSource(dataSource)
+            .withSkipOffsetFromLatest(Period.seconds(0))
+            .withGranularitySpec(
+                new UserCompactionTaskGranularityConfig(Granularities.YEAR, null, null)
+            )
+            .withTuningConfig(
+                new UserCompactionTaskQueryTuningConfig(
+                    null,
+                    null,
+                    null,
+                    null,
+                    null,
+                    new DimensionRangePartitionsSpec(null, 5000, List.of("item"), false),
+                    null,
+                    null,
+                    null,
+                    null,
+                    null,
+                    null,
+                    null,
+                    null,
+                    null,
+                    null,
+                    null,
+                    null,
+                    null
+                )
+            )
+            .build();
+
+    overlord.latchableEmitter().flush(); // flush events so the wait-for works correctly on the next round of compaction
+    runCompactionWithSpec(yearGranConfig);
+    waitForAllCompactionTasksToFinish();
+
+    Assertions.assertEquals(0, getNumSegmentsWith(Granularities.DAY));
+    Assertions.assertEquals(0, getNumSegmentsWith(Granularities.MONTH));
+    Assertions.assertEquals(1, getNumSegmentsWith(Granularities.YEAR));
+
+    verifyCompactedSegmentsHaveFingerprints(yearGranConfig);
+  }
+
+  @MethodSource("getEngine")
+  @ParameterizedTest(name = "compactionEngine={0}")
+  public void test_compaction_withPersistLastCompactionStateFalse_storesOnlyFingerprint(CompactionEngine compactionEngine)
+      throws InterruptedException
+  {
+    // Configure cluster with persistLastCompactionState=false
+    final UpdateResponse updateResponse = cluster.callApi().onLeaderOverlord(
+        o -> o.updateClusterCompactionConfig(
+            new ClusterCompactionConfig(1.0, 100, null, true, compactionEngine, false)
+        )
+    );
+    Assertions.assertTrue(updateResponse.isSuccess());
+
+    // Ingest data at DAY granularity
+    runIngestionAtGranularity(
+        "DAY",
+        "2025-06-01T00:00:00.000Z,shirt,105\n"
+        + "2025-06-02T00:00:00.000Z,trousers,210"
+    );
+    Assertions.assertEquals(2, getNumSegmentsWith(Granularities.DAY));
+
+    // Create compaction config to compact to MONTH granularity
+    InlineSchemaDataSourceCompactionConfig monthConfig =
+        InlineSchemaDataSourceCompactionConfig
+            .builder()
+            .forDataSource(dataSource)
+            .withSkipOffsetFromLatest(Period.seconds(0))
+            .withGranularitySpec(
+                new UserCompactionTaskGranularityConfig(Granularities.MONTH, null, null)
+            )
+            .withTuningConfig(
+                new UserCompactionTaskQueryTuningConfig(
+                    null,
+                    null,
+                    null,
+                    null,
+                    null,
+                    new DimensionRangePartitionsSpec(1000, null, List.of("item"), false),
+                    null,
+                    null,
+                    null,
+                    null,
+                    null,
+                    null,
+                    null,
+                    null,
+                    null,
+                    null,
+                    null,
+                    null,
+                    null
+                )
+            )
+            .build();
+
+    runCompactionWithSpec(monthConfig);
+    waitForAllCompactionTasksToFinish();
+
+    verifySegmentsHaveNullLastCompactionStateAndNonNullFingerprint();
+  }
+
+  private void verifySegmentsHaveNullLastCompactionStateAndNonNullFingerprint()
+  {
+    overlord
+        .bindings()
+        .segmentsMetadataStorage()
+        .retrieveAllUsedSegments(dataSource, Segments.ONLY_VISIBLE)
+        .forEach(segment -> {
+          Assertions.assertNull(
+              segment.getLastCompactionState(),
+              "Segment " + segment.getId() + " should have null lastCompactionState"
+          );
+          Assertions.assertNotNull(
+              segment.getCompactionStateFingerprint(),
+              "Segment " + segment.getId() + " should have non-null compactionStateFingerprint"
+          );
+        });
+  }
+
+  private void verifyCompactedSegmentsHaveFingerprints(DataSourceCompactionConfig compactionConfig)
+  {
+    String expectedFingerprint = CompactionState.generateCompactionStateFingerprint(
+        CompactSegments.createCompactionStateFromConfig(compactionConfig),
+        dataSource
+    );
+
+    overlord
+        .bindings()
+        .segmentsMetadataStorage()
+        .retrieveAllUsedSegments(dataSource, Segments.ONLY_VISIBLE)
+        .forEach(segment -> {
+          String fingerprint = segment.getCompactionStateFingerprint();
+          Assertions.assertNotNull(
+              fingerprint,
+              "Segment " + segment.getId() + " should have a compaction state fingerprint"
+          );
+          Assertions.assertFalse(
+              fingerprint.isEmpty(),
+              "Segment " + segment.getId() + " fingerprint should not be empty"
+          );
+          // SHA-256 fingerprints should be 64 hex characters
+          Assertions.assertEquals(
+              64,
+              fingerprint.length(),
+              "Segment " + segment.getId() + " fingerprint should be 64 characters (SHA-256)"
+          );

Review Comment:
   The first two of these three assertions can be omitted; the last one alone should suffice.
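   For example (this is just the existing check trimmed down):

   ```java
   // The 64-character check alone implies the fingerprint is non-null and non-empty.
   Assertions.assertEquals(
       64,
       fingerprint.length(),
       "Segment " + segment.getId() + " fingerprint should be 64 characters (SHA-256)"
   );
   ```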



##########
multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java:
##########
@@ -1696,6 +1696,12 @@ private void handleQueryResults(
               Tasks.DEFAULT_STORE_COMPACTION_STATE
           );
 
+      String compactionStateFingerprint = querySpec.getContext()

Review Comment:
   ```suggestion
         final String compactionStateFingerprint = querySpec.getContext()
   ```



##########
website/.spelling:
##########
@@ -483,6 +483,7 @@ pre-computation
 pre-compute
 pre-computed
 pre-computing
+pre-dates

Review Comment:
   `predates` need not be hyphenated.



##########
server/src/main/java/org/apache/druid/segment/metadata/HeapMemoryCompactionStateManager.java:
##########
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment.metadata;
+
+import org.apache.druid.timeline.CompactionState;
+import org.joda.time.DateTime;
+
+import javax.annotation.Nullable;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * In-memory implementation of {@link CompactionStateManager} that stores
+ * compaction state fingerprints in heap memory without requiring a database.
+ * <p>
+ * Useful for simulations and unit tests where database persistence is not needed.

Review Comment:
   If this is used only in tests, we should probably put it in the test source root `src/test/java`.



##########
server/src/main/java/org/apache/druid/segment/metadata/CompactionStateManager.java:
##########
@@ -0,0 +1,545 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment.metadata;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.CacheLoader;
+import com.google.common.collect.Lists;
+import com.google.common.util.concurrent.Striped;
+import com.google.inject.Inject;
+import org.apache.druid.error.InternalServerError;
+import org.apache.druid.guice.ManageLifecycle;
+import org.apache.druid.java.util.common.DateTimes;
+import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.common.lifecycle.LifecycleStart;
+import org.apache.druid.java.util.common.lifecycle.LifecycleStop;
+import org.apache.druid.java.util.emitter.EmittingLogger;
+import org.apache.druid.metadata.MetadataStorageTablesConfig;
+import org.apache.druid.metadata.SQLMetadataConnector;
+import org.apache.druid.timeline.CompactionState;
+import org.joda.time.DateTime;
+import org.skife.jdbi.v2.Handle;
+import org.skife.jdbi.v2.PreparedBatch;
+import org.skife.jdbi.v2.Query;
+import org.skife.jdbi.v2.SQLStatement;
+import org.skife.jdbi.v2.Update;
+
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.locks.Lock;
+
+/**
+ * Manages the persistence and retrieval of {@link CompactionState} objects in the metadata storage.
+ * <p>
+ * Compaction states are uniquely identified by their fingerprints, which are SHA-256 hashes of their content. A cache
+ * of compaction states, keyed by fingerprint, is maintained in memory to optimize retrieval performance.
+ * </p>
+ * <p>
+ * A striped locking mechanism is used to ensure thread-safe persistence of compaction states on a per-datasource basis.
+ * </p>
+ * </p>
+ */
+@ManageLifecycle
+public class CompactionStateManager

Review Comment:
   I don't feel that pre-warming the cache is really necessary. The fingerprint needs to be retrieved only when running the `CompactionJobQueue` on the Overlord or `CompactSegments` on the Coordinator.
   
   1. Let's always keep all the compaction states in memory. We are already keeping all the used segments in memory, and the distinct `CompactionState` objects will be fairly small in number and size.
   2. The states can be cached in `HeapMemorySegmentMetadataCache`, which already serves as a cache for used segments, pending segments, and schemas.
   3. If possible, let's support the fingerprint flow only with compaction supervisors and not the Coordinator-based `CompactSegments` duty. That would simplify the new flow and be another motivation for users to migrate to compaction supervisors.



##########
embedded-tests/src/test/java/org/apache/druid/testing/embedded/compact/CompactionSupervisorTest.java:
##########
@@ -132,7 +134,7 @@ public void test_ingestDayGranularity_andCompactToMonthGranularity_withInlineCon
     Assertions.assertEquals(3, getNumSegmentsWith(Granularities.DAY));
 
     // Create a compaction config with MONTH granularity
-    InlineSchemaDataSourceCompactionConfig compactionConfig =
+    InlineSchemaDataSourceCompactionConfig monthGranConfig =

Review Comment:
   ```suggestion
       InlineSchemaDataSourceCompactionConfig monthGranularityConfig =
   ```



##########
indexing-service/src/main/java/org/apache/druid/indexing/compact/CompactionConfigBasedJobTemplate.java:
##########
@@ -70,12 +73,34 @@ public List<CompactionJob> createCompactionJobs(
 
     final List<CompactionJob> jobs = new ArrayList<>();
 
+    CompactionState compactionState = CompactSegments.createCompactionStateFromConfig(config);
+
+    String compactionStateFingerprint = CompactionState.generateCompactionStateFingerprint(
+        compactionState,
+        config.getDataSource()
+    );
+
+    if (segmentIterator.hasNext()) {
+      // If we are going to create compaction jobs for this compaction state, we need to persist the
+      // fingerprint -> state mapping so compacted segments from these jobs can reference a valid compaction state.
+      params.getCompactionStateManager().persistCompactionState(

Review Comment:
   The templates should only perform lightweight (i.e. non-IO) read-only operations, as `createCompactionJobs` may be called frequently.
   We should not do any persistence here.
   Instead, the `params` can hold some mapping where we can add this compaction state and perform persistence on-demand (perhaps in the `CompactionJobQueue`).
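   One possible shape for that (all class and method names below are hypothetical, not existing Druid code): the template only records the fingerprint-to-state mapping in memory, and the job queue flushes pending entries in one place before launching jobs.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.BiConsumer;

// Hypothetical sketch of deferred persistence: createCompactionJobs() records
// fingerprint -> state in memory (cheap, no IO); the job queue later flushes
// the pending entries, e.g. just before launching the compaction jobs.
class PendingCompactionStates
{
  private final Map<String, String> pending = new ConcurrentHashMap<>();

  // Called from the template: lightweight, no IO.
  void record(String fingerprint, String stateJson)
  {
    pending.putIfAbsent(fingerprint, stateJson);
  }

  // Called from the queue to perform the actual writes, then clear the backlog.
  void flushTo(BiConsumer<String, String> persister)
  {
    pending.forEach(persister);
    pending.clear();
  }
}
```

   The `persister` callback here would wrap whatever persistence method `CompactionStateManager` exposes.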



##########
processing/src/main/java/org/apache/druid/timeline/CompactionState.java:
##########
@@ -50,6 +59,26 @@
  */
 public class CompactionState
 {
+
+  /**
+   * Lazy initialization holder for deterministic ObjectMapper.

Review Comment:
   I wonder if we shouldn't just inject this mapper, annotated with `@Sorted` or `@Deterministic`, as a lazy singleton. It could be injected into `CompactionStateManager`, and fingerprints would always be created by that class rather than via a static utility method.
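   Independent of how the mapper is wired in, the property that matters is deterministic serialization. A pure-JDK illustration of why (a `TreeMap` stands in for a mapper configured to sort map entries; this is not Druid's actual fingerprint code):

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.util.Map;
import java.util.TreeMap;

// Sorting keys before hashing makes logically-equal states hash identically,
// regardless of the insertion order of the underlying maps.
public class DeterministicFingerprint
{
  public static String fingerprint(Map<String, String> state)
  {
    try {
      // Canonical form: keys in sorted order, mimicking a mapper with
      // map-entry ordering enabled.
      final StringBuilder canonical = new StringBuilder();
      new TreeMap<>(state).forEach((k, v) -> canonical.append(k).append('=').append(v).append(';'));

      final MessageDigest sha = MessageDigest.getInstance("SHA-256");
      final StringBuilder hex = new StringBuilder();
      for (byte b : sha.digest(canonical.toString().getBytes(StandardCharsets.UTF_8))) {
        hex.append(String.format("%02x", b));
      }
      return hex.toString(); // 64 hex characters
    }
    catch (Exception e) {
      throw new RuntimeException(e);
    }
  }
}
```

   A lazily injected mapper with equivalent ordering guarantees would achieve the same stability without the static holder.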



##########
multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java:
##########
@@ -1751,6 +1760,19 @@ private void handleQueryResults(
     }
   }
 
+  private static Function<Set<DataSegment>, Set<DataSegment>> addCompactionStateFingerprintToSegments(String compactionStateFingerprint)

Review Comment:
   Let's re-use the static function from `AbstractTask` itself?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

