61yao commented on code in PR #10350:
URL: https://github.com/apache/pinot/pull/10350#discussion_r1137544004


##########
pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/BaseInstanceSelector.java:
##########
@@ -141,139 +233,116 @@ public void onInstancesChange(Set<String> 
enabledInstances, List<String> changed
    */
   @Override
   public void onAssignmentChange(IdealState idealState, ExternalView 
externalView, Set<String> onlineSegments) {
+    long nowMillis = _clock.millis();
+    onAssignmentChange(idealState, externalView, onlineSegments, nowMillis, 
true);
+  }
+
+  private void onAssignmentChange(IdealState idealState, ExternalView 
externalView, Set<String> onlineSegments,
+      long nowMillis, boolean refreshNewSegments) {
+    if (refreshNewSegments) {
+      // If this call is not from init, we use existing state to check whether 
a segment is new.
+      // And we use system clock time as an approximation of segment creation 
time.
+      for (String segment : onlineSegments) {
+        if (!_segmentToOnlineInstancesMap.containsKey(segment) && 
!_newSegmentStates.containsKey(segment)) {
+          _newSegmentStates.put(segment, new SegmentState(nowMillis));
+        }
+      }
+    }
     _segmentToOnlineInstancesMap.clear();
-    _segmentToOfflineInstancesMap.clear();
-    _instanceToSegmentsMap.clear();
 
     // Update the cached maps
-    updateSegmentMaps(idealState, externalView, onlineSegments, 
_segmentToOnlineInstancesMap,
-        _segmentToOfflineInstancesMap, _instanceToSegmentsMap);
+    updateSegmentMaps(idealState, externalView, onlineSegments, 
_segmentToOnlineInstancesMap, _newSegmentStates,
+        nowMillis);
 
-    // Generate a new map from segment to enabled ONLINE/CONSUMING instances 
and a new set of unavailable segments (no
-    // enabled instance or all enabled instances are in ERROR state)
-    Map<String, List<String>> segmentToEnabledInstancesMap =
-        new 
HashMap<>(HashUtil.getHashMapCapacity(_segmentToOnlineInstancesMap.size()));
-    Set<String> unavailableSegments = new HashSet<>();
-    // NOTE: Put null as the value when there is no enabled instances for a 
segment so that segmentToEnabledInstancesMap
-    // always contains all segments. With this, in onInstancesChange() we can 
directly iterate over
-    // segmentToEnabledInstancesMap.entrySet() and modify the value without 
changing the map entries.
-    for (Map.Entry<String, List<String>> entry : 
_segmentToOnlineInstancesMap.entrySet()) {
-      String segment = entry.getKey();
-      List<String> enabledInstancesForSegment =
-          calculateEnabledInstancesForSegment(segment, entry.getValue(), 
unavailableSegments);
-      segmentToEnabledInstancesMap.put(segment, enabledInstancesForSegment);
-    }
-
-    _segmentToEnabledInstancesMap = segmentToEnabledInstancesMap;
-    _unavailableSegments = unavailableSegments;
+    _segmentStateSnapshot =
+        SegmentStateSnapshot.createSnapshot(_tableNameWithType, 
_segmentToOnlineInstancesMap, _newSegmentStates,
+            _enabledInstances, _brokerMetrics);
   }
 
   /**
    * Updates the segment maps based on the given ideal state, external view 
and online segments (segments with
    * ONLINE/CONSUMING instances in the ideal state and pre-selected by the 
{@link SegmentPreSelector}).
    */
-  void updateSegmentMaps(IdealState idealState, ExternalView externalView, 
Set<String> onlineSegments,
-      Map<String, List<String>> segmentToOnlineInstancesMap, Map<String, 
List<String>> segmentToOfflineInstancesMap,
-      Map<String, List<String>> instanceToSegmentsMap) {
-    // Iterate over the external view instead of the online segments so that 
the map lookups are performed on the
-    // HashSet instead of the TreeSet for performance
-    // NOTE: Do not track segments not in the external view because it is a 
valid state when the segment is new added
-    Map<String, Map<String, String>> idealStateAssignment = 
idealState.getRecord().getMapFields();
-    for (Map.Entry<String, Map<String, String>> entry : 
externalView.getRecord().getMapFields().entrySet()) {
+  protected void updateSegmentMaps(IdealState idealState, ExternalView 
externalView, Set<String> onlineSegments,
+      Map<String, List<String>> segmentToOnlineInstancesMap, Map<String, 
SegmentState> newSegmentStateMap,
+      long nowMillis) {
+    // NOTE: Segments with missing external view are considered as new.
+    Map<String, Map<String, String>> externalViewAssignment = 
externalView.getRecord().getMapFields();
+    // Iterate over the ideal state instead of the external view since this 
will cover segment with missing external
+    // view.
+    for (Map.Entry<String, Map<String, String>> entry : 
idealState.getRecord().getMapFields().entrySet()) {
       String segment = entry.getKey();
-
       // Only track online segments
       if (!onlineSegments.contains(segment)) {
         continue;
       }
-
-      Map<String, String> externalViewInstanceStateMap = entry.getValue();
-      Map<String, String> idealStateInstanceStateMap = 
idealStateAssignment.get(segment);
-      List<String> onlineInstances = new 
ArrayList<>(externalViewInstanceStateMap.size());
-      List<String> offlineInstances = new ArrayList<>();
-      segmentToOnlineInstancesMap.put(segment, onlineInstances);
-      segmentToOfflineInstancesMap.put(segment, offlineInstances);
+      Map<String, String> idealStateInstanceStateMap = entry.getValue();
+      Map<String, String> externalViewInstanceStateMap =
+          externalViewAssignment.getOrDefault(segment, Collections.emptyMap());
+      // Sort the online instances for replica-group routing to work. For 
multiple segments with the same online
+      // instances, if the list is sorted, the same index in the list will 
always point to the same instance.
+      Set<String> onlineInstances = new TreeSet<>();
       for (Map.Entry<String, String> instanceStateEntry : 
externalViewInstanceStateMap.entrySet()) {
         String instance = instanceStateEntry.getKey();
-
         // Only track instances within the ideal state
         // NOTE: When an instance is not in the ideal state, the instance will 
drop the segment soon, and it is not safe
         // to query this instance for the segment. This could happen when a 
segment is moved from one instance to
         // another instance.
         if (!idealStateInstanceStateMap.containsKey(instance)) {
           continue;
         }
-
         String externalViewState = instanceStateEntry.getValue();
         // Do not track instances in ERROR state
         if (!externalViewState.equals(SegmentStateModel.ERROR)) {
-          instanceToSegmentsMap.computeIfAbsent(instance, k -> new 
ArrayList<>()).add(segment);
-          if (externalViewState.equals(SegmentStateModel.OFFLINE)) {
-            offlineInstances.add(instance);
-          } else {
+          if (SegmentStateModel.isOnline(externalViewState)) {
             onlineInstances.add(instance);
           }
+        } else {
+          // Segment with error state instance should be considered old.
+          newSegmentStateMap.remove(segment);
         }
       }
-
-      // Sort the online instances for replica-group routing to work. For 
multiple segments with the same online
-      // instances, if the list is sorted, the same index in the list will 
always point to the same instance.
-      if (!(externalViewInstanceStateMap instanceof SortedMap)) {
-        onlineInstances.sort(null);
-        offlineInstances.sort(null);
-      }
-    }
-  }
-
-  /**
-   * Calculates the enabled ONLINE/CONSUMING instances for the given segment, 
and updates the unavailable segments (no
-   * enabled instance or all enabled instances are in ERROR state).
-   */
-  @Nullable
-  private List<String> calculateEnabledInstancesForSegment(String segment, 
List<String> onlineInstancesForSegment,
-      Set<String> unavailableSegments) {
-    List<String> enabledInstancesForSegment = new 
ArrayList<>(onlineInstancesForSegment.size());
-    for (String onlineInstance : onlineInstancesForSegment) {
-      if (_enabledInstances.contains(onlineInstance)) {
-        enabledInstancesForSegment.add(onlineInstance);
+      SegmentState state = newSegmentStateMap.get(segment);

Review Comment:
   This could also be an old segment, i.e. one that has already been retired as "new" by the clock.



##########
pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/BaseInstanceSelector.java:
##########
@@ -18,69 +18,194 @@
  */
 package org.apache.pinot.broker.routing.instanceselector;
 
+import java.time.Clock;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
-import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
-import java.util.SortedMap;
+import java.util.TreeSet;
 import javax.annotation.Nullable;
+import org.apache.helix.AccessOption;
 import org.apache.helix.model.ExternalView;
 import org.apache.helix.model.IdealState;
+import org.apache.helix.store.zk.ZkHelixPropertyStore;
+import org.apache.helix.zookeeper.datamodel.ZNRecord;
 import 
org.apache.pinot.broker.routing.adaptiveserverselector.AdaptiveServerSelector;
 import org.apache.pinot.broker.routing.segmentpreselector.SegmentPreSelector;
-import org.apache.pinot.common.metrics.BrokerMeter;
+import org.apache.pinot.common.metadata.ZKMetadataProvider;
+import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
 import org.apache.pinot.common.metrics.BrokerMetrics;
 import org.apache.pinot.common.request.BrokerRequest;
 import org.apache.pinot.common.utils.HashUtil;
+import org.apache.pinot.spi.utils.CommonConstants;
 import 
org.apache.pinot.spi.utils.CommonConstants.Helix.StateModel.SegmentStateModel;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 
 /**
  * Base implementation of instance selector which maintains a map from segment 
to enabled ONLINE/CONSUMING server
  * instances that serves the segment and a set of unavailable segments (no 
enabled instance or all enabled instances are
  * in ERROR state).
+ *
+ * New segment won't be counted as unavailable.
+ * This is because it is common for new segment to be partially available, and 
we don't want to have hot spot or low
+ * query availability problem caused by new segment.
+ * We also don't report new segment as unavailable segments.
+ *
+ * New segment is defined as segment that is created more than 5 minutes ago.
+ * - For initialization, we look up segment creation time from zookeeper.
+ * - After initialization, we use the system clock when we receive the first 
update of ideal state for that segment as
+ *   approximation of segment creation time.
+ *
+ * We retire new segment as old when:
+ * - The creation time is more than 5 mins ago
+ * - We receive error state for new segment
+ * - External view for segment converges with ideal state.
+ *
+ * Note that this implementation means:
+ * 1) Inconsistency across requests for new segments (some may be available, 
some may be not)
+ * 2) When there is no assignment/instance change for long time, some of the 
new segments that expire with the clock
+ * are still considered as old.
  */
 abstract class BaseInstanceSelector implements InstanceSelector {
-  private static final Logger LOGGER = 
LoggerFactory.getLogger(BaseInstanceSelector.class);
+  public static class SegmentState {
+    // List of instance for this segment in ideal state.
+    // Mapping from candidate to index in _candidateInstance.
+    private HashMap<String, Boolean> _candidates;
+
+    private long _creationMillis;
+
+    public SegmentState(long creationMillis) {
+      _creationMillis = creationMillis;
+      _candidates = new HashMap<>();
+    }
+
+    public boolean isNew(long nowMillis) {
+      return CommonConstants.Helix.StateModel.isNewSegment(_creationMillis, 
nowMillis);
+    }
+
+    public void resetCandidates() {
+      _candidates.clear();
+    }
+
+    public void addCandidate(String candidate, boolean online) {
+      _candidates.put(candidate, online);
+    }
+
+    public HashMap<String, Boolean> getCandidates() {
+      return _candidates;
+    }
+  }
 
   // To prevent int overflow, reset the request id once it reaches this value
   private static final long MAX_REQUEST_ID = 1_000_000_000;
 
-  private final String _tableNameWithType;
   private final BrokerMetrics _brokerMetrics;
+  private final String _tableNameWithType;
+  private final ZkHelixPropertyStore<ZNRecord> _propertyStore;
   protected final AdaptiveServerSelector _adaptiveServerSelector;
 
   // These 4 variables are the cached states to help accelerate the change 
processing
   private Set<String> _enabledInstances;
   private Map<String, List<String>> _segmentToOnlineInstancesMap;
-  private Map<String, List<String>> _segmentToOfflineInstancesMap;
   private Map<String, List<String>> _instanceToSegmentsMap;
+  private Map<String, SegmentState> _newSegmentStates;
 
-  // These 2 variables are needed for instance selection (multi-threaded), so 
make them volatile
-  private volatile Map<String, List<String>> _segmentToEnabledInstancesMap;
-  private volatile Set<String> _unavailableSegments;
+  // _segmentStateSnapshot is needed for instance selection (multi-threaded), 
so make them volatile
+  private volatile SegmentStateSnapshot _segmentStateSnapshot;

Review Comment:
   This will be addressed in a follow-up PR, as discussed.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to