xiangfu0 commented on code in PR #18853:
URL: https://github.com/apache/pinot/pull/18853#discussion_r3485976503


##########
pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/MinionTaskUtils.java:
##########
@@ -428,6 +449,131 @@ public static RoaringBitmap getValidDocIdFromServerMatchingCrc(String tableNameW
     return maxCardinalityMap;
   }
 
+  /**
+   * Counts how many servers host each segment (its online replica count) from a server-to-segments map, so callers
+   * can tell whether every assigned replica responded.
+   */
+  public static Map<String, Integer> getSegmentToReplicaCount(Map<String, List<String>> serverToSegments) {
+    Map<String, Integer> segmentToReplicaCount = new HashMap<>();
+    for (List<String> segments : serverToSegments.values()) {
+      for (String segment : segments) {
+        segmentToReplicaCount.merge(segment, 1, Integer::sum);
+      }
+    }
+    return segmentToReplicaCount;
+  }
+
+  /**
+   * Picks the replica whose validDocIds the generator should use for a segment, or returns {@code null} to skip the
+   * segment. Applies the same checks the executor would (CRC match, healthy server, replica consensus) so bad
+   * segments are dropped before a task is scheduled.
+   *
+   * How the replica is chosen depends on {@code consensusMode}:
+   *   - UNSAFE: the first replica with a matching CRC and a healthy server.
+   *   - EQUAL: all replicas must match CRC, be healthy, and have an identical bitmap.
+   *   - MOST_VALID_DOCS: all replicas must match CRC and be healthy; the one with the most valid docs wins.
+   */
+  @Nullable
+  public static ValidDocIdsMetadataInfo selectValidDocIdsMetadataForConsensus(String taskType, String segmentName,
+      long expectedCrc, @Nullable List<ValidDocIdsMetadataInfo> replicas, int expectedReplicaCount,
+      MinionConstants.ValidDocIdsConsensusMode consensusMode) {
+    if (replicas == null || replicas.isEmpty()) {
+      return null;
+    }
+    boolean unsafe = consensusMode == MinionConstants.ValidDocIdsConsensusMode.UNSAFE;
+    List<ValidDocIdsMetadataInfo> usableReplicas = new ArrayList<>();
+    for (ValidDocIdsMetadataInfo replica : replicas) {
+      // A CRC mismatch usually means the server is still reloading the segment, so its valid doc set can't be
+      // trusted. UNSAFE skips just this replica; stricter modes skip the whole segment.
+      long replicaCrc;
+      try {
+        replicaCrc = Long.parseLong(replica.getSegmentCrc());
+      } catch (NumberFormatException e) {
+        LOGGER.warn("Unparseable CRC '{}' for segment: {} from server: {}, skipping {} (mode={}) for {}",
+            replica.getSegmentCrc(), segmentName, replica.getInstanceId(), unsafe ? "replica" : "segment",
+            consensusMode, taskType);
+        if (unsafe) {
+          continue;
+        }
+        return null;
+      }
+      if (expectedCrc != replicaCrc) {
+        LOGGER.warn("CRC mismatch for segment: {} (expected={}, server={} reported={}), skipping {} (mode={}) for {}",
+            segmentName, expectedCrc, replica.getInstanceId(), replicaCrc, unsafe ? "replica" : "segment",
+            consensusMode, taskType);
+        if (unsafe) {
+          continue;
+        }
+        return null;
+      }
+      // A non-GOOD server may still be mutating the segment, so its valid doc set is unreliable.
+      if (replica.getServerStatus() != null && replica.getServerStatus() != ServiceStatus.Status.GOOD) {
+        LOGGER.warn("Server {} is in {} state for segment: {}, skipping {} (mode={}) for {}", replica.getInstanceId(),
+            replica.getServerStatus(), segmentName, unsafe ? "replica" : "segment", consensusMode, taskType);
+        if (unsafe) {
+          continue;
+        }
+        return null;
+      }
+      if (unsafe) {
+        return replica;
+      }
+      usableReplicas.add(replica);
+    }
+
+    if (usableReplicas.isEmpty()) {
+      return null;
+    }
+
+    // Strict modes need every assigned replica to have responded; a short responder list means a server dropped out
+    // (network error, parse failure, or it no longer has the segment), so we can't confirm consensus across the full
+    // replica set and skip the segment.
+    if (usableReplicas.size() < expectedReplicaCount) {
+      LOGGER.warn("Only {} of {} replicas responded for segment: {}, cannot confirm consensus, skipping for {}",
+          usableReplicas.size(), expectedReplicaCount, segmentName, taskType);
+      return null;
+    }
+
+    if (consensusMode == MinionConstants.ValidDocIdsConsensusMode.EQUAL) {
+      ValidDocIdsMetadataInfo first = usableReplicas.get(0);
+      RoaringBitmap consensusBitmap = deserializeBitmapOrNull(first);
+      if (consensusBitmap == null) {
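
The replica-count helper and the selection rules in the Javadoc above can be modeled in a small, self-contained form. This is a hypothetical sketch, not the PR's code: `ConsensusSketch`, `ReplicaInfo`, and `Mode` are illustrative stand-ins for `ValidDocIdsMetadataInfo` and `MinionConstants.ValidDocIdsConsensusMode`, `java.util.BitSet` replaces `RoaringBitmap`, and CRCs are assumed to be already parsed to `long`. Treating a missing bitmap as a skip in both strict modes is also an assumption (the review comment describes that behavior for EQUAL), and MOST_VALID_DOCS compares bitmap cardinality here, which may differ from the count the real code uses.

```java
import java.util.ArrayList;
import java.util.BitSet;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified model of the generator-side replica selection; not Pinot code.
// BitSet stands in for RoaringBitmap.
final class ConsensusSketch {
  enum Mode { UNSAFE, EQUAL, MOST_VALID_DOCS }

  // Illustrative stand-in for ValidDocIdsMetadataInfo.
  static final class ReplicaInfo {
    final String server;
    final long crc;
    final boolean healthy;
    final BitSet validDocs; // null models a server whose response has no bitmap

    ReplicaInfo(String server, long crc, boolean healthy, BitSet validDocs) {
      this.server = server;
      this.crc = crc;
      this.healthy = healthy;
      this.validDocs = validDocs;
    }
  }

  // Same counting idea as getSegmentToReplicaCount: each server hosting a segment adds one replica.
  static Map<String, Integer> replicaCounts(Map<String, List<String>> serverToSegments) {
    Map<String, Integer> counts = new HashMap<>();
    for (List<String> segments : serverToSegments.values()) {
      for (String segment : segments) {
        counts.merge(segment, 1, Integer::sum);
      }
    }
    return counts;
  }

  // Returns the replica to use, or null to skip the segment.
  static ReplicaInfo select(long expectedCrc, List<ReplicaInfo> replicas, int expectedReplicaCount, Mode mode) {
    if (replicas == null || replicas.isEmpty()) {
      return null;
    }
    List<ReplicaInfo> usable = new ArrayList<>();
    for (ReplicaInfo replica : replicas) {
      boolean acceptable = replica.crc == expectedCrc && replica.healthy;
      if (mode == Mode.UNSAFE) {
        if (acceptable) {
          return replica; // first acceptable replica wins
        }
        continue; // UNSAFE drops only the bad replica
      }
      if (!acceptable) {
        return null; // strict modes drop the whole segment
      }
      usable.add(replica);
    }
    // UNSAFE found no acceptable replica, or a strict mode is missing responses.
    if (usable.isEmpty() || usable.size() < expectedReplicaCount) {
      return null;
    }
    // Assumption: a replica without a bitmap makes both strict modes skip the segment.
    for (ReplicaInfo replica : usable) {
      if (replica.validDocs == null) {
        return null;
      }
    }
    if (mode == Mode.EQUAL) {
      BitSet first = usable.get(0).validDocs;
      for (ReplicaInfo replica : usable) {
        if (!first.equals(replica.validDocs)) {
          return null; // replicas disagree on the valid doc set
        }
      }
      return usable.get(0);
    }
    // MOST_VALID_DOCS: the largest valid doc set wins; ties keep the earlier replica.
    ReplicaInfo best = usable.get(0);
    for (ReplicaInfo replica : usable) {
      if (replica.validDocs.cardinality() > best.validDocs.cardinality()) {
        best = replica;
      }
    }
    return best;
  }
}
```

The asymmetry the Javadoc describes shows up directly: UNSAFE uses `continue` on a bad replica, while the strict modes return `null` as soon as any replica fails, and only then compare bitmaps across the full replica set.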

Review Comment:
   This makes controller-first rolling upgrades incompatible under the new 
default STRICT/EQUAL path. Older servers still answer this endpoint but omit 
`bitmap`, and `ValidDocIdsMetadataInfo` explicitly treats that as expected for 
old servers; here we convert that mixed-version response into a hard skip, so 
upsert compaction/compact-merge task generation stops until every server is 
upgraded. Please keep the generator default executor-only, or add an old-server 
fallback before making bitmap-based prescheduling the default.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

