This is an automated email from the ASF dual-hosted git repository.

KKcorps pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new fdd8762953f Controller cleanups: SegmentDeletionManager fallback, LLC filter, retention strategy extraction, mock util (#18582)
fdd8762953f is described below

commit fdd8762953f8b9ee6db94f5deafe0020f1de3d9f
Author: Krishan Goyal <[email protected]>
AuthorDate: Thu May 28 17:39:43 2026 +0530

    Controller cleanups: SegmentDeletionManager fallback, LLC filter, retention strategy extraction, mock util (#18582)
---
 .../helix/core/PinotHelixResourceManager.java      | 14 +++-
 .../helix/core/SegmentDeletionManager.java         | 11 +++-
 .../helix/core/retention/RetentionManager.java     | 12 +---
 .../core/retention/TableConfigRetentionUtils.java  | 63 ++++++++++++++++++
 ...notHelixResourceManagerLastLLCSegmentsTest.java | 77 ++++++++++++++++++++++
 .../helix/core/retention/RetentionManagerTest.java |  1 +
 .../controller/utils/SegmentMetadataMockUtils.java | 14 ++++
 7 files changed, 179 insertions(+), 13 deletions(-)

diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
index d5cb2b1d97f..909084468d4 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
@@ -1051,10 +1051,22 @@ public class PinotHelixResourceManager {
   }
 
   public Collection<String> getLastLLCCompletedSegments(String 
tableNameWithType) {
+    return 
getLastLLCCompletedSegments(getSegmentsZKMetadata(tableNameWithType));
+  }
+
+  /// Overload that operates on a caller-supplied list of {@link 
SegmentZKMetadata}, avoiding a
+  /// redundant ZK fetch when the caller already holds the list (e.g. periodic 
tasks that scan all
+  /// segments of a table and want to derive the last-completed LLC segment 
per partition without
+  /// re-reading the property store).
+  public Collection<String> getLastLLCCompletedSegments(List<? extends 
SegmentZKMetadata> segmentZKMetadataList) {
     Map<Integer, String> partitionIdToLastLLCCompletedSegmentMap = new 
HashMap<>();
-    for (SegmentZKMetadata zkMetadata : 
getSegmentsZKMetadata(tableNameWithType)) {
+    for (SegmentZKMetadata zkMetadata : segmentZKMetadataList) {
       if (zkMetadata.getStatus() == 
CommonConstants.Segment.Realtime.Status.DONE) {
         LLCSegmentName llcName = 
LLCSegmentName.of(zkMetadata.getSegmentName());
+        if (llcName == null) {
+          // llcName can be null if the segment is uploaded through offline 
ingestion
+          continue;
+        }
         int partitionGroupId = llcName.getPartitionGroupId();
         int sequenceNumber = llcName.getSequenceNumber();
         String lastCompletedSegName = 
partitionIdToLastLLCCompletedSegmentMap.get(partitionGroupId);
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/SegmentDeletionManager.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/SegmentDeletionManager.java
index 8c4c7c4b484..3d5680fae37 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/SegmentDeletionManager.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/SegmentDeletionManager.java
@@ -235,9 +235,14 @@ public class SegmentDeletionManager {
       for (int i = 0; i < deleteSuccessful.length; i++) {
         final String segmentId = segmentsToDelete.get(i);
         if (!deleteSuccessful[i]) {
-          // remove API can fail because the prop store entry did not exist, 
so check first.
-          if (_propertyStore.exists(propStorePathList.get(i), 
AccessOption.PERSISTENT)) {
-            LOGGER.info("Could not delete {} from propertystore", 
propStorePathList.get(i));
+          // The batch remove API takes a non-recursive ZK path: it cannot 
delete a znode that has
+          // accumulated children. Fall back to the single-path remove API, 
which falls back to a
+          // recursive delete on the same NotEmpty failure. Skip when the 
znode is already gone
+          // (the batch call may have failed simply because the entry did not 
exist).
+          String segmentPath = propStorePathList.get(i);
+          if (_propertyStore.exists(segmentPath, AccessOption.PERSISTENT)
+              && !_propertyStore.remove(segmentPath, AccessOption.PERSISTENT)) 
{
+            LOGGER.info("Could not delete {} from propertystore", segmentPath);
             segmentsToRetryLater.add(segmentId);
             propStoreFailedSegs.add(segmentId);
           }
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/retention/RetentionManager.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/retention/RetentionManager.java
index 82cd4076128..9e6c3bbd8f6 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/retention/RetentionManager.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/retention/RetentionManager.java
@@ -142,19 +142,13 @@ public class RetentionManager extends 
ControllerPeriodicTask<Void> {
       LOGGER.info("Segment push type is not APPEND for table: {}, skip 
managing retention", tableNameWithType);
       return;
     }
-    String retentionTimeUnit = validationConfig.getRetentionTimeUnit();
-    String retentionTimeValue = validationConfig.getRetentionTimeValue();
     int untrackedSegmentsDeletionBatchSize =
         validationConfig.getUntrackedSegmentsDeletionBatchSize() != null ? 
Integer.parseInt(
             validationConfig.getUntrackedSegmentsDeletionBatchSize()) : 
DEFAULT_UNTRACKED_SEGMENTS_DELETION_BATCH_SIZE;
 
-    RetentionStrategy retentionStrategy;
-    try {
-      retentionStrategy = new 
TimeRetentionStrategy(TimeUnit.valueOf(retentionTimeUnit.toUpperCase()),
-          Long.parseLong(retentionTimeValue), 
_useCreationTimeFallbackForRetention);
-    } catch (Exception e) {
-      LOGGER.warn("Invalid retention time: {} {} for table: {}, skip", 
retentionTimeUnit, retentionTimeValue,
-          tableNameWithType);
+    RetentionStrategy retentionStrategy =
+        TableConfigRetentionUtils.buildRetentionStrategy(tableConfig, 
_useCreationTimeFallbackForRetention);
+    if (retentionStrategy == null) {
       return;
     }
 
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/retention/TableConfigRetentionUtils.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/retention/TableConfigRetentionUtils.java
new file mode 100644
index 00000000000..9df3bc81438
--- /dev/null
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/retention/TableConfigRetentionUtils.java
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller.helix.core.retention;
+
+import java.util.Locale;
+import java.util.concurrent.TimeUnit;
+import javax.annotation.Nullable;
+import 
org.apache.pinot.controller.helix.core.retention.strategy.RetentionStrategy;
+import 
org.apache.pinot.controller.helix.core.retention.strategy.TimeRetentionStrategy;
+import org.apache.pinot.spi.config.table.SegmentsValidationAndRetentionConfig;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/// Utility methods for deriving retention-related objects from a {@link 
TableConfig}.
+public class TableConfigRetentionUtils {
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(TableConfigRetentionUtils.class);
+
+  private TableConfigRetentionUtils() {
+  }
+
+  /// Builds a {@link RetentionStrategy} from {@code tableConfig}, or returns 
{@code null} when the
+  /// retention config is absent, empty, or malformed. A null return means no 
retention is configured
+  /// and no segment should be treated as purgeable.
+  ///
+  /// @param useCreationTimeFallback when true, the strategy falls back to 
segment creation time
+  ///                                when segment end time is unavailable
+  @Nullable
+  public static RetentionStrategy buildRetentionStrategy(TableConfig 
tableConfig,
+      boolean useCreationTimeFallback) {
+    SegmentsValidationAndRetentionConfig validationConfig = 
tableConfig.getValidationConfig();
+    String unit = validationConfig.getRetentionTimeUnit();
+    String value = validationConfig.getRetentionTimeValue();
+    if (unit == null || unit.isEmpty() || value == null || value.isEmpty()) {
+      LOGGER.warn("Invalid retention time: {} {} for table: {}, skip", unit, 
value, tableConfig.getTableName());
+      return null;
+    }
+    try {
+      return new 
TimeRetentionStrategy(TimeUnit.valueOf(unit.toUpperCase(Locale.ROOT)), 
Long.parseLong(value),
+          useCreationTimeFallback);
+    } catch (Exception e) {
+      LOGGER.warn("Invalid retention time: {} {} for table: {}, skip", unit, 
value, tableConfig.getTableName());
+      return null;
+    }
+  }
+}
diff --git 
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManagerLastLLCSegmentsTest.java
 
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManagerLastLLCSegmentsTest.java
new file mode 100644
index 00000000000..0feee5a6c25
--- /dev/null
+++ 
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManagerLastLLCSegmentsTest.java
@@ -0,0 +1,77 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller.helix.core;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
+import org.apache.pinot.common.utils.LLCSegmentName;
+import org.apache.pinot.spi.utils.CommonConstants;
+import org.apache.pinot.spi.utils.builder.TableNameBuilder;
+import org.testng.annotations.Test;
+
+import static org.mockito.ArgumentMatchers.anyList;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+import static org.testng.Assert.assertEquals;
+
+
+public class PinotHelixResourceManagerLastLLCSegmentsTest {
+
+  private static final String TABLE_NAME = "testTable";
+  private static final String REALTIME_TABLE_NAME = 
TableNameBuilder.REALTIME.tableNameWithType(TABLE_NAME);
+
+  /**
+   * A realtime table can contain non-LLC-named segments (e.g. uploaded via 
batch ingestion) sitting in DONE state
+   * alongside the LLC-named consuming/committed segments. {@code 
getLastLLCCompletedSegments} must skip those
+   * uploaded segments rather than NPE when {@code LLCSegmentName.of(name)} 
returns {@code null}.
+   */
+  @Test
+  public void testGetLastLLCCompletedSegmentsSkipsNonLLCNamedSegments() {
+    long now = System.currentTimeMillis();
+    int partitionId = 3;
+
+    List<SegmentZKMetadata> segments = new ArrayList<>();
+    // Two LLC-named DONE segments — sequence 0 and 1 for the same partition; 
sequence 1 is the latest.
+    LLCSegmentName seq0 = new LLCSegmentName(TABLE_NAME, partitionId, 0, now);
+    LLCSegmentName seq1 = new LLCSegmentName(TABLE_NAME, partitionId, 1, now);
+    segments.add(doneSegment(seq0.getSegmentName()));
+    segments.add(doneSegment(seq1.getSegmentName()));
+    // An uploaded (non-LLC-named) segment in DONE state — must be ignored, 
not crash the method.
+    segments.add(doneSegment("uploaded_segment_0"));
+
+    PinotHelixResourceManager rm = mock(PinotHelixResourceManager.class);
+    when(rm.getSegmentsZKMetadata(REALTIME_TABLE_NAME)).thenReturn(segments);
+    
when(rm.getLastLLCCompletedSegments(REALTIME_TABLE_NAME)).thenCallRealMethod();
+    when(rm.getLastLLCCompletedSegments(anyList())).thenCallRealMethod();
+
+    Collection<String> lastCompleted = 
rm.getLastLLCCompletedSegments(REALTIME_TABLE_NAME);
+    Set<String> actual = new HashSet<>(lastCompleted);
+    assertEquals(actual, Set.of(seq1.getSegmentName()));
+  }
+
+  private static SegmentZKMetadata doneSegment(String name) {
+    SegmentZKMetadata md = new SegmentZKMetadata(name);
+    md.setStatus(CommonConstants.Segment.Realtime.Status.DONE);
+    return md;
+  }
+}
diff --git 
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/retention/RetentionManagerTest.java
 
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/retention/RetentionManagerTest.java
index 24fde5ae5b8..c32459615db 100644
--- 
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/retention/RetentionManagerTest.java
+++ 
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/retention/RetentionManagerTest.java
@@ -808,6 +808,7 @@ public class RetentionManagerTest {
     
when(pinotHelixResourceManager.getSegmentsZKMetadata(REALTIME_TABLE_NAME)).thenReturn(segmentsZKMetadata);
     
when(pinotHelixResourceManager.getHelixClusterName()).thenReturn(HELIX_CLUSTER_NAME);
     
when(pinotHelixResourceManager.getLastLLCCompletedSegments(REALTIME_TABLE_NAME)).thenCallRealMethod();
+    
when(pinotHelixResourceManager.getLastLLCCompletedSegments(anyList())).thenCallRealMethod();
 
     HelixAdmin helixAdmin = mock(HelixAdmin.class);
     when(helixAdmin.getResourceIdealState(HELIX_CLUSTER_NAME, 
REALTIME_TABLE_NAME)).thenReturn(idealState);
diff --git 
a/pinot-controller/src/test/java/org/apache/pinot/controller/utils/SegmentMetadataMockUtils.java
 
b/pinot-controller/src/test/java/org/apache/pinot/controller/utils/SegmentMetadataMockUtils.java
index 99b5b23b7db..7a4f2c5d622 100644
--- 
a/pinot-controller/src/test/java/org/apache/pinot/controller/utils/SegmentMetadataMockUtils.java
+++ 
b/pinot-controller/src/test/java/org/apache/pinot/controller/utils/SegmentMetadataMockUtils.java
@@ -84,6 +84,20 @@ public class SegmentMetadataMockUtils {
     return segmentZKMetadata;
   }
 
+  public static SegmentMetadata mockSegmentMetadata(String tableName, String 
segmentName, int numTotalDocs,
+      String crc, long startTime, long endTime, TimeUnit timeUnit, String 
partitionColumn, int partitionId,
+      int numPartitions) {
+    SegmentMetadata segmentMetadata =
+        mockSegmentMetadata(tableName, segmentName, numTotalDocs, crc, 
startTime, endTime, timeUnit);
+    ColumnMetadata colMeta = mock(ColumnMetadata.class);
+    
when(colMeta.getPartitions()).thenReturn(Collections.singleton(partitionId));
+    when(colMeta.getPartitionFunction()).thenReturn(new 
MurmurPartitionFunction(numPartitions, null));
+    TreeMap<String, ColumnMetadata> columnMetadataMap = new TreeMap<>();
+    columnMetadataMap.put(partitionColumn, colMeta);
+    when(segmentMetadata.getColumnMetadataMap()).thenReturn(columnMetadataMap);
+    return segmentMetadata;
+  }
+
   public static SegmentMetadata mockSegmentMetadataWithPartitionInfo(String 
rawTableName, String segmentName,
       String columnName, int partitionNumber) {
     ColumnMetadata columnMetadata = mock(ColumnMetadata.class);


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to