61yao commented on code in PR #10350:
URL: https://github.com/apache/pinot/pull/10350#discussion_r1137543416
##########
pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/BaseInstanceSelector.java:
##########
@@ -18,69 +18,194 @@
*/
package org.apache.pinot.broker.routing.instanceselector;
+import java.time.Clock;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
-import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
-import java.util.SortedMap;
+import java.util.TreeSet;
import javax.annotation.Nullable;
+import org.apache.helix.AccessOption;
import org.apache.helix.model.ExternalView;
import org.apache.helix.model.IdealState;
+import org.apache.helix.store.zk.ZkHelixPropertyStore;
+import org.apache.helix.zookeeper.datamodel.ZNRecord;
import
org.apache.pinot.broker.routing.adaptiveserverselector.AdaptiveServerSelector;
import org.apache.pinot.broker.routing.segmentpreselector.SegmentPreSelector;
-import org.apache.pinot.common.metrics.BrokerMeter;
+import org.apache.pinot.common.metadata.ZKMetadataProvider;
+import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
import org.apache.pinot.common.metrics.BrokerMetrics;
import org.apache.pinot.common.request.BrokerRequest;
import org.apache.pinot.common.utils.HashUtil;
+import org.apache.pinot.spi.utils.CommonConstants;
import
org.apache.pinot.spi.utils.CommonConstants.Helix.StateModel.SegmentStateModel;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
/**
* Base implementation of instance selector which maintains a map from segment
to enabled ONLINE/CONSUMING server
* instances that serves the segment and a set of unavailable segments (no
enabled instance or all enabled instances are
* in ERROR state).
+ *
+ * New segment won't be counted as unavailable.
+ * This is because it is common for new segment to be partially available, and
we don't want to have hot spot or low
+ * query availability problem caused by new segment.
+ * We also don't report new segment as unavailable segments.
+ *
+ * New segment is defined as segment that is created less than 5 minutes ago.
+ * - For initialization, we look up segment creation time from zookeeper.
+ * - After initialization, we use the system clock when we receive the first
update of ideal state for that segment as
+ * approximation of segment creation time.
+ *
+ * We retire new segment as old when:
+ * - The creation time is more than 5 mins ago
+ * - We receive error state for new segment
+ * - External view for segment converges with ideal state.
+ *
+ * Note that this implementation means:
+ * 1) Inconsistency across requests for new segments (some may be available,
some may be not)
+ * 2) When there is no assignment/instance change for long time, some of the
new segments that expire with the clock
+ * are still considered as new.
*/
abstract class BaseInstanceSelector implements InstanceSelector {
- private static final Logger LOGGER =
LoggerFactory.getLogger(BaseInstanceSelector.class);
+ public static class SegmentState {
+ // Candidate instances for this segment in ideal state.
+ // Mapping from candidate instance to whether it is online.
+ private HashMap<String, Boolean> _candidates;
+
+ private long _creationMillis;
+
+ public SegmentState(long creationMillis) {
+ _creationMillis = creationMillis;
+ _candidates = new HashMap<>();
+ }
+
+ public boolean isNew(long nowMillis) {
+ return CommonConstants.Helix.StateModel.isNewSegment(_creationMillis,
nowMillis);
+ }
+
+ public void resetCandidates() {
+ _candidates.clear();
+ }
+
+ public void addCandidate(String candidate, boolean online) {
+ _candidates.put(candidate, online);
+ }
+
+ public HashMap<String, Boolean> getCandidates() {
+ return _candidates;
+ }
+ }
// To prevent int overflow, reset the request id once it reaches this value
private static final long MAX_REQUEST_ID = 1_000_000_000;
- private final String _tableNameWithType;
private final BrokerMetrics _brokerMetrics;
+ private final String _tableNameWithType;
+ private final ZkHelixPropertyStore<ZNRecord> _propertyStore;
protected final AdaptiveServerSelector _adaptiveServerSelector;
// These 4 variables are the cached states to help accelerate the change
processing
private Set<String> _enabledInstances;
private Map<String, List<String>> _segmentToOnlineInstancesMap;
- private Map<String, List<String>> _segmentToOfflineInstancesMap;
private Map<String, List<String>> _instanceToSegmentsMap;
+ private Map<String, SegmentState> _newSegmentStates;
- // These 2 variables are needed for instance selection (multi-threaded), so
make them volatile
- private volatile Map<String, List<String>> _segmentToEnabledInstancesMap;
- private volatile Set<String> _unavailableSegments;
+ // _segmentStateSnapshot is needed for instance selection (multi-threaded),
so make it volatile
+ private volatile SegmentStateSnapshot _segmentStateSnapshot;
+ private Clock _clock;
BaseInstanceSelector(String tableNameWithType, BrokerMetrics brokerMetrics,
- @Nullable AdaptiveServerSelector adaptiveServerSelector) {
+ @Nullable AdaptiveServerSelector adaptiveServerSelector,
ZkHelixPropertyStore<ZNRecord> propertyStore) {
+ this(tableNameWithType, brokerMetrics, adaptiveServerSelector,
propertyStore, Clock.systemUTC());
+ }
+
+ // Test only for clock injection.
+ BaseInstanceSelector(String tableNameWithType, BrokerMetrics brokerMetrics,
+ @Nullable AdaptiveServerSelector adaptiveServerSelector,
ZkHelixPropertyStore<ZNRecord> propertyStore,
+ Clock clock) {
_tableNameWithType = tableNameWithType;
_brokerMetrics = brokerMetrics;
_adaptiveServerSelector = adaptiveServerSelector;
+ _newSegmentStates = new HashMap<>();
+ _propertyStore = propertyStore;
+ _clock = clock;
+ }
+
+ // Get the segment where the ideal state hasn't converged with external view
and which doesn't have error instance.
+ private static List<String> getPotentialNewSegments(IdealState idealState,
ExternalView externalView,
+ Set<String> onlineSegments) {
+ List<String> potentialNewSegments = new ArrayList<>();
+ Map<String, Map<String, String>> externalViewAssignment =
externalView.getRecord().getMapFields();
+ for (Map.Entry<String, Map<String, String>> entry :
idealState.getRecord().getMapFields().entrySet()) {
+ String segment = entry.getKey();
+ // Only track online segments
+ if (!onlineSegments.contains(segment)) {
+ continue;
+ }
+ Map<String, String> idealStateInstanceStateMap = entry.getValue();
+ // Segments with missing external view are considered as new.
+ Map<String, String> externalViewInstanceStateMap =
+ externalViewAssignment.getOrDefault(segment, Collections.emptyMap());
+ List<String> onlineInstance = new ArrayList<>();
+ boolean couldBeNewSegment = true;
Review Comment:
updated the logic. PTAL
##########
pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/BaseInstanceSelector.java:
##########
@@ -18,69 +18,194 @@
*/
package org.apache.pinot.broker.routing.instanceselector;
+import java.time.Clock;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
-import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
-import java.util.SortedMap;
+import java.util.TreeSet;
import javax.annotation.Nullable;
+import org.apache.helix.AccessOption;
import org.apache.helix.model.ExternalView;
import org.apache.helix.model.IdealState;
+import org.apache.helix.store.zk.ZkHelixPropertyStore;
+import org.apache.helix.zookeeper.datamodel.ZNRecord;
import
org.apache.pinot.broker.routing.adaptiveserverselector.AdaptiveServerSelector;
import org.apache.pinot.broker.routing.segmentpreselector.SegmentPreSelector;
-import org.apache.pinot.common.metrics.BrokerMeter;
+import org.apache.pinot.common.metadata.ZKMetadataProvider;
+import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
import org.apache.pinot.common.metrics.BrokerMetrics;
import org.apache.pinot.common.request.BrokerRequest;
import org.apache.pinot.common.utils.HashUtil;
+import org.apache.pinot.spi.utils.CommonConstants;
import
org.apache.pinot.spi.utils.CommonConstants.Helix.StateModel.SegmentStateModel;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
/**
* Base implementation of instance selector which maintains a map from segment
to enabled ONLINE/CONSUMING server
* instances that serves the segment and a set of unavailable segments (no
enabled instance or all enabled instances are
* in ERROR state).
+ *
+ * New segment won't be counted as unavailable.
+ * This is because it is common for new segment to be partially available, and
we don't want to have hot spot or low
+ * query availability problem caused by new segment.
+ * We also don't report new segment as unavailable segments.
+ *
+ * New segment is defined as segment that is created less than 5 minutes ago.
+ * - For initialization, we look up segment creation time from zookeeper.
+ * - After initialization, we use the system clock when we receive the first
update of ideal state for that segment as
+ * approximation of segment creation time.
+ *
+ * We retire new segment as old when:
+ * - The creation time is more than 5 mins ago
+ * - We receive error state for new segment
+ * - External view for segment converges with ideal state.
+ *
+ * Note that this implementation means:
+ * 1) Inconsistency across requests for new segments (some may be available,
some may be not)
+ * 2) When there is no assignment/instance change for long time, some of the
new segments that expire with the clock
+ * are still considered as new.
*/
abstract class BaseInstanceSelector implements InstanceSelector {
- private static final Logger LOGGER =
LoggerFactory.getLogger(BaseInstanceSelector.class);
+ public static class SegmentState {
+ // Candidate instances for this segment in ideal state.
+ // Mapping from candidate instance to whether it is online.
+ private HashMap<String, Boolean> _candidates;
+
+ private long _creationMillis;
+
+ public SegmentState(long creationMillis) {
+ _creationMillis = creationMillis;
+ _candidates = new HashMap<>();
+ }
+
+ public boolean isNew(long nowMillis) {
+ return CommonConstants.Helix.StateModel.isNewSegment(_creationMillis,
nowMillis);
+ }
+
+ public void resetCandidates() {
+ _candidates.clear();
+ }
+
+ public void addCandidate(String candidate, boolean online) {
+ _candidates.put(candidate, online);
+ }
+
+ public HashMap<String, Boolean> getCandidates() {
+ return _candidates;
+ }
+ }
// To prevent int overflow, reset the request id once it reaches this value
private static final long MAX_REQUEST_ID = 1_000_000_000;
- private final String _tableNameWithType;
private final BrokerMetrics _brokerMetrics;
+ private final String _tableNameWithType;
+ private final ZkHelixPropertyStore<ZNRecord> _propertyStore;
protected final AdaptiveServerSelector _adaptiveServerSelector;
// These 4 variables are the cached states to help accelerate the change
processing
private Set<String> _enabledInstances;
private Map<String, List<String>> _segmentToOnlineInstancesMap;
- private Map<String, List<String>> _segmentToOfflineInstancesMap;
private Map<String, List<String>> _instanceToSegmentsMap;
Review Comment:
removed
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]