This is an automated email from the ASF dual-hosted git repository.
swaminathanmanish pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 005cac70a46 Enforce mutual exclusion between segment deletion and
segment lineage (#18518)
005cac70a46 is described below
commit 005cac70a463865722eae944c006318bc3f5cf0b
Author: Krishan Goyal <[email protected]>
AuthorDate: Wed May 20 14:33:59 2026 +0530
Enforce mutual exclusion between segment deletion and segment lineage
(#18518)
* Handle the segment-delete flow correctly in the presence of segment lineage
* Add tests
* Gate retention's lineage-lock filter on the kill switch
The lineage-exclusive-delete kill switch only gated
PinotHelixResourceManager.deleteSegments. RetentionManager.removeLineageLockedSegments
unconditionally stripped lineage-locked segments, so disabling the switch
did not fully restore legacy retention behavior. Read the same config in
the retention filter; when the switch is off, pass lineage-locked
segments through to the (now-permissive) delete path.
Also drops two unused static imports from LineageDeleteExclusionTest.
Co-Authored-By: Claude Opus 4.7 (1M context) <[email protected]>
* Address code review feedback
* Address code review feedback
* Test case fixes
---------
Co-authored-by: Claude Opus 4.7 (1M context) <[email protected]>
---
.../pinot/common/lineage/SegmentLineageUtils.java | 29 ++
.../pinot/common/metrics/ControllerMeter.java | 4 +-
.../apache/pinot/controller/ControllerConf.java | 8 +
.../api/resources/PinotSegmentRestletResource.java | 4 +-
.../helix/core/PinotHelixResourceManager.java | 89 +++-
.../core/lineage/SegmentsInLineageException.java | 55 +++
.../helix/core/retention/RetentionManager.java | 45 +-
.../core/lineage/LineageDeleteExclusionTest.java | 187 ++++++++
.../LineageDeleteInterleavingIntegrationTest.java | 509 +++++++++++++++++++++
9 files changed, 924 insertions(+), 6 deletions(-)
diff --git
a/pinot-common/src/main/java/org/apache/pinot/common/lineage/SegmentLineageUtils.java
b/pinot-common/src/main/java/org/apache/pinot/common/lineage/SegmentLineageUtils.java
index 073ff679b59..526397213ea 100644
---
a/pinot-common/src/main/java/org/apache/pinot/common/lineage/SegmentLineageUtils.java
+++
b/pinot-common/src/main/java/org/apache/pinot/common/lineage/SegmentLineageUtils.java
@@ -18,6 +18,7 @@
*/
package org.apache.pinot.common.lineage;
+import java.util.HashSet;
import java.util.Set;
import java.util.UUID;
@@ -53,4 +54,32 @@ public class SegmentLineageUtils {
}
}
}
+
+ /**
+ * Returns the set of segments that participate in a live lineage entry and
therefore must not be deleted by
+ * external callers.
+ * <ul>
+ * <li>{@code IN_PROGRESS} entries lock both {@code segmentsFrom} and
{@code segmentsTo}.</li>
+ * <li>{@code COMPLETED} entries lock {@code segmentsFrom}.</li>
+ * <li>{@code REVERTED} entries lock nothing.</li>
+ * </ul>
+ *
+ * @param segmentLineage the table's segment lineage; must be non-null (it is dereferenced without a check,
+ * so callers handle a missing lineage znode themselves)
+ * @return a new, mutable set of lineage-locked segment names; empty when no entry locks anything
+ */
+ public static Set<String> getDeleteBlockedSegments(SegmentLineage
segmentLineage) {
+ Set<String> blocked = new HashSet<>();
+ for (LineageEntry lineageEntry :
segmentLineage.getLineageEntries().values()) {
+ switch (lineageEntry.getState()) {
+ case IN_PROGRESS:
+ // Replacement still running: both the old and the new segments are locked.
+ blocked.addAll(lineageEntry.getSegmentsFrom());
+ blocked.addAll(lineageEntry.getSegmentsTo());
+ break;
+ case COMPLETED:
+ // Only the replaced (segmentsFrom) segments stay locked; segmentsTo are not locked by this check.
+ blocked.addAll(lineageEntry.getSegmentsFrom());
+ break;
+ case REVERTED:
+ default:
+ // REVERTED entries, and any state not listed above, lock nothing.
+ break;
+ }
+ }
+ return blocked;
+ }
}
diff --git
a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ControllerMeter.java
b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ControllerMeter.java
index d812d1c1059..4ba39d3f87b 100644
---
a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ControllerMeter.java
+++
b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ControllerMeter.java
@@ -83,7 +83,9 @@ public enum ControllerMeter implements AbstractMetrics.Meter {
AUDIT_RESPONSE_FAILURES("failures", true),
AUDIT_REQUEST_PAYLOAD_TRUNCATED("count", true),
// Upsert compact merge task metrics
-
UPSERT_COMPACT_MERGE_SEGMENT_SKIPPED_CONSENSUS_FAILURE("UpsertCompactMergeSegmentsSkipped",
false);
+
UPSERT_COMPACT_MERGE_SEGMENT_SKIPPED_CONSENSUS_FAILURE("UpsertCompactMergeSegmentsSkipped",
false),
+ // Number of segment-delete requests rejected because the targets
participate in a live segment lineage entry.
+ LINEAGE_BLOCKED_DELETE_COUNT("LineageBlockedDeleteCount", false);
private final String _brokerMeterName;
private final String _unit;
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerConf.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerConf.java
index 6c9fad642b4..93a0cec84d9 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerConf.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerConf.java
@@ -360,6 +360,9 @@ public class ControllerConf extends PinotConfiguration {
public static final String ENABLE_BATCH_MESSAGE_MODE =
"controller.enable.batch.message.mode";
public static final String ENABLE_HYBRID_TABLE_RETENTION_STRATEGY =
"controller.enable.hybrid.table.retention.strategy";
+ // When true (default), segment deletion refuses to remove segments that
participate in a live segment lineage
+ // entry (IN_PROGRESS, or COMPLETED.segmentsFrom).
+ public static final String LINEAGE_EXCLUSIVE_DELETE_ENABLED =
"controller.lineage.exclusive.delete.enabled";
public static final String DIM_TABLE_MAX_SIZE =
"controller.dimTable.maxSize";
// Defines the kind of storage and the underlying PinotFS implementation
@@ -395,6 +398,7 @@ public class ControllerConf extends PinotConfiguration {
public static final boolean
DEFAULT_RESOURCE_UTILIZATION_CHECKER_COLLECT_USAGE_AT_STARTUP = false;
public static final boolean DEFAULT_ENABLE_BATCH_MESSAGE_MODE = false;
public static final boolean DEFAULT_ENABLE_HYBRID_TABLE_RETENTION_STRATEGY =
false;
+ public static final boolean DEFAULT_LINEAGE_EXCLUSIVE_DELETE_ENABLED = true;
public static final String DEFAULT_CONTROLLER_MODE =
ControllerMode.DUAL.name();
public static final String
DEFAULT_LEAD_CONTROLLER_RESOURCE_REBALANCE_STRATEGY =
AutoRebalanceStrategy.class.getName();
@@ -1141,6 +1145,10 @@ public class ControllerConf extends PinotConfiguration {
return getProperty(ENABLE_HYBRID_TABLE_RETENTION_STRATEGY,
DEFAULT_ENABLE_HYBRID_TABLE_RETENTION_STRATEGY);
}
+ /**
+ * Returns whether segment deletion should reject segments that participate in a live segment lineage
+ * entry. Reads {@code controller.lineage.exclusive.delete.enabled}; defaults to {@code true}.
+ */
+ public boolean isLineageExclusiveDeleteEnabled() {
+ return getProperty(LINEAGE_EXCLUSIVE_DELETE_ENABLED,
DEFAULT_LINEAGE_EXCLUSIVE_DELETE_ENABLED);
+ }
+
public int getSegmentLevelValidationIntervalInSeconds() {
String period =
getProperty(ControllerPeriodicTasksConf.SEGMENT_LEVEL_VALIDATION_INTERVAL_PERIOD,
ControllerPeriodicTasksConf.DEFAULT_SEGMENT_LEVEL_VALIDATION_INTERVAL_PERIOD);
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentRestletResource.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentRestletResource.java
index e4e10e928de..2ed6a42587c 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentRestletResource.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentRestletResource.java
@@ -689,8 +689,8 @@ public class PinotSegmentRestletResource {
private void deleteSegmentsInternal(String tableNameWithType, List<String>
segments,
@Nullable String retentionPeriod) {
- PinotResourceManagerResponse response =
_pinotHelixResourceManager.deleteSegments(tableNameWithType, segments,
- retentionPeriod);
+ PinotResourceManagerResponse response =
+ _pinotHelixResourceManager.deleteSegments(tableNameWithType, segments,
retentionPeriod);
if (!response.isSuccessful()) {
throw new ControllerApplicationException(LOGGER,
"Failed to delete segments from table: " + tableNameWithType + ",
error message: " + response.getMessage(),
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
index 3aef26fe390..6208eb6262f 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
@@ -118,6 +118,7 @@ import
org.apache.pinot.common.messages.TableDeletionMessage;
import org.apache.pinot.common.metadata.ZKMetadataProvider;
import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
import org.apache.pinot.common.metadata.segment.SegmentZKMetadataUtils;
+import org.apache.pinot.common.metrics.ControllerMeter;
import org.apache.pinot.common.metrics.ControllerMetrics;
import org.apache.pinot.common.minion.MinionTaskMetadataUtils;
import org.apache.pinot.common.restlet.resources.EndReplaceSegmentsRequest;
@@ -154,6 +155,7 @@ import
org.apache.pinot.controller.helix.core.assignment.segment.SegmentAssignme
import org.apache.pinot.controller.helix.core.controllerjob.ControllerJobTypes;
import org.apache.pinot.controller.helix.core.lineage.LineageManager;
import org.apache.pinot.controller.helix.core.lineage.LineageManagerFactory;
+import
org.apache.pinot.controller.helix.core.lineage.SegmentsInLineageException;
import org.apache.pinot.controller.helix.core.minion.PinotTaskManager;
import
org.apache.pinot.controller.helix.core.realtime.PinotLLCRealtimeSegmentManager;
import org.apache.pinot.controller.helix.core.util.ControllerZkHelixUtils;
@@ -1073,6 +1075,15 @@ public class PinotHelixResourceManager {
/**
* Delete a list of segments from ideal state and remove them from the local
storage.
*
+ * <p>Pre-checks the table's segment lineage znode and rejects the whole
batch with a failure response if any
+ * target segment participates in a live lineage entry (signalled internally
via {@link SegmentsInLineageException}).
+ * Without this check, deleting a segment while it is being replaced could let the deleted
data resurface along with the replacement segment. To prevent that, deletion is blocked
+ * for segments in {@code segmentsFrom} or {@code segmentsTo} of an IN_PROGRESS lineage entry,
and for segments in {@code segmentsFrom}
+ * of a COMPLETED lineage entry.
+ *
+ * <p>Cleanup paths that already coordinated with the lineage lifecycle must
call
+ * {@link #deleteSegmentsForLineageCleanup} instead.
+ *
* @param tableNameWithType Table name with type suffix
* @param segmentNames List of names of segment to be deleted
* @param retentionPeriod The retention period of the deleted segments.
@@ -1080,6 +1091,23 @@ public class PinotHelixResourceManager {
*/
public PinotResourceManagerResponse deleteSegments(String tableNameWithType,
List<String> segmentNames,
@Nullable String retentionPeriod) {
+ return deleteSegmentsInternal(tableNameWithType, segmentNames,
retentionPeriod, false);
+ }
+
+ /**
+ * Delete path that skips the cross-check against the live
lineage entries (the kill switch is not consulted either). Reserved for callers
+ * that have already coordinated with the lineage lifecycle: proactive
cleanup in {@code startReplaceSegments},
+ * post-revert cleanup in {@code revertReplaceSegments}, and {@code
RetentionManager}'s lineage-cleanup pass.
+ * External call sites (REST handlers, retention based on table config,
minion task generators, push-failure
+ * cleanup) must continue to use the public {@link #deleteSegments}
overloads.
+ *
+ * <p>A {@code null} retention period is passed through. NOTE(review): presumably this means the deleted
+ * segments' retention falls back to the table config, as in the no-retention branch of the internal delete — confirm.
+ *
+ * @param tableNameWithType table name with type suffix
+ * @param segmentNames segments to delete, even if they are lineage-locked
+ * @return a success response, or a failure response if the deletion throws
+ */
+ public PinotResourceManagerResponse deleteSegmentsForLineageCleanup(String
tableNameWithType,
+ List<String> segmentNames) {
+ return deleteSegmentsInternal(tableNameWithType, segmentNames, null, true);
+ }
+
+ private PinotResourceManagerResponse deleteSegmentsInternal(String
tableNameWithType, List<String> segmentNames,
+ @Nullable String retentionPeriod, boolean bypassLineageCheck) {
if (segmentNames.isEmpty()) {
return PinotResourceManagerResponse.success("No segments to delete");
}
@@ -1087,6 +1115,10 @@ public class PinotHelixResourceManager {
LOGGER.info("Trying to delete segments: {} from table: {} ",
segmentNames, tableNameWithType);
Preconditions.checkArgument(TableNameBuilder.isTableResource(tableNameWithType),
"Table name: %s is not a valid table name with type suffix",
tableNameWithType);
+ if (!bypassLineageCheck && isLineageExclusiveDeleteEnabled()) {
+ // Reject the whole batch if any target segment participates in a live
lineage entry.
+ rejectIfTargetsLineageLockedSegments(tableNameWithType, segmentNames);
+ }
HelixHelper.removeSegmentsFromIdealState(_helixZkManager,
tableNameWithType, segmentNames);
if (retentionPeriod != null) {
_segmentDeletionManager.deleteSegments(tableNameWithType, segmentNames,
@@ -1096,12 +1128,47 @@ public class PinotHelixResourceManager {
_segmentDeletionManager.deleteSegments(tableNameWithType,
segmentNames, tableConfig);
}
return PinotResourceManagerResponse.success("Segment " + segmentNames +
" deleted");
+ } catch (SegmentsInLineageException e) {
+ LOGGER.warn("Refusing to delete segments from table: {}. {}",
tableNameWithType, e.getMessage());
+ return PinotResourceManagerResponse.failure(e.getMessage());
} catch (final Exception e) {
LOGGER.error("Caught exception while deleting segment: {} from table:
{}", segmentNames, tableNameWithType, e);
return PinotResourceManagerResponse.failure(e.getMessage());
}
}
+ /**
+ * Reads the current segment lineage znode (if any) and throws {@link
SegmentsInLineageException} when the
+ * delete batch intersects the lineage-locked set.
+ *
+ * <p>When controller metrics are available, increments {@code LINEAGE_BLOCKED_DELETE_COUNT} by one per
+ * rejected batch (not once per blocking segment).
+ * <p>NOTE(review): the lineage is read once and no lock is taken in this method, so an entry written after
+ * the read is not seen by this check — confirm whether callers serialize deletes with lineage updates.
+ *
+ * @param tableNameWithType table name with type suffix
+ * @param segmentNames segments the caller wants to delete
+ * @throws SegmentsInLineageException listing, in request order, every requested segment that is lineage-locked
+ */
+ private void rejectIfTargetsLineageLockedSegments(String tableNameWithType,
List<String> segmentNames) {
+ SegmentLineage segmentLineage =
SegmentLineageAccessHelper.getSegmentLineage(_propertyStore, tableNameWithType);
+ if (segmentLineage == null) {
+ // No lineage znode for this table: nothing can be locked.
+ return;
+ }
+ Set<String> blocked =
SegmentLineageUtils.getDeleteBlockedSegments(segmentLineage);
+ if (blocked.isEmpty()) {
+ return;
+ }
+ // Collect every offending target so the error message lists all of them, not just the first.
+ List<String> blockingTargets = new ArrayList<>();
+ for (String segment : segmentNames) {
+ if (blocked.contains(segment)) {
+ blockingTargets.add(segment);
+ }
+ }
+ if (blockingTargets.isEmpty()) {
+ return;
+ }
+ if (_controllerMetrics != null) {
+ _controllerMetrics.addMeteredTableValue(tableNameWithType,
ControllerMeter.LINEAGE_BLOCKED_DELETE_COUNT, 1L);
+ }
+ throw new SegmentsInLineageException(tableNameWithType, blockingTargets);
+ }
+
+ /**
+ * Whether {@link #deleteSegments} enforces the lineage-lock check. A {@code null} controller config is
+ * treated as enabled, matching the config's default of {@code true}.
+ * NOTE(review): presumably {@code _controllerConf} is only null in some test/embedded setups — confirm.
+ */
+ private boolean isLineageExclusiveDeleteEnabled() {
+ return _controllerConf == null ||
_controllerConf.isLineageExclusiveDeleteEnabled();
+ }
+
/**
* Delete a single segment from ideal state and remove it from the local
storage.
*
@@ -4587,7 +4654,7 @@ public class PinotHelixResourceManager {
// is safe to physically delete segments.
if (!segmentsToCleanUp.isEmpty()) {
LOGGER.info("Cleaning up the segments while startReplaceSegments: {}",
segmentsToCleanUp);
- deleteSegments(tableNameWithType, segmentsToCleanUp);
+ deleteSegmentsForLineageCleanup(tableNameWithType, segmentsToCleanUp);
}
// Only successful attempt can reach here
@@ -4640,6 +4707,24 @@ public class PinotHelixResourceManager {
}
Set<String> segmentsForTable = new
HashSet<>(getSegmentsFor(tableNameWithType, false));
+ // Re-validate that every segmentsFrom is still present in IdealState.
+ // If a segment was deleted concurrently while it was added in the
LineageEntry,
+ // the lineage entry is now corrupt and should be reverted.
+ List<String> missingSegmentsFrom = new ArrayList<>();
+ for (String segment : lineageEntry.getSegmentsFrom()) {
+ if (!segmentsForTable.contains(segment)) {
+ missingSegmentsFrom.add(segment);
+ }
+ }
+ if (!missingSegmentsFrom.isEmpty()) {
+ String errorMsg = "Cannot complete segment replacement: segments
from 'segmentsFrom' no longer exist in "
+ + "IdealState (likely deleted by a concurrent request). Recover
by calling revertReplaceSegments with "
+ + "forceRevert=true. (tableNameWithType=" + tableNameWithType +
", segmentLineageEntryId="
+ + segmentLineageEntryId + ", missingSegmentsFrom=" +
missingSegmentsFrom + ")";
+ LOGGER.error(errorMsg);
+ throw new RuntimeException(errorMsg);
+ }
+
List<String> segmentsTo = lineageEntry.getSegmentsTo();
if (endReplaceSegmentsRequest != null &&
!endReplaceSegmentsRequest.getSegmentsTo().isEmpty()) {
Set<String> segmentsToInSet = new HashSet<>(segmentsTo);
@@ -4823,7 +4908,7 @@ public class PinotHelixResourceManager {
// Invoke the proactive clean-up for segments that we no longer needs
if (!segmentsTo.isEmpty()) {
- deleteSegments(tableNameWithType, segmentsTo);
+ deleteSegmentsForLineageCleanup(tableNameWithType, segmentsTo);
}
return true;
} else {
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/lineage/SegmentsInLineageException.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/lineage/SegmentsInLineageException.java
new file mode 100644
index 00000000000..2a38571175f
--- /dev/null
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/lineage/SegmentsInLineageException.java
@@ -0,0 +1,55 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller.helix.core.lineage;
+
+import java.util.Collections;
+import java.util.List;
+
+
+/// Thrown when a segment deletion request targets segments that participate
in a live segment lineage entry.
+///
+/// A segment is considered lineage-locked when some lineage entry has state
`IN_PROGRESS` and the segment appears
+/// in either `segmentsFrom` or `segmentsTo`, or has state `COMPLETED` and the
segment appears in `segmentsFrom`.
+/// Such segments must be cleaned up through the lineage lifecycle (end /
revert / retention) rather than via the
+/// generic delete path. The whole batch is rejected so callers never observe
a partial commit.
+public class SegmentsInLineageException extends RuntimeException {
+ private final String _tableNameWithType;
+ // Read-only view over the list passed to the constructor (a wrapper, not a copy).
+ private final List<String> _blockingSegments;
+
+ /// @param tableNameWithType table whose delete request was rejected
+ /// @param blockingSegments requested segments that are lineage-locked; wrapped rather than copied, so the
+ /// caller should not mutate it afterwards
+ public SegmentsInLineageException(String tableNameWithType, List<String>
blockingSegments) {
+ super(buildMessage(tableNameWithType, blockingSegments));
+ _tableNameWithType = tableNameWithType;
+ _blockingSegments = Collections.unmodifiableList(blockingSegments);
+ }
+
+ /// Returns the table whose delete request was rejected.
+ public String getTableNameWithType() {
+ return _tableNameWithType;
+ }
+
+ /// Returns the lineage-locked segments that caused the rejection, as an unmodifiable view.
+ public List<String> getBlockingSegments() {
+ return _blockingSegments;
+ }
+
+ /// Builds the exception message, which names the table, points at the lineage APIs and lists every
+ /// blocking segment.
+ private static String buildMessage(String tableNameWithType, List<String>
blockingSegments) {
+ return "Cannot delete segments from table: " + tableNameWithType
+ + " because they participate in a live segment lineage entry. Use the
lineage lifecycle "
+ + "(endReplaceSegments / revertReplaceSegments with forceRevert=true)
to clean them up. "
+ + "Blocking segments: " + blockingSegments;
+ }
+}
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/retention/RetentionManager.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/retention/RetentionManager.java
index d01aeb1a877..82cd4076128 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/retention/RetentionManager.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/retention/RetentionManager.java
@@ -37,6 +37,7 @@ import org.apache.helix.zookeeper.datamodel.ZNRecord;
import org.apache.logging.log4j.util.Strings;
import org.apache.pinot.common.lineage.SegmentLineage;
import org.apache.pinot.common.lineage.SegmentLineageAccessHelper;
+import org.apache.pinot.common.lineage.SegmentLineageUtils;
import org.apache.pinot.common.metadata.ZKMetadataProvider;
import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
import org.apache.pinot.common.metrics.ControllerGauge;
@@ -87,6 +88,7 @@ public class RetentionManager extends
ControllerPeriodicTask<Void> {
private volatile boolean _isHybridTableRetentionStrategyEnabled;
private volatile boolean _useCreationTimeFallbackForRetention;
private final BrokerServiceHelper _brokerServiceHelper;
+ private final ControllerConf _controllerConf;
public RetentionManager(PinotHelixResourceManager pinotHelixResourceManager,
LeadControllerManager leadControllerManager, ControllerConf config,
ControllerMetrics controllerMetrics,
@@ -100,6 +102,7 @@ public class RetentionManager extends
ControllerPeriodicTask<Void> {
_isHybridTableRetentionStrategyEnabled =
config.isHybridTableRetentionStrategyEnabled();
_useCreationTimeFallbackForRetention =
config.isRetentionCreationTimeFallbackEnabled();
_brokerServiceHelper = brokerServiceHelper;
+ _controllerConf = config;
LOGGER.info("Starting RetentionManager with runFrequencyInSeconds: {}",
getIntervalInSeconds());
}
@@ -191,6 +194,7 @@ public class RetentionManager extends
ControllerPeriodicTask<Void> {
segmentsToDelete.add(segmentZKMetadata.getSegmentName());
}
}
+ removeLineageLockedSegments(offlineTableName, segmentsToDelete);
if (!segmentsToDelete.isEmpty()) {
LOGGER.info("Deleting {} segments from table: {}",
segmentsToDelete.size(), offlineTableName);
_pinotHelixResourceManager.deleteSegments(offlineTableName,
segmentsToDelete);
@@ -229,6 +233,7 @@ public class RetentionManager extends
ControllerPeriodicTask<Void> {
// Remove last sealed segments such that the table can still create new
consuming segments if it's paused
segmentsToDelete.removeAll(_pinotHelixResourceManager.getLastLLCCompletedSegments(realtimeTableName));
+ removeLineageLockedSegments(realtimeTableName, segmentsToDelete);
if (!segmentsToDelete.isEmpty()) {
LOGGER.info("Deleting {} segments from table: {}",
segmentsToDelete.size(), realtimeTableName);
_pinotHelixResourceManager.deleteSegments(realtimeTableName,
segmentsToDelete);
@@ -276,6 +281,7 @@ public class RetentionManager extends
ControllerPeriodicTask<Void> {
segmentsToDelete.add(segmentZKMetadata.getSegmentName());
}
}
+ removeLineageLockedSegments(realtimeTableName, segmentsToDelete);
LOGGER.info("Deleting {} segments from table: {}",
segmentsToDelete.size(), realtimeTableName);
if (!segmentsToDelete.isEmpty()) {
_pinotHelixResourceManager.deleteSegments(realtimeTableName,
segmentsToDelete);
@@ -451,6 +457,43 @@ public class RetentionManager extends
ControllerPeriodicTask<Void> {
return segmentName;
}
+ /**
+ * Strips out any segments that participate in a live segment lineage entry
from the retention-driven
+ * delete batch. Time-based retention must not delete lineage-locked
segments — they are owned by the
+ * lineage lifecycle and get cleaned up by {@link
#manageSegmentLineageCleanupForTable} when the lineage
+ * entry becomes eligible. If we left them in, the public delete check would
reject the whole batch and
+ * the rest of the eligible segments would never get cleaned up.
+ * <p>
+ * Gated by {@link ControllerConf#LINEAGE_EXCLUSIVE_DELETE_ENABLED}: when
the kill switch is off,
+ * {@code deleteSegments} also stops rejecting lineage-locked targets, so
retention must mirror legacy
+ * behavior and pass them through to the delete path instead of silently
dropping them here.
+ *
+ * @param tableNameWithType table name with type suffix, used to read the lineage znode
+ * @param segmentsToDelete candidate retention batch; modified in place via {@code removeIf}, so it must be
+ * a mutable list
+ */
+ private void removeLineageLockedSegments(String tableNameWithType,
List<String> segmentsToDelete) {
+ if (segmentsToDelete.isEmpty()) {
+ return;
+ }
+ // Kill switch off: keep legacy behavior and leave lineage-locked segments in the batch.
+ if (!_controllerConf.isLineageExclusiveDeleteEnabled()) {
+ return;
+ }
+ SegmentLineage segmentLineage =
+
SegmentLineageAccessHelper.getSegmentLineage(_pinotHelixResourceManager.getPropertyStore(),
tableNameWithType);
+ if (segmentLineage == null) {
+ return;
+ }
+ Set<String> blocked =
SegmentLineageUtils.getDeleteBlockedSegments(segmentLineage);
+ if (blocked.isEmpty()) {
+ return;
+ }
+ int sizeBefore = segmentsToDelete.size();
+ segmentsToDelete.removeIf(blocked::contains);
+ int removed = sizeBefore - segmentsToDelete.size();
+ if (removed > 0) {
+ LOGGER.info(
+ "Skipping {} segments in retention pass for table: {} because they
participate in a live lineage entry; "
+ + "they will be cleaned up by the lineage retention path.",
removed, tableNameWithType);
+ }
+ }
+
private void manageSegmentLineageCleanupForTable(TableConfig tableConfig) {
String tableNameWithType = tableConfig.getTableName();
List<String> segmentsToDelete = new ArrayList<>();
@@ -496,7 +539,7 @@ public class RetentionManager extends
ControllerPeriodicTask<Void> {
}
// Delete segments based on the segment lineage
if (!segmentsToDelete.isEmpty()) {
- _pinotHelixResourceManager.deleteSegments(tableNameWithType,
segmentsToDelete);
+
_pinotHelixResourceManager.deleteSegmentsForLineageCleanup(tableNameWithType,
segmentsToDelete);
LOGGER.info("Finished cleaning up segment lineage for table: {} in {}ms,
deleted segments: {}",
tableNameWithType, (System.currentTimeMillis() - cleanupStartTime),
segmentsToDelete);
}
diff --git
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/lineage/LineageDeleteExclusionTest.java
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/lineage/LineageDeleteExclusionTest.java
new file mode 100644
index 00000000000..4ff13707709
--- /dev/null
+++
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/lineage/LineageDeleteExclusionTest.java
@@ -0,0 +1,187 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller.helix.core.lineage;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import org.apache.pinot.common.lineage.LineageEntry;
+import org.apache.pinot.common.lineage.LineageEntryState;
+import org.apache.pinot.common.lineage.SegmentLineage;
+import org.apache.pinot.common.lineage.SegmentLineageAccessHelper;
+import org.apache.pinot.controller.helix.ControllerTest;
+import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
+import org.apache.pinot.controller.helix.core.PinotResourceManagerResponse;
+import org.apache.pinot.controller.utils.SegmentMetadataMockUtils;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
+import org.apache.pinot.spi.utils.builder.TableNameBuilder;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertTrue;
+
+
+/**
+ * Functional tests for the lineage-aware segment delete check in
+ * {@link PinotHelixResourceManager#deleteSegments(String, List)} and for the bypass path
+ * {@link PinotHelixResourceManager#deleteSegmentsForLineageCleanup(String, List)}.
+ *
+ * <p>NOTE(review): no test in this class calls {@code endReplaceSegments}, so the {@code segmentsFrom}
+ * re-validation there is not covered here; presumably it is exercised by
+ * {@code LineageDeleteInterleavingIntegrationTest} — confirm.
+ *
+ * <p>Each test uses unique segment names so that the asynchronous
segment-ZK-metadata cleanup performed by
+ * {@code SegmentDeletionManager} cannot interfere with the next test's
segment additions.
+ */
+public class LineageDeleteExclusionTest {
+ private static final ControllerTest TEST_INSTANCE =
ControllerTest.getInstance();
+ private static final String RAW_TABLE_NAME = "lineageDeleteTable";
+ private static final String OFFLINE_TABLE_NAME =
TableNameBuilder.OFFLINE.tableNameWithType(RAW_TABLE_NAME);
+
+ private PinotHelixResourceManager _resourceManager;
+
+ @BeforeClass
+ public void setUp()
+ throws Exception {
+ TEST_INSTANCE.setupSharedStateAndValidate();
+ _resourceManager = TEST_INSTANCE.getHelixResourceManager();
+ TEST_INSTANCE.addDummySchema(RAW_TABLE_NAME);
+ TableConfig tableConfig =
+ new
TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME).setNumReplicas(1).build();
+ _resourceManager.addTable(tableConfig);
+ }
+
+ @BeforeMethod
+ public void clearLineage() {
+ // Drop any lineage znode written by an earlier test so tests do not bleed
into each other.
+
SegmentLineageAccessHelper.deleteSegmentLineage(_resourceManager.getPropertyStore(),
OFFLINE_TABLE_NAME);
+ }
+
+ // Registers each named segment in the OFFLINE table using mocked segment metadata.
+ private void addSegments(String... segmentNames) {
+ for (String name : segmentNames) {
+ _resourceManager.addNewSegment(OFFLINE_TABLE_NAME,
+ SegmentMetadataMockUtils.mockSegmentMetadata(OFFLINE_TABLE_NAME,
name), "downloadUrl");
+ }
+ }
+
+ // Adds one lineage entry to the table's lineage znode, creating the znode if it does not exist yet.
+ private void writeLineageEntry(String entryId, List<String> segmentsFrom,
List<String> segmentsTo,
+ LineageEntryState state) {
+ SegmentLineage existing =
+
SegmentLineageAccessHelper.getSegmentLineage(_resourceManager.getPropertyStore(),
OFFLINE_TABLE_NAME);
+ SegmentLineage lineage = existing != null ? existing : new
SegmentLineage(OFFLINE_TABLE_NAME);
+ lineage.addLineageEntry(entryId, new LineageEntry(segmentsFrom,
segmentsTo, state, System.currentTimeMillis()));
+ // NOTE(review): presumably an expected version of -1 means "write without a version check" — confirm.
+
assertTrue(SegmentLineageAccessHelper.writeSegmentLineage(_resourceManager.getPropertyStore(),
lineage, -1));
+ }
+
+ @Test
+ public void testDeleteBlockedForInProgressSegmentsFrom() {
+ addSegments("ipfA1", "ipfA2", "ipfB1");
+ writeLineageEntry("e1", Arrays.asList("ipfA1", "ipfA2"),
Collections.singletonList("ipfB1"),
+ LineageEntryState.IN_PROGRESS);
+ PinotResourceManagerResponse response =
+ _resourceManager.deleteSegments(OFFLINE_TABLE_NAME,
Collections.singletonList("ipfA1"));
+ assertFalse(response.isSuccessful());
+ assertTrue(response.getMessage().contains("ipfA1"));
+ // IdealState untouched
+ assertTrue(_resourceManager.getSegmentsFor(OFFLINE_TABLE_NAME,
false).contains("ipfA1"));
+ }
+
+ @Test
+ public void testDeleteBlockedForInProgressSegmentsTo() {
+ addSegments("iptA1", "iptB1");
+ writeLineageEntry("e1", Collections.singletonList("iptA1"),
Collections.singletonList("iptB1"),
+ LineageEntryState.IN_PROGRESS);
+ PinotResourceManagerResponse response =
+ _resourceManager.deleteSegments(OFFLINE_TABLE_NAME,
Collections.singletonList("iptB1"));
+ assertFalse(response.isSuccessful());
+ assertTrue(_resourceManager.getSegmentsFor(OFFLINE_TABLE_NAME,
false).contains("iptB1"));
+ }
+
+ @Test
+ public void testDeleteBlockedForCompletedSegmentsFrom() {
+ addSegments("cpfA1", "cpfB1");
+ writeLineageEntry("e1", Collections.singletonList("cpfA1"),
Collections.singletonList("cpfB1"),
+ LineageEntryState.COMPLETED);
+ PinotResourceManagerResponse response =
+ _resourceManager.deleteSegments(OFFLINE_TABLE_NAME,
Collections.singletonList("cpfA1"));
+ assertFalse(response.isSuccessful());
+ assertTrue(_resourceManager.getSegmentsFor(OFFLINE_TABLE_NAME,
false).contains("cpfA1"));
+ }
+
+ @Test
+ public void testDeleteAllowedForCompletedSegmentsTo() {
+ addSegments("cptA1", "cptB1");
+ writeLineageEntry("e1", Collections.singletonList("cptA1"),
Collections.singletonList("cptB1"),
+ LineageEntryState.COMPLETED);
+ PinotResourceManagerResponse response =
+ _resourceManager.deleteSegments(OFFLINE_TABLE_NAME,
Collections.singletonList("cptB1"));
+ assertTrue(response.isSuccessful(), response.getMessage());
+ }
+
+ @Test
+ public void testDeleteAllowedForRevertedEntry() {
+ addSegments("revA1", "revB1");
+ writeLineageEntry("e1", Collections.singletonList("revA1"),
Collections.singletonList("revB1"),
+ LineageEntryState.REVERTED);
+ // Both segmentsFrom and segmentsTo of a REVERTED entry are unblocked, so
deleting them together in one batch succeeds.
+ PinotResourceManagerResponse response =
+ _resourceManager.deleteSegments(OFFLINE_TABLE_NAME,
Arrays.asList("revA1", "revB1"));
+ assertTrue(response.isSuccessful(), response.getMessage());
+ }
+
+ @Test
+ public void testBatchRejectionLeavesIdealStateUntouched() {
+ addSegments("batchA1", "batchA2", "batchFree");
+ writeLineageEntry("e1", Collections.singletonList("batchA1"),
Collections.singletonList("batchA2"),
+ LineageEntryState.IN_PROGRESS);
+ PinotResourceManagerResponse response =
+ _resourceManager.deleteSegments(OFFLINE_TABLE_NAME,
Arrays.asList("batchA1", "batchFree"));
+ assertFalse(response.isSuccessful());
+ // Neither "batchA1" nor "batchFree" should have been removed
+ List<String> segments =
_resourceManager.getSegmentsFor(OFFLINE_TABLE_NAME, false);
+ assertTrue(segments.contains("batchA1"));
+ assertTrue(segments.contains("batchFree"));
+ }
+
+ @Test
+ public void testBypassOverloadAllowsBlockedSegments() {
+ addSegments("byA1", "byB1");
+ writeLineageEntry("e1", Collections.singletonList("byA1"),
Collections.singletonList("byB1"),
+ LineageEntryState.IN_PROGRESS);
+ // byA1 is lineage-locked (IN_PROGRESS segmentsFrom), but the cleanup path skips the check.
+ PinotResourceManagerResponse response =
+ _resourceManager.deleteSegmentsForLineageCleanup(OFFLINE_TABLE_NAME,
Collections.singletonList("byA1"));
+ assertTrue(response.isSuccessful(), response.getMessage());
+ }
+
+ @Test
+ public void testNoLineageZnodeFastPath() {
+ // clearLineage() removed the znode, so the check returns early and the delete goes through.
+ addSegments("nolzA1");
+ PinotResourceManagerResponse response =
+ _resourceManager.deleteSegments(OFFLINE_TABLE_NAME,
Collections.singletonList("nolzA1"));
+ assertTrue(response.isSuccessful(), response.getMessage());
+ }
+
+ @AfterClass
+ public void tearDown() {
+ TEST_INSTANCE.cleanup();
+ }
+}
diff --git
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/lineage/LineageDeleteInterleavingIntegrationTest.java
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/lineage/LineageDeleteInterleavingIntegrationTest.java
new file mode 100644
index 00000000000..ad922636e12
--- /dev/null
+++
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/lineage/LineageDeleteInterleavingIntegrationTest.java
@@ -0,0 +1,509 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller.helix.core.lineage;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import org.apache.pinot.common.lineage.LineageEntry;
+import org.apache.pinot.common.lineage.LineageEntryState;
+import org.apache.pinot.common.lineage.SegmentLineage;
+import org.apache.pinot.common.lineage.SegmentLineageAccessHelper;
+import org.apache.pinot.common.metrics.ControllerMetrics;
+import org.apache.pinot.common.restlet.resources.StartReplaceSegmentsRequest;
+import org.apache.pinot.controller.ControllerConf;
+import org.apache.pinot.controller.LeadControllerManager;
+import org.apache.pinot.controller.helix.ControllerTest;
+import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
+import org.apache.pinot.controller.helix.core.retention.RetentionManager;
+import org.apache.pinot.controller.util.BrokerServiceHelper;
+import org.apache.pinot.controller.utils.SegmentMetadataMockUtils;
+import org.apache.pinot.segment.spi.SegmentMetadata;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.utils.JsonUtils;
+import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
+import org.apache.pinot.spi.utils.builder.TableNameBuilder;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import static org.mockito.Mockito.mock;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertTrue;
+import static org.testng.Assert.expectThrows;
+
+
+/**
+ * Integration tests for the lineage-aware segment delete check at the REST
layer and across an interleaved
+ * start/end/revert lifecycle.
+ *
+ * <p>This test:
+ * <ul>
+ * <li>Drives every step through the REST endpoints.</li>
+ * <li>Walks a single table through all three lineage states with a delete
attempt at every step.</li>
+ * <li>Re-reads the lineage znode after every API call (success or failure)
to verify failed requests never
+ * mutate the entry.</li>
+ * <li>Verifies the retention path silently skips lineage-locked segments
and that the lineage-cleanup pass
+ * uses the bypass delete path.</li>
+ * </ul>
+ *
+ * <p>Each test uses unique segment names so the asynchronous {@code
SegmentDeletionManager} cleanup cannot
+ * interfere with later tests.
+ */
+public class LineageDeleteInterleavingIntegrationTest {
+ private static final ControllerTest TEST_INSTANCE =
ControllerTest.getInstance();
+ private static final String RAW_TABLE_NAME = "lineageInterleavingTable";
+ private static final String OFFLINE_TABLE_NAME =
TableNameBuilder.OFFLINE.tableNameWithType(RAW_TABLE_NAME);
+ // Separate table with retention configured, used by the
retention-interaction test so it does not affect the
+ // other scenarios that should never trigger time-based deletion.
+ private static final String RETENTION_RAW_TABLE_NAME =
"lineageInterleavingRetentionTable";
+ private static final String RETENTION_OFFLINE_TABLE_NAME =
+ TableNameBuilder.OFFLINE.tableNameWithType(RETENTION_RAW_TABLE_NAME);
+
+ private PinotHelixResourceManager _resourceManager;
+ private String _controllerBaseUrl;
+ private TestableRetentionManager _retentionManager;
+
+ @BeforeClass
+ public void setUp()
+ throws Exception {
+ TEST_INSTANCE.setupSharedStateAndValidate();
+ _resourceManager = TEST_INSTANCE.getHelixResourceManager();
+ _controllerBaseUrl = TEST_INSTANCE.getControllerBaseApiUrl();
+
+ TEST_INSTANCE.addDummySchema(RAW_TABLE_NAME);
+ TableConfig tableConfig =
+ new
TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME).setNumReplicas(1).build();
+ _resourceManager.addTable(tableConfig);
+
+ TEST_INSTANCE.addDummySchema(RETENTION_RAW_TABLE_NAME);
+ TableConfig retentionTableConfig =
+ new
TableConfigBuilder(TableType.OFFLINE).setTableName(RETENTION_RAW_TABLE_NAME).setNumReplicas(1)
+ .setRetentionTimeUnit("DAYS").setRetentionTimeValue("1").build();
+ _resourceManager.addTable(retentionTableConfig);
+
+ // RetentionManager configured with zero frequencies so we can drive
processTable() directly and validate the
+ // lineage interaction in isolation.
+ ControllerConf controllerConf = new ControllerConf();
+ controllerConf.setRetentionControllerFrequencyInSeconds(0);
+ controllerConf.setDeletedSegmentsRetentionInDays(0);
+ BrokerServiceHelper brokerServiceHelper = new
BrokerServiceHelper(_resourceManager, controllerConf, null, null);
+ _retentionManager = new TestableRetentionManager(_resourceManager,
mock(LeadControllerManager.class),
+ controllerConf, mock(ControllerMetrics.class), brokerServiceHelper);
+ }
+
+ /**
+ * Subclass that exposes the package-protected {@link
RetentionManager#processTable(String)} so this test
+ * (in a different package) can drive a single retention pass against a
single table.
+ */
+ private static final class TestableRetentionManager extends RetentionManager
{
+ TestableRetentionManager(PinotHelixResourceManager
pinotHelixResourceManager,
+ LeadControllerManager leadControllerManager, ControllerConf config,
ControllerMetrics controllerMetrics,
+ BrokerServiceHelper brokerServiceHelper) {
+ super(pinotHelixResourceManager, leadControllerManager, config,
controllerMetrics, brokerServiceHelper);
+ }
+
+ void runProcessTable(String tableNameWithType) {
+ processTable(tableNameWithType);
+ }
+ }
+
+ @BeforeMethod
+ public void clearLineage() {
+
SegmentLineageAccessHelper.deleteSegmentLineage(_resourceManager.getPropertyStore(),
OFFLINE_TABLE_NAME);
+
SegmentLineageAccessHelper.deleteSegmentLineage(_resourceManager.getPropertyStore(),
+ RETENTION_OFFLINE_TABLE_NAME);
+ }
+
+ @AfterClass
+ public void tearDown() {
+ TEST_INSTANCE.cleanup();
+ }
+
+ //
---------------------------------------------------------------------------------------------------------------
+ // T1: full lifecycle interleaved with delete attempts at every step
+ //
---------------------------------------------------------------------------------------------------------------
+ @Test
+ public void testFullLifecycleInterleavedWithDeleteAttempts()
+ throws IOException {
+ String s1 = "t1_s1";
+ String s2 = "t1_s2";
+ String s3 = "t1_s3";
+ String s1New = "t1_s1_new";
+
+ addSegments(OFFLINE_TABLE_NAME, s1, s2, s3);
+
+ // Step 1: startReplaceSegments → IN_PROGRESS
+ String entryId = postStartReplaceSegments(OFFLINE_TABLE_NAME,
Collections.singletonList(s1),
+ Collections.singletonList(s1New));
+ assertLineageEntry(OFFLINE_TABLE_NAME, entryId,
LineageEntryState.IN_PROGRESS, Collections.singletonList(s1),
+ Collections.singletonList(s1New));
+
+ // Step 2: delete s1 (segmentsFrom of IN_PROGRESS) → 500, IdealState +
lineage untouched
+ IOException ex500 = expectThrows(IOException.class, () ->
sendDeleteSegment(OFFLINE_TABLE_NAME, s1));
+ assertStatus(ex500, 500);
+ assertTrue(getSegments(OFFLINE_TABLE_NAME).contains(s1));
+ assertLineageEntry(OFFLINE_TABLE_NAME, entryId,
LineageEntryState.IN_PROGRESS, Collections.singletonList(s1),
+ Collections.singletonList(s1New));
+
+ // Step 3: upload s1New + delete s1New (segmentsTo of IN_PROGRESS) → 500.
+ // the lineage check still rejects without touching the lineage znode.
+ addSegments(OFFLINE_TABLE_NAME, s1New);
+ ex500 = expectThrows(IOException.class, () ->
sendDeleteSegment(OFFLINE_TABLE_NAME, s1New));
+ assertStatus(ex500, 500);
+ assertLineageEntry(OFFLINE_TABLE_NAME, entryId,
LineageEntryState.IN_PROGRESS, Collections.singletonList(s1),
+ Collections.singletonList(s1New));
+
+ // Step 4: endReplaceSegments → COMPLETED
+ postEndReplaceSegments(OFFLINE_TABLE_NAME, entryId);
+ assertLineageEntry(OFFLINE_TABLE_NAME, entryId,
LineageEntryState.COMPLETED, Collections.singletonList(s1),
+ Collections.singletonList(s1New));
+
+ // Step 5: delete s1 (segmentsFrom of COMPLETED) → still 500
+ ex500 = expectThrows(IOException.class, () ->
sendDeleteSegment(OFFLINE_TABLE_NAME, s1));
+ assertStatus(ex500, 500);
+ assertTrue(getSegments(OFFLINE_TABLE_NAME).contains(s1));
+ assertLineageEntry(OFFLINE_TABLE_NAME, entryId,
LineageEntryState.COMPLETED, Collections.singletonList(s1),
+ Collections.singletonList(s1New));
+
+ // Step 6: delete s1New (segmentsTo of COMPLETED) → 200 (segmentsTo is NOT
locked once the entry is COMPLETED).
+ // The lineage entry itself must NOT mutate as a side effect of the delete.
+ sendDeleteSegment(OFFLINE_TABLE_NAME, s1New);
+ assertFalse(getSegments(OFFLINE_TABLE_NAME).contains(s1New));
+ assertLineageEntry(OFFLINE_TABLE_NAME, entryId,
LineageEntryState.COMPLETED, Collections.singletonList(s1),
+ Collections.singletonList(s1New));
+
+ // Step 7: revertReplaceSegments (with forceRevert so it can revert a
COMPLETED entry safely) → REVERTED
+ postRevertReplaceSegments(OFFLINE_TABLE_NAME, entryId, true);
+ assertLineageEntry(OFFLINE_TABLE_NAME, entryId,
LineageEntryState.REVERTED, Collections.singletonList(s1),
+ Collections.singletonList(s1New));
+
+ // Step 8: delete s1 (REVERTED locks nothing) → 200
+ sendDeleteSegment(OFFLINE_TABLE_NAME, s1);
+ assertFalse(getSegments(OFFLINE_TABLE_NAME).contains(s1));
+ assertLineageEntry(OFFLINE_TABLE_NAME, entryId,
LineageEntryState.REVERTED, Collections.singletonList(s1),
+ Collections.singletonList(s1New));
+
+ // Final: IdealState contains exactly s2, s3; lineage has exactly one
entry in REVERTED state
+ List<String> finalSegments = getSegments(OFFLINE_TABLE_NAME);
+ assertTrue(finalSegments.contains(s2));
+ assertTrue(finalSegments.contains(s3));
+ SegmentLineage lineage =
+
SegmentLineageAccessHelper.getSegmentLineage(_resourceManager.getPropertyStore(),
OFFLINE_TABLE_NAME);
+ assertEquals(lineage.getLineageEntryIds().size(), 1);
+ assertEquals(lineage.getLineageEntry(entryId).getState(),
LineageEntryState.REVERTED);
+ }
+
+ //
---------------------------------------------------------------------------------------------------------------
+ // T2: REST-level batch delete rejection atomicity
+ //
---------------------------------------------------------------------------------------------------------------
+ @Test
+ public void testBatchDeleteRejectionAtomicityOverRest()
+ throws IOException {
+ String s1 = "t2_s1";
+ String s1New = "t2_s1_new";
+ String sFree = "t2_sFree";
+
+ addSegments(OFFLINE_TABLE_NAME, s1, sFree);
+ String entryId = postStartReplaceSegments(OFFLINE_TABLE_NAME,
Collections.singletonList(s1),
+ Collections.singletonList(s1New));
+
+ // Batch delete via DELETE
/segments/{tableName}?type=OFFLINE&segments=s1&segments=sFree → 500
+ String url = _controllerBaseUrl + "/segments/" + RAW_TABLE_NAME +
"?type=OFFLINE&segments=" + s1
+ + "&segments=" + sFree;
+ IOException ex500 = expectThrows(IOException.class, () ->
ControllerTest.sendDeleteRequest(url));
+ assertStatus(ex500, 500);
+ // Body should mention the blocking segment, but not the free one (the
free one is not what's blocking).
+ String body = ex500.getMessage();
+ assertTrue(body.contains(s1), "Expected response to mention blocking
segment: " + s1 + ", was: " + body);
+ assertFalse(body.contains("Blocking segments: [" + sFree),
+ "Free segment should not appear in blocking list. Was: " + body);
+
+ // Neither s1 nor sFree was removed
+ List<String> segments = getSegments(OFFLINE_TABLE_NAME);
+ assertTrue(segments.contains(s1));
+ assertTrue(segments.contains(sFree));
+
+ // Lineage entry is untouched
+ assertLineageEntry(OFFLINE_TABLE_NAME, entryId,
LineageEntryState.IN_PROGRESS, Collections.singletonList(s1),
+ Collections.singletonList(s1New));
+ }
+
+ //
---------------------------------------------------------------------------------------------------------------
+ // T3: Recovery via forceRevert when a (bypass) concurrent delete strands
the lineage entry
+ //
---------------------------------------------------------------------------------------------------------------
+ @Test
+ public void testRecoveryViaForceRevertAfterCorruptedLineage()
+ throws IOException {
+ String s1 = "t4_s1";
+ String s2 = "t4_s2";
+ addSegments(OFFLINE_TABLE_NAME, s1);
+
+ String entryId = postStartReplaceSegments(OFFLINE_TABLE_NAME,
Collections.singletonList(s1),
+ Collections.singletonList(s2));
+ assertLineageEntry(OFFLINE_TABLE_NAME, entryId,
LineageEntryState.IN_PROGRESS, Collections.singletonList(s1),
+ Collections.singletonList(s2));
+
+ // Simulate the lost-race outcome via the bypass overload (the only path
that can produce this state — the
+ // public REST DELETE would be rejected by the new check).
+ _resourceManager.deleteSegmentsForLineageCleanup(OFFLINE_TABLE_NAME,
Collections.singletonList(s1));
+ assertFalse(getSegments(OFFLINE_TABLE_NAME).contains(s1));
+ // Lineage entry must NOT mutate as a side effect of the bypass delete.
+ assertLineageEntry(OFFLINE_TABLE_NAME, entryId,
LineageEntryState.IN_PROGRESS, Collections.singletonList(s1),
+ Collections.singletonList(s2));
+
+ // Upload s2; lineage still unchanged
+ addSegments(OFFLINE_TABLE_NAME, s2);
+ assertLineageEntry(OFFLINE_TABLE_NAME, entryId,
LineageEntryState.IN_PROGRESS, Collections.singletonList(s1),
+ Collections.singletonList(s2));
+
+ // endReplaceSegments must fail because s1 (segmentsFrom) is gone.
+ IOException endEx = expectThrows(IOException.class, () ->
postEndReplaceSegments(OFFLINE_TABLE_NAME, entryId));
+ assertNon2xx(endEx);
+ assertLineageEntry(OFFLINE_TABLE_NAME, entryId,
LineageEntryState.IN_PROGRESS, Collections.singletonList(s1),
+ Collections.singletonList(s2));
+
+ // Operator recovery via revertReplaceSegments(forceRevert=true) → REVERTED
+ postRevertReplaceSegments(OFFLINE_TABLE_NAME, entryId, true);
+ assertLineageEntry(OFFLINE_TABLE_NAME, entryId,
LineageEntryState.REVERTED, Collections.singletonList(s1),
+ Collections.singletonList(s2));
+
+ // s2 is now unblocked — delete via the public path
+ sendDeleteSegment(OFFLINE_TABLE_NAME, s2);
+ assertFalse(getSegments(OFFLINE_TABLE_NAME).contains(s2));
+ // Delete must not touch the lineage entry
+ assertLineageEntry(OFFLINE_TABLE_NAME, entryId,
LineageEntryState.REVERTED, Collections.singletonList(s1),
+ Collections.singletonList(s2));
+ }
+
+ //
---------------------------------------------------------------------------------------------------------------
+ // T4b: revertReplaceSegments without forceRevert must NOT revert an
IN_PROGRESS entry, and a failed revert must
+ // not corrupt the znode either
+ //
---------------------------------------------------------------------------------------------------------------
+ @Test
+ public void testRevertWithoutForceRevertFailsOnCorruptedLineage()
+ throws IOException {
+ String s1 = "t4b_s1";
+ String s2 = "t4b_s2";
+ addSegments(OFFLINE_TABLE_NAME, s1);
+
+ String entryId = postStartReplaceSegments(OFFLINE_TABLE_NAME,
Collections.singletonList(s1),
+ Collections.singletonList(s2));
+ _resourceManager.deleteSegmentsForLineageCleanup(OFFLINE_TABLE_NAME,
Collections.singletonList(s1));
+ addSegments(OFFLINE_TABLE_NAME, s2);
+
+ // Attempting revertReplaceSegments without forceRevert against an
IN_PROGRESS entry should fail.
+ IOException ex = expectThrows(IOException.class, () ->
postRevertReplaceSegments(OFFLINE_TABLE_NAME, entryId,
+ false));
+ assertNon2xx(ex);
+ // Failed revert must not mutate the znode.
+ assertLineageEntry(OFFLINE_TABLE_NAME, entryId,
LineageEntryState.IN_PROGRESS, Collections.singletonList(s1),
+ Collections.singletonList(s2));
+
+ // Retry with forceRevert=true → 200
+ postRevertReplaceSegments(OFFLINE_TABLE_NAME, entryId, true);
+ assertLineageEntry(OFFLINE_TABLE_NAME, entryId,
LineageEntryState.REVERTED, Collections.singletonList(s1),
+ Collections.singletonList(s2));
+ }
+
+ //
---------------------------------------------------------------------------------------------------------------
+ // T5: retention manager skips lineage-locked segments and the
lineage-cleanup pass uses the bypass path
+ //
---------------------------------------------------------------------------------------------------------------
+ @Test
+ public void testRetentionSkipsLineageLockedSegmentsAndCleansUpViaBypass() {
+ String sExpired = "t5_expired";
+ String sExpiredFree = "t5_expired_free";
+ String sKept = "t5_kept";
+ String sNew = "t5_new";
+
+ // sExpired: end time 30 days ago → retention-eligible AND
lineage-locked
+ // sExpiredFree: end time 30 days ago → retention-eligible, NOT
lineage-locked
+ // sKept: end time = now → not retention-eligible
+ long nowMs = System.currentTimeMillis();
+ long thirtyDaysAgoMs = nowMs - TimeUnit.DAYS.toMillis(30L);
+ addSegmentWithTime(RETENTION_OFFLINE_TABLE_NAME, sExpired, thirtyDaysAgoMs
- TimeUnit.DAYS.toMillis(1L),
+ thirtyDaysAgoMs);
+ addSegmentWithTime(RETENTION_OFFLINE_TABLE_NAME, sExpiredFree,
thirtyDaysAgoMs - TimeUnit.DAYS.toMillis(1L),
+ thirtyDaysAgoMs);
+ addSegmentWithTime(RETENTION_OFFLINE_TABLE_NAME, sKept, nowMs -
TimeUnit.HOURS.toMillis(1L), nowMs);
+
+ // Lock sExpired in an IN_PROGRESS lineage entry → segmentsFrom is
lineage-locked
+ String entryId =
_resourceManager.startReplaceSegments(RETENTION_OFFLINE_TABLE_NAME,
+ Collections.singletonList(sExpired), Collections.singletonList(sNew),
false, null);
+ assertNotNull(entryId);
+
+ // Run retention. Time-based purge picks up both expired segments, but
removeLineageLockedSegments strips
+ // sExpired out before the delete call — the rest of the batch
(sExpiredFree) must still be deleted, proving
+ // the lineage-lock filtering does not poison the whole retention pass.
The lineage-cleanup pass also does
+ // nothing yet because the entry is IN_PROGRESS and not aged enough.
+ _retentionManager.runProcessTable(RETENTION_OFFLINE_TABLE_NAME);
+ List<String> segmentsAfterFirstPass =
getSegments(RETENTION_OFFLINE_TABLE_NAME);
+ assertTrue(segmentsAfterFirstPass.contains(sExpired),
+ "Retention must skip sExpired while it participates in a live lineage
entry");
+ assertFalse(segmentsAfterFirstPass.contains(sExpiredFree),
+ "Retention must still delete the non-lineage-locked expired segment in
the same pass");
+ assertTrue(segmentsAfterFirstPass.contains(sKept));
+ assertLineageEntry(RETENTION_OFFLINE_TABLE_NAME, entryId,
LineageEntryState.IN_PROGRESS,
+ Collections.singletonList(sExpired), Collections.singletonList(sNew));
+
+ // Complete the replacement. sNew has a fresh end-time so it is not
retention-eligible.
+ addSegmentWithTime(RETENTION_OFFLINE_TABLE_NAME, sNew, nowMs -
TimeUnit.HOURS.toMillis(1L), nowMs);
+ _resourceManager.endReplaceSegments(RETENTION_OFFLINE_TABLE_NAME, entryId,
null);
+ assertLineageEntry(RETENTION_OFFLINE_TABLE_NAME, entryId,
LineageEntryState.COMPLETED,
+ Collections.singletonList(sExpired), Collections.singletonList(sNew));
+ // endReplaceSegments does NOT proactively clean up segmentsFrom; that's
the retention path's responsibility.
+ assertTrue(getSegments(RETENTION_OFFLINE_TABLE_NAME).contains(sExpired));
+
+ // Run retention again. Now the COMPLETED entry's segmentsFrom is still
lineage-locked w.r.t. time-based
+ // purge, but the lineage-cleanup pass is allowed to delete it (table is
APPEND, so the replaced-segments
+ // retention window is bypassed). The cleanup must go through
deleteSegmentsForLineageCleanup so it isn't
+ // self-blocked by the new check.
+ _retentionManager.runProcessTable(RETENTION_OFFLINE_TABLE_NAME);
+ // IdealState updates inside deleteSegmentsForLineageCleanup are
synchronous (only the deep-store file deletion
+ // is async via SegmentDeletionManager), so we can assert directly.
+ assertFalse(getSegments(RETENTION_OFFLINE_TABLE_NAME).contains(sExpired),
+ "Expected sExpired to be removed from IdealState by the
lineage-cleanup pass");
+ assertTrue(getSegments(RETENTION_OFFLINE_TABLE_NAME).contains(sNew));
+ assertTrue(segmentsAfterFirstPass.contains(sKept));
+ }
+
+ //
---------------------------------------------------------------------------------------------------------------
+ // T6: Kill switch — deleting a lineage-locked segment is allowed when
+ // controller.lineage.exclusive.delete.enabled=false
+ //
---------------------------------------------------------------------------------------------------------------
+ @Test
+ public void testDeleteAllowedWhenLineageExclusiveDeleteDisabled()
+ throws IOException {
+ String s1 = "t6_s1";
+ String s1New = "t6_s1_new";
+ addSegments(OFFLINE_TABLE_NAME, s1);
+ String entryId = postStartReplaceSegments(OFFLINE_TABLE_NAME,
Collections.singletonList(s1),
+ Collections.singletonList(s1New));
+
+ // PinotHelixResourceManager.isLineageExclusiveDeleteEnabled() reads from
the live ControllerConf on every
+ // delete (no caching), so toggling the property here flips the behavior
for the next REST call. Reset in a
+ // finally block so other tests on this shared singleton are not affected
by the override.
+ ControllerConf conf = TEST_INSTANCE.getControllerConfig();
+ conf.setProperty(ControllerConf.LINEAGE_EXCLUSIVE_DELETE_ENABLED, false);
+ try {
+ sendDeleteSegment(OFFLINE_TABLE_NAME, s1);
+ assertFalse(getSegments(OFFLINE_TABLE_NAME).contains(s1),
+ "Segment must be removed from IdealState when
lineage-exclusive-delete is disabled");
+ // The lineage entry must not be mutated as a side effect of the delete.
+ assertLineageEntry(OFFLINE_TABLE_NAME, entryId,
LineageEntryState.IN_PROGRESS, Collections.singletonList(s1),
+ Collections.singletonList(s1New));
+ } finally {
+ conf.setProperty(ControllerConf.LINEAGE_EXCLUSIVE_DELETE_ENABLED, true);
+ }
+ }
+
+ //
---------------------------------------------------------------------------------------------------------------
+ // Helpers
+ //
---------------------------------------------------------------------------------------------------------------
+
+ private void addSegments(String tableNameWithType, String... segmentNames) {
+ for (String name : segmentNames) {
+ _resourceManager.addNewSegment(tableNameWithType,
+ SegmentMetadataMockUtils.mockSegmentMetadata(tableNameWithType,
name), "downloadUrl");
+ }
+ }
+
+ private void addSegmentWithTime(String tableNameWithType, String
segmentName, long startTimeMs, long endTimeMs) {
+ String crc = Long.toString(System.nanoTime());
+ SegmentMetadata metadata =
SegmentMetadataMockUtils.mockSegmentMetadata(tableNameWithType, segmentName,
100, crc,
+ startTimeMs, endTimeMs, TimeUnit.MILLISECONDS);
+ _resourceManager.addNewSegment(tableNameWithType, metadata, "downloadUrl");
+ }
+
+ private List<String> getSegments(String tableNameWithType) {
+ return _resourceManager.getSegmentsFor(tableNameWithType, false);
+ }
+
+ private void assertLineageEntry(String tableNameWithType, String entryId,
LineageEntryState expectedState,
+ List<String> expectedSegmentsFrom, List<String> expectedSegmentsTo) {
+ SegmentLineage lineage =
+
SegmentLineageAccessHelper.getSegmentLineage(_resourceManager.getPropertyStore(),
tableNameWithType);
+ assertNotNull(lineage, "Expected lineage znode to exist for table: " +
tableNameWithType);
+ LineageEntry entry = lineage.getLineageEntry(entryId);
+ assertNotNull(entry, "Expected lineage entry: " + entryId + " for table: "
+ tableNameWithType);
+ assertEquals(entry.getState(), expectedState,
+ "Unexpected state for lineage entry: " + entryId + " on table: " +
tableNameWithType);
+ assertEquals(entry.getSegmentsFrom(), expectedSegmentsFrom,
+ "Unexpected segmentsFrom for lineage entry: " + entryId);
+ assertEquals(entry.getSegmentsTo(), expectedSegmentsTo,
+ "Unexpected segmentsTo for lineage entry: " + entryId);
+ }
+
+ private String postStartReplaceSegments(String tableNameWithType,
List<String> segmentsFrom,
+ List<String> segmentsTo)
+ throws IOException {
+ String rawTableName =
TableNameBuilder.extractRawTableName(tableNameWithType);
+ String url = _controllerBaseUrl + "/segments/" + rawTableName +
"/startReplaceSegments?type=OFFLINE";
+ String body = JsonUtils.objectToString(new
StartReplaceSegmentsRequest(segmentsFrom, segmentsTo));
+ String response = ControllerTest.sendPostRequest(url, body);
+ JsonNode json = JsonUtils.stringToJsonNode(response);
+ JsonNode entryIdNode = json.get("segmentLineageEntryId");
+ assertNotNull(entryIdNode, "Expected segmentLineageEntryId in response.
Was: " + response);
+ return entryIdNode.asText();
+ }
+
+ private void postEndReplaceSegments(String tableNameWithType, String entryId)
+ throws IOException {
+ String rawTableName =
TableNameBuilder.extractRawTableName(tableNameWithType);
+ String url = _controllerBaseUrl + "/segments/" + rawTableName
+ + "/endReplaceSegments?type=OFFLINE&segmentLineageEntryId=" + entryId;
+ ControllerTest.sendPostRequest(url, "");
+ }
+
+ private void postRevertReplaceSegments(String tableNameWithType, String
entryId, boolean forceRevert)
+ throws IOException {
+ String rawTableName =
TableNameBuilder.extractRawTableName(tableNameWithType);
+ String url = _controllerBaseUrl + "/segments/" + rawTableName
+ + "/revertReplaceSegments?type=OFFLINE&segmentLineageEntryId=" +
entryId + "&forceRevert=" + forceRevert;
+ ControllerTest.sendPostRequest(url, "");
+ }
+
+ private void sendDeleteSegment(String tableNameWithType, String segmentName)
+ throws IOException {
+ String rawTableName =
TableNameBuilder.extractRawTableName(tableNameWithType);
+ String url = _controllerBaseUrl + "/segments/" + rawTableName + "/" +
segmentName;
+ ControllerTest.sendDeleteRequest(url);
+ }
+
+ private void assertStatus(IOException e, int expectedStatus) {
+ String msg = e.getMessage() != null ? e.getMessage() : "";
+ assertTrue(msg.contains("status code: " + expectedStatus) ||
msg.contains("status: " + expectedStatus),
+ "Expected status " + expectedStatus + " but got: " + msg);
+ }
+
+ private void assertNon2xx(IOException e) {
+ String msg = e.getMessage() != null ? e.getMessage() : "";
+ // The HttpClient error message starts with "Got error status code: <N>"
for any non-2xx response.
+ assertTrue(msg.contains("Got error status code"), "Expected a non-2xx HTTP
error but got: " + msg);
+ assertFalse(Arrays.asList("200", "201", "202", "204").stream().anyMatch(
+ code -> msg.contains("status code: " + code + " ")), "Expected non-2xx
response but got: " + msg);
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]