github-advanced-security[bot] commented on code in PR #18844:
URL: https://github.com/apache/druid/pull/18844#discussion_r2658424595
##########
processing/src/main/java/org/apache/druid/guice/DruidSecondaryModule.java:
##########
@@ -91,6 +95,19 @@
return smileMapper;
}
+ @Provides
+ @LazySingleton
+ @Deterministic
+ public ObjectMapper getSortedMapper(Injector injector)
+ {
+ final ObjectMapper sortedMapper = new DefaultObjectMapper();
+ sortedMapper.configure(SerializationFeature.ORDER_MAP_ENTRIES_BY_KEYS,
true);
+ sortedMapper.configure(MapperFeature.SORT_PROPERTIES_ALPHABETICALLY, true);
Review Comment:
## Deprecated method or constructor invocation
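Invoking `ObjectMapper.configure` should be avoided here because the flagged
overload, `configure(MapperFeature, boolean)`, has been deprecated since Jackson
2.13 in favor of builder-based configuration (e.g. `JsonMapper.builder().configure(...)`).
The `configure(SerializationFeature, boolean)` call just above it is not flagged.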
[Show more
details](https://github.com/apache/druid/security/code-scanning/10617)
##########
server/src/test/java/org/apache/druid/segment/metadata/HeapMemoryCompactionStateManager.java:
##########
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment.metadata;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.MapperFeature;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.SerializationFeature;
+import com.google.common.hash.Hasher;
+import com.google.common.hash.Hashing;
+import com.google.common.io.BaseEncoding;
+import org.apache.druid.jackson.DefaultObjectMapper;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.timeline.CompactionState;
+import org.joda.time.DateTime;
+
+import javax.annotation.Nullable;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+/**
+ * In-memory implementation of {@link CompactionStateManager} that stores
+ * compaction state fingerprints in heap memory without requiring a database.
+ * <p>
+ * Useful for simulations and unit tests where database persistence is not
needed.
+ * Database-specific operations (cleanup, unused marking) are no-ops in this
implementation.
+ */
+public class HeapMemoryCompactionStateManager implements CompactionStateManager
+{
+ private final ConcurrentMap<String, CompactionState> fingerprintToStateMap =
new ConcurrentHashMap<>();
+ private final ObjectMapper deterministicMapper;
+
+ /**
+ * Creates an in-memory compaction state manager with a default
deterministic mapper.
+ * This is a convenience constructor for tests and simulations.
+ */
+ public HeapMemoryCompactionStateManager()
+ {
+ this(createDeterministicMapper());
+ }
+
+ /**
+ * Creates an in-memory compaction state manager with the provided
deterministic mapper
+ * for fingerprint generation.
+ *
+ * @param deterministicMapper ObjectMapper configured for deterministic
serialization
+ */
+ public HeapMemoryCompactionStateManager(ObjectMapper deterministicMapper)
+ {
+ this.deterministicMapper = deterministicMapper;
+ }
+
+ /**
+ * Creates an ObjectMapper configured for deterministic serialization.
+ * Used for generating consistent fingerprints.
+ */
+ private static ObjectMapper createDeterministicMapper()
+ {
+ ObjectMapper mapper = new DefaultObjectMapper();
+ mapper.configure(SerializationFeature.ORDER_MAP_ENTRIES_BY_KEYS, true);
+ mapper.configure(MapperFeature.SORT_PROPERTIES_ALPHABETICALLY, true);
Review Comment:
## Deprecated method or constructor invocation
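Invoking `ObjectMapper.configure` should be avoided here because the flagged
overload, `configure(MapperFeature, boolean)`, has been deprecated since Jackson
2.13 in favor of builder-based configuration (e.g. `JsonMapper.builder().configure(...)`).
The `configure(SerializationFeature, boolean)` call just above it is not flagged.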
[Show more
details](https://github.com/apache/druid/security/code-scanning/10618)
##########
server/src/test/java/org/apache/druid/server/compaction/CompactionStatusTest.java:
##########
@@ -535,9 +573,284 @@
final DataSegment segment =
DataSegment.builder(WIKI_SEGMENT).lastCompactionState(lastCompactionState).build();
final CompactionStatus status = CompactionStatus.compute(
CompactionCandidate.from(List.of(segment), null),
- compactionConfig
+ compactionConfig,
+ compactionStateManager,
+ compactionStateCache
+ );
+ Assert.assertFalse(status.isComplete());
+ }
+
+ @Test
+ public void
test_evaluate_needsCompactionWhenAllSegmentsHaveUnexpectedCompactionStateFingerprint()
+ {
+ List<DataSegment> segments = List.of(
+
DataSegment.builder(WIKI_SEGMENT).compactionStateFingerprint("wrongFingerprint").build(),
+
DataSegment.builder(WIKI_SEGMENT_2).compactionStateFingerprint("wrongFingerprint").build()
+ );
+
+ final DataSourceCompactionConfig oldCompactionConfig =
InlineSchemaDataSourceCompactionConfig
+ .builder()
+ .forDataSource(TestDataSource.WIKI)
+ .withGranularitySpec(new
UserCompactionTaskGranularityConfig(Granularities.HOUR, null, null))
+ .build();
+ CompactionState wrongState =
CompactSegments.createCompactionStateFromConfig(oldCompactionConfig);
+
+ final DataSourceCompactionConfig compactionConfig =
InlineSchemaDataSourceCompactionConfig
+ .builder()
+ .forDataSource(TestDataSource.WIKI)
+ .withGranularitySpec(new
UserCompactionTaskGranularityConfig(Granularities.DAY, null, null))
+ .build();
+
+ CompactionState expectedState =
CompactSegments.createCompactionStateFromConfig(compactionConfig);
Review Comment:
## Unread local variable
Variable 'CompactionState expectedState' is never read.
[Show more
details](https://github.com/apache/druid/security/code-scanning/10616)
##########
server/src/test/java/org/apache/druid/segment/metadata/PersistedCompactionStateManagerTest.java:
##########
@@ -0,0 +1,524 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment.metadata;
+
+import com.fasterxml.jackson.databind.MapperFeature;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.SerializationFeature;
+import com.google.common.collect.ImmutableList;
+import org.apache.druid.data.input.impl.DimensionsSpec;
+import org.apache.druid.indexer.partitions.DynamicPartitionsSpec;
+import org.apache.druid.indexer.partitions.HashedPartitionsSpec;
+import org.apache.druid.jackson.DefaultObjectMapper;
+import org.apache.druid.java.util.common.DateTimes;
+import org.apache.druid.metadata.MetadataStorageTablesConfig;
+import org.apache.druid.metadata.TestDerbyConnector;
+import org.apache.druid.query.aggregation.AggregatorFactory;
+import org.apache.druid.query.aggregation.CountAggregatorFactory;
+import org.apache.druid.query.aggregation.LongSumAggregatorFactory;
+import org.apache.druid.segment.IndexSpec;
+import org.apache.druid.timeline.CompactionState;
+import org.joda.time.DateTime;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.RegisterExtension;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class PersistedCompactionStateManagerTest
+{
+ @RegisterExtension
+ public static final TestDerbyConnector.DerbyConnectorRule5
DERBY_CONNECTOR_RULE =
+ new TestDerbyConnector.DerbyConnectorRule5();
+
+ private final ObjectMapper jsonMapper = new DefaultObjectMapper();
+ private final ObjectMapper deterministicMapper = createDeterministicMapper();
+
+ private static TestDerbyConnector derbyConnector;
+ private static MetadataStorageTablesConfig tablesConfig;
+ private PersistedCompactionStateManager manager;
+
+ @BeforeAll
+ public static void setUpClass()
+ {
+ derbyConnector = DERBY_CONNECTOR_RULE.getConnector();
+ tablesConfig = DERBY_CONNECTOR_RULE.metadataTablesConfigSupplier().get();
+ derbyConnector.createCompactionStatesTable();
+ derbyConnector.createSegmentTable();
+ }
+
+ @BeforeEach
+ public void setUp()
+ {
+ derbyConnector.retryWithHandle(handle -> {
+ handle.createStatement("DELETE FROM " +
tablesConfig.getSegmentsTable()).execute();
+ handle.createStatement("DELETE FROM " +
tablesConfig.getCompactionStatesTable()).execute();
+ return null;
+ });
+
+ manager = new PersistedCompactionStateManager(tablesConfig, jsonMapper,
deterministicMapper, derbyConnector);
+ }
+
+ @Test
+ public void test_persistCompactionState_successfullyInsertsIntoDatabase()
+ {
+ CompactionState state1 = createTestCompactionState();
+ String fingerprint = "fingerprint_abc123";
+
+ Map<String, CompactionState> fingerprintMap = new HashMap<>();
+ fingerprintMap.put(fingerprint, state1);
+
+ derbyConnector.retryWithHandle(handle -> {
+ manager.persistCompactionState(
+ "testDatasource",
+ fingerprintMap,
+ DateTimes.nowUtc()
+ );
+ return null;
+ });
+
+ // Verify the state was inserted into database by checking count
+ Integer count = derbyConnector.retryWithHandle(handle ->
+ handle.createQuery(
+ "SELECT COUNT(*) FROM " + tablesConfig.getCompactionStatesTable()
+ + " WHERE fingerprint = :fp"
+ ).bind("fp", fingerprint)
+ .map((i, r, ctx) -> r.getInt(1))
+ .first()
+ );
+ assertEquals(1, count);
+ }
+
+ @Test
+ public void
test_persistCompactionState_andThen_markUnreferencedCompactionStateAsUnused_andThen_markCompactionStatesAsUsed()
+ {
+ CompactionState state1 = createTestCompactionState();
+ String fingerprint = "fingerprint_abc123";
+
+ Map<String, CompactionState> fingerprintMap = new HashMap<>();
+ fingerprintMap.put(fingerprint, state1);
+
+ derbyConnector.retryWithHandle(handle -> {
+ manager.persistCompactionState(
+ "testDatasource",
+ fingerprintMap,
+ DateTimes.nowUtc()
+ );
+ return null;
+ });
+ assertEquals(1, manager.markUnreferencedCompactionStatesAsUnused());
+ assertEquals(1, manager.markCompactionStatesAsUsed(List.of(fingerprint)));
+ }
+
+ @Test
+ public void test_findReferencedCompactionStateMarkedAsUnused()
+ {
+ CompactionState state1 = createTestCompactionState();
+ String fingerprint = "fingerprint_abc123";
+
+ Map<String, CompactionState> fingerprintMap = new HashMap<>();
+ fingerprintMap.put(fingerprint, state1);
+
+ derbyConnector.retryWithHandle(handle -> {
+ manager.persistCompactionState(
+ "testDatasource",
+ fingerprintMap,
+ DateTimes.nowUtc()
+ );
+ return null;
+ });
+ manager.markUnreferencedCompactionStatesAsUnused();
+ assertEquals(0,
manager.findReferencedCompactionStateMarkedAsUnused().size());
+
+ derbyConnector.retryWithHandle(handle -> {
+ handle.createStatement(
+ "INSERT INTO " + tablesConfig.getSegmentsTable() + " "
+ + "(id, dataSource, created_date, start, \"end\", partitioned,
version, used, payload, "
+ + "used_status_last_updated, compaction_state_fingerprint) "
+ + "VALUES (:id, :dataSource, :created_date, :start, :end,
:partitioned, :version, :used, :payload, "
+ + ":used_status_last_updated, :compaction_state_fingerprint)"
+ )
+ .bind("id", "testSegment_2024-01-01_2024-01-02_v1_0")
+ .bind("dataSource", "testDatasource")
+ .bind("created_date", DateTimes.nowUtc().toString())
+ .bind("start", "2024-01-01T00:00:00.000Z")
+ .bind("end", "2024-01-02T00:00:00.000Z")
+ .bind("partitioned", 0)
+ .bind("version", "v1")
+ .bind("used", true)
+ .bind("payload", new byte[]{}) // Empty payload is fine for this
test
+ .bind("used_status_last_updated", DateTimes.nowUtc().toString())
+ .bind("compaction_state_fingerprint", fingerprint)
+ .execute();
+ return null;
+ });
+
+ List<String> referenced =
manager.findReferencedCompactionStateMarkedAsUnused();
+ assertEquals(1, referenced.size());
+ assertEquals(fingerprint, referenced.get(0));
+ }
+
+ @Test
+ public void
test_deleteCompactionStatesOlderThan_deletesOnlyOldUnusedStates() throws
Exception
+ {
+ DateTime now = DateTimes.nowUtc();
+ DateTime oldTime = now.minusDays(60);
+ DateTime recentTime = now.minusDays(15);
+ DateTime cutoffTime = now.minusDays(30);
+
+ String oldFingerprint = "old_fp_should_delete";
+ String recentFingerprint = "recent_fp_should_keep";
+
+ CompactionState oldState = createTestCompactionState();
+ CompactionState recentState = createTestCompactionState();
+
+ // Insert old unused state (60 days old)
+ derbyConnector.retryWithHandle(handle -> {
+ handle.createStatement(
+ "INSERT INTO " + tablesConfig.getCompactionStatesTable() + " "
+ + "(created_date, datasource, fingerprint, payload, used,
used_status_last_updated) "
+ + "VALUES (:cd, :ds, :fp, :pl, :used, :updated)"
+ )
+ .bind("cd", oldTime.toString())
+ .bind("ds", "testDatasource")
+ .bind("fp", oldFingerprint)
+ .bind("pl", jsonMapper.writeValueAsBytes(oldState))
+ .bind("used", false)
+ .bind("updated", oldTime.toString())
+ .execute();
+ return null;
+ });
+
+ // Insert recent unused state (15 days old)
+ derbyConnector.retryWithHandle(handle -> {
+ handle.createStatement(
+ "INSERT INTO " + tablesConfig.getCompactionStatesTable() + " "
+ + "(created_date, datasource, fingerprint, payload, used,
used_status_last_updated) "
+ + "VALUES (:cd, :ds, :fp, :pl, :used, :updated)"
+ )
+ .bind("cd", recentTime.toString())
+ .bind("ds", "testDatasource")
+ .bind("fp", recentFingerprint)
+ .bind("pl", jsonMapper.writeValueAsBytes(recentState))
+ .bind("used", false)
+ .bind("updated", recentTime.toString())
+ .execute();
+ return null;
+ });
+
+ // Delete states older than 30 days
+ int deleted =
manager.deleteUnusedCompactionStatesOlderThan(cutoffTime.getMillis());
+ assertEquals(1, deleted);
+
+ // Verify only 1 state remains in the table
+ Integer count = derbyConnector.retryWithHandle(handle ->
+
handle.createQuery("SELECT COUNT(*) FROM " +
tablesConfig.getCompactionStatesTable())
+ .map((i, r, ctx)
-> r.getInt(1))
+ .first()
+ );
+ assertEquals(1, count);
+ }
+
+ @Test
+ public void test_persistCompactionState_withEmptyMap_doesNothing()
+ {
+ // Get initial count
+ Integer beforeCount = derbyConnector.retryWithHandle(handle ->
+
handle.createQuery("SELECT COUNT(*) FROM " +
tablesConfig.getCompactionStatesTable())
+ .map((i, r,
ctx) -> r.getInt(1))
+ .first()
+ );
+
+ // Persist empty map
+ derbyConnector.retryWithHandle(handle -> {
+ manager.persistCompactionState("ds", new HashMap<>(),
DateTimes.nowUtc());
+ return null;
+ });
+
+ // Verify count unchanged
+ Integer afterCount = derbyConnector.retryWithHandle(handle ->
+
handle.createQuery("SELECT COUNT(*) FROM " +
tablesConfig.getCompactionStatesTable())
+ .map((i, r,
ctx) -> r.getInt(1))
+ .first()
+ );
+
+ assertEquals(beforeCount, afterCount);
+ }
+
+ @Test
+ public void
test_persistCompactionState_verifyExistingFingerprintMarkedUsed() throws
Exception
+ {
+ String fingerprint = "existing_fingerprint";
+ CompactionState state = createTestCompactionState();
+
+ // Persist initially
+ derbyConnector.retryWithHandle(handle -> {
+ Map<String, CompactionState> map = new HashMap<>();
+ map.put(fingerprint, state);
+ manager.persistCompactionState("ds1", map, DateTimes.nowUtc());
+ return null;
+ });
+
+ // Verify it's marked as used
+ Boolean usedBefore = derbyConnector.retryWithHandle(handle ->
+ handle.createQuery(
+ "SELECT
used FROM " + tablesConfig.getCompactionStatesTable()
+ + "
WHERE fingerprint = :fp"
+ ).bind("fp",
fingerprint)
+ .map((i, r,
ctx) -> r.getBoolean("used"))
+ .first()
+ );
+ assertTrue(usedBefore);
+
+ // Manually mark it as unused
+ derbyConnector.retryWithHandle(handle ->
+ handle.createStatement(
+ "UPDATE " +
tablesConfig.getCompactionStatesTable()
+ + " SET used = false WHERE
fingerprint = :fp"
+ ).bind("fp", fingerprint).execute()
+ );
+
+ // Persist again with the same fingerprint (should UPDATE, not INSERT)
+ derbyConnector.retryWithHandle(handle -> {
+ Map<String, CompactionState> map = new HashMap<>();
+ map.put(fingerprint, state);
+ manager.persistCompactionState("ds1", map, DateTimes.nowUtc());
+ return null;
+ });
+
+ // Verify it's marked as used again
+ Boolean usedAfter = derbyConnector.retryWithHandle(handle ->
+ handle.createQuery(
+ "SELECT
used FROM " + tablesConfig.getCompactionStatesTable()
+ + " WHERE
fingerprint = :fp"
+ ).bind("fp",
fingerprint)
+ .map((i, r,
ctx) -> r.getBoolean("used"))
+ .first()
+ );
+ assertTrue(usedAfter);
+
+ // Verify only 1 row exists (no duplicate insert)
+ Integer count = derbyConnector.retryWithHandle(handle ->
+
handle.createQuery("SELECT COUNT(*) FROM " +
tablesConfig.getCompactionStatesTable())
+ .map((i, r, ctx)
-> r.getInt(1))
+ .first()
+ );
+ assertEquals(1, count);
+ }
+
+ @Test
+ public void test_markCompactionStateAsUsed_withEmptyList_returnsZero()
+ {
+ assertEquals(0, manager.markCompactionStatesAsUsed(List.of()));
+ }
+
+ // ===== Fingerprint Generation Tests =====
+
+ @Test
+ public void
test_generateCompactionStateFingerprint_deterministicFingerprinting()
+ {
+ CompactionState compactionState1 = createBasicCompactionState();
+ CompactionState compactionState2 = createBasicCompactionState();
+
+ String fingerprint1 =
manager.generateCompactionStateFingerprint(compactionState1, "test-ds");
+ String fingerprint2 =
manager.generateCompactionStateFingerprint(compactionState2, "test-ds");
+
+ assertEquals(
+ fingerprint1,
+ fingerprint2,
+ "Same CompactionState should produce identical fingerprints when
datasource is same"
+ );
+ }
+
+ @Test
+ public void
test_generateCompactionStateFingerprint_differentDatasourcesWithSameState_differentFingerprints()
+ {
+ CompactionState compactionState = createBasicCompactionState();
+
+ String fingerprint1 =
manager.generateCompactionStateFingerprint(compactionState, "ds1");
+ String fingerprint2 =
manager.generateCompactionStateFingerprint(compactionState, "ds2");
+
+ assertNotEquals(
+ fingerprint1,
+ fingerprint2,
+ "Different datasources should produce different fingerprints despite
same state"
+ );
+ }
+
+ @Test
+ public void
test_generateCompactionStateFingerprint_metricsListOrderDifferenceResultsInNewFingerprint()
+ {
+ List<AggregatorFactory> metrics1 = Arrays.asList(
+ new CountAggregatorFactory("count"),
+ new LongSumAggregatorFactory("sum", "value")
+ );
+
+ List<AggregatorFactory> metrics2 = Arrays.asList(
+ new LongSumAggregatorFactory("sum", "value"),
+ new CountAggregatorFactory("count")
+ );
+
+ CompactionState state1 = new CompactionState(
+ new DynamicPartitionsSpec(null, null),
+ DimensionsSpec.EMPTY,
+ metrics1,
+ null,
+ IndexSpec.getDefault(),
+ null,
+ null
+ );
+
+ CompactionState state2 = new CompactionState(
+ new DynamicPartitionsSpec(null, null),
+ DimensionsSpec.EMPTY,
+ metrics2,
+ null,
+ IndexSpec.getDefault(),
+ null,
+ null
+ );
+
+ String fingerprint1 = manager.generateCompactionStateFingerprint(state1,
"test-ds");
+ String fingerprint2 = manager.generateCompactionStateFingerprint(state2,
"test-ds");
+
+ assertNotEquals(
+ fingerprint1,
+ fingerprint2,
+ "Metrics order currently matters (arrays preserve order in JSON)"
+ );
+ }
+
+ @Test
+ public void
test_generateCompactionStateFingerprint_dimensionsListOrderDifferenceResultsInNewFingerprint()
+ {
+ DimensionsSpec dimensions1 = new DimensionsSpec(
+ DimensionsSpec.getDefaultSchemas(ImmutableList.of("dim1", "dim2",
"dim3"))
+ );
+
+ DimensionsSpec dimensions2 = new DimensionsSpec(
+ DimensionsSpec.getDefaultSchemas(ImmutableList.of("dim3", "dim2",
"dim1"))
+ );
+
+ CompactionState state1 = new CompactionState(
+ new DynamicPartitionsSpec(null, null),
+ dimensions1,
+ Collections.singletonList(new CountAggregatorFactory("count")),
+ null,
+ IndexSpec.getDefault(),
+ null,
+ null
+ );
+
+ CompactionState state2 = new CompactionState(
+ new DynamicPartitionsSpec(null, null),
+ dimensions2,
+ Collections.singletonList(new CountAggregatorFactory("count")),
+ null,
+ IndexSpec.getDefault(),
+ null,
+ null
+ );
+
+ String fingerprint1 = manager.generateCompactionStateFingerprint(state1,
"test-ds");
+ String fingerprint2 = manager.generateCompactionStateFingerprint(state2,
"test-ds");
+
+ assertNotEquals(
+ fingerprint1,
+ fingerprint2,
+ "Dimensions order currently matters (arrays preserve order in JSON)"
+ );
+ }
+
+ @Test
+ public void testGenerateCompactionStateFingerprint_differentPartitionsSpec()
+ {
+ CompactionState state1 = new CompactionState(
+ new DynamicPartitionsSpec(5000000, null),
+ DimensionsSpec.EMPTY,
+ Collections.singletonList(new CountAggregatorFactory("count")),
+ null,
+ IndexSpec.getDefault(),
+ null,
+ null
+ );
+
+ CompactionState state2 = new CompactionState(
+ new HashedPartitionsSpec(null, 2, Collections.singletonList("dim1")),
+ DimensionsSpec.EMPTY,
+ Collections.singletonList(new CountAggregatorFactory("count")),
+ null,
+ IndexSpec.getDefault(),
+ null,
+ null
+ );
+
+ String fingerprint1 = manager.generateCompactionStateFingerprint(state1,
"test-ds");
+ String fingerprint2 = manager.generateCompactionStateFingerprint(state2,
"test-ds");
+
+ assertNotEquals(
+ fingerprint1,
+ fingerprint2,
+ "Different PartitionsSpec should produce different fingerprints"
+ );
+ }
+
+ private static ObjectMapper createDeterministicMapper()
+ {
+ ObjectMapper mapper = new DefaultObjectMapper();
+ mapper.configure(SerializationFeature.ORDER_MAP_ENTRIES_BY_KEYS, true);
+ mapper.configure(MapperFeature.SORT_PROPERTIES_ALPHABETICALLY, true);
Review Comment:
## Deprecated method or constructor invocation
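Invoking `ObjectMapper.configure` should be avoided here because the flagged
overload, `configure(MapperFeature, boolean)`, has been deprecated since Jackson
2.13 in favor of builder-based configuration (e.g. `JsonMapper.builder().configure(...)`).
The `configure(SerializationFeature, boolean)` call just above it is not flagged.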
[Show more
details](https://github.com/apache/druid/security/code-scanning/10619)
##########
server/src/test/java/org/apache/druid/server/coordinator/duty/KillUnreferencedCompactionStateTest.java:
##########
@@ -0,0 +1,272 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.server.coordinator.duty;
+
+import com.fasterxml.jackson.databind.MapperFeature;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.SerializationFeature;
+import org.apache.druid.indexer.partitions.DynamicPartitionsSpec;
+import org.apache.druid.jackson.DefaultObjectMapper;
+import org.apache.druid.java.util.common.DateTimes;
+import org.apache.druid.metadata.MetadataStorageTablesConfig;
+import org.apache.druid.metadata.TestDerbyConnector;
+import org.apache.druid.segment.IndexSpec;
+import org.apache.druid.segment.metadata.CompactionStateManager;
+import org.apache.druid.segment.metadata.PersistedCompactionStateManager;
+import org.apache.druid.server.coordinator.DruidCoordinatorRuntimeParams;
+import org.apache.druid.server.coordinator.config.MetadataCleanupConfig;
+import org.apache.druid.server.coordinator.stats.CoordinatorRunStats;
+import org.apache.druid.timeline.CompactionState;
+import org.easymock.EasyMock;
+import org.joda.time.DateTime;
+import org.joda.time.Period;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.RegisterExtension;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
+
+public class KillUnreferencedCompactionStateTest
+{
+ @RegisterExtension
+ public static final TestDerbyConnector.DerbyConnectorRule5
DERBY_CONNECTOR_RULE =
+ new TestDerbyConnector.DerbyConnectorRule5();
+
+ private final ObjectMapper jsonMapper = new DefaultObjectMapper();
+
+ private TestDerbyConnector derbyConnector;
+ private MetadataStorageTablesConfig tablesConfig;
+ private CompactionStateManager compactionStateManager;
+ private DruidCoordinatorRuntimeParams mockParams;
+
+ @BeforeEach
+ public void setUp()
+ {
+ derbyConnector = DERBY_CONNECTOR_RULE.getConnector();
+ tablesConfig = DERBY_CONNECTOR_RULE.metadataTablesConfigSupplier().get();
+
+ derbyConnector.createCompactionStatesTable();
+ derbyConnector.createSegmentTable();
+
+ compactionStateManager = new PersistedCompactionStateManager(tablesConfig,
jsonMapper, createDeterministicMapper(), derbyConnector);
+
+ mockParams = EasyMock.createMock(DruidCoordinatorRuntimeParams.class);
+ CoordinatorRunStats runStats = new CoordinatorRunStats();
+
EasyMock.expect(mockParams.getCoordinatorStats()).andReturn(runStats).anyTimes();
+ EasyMock.replay(mockParams);
+ }
+
+ @Test
+ public void testKillUnreferencedCompactionState_lifecycle()
+ {
+ // Setup time progression: now, +1hr, +7hrs (past cleanup period and
retention)
+ List<DateTime> dateTimes = new ArrayList<>();
+ DateTime now = DateTimes.nowUtc();
+ dateTimes.add(now); // Run 1: Mark as unused
+ dateTimes.add(now.plusMinutes(61)); // Run 2: Still in retention
period
+ dateTimes.add(now.plusMinutes(6 * 60 + 1)); // Run 3: Past retention,
delete
+
+ MetadataCleanupConfig cleanupConfig = new MetadataCleanupConfig(
+ true,
+ Period.parse("PT1H").toStandardDuration(), // cleanup period
+ Period.parse("PT6H").toStandardDuration() // retention duration
+ );
+
+ KillUnreferencedCompactionState duty =
+ new TestKillUnreferencedCompactionState(cleanupConfig,
compactionStateManager, dateTimes);
+
+ // Insert a compaction state (initially marked as used)
+ String fingerprint = "test_fingerprint";
+ CompactionState state = createTestCompactionState();
+
+ derbyConnector.retryWithHandle(handle -> {
+ Map<String, CompactionState> map = new HashMap<>();
+ map.put(fingerprint, state);
+ compactionStateManager.persistCompactionState("test-ds", map,
DateTimes.nowUtc());
+ return null;
+ });
+
+ assertEquals(Boolean.TRUE, getCompactionStateUsedStatus(fingerprint));
+
+ // Run 1: Should mark as unused (no segments reference it)
+ duty.run(mockParams);
+ assertEquals(Boolean.FALSE, getCompactionStateUsedStatus(fingerprint));
+
+ // Run 2: Still unused, but within retention period - should not delete
+ duty.run(mockParams);
+ assertNotNull(getCompactionStateUsedStatus(fingerprint));
+
+ // Run 3: Past retention period - should delete
+ duty.run(mockParams);
+ assertNull(getCompactionStateUsedStatus(fingerprint));
+ }
+
+ @Test
+ public void testKillUnreferencedCompactionState_repair()
+ {
+ List<DateTime> dateTimes = new ArrayList<>();
+ DateTime now = DateTimes.nowUtc();
+ dateTimes.add(now);
+ dateTimes.add(now.plusMinutes(61));
+
+ MetadataCleanupConfig cleanupConfig = new MetadataCleanupConfig(
+ true,
+ Period.parse("PT1H").toStandardDuration(),
+ Period.parse("PT6H").toStandardDuration()
+ );
+
+ KillUnreferencedCompactionState duty =
+ new TestKillUnreferencedCompactionState(cleanupConfig,
compactionStateManager, dateTimes);
+
+ // Insert compaction state
+ String fingerprint = "repair_fingerprint";
+ CompactionState state = createTestCompactionState();
+
+ derbyConnector.retryWithHandle(handle -> {
+ Map<String, CompactionState> map = new HashMap<>();
+ map.put(fingerprint, state);
+ compactionStateManager.persistCompactionState("test-ds", map,
DateTimes.nowUtc());
+ return null;
+ });
+
+ // Run 1: Mark as unused
+ duty.run(mockParams);
+ assertEquals(Boolean.FALSE, getCompactionStateUsedStatus(fingerprint));
+
+ // Now insert a used segment that references this fingerprint
+ derbyConnector.retryWithHandle(handle -> {
+ handle.createStatement(
+ "INSERT INTO " + tablesConfig.getSegmentsTable() + " "
+ + "(id, dataSource, created_date, start, \"end\", partitioned,
version, used, payload, "
+ + "used_status_last_updated, compaction_state_fingerprint) "
+ + "VALUES (:id, :dataSource, :created_date, :start, :end,
:partitioned, :version, :used, :payload, "
+ + ":used_status_last_updated, :compaction_state_fingerprint)"
+ )
+ .bind("id", "testSegment_2024-01-01_2024-01-02_v1_0")
+ .bind("dataSource", "test-ds")
+ .bind("created_date", DateTimes.nowUtc().toString())
+ .bind("start", "2024-01-01T00:00:00.000Z")
+ .bind("end", "2024-01-02T00:00:00.000Z")
+ .bind("partitioned", 0)
+ .bind("version", "v1")
+ .bind("used", true)
+ .bind("payload", new byte[]{})
+ .bind("used_status_last_updated", DateTimes.nowUtc().toString())
+ .bind("compaction_state_fingerprint", fingerprint)
+ .execute();
+ return null;
+ });
+
+ // Run 2: Repair - should mark it back as used
+ duty.run(mockParams);
+ assertEquals(Boolean.TRUE, getCompactionStateUsedStatus(fingerprint));
+ }
+
+ @Test
+ public void testKillUnreferencedCompactionState_disabled()
+ {
+ MetadataCleanupConfig cleanupConfig = new MetadataCleanupConfig(
+ false, // disabled
+ Period.parse("PT1H").toStandardDuration(),
+ Period.parse("PT6H").toStandardDuration()
+ );
+
+ KillUnreferencedCompactionState duty =
+ new KillUnreferencedCompactionState(cleanupConfig,
compactionStateManager);
+
+ // Insert compaction state
+ String fingerprint = "disabled_fingerprint";
+ derbyConnector.retryWithHandle(handle -> {
+ Map<String, CompactionState> map = new HashMap<>();
+ map.put(fingerprint, createTestCompactionState());
+ compactionStateManager.persistCompactionState("test-ds", map,
DateTimes.nowUtc());
+ return null;
+ });
+
+ // Run duty - should do nothing
+ duty.run(mockParams);
+
+ // Should still be used (not marked as unused)
+ assertEquals(Boolean.TRUE, getCompactionStateUsedStatus(fingerprint));
+ }
+
+ private static class TestKillUnreferencedCompactionState extends
KillUnreferencedCompactionState
+ {
+ private final List<DateTime> dateTimes;
+ private int index = -1;
+
+ public TestKillUnreferencedCompactionState(
+ MetadataCleanupConfig config,
+ CompactionStateManager compactionStateManager,
+ List<DateTime> dateTimes
+ )
+ {
+ super(config, compactionStateManager);
+ this.dateTimes = dateTimes;
+ }
+
+ @Override
+ protected DateTime getCurrentTime()
+ {
+ index++;
+ return dateTimes.get(index);
+ }
+ }
+
+ private CompactionState createTestCompactionState()
+ {
+ return new CompactionState(
+ new DynamicPartitionsSpec(100, null),
+ null, null, null,
+ IndexSpec.getDefault(),
+ null, null
+ );
+ }
+
+ private Boolean getCompactionStateUsedStatus(String fingerprint)
+ {
+ List<Boolean> usedStatus = derbyConnector.retryWithHandle(
+ handle -> handle.createQuery(
+ "SELECT used FROM " +
tablesConfig.getCompactionStatesTable()
+ + " WHERE fingerprint = :fp"
+ )
+ .bind("fp", fingerprint)
+ .mapTo(Boolean.class)
+ .list()
+ );
+
+ return usedStatus.isEmpty() ? null : usedStatus.get(0);
+ }
+
+ private static ObjectMapper createDeterministicMapper()
+ {
+ ObjectMapper mapper = new DefaultObjectMapper();
+ mapper.configure(SerializationFeature.ORDER_MAP_ENTRIES_BY_KEYS, true);
+ mapper.configure(MapperFeature.SORT_PROPERTIES_ALPHABETICALLY, true);
Review Comment:
## Deprecated method or constructor invocation
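Invoking `ObjectMapper.configure` should be avoided here because the flagged
overload, `configure(MapperFeature, boolean)`, has been deprecated since Jackson
2.13 in favor of builder-based configuration (e.g. `JsonMapper.builder().configure(...)`).
The `configure(SerializationFeature, boolean)` call just above it is not flagged.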
[Show more
details](https://github.com/apache/druid/security/code-scanning/10620)
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]