snleee commented on code in PR #10350:
URL: https://github.com/apache/pinot/pull/10350#discussion_r1143464290
##########
pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/BaseInstanceSelector.java:
##########
@@ -18,69 +18,194 @@
*/
package org.apache.pinot.broker.routing.instanceselector;
+import java.time.Clock;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
-import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
-import java.util.SortedMap;
+import java.util.TreeSet;
import javax.annotation.Nullable;
+import org.apache.helix.AccessOption;
import org.apache.helix.model.ExternalView;
import org.apache.helix.model.IdealState;
+import org.apache.helix.store.zk.ZkHelixPropertyStore;
+import org.apache.helix.zookeeper.datamodel.ZNRecord;
import
org.apache.pinot.broker.routing.adaptiveserverselector.AdaptiveServerSelector;
import org.apache.pinot.broker.routing.segmentpreselector.SegmentPreSelector;
-import org.apache.pinot.common.metrics.BrokerMeter;
+import org.apache.pinot.common.metadata.ZKMetadataProvider;
+import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
import org.apache.pinot.common.metrics.BrokerMetrics;
import org.apache.pinot.common.request.BrokerRequest;
import org.apache.pinot.common.utils.HashUtil;
+import org.apache.pinot.spi.utils.CommonConstants;
import
org.apache.pinot.spi.utils.CommonConstants.Helix.StateModel.SegmentStateModel;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
/**
* Base implementation of instance selector which maintains a map from segment
to enabled ONLINE/CONSUMING server
* instances that serves the segment and a set of unavailable segments (no
enabled instance or all enabled instances are
* in ERROR state).
+ *
+ * New segments are not counted as unavailable.
+ * This is because it is common for a new segment to be only partially available, and we don't want new segments
+ * to cause hot spots or low query availability.
+ * New segments are also not reported as unavailable segments.
+ *
+ * A new segment is defined as a segment that was created less than 5 minutes ago.
+ * - During initialization, we look up the segment creation time from ZooKeeper.
+ * - After initialization, we use the system clock time at which we receive the first ideal state update for that
+ * segment as an approximation of the segment creation time.
+ *
+ * A new segment is retired (treated as old) when any of the following happens:
+ * - Its creation time is more than 5 minutes ago.
+ * - We receive an ERROR state for the segment.
+ * - The external view for the segment converges with the ideal state.
+ *
+ * Note that this implementation means:
+ * 1) Inconsistency across requests for new segments (some may be available, some may not be).
+ * 2) When there is no assignment/instance change for a long time, some new segments that have already expired by
+ * the clock are still considered new.
*/
abstract class BaseInstanceSelector implements InstanceSelector {
- private static final Logger LOGGER =
LoggerFactory.getLogger(BaseInstanceSelector.class);
+ public static class SegmentState { // Creation time and candidate instances tracked for a single segment.
+ // Candidate instances for this segment, as recorded via addCandidate().
+ // Maps each candidate instance name to the `online` flag passed to addCandidate() (not an index).
+ private HashMap<String, Boolean> _candidates;
+ // Creation time (millis) of the segment; isNew() compares it against the caller-supplied current time.
+ private long _creationMillis;
+ // Creates a state with the given creation time and an empty candidate map.
+ public SegmentState(long creationMillis) {
+ _creationMillis = creationMillis;
+ _candidates = new HashMap<>();
+ }
+ // Returns whether the segment still counts as new at nowMillis; the rule lives in StateModel.isNewSegment().
+ public boolean isNew(long nowMillis) {
+ return CommonConstants.Helix.StateModel.isNewSegment(_creationMillis,
nowMillis);
+ }
+ // Removes all recorded candidates.
+ public void resetCandidates() {
+ _candidates.clear();
+ }
+ // Records (or overwrites) a candidate instance together with its online flag.
+ public void addCandidate(String candidate, boolean online) {
+ _candidates.put(candidate, online);
+ }
+ // Returns the internal mutable candidate map (not a copy); changes made by callers affect this state.
+ public HashMap<String, Boolean> getCandidates() {
+ return _candidates;
+ }
+ }
// To prevent int overflow, reset the request id once it reaches this value
private static final long MAX_REQUEST_ID = 1_000_000_000;
- private final String _tableNameWithType;
private final BrokerMetrics _brokerMetrics;
+ private final String _tableNameWithType;
+ private final ZkHelixPropertyStore<ZNRecord> _propertyStore;
protected final AdaptiveServerSelector _adaptiveServerSelector;
// These 4 variables are the cached states to help accelerate the change
processing
private Set<String> _enabledInstances;
private Map<String, List<String>> _segmentToOnlineInstancesMap;
- private Map<String, List<String>> _segmentToOfflineInstancesMap;
private Map<String, List<String>> _instanceToSegmentsMap;
+ private Map<String, SegmentState> _newSegmentStates;
- // These 2 variables are needed for instance selection (multi-threaded), so
make them volatile
- private volatile Map<String, List<String>> _segmentToEnabledInstancesMap;
- private volatile Set<String> _unavailableSegments;
+ // _segmentStateSnapshot is needed for instance selection (multi-threaded), so make it volatile
+ private volatile SegmentStateSnapshot _segmentStateSnapshot;
Review Comment:
👍
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]