This is an automated email from the ASF dual-hosted git repository.

tingchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new df584bed7c5 resume partially offline replicas in controller validation (#17754)
df584bed7c5 is described below

commit df584bed7c572a2be0aa59346c3cb46b66b7cc31
Author: rohit <[email protected]>
AuthorDate: Fri Mar 6 03:26:29 2026 +0530

    resume partially offline replicas in controller validation (#17754)
---
 .../apache/pinot/controller/ControllerConf.java    |  6 ++
 .../realtime/PinotLLCRealtimeSegmentManager.java   | 23 ++++++
 .../PinotLLCRealtimeSegmentManagerTest.java        | 86 ++++++++++++++++++++++
 3 files changed, 115 insertions(+)

diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerConf.java b/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerConf.java
index 308b3062f58..eedfba2bd7d 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerConf.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerConf.java
@@ -253,6 +253,8 @@ public class ControllerConf extends PinotConfiguration {
         "controller.segment.level.validation.intervalPeriod";
     public static final String AUTO_RESET_ERROR_SEGMENTS_VALIDATION =
         "controller.segment.error.autoReset";
+    public static final String ENABLE_PARTIAL_OFFLINE_REPLICA_REPAIR =
+        "controller.realtime.segment.partialOfflineReplicaRepairEnabled";
     public static final String DISASTER_RECOVERY_MODE_CONFIG_KEY = 
"controller.segment.disaster.recovery.mode";
 
     // Initial delays
@@ -1181,6 +1183,10 @@ public class ControllerConf extends PinotConfiguration {
     return 
getProperty(ControllerPeriodicTasksConf.AUTO_RESET_ERROR_SEGMENTS_VALIDATION, 
true);
   }
 
+  public boolean isPartialOfflineReplicaRepairEnabled() {
+    return 
getProperty(ControllerPeriodicTasksConf.ENABLE_PARTIAL_OFFLINE_REPLICA_REPAIR, 
false);
+  }
+
   public DisasterRecoveryMode getDisasterRecoveryMode() {
     return 
getDisasterRecoveryMode(getProperty(ControllerPeriodicTasksConf.DISASTER_RECOVERY_MODE_CONFIG_KEY));
   }
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
index 3e4f2754e5e..9ed39b67c46 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
@@ -213,6 +213,7 @@ public class PinotLLCRealtimeSegmentManager {
   private final FlushThresholdUpdateManager _flushThresholdUpdateManager;
   private final boolean _isDeepStoreLLCSegmentUploadRetryEnabled;
   private final boolean _isTmpSegmentAsyncDeletionEnabled;
+  private final boolean _isPartialOfflineReplicaRepairEnabled;
   private final int _deepstoreUploadRetryTimeoutMs;
   private final FileUploadDownloadClient _fileUploadDownloadClient;
   private final AtomicInteger _numCompletingSegments = new AtomicInteger(0);
@@ -241,6 +242,7 @@ public class PinotLLCRealtimeSegmentManager {
     _deepStoreUploadExecutorPendingSegments = ConcurrentHashMap.newKeySet();
 
     _isTmpSegmentAsyncDeletionEnabled = 
controllerConf.isTmpSegmentAsyncDeletionEnabled();
+    _isPartialOfflineReplicaRepairEnabled = 
controllerConf.isPartialOfflineReplicaRepairEnabled();
     _deepstoreUploadRetryTimeoutMs = 
controllerConf.getDeepStoreRetryUploadTimeoutMs();
   }
 
@@ -1803,6 +1805,27 @@ public class PinotLLCRealtimeSegmentManager {
               updateInstanceStatesForNewConsumingSegment(instanceStatesMap, 
latestSegmentName, null, segmentAssignment,
                   instancePartitionsMap);
             }
+          } else if (latestSegmentZKMetadata.getStatus() == Status.IN_PROGRESS
+              && _isPartialOfflineReplicaRepairEnabled) {
+            // Handle case where some replicas are OFFLINE while others are 
CONSUMING
+            // This happens when one replica fails (e.g., KafkaConsumer init 
error) and marks itself OFFLINE
+            // while other replicas continue consuming
+            List<String> offlineInstances = new ArrayList<>();
+            for (Map.Entry<String, String> instanceEntry : 
instanceStateMap.entrySet()) {
+              if (SegmentStateModel.OFFLINE.equals(instanceEntry.getValue())) {
+                offlineInstances.add(instanceEntry.getKey());
+              }
+            }
+
+            if (!offlineInstances.isEmpty()) {
+              LOGGER.info("Repairing segment: {} with {} OFFLINE replicas out 
of {} total replicas. "
+                      + "Setting OFFLINE replicas back to CONSUMING: {}", 
latestSegmentName, offlineInstances.size(),
+                  instanceStateMap.size(), offlineInstances);
+              // Set the OFFLINE replicas back to CONSUMING so they can retry
+              for (String offlineInstance : offlineInstances) {
+                instanceStateMap.put(offlineInstance, 
SegmentStateModel.CONSUMING);
+              }
+            }
           }
           // else, the metadata should be IN_PROGRESS, which is the right 
state for a consuming segment.
         } else {
diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java
index 6f05cd437e4..13a15c045e4 100644
--- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java
+++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java
@@ -878,6 +878,77 @@ public class PinotLLCRealtimeSegmentManagerTest {
     testRepairs(segmentManager, Lists.newArrayList(1));
   }
 
+  @Test
+  public void testPartialOfflineReplicaRepair() {
+    // Set up a new table with 3 replicas, 5 instances, 2 partitions
+    PinotHelixResourceManager mockHelixResourceManager = 
mock(PinotHelixResourceManager.class);
+    FakePinotLLCRealtimeSegmentManager segmentManager =
+        new FakePinotLLCRealtimeSegmentManager(mockHelixResourceManager, true);
+    setUpNewTable(segmentManager, 3, 5, 2);
+    Map<String, Map<String, String>> instanceStatesMap = 
segmentManager._idealState.getRecord().getMapFields();
+
+    // Turn one replica OFFLINE for the CONSUMING segment in partition group 0 
(simulating issue #11314)
+    String consumingSegment = new LLCSegmentName(RAW_TABLE_NAME, 0, 0, 
CURRENT_TIME_MS).getSegmentName();
+    Map<String, String> consumingSegmentInstanceStateMap = 
instanceStatesMap.get(consumingSegment);
+    assertNotNull(consumingSegmentInstanceStateMap);
+    assertEquals(consumingSegmentInstanceStateMap.size(), 3);
+
+    // Find the first instance and mark it OFFLINE
+    String offlineInstance = 
consumingSegmentInstanceStateMap.keySet().iterator().next();
+    assertEquals(consumingSegmentInstanceStateMap.get(offlineInstance), 
SegmentStateModel.CONSUMING);
+    consumingSegmentInstanceStateMap.put(offlineInstance, 
SegmentStateModel.OFFLINE);
+
+    // Verify we have mixed state: 2 CONSUMING, 1 OFFLINE
+    long consumingCount = consumingSegmentInstanceStateMap.values().stream()
+        .filter(s -> s.equals(SegmentStateModel.CONSUMING)).count();
+    long offlineCount = consumingSegmentInstanceStateMap.values().stream()
+        .filter(s -> s.equals(SegmentStateModel.OFFLINE)).count();
+    assertEquals(consumingCount, 2);
+    assertEquals(offlineCount, 1);
+
+    // Run repair - should set the OFFLINE replica back to CONSUMING
+    segmentManager._exceededMaxSegmentCompletionTime = true;
+    segmentManager.ensureAllPartitionsConsuming();
+
+    // Verify all replicas are now CONSUMING
+    consumingSegmentInstanceStateMap = instanceStatesMap.get(consumingSegment);
+    assertEquals(new HashSet<>(consumingSegmentInstanceStateMap.values()),
+        Collections.singleton(SegmentStateModel.CONSUMING));
+    assertEquals(consumingSegmentInstanceStateMap.size(), 3);
+    assertEquals(consumingSegmentInstanceStateMap.get(offlineInstance), 
SegmentStateModel.CONSUMING);
+  }
+
+  @Test
+  public void testPartialOfflineReplicaRepairDisabled() {
+    // Set up a new table with 3 replicas, 5 instances, 2 partitions
+    PinotHelixResourceManager mockHelixResourceManager = 
mock(PinotHelixResourceManager.class);
+    // Create segment manager with partial repair DISABLED
+    FakePinotLLCRealtimeSegmentManager segmentManager =
+        new FakePinotLLCRealtimeSegmentManager(mockHelixResourceManager, 
false);
+    setUpNewTable(segmentManager, 3, 5, 2);
+    Map<String, Map<String, String>> instanceStatesMap = 
segmentManager._idealState.getRecord().getMapFields();
+
+    // Turn one replica OFFLINE for the CONSUMING segment in partition group 0
+    String consumingSegment = new LLCSegmentName(RAW_TABLE_NAME, 0, 0, 
CURRENT_TIME_MS).getSegmentName();
+    Map<String, String> consumingSegmentInstanceStateMap = 
instanceStatesMap.get(consumingSegment);
+    assertNotNull(consumingSegmentInstanceStateMap);
+
+    String offlineInstance = 
consumingSegmentInstanceStateMap.keySet().iterator().next();
+    consumingSegmentInstanceStateMap.put(offlineInstance, 
SegmentStateModel.OFFLINE);
+
+    // Store old state
+    Map<String, Map<String, String>> oldInstanceStatesMap = 
cloneInstanceStatesMap(instanceStatesMap);
+
+    // Run repair - should NOT change anything since the feature is disabled
+    segmentManager._exceededMaxSegmentCompletionTime = true;
+    segmentManager.ensureAllPartitionsConsuming();
+
+    // Verify the OFFLINE replica is still OFFLINE (no repair)
+    consumingSegmentInstanceStateMap = instanceStatesMap.get(consumingSegment);
+    assertEquals(consumingSegmentInstanceStateMap.get(offlineInstance), 
SegmentStateModel.OFFLINE);
+    assertEquals(oldInstanceStatesMap.get(consumingSegment), 
consumingSegmentInstanceStateMap);
+  }
+
   /**
    * Removes the new CONSUMING segment and sets the latest committed (ONLINE) 
segment to CONSUMING if exists in the
    * ideal state.
@@ -2127,6 +2198,21 @@ public class PinotLLCRealtimeSegmentManagerTest {
       _mockResourceManager = pinotHelixResourceManager;
     }
 
+    FakePinotLLCRealtimeSegmentManager(PinotHelixResourceManager 
pinotHelixResourceManager,
+        boolean enablePartialOfflineReplicaRepair) {
+      super(pinotHelixResourceManager, 
createControllerConf(enablePartialOfflineReplicaRepair),
+          mock(ControllerMetrics.class));
+      _mockResourceManager = pinotHelixResourceManager;
+    }
+
+    private static ControllerConf createControllerConf(boolean 
enablePartialOfflineReplicaRepair) {
+      ControllerConf config = new ControllerConf();
+      config.setDataDir(TEMP_DIR.toString());
+      
config.setProperty(ControllerConf.ControllerPeriodicTasksConf.ENABLE_PARTIAL_OFFLINE_REPLICA_REPAIR,
+          enablePartialOfflineReplicaRepair);
+      return config;
+    }
+
     private static PinotHelixResourceManager createMockedResourceManager() {
       PinotHelixResourceManager mockResourceManager = 
mock(PinotHelixResourceManager.class);
       HelixManager mockHelixManager = mock(HelixManager.class);


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to