capistrant commented on code in PR #18844:
URL: https://github.com/apache/druid/pull/18844#discussion_r2621038516
##########
processing/src/main/java/org/apache/druid/timeline/CompactionState.java:
##########
@@ -206,4 +252,22 @@ public static Function<Set<DataSegment>, Set<DataSegment>> addCompactionStateToS
.map(s -> s.withLastCompactionState(compactionState))
.collect(Collectors.toSet());
}
+
+ /**
+   * Generates a fingerprint string for the given compaction state and data source using the SHA-256 hash algorithm.
+ */
+ @SuppressWarnings("UnstableApiUsage")
+  public static String generateCompactionStateFingerprint(final CompactionState compactionState, final String dataSource)
Review Comment:
Not sure if this is better off in the `FingerprintGenerator` class or not. That class has a generic name but seems to be specifically for schema fingerprinting, so I added it here instead.
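For reference, a fingerprint built this way could look like the following sketch. The `@SuppressWarnings("UnstableApiUsage")` hints that the PR uses Guava's `Hashing`, but this standalone sketch uses `java.security.MessageDigest` instead; the method name and the idea of hashing the serialized state together with the datasource name are assumptions, not the PR's actual code:

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

public class FingerprintSketch
{
  // Hypothetical: hashes a pre-serialized CompactionState (e.g. Jackson JSON)
  // together with the datasource name, returning a hex SHA-256 string.
  public static String generateFingerprint(String serializedState, String dataSource)
  {
    try {
      MessageDigest digest = MessageDigest.getInstance("SHA-256");
      digest.update(dataSource.getBytes(StandardCharsets.UTF_8));
      digest.update((byte) 0); // separator so ("ab","c") and ("a","bc") hash differently
      digest.update(serializedState.getBytes(StandardCharsets.UTF_8));
      StringBuilder hex = new StringBuilder();
      for (byte b : digest.digest()) {
        hex.append(String.format("%02x", b & 0xff));
      }
      return hex.toString();
    }
    catch (NoSuchAlgorithmException e) {
      throw new IllegalStateException(e); // SHA-256 is always available on the JVM
    }
  }

  public static void main(String[] args)
  {
    String f1 = generateFingerprint("{\"partitionsSpec\":{}}", "wikipedia");
    String f2 = generateFingerprint("{\"partitionsSpec\":{}}", "wikipedia");
    System.out.println(f1.equals(f2)); // prints "true": deterministic for equal inputs
  }
}
```

Determinism matters here: the coordinator and the task must derive the same fingerprint from the same config, so the real implementation needs a canonical serialization of the state.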
##########
server/src/main/java/org/apache/druid/segment/metadata/CompactionStateManager.java:
##########
Review Comment:
Is an associated test class missing?
##########
indexing-service/src/main/java/org/apache/druid/indexing/compact/CompactionConfigBasedJobTemplate.java:
##########
@@ -70,12 +73,32 @@ public List<CompactionJob> createCompactionJobs(
final List<CompactionJob> jobs = new ArrayList<>();
+    CompactionState compactionState = CompactSegments.createCompactionStateFromConfig(config);
+
+    String compactionStateFingerprint = CompactionState.generateCompactionStateFingerprint(
+ compactionState,
+ config.getDataSource()
+ );
+
+ if (segmentIterator.hasNext()) {
Review Comment:
A comment here may be useful, clarifying that we pre-write the compaction state we will be creating compaction tasks for, so the underlying task does not need to worry about it; all it has to do is persist the fingerprint when it generates segments.
##########
server/src/main/java/org/apache/druid/server/compaction/CompactionStatus.java:
##########
@@ -376,15 +416,92 @@ private CompactionStatus evaluate()
}
}
+  /**
+   * Evaluates the fingerprints of all fingerprinted candidate segments against the expected fingerprint.
+   * <p>
+   * If all fingerprinted segments have the expected fingerprint, the check can quickly pass as COMPLETE. However,
+   * if any fingerprinted segment has a mismatched fingerprint, we need to investigate further by adding them to
+   * {@link #unknownStateToSegments} where their compaction states will be analyzed.
+   * </p>
+   */
+  private CompactionStatus allFingerprintedCandidatesHaveExpectedFingerprint()
Review Comment:
I think this is missing adding the properly fingerprinted segments to the set of compacted segments, which is used for stats?
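The evaluation described in the javadoc could be sketched as follows. Types are simplified (segment ids and fingerprints as plain strings) and the names `partitionByFingerprint` and `needsAnalysis` are illustrative, not the PR's:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

public class FingerprintCheckSketch
{
  /**
   * Splits candidate segments into those whose stored fingerprint matches the
   * expected one (quick pass) and those needing a full compaction-state comparison.
   */
  public static Set<String> partitionByFingerprint(
      Map<String, String> segmentToFingerprint, // segmentId -> stored fingerprint, null if absent
      String expectedFingerprint,
      List<String> needsAnalysis // out-param: mismatched or unfingerprinted segments
  )
  {
    Set<String> matching = new HashSet<>();
    for (Map.Entry<String, String> entry : segmentToFingerprint.entrySet()) {
      if (expectedFingerprint.equals(entry.getValue())) {
        matching.add(entry.getKey()); // these should also count toward compacted-segment stats
      } else {
        needsAnalysis.add(entry.getKey()); // fall back to analyzing the full compaction state
      }
    }
    return matching;
  }
}
```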
##########
server/src/main/java/org/apache/druid/segment/metadata/CompactionStateManager.java:
##########
@@ -0,0 +1,538 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment.metadata;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.CacheLoader;
+import com.google.common.collect.Lists;
+import com.google.common.util.concurrent.Striped;
+import com.google.inject.Inject;
+import org.apache.druid.error.InternalServerError;
+import org.apache.druid.guice.ManageLifecycle;
+import org.apache.druid.java.util.common.DateTimes;
+import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.common.lifecycle.LifecycleStart;
+import org.apache.druid.java.util.common.lifecycle.LifecycleStop;
+import org.apache.druid.java.util.emitter.EmittingLogger;
+import org.apache.druid.metadata.MetadataStorageTablesConfig;
+import org.apache.druid.metadata.SQLMetadataConnector;
+import org.apache.druid.timeline.CompactionState;
+import org.joda.time.DateTime;
+import org.skife.jdbi.v2.Handle;
+import org.skife.jdbi.v2.PreparedBatch;
+import org.skife.jdbi.v2.Query;
+import org.skife.jdbi.v2.SQLStatement;
+import org.skife.jdbi.v2.Update;
+
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.locks.Lock;
+
+/**
+ * Handles compaction state persistence on the Coordinator.
+ */
+@ManageLifecycle
+public class CompactionStateManager
+{
+  private static final EmittingLogger log = new EmittingLogger(CompactionStateManager.class);
+ private static final int DB_ACTION_PARTITION_SIZE = 100;
+ private static final int DEFAULT_PREWARM_SIZE = 100;
+
+ private final MetadataStorageTablesConfig dbTables;
+ private final ObjectMapper jsonMapper;
+ private final SQLMetadataConnector connector;
+ private final CompactionStateManagerConfig config;
+ private final Cache<String, CompactionState> fingerprintCache;
+ private final Striped<Lock> datasourceLocks = Striped.lock(128);
+
+ @Inject
+ public CompactionStateManager(
+ @Nonnull MetadataStorageTablesConfig dbTables,
+ @Nonnull ObjectMapper jsonMapper,
+ @Nonnull SQLMetadataConnector connector,
+ @Nonnull CompactionStateManagerConfig config
+ )
+ {
+ this.dbTables = dbTables;
+ this.jsonMapper = jsonMapper;
+ this.connector = connector;
+ this.config = config;
+
+ this.fingerprintCache = CacheBuilder.newBuilder()
+ .maximumSize(config.getCacheSize())
+ .build();
+ }
+
+ @LifecycleStart
+ public void start()
+ {
+    // This is defensive. Since the new table is created during startup after upgrade, we need to defend against
+    // the table not existing yet. If that is the case we do not pre-warm the cache.
+ try {
+ boolean tableExists = connector.retryWithHandle(
+          handle -> connector.tableExists(handle, dbTables.getCompactionStatesTable())
+ );
+ if (tableExists) {
+ log.info("Pre-warming compaction state cache");
+ prewarmCache(DEFAULT_PREWARM_SIZE);
Review Comment:
The prewarm size should be configurable too, since we already have the config class set up.
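One way to wire that in, sketched with illustrative names (`prewarmSize` is not necessarily what the PR's `CompactionStateManagerConfig` exposes), defaulting to the current hardcoded constant:

```java
public class CompactionStateManagerConfigSketch
{
  private static final int DEFAULT_PREWARM_SIZE = 100; // mirrors DEFAULT_PREWARM_SIZE in the PR

  private final long cacheSize;
  private final int prewarmSize;

  // A null prewarmSize falls back to the default, so existing configs keep working.
  public CompactionStateManagerConfigSketch(long cacheSize, Integer prewarmSize)
  {
    this.cacheSize = cacheSize;
    this.prewarmSize = prewarmSize == null ? DEFAULT_PREWARM_SIZE : prewarmSize;
  }

  public long getCacheSize()
  {
    return cacheSize;
  }

  public int getPrewarmSize()
  {
    return prewarmSize;
  }
}
```

`start()` would then call `prewarmCache(config.getPrewarmSize())` instead of the constant.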
##########
server/src/main/java/org/apache/druid/segment/metadata/CompactionStateManager.java:
##########
@@ -0,0 +1,538 @@
+/**
+ * Handles compaction state persistence on the Coordinator.
Review Comment:
This needs better Javadoc, and it isn't accurate either: when using supervisors, the Overlord manages the state.
##########
server/src/main/java/org/apache/druid/segment/metadata/CompactionStateManager.java:
##########
@@ -0,0 +1,538 @@
+ @LifecycleStop
+ public void stop()
+ {
+ fingerprintCache.invalidateAll();
Review Comment:
Does this cache object need any other lifecycle cleanup?
##########
server/src/main/java/org/apache/druid/segment/metadata/CompactionStateManager.java:
##########
Review Comment:
What about the case where the operator has table creation disabled and does not properly create the table before upgrading?
##########
server/src/main/java/org/apache/druid/server/coordinator/duty/CompactSegments.java:
##########
@@ -254,7 +276,27 @@ private int submitCompactionTasks(
snapshotBuilder.addToComplete(entry);
}
-      final ClientCompactionTaskQuery taskPayload = createCompactionTask(entry, config, defaultEngine);
+ CompactionState compactionState =
+ createCompactionStateFromConfig(config);
+
+      String compactionStateFingerprint = CompactionState.generateCompactionStateFingerprint(
+ compactionState,
+ config.getDataSource()
+ );
+
+ compactionStateManager.persistCompactionState(
Review Comment:
Same as earlier: a comment here clarifying the pattern of pre-writing compaction state would help.
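The pre-write pattern requested here can be summarized in a short sketch (an in-memory map stands in for the compaction-states metadata table; all names are illustrative, not the PR's):

```java
import java.util.HashMap;
import java.util.Map;

public class PrewriteSketch
{
  // Hypothetical stand-in for the metadata store, keyed by fingerprint.
  private final Map<String, String> fingerprintToState = new HashMap<>();

  /** Coordinator/Overlord side: persist the state BEFORE submitting the compaction task. */
  public void persistCompactionState(String fingerprint, String serializedState)
  {
    fingerprintToState.putIfAbsent(fingerprint, serializedState);
  }

  /** Task side: the task never writes the state itself, it only records the fingerprint on published segments. */
  public String fingerprintForNewSegments(String fingerprint)
  {
    if (!fingerprintToState.containsKey(fingerprint)) {
      throw new IllegalStateException("Compaction state must be pre-written before task submission");
    }
    return fingerprint;
  }
}
```

The design choice this captures: the coordinator duty owns the (relatively heavy) state payload, while tasks only carry an opaque fingerprint.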
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]