This is an automated email from the ASF dual-hosted git repository.

swaminathanmanish pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 005cac70a46 Enforce mutual exclusion between segment deletion and 
segment lineage (#18518)
005cac70a46 is described below

commit 005cac70a463865722eae944c006318bc3f5cf0b
Author: Krishan Goyal <[email protected]>
AuthorDate: Wed May 20 14:33:59 2026 +0530

    Enforce mutual exclusion between segment deletion and segment lineage 
(#18518)
    
    * Handle delete segment flow along with lineage properly
    
    * Add tests
    
    * Gate retention's lineage-lock filter on the kill switch
    
    The lineage-exclusive-delete kill switch only gated
    PinotHelixResourceManager.deleteSegments. RetentionManager.removeLineageLockedSegments
    unconditionally stripped lineage-locked segments, so disabling the switch
    did not fully restore legacy retention behavior. Read the same config in
    the retention filter; when the switch is off, pass lineage-locked
    segments through to the (now-permissive) delete path.
    
    Also drops two unused static imports from LineageDeleteExclusionTest.
    
    Co-Authored-By: Claude Opus 4.7 (1M context) <[email protected]>
    
    * Code review addressal
    
    * Code review addressal
    
    * Test case fixes
    
    ---------
    
    Co-authored-by: Claude Opus 4.7 (1M context) <[email protected]>
---
 .../pinot/common/lineage/SegmentLineageUtils.java  |  29 ++
 .../pinot/common/metrics/ControllerMeter.java      |   4 +-
 .../apache/pinot/controller/ControllerConf.java    |   8 +
 .../api/resources/PinotSegmentRestletResource.java |   4 +-
 .../helix/core/PinotHelixResourceManager.java      |  89 +++-
 .../core/lineage/SegmentsInLineageException.java   |  55 +++
 .../helix/core/retention/RetentionManager.java     |  45 +-
 .../core/lineage/LineageDeleteExclusionTest.java   | 187 ++++++++
 .../LineageDeleteInterleavingIntegrationTest.java  | 509 +++++++++++++++++++++
 9 files changed, 924 insertions(+), 6 deletions(-)

diff --git 
a/pinot-common/src/main/java/org/apache/pinot/common/lineage/SegmentLineageUtils.java
 
b/pinot-common/src/main/java/org/apache/pinot/common/lineage/SegmentLineageUtils.java
index 073ff679b59..526397213ea 100644
--- 
a/pinot-common/src/main/java/org/apache/pinot/common/lineage/SegmentLineageUtils.java
+++ 
b/pinot-common/src/main/java/org/apache/pinot/common/lineage/SegmentLineageUtils.java
@@ -18,6 +18,7 @@
  */
 package org.apache.pinot.common.lineage;
 
+import java.util.HashSet;
 import java.util.Set;
 import java.util.UUID;
 
@@ -53,4 +54,32 @@ public class SegmentLineageUtils {
       }
     }
   }
+
+  /**
+   * Returns the set of segments that participate in a live lineage entry and 
therefore must not be deleted by
+   * external callers.
+   * <ul>
+   *   <li>{@code IN_PROGRESS} entries lock both {@code segmentsFrom} and 
{@code segmentsTo}.</li>
+   *   <li>{@code COMPLETED} entries lock only {@code segmentsFrom}.</li>
+   *   <li>{@code REVERTED} entries lock nothing.</li>
+   * </ul>
+   */
+  public static Set<String> getDeleteBlockedSegments(SegmentLineage 
segmentLineage) {
+    Set<String> blocked = new HashSet<>();
+    for (LineageEntry lineageEntry : 
segmentLineage.getLineageEntries().values()) {
+      switch (lineageEntry.getState()) {
+        case IN_PROGRESS:
+          blocked.addAll(lineageEntry.getSegmentsFrom());
+          blocked.addAll(lineageEntry.getSegmentsTo());
+          break;
+        case COMPLETED:
+          blocked.addAll(lineageEntry.getSegmentsFrom());
+          break;
+        case REVERTED:
+        default:
+          break;
+      }
+    }
+    return blocked;
+  }
 }
diff --git 
a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ControllerMeter.java
 
