This is an automated email from the ASF dual-hosted git repository.
tingchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new df584bed7c5 resume partially offline replicas in controller validation (#17754)
df584bed7c5 is described below
commit df584bed7c572a2be0aa59346c3cb46b66b7cc31
Author: rohit <[email protected]>
AuthorDate: Fri Mar 6 03:26:29 2026 +0530
resume partially offline replicas in controller validation (#17754)
---
.../apache/pinot/controller/ControllerConf.java | 6 ++
.../realtime/PinotLLCRealtimeSegmentManager.java | 23 ++++++
.../PinotLLCRealtimeSegmentManagerTest.java | 86 ++++++++++++++++++++++
3 files changed, 115 insertions(+)
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerConf.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerConf.java
index 308b3062f58..eedfba2bd7d 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerConf.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerConf.java
@@ -253,6 +253,8 @@ public class ControllerConf extends PinotConfiguration {
"controller.segment.level.validation.intervalPeriod";
public static final String AUTO_RESET_ERROR_SEGMENTS_VALIDATION =
"controller.segment.error.autoReset";
+ public static final String ENABLE_PARTIAL_OFFLINE_REPLICA_REPAIR =
+ "controller.realtime.segment.partialOfflineReplicaRepairEnabled";
public static final String DISASTER_RECOVERY_MODE_CONFIG_KEY =
"controller.segment.disaster.recovery.mode";
// Initial delays
@@ -1181,6 +1183,10 @@ public class ControllerConf extends PinotConfiguration {
return
getProperty(ControllerPeriodicTasksConf.AUTO_RESET_ERROR_SEGMENTS_VALIDATION,
true);
}
+ public boolean isPartialOfflineReplicaRepairEnabled() {
+ return
getProperty(ControllerPeriodicTasksConf.ENABLE_PARTIAL_OFFLINE_REPLICA_REPAIR,
false);
+ }
+
public DisasterRecoveryMode getDisasterRecoveryMode() {
return
getDisasterRecoveryMode(getProperty(ControllerPeriodicTasksConf.DISASTER_RECOVERY_MODE_CONFIG_KEY));
}
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
index 3e4f2754e5e..9ed39b67c46 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
@@ -213,6 +213,7 @@ public class PinotLLCRealtimeSegmentManager {
private final FlushThresholdUpdateManager _flushThresholdUpdateManager;
private final boolean _isDeepStoreLLCSegmentUploadRetryEnabled;
private final boolean _isTmpSegmentAsyncDeletionEnabled;
+ private final boolean _isPartialOfflineReplicaRepairEnabled;
private final int _deepstoreUploadRetryTimeoutMs;
private final FileUploadDownloadClient _fileUploadDownloadClient;
private final AtomicInteger _numCompletingSegments = new AtomicInteger(0);
@@ -241,6 +242,7 @@ public class PinotLLCRealtimeSegmentManager {
_deepStoreUploadExecutorPendingSegments = ConcurrentHashMap.newKeySet();
_isTmpSegmentAsyncDeletionEnabled =
controllerConf.isTmpSegmentAsyncDeletionEnabled();
+ _isPartialOfflineReplicaRepairEnabled =
controllerConf.isPartialOfflineReplicaRepairEnabled();
_deepstoreUploadRetryTimeoutMs =
controllerConf.getDeepStoreRetryUploadTimeoutMs();
}
@@ -1803,6 +1805,27 @@ public class PinotLLCRealtimeSegmentManager {
updateInstanceStatesForNewConsumingSegment(instanceStatesMap,
latestSegmentName, null, segmentAssignment,
instancePartitionsMap);
}
+ } else if (latestSegmentZKMetadata.getStatus() == Status.IN_PROGRESS
+ && _isPartialOfflineReplicaRepairEnabled) {
+ // Handle case where some replicas are OFFLINE while others are CONSUMING
+ // This happens when one replica fails (e.g., KafkaConsumer init error) and marks itself OFFLINE
+ // while other replicas continue consuming
+ List<String> offlineInstances = new ArrayList<>();
+ for (Map.Entry<String, String> instanceEntry :
instanceStateMap.entrySet()) {
+ if (SegmentStateModel.OFFLINE.equals(instanceEntry.getValue())) {
+ offlineInstances.add(instanceEntry.getKey());
+ }
+ }
+
+ if (!offlineInstances.isEmpty()) {
+ LOGGER.info("Repairing segment: {} with {} OFFLINE replicas out
of {} total replicas. "
+ + "Setting OFFLINE replicas back to CONSUMING: {}",
latestSegmentName, offlineInstances.size(),
+ instanceStateMap.size(), offlineInstances);
+ // Set the OFFLINE replicas back to CONSUMING so they can retry
+ for (String offlineInstance : offlineInstances) {
+ instanceStateMap.put(offlineInstance,
SegmentStateModel.CONSUMING);
+ }
+ }
}
// else, the metadata should be IN_PROGRESS, which is the right state for a consuming segment.
} else {
diff --git
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java
index 6f05cd437e4..13a15c045e4 100644
---
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java
+++
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java
@@ -878,6 +878,77 @@ public class PinotLLCRealtimeSegmentManagerTest {
testRepairs(segmentManager, Lists.newArrayList(1));
}
+ @Test
+ public void testPartialOfflineReplicaRepair() {
+ // Set up a new table with 3 replicas, 5 instances, 2 partitions
+ PinotHelixResourceManager mockHelixResourceManager =
mock(PinotHelixResourceManager.class);
+ FakePinotLLCRealtimeSegmentManager segmentManager =
+ new FakePinotLLCRealtimeSegmentManager(mockHelixResourceManager, true);
+ setUpNewTable(segmentManager, 3, 5, 2);
+ Map<String, Map<String, String>> instanceStatesMap =
segmentManager._idealState.getRecord().getMapFields();
+
+ // Turn one replica OFFLINE for the CONSUMING segment in partition group 0 (simulating issue #11314)
+ String consumingSegment = new LLCSegmentName(RAW_TABLE_NAME, 0, 0,
CURRENT_TIME_MS).getSegmentName();
+ Map<String, String> consumingSegmentInstanceStateMap =
instanceStatesMap.get(consumingSegment);
+ assertNotNull(consumingSegmentInstanceStateMap);
+ assertEquals(consumingSegmentInstanceStateMap.size(), 3);
+
+ // Find the first instance and mark it OFFLINE
+ String offlineInstance =
consumingSegmentInstanceStateMap.keySet().iterator().next();
+ assertEquals(consumingSegmentInstanceStateMap.get(offlineInstance),
SegmentStateModel.CONSUMING);
+ consumingSegmentInstanceStateMap.put(offlineInstance,
SegmentStateModel.OFFLINE);
+
+ // Verify we have mixed state: 2 CONSUMING, 1 OFFLINE
+ long consumingCount = consumingSegmentInstanceStateMap.values().stream()
+ .filter(s -> s.equals(SegmentStateModel.CONSUMING)).count();
+ long offlineCount = consumingSegmentInstanceStateMap.values().stream()
+ .filter(s -> s.equals(SegmentStateModel.OFFLINE)).count();
+ assertEquals(consumingCount, 2);
+ assertEquals(offlineCount, 1);
+
+ // Run repair - should set the OFFLINE replica back to CONSUMING
+ segmentManager._exceededMaxSegmentCompletionTime = true;
+ segmentManager.ensureAllPartitionsConsuming();
+
+ // Verify all replicas are now CONSUMING
+ consumingSegmentInstanceStateMap = instanceStatesMap.get(consumingSegment);
+ assertEquals(new HashSet<>(consumingSegmentInstanceStateMap.values()),
+ Collections.singleton(SegmentStateModel.CONSUMING));
+ assertEquals(consumingSegmentInstanceStateMap.size(), 3);
+ assertEquals(consumingSegmentInstanceStateMap.get(offlineInstance),
SegmentStateModel.CONSUMING);
+ }
+
+ @Test
+ public void testPartialOfflineReplicaRepairDisabled() {
+ // Set up a new table with 3 replicas, 5 instances, 2 partitions
+ PinotHelixResourceManager mockHelixResourceManager =
mock(PinotHelixResourceManager.class);
+ // Create segment manager with partial repair DISABLED
+ FakePinotLLCRealtimeSegmentManager segmentManager =
+ new FakePinotLLCRealtimeSegmentManager(mockHelixResourceManager,
false);
+ setUpNewTable(segmentManager, 3, 5, 2);
+ Map<String, Map<String, String>> instanceStatesMap =
segmentManager._idealState.getRecord().getMapFields();
+
+ // Turn one replica OFFLINE for the CONSUMING segment in partition group 0
+ String consumingSegment = new LLCSegmentName(RAW_TABLE_NAME, 0, 0,
CURRENT_TIME_MS).getSegmentName();
+ Map<String, String> consumingSegmentInstanceStateMap =
instanceStatesMap.get(consumingSegment);
+ assertNotNull(consumingSegmentInstanceStateMap);
+
+ String offlineInstance =
consumingSegmentInstanceStateMap.keySet().iterator().next();
+ consumingSegmentInstanceStateMap.put(offlineInstance,
SegmentStateModel.OFFLINE);
+
+ // Store old state
+ Map<String, Map<String, String>> oldInstanceStatesMap =
cloneInstanceStatesMap(instanceStatesMap);
+
+ // Run repair - should NOT change anything since the feature is disabled
+ segmentManager._exceededMaxSegmentCompletionTime = true;
+ segmentManager.ensureAllPartitionsConsuming();
+
+ // Verify the OFFLINE replica is still OFFLINE (no repair)
+ consumingSegmentInstanceStateMap = instanceStatesMap.get(consumingSegment);
+ assertEquals(consumingSegmentInstanceStateMap.get(offlineInstance),
SegmentStateModel.OFFLINE);
+ assertEquals(oldInstanceStatesMap.get(consumingSegment),
consumingSegmentInstanceStateMap);
+ }
+
/**
* Removes the new CONSUMING segment and sets the latest committed (ONLINE) segment to CONSUMING if exists in the
* ideal state.
@@ -2127,6 +2198,21 @@ public class PinotLLCRealtimeSegmentManagerTest {
_mockResourceManager = pinotHelixResourceManager;
}
+ FakePinotLLCRealtimeSegmentManager(PinotHelixResourceManager
pinotHelixResourceManager,
+ boolean enablePartialOfflineReplicaRepair) {
+ super(pinotHelixResourceManager,
createControllerConf(enablePartialOfflineReplicaRepair),
+ mock(ControllerMetrics.class));
+ _mockResourceManager = pinotHelixResourceManager;
+ }
+
+ private static ControllerConf createControllerConf(boolean
enablePartialOfflineReplicaRepair) {
+ ControllerConf config = new ControllerConf();
+ config.setDataDir(TEMP_DIR.toString());
+
config.setProperty(ControllerConf.ControllerPeriodicTasksConf.ENABLE_PARTIAL_OFFLINE_REPLICA_REPAIR,
+ enablePartialOfflineReplicaRepair);
+ return config;
+ }
+
private static PinotHelixResourceManager createMockedResourceManager() {
PinotHelixResourceManager mockResourceManager =
mock(PinotHelixResourceManager.class);
HelixManager mockHelixManager = mock(HelixManager.class);
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]