npawar commented on a change in pull request #6336:
URL: https://github.com/apache/incubator-pinot/pull/6336#discussion_r548249903



##########
File path: 
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
##########
@@ -1777,6 +1779,143 @@ public int reloadSegment(String tableNameWithType, 
String segmentName) {
     return numMessagesSent;
   }
 
+  /**
+   * Resets a segment by disabling and then enabling the segment
+   */
+  public void resetSegment(String tableNameWithType, String segmentName, long 
externalViewWaitTimeMs) {
+    IdealState idealState = getTableIdealState(tableNameWithType);
+    Preconditions.checkState(idealState != null, "Could not find ideal state 
for table: %s", tableNameWithType);
+    ExternalView externalView = getTableExternalView(tableNameWithType);
+    Preconditions.checkState(externalView != null, "Could not find external 
view for table: %s", tableNameWithType);
+    Set<String> instanceSet = idealState.getInstanceSet(segmentName);
+    Preconditions
+        .checkState(CollectionUtils.isNotEmpty(instanceSet), "Could not find 
segment: %s in ideal state for table: %s");
+    Map<String, String> externalViewStateMap = 
externalView.getStateMap(segmentName);
+    List<String> partitions = Lists.newArrayList(segmentName);
+
+    // First, disable or reset partition
+    for (String instance : instanceSet) {
+      if (externalViewStateMap == null || 
SegmentStateModel.ERROR.equals(externalViewStateMap.get(instance))) {
+        LOGGER.info("Resetting partition: {} of table: {}", segmentName, 
tableNameWithType);
+        _helixAdmin.resetPartition(_helixClusterName, instance, 
tableNameWithType, partitions);
+      } else {
+        LOGGER.info("Disabling partition: {} of table: {}", segmentName, 
tableNameWithType);
+        _helixAdmin.enablePartition(false, _helixClusterName, instance, 
tableNameWithType, partitions);
+      }
+    }
+
+    // Wait for external view to stabilize
+    LOGGER.info("Waiting {} ms for external view to stabilize after 
disable/reset of partition: {} of table: {}",
+        externalViewWaitTimeMs, segmentName, tableNameWithType);
+    long startTime = System.currentTimeMillis();
+    Set<String> instancesToCheck = new HashSet<>(instanceSet);
+    while (!instancesToCheck.isEmpty() && System.currentTimeMillis() - 
startTime < externalViewWaitTimeMs) {
+      ExternalView newExternalView = getTableExternalView(tableNameWithType);
+      Preconditions
+          .checkState(newExternalView != null, "Could not find external view 
for table: %s", tableNameWithType);
+      Map<String, String> newExternalViewStateMap = 
newExternalView.getStateMap(segmentName);
+      if (newExternalViewStateMap == null) {
+        continue;
+      }
+      instancesToCheck.removeIf(instance -> 
SegmentStateModel.OFFLINE.equals(newExternalViewStateMap.get(instance)));
+    }
+    if (!instancesToCheck.isEmpty()) {
+      throw new IllegalStateException(String.format(
+          "Timed out waiting for external view to stabilize after 
disable/reset call. Skipping enable of partition: %s of table: %s",
+          segmentName, tableNameWithType));
+    }
+
+    // Enable partition
+    LOGGER.info("Enabling partition: {} of table: {}", segmentName, 
tableNameWithType);
+    for (String instance : instanceSet) {
+      _helixAdmin.enablePartition(true, _helixClusterName, instance, 
tableNameWithType, partitions);
+    }
+  }
+
+  /**
+   * Resets all segments of a table by disabling and then enabling the segments
+   */
+  public void resetAllSegments(String tableNameWithType, long 
externalViewWaitTimeMs) {
+    IdealState idealState = getTableIdealState(tableNameWithType);
+    Preconditions.checkState(idealState != null, "Could not find ideal state 
for table: %s", tableNameWithType);
+    ExternalView externalView = getTableExternalView(tableNameWithType);
+    Preconditions.checkState(externalView != null, "Could not find external 
view for table: %s", tableNameWithType);
+
+    Map<String, Set<String>> resetInstanceToPartitionsMap = new HashMap<>();
+    Map<String, Set<String>> disableInstanceToPartitionsMap = new HashMap<>();
+    Map<String, Set<String>> partitionInstancesToCheck = new HashMap<>();
+
+    for (String partition : idealState.getPartitionSet()) {
+      Set<String> instanceSet = idealState.getInstanceSet(partition);
+      Map<String, String> externalViewStateMap = 
externalView.getStateMap(partition);
+      for (String instance : instanceSet) {
+        if (externalViewStateMap == null || 
SegmentStateModel.ERROR.equals(externalViewStateMap.get(instance))) {
+          resetInstanceToPartitionsMap.computeIfAbsent(instance, i -> new 
HashSet<>()).add(partition);
+        } else {
+          disableInstanceToPartitionsMap.computeIfAbsent(instance, i -> new 
HashSet<>()).add(partition);
+        }
+      }
+      partitionInstancesToCheck.put(partition, new HashSet<>(instanceSet));
+    }
+
+    // First, disable/reset the partitions
+    LOGGER.info("Disabling/resetting partitions of table: {}", 
tableNameWithType);
+    for (Map.Entry<String, Set<String>> entry : 
resetInstanceToPartitionsMap.entrySet()) {
+      ArrayList<String> partitions = Lists.newArrayList(entry.getValue());
+      _helixAdmin.resetPartition(_helixClusterName, entry.getKey(), 
tableNameWithType, partitions);
+    }
+    for (Map.Entry<String, Set<String>> entry : 
disableInstanceToPartitionsMap.entrySet()) {
+      ArrayList<String> partitions = Lists.newArrayList(entry.getValue());
+      _helixAdmin.enablePartition(false, _helixClusterName, entry.getKey(), 
tableNameWithType, partitions);
+    }
+
+    // Wait for external view to stabilize
+    LOGGER.info("Waiting {} ms for external view to stabilize after 
disable/reset of partitions of table: {}",
+        externalViewWaitTimeMs, tableNameWithType);
+    long startTime = System.currentTimeMillis();
+    while (!partitionInstancesToCheck.isEmpty() && System.currentTimeMillis() 
- startTime < externalViewWaitTimeMs) {
+      ExternalView newExternalView = getTableExternalView(tableNameWithType);
+      Preconditions

Review comment:
       Not sure what you mean. This is just a safeguard against the table being 
deleted in between.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to