xiangfu0 commented on code in PR #18853:
URL: https://github.com/apache/pinot/pull/18853#discussion_r3491640383


##########
pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/MinionTaskUtils.java:
##########
@@ -428,6 +464,131 @@ public static RoaringBitmap getValidDocIdFromServerMatchingCrc(String tableNameW
     return maxCardinalityMap;
   }
 
+  /**
+   * Counts how many servers host each segment (its online replica count) from a server-to-segments map, so callers
+   * can tell whether every assigned replica responded.
+   */
+  public static Map<String, Integer> getSegmentToReplicaCount(Map<String, List<String>> serverToSegments) {
+    Map<String, Integer> segmentToReplicaCount = new HashMap<>();
+    for (List<String> segments : serverToSegments.values()) {
+      for (String segment : segments) {
+        segmentToReplicaCount.merge(segment, 1, Integer::sum);
+      }
+    }
+    return segmentToReplicaCount;
+  }
+
+  /**
+   * Picks the replica whose validDocIds the generator should use for a segment, or returns {@code null} to skip the
+   * segment. Applies the same checks the executor would (CRC match, healthy server, replica consensus) so bad
+   * segments are dropped before a task is scheduled.
+   *
+   * How the replica is chosen depends on {@code consensusMode}:
+   *   - UNSAFE: the first replica with a matching CRC and a healthy server.
+   *   - EQUAL: all replicas must match CRC, be healthy, and have an identical bitmap.
+   *   - MOST_VALID_DOCS: all replicas must match CRC and be healthy; the one with the most valid docs wins.
+   */
+  @Nullable
+  public static ValidDocIdsMetadataInfo selectValidDocIdsMetadataForConsensus(String taskType, String segmentName,
+      long expectedCrc, @Nullable List<ValidDocIdsMetadataInfo> replicas, int expectedReplicaCount,
+      MinionConstants.ValidDocIdsConsensusMode consensusMode) {
+    if (replicas == null || replicas.isEmpty()) {
+      return null;
+    }
+    boolean unsafe = consensusMode == MinionConstants.ValidDocIdsConsensusMode.UNSAFE;
+    List<ValidDocIdsMetadataInfo> usableReplicas = new ArrayList<>();
+    for (ValidDocIdsMetadataInfo replica : replicas) {
+      // A CRC mismatch usually means the server is still reloading the segment, so its valid doc set can't be
+      // trusted. UNSAFE skips just this replica; stricter modes skip the whole segment.
+      long replicaCrc;
+      try {
+        replicaCrc = Long.parseLong(replica.getSegmentCrc());
+      } catch (NumberFormatException e) {
+        LOGGER.warn("Unparseable CRC '{}' for segment: {} from server: {}, skipping {} (mode={}) for {}",
+            replica.getSegmentCrc(), segmentName, replica.getInstanceId(), unsafe ? "replica" : "segment",
+            consensusMode, taskType);
+        if (unsafe) {
+          continue;
+        }
+        return null;
+      }
+      if (expectedCrc != replicaCrc) {
+        LOGGER.warn("CRC mismatch for segment: {} (expected={}, server={} reported={}), skipping {} (mode={}) for {}",
+            segmentName, expectedCrc, replica.getInstanceId(), replicaCrc, unsafe ? "replica" : "segment",
+            consensusMode, taskType);
+        if (unsafe) {
+          continue;
+        }
+        return null;
+      }
+      // A non-GOOD server may still be mutating the segment, so its valid doc set is unreliable.
+      if (replica.getServerStatus() != null && replica.getServerStatus() != ServiceStatus.Status.GOOD) {
+        LOGGER.warn("Server {} is in {} state for segment: {}, skipping {} (mode={}) for {}", replica.getInstanceId(),
+            replica.getServerStatus(), segmentName, unsafe ? "replica" : "segment", consensusMode, taskType);
+        if (unsafe) {
+          continue;
+        }
+        return null;
+      }
+      if (unsafe) {
+        return replica;
+      }
+      usableReplicas.add(replica);
+    }
+
+    if (usableReplicas.isEmpty()) {
+      return null;
+    }
+
+    // Strict modes need every assigned replica to have responded; a short responder list means a server dropped out
+    // (network error, parse failure, or it no longer has the segment), so we can't confirm consensus across the full
+    // replica set and skip the segment.
+    if (usableReplicas.size() < expectedReplicaCount) {
+      LOGGER.warn("Only {} of {} replicas responded for segment: {}, cannot confirm consensus, skipping for {}",
+          usableReplicas.size(), expectedReplicaCount, segmentName, taskType);
+      return null;
+    }
+
+    if (consensusMode == MinionConstants.ValidDocIdsConsensusMode.EQUAL) {
+      ValidDocIdsMetadataInfo first = usableReplicas.get(0);
+      RoaringBitmap consensusBitmap = deserializeBitmapOrNull(first);
+      if (consensusBitmap == null) {

Review Comment:
   This still breaks mixed-version rolling upgrades. With the new defaults
(`EQUAL` + `STRICT`), the generator requests `includeBitmaps=true`. Older
servers ignore that query param and return `ValidDocIdsMetadataInfo` without
a `bitmap`, and this branch turns that into a hard skip. As a result,
controller-first upgrades stop scheduling upsert compaction / compact-merge
tasks until all servers are upgraded or operators manually switch to
`EXECUTOR_ONLY`. Pinot normally requires controller/server roll-forward
compatibility without per-table config changes, so this needs either a
fallback for older servers or a generator-side default of `EXECUTOR_ONLY`.
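
A minimal sketch of what the "fallback for older servers" option could look like in `EQUAL` mode, assuming that a missing `bitmap` is the signal of an older server. `Replica`, `Decision`, `DEFER_TO_EXECUTOR` and `equalConsensus` are hypothetical names for illustration, not Pinot APIs, and the bitmap is modeled as raw serialized bytes to keep the example dependency-free. Real code would compare deserialized `RoaringBitmap`s, because the same doc ID set may serialize differently (for example, run-length vs. array containers).

```java
import java.util.Arrays;
import java.util.List;

public class BitmapFallbackSketch {
  // Hypothetical stand-in for ValidDocIdsMetadataInfo, reduced to what this sketch needs.
  // bitmap == null models an older server that ignored includeBitmaps=true.
  record Replica(String instanceId, byte[] bitmap) {}

  // Hypothetical generator-side outcome. DEFER_TO_EXECUTOR means: schedule the task and let the
  // executor run its own validDocIds checks, i.e. EXECUTOR_ONLY behavior for this one segment.
  enum Decision { SCHEDULE, SKIP, DEFER_TO_EXECUTOR }

  static Decision equalConsensus(List<Replica> replicas) {
    if (replicas.isEmpty()) {
      return Decision.SKIP;
    }
    // Mixed-version cluster: at least one replica can't provide a bitmap, so the generator
    // can't verify consensus itself. Fall back instead of hard-skipping the segment.
    for (Replica replica : replicas) {
      if (replica.bitmap() == null) {
        return Decision.DEFER_TO_EXECUTOR;
      }
    }
    // Every replica sent a bitmap: require them all to be identical (EQUAL mode).
    // Simplification: compares serialized bytes rather than deserialized bitmaps.
    byte[] first = replicas.get(0).bitmap();
    for (Replica replica : replicas) {
      if (!Arrays.equals(first, replica.bitmap())) {
        return Decision.SKIP;
      }
    }
    return Decision.SCHEDULE;
  }

  public static void main(String[] args) {
    byte[] bits = {1, 2, 3};
    // All servers upgraded and in agreement.
    System.out.println(equalConsensus(List.of(
        new Replica("server_1", bits), new Replica("server_2", bits.clone()))));  // SCHEDULE
    // Controller upgraded first, server_2 still on the old version.
    System.out.println(equalConsensus(List.of(
        new Replica("server_1", bits), new Replica("server_2", null))));          // DEFER_TO_EXECUTOR
    // All servers upgraded but replicas disagree.
    System.out.println(equalConsensus(List.of(
        new Replica("server_1", bits), new Replica("server_2", new byte[] {9})))); // SKIP
  }
}
```

Deferring keeps the executor's existing checks as the safety net during the upgrade window, so no per-table config change is needed. The alternative the comment raises, keeping `EXECUTOR_ONLY` as the generator-side default, would avoid this branch entirely.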



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