b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ControllerMeter.java
index d812d1c1059..4ba39d3f87b 100644
--- 
a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ControllerMeter.java
+++ 
b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ControllerMeter.java
@@ -83,7 +83,9 @@ public enum ControllerMeter implements AbstractMetrics.Meter {
   AUDIT_RESPONSE_FAILURES("failures", true),
   AUDIT_REQUEST_PAYLOAD_TRUNCATED("count", true),
   // Upsert compact merge task metrics
-  
UPSERT_COMPACT_MERGE_SEGMENT_SKIPPED_CONSENSUS_FAILURE("UpsertCompactMergeSegmentsSkipped",
 false);
+  
UPSERT_COMPACT_MERGE_SEGMENT_SKIPPED_CONSENSUS_FAILURE("UpsertCompactMergeSegmentsSkipped",
 false),
+  // Number of segment-delete requests rejected because the targets 
participate in a live segment lineage entry.
+  LINEAGE_BLOCKED_DELETE_COUNT("LineageBlockedDeleteCount", false);
 
   private final String _brokerMeterName;
   private final String _unit;
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerConf.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerConf.java
index 6c9fad642b4..93a0cec84d9 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerConf.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerConf.java
@@ -360,6 +360,9 @@ public class ControllerConf extends PinotConfiguration {
   public static final String ENABLE_BATCH_MESSAGE_MODE = 
"controller.enable.batch.message.mode";
   public static final String ENABLE_HYBRID_TABLE_RETENTION_STRATEGY =
       "controller.enable.hybrid.table.retention.strategy";
+  // When true (default), segment deletion refuses to remove segments that 
participate in a live segment lineage
+  // entry (IN_PROGRESS, or COMPLETED.segmentsFrom).
+  public static final String LINEAGE_EXCLUSIVE_DELETE_ENABLED = 
"controller.lineage.exclusive.delete.enabled";
   public static final String DIM_TABLE_MAX_SIZE = 
"controller.dimTable.maxSize";
 
   // Defines the kind of storage and the underlying PinotFS implementation
@@ -395,6 +398,7 @@ public class ControllerConf extends PinotConfiguration {
   public static final boolean 
DEFAULT_RESOURCE_UTILIZATION_CHECKER_COLLECT_USAGE_AT_STARTUP = false;
   public static final boolean DEFAULT_ENABLE_BATCH_MESSAGE_MODE = false;
   public static final boolean DEFAULT_ENABLE_HYBRID_TABLE_RETENTION_STRATEGY = 
false;
+  public static final boolean DEFAULT_LINEAGE_EXCLUSIVE_DELETE_ENABLED = true;
   public static final String DEFAULT_CONTROLLER_MODE = 
ControllerMode.DUAL.name();
   public static final String 
DEFAULT_LEAD_CONTROLLER_RESOURCE_REBALANCE_STRATEGY =
       AutoRebalanceStrategy.class.getName();
@@ -1141,6 +1145,10 @@ public class ControllerConf extends PinotConfiguration {
     return getProperty(ENABLE_HYBRID_TABLE_RETENTION_STRATEGY, 
DEFAULT_ENABLE_HYBRID_TABLE_RETENTION_STRATEGY);
   }
 
+  public boolean isLineageExclusiveDeleteEnabled() {
+    return getProperty(LINEAGE_EXCLUSIVE_DELETE_ENABLED, 
DEFAULT_LINEAGE_EXCLUSIVE_DELETE_ENABLED);
+  }
+
   public int getSegmentLevelValidationIntervalInSeconds() {
     String period = 
getProperty(ControllerPeriodicTasksConf.SEGMENT_LEVEL_VALIDATION_INTERVAL_PERIOD,
         
ControllerPeriodicTasksConf.DEFAULT_SEGMENT_LEVEL_VALIDATION_INTERVAL_PERIOD);
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentRestletResource.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentRestletResource.java
index e4e10e928de..2ed6a42587c 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentRestletResource.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentRestletResource.java
@@ -689,8 +689,8 @@ public class PinotSegmentRestletResource {
 
   private void deleteSegmentsInternal(String tableNameWithType, List<String> 
segments,
       @Nullable String retentionPeriod) {
-    PinotResourceManagerResponse response = 
_pinotHelixResourceManager.deleteSegments(tableNameWithType, segments,
-        retentionPeriod);
+    PinotResourceManagerResponse response =
+        _pinotHelixResourceManager.deleteSegments(tableNameWithType, segments, 
retentionPeriod);
     if (!response.isSuccessful()) {
       throw new ControllerApplicationException(LOGGER,
           "Failed to delete segments from table: " + tableNameWithType + ", 
error message: " + response.getMessage(),
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
index 3aef26fe390..6208eb6262f 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
@@ -118,6 +118,7 @@ import 
org.apache.pinot.common.messages.TableDeletionMessage;
 import org.apache.pinot.common.metadata.ZKMetadataProvider;
 import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
 import org.apache.pinot.common.metadata.segment.SegmentZKMetadataUtils;
+import org.apache.pinot.common.metrics.ControllerMeter;
 import org.apache.pinot.common.metrics.ControllerMetrics;
 import org.apache.pinot.common.minion.MinionTaskMetadataUtils;
 import org.apache.pinot.common.restlet.resources.EndReplaceSegmentsRequest;
@@ -154,6 +155,7 @@ import 
org.apache.pinot.controller.helix.core.assignment.segment.SegmentAssignme
 import org.apache.pinot.controller.helix.core.controllerjob.ControllerJobTypes;
 import org.apache.pinot.controller.helix.core.lineage.LineageManager;
 import org.apache.pinot.controller.helix.core.lineage.LineageManagerFactory;
+import 
org.apache.pinot.controller.helix.core.lineage.SegmentsInLineageException;
 import org.apache.pinot.controller.helix.core.minion.PinotTaskManager;
 import 
org.apache.pinot.controller.helix.core.realtime.PinotLLCRealtimeSegmentManager;
 import org.apache.pinot.controller.helix.core.util.ControllerZkHelixUtils;
@@ -1073,6 +1075,15 @@ public class PinotHelixResourceManager {
   /**
    * Delete a list of segments from ideal state and remove them from the local 
storage.
    *
+   * <p>Pre-checks the table's segment lineage znode and rejects the whole 
batch with a failure response if any
+   * target segment participates in a live lineage entry (signalled internally 
via {@link SegmentsInLineageException}).
+   * If we don't do this and a segment is deleted while it is being replaced, 
it could lead to the deleted data
+   * resurfacing with the replaced segment. To prevent that, we will block 
segment deletion if it's part of the FROM
+   * list in an IN_PROGRESS / COMPLETED Segment Lineage entry, or of the TO list in an IN_PROGRESS entry.
+   *
+   * <p>Cleanup paths that already coordinated with the lineage lifecycle must 
call
+   * {@link #deleteSegmentsForLineageCleanup} instead.
+   *
    * @param tableNameWithType Table name with type suffix
    * @param segmentNames List of names of segment to be deleted
    * @param retentionPeriod The retention period of the deleted segments.
@@ -1080,6 +1091,23 @@ public class PinotHelixResourceManager {
    */
   public PinotResourceManagerResponse deleteSegments(String tableNameWithType, 
List<String> segmentNames,
       @Nullable String retentionPeriod) {
+    return deleteSegmentsInternal(tableNameWithType, segmentNames, 
retentionPeriod, false);
+  }
+
+  /**
+   * Lineage-aware delete path that skips the cross-check against the live 
lineage entries. Reserved for callers
+   * that have already coordinated with the lineage lifecycle: proactive 
cleanup in {@code startReplaceSegments},
+   * post-revert cleanup in {@code revertReplaceSegments}, and {@code 
RetentionManager}'s lineage-cleanup pass.
+   * External call sites (REST handlers, retention based on table config, 
minion task generators, push-failure
+   * cleanup) must continue to use the public {@link #deleteSegments} 
overloads.
+   */
+  public PinotResourceManagerResponse deleteSegmentsForLineageCleanup(String 
tableNameWithType,
+      List<String> segmentNames) {
+    return deleteSegmentsInternal(tableNameWithType, segmentNames, null, true);
+  }
+
+  private PinotResourceManagerResponse deleteSegmentsInternal(String 
tableNameWithType, List<String> segmentNames,
+      @Nullable String retentionPeriod, boolean bypassLineageCheck) {
     if (segmentNames.isEmpty()) {
       return PinotResourceManagerResponse.success("No segments to delete");
     }
@@ -1087,6 +1115,10 @@ public class PinotHelixResourceManager {
       LOGGER.info("Trying to delete segments: {} from table: {} ", 
segmentNames, tableNameWithType);
       
Preconditions.checkArgument(TableNameBuilder.isTableResource(tableNameWithType),
           "Table name: %s is not a valid table name with type suffix", 
tableNameWithType);
+      if (!bypassLineageCheck && isLineageExclusiveDeleteEnabled()) {
+        // Reject the whole batch if any target segment participates in a live 
lineage entry.
+        rejectIfTargetsLineageLockedSegments(tableNameWithType, segmentNames);
+      }
       HelixHelper.removeSegmentsFromIdealState(_helixZkManager, 
tableNameWithType, segmentNames);
       if (retentionPeriod != null) {
         _segmentDeletionManager.deleteSegments(tableNameWithType, segmentNames,
@@ -1096,12 +1128,47 @@ public class PinotHelixResourceManager {
         _segmentDeletionManager.deleteSegments(tableNameWithType, 
segmentNames, tableConfig);
       }
       return PinotResourceManagerResponse.success("Segment " + segmentNames + 
" deleted");
+    } catch (SegmentsInLineageException e) {
+      LOGGER.warn("Refusing to delete segments from table: {}. {}", 
tableNameWithType, e.getMessage());
+      return PinotResourceManagerResponse.failure(e.getMessage());
     } catch (final Exception e) {
       LOGGER.error("Caught exception while deleting segment: {} from table: 
{}", segmentNames, tableNameWithType, e);
       return PinotResourceManagerResponse.failure(e.getMessage());
     }
   }
 
+  /**
+   * Reads the current segment lineage znode (if any) and throws {@link 
SegmentsInLineageException} when the
+   * delete batch intersects the lineage-locked set.
+   */
+  private void rejectIfTargetsLineageLockedSegments(String tableNameWithType, 
List<String> segmentNames) {
+    SegmentLineage segmentLineage = 
SegmentLineageAccessHelper.getSegmentLineage(_propertyStore, tableNameWithType);
+    if (segmentLineage == null) {
+      return;
+    }
+    Set<String> blocked = 
SegmentLineageUtils.getDeleteBlockedSegments(segmentLineage);
+    if (blocked.isEmpty()) {
+      return;
+    }
+    List<String> blockingTargets = new ArrayList<>();
+    for (String segment : segmentNames) {
+      if (blocked.contains(segment)) {
+        blockingTargets.add(segment);
+      }
+    }
+    if (blockingTargets.isEmpty()) {
+      return;
+    }
+    if (_controllerMetrics != null) {
+      _controllerMetrics.addMeteredTableValue(tableNameWithType, 
ControllerMeter.LINEAGE_BLOCKED_DELETE_COUNT, 1L);
+    }
+    throw new SegmentsInLineageException(tableNameWithType, blockingTargets);
+  }
+
+  private boolean isLineageExclusiveDeleteEnabled() {
+    return _controllerConf == null || 
_controllerConf.isLineageExclusiveDeleteEnabled();
+  }
+
   /**
    * Delete a single segment from ideal state and remove it from the local 
storage.
    *
@@ -4587,7 +4654,7 @@ public class PinotHelixResourceManager {
     // is safe to physically delete segments.
     if (!segmentsToCleanUp.isEmpty()) {
       LOGGER.info("Cleaning up the segments while startReplaceSegments: {}", 
segmentsToCleanUp);
-      deleteSegments(tableNameWithType, segmentsToCleanUp);
+      deleteSegmentsForLineageCleanup(tableNameWithType, segmentsToCleanUp);
     }
 
     // Only successful attempt can reach here
@@ -4640,6 +4707,24 @@ public class PinotHelixResourceManager {
         }
 
         Set<String> segmentsForTable = new 
HashSet<>(getSegmentsFor(tableNameWithType, false));
+        // Re-validate that every segmentsFrom is still present in IdealState.
+        // If a segment was deleted concurrently while it was added in the 
LineageEntry,
+        // the lineage entry is now corrupt and should be reverted.
+        List<String> missingSegmentsFrom = new ArrayList<>();
+        for (String segment : lineageEntry.getSegmentsFrom()) {
+          if (!segmentsForTable.contains(segment)) {
+            missingSegmentsFrom.add(segment);
+          }
+        }
+        if (!missingSegmentsFrom.isEmpty()) {
+          String errorMsg = "Cannot complete segment replacement: segments 
from 'segmentsFrom' no longer exist in "
+              + "IdealState (likely deleted by a concurrent request). Recover 
by calling revertReplaceSegments with "
+              + "forceRevert=true. (tableNameWithType=" + tableNameWithType + 
", segmentLineageEntryId="
+              + segmentLineageEntryId + ", missingSegmentsFrom=" + 
missingSegmentsFrom + ")";
+          LOGGER.error(errorMsg);
+          throw new RuntimeException(errorMsg);
+        }
+
         List<String> segmentsTo = lineageEntry.getSegmentsTo();
         if (endReplaceSegmentsRequest != null && 
!endReplaceSegmentsRequest.getSegmentsTo().isEmpty()) {
           Set<String> segmentsToInSet = new HashSet<>(segmentsTo);
@@ -4823,7 +4908,7 @@ public class PinotHelixResourceManager {
 
           // Invoke the proactive clean-up for segments that we no longer needs
           if (!segmentsTo.isEmpty()) {
-            deleteSegments(tableNameWithType, segmentsTo);
+            deleteSegmentsForLineageCleanup(tableNameWithType, segmentsTo);
           }
           return true;
         } else {
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/lineage/SegmentsInLineageException.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/lineage/SegmentsInLineageException.java
new file mode 100644
index 00000000000..2a38571175f
--- /dev/null
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/lineage/SegmentsInLineageException.java
@@ -0,0 +1,55 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller.helix.core.lineage;
+
+import java.util.Collections;
+import java.util.List;
+
+
+/// Thrown when a segment deletion request targets segments that participate 
in a live segment lineage entry.
+///
+/// A segment is considered lineage-locked when some lineage entry has state 
`IN_PROGRESS` and the segment appears
+/// in either `segmentsFrom` or `segmentsTo`, or has state `COMPLETED` and the 
segment appears in `segmentsFrom`.
+/// Such segments must be cleaned up through the lineage lifecycle (end / 
revert / retention) rather than via the
+/// generic delete path. The whole batch is rejected so callers never observe 
a partial commit.
+public class SegmentsInLineageException extends RuntimeException {
+  private final String _tableNameWithType;
+  private final List<String> _blockingSegments;
+
+  public SegmentsInLineageException(String tableNameWithType, List<String> 
blockingSegments) {
+    super(buildMessage(tableNameWithType, blockingSegments));
+    _tableNameWithType = tableNameWithType;
+    _blockingSegments = Collections.unmodifiableList(blockingSegments);
+  }
+
+  public String getTableNameWithType() {
+    return _tableNameWithType;
+  }
+
+  public List<String> getBlockingSegments() {
+    return _blockingSegments;
+  }
+
+  private static String buildMessage(String tableNameWithType, List<String> 
blockingSegments) {
+    return "Cannot delete segments from table: " + tableNameWithType
+        + " because they participate in a live segment lineage entry. Use the 
lineage lifecycle "
+        + "(endReplaceSegments / revertReplaceSegments with forceRevert=true) 
to clean them up. "
+        + "Blocking segments: " + blockingSegments;
+  }
+}
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/retention/RetentionManager.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/retention/RetentionManager.java
index d01aeb1a877..82cd4076128 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/retention/RetentionManager.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/retention/RetentionManager.java
@@ -37,6 +37,7 @@ import org.apache.helix.zookeeper.datamodel.ZNRecord;
 import org.apache.logging.log4j.util.Strings;
 import org.apache.pinot.common.lineage.SegmentLineage;
 import org.apache.pinot.common.lineage.SegmentLineageAccessHelper;
+import org.apache.pinot.common.lineage.SegmentLineageUtils;
 import org.apache.pinot.common.metadata.ZKMetadataProvider;
 import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
 import org.apache.pinot.common.metrics.ControllerGauge;
@@ -87,6 +88,7 @@ public class RetentionManager extends 
ControllerPeriodicTask<Void> {
   private volatile boolean _isHybridTableRetentionStrategyEnabled;
   private volatile boolean _useCreationTimeFallbackForRetention;
   private final BrokerServiceHelper _brokerServiceHelper;
+  private final ControllerConf _controllerConf;
 
   public RetentionManager(PinotHelixResourceManager pinotHelixResourceManager,
       LeadControllerManager leadControllerManager, ControllerConf config, 
ControllerMetrics controllerMetrics,
@@ -100,6 +102,7 @@ public class RetentionManager extends 
ControllerPeriodicTask<Void> {
     _isHybridTableRetentionStrategyEnabled = 
config.isHybridTableRetentionStrategyEnabled();
     _useCreationTimeFallbackForRetention = 
config.isRetentionCreationTimeFallbackEnabled();
     _brokerServiceHelper = brokerServiceHelper;
+    _controllerConf = config;
     LOGGER.info("Starting RetentionManager with runFrequencyInSeconds: {}", 
getIntervalInSeconds());
   }
 
@@ -191,6 +194,7 @@ public class RetentionManager extends 
ControllerPeriodicTask<Void> {
         segmentsToDelete.add(segmentZKMetadata.getSegmentName());
       }
     }
+    removeLineageLockedSegments(offlineTableName, segmentsToDelete);
     if (!segmentsToDelete.isEmpty()) {
       LOGGER.info("Deleting {} segments from table: {}", 
segmentsToDelete.size(), offlineTableName);
       _pinotHelixResourceManager.deleteSegments(offlineTableName, 
segmentsToDelete);
@@ -229,6 +233,7 @@ public class RetentionManager extends 
ControllerPeriodicTask<Void> {
     // Remove last sealed segments such that the table can still create new 
consuming segments if it's paused
     
segmentsToDelete.removeAll(_pinotHelixResourceManager.getLastLLCCompletedSegments(realtimeTableName));
 
+    removeLineageLockedSegments(realtimeTableName, segmentsToDelete);
     if (!segmentsToDelete.isEmpty()) {
       LOGGER.info("Deleting {} segments from table: {}", 
segmentsToDelete.size(), realtimeTableName);
       _pinotHelixResourceManager.deleteSegments(realtimeTableName, 
segmentsToDelete);
@@ -276,6 +281,7 @@ public class RetentionManager extends 
ControllerPeriodicTask<Void> {
           segmentsToDelete.add(segmentZKMetadata.getSegmentName());
         }
       }
+      removeLineageLockedSegments(realtimeTableName, segmentsToDelete);
       LOGGER.info("Deleting {} segments from table: {}", 
segmentsToDelete.size(), realtimeTableName);
       if (!segmentsToDelete.isEmpty()) {
         _pinotHelixResourceManager.deleteSegments(realtimeTableName, 
segmentsToDelete);
@@ -451,6 +457,43 @@ public class RetentionManager extends 
ControllerPeriodicTask<Void> {
     return segmentName;
   }
 
+  /**
+   * Strips out any segments that participate in a live segment lineage entry 
from the retention-driven
+   * delete batch. Time-based retention must not delete lineage-locked 
segments — they are owned by the
+   * lineage lifecycle and get cleaned up by {@link 
#manageSegmentLineageCleanupForTable} when the lineage
+   * entry becomes eligible. If we left them in, the public delete check would 
reject the whole batch and
+   * the rest of the eligible segments would never get cleaned up.
+   * <p>
+   * Gated by {@link ControllerConf#LINEAGE_EXCLUSIVE_DELETE_ENABLED}: when 
the kill switch is off,
+   * {@code deleteSegments} also stops rejecting lineage-locked targets, so 
retention must mirror legacy
+   * behavior and pass them through to the delete path instead of silently 
dropping them here.
+   */
+  private void removeLineageLockedSegments(String tableNameWithType, 
List<String> segmentsToDelete) {
+    if (segmentsToDelete.isEmpty()) {
+      return;
+    }
+    if (!_controllerConf.isLineageExclusiveDeleteEnabled()) {
+      return;
+    }
+    SegmentLineage segmentLineage =
+        
SegmentLineageAccessHelper.getSegmentLineage(_pinotHelixResourceManager.getPropertyStore(),
 tableNameWithType);
+    if (segmentLineage == null) {
+      return;
+    }
+    Set<String> blocked = 
SegmentLineageUtils.getDeleteBlockedSegments(segmentLineage);
+    if (blocked.isEmpty()) {
+      return;
+    }
+    int sizeBefore = segmentsToDelete.size();
+    segmentsToDelete.removeIf(blocked::contains);
+    int removed = sizeBefore - segmentsToDelete.size();
+    if (removed > 0) {
+      LOGGER.info(
+          "Skipping {} segments in retention pass for table: {} because they 
participate in a live lineage entry; "
+              + "they will be cleaned up by the lineage retention path.", 
removed, tableNameWithType);
+    }
+  }
+
   private void manageSegmentLineageCleanupForTable(TableConfig tableConfig) {
     String tableNameWithType = tableConfig.getTableName();
     List<String> segmentsToDelete = new ArrayList<>();
@@ -496,7 +539,7 @@ public class RetentionManager extends 
ControllerPeriodicTask<Void> {
     }
     // Delete segments based on the segment lineage
     if (!segmentsToDelete.isEmpty()) {
-      _pinotHelixResourceManager.deleteSegments(tableNameWithType, 
segmentsToDelete);
+      
_pinotHelixResourceManager.deleteSegmentsForLineageCleanup(tableNameWithType, 
segmentsToDelete);
       LOGGER.info("Finished cleaning up segment lineage for table: {} in {}ms, 
deleted segments: {}",
           tableNameWithType, (System.currentTimeMillis() - cleanupStartTime), 
segmentsToDelete);
     }
diff --git 
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/lineage/LineageDeleteExclusionTest.java
 
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/lineage/LineageDeleteExclusionTest.java
new file mode 100644
index 00000000000..4ff13707709
--- /dev/null
+++ 
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/lineage/LineageDeleteExclusionTest.java
@@ -0,0 +1,187 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller.helix.core.lineage;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import org.apache.pinot.common.lineage.LineageEntry;
+import org.apache.pinot.common.lineage.LineageEntryState;
+import org.apache.pinot.common.lineage.SegmentLineage;
+import org.apache.pinot.common.lineage.SegmentLineageAccessHelper;
+import org.apache.pinot.controller.helix.ControllerTest;
+import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
+import org.apache.pinot.controller.helix.core.PinotResourceManagerResponse;
+import org.apache.pinot.controller.utils.SegmentMetadataMockUtils;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
+import org.apache.pinot.spi.utils.builder.TableNameBuilder;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertTrue;
+
+
+/**
+ * Functional tests for the lineage-aware segment delete check in
+ * {@link PinotHelixResourceManager#deleteSegments(String, List)} and its bypass,
+ * {@code PinotHelixResourceManager#deleteSegmentsForLineageCleanup}. The {@code segmentsFrom}
+ * re-validation in {@code PinotHelixResourceManager#endReplaceSegments} is covered by
+ * {@link LineageDeleteInterleavingIntegrationTest}.
+ *
+ * <p>Each test uses unique segment names so that the asynchronous 
segment-ZK-metadata cleanup performed by
+ * {@code SegmentDeletionManager} cannot interfere with the next test's 
segment additions.
+ */
+public class LineageDeleteExclusionTest {
+  private static final ControllerTest TEST_INSTANCE = 
ControllerTest.getInstance();
+  private static final String RAW_TABLE_NAME = "lineageDeleteTable";
+  private static final String OFFLINE_TABLE_NAME = 
TableNameBuilder.OFFLINE.tableNameWithType(RAW_TABLE_NAME);
+
+  private PinotHelixResourceManager _resourceManager;
+
+  @BeforeClass
+  public void setUp()
+      throws Exception {
+    TEST_INSTANCE.setupSharedStateAndValidate();
+    _resourceManager = TEST_INSTANCE.getHelixResourceManager();
+    TEST_INSTANCE.addDummySchema(RAW_TABLE_NAME);
+    TableConfig tableConfig =
+        new 
TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME).setNumReplicas(1).build();
+    _resourceManager.addTable(tableConfig);
+  }
+
+  @BeforeMethod
+  public void clearLineage() {
+    // Drop any lineage znode written by an earlier test so tests do not bleed 
into each other.
+    
SegmentLineageAccessHelper.deleteSegmentLineage(_resourceManager.getPropertyStore(),
 OFFLINE_TABLE_NAME);
+  }
+
+  private void addSegments(String... segmentNames) {
+    for (String name : segmentNames) {
+      _resourceManager.addNewSegment(OFFLINE_TABLE_NAME,
+          SegmentMetadataMockUtils.mockSegmentMetadata(OFFLINE_TABLE_NAME, 
name), "downloadUrl");
+    }
+  }
+
+  private void writeLineageEntry(String entryId, List<String> segmentsFrom, 
List<String> segmentsTo,
+      LineageEntryState state) {
+    SegmentLineage existing =
+        
SegmentLineageAccessHelper.getSegmentLineage(_resourceManager.getPropertyStore(),
 OFFLINE_TABLE_NAME);
+    SegmentLineage lineage = existing != null ? existing : new 
SegmentLineage(OFFLINE_TABLE_NAME);
+    lineage.addLineageEntry(entryId, new LineageEntry(segmentsFrom, 
segmentsTo, state, System.currentTimeMillis()));
+    
assertTrue(SegmentLineageAccessHelper.writeSegmentLineage(_resourceManager.getPropertyStore(),
 lineage, -1));
+  }
+
+  @Test
+  public void testDeleteBlockedForInProgressSegmentsFrom() {
+    addSegments("ipfA1", "ipfA2", "ipfB1");
+    writeLineageEntry("e1", Arrays.asList("ipfA1", "ipfA2"), 
Collections.singletonList("ipfB1"),
+        LineageEntryState.IN_PROGRESS);
+    PinotResourceManagerResponse response =
+        _resourceManager.deleteSegments(OFFLINE_TABLE_NAME, 
Collections.singletonList("ipfA1"));
+    assertFalse(response.isSuccessful());
+    assertTrue(response.getMessage().contains("ipfA1"));
+    // IdealState untouched
+    assertTrue(_resourceManager.getSegmentsFor(OFFLINE_TABLE_NAME, 
false).contains("ipfA1"));
+  }
+
+  @Test
+  public void testDeleteBlockedForInProgressSegmentsTo() {
+    addSegments("iptA1", "iptB1");
+    writeLineageEntry("e1", Collections.singletonList("iptA1"), 
Collections.singletonList("iptB1"),
+        LineageEntryState.IN_PROGRESS);
+    PinotResourceManagerResponse response =
+        _resourceManager.deleteSegments(OFFLINE_TABLE_NAME, 
Collections.singletonList("iptB1"));
+    assertFalse(response.isSuccessful());
+    assertTrue(_resourceManager.getSegmentsFor(OFFLINE_TABLE_NAME, 
false).contains("iptB1"));
+  }
+
+  @Test
+  public void testDeleteBlockedForCompletedSegmentsFrom() {
+    addSegments("cpfA1", "cpfB1");
+    writeLineageEntry("e1", Collections.singletonList("cpfA1"), 
Collections.singletonList("cpfB1"),
+        LineageEntryState.COMPLETED);
+    PinotResourceManagerResponse response =
+        _resourceManager.deleteSegments(OFFLINE_TABLE_NAME, 
Collections.singletonList("cpfA1"));
+    assertFalse(response.isSuccessful());
+    assertTrue(_resourceManager.getSegmentsFor(OFFLINE_TABLE_NAME, 
false).contains("cpfA1"));
+  }
+
+  @Test
+  public void testDeleteAllowedForCompletedSegmentsTo() {
+    addSegments("cptA1", "cptB1");
+    writeLineageEntry("e1", Collections.singletonList("cptA1"), 
Collections.singletonList("cptB1"),
+        LineageEntryState.COMPLETED);
+    PinotResourceManagerResponse response =
+        _resourceManager.deleteSegments(OFFLINE_TABLE_NAME, 
Collections.singletonList("cptB1"));
+    assertTrue(response.isSuccessful(), response.getMessage());
+  }
+
+  @Test
+  public void testDeleteAllowedForRevertedEntry() {
+    addSegments("revA1", "revB1");
+    writeLineageEntry("e1", Collections.singletonList("revA1"), 
Collections.singletonList("revB1"),
+        LineageEntryState.REVERTED);
+    // Either segmentsFrom or segmentsTo of a REVERTED entry should be 
unblocked.
+    PinotResourceManagerResponse response =
+        _resourceManager.deleteSegments(OFFLINE_TABLE_NAME, 
Arrays.asList("revA1", "revB1"));
+    assertTrue(response.isSuccessful(), response.getMessage());
+  }
+
+  @Test
+  public void testBatchRejectionLeavesIdealStateUntouched() {
+    addSegments("batchA1", "batchA2", "batchFree");
+    writeLineageEntry("e1", Collections.singletonList("batchA1"), 
Collections.singletonList("batchA2"),
+        LineageEntryState.IN_PROGRESS);
+    PinotResourceManagerResponse response =
+        _resourceManager.deleteSegments(OFFLINE_TABLE_NAME, 
Arrays.asList("batchA1", "batchFree"));
+    assertFalse(response.isSuccessful());
+    // Neither "batchA1" nor "batchFree" should have been removed
+    List<String> segments = 
_resourceManager.getSegmentsFor(OFFLINE_TABLE_NAME, false);
+    assertTrue(segments.contains("batchA1"));
+    assertTrue(segments.contains("batchFree"));
+  }
+
+  @Test
+  public void testBypassOverloadAllowsBlockedSegments() {
+    addSegments("byA1", "byB1");
+    writeLineageEntry("e1", Collections.singletonList("byA1"), 
Collections.singletonList("byB1"),
+        LineageEntryState.IN_PROGRESS);
+    PinotResourceManagerResponse response =
+        _resourceManager.deleteSegmentsForLineageCleanup(OFFLINE_TABLE_NAME, 
Collections.singletonList("byA1"));
+    assertTrue(response.isSuccessful(), response.getMessage());
+  }
+
+  @Test
+  public void testNoLineageZnodeFastPath() {
+    addSegments("nolzA1");
+    PinotResourceManagerResponse response =
+        _resourceManager.deleteSegments(OFFLINE_TABLE_NAME, 
Collections.singletonList("nolzA1"));
+    assertTrue(response.isSuccessful(), response.getMessage());
+  }
+
+  @AfterClass
+  public void tearDown() {
+    TEST_INSTANCE.cleanup();
+  }
+}
diff --git 
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/lineage/LineageDeleteInterleavingIntegrationTest.java
 
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/lineage/LineageDeleteInterleavingIntegrationTest.java
new file mode 100644
index 00000000000..ad922636e12
--- /dev/null
+++ 
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/lineage/LineageDeleteInterleavingIntegrationTest.java
@@ -0,0 +1,509 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller.helix.core.lineage;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import org.apache.pinot.common.lineage.LineageEntry;
+import org.apache.pinot.common.lineage.LineageEntryState;
+import org.apache.pinot.common.lineage.SegmentLineage;
+import org.apache.pinot.common.lineage.SegmentLineageAccessHelper;
+import org.apache.pinot.common.metrics.ControllerMetrics;
+import org.apache.pinot.common.restlet.resources.StartReplaceSegmentsRequest;
+import org.apache.pinot.controller.ControllerConf;
+import org.apache.pinot.controller.LeadControllerManager;
+import org.apache.pinot.controller.helix.ControllerTest;
+import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
+import org.apache.pinot.controller.helix.core.retention.RetentionManager;
+import org.apache.pinot.controller.util.BrokerServiceHelper;
+import org.apache.pinot.controller.utils.SegmentMetadataMockUtils;
+import org.apache.pinot.segment.spi.SegmentMetadata;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.utils.JsonUtils;
+import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
+import org.apache.pinot.spi.utils.builder.TableNameBuilder;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import static org.mockito.Mockito.mock;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertTrue;
+import static org.testng.Assert.expectThrows;
+
+
+/**
+ * Integration tests for the lineage-aware segment delete check at the REST 
layer and across an interleaved
+ * start/end/revert lifecycle.
+ *
+ * <p>This test:
+ * <ul>
+ *   <li>Drives every step through the REST endpoints.</li>
+ *   <li>Walks a single table through all three lineage states with a delete 
attempt at every step.</li>
+ *   <li>Re-reads the lineage znode after every API call (success or failure) 
to verify failed requests never
+ *       mutate the entry.</li>
+ *   <li>Verifies the retention path silently skips lineage-locked segments 
and that the lineage-cleanup pass
+ *       uses the bypass delete path.</li>
+ * </ul>
+ *
+ * <p>Each test uses unique segment names so the asynchronous {@code 
SegmentDeletionManager} cleanup cannot
+ * interfere with later tests.
+ */
+public class LineageDeleteInterleavingIntegrationTest {
+  private static final ControllerTest TEST_INSTANCE = 
ControllerTest.getInstance();
+  private static final String RAW_TABLE_NAME = "lineageInterleavingTable";
+  private static final String OFFLINE_TABLE_NAME = 
TableNameBuilder.OFFLINE.tableNameWithType(RAW_TABLE_NAME);
+  // Separate table with retention configured, used by the 
retention-interaction test so it does not affect the
+  // other scenarios that should never trigger time-based deletion.
+  private static final String RETENTION_RAW_TABLE_NAME = 
"lineageInterleavingRetentionTable";
+  private static final String RETENTION_OFFLINE_TABLE_NAME =
+      TableNameBuilder.OFFLINE.tableNameWithType(RETENTION_RAW_TABLE_NAME);
+
+  private PinotHelixResourceManager _resourceManager;
+  private String _controllerBaseUrl;
+  private TestableRetentionManager _retentionManager;
+
+  @BeforeClass
+  public void setUp()
+      throws Exception {
+    TEST_INSTANCE.setupSharedStateAndValidate();
+    _resourceManager = TEST_INSTANCE.getHelixResourceManager();
+    _controllerBaseUrl = TEST_INSTANCE.getControllerBaseApiUrl();
+
+    TEST_INSTANCE.addDummySchema(RAW_TABLE_NAME);
+    TableConfig tableConfig =
+        new 
TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME).setNumReplicas(1).build();
+    _resourceManager.addTable(tableConfig);
+
+    TEST_INSTANCE.addDummySchema(RETENTION_RAW_TABLE_NAME);
+    TableConfig retentionTableConfig =
+        new 
TableConfigBuilder(TableType.OFFLINE).setTableName(RETENTION_RAW_TABLE_NAME).setNumReplicas(1)
+            .setRetentionTimeUnit("DAYS").setRetentionTimeValue("1").build();
+    _resourceManager.addTable(retentionTableConfig);
+
+    // RetentionManager configured with zero frequencies so we can drive 
processTable() directly and validate the
+    // lineage interaction in isolation.
+    ControllerConf controllerConf = new ControllerConf();
+    controllerConf.setRetentionControllerFrequencyInSeconds(0);
+    controllerConf.setDeletedSegmentsRetentionInDays(0);
+    BrokerServiceHelper brokerServiceHelper = new 
BrokerServiceHelper(_resourceManager, controllerConf, null, null);
+    _retentionManager = new TestableRetentionManager(_resourceManager, 
mock(LeadControllerManager.class),
+        controllerConf, mock(ControllerMetrics.class), brokerServiceHelper);
+  }
+
+  /**
+   * Subclass that exposes the package-protected {@link 
RetentionManager#processTable(String)} so this test
+   * (in a different package) can drive a single retention pass against a 
single table.
+   */
+  private static final class TestableRetentionManager extends RetentionManager 
{
+    TestableRetentionManager(PinotHelixResourceManager 
pinotHelixResourceManager,
+        LeadControllerManager leadControllerManager, ControllerConf config, 
ControllerMetrics controllerMetrics,
+        BrokerServiceHelper brokerServiceHelper) {
+      super(pinotHelixResourceManager, leadControllerManager, config, 
controllerMetrics, brokerServiceHelper);
+    }
+
+    void runProcessTable(String tableNameWithType) {
+      processTable(tableNameWithType);
+    }
+  }
+
+  @BeforeMethod
+  public void clearLineage() {
+    
SegmentLineageAccessHelper.deleteSegmentLineage(_resourceManager.getPropertyStore(),
 OFFLINE_TABLE_NAME);
+    
SegmentLineageAccessHelper.deleteSegmentLineage(_resourceManager.getPropertyStore(),
+        RETENTION_OFFLINE_TABLE_NAME);
+  }
+
+  @AfterClass
+  public void tearDown() {
+    TEST_INSTANCE.cleanup();
+  }
+
+  // 
---------------------------------------------------------------------------------------------------------------
+  // T1: full lifecycle interleaved with delete attempts at every step
+  // 
---------------------------------------------------------------------------------------------------------------
+  @Test
+  public void testFullLifecycleInterleavedWithDeleteAttempts()
+      throws IOException {
+    String s1 = "t1_s1";
+    String s2 = "t1_s2";
+    String s3 = "t1_s3";
+    String s1New = "t1_s1_new";
+
+    addSegments(OFFLINE_TABLE_NAME, s1, s2, s3);
+
+    // Step 1: startReplaceSegments → IN_PROGRESS
+    String entryId = postStartReplaceSegments(OFFLINE_TABLE_NAME, 
Collections.singletonList(s1),
+        Collections.singletonList(s1New));
+    assertLineageEntry(OFFLINE_TABLE_NAME, entryId, 
LineageEntryState.IN_PROGRESS, Collections.singletonList(s1),
+        Collections.singletonList(s1New));
+
+    // Step 2: delete s1 (segmentsFrom of IN_PROGRESS) → 500, IdealState + 
lineage untouched
+    IOException ex500 = expectThrows(IOException.class, () -> 
sendDeleteSegment(OFFLINE_TABLE_NAME, s1));
+    assertStatus(ex500, 500);
+    assertTrue(getSegments(OFFLINE_TABLE_NAME).contains(s1));
+    assertLineageEntry(OFFLINE_TABLE_NAME, entryId, 
LineageEntryState.IN_PROGRESS, Collections.singletonList(s1),
+        Collections.singletonList(s1New));
+
+    // Step 3: upload s1New + delete s1New (segmentsTo of IN_PROGRESS) → 500.
+    // the lineage check still rejects without touching the lineage znode.
+    addSegments(OFFLINE_TABLE_NAME, s1New);
+    ex500 = expectThrows(IOException.class, () -> 
sendDeleteSegment(OFFLINE_TABLE_NAME, s1New));
+    assertStatus(ex500, 500);
+    assertLineageEntry(OFFLINE_TABLE_NAME, entryId, 
LineageEntryState.IN_PROGRESS, Collections.singletonList(s1),
+        Collections.singletonList(s1New));
+
+    // Step 4: endReplaceSegments → COMPLETED
+    postEndReplaceSegments(OFFLINE_TABLE_NAME, entryId);
+    assertLineageEntry(OFFLINE_TABLE_NAME, entryId, 
LineageEntryState.COMPLETED, Collections.singletonList(s1),
+        Collections.singletonList(s1New));
+
+    // Step 5: delete s1 (segmentsFrom of COMPLETED) → still 500
+    ex500 = expectThrows(IOException.class, () -> 
sendDeleteSegment(OFFLINE_TABLE_NAME, s1));
+    assertStatus(ex500, 500);
+    assertTrue(getSegments(OFFLINE_TABLE_NAME).contains(s1));
+    assertLineageEntry(OFFLINE_TABLE_NAME, entryId, 
LineageEntryState.COMPLETED, Collections.singletonList(s1),
+        Collections.singletonList(s1New));
+
+    // Step 6: delete s1New (segmentsTo of COMPLETED) → 200 (segmentsTo is NOT 
locked once the entry is COMPLETED).
+    // The lineage entry itself must NOT mutate as a side effect of the delete.
+    sendDeleteSegment(OFFLINE_TABLE_NAME, s1New);
+    assertFalse(getSegments(OFFLINE_TABLE_NAME).contains(s1New));
+    assertLineageEntry(OFFLINE_TABLE_NAME, entryId, 
LineageEntryState.COMPLETED, Collections.singletonList(s1),
+        Collections.singletonList(s1New));
+
+    // Step 7: revertReplaceSegments (with forceRevert so it can revert a 
COMPLETED entry safely) → REVERTED
+    postRevertReplaceSegments(OFFLINE_TABLE_NAME, entryId, true);
+    assertLineageEntry(OFFLINE_TABLE_NAME, entryId, 
LineageEntryState.REVERTED, Collections.singletonList(s1),
+        Collections.singletonList(s1New));
+
+    // Step 8: delete s1 (REVERTED locks nothing) → 200
+    sendDeleteSegment(OFFLINE_TABLE_NAME, s1);
+    assertFalse(getSegments(OFFLINE_TABLE_NAME).contains(s1));
+    assertLineageEntry(OFFLINE_TABLE_NAME, entryId, 
LineageEntryState.REVERTED, Collections.singletonList(s1),
+        Collections.singletonList(s1New));
+
+    // Final: IdealState still contains s2 and s3; lineage has exactly one 
entry in REVERTED state
+    List<String> finalSegments = getSegments(OFFLINE_TABLE_NAME);
+    assertTrue(finalSegments.contains(s2));
+    assertTrue(finalSegments.contains(s3));
+    SegmentLineage lineage =
+        
SegmentLineageAccessHelper.getSegmentLineage(_resourceManager.getPropertyStore(),
 OFFLINE_TABLE_NAME);
+    assertEquals(lineage.getLineageEntryIds().size(), 1);
+    assertEquals(lineage.getLineageEntry(entryId).getState(), 
LineageEntryState.REVERTED);
+  }
+
+  // 
---------------------------------------------------------------------------------------------------------------
+  // T2: REST-level batch delete rejection atomicity
+  // 
---------------------------------------------------------------------------------------------------------------
+  @Test
+  public void testBatchDeleteRejectionAtomicityOverRest()
+      throws IOException {
+    String s1 = "t2_s1";
+    String s1New = "t2_s1_new";
+    String sFree = "t2_sFree";
+
+    addSegments(OFFLINE_TABLE_NAME, s1, sFree);
+    String entryId = postStartReplaceSegments(OFFLINE_TABLE_NAME, 
Collections.singletonList(s1),
+        Collections.singletonList(s1New));
+
+    // Batch delete via DELETE 
/segments/{tableName}?type=OFFLINE&segments=s1&segments=sFree → 500
+    String url = _controllerBaseUrl + "/segments/" + RAW_TABLE_NAME + 
"?type=OFFLINE&segments=" + s1
+        + "&segments=" + sFree;
+    IOException ex500 = expectThrows(IOException.class, () -> 
ControllerTest.sendDeleteRequest(url));
+    assertStatus(ex500, 500);
+    // Body should mention the blocking segment, but not the free one (the 
free one is not what's blocking).
+    String body = ex500.getMessage();
+    assertTrue(body.contains(s1), "Expected response to mention blocking 
segment: " + s1 + ", was: " + body);
+    assertFalse(body.contains("Blocking segments: [" + sFree),
+        "Free segment should not appear in blocking list. Was: " + body);
+
+    // Neither s1 nor sFree was removed
+    List<String> segments = getSegments(OFFLINE_TABLE_NAME);
+    assertTrue(segments.contains(s1));
+    assertTrue(segments.contains(sFree));
+
+    // Lineage entry is untouched
+    assertLineageEntry(OFFLINE_TABLE_NAME, entryId, 
LineageEntryState.IN_PROGRESS, Collections.singletonList(s1),
+        Collections.singletonList(s1New));
+  }
+
+  // 
---------------------------------------------------------------------------------------------------------------
+  // T4: Recovery via forceRevert when a (bypass) concurrent delete strands 
the lineage entry
+  // 
---------------------------------------------------------------------------------------------------------------
+  @Test
+  public void testRecoveryViaForceRevertAfterCorruptedLineage()
+      throws IOException {
+    String s1 = "t4_s1";
+    String s2 = "t4_s2";
+    addSegments(OFFLINE_TABLE_NAME, s1);
+
+    String entryId = postStartReplaceSegments(OFFLINE_TABLE_NAME, 
Collections.singletonList(s1),
+        Collections.singletonList(s2));
+    assertLineageEntry(OFFLINE_TABLE_NAME, entryId, 
LineageEntryState.IN_PROGRESS, Collections.singletonList(s1),
+        Collections.singletonList(s2));
+
+    // Simulate the lost-race outcome via the bypass overload (the only path 
that can produce this state — the
+    // public REST DELETE would be rejected by the new check).
+    _resourceManager.deleteSegmentsForLineageCleanup(OFFLINE_TABLE_NAME, 
Collections.singletonList(s1));
+    assertFalse(getSegments(OFFLINE_TABLE_NAME).contains(s1));
+    // Lineage entry must NOT mutate as a side effect of the bypass delete.
+    assertLineageEntry(OFFLINE_TABLE_NAME, entryId, 
LineageEntryState.IN_PROGRESS, Collections.singletonList(s1),
+        Collections.singletonList(s2));
+
+    // Upload s2; lineage still unchanged
+    addSegments(OFFLINE_TABLE_NAME, s2);
+    assertLineageEntry(OFFLINE_TABLE_NAME, entryId, 
LineageEntryState.IN_PROGRESS, Collections.singletonList(s1),
+        Collections.singletonList(s2));
+
+    // endReplaceSegments must fail because s1 (segmentsFrom) is gone.
+    IOException endEx = expectThrows(IOException.class, () -> 
postEndReplaceSegments(OFFLINE_TABLE_NAME, entryId));
+    assertNon2xx(endEx);
+    assertLineageEntry(OFFLINE_TABLE_NAME, entryId, 
LineageEntryState.IN_PROGRESS, Collections.singletonList(s1),
+        Collections.singletonList(s2));
+
+    // Operator recovery via revertReplaceSegments(forceRevert=true) → REVERTED
+    postRevertReplaceSegments(OFFLINE_TABLE_NAME, entryId, true);
+    assertLineageEntry(OFFLINE_TABLE_NAME, entryId, 
LineageEntryState.REVERTED, Collections.singletonList(s1),
+        Collections.singletonList(s2));
+
+    // s2 is now unblocked — delete via the public path
+    sendDeleteSegment(OFFLINE_TABLE_NAME, s2);
+    assertFalse(getSegments(OFFLINE_TABLE_NAME).contains(s2));
+    // Delete must not touch the lineage entry
+    assertLineageEntry(OFFLINE_TABLE_NAME, entryId, 
LineageEntryState.REVERTED, Collections.singletonList(s1),
+        Collections.singletonList(s2));
+  }
+
+  // 
---------------------------------------------------------------------------------------------------------------
+  // T4b: revertReplaceSegments without forceRevert must NOT revert an 
IN_PROGRESS entry, and a failed revert must
+  // not corrupt the znode either
+  // 
---------------------------------------------------------------------------------------------------------------
+  @Test
+  public void testRevertWithoutForceRevertFailsOnCorruptedLineage()
+      throws IOException {
+    String s1 = "t4b_s1";
+    String s2 = "t4b_s2";
+    addSegments(OFFLINE_TABLE_NAME, s1);
+
+    String entryId = postStartReplaceSegments(OFFLINE_TABLE_NAME, 
Collections.singletonList(s1),
+        Collections.singletonList(s2));
+    _resourceManager.deleteSegmentsForLineageCleanup(OFFLINE_TABLE_NAME, 
Collections.singletonList(s1));
+    addSegments(OFFLINE_TABLE_NAME, s2);
+
+    // Attempting revertReplaceSegments without forceRevert against an 
IN_PROGRESS entry should fail.
+    IOException ex = expectThrows(IOException.class, () -> 
postRevertReplaceSegments(OFFLINE_TABLE_NAME, entryId,
+        false));
+    assertNon2xx(ex);
+    // Failed revert must not mutate the znode.
+    assertLineageEntry(OFFLINE_TABLE_NAME, entryId, 
LineageEntryState.IN_PROGRESS, Collections.singletonList(s1),
+        Collections.singletonList(s2));
+
+    // Retry with forceRevert=true → 200
+    postRevertReplaceSegments(OFFLINE_TABLE_NAME, entryId, true);
+    assertLineageEntry(OFFLINE_TABLE_NAME, entryId, 
LineageEntryState.REVERTED, Collections.singletonList(s1),
+        Collections.singletonList(s2));
+  }
+
+  // 
---------------------------------------------------------------------------------------------------------------
+  // T5: retention manager skips lineage-locked segments and the 
lineage-cleanup pass uses the bypass path
+  // 
---------------------------------------------------------------------------------------------------------------
+  @Test
+  public void testRetentionSkipsLineageLockedSegmentsAndCleansUpViaBypass() {
+    String sExpired = "t5_expired";
+    String sExpiredFree = "t5_expired_free";
+    String sKept = "t5_kept";
+    String sNew = "t5_new";
+
+    // sExpired:     end time 30 days ago → retention-eligible AND 
lineage-locked
+    // sExpiredFree: end time 30 days ago → retention-eligible, NOT 
lineage-locked
+    // sKept:        end time = now       → not retention-eligible
+    long nowMs = System.currentTimeMillis();
+    long thirtyDaysAgoMs = nowMs - TimeUnit.DAYS.toMillis(30L);
+    addSegmentWithTime(RETENTION_OFFLINE_TABLE_NAME, sExpired, thirtyDaysAgoMs 
- TimeUnit.DAYS.toMillis(1L),
+        thirtyDaysAgoMs);
+    addSegmentWithTime(RETENTION_OFFLINE_TABLE_NAME, sExpiredFree, 
thirtyDaysAgoMs - TimeUnit.DAYS.toMillis(1L),
+        thirtyDaysAgoMs);
+    addSegmentWithTime(RETENTION_OFFLINE_TABLE_NAME, sKept, nowMs - 
TimeUnit.HOURS.toMillis(1L), nowMs);
+
+    // Lock sExpired in an IN_PROGRESS lineage entry → segmentsFrom is 
lineage-locked
+    String entryId = 
_resourceManager.startReplaceSegments(RETENTION_OFFLINE_TABLE_NAME,
+        Collections.singletonList(sExpired), Collections.singletonList(sNew), 
false, null);
+    assertNotNull(entryId);
+
+    // Run retention. Time-based purge picks up both expired segments, but 
removeLineageLockedSegments strips
+    // sExpired out before the delete call — the rest of the batch 
(sExpiredFree) must still be deleted, proving
+    // the lineage-lock filtering does not poison the whole retention pass. 
The lineage-cleanup pass also does
+    // nothing yet because the entry is IN_PROGRESS and not aged enough.
+    _retentionManager.runProcessTable(RETENTION_OFFLINE_TABLE_NAME);
+    List<String> segmentsAfterFirstPass = 
getSegments(RETENTION_OFFLINE_TABLE_NAME);
+    assertTrue(segmentsAfterFirstPass.contains(sExpired),
+        "Retention must skip sExpired while it participates in a live lineage 
entry");
+    assertFalse(segmentsAfterFirstPass.contains(sExpiredFree),
+        "Retention must still delete the non-lineage-locked expired segment in 
the same pass");
+    assertTrue(segmentsAfterFirstPass.contains(sKept));
+    assertLineageEntry(RETENTION_OFFLINE_TABLE_NAME, entryId, 
LineageEntryState.IN_PROGRESS,
+        Collections.singletonList(sExpired), Collections.singletonList(sNew));
+
+    // Complete the replacement. sNew has a fresh end-time so it is not 
retention-eligible.
+    addSegmentWithTime(RETENTION_OFFLINE_TABLE_NAME, sNew, nowMs - 
TimeUnit.HOURS.toMillis(1L), nowMs);
+    _resourceManager.endReplaceSegments(RETENTION_OFFLINE_TABLE_NAME, entryId, 
null);
+    assertLineageEntry(RETENTION_OFFLINE_TABLE_NAME, entryId, 
LineageEntryState.COMPLETED,
+        Collections.singletonList(sExpired), Collections.singletonList(sNew));
+    // endReplaceSegments does NOT proactively clean up segmentsFrom; that's 
the retention path's responsibility.
+    assertTrue(getSegments(RETENTION_OFFLINE_TABLE_NAME).contains(sExpired));
+
+    // Run retention again. Now the COMPLETED entry's segmentsFrom is still 
lineage-locked w.r.t. time-based
+    // purge, but the lineage-cleanup pass is allowed to delete it (table is 
APPEND, so the replaced-segments
+    // retention window is bypassed). The cleanup must go through 
deleteSegmentsForLineageCleanup so it isn't
+    // self-blocked by the new check.
+    _retentionManager.runProcessTable(RETENTION_OFFLINE_TABLE_NAME);
+    // IdealState updates inside deleteSegmentsForLineageCleanup are 
synchronous (only the deep-store file deletion
+    // is async via SegmentDeletionManager), so we can assert directly.
+    assertFalse(getSegments(RETENTION_OFFLINE_TABLE_NAME).contains(sExpired),
+        "Expected sExpired to be removed from IdealState by the 
lineage-cleanup pass");
+    assertTrue(getSegments(RETENTION_OFFLINE_TABLE_NAME).contains(sNew));
+    assertTrue(getSegments(RETENTION_OFFLINE_TABLE_NAME).contains(sKept));
+  }
+
+  // 
---------------------------------------------------------------------------------------------------------------
+  // T6: Kill switch — deleting a lineage-locked segment is allowed when
+  //     controller.lineage.exclusive.delete.enabled=false
+  // 
---------------------------------------------------------------------------------------------------------------
+  @Test
+  public void testDeleteAllowedWhenLineageExclusiveDeleteDisabled()
+      throws IOException {
+    String s1 = "t6_s1";
+    String s1New = "t6_s1_new";
+    addSegments(OFFLINE_TABLE_NAME, s1);
+    String entryId = postStartReplaceSegments(OFFLINE_TABLE_NAME, 
Collections.singletonList(s1),
+        Collections.singletonList(s1New));
+
+    // PinotHelixResourceManager.isLineageExclusiveDeleteEnabled() reads from 
the live ControllerConf on every
+    // delete (no caching), so toggling the property here flips the behavior 
for the next REST call. Reset in a
+    // finally block so other tests on this shared singleton are not affected 
by the override.
+    ControllerConf conf = TEST_INSTANCE.getControllerConfig();
+    conf.setProperty(ControllerConf.LINEAGE_EXCLUSIVE_DELETE_ENABLED, false);
+    try {
+      sendDeleteSegment(OFFLINE_TABLE_NAME, s1);
+      assertFalse(getSegments(OFFLINE_TABLE_NAME).contains(s1),
+          "Segment must be removed from IdealState when 
lineage-exclusive-delete is disabled");
+      // The lineage entry must not be mutated as a side effect of the delete.
+      assertLineageEntry(OFFLINE_TABLE_NAME, entryId, 
LineageEntryState.IN_PROGRESS, Collections.singletonList(s1),
+          Collections.singletonList(s1New));
+    } finally {
+      conf.setProperty(ControllerConf.LINEAGE_EXCLUSIVE_DELETE_ENABLED, true);
+    }
+  }
+
+  // 
---------------------------------------------------------------------------------------------------------------
+  // Helpers
+  // 
---------------------------------------------------------------------------------------------------------------
+
+  /**
+   * Registers one mocked segment per given name with the resource manager, each using the placeholder download
+   * URL {@code "downloadUrl"}.
+   *
+   * @param tableNameWithType table the segments are added to
+   * @param segmentNames names of the segments to create, added in the order given
+   */
+  private void addSegments(String tableNameWithType, String... segmentNames) {
+    for (int i = 0; i < segmentNames.length; i++) {
+      SegmentMetadata mockedMetadata =
+          SegmentMetadataMockUtils.mockSegmentMetadata(tableNameWithType, segmentNames[i]);
+      _resourceManager.addNewSegment(tableNameWithType, mockedMetadata, "downloadUrl");
+    }
+  }
+
+  /**
+   * Registers a single mocked segment that carries an explicit time range, expressed in milliseconds.
+   *
+   * @param tableNameWithType table the segment is added to
+   * @param segmentName name of the segment to create
+   * @param startTimeMs segment start time, in milliseconds
+   * @param endTimeMs segment end time, in milliseconds
+   */
+  private void addSegmentWithTime(String tableNameWithType, String segmentName, long startTimeMs, long endTimeMs) {
+    // The CRC string is derived from the nano clock so it differs from one call to the next.
+    String nanoTimeCrc = String.valueOf(System.nanoTime());
+    _resourceManager.addNewSegment(tableNameWithType,
+        SegmentMetadataMockUtils.mockSegmentMetadata(tableNameWithType, segmentName, 100, nanoTimeCrc, startTimeMs,
+            endTimeMs, TimeUnit.MILLISECONDS),
+        "downloadUrl");
+  }
+
+  /**
+   * Looks up the segment names the resource manager currently reports for the table.
+   *
+   * @param tableNameWithType table to list segments for
+   * @return the list returned by {@code getSegmentsFor(tableNameWithType, false)}
+   */
+  private List<String> getSegments(String tableNameWithType) {
+    // NOTE(review): the 'false' flag presumably keeps lineage-replaced segments in the result; confirm against
+    // PinotHelixResourceManager.getSegmentsFor.
+    List<String> segmentNames = _resourceManager.getSegmentsFor(tableNameWithType, false);
+    return segmentNames;
+  }
+
+  /**
+   * Reads the table's segment lineage from the property store and verifies that the given entry exists with the
+   * expected state, {@code segmentsFrom} and {@code segmentsTo}.
+   *
+   * @param tableNameWithType table whose lineage record is inspected
+   * @param entryId id of the lineage entry to check
+   * @param expectedState state the entry must be in
+   * @param expectedSegmentsFrom expected source segments of the entry
+   * @param expectedSegmentsTo expected destination segments of the entry
+   */
+  private void assertLineageEntry(String tableNameWithType, String entryId, LineageEntryState expectedState,
+      List<String> expectedSegmentsFrom, List<String> expectedSegmentsTo) {
+    SegmentLineage tableLineage =
+        SegmentLineageAccessHelper.getSegmentLineage(_resourceManager.getPropertyStore(), tableNameWithType);
+    String missingLineageMessage = "Expected lineage znode to exist for table: " + tableNameWithType;
+    assertNotNull(tableLineage, missingLineageMessage);
+
+    LineageEntry lineageEntry = tableLineage.getLineageEntry(entryId);
+    String missingEntryMessage = "Expected lineage entry: " + entryId + " for table: " + tableNameWithType;
+    assertNotNull(lineageEntry, missingEntryMessage);
+
+    // Checks run in the order state -> segmentsFrom -> segmentsTo; the first mismatch is the one reported.
+    LineageEntryState actualState = lineageEntry.getState();
+    assertEquals(actualState, expectedState,
+        "Unexpected state for lineage entry: " + entryId + " on table: " + tableNameWithType);
+
+    List<String> actualSegmentsFrom = lineageEntry.getSegmentsFrom();
+    assertEquals(actualSegmentsFrom, expectedSegmentsFrom, "Unexpected segmentsFrom for lineage entry: " + entryId);
+
+    List<String> actualSegmentsTo = lineageEntry.getSegmentsTo();
+    assertEquals(actualSegmentsTo, expectedSegmentsTo, "Unexpected segmentsTo for lineage entry: " + entryId);
+  }
+
+  /**
+   * Calls the controller's {@code startReplaceSegments} endpoint (always with {@code type=OFFLINE}) and returns
+   * the lineage entry id found in the response.
+   *
+   * @param tableNameWithType table name with type suffix; only the raw table name is placed in the URL
+   * @param segmentsFrom segments being replaced
+   * @param segmentsTo segments replacing them
+   * @return value of the {@code segmentLineageEntryId} field of the JSON response
+   * @throws IOException if the POST request or the JSON handling fails
+   */
+  private String postStartReplaceSegments(String tableNameWithType, List<String> segmentsFrom,
+      List<String> segmentsTo)
+      throws IOException {
+    String endpoint = String.format("%s/segments/%s/startReplaceSegments?type=OFFLINE", _controllerBaseUrl,
+        TableNameBuilder.extractRawTableName(tableNameWithType));
+    StartReplaceSegmentsRequest request = new StartReplaceSegmentsRequest(segmentsFrom, segmentsTo);
+    String payload = JsonUtils.objectToString(request);
+
+    String response = ControllerTest.sendPostRequest(endpoint, payload);
+    JsonNode entryIdNode = JsonUtils.stringToJsonNode(response).get("segmentLineageEntryId");
+    // Fail with the raw response in the message when the id field is absent.
+    assertNotNull(entryIdNode, "Expected segmentLineageEntryId in response. Was: " + response);
+    return entryIdNode.asText();
+  }
+
+  /**
+   * Calls the controller's {@code endReplaceSegments} endpoint (always with {@code type=OFFLINE}) for the given
+   * lineage entry, sending an empty request body.
+   *
+   * @param tableNameWithType table name with type suffix; only the raw table name is placed in the URL
+   * @param entryId lineage entry id passed as {@code segmentLineageEntryId}
+   * @throws IOException if the POST request fails
+   */
+  private void postEndReplaceSegments(String tableNameWithType, String entryId)
+      throws IOException {
+    StringBuilder endpoint = new StringBuilder();
+    endpoint.append(_controllerBaseUrl);
+    endpoint.append("/segments/");
+    endpoint.append(TableNameBuilder.extractRawTableName(tableNameWithType));
+    endpoint.append("/endReplaceSegments?type=OFFLINE&segmentLineageEntryId=");
+    endpoint.append(entryId);
+    ControllerTest.sendPostRequest(endpoint.toString(), "");
+  }
+
+  /**
+   * Calls the controller's {@code revertReplaceSegments} endpoint (always with {@code type=OFFLINE}) for the
+   * given lineage entry, sending an empty request body.
+   *
+   * @param tableNameWithType table name with type suffix; only the raw table name is placed in the URL
+   * @param entryId lineage entry id passed as {@code segmentLineageEntryId}
+   * @param forceRevert value sent as the {@code forceRevert} query parameter
+   * @throws IOException if the POST request fails
+   */
+  private void postRevertReplaceSegments(String tableNameWithType, String entryId, boolean forceRevert)
+      throws IOException {
+    String tableSegmentsUrl =
+        _controllerBaseUrl + "/segments/" + TableNameBuilder.extractRawTableName(tableNameWithType);
+    String queryString = "?type=OFFLINE&segmentLineageEntryId=" + entryId + "&forceRevert=" + forceRevert;
+    ControllerTest.sendPostRequest(tableSegmentsUrl + "/revertReplaceSegments" + queryString, "");
+  }
+
+  /**
+   * Sends an HTTP DELETE for a single segment to the controller's segment endpoint.
+   *
+   * @param tableNameWithType table name with type suffix; only the raw table name is placed in the URL
+   * @param segmentName name of the segment to delete
+   * @throws IOException if the DELETE request fails
+   */
+  private void sendDeleteSegment(String tableNameWithType, String segmentName)
+      throws IOException {
+    String tableSegmentsUrl =
+        _controllerBaseUrl + "/segments/" + TableNameBuilder.extractRawTableName(tableNameWithType);
+    String segmentUrl = tableSegmentsUrl + "/" + segmentName;
+    ControllerTest.sendDeleteRequest(segmentUrl);
+  }
+
+  /**
+   * Asserts that the exception message mentions the expected HTTP status, in either the
+   * {@code "status code: <N>"} or the {@code "status: <N>"} form. A null message is treated as empty.
+   *
+   * @param e exception raised by the failed HTTP call
+   * @param expectedStatus HTTP status code expected in the message
+   */
+  private void assertStatus(IOException e, int expectedStatus) {
+    String rawMessage = e.getMessage();
+    String msg;
+    if (rawMessage == null) {
+      msg = "";
+    } else {
+      msg = rawMessage;
+    }
+
+    boolean hasStatusCodeForm = msg.contains("status code: " + expectedStatus);
+    boolean hasStatusForm = msg.contains("status: " + expectedStatus);
+    assertTrue(hasStatusCodeForm || hasStatusForm, "Expected status " + expectedStatus + " but got: " + msg);
+  }
+
+  /**
+   * Asserts that the exception describes a non-2xx HTTP response: the message must contain
+   * {@code "Got error status code"} and must not report 200, 201, 202 or 204 as the status code. A null message
+   * is treated as empty.
+   *
+   * @param e exception raised by the failed HTTP call
+   */
+  private void assertNon2xx(IOException e) {
+    String rawMessage = e.getMessage();
+    String msg;
+    if (rawMessage == null) {
+      msg = "";
+    } else {
+      msg = rawMessage;
+    }
+
+    // The HttpClient error message starts with "Got error status code: <N>" for any non-2xx response.
+    assertTrue(msg.contains("Got error status code"), "Expected a non-2xx HTTP error but got: " + msg);
+
+    // The trailing space in the probe keeps a short code from matching the prefix of a longer number.
+    boolean reportsSuccessCode = false;
+    for (String code : Arrays.asList("200", "201", "202", "204")) {
+      if (msg.contains("status code: " + code + " ")) {
+        reportsSuccessCode = true;
+        break;
+      }
+    }
+    assertFalse(reportsSuccessCode, "Expected non-2xx response but got: " + msg);
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to