kfaraz commented on code in PR #18844:
URL: https://github.com/apache/druid/pull/18844#discussion_r2697110181


##########
server/src/main/java/org/apache/druid/segment/metadata/SqlIndexingStateStorage.java:
##########
@@ -0,0 +1,408 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment.metadata;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.inject.Inject;
+import org.apache.druid.error.DruidException;
+import org.apache.druid.error.InternalServerError;
+import org.apache.druid.guice.LazySingleton;
+import org.apache.druid.java.util.common.DateTimes;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.emitter.EmittingLogger;
+import org.apache.druid.metadata.MetadataStorageTablesConfig;
+import org.apache.druid.metadata.SQLMetadataConnector;
+import org.apache.druid.timeline.CompactionState;
+import org.joda.time.DateTime;
+import org.skife.jdbi.v2.Handle;
+import org.skife.jdbi.v2.SQLStatement;
+import org.skife.jdbi.v2.Update;
+
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+import javax.validation.constraints.NotEmpty;
+import java.util.List;
+
+/**
+ * Database-backed implementation of {@link IndexingStateStorage}.
+ * <p>
+ * Manages the persistence and retrieval of {@link CompactionState} (AKA 
IndexingState) objects in the metadata storage.
+ * Indexing states are uniquely identified by their fingerprints, which are 
SHA-256 hashes of their content.
+ * </p>
+ * <p>
+ * This implementation is designed to be called from a single thread and 
relies on
+ * database constraints and the retry mechanism to handle any conflicts. 
Operations are idempotent - concurrent
+ * upserts for the same fingerprint will either succeed or fail with a 
constraint violation that is safely ignored.
+ * </p>
+ */
+@LazySingleton
+public class SqlIndexingStateStorage implements IndexingStateStorage
+{
+  private static final EmittingLogger log = new 
EmittingLogger(SqlIndexingStateStorage.class);
+
+  private final MetadataStorageTablesConfig dbTables;
+  private final ObjectMapper jsonMapper;
+  private final SQLMetadataConnector connector;
+
+  @Inject
+  public SqlIndexingStateStorage(
+      @Nonnull MetadataStorageTablesConfig dbTables,

Review Comment:
   Nit: Do we need the `@Nonnull` annotations? I don't think we use them in 
injected constructors anyway.



##########
indexing-service/src/main/java/org/apache/druid/indexing/overlord/duty/OverlordMetadataCleanupDuty.java:
##########
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.overlord.duty;
+
+import org.apache.druid.indexing.overlord.config.OverlordMetadataCleanupConfig;
+import org.apache.druid.java.util.common.DateTimes;
+import org.apache.druid.java.util.common.logger.Logger;
+import org.joda.time.DateTime;
+
+/**
+ * Performs cleanup of stale metadata entries created before a configured 
retain duration.
+ * <p>
+ * In every invocation of {@link #run}, the duty checks if the {@code 
cleanupPeriod}
+ * has elapsed since the {@link #lastCleanupTime}. If it has, then the method
+ * {@link #cleanupEntriesCreatedBeforeDurationToRetain(DateTime)} is invoked. 
Otherwise, the duty
+ * completes immediately without making any changes.
+ */
+public abstract class OverlordMetadataCleanupDuty implements OverlordDuty
+{
+  private static final Logger log = new 
Logger(OverlordMetadataCleanupDuty.class);
+
+  private final String entryType;
+  private final OverlordMetadataCleanupConfig cleanupConfig;
+
+  private DateTime lastCleanupTime = DateTimes.utc(0);
+
+  protected OverlordMetadataCleanupDuty(String entryType, 
OverlordMetadataCleanupConfig cleanupConfig)
+  {
+    this.entryType = entryType;
+    this.cleanupConfig = cleanupConfig;
+
+    if (cleanupConfig.isCleanupEnabled()) {
+      log.debug(
+          "Enabled cleanup of [%s] with period [%s] and durationToRetain 
[%s].",
+          entryType, cleanupConfig.getCleanupPeriod(), 
cleanupConfig.getDurationToRetain()
+      );
+    }
+  }
+
+  @Override
+  public void run()
+  {
+    if (!cleanupConfig.isCleanupEnabled()) {
+      return;
+    }
+
+    final DateTime now = getCurrentTime();
+
+    // Perform cleanup only if cleanup period has elapsed
+    if (lastCleanupTime.plus(cleanupConfig.getCleanupPeriod()).isBefore(now)) {
+      lastCleanupTime = now;
+
+      try {
+        DateTime minCreatedTime = 
now.minus(cleanupConfig.getDurationToRetain());
+        int deletedEntries = 
cleanupEntriesCreatedBeforeDurationToRetain(minCreatedTime);
+        if (deletedEntries > 0) {
+          log.info("Removed [%,d] [%s] created before [%s].", deletedEntries, 
entryType, minCreatedTime);
+        }
+        DateTime pendingMinCreatedTime = 
now.minus(cleanupConfig.getPendingDurationToRetain());
+        int deletedPendingEntries = 
cleanupEntriesCreatedBeforePendingDurationToRetain(pendingMinCreatedTime);
+        if (deletedPendingEntries > 0) {
+          log.info("Removed [%,d] pending entries [%s] created before [%s].", 
deletedPendingEntries, entryType, pendingMinCreatedTime);
+        }
+      }
+      catch (Exception e) {
+        log.error(e, "Failed to perform cleanup of [%s]", entryType);
+      }
+    }
+  }
+
+  @Override
+  public boolean isEnabled()
+  {
+    return cleanupConfig.isCleanupEnabled();
+  }
+
+  @Override
+  public DutySchedule getSchedule()
+  {
+    if (isEnabled()) {
+      return new DutySchedule(cleanupConfig.getCleanupPeriod().getMillis(), 0);
+    } else {
+      return new DutySchedule(0, 0);
+    }
+  }
+
+  /**
+   * Cleans up metadata entries created before the {@code minCreatedTime} 
calculated with {@link OverlordMetadataCleanupConfig#durationToRetain}.
+   * <p>
+   * This method is not invoked if the {@code cleanupPeriod} has not elapsed 
since the {@link #lastCleanupTime}.
+   *
+   * @return Number of deleted metadata entries
+   */
+  protected abstract int cleanupEntriesCreatedBeforeDurationToRetain(DateTime 
minCreatedTime);

Review Comment:
   +1



##########
server/src/main/java/org/apache/druid/metadata/SQLMetadataConnector.java:
##########
@@ -229,6 +229,17 @@ protected boolean 
isRootCausePacketTooBigException(Throwable t)
     return false;
   }
 
+  /**
+   *  Checks if the root cause of the given exception is a unique constraint 
violation.
+   *
+   * @return false by default. Specific implementations should override this 
method
+   * to correctly classify their unique constraint violation exceptions.
+   */
+  public boolean isUniqueConstraintViolation(Throwable t)
+  {
+    return false;
+  }

Review Comment:
   I don't think the default impl would be universally applicable anyway.
   So, we either keep this method abstract and force extensions to implement 
it, or let the transaction fail (as is currently being done in the patch).
   
   Also, looking at the way we currently insert segments in 
`IndexerSQLMetadataStorageCoordinator`, we just check once if a segment ID already 
exists and then skip the insert. Otherwise, we proceed with the insert and let 
the transaction fail if already inserted by a competing transaction. It is up 
to the caller to retry or swallow the exception in such cases.
   
   I think it would be fine to do the same here too to keep things simple for 
the time being.
   
   @capistrant , @clintropolis , thoughts?



##########
server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java:
##########
@@ -2685,6 +2713,43 @@ public Map<String, Set<String>> 
retrieveUpgradedToSegmentIds(
     return upgradedToSegmentIds;
   }
 
+  /**
+   * Marks indexing state fingerprints as active (non-pending) for 
successfully published segments.
+   * <p>
+   * Extracts unique indexing state fingerprints from the given segments and 
marks them as active
+   * in the indexing state storage. This is called after successful segment 
publishing to indicate
+   * that the indexing state is no longer pending and can be retained with the 
regular grace period.
+   *
+   * @param segments The segments that were successfully published
+   */
+  private void markIndexingStateFingerprintsAsActive(Set<DataSegment> segments)
+  {
+    if (segments == null || segments.isEmpty()) {
+      return;
+    }
+
+    // Collect unique non-null indexing state fingerprints
+    final Set<String> fingerprints = segments.stream()
+                                             
.map(DataSegment::getIndexingStateFingerprint)
+                                             .filter(fp -> fp != null && 
!fp.isEmpty())
+                                             .collect(Collectors.toSet());
+
+    // Mark each fingerprint as active
+    for (String fingerprint : fingerprints) {
+      try {
+        int rowsUpdated = 
indexingStateStorage.markIndexingStatesAsActive(fingerprint);

Review Comment:
   +1, let's batch this if possible



##########
indexing-service/src/main/java/org/apache/druid/indexing/overlord/duty/OverlordMetadataCleanupDuty.java:
##########
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.overlord.duty;
+
+import org.apache.druid.indexing.overlord.config.OverlordMetadataCleanupConfig;
+import org.apache.druid.java.util.common.DateTimes;
+import org.apache.druid.java.util.common.logger.Logger;
+import org.joda.time.DateTime;
+
+/**
+ * Performs cleanup of stale metadata entries created before a configured 
retain duration.
+ * <p>
+ * In every invocation of {@link #run}, the duty checks if the {@code 
cleanupPeriod}
+ * has elapsed since the {@link #lastCleanupTime}. If it has, then the method
+ * {@link #cleanupEntriesCreatedBeforeDurationToRetain(DateTime)} is invoked. 
Otherwise, the duty
+ * completes immediately without making any changes.
+ */
+public abstract class OverlordMetadataCleanupDuty implements OverlordDuty
+{
+  private static final Logger log = new 
Logger(OverlordMetadataCleanupDuty.class);
+
+  private final String entryType;
+  private final OverlordMetadataCleanupConfig cleanupConfig;
+
+  private DateTime lastCleanupTime = DateTimes.utc(0);
+
+  protected OverlordMetadataCleanupDuty(String entryType, 
OverlordMetadataCleanupConfig cleanupConfig)
+  {
+    this.entryType = entryType;
+    this.cleanupConfig = cleanupConfig;
+
+    if (cleanupConfig.isCleanupEnabled()) {
+      log.debug(
+          "Enabled cleanup of [%s] with period [%s] and durationToRetain 
[%s].",
+          entryType, cleanupConfig.getCleanupPeriod(), 
cleanupConfig.getDurationToRetain()
+      );
+    }
+  }
+
+  @Override
+  public void run()
+  {
+    if (!cleanupConfig.isCleanupEnabled()) {
+      return;
+    }
+
+    final DateTime now = getCurrentTime();
+
+    // Perform cleanup only if cleanup period has elapsed
+    if (lastCleanupTime.plus(cleanupConfig.getCleanupPeriod()).isBefore(now)) {
+      lastCleanupTime = now;
+
+      try {
+        DateTime minCreatedTime = 
now.minus(cleanupConfig.getDurationToRetain());
+        int deletedEntries = 
cleanupEntriesCreatedBeforeDurationToRetain(minCreatedTime);
+        if (deletedEntries > 0) {
+          log.info("Removed [%,d] [%s] created before [%s].", deletedEntries, 
entryType, minCreatedTime);
+        }
+        DateTime pendingMinCreatedTime = 
now.minus(cleanupConfig.getPendingDurationToRetain());
+        int deletedPendingEntries = 
cleanupEntriesCreatedBeforePendingDurationToRetain(pendingMinCreatedTime);
+        if (deletedPendingEntries > 0) {
+          log.info("Removed [%,d] pending entries [%s] created before [%s].", 
deletedPendingEntries, entryType, pendingMinCreatedTime);
+        }
+      }
+      catch (Exception e) {
+        log.error(e, "Failed to perform cleanup of [%s]", entryType);
+      }
+    }
+  }
+
+  @Override
+  public boolean isEnabled()
+  {
+    return cleanupConfig.isCleanupEnabled();
+  }
+
+  @Override
+  public DutySchedule getSchedule()
+  {
+    if (isEnabled()) {
+      return new DutySchedule(cleanupConfig.getCleanupPeriod().getMillis(), 0);
+    } else {
+      return new DutySchedule(0, 0);
+    }
+  }
+
+  /**
+   * Cleans up metadata entries created before the {@code minCreatedTime} 
calculated with {@link OverlordMetadataCleanupConfig#durationToRetain}.
+   * <p>
+   * This method is not invoked if the {@code cleanupPeriod} has not elapsed 
since the {@link #lastCleanupTime}.
+   *
+   * @return Number of deleted metadata entries
+   */
+  protected abstract int cleanupEntriesCreatedBeforeDurationToRetain(DateTime 
minCreatedTime);
+
+  /**
+   * Cleans up pending metadata entries created before the {@code 
minCreatedTime} calculated with {@link 
OverlordMetadataCleanupConfig#pendingDurationToRetain}.
+   * <p>
+   * This method is not invoked if the {@code cleanupPeriod} has not elapsed 
since the {@link #lastCleanupTime}.
+   *
+   * @return Number of deleted pending metadata entries
+   */
+  protected abstract int 
cleanupEntriesCreatedBeforePendingDurationToRetain(DateTime minCreatedTime);

Review Comment:
   +1



##########
server/src/main/java/org/apache/druid/segment/metadata/SqlIndexingStateStorage.java:
##########
@@ -0,0 +1,408 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment.metadata;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.inject.Inject;
+import org.apache.druid.error.DruidException;
+import org.apache.druid.error.InternalServerError;
+import org.apache.druid.guice.LazySingleton;
+import org.apache.druid.java.util.common.DateTimes;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.emitter.EmittingLogger;
+import org.apache.druid.metadata.MetadataStorageTablesConfig;
+import org.apache.druid.metadata.SQLMetadataConnector;
+import org.apache.druid.timeline.CompactionState;
+import org.joda.time.DateTime;
+import org.skife.jdbi.v2.Handle;
+import org.skife.jdbi.v2.SQLStatement;
+import org.skife.jdbi.v2.Update;
+
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+import javax.validation.constraints.NotEmpty;
+import java.util.List;
+
+/**
+ * Database-backed implementation of {@link IndexingStateStorage}.
+ * <p>
+ * Manages the persistence and retrieval of {@link CompactionState} (AKA 
IndexingState) objects in the metadata storage.
+ * Indexing states are uniquely identified by their fingerprints, which are 
SHA-256 hashes of their content.
+ * </p>
+ * <p>
+ * This implementation is designed to be called from a single thread and 
relies on
+ * database constraints and the retry mechanism to handle any conflicts. 
Operations are idempotent - concurrent
+ * upserts for the same fingerprint will either succeed or fail with a 
constraint violation that is safely ignored.
+ * </p>
+ */
+@LazySingleton
+public class SqlIndexingStateStorage implements IndexingStateStorage
+{
+  private static final EmittingLogger log = new 
EmittingLogger(SqlIndexingStateStorage.class);
+
+  private final MetadataStorageTablesConfig dbTables;
+  private final ObjectMapper jsonMapper;
+  private final SQLMetadataConnector connector;
+
+  @Inject
+  public SqlIndexingStateStorage(
+      @Nonnull MetadataStorageTablesConfig dbTables,
+      @Nonnull ObjectMapper jsonMapper,

Review Comment:
   ```suggestion
         @Nonnull @Json ObjectMapper jsonMapper,
   ```



##########
indexing-service/src/main/java/org/apache/druid/indexing/overlord/config/OverlordMetadataCleanupConfig.java:
##########
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.overlord.config;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.druid.common.config.Configs;
+import org.joda.time.Duration;
+
+import java.util.Objects;
+
+public class OverlordMetadataCleanupConfig

Review Comment:
   +1, @capistrant , when we migrate other cleanup duties to the Overlord, we 
will just use the `MetadataCleanupConfig` itself.
   For the current purposes, let's add a new class `IndexingStateCleanupConfig` 
which extends `MetadataCleanupConfig`.
   
   Would that work?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to