This is an automated email from the ASF dual-hosted git repository.

Jackie-Jiang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new f0329f33762 Enforce validDocIds consensus in upsert task generators and add includeBitmaps to validDocIdsMetadata API (#18853)
f0329f33762 is described below

commit f0329f33762de8afbe1b2901952c7129d9ca7a85
Author: Chaitanya Deepthi <[email protected]>
AuthorDate: Tue Jun 30 17:10:06 2026 -0700

    Enforce validDocIds consensus in upsert task generators and add includeBitmaps to validDocIdsMetadata API (#18853)
---
 .../resources/ValidDocIdsBitmapResponse.java       |  22 ++-
 .../restlet/resources/ValidDocIdsMetadataInfo.java |  19 ++-
 .../util/ServerSegmentMetadataReader.java          |  37 ++++-
 .../apache/pinot/core/common/MinionConstants.java  |  24 ++--
 .../pinot/plugin/minion/tasks/MinionTaskUtils.java | 155 ++++++++++++++++++++-
 .../UpsertCompactionTaskExecutor.java              |  10 +-
 .../UpsertCompactionTaskGenerator.java             |  81 +++++------
 .../UpsertCompactMergeTaskExecutor.java            |   2 +-
 .../UpsertCompactMergeTaskGenerator.java           |  83 ++++++-----
 .../plugin/minion/tasks/MinionTaskUtilsTest.java   |  83 ++++++++++-
 .../UpsertCompactionTaskGeneratorTest.java         | 109 ++++++++++++++-
 .../UpsertCompactMergeTaskGeneratorTest.java       |  82 ++++++++++-
 .../pinot/server/api/resources/TablesResource.java |  14 +-
 13 files changed, 600 insertions(+), 121 deletions(-)

diff --git 
a/pinot-common/src/main/java/org/apache/pinot/common/restlet/resources/ValidDocIdsBitmapResponse.java
 
b/pinot-common/src/main/java/org/apache/pinot/common/restlet/resources/ValidDocIdsBitmapResponse.java
index 2cb8d661318..d925c69d072 100644
--- 
a/pinot-common/src/main/java/org/apache/pinot/common/restlet/resources/ValidDocIdsBitmapResponse.java
+++ 
b/pinot-common/src/main/java/org/apache/pinot/common/restlet/resources/ValidDocIdsBitmapResponse.java
@@ -18,24 +18,35 @@
  */
 package org.apache.pinot.common.restlet.resources;
 
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import com.fasterxml.jackson.annotation.JsonInclude;
 import com.fasterxml.jackson.annotation.JsonProperty;
+import javax.annotation.Nullable;
 import org.apache.pinot.common.utils.ServiceStatus;
 
 
+@JsonIgnoreProperties(ignoreUnknown = true)
 public class ValidDocIdsBitmapResponse {
   private final String _segmentName;
   private final String _segmentCrc;
+  // Server's data CRC (forward index + dictionary checksum); null when the 
server doesn't report it.
+  @Nullable
+  private final String _segmentDataCrc;
   private final ValidDocIdsType _validDocIdsType;
   private final byte[] _bitmap;
   private final String _instanceId;
   private final ServiceStatus.Status _serverStatus;
 
+  @JsonCreator
   public ValidDocIdsBitmapResponse(@JsonProperty("segmentName") String 
segmentName,
-      @JsonProperty("segmentCrc") String crc, @JsonProperty("validDocIdsType") 
ValidDocIdsType validDocIdsType,
-      @JsonProperty("bitmap") byte[] bitmap, @JsonProperty("instanceId") 
String instanceId,
+      @JsonProperty("segmentCrc") String crc, @JsonProperty("segmentDataCrc") 
@Nullable String segmentDataCrc,
+      @JsonProperty("validDocIdsType") ValidDocIdsType validDocIdsType, 
@JsonProperty("bitmap") byte[] bitmap,
+      @JsonProperty("instanceId") String instanceId,
       @JsonProperty("serverStatus") ServiceStatus.Status serverStatus) {
     _segmentName = segmentName;
     _segmentCrc = crc;
+    _segmentDataCrc = segmentDataCrc;
     _validDocIdsType = validDocIdsType;
     _bitmap = bitmap;
     _instanceId = instanceId;
@@ -50,6 +61,13 @@ public class ValidDocIdsBitmapResponse {
     return _segmentCrc;
   }
 
+  /// Server's data CRC, or null if not reported. Omitted from the payload 
when null.
+  @JsonInclude(JsonInclude.Include.NON_NULL)
+  @Nullable
+  public String getSegmentDataCrc() {
+    return _segmentDataCrc;
+  }
+
   public ValidDocIdsType getValidDocIdsType() {
     return _validDocIdsType;
   }
diff --git 
a/pinot-common/src/main/java/org/apache/pinot/common/restlet/resources/ValidDocIdsMetadataInfo.java
 
b/pinot-common/src/main/java/org/apache/pinot/common/restlet/resources/ValidDocIdsMetadataInfo.java
index 6fd42c86f65..185d8b053bc 100644
--- 
a/pinot-common/src/main/java/org/apache/pinot/common/restlet/resources/ValidDocIdsMetadataInfo.java
+++ 
b/pinot-common/src/main/java/org/apache/pinot/common/restlet/resources/ValidDocIdsMetadataInfo.java
@@ -18,8 +18,11 @@
  */
 package org.apache.pinot.common.restlet.resources;
 
+import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import com.fasterxml.jackson.annotation.JsonInclude;
 import com.fasterxml.jackson.annotation.JsonProperty;
+import javax.annotation.Nullable;
 import org.apache.pinot.common.utils.ServiceStatus;
 
 
@@ -30,24 +33,31 @@ public class ValidDocIdsMetadataInfo {
   private final long _totalInvalidDocs;
   private final long _totalDocs;
   private final String _segmentCrc;
+  // Server's data CRC (forward index + dictionary checksum); null when the 
server doesn't report it.
+  @Nullable
+  private final String _segmentDataCrc;
   private final ValidDocIdsType _validDocIdsType;
   private final long _segmentSizeInBytes;
   private final long _segmentCreationTimeMillis;
   private final String _instanceId;
   private final ServiceStatus.Status _serverStatus;
 
+  @JsonCreator
   public ValidDocIdsMetadataInfo(@JsonProperty("segmentName") String 
segmentName,
       @JsonProperty("totalValidDocs") long totalValidDocs, 
@JsonProperty("totalInvalidDocs") long totalInvalidDocs,
       @JsonProperty("totalDocs") long totalDocs, @JsonProperty("segmentCrc") 
String segmentCrc,
+      @JsonProperty("segmentDataCrc") @Nullable String segmentDataCrc,
       @JsonProperty("validDocIdsType") ValidDocIdsType validDocIdsType,
       @JsonProperty("segmentSizeInBytes") long segmentSizeInBytes,
       @JsonProperty("segmentCreationTimeMillis") long 
segmentCreationTimeMillis,
-      @JsonProperty("instanceId") String instanceId, 
@JsonProperty("serverStatus") ServiceStatus.Status serverStatus) {
+      @JsonProperty("instanceId") String instanceId,
+      @JsonProperty("serverStatus") ServiceStatus.Status serverStatus) {
     _segmentName = segmentName;
     _totalValidDocs = totalValidDocs;
     _totalInvalidDocs = totalInvalidDocs;
     _totalDocs = totalDocs;
     _segmentCrc = segmentCrc;
+    _segmentDataCrc = segmentDataCrc;
     _validDocIdsType = validDocIdsType;
     _segmentSizeInBytes = segmentSizeInBytes;
     _segmentCreationTimeMillis = segmentCreationTimeMillis;
@@ -75,6 +85,13 @@ public class ValidDocIdsMetadataInfo {
     return _segmentCrc;
   }
 
+  /// Server's data CRC, or null if not reported. Omitted from the payload 
when null.
+  @JsonInclude(JsonInclude.Include.NON_NULL)
+  @Nullable
+  public String getSegmentDataCrc() {
+    return _segmentDataCrc;
+  }
+
   public ValidDocIdsType getValidDocIdsType() {
     return _validDocIdsType;
   }
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/util/ServerSegmentMetadataReader.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/util/ServerSegmentMetadataReader.java
index a1669a2882b..a079f6af06e 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/util/ServerSegmentMetadataReader.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/util/ServerSegmentMetadataReader.java
@@ -267,8 +267,9 @@ public class ServerSegmentMetadataReader {
       @Nullable List<String> segmentNames, int timeoutMs, String 
validDocIdsType,
       int numSegmentsBatchPerServerRequest) {
     return getSegmentToValidDocIdsMetadataFromServer(tableNameWithType, 
serverToSegmentsMap, serverToEndpoints,
-        segmentNames, timeoutMs, validDocIdsType, 
numSegmentsBatchPerServerRequest).values().stream()
-        .filter(list -> list != null && !list.isEmpty()).map(list -> 
list.get(0)).collect(Collectors.toList());
+        segmentNames, timeoutMs, validDocIdsType, 
numSegmentsBatchPerServerRequest).getSegmentToMetadata().values()
+        .stream().filter(list -> list != null && !list.isEmpty()).map(list -> 
list.get(0))
+        .collect(Collectors.toList());
   }
 
   /**
@@ -276,13 +277,14 @@ public class ServerSegmentMetadataReader {
    * This method will pick all servers that hosts the target segment and fetch 
the segment metadata result and
    * return as a list.
    *
-   * @return map of segment name to list of valid doc id metadata where each 
element is every server's metadata.
+   * @return the per-segment metadata from every responding server, plus the 
expected replica count per segment.
    */
-  public Map<String, List<ValidDocIdsMetadataInfo>> 
getSegmentToValidDocIdsMetadataFromServer(String tableNameWithType,
+  public ValidDocIdsMetadataResult 
getSegmentToValidDocIdsMetadataFromServer(String tableNameWithType,
       Map<String, List<String>> serverToSegmentsMap, BiMap<String, String> 
serverToEndpoints,
       @Nullable List<String> segmentNames, int timeoutMs, String 
validDocIdsType,
       int numSegmentsBatchPerServerRequest) {
     List<Pair<String, String>> serverURLsAndBodies = new ArrayList<>();
+    Map<String, Integer> segmentToExpectedReplicaCount = new HashMap<>();
     for (Map.Entry<String, List<String>> serverToSegments : 
serverToSegmentsMap.entrySet()) {
       List<String> segmentsForServer = serverToSegments.getValue();
       List<String> segmentsToQuery = new ArrayList<>();
@@ -296,6 +298,9 @@ public class ServerSegmentMetadataReader {
           }
         }
       }
+      for (String segment : segmentsToQuery) {
+        segmentToExpectedReplicaCount.merge(segment, 1, Integer::sum);
+      }
 
       // Number of segments to query per server request. If a table has a lot 
of segments, then we might send a
       // huge payload to pinot-server in request. Batching the requests will 
help in reducing the payload size.
@@ -352,7 +357,29 @@ public class ServerSegmentMetadataReader {
 
     LOGGER.info("Retrieved validDocIds metadata for {} segments from {} server 
requests.",
         validDocIdsMetadataInfos.size(), returnedServerRequestsCount);
-    return validDocIdsMetadataInfos;
+    return new ValidDocIdsMetadataResult(validDocIdsMetadataInfos, 
segmentToExpectedReplicaCount);
+  }
+
+  /// Result of [#getSegmentToValidDocIdsMetadataFromServer]: the per-segment 
metadata from every responding server,
+  /// plus the expected replica count per segment (how many servers host it). 
The count lets callers detect a replica
+  /// that did not respond, since a missing replica is simply absent from the 
metadata.
+  public static class ValidDocIdsMetadataResult {
+    private final Map<String, List<ValidDocIdsMetadataInfo>> 
_segmentToMetadata;
+    private final Map<String, Integer> _segmentToExpectedReplicaCount;
+
+    public ValidDocIdsMetadataResult(Map<String, 
List<ValidDocIdsMetadataInfo>> segmentToMetadata,
+        Map<String, Integer> segmentToExpectedReplicaCount) {
+      _segmentToMetadata = segmentToMetadata;
+      _segmentToExpectedReplicaCount = segmentToExpectedReplicaCount;
+    }
+
+    public Map<String, List<ValidDocIdsMetadataInfo>> getSegmentToMetadata() {
+      return _segmentToMetadata;
+    }
+
+    public Map<String, Integer> getSegmentToExpectedReplicaCount() {
+      return _segmentToExpectedReplicaCount;
+    }
   }
 
   /**
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/common/MinionConstants.java 
b/pinot-core/src/main/java/org/apache/pinot/core/common/MinionConstants.java
index 976216d06a9..007fe183738 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/common/MinionConstants.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/common/MinionConstants.java
@@ -92,11 +92,18 @@ public class MinionConstants {
    */
   public static final String SEGMENT_DOWNLOAD_PARALLELISM = 
"segmentDownloadParallelism";
 
-  /** Valid doc ids consensus mode (executor-only). Kept internal; executors 
pass config string. */
+  /// Valid doc ids consensus mode enforced by both the task generators 
(pre-scheduling) and the executors.
   public enum ValidDocIdsConsensusMode {
     UNSAFE, EQUAL, MOST_VALID_DOCS
   }
 
+  /// Where validDocIds consensus is enforced. STRICT (default) runs the 
checks in both the task generator
+  /// (pre-scheduling) and the executor; EXECUTOR_ONLY skips the 
generator-side checks and leaves the executor as the
+  /// sole gate.
+  public enum ValidDocIdsValidationMode {
+    STRICT, EXECUTOR_ONLY
+  }
+
   // Purges rows inside segment that match chosen criteria
   public static class PurgeTask {
     public static final String TASK_TYPE = "PurgeTask";
@@ -268,15 +275,16 @@ public class MinionConstants {
      */
     public static final String NUM_SEGMENTS_BATCH_PER_SERVER_REQUEST = 
"numSegmentsBatchPerServerRequest";
 
-    /**
-     * Valid doc ids consensus mode used by the executor only (generator 
unchanged). Values: UNSAFE, EQUAL,
-     * MOST_VALID_DOCS. UNSAFE = use first server with matching CRC and READY; 
EQUAL = require all replicas
-     * to have the same valid doc set (default); MOST_VALID_DOCS = use replica 
with most valid docs.
-     */
+    /// Valid doc ids consensus mode used by both the task generators 
(pre-scheduling) and the executors. UNSAFE =
+    /// first server with matching CRC and GOOD status; EQUAL (default) = all 
replicas must agree; MOST_VALID_DOCS =
+    /// the replica with the most valid docs. Shared by UpsertCompactionTask 
and UpsertCompactMergeTask.
     public static final String VALID_DOC_IDS_CONSENSUS_MODE_KEY = 
"validDocIdsConsensusMode";
-
-    /** Default: equal valid doc set consensus across replicas. */
     public static final String DEFAULT_VALID_DOC_IDS_CONSENSUS_MODE = "EQUAL";
+
+    /// Whether the consensus checks also run in the generator. STRICT 
(default) = generator + executor;
+    /// EXECUTOR_ONLY = executor only. Shared by UpsertCompactionTask and 
UpsertCompactMergeTask.
+    public static final String VALID_DOC_IDS_VALIDATION_MODE_KEY = 
"validDocIdsValidationMode";
+    public static final String DEFAULT_VALID_DOC_IDS_VALIDATION_MODE = 
"STRICT";
   }
 
   public static class UpsertCompactMergeTask {
diff --git 
a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/MinionTaskUtils.java
 
b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/MinionTaskUtils.java
index ea2150644c3..3667d731570 100644
--- 
a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/MinionTaskUtils.java
+++ 
b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/MinionTaskUtils.java
@@ -18,6 +18,7 @@
  */
 package org.apache.pinot.plugin.minion.tasks;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import java.net.URI;
 import java.text.SimpleDateFormat;
@@ -30,6 +31,7 @@ import java.util.Map;
 import java.util.TimeZone;
 import java.util.concurrent.TimeUnit;
 import javax.annotation.Nullable;
+import org.apache.commons.collections4.CollectionUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.helix.HelixAdmin;
 import org.apache.helix.model.ExternalView;
@@ -38,6 +40,7 @@ import org.apache.pinot.common.auth.AuthProviderUtils;
 import org.apache.pinot.common.auth.NullAuthProvider;
 import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
 import org.apache.pinot.common.restlet.resources.ValidDocIdsBitmapResponse;
+import org.apache.pinot.common.restlet.resources.ValidDocIdsMetadataInfo;
 import org.apache.pinot.common.restlet.resources.ValidDocIdsType;
 import org.apache.pinot.common.utils.RetentionUtils;
 import org.apache.pinot.common.utils.RoaringBitmapUtils;
@@ -72,14 +75,32 @@ import org.slf4j.LoggerFactory;
 public class MinionTaskUtils {
   private static final Logger LOGGER = 
LoggerFactory.getLogger(MinionTaskUtils.class);
 
-  /** Package-private for testing: parses validDocIdsComparisonMode config 
string. */
-  static MinionConstants.ValidDocIdsConsensusMode 
parseValidDocIdsConsensusMode(String value) {
-    if (value == null || value.isBlank()) {
+  /// Parses the validDocIdsConsensusMode config string. Blank/null defaults 
to `EQUAL`.
+  public static MinionConstants.ValidDocIdsConsensusMode 
parseValidDocIdsConsensusMode(String value) {
+    if (StringUtils.isBlank(value)) {
       return MinionConstants.ValidDocIdsConsensusMode.EQUAL;
     }
     return 
MinionConstants.ValidDocIdsConsensusMode.valueOf(value.toUpperCase().trim());
   }
 
+  /// Parses the validDocIdsValidationMode config string. Blank/null defaults 
to `STRICT`.
+  public static MinionConstants.ValidDocIdsValidationMode 
parseValidDocIdsValidationMode(String value) {
+    if (StringUtils.isBlank(value)) {
+      return MinionConstants.ValidDocIdsValidationMode.STRICT;
+    }
+    return 
MinionConstants.ValidDocIdsValidationMode.valueOf(value.toUpperCase().trim());
+  }
+
+  /// Resolves the consensus mode the generator should apply, given the 
configured consensus mode and validation
+  /// mode. EXECUTOR_ONLY downgrades the generator to UNSAFE (lenient pick, no 
bitmaps, no cross-replica enforcement)
+  /// so the executor stays the sole gate; STRICT keeps the configured mode.
+  public static MinionConstants.ValidDocIdsConsensusMode 
resolveGeneratorConsensusMode(
+      MinionConstants.ValidDocIdsConsensusMode consensusMode,
+      MinionConstants.ValidDocIdsValidationMode validationMode) {
+    return validationMode == 
MinionConstants.ValidDocIdsValidationMode.EXECUTOR_ONLY
+        ? MinionConstants.ValidDocIdsConsensusMode.UNSAFE : consensusMode;
+  }
+
   private static final String DEFAULT_DIR_PATH_TERMINATOR = "/";
 
   public static final String DATETIME_PATTERN = "yyyy-MM-dd'T'HH:mm:ss.SSS'Z'";
@@ -324,6 +345,14 @@ public class MinionTaskUtils {
   @Nullable
   public static RoaringBitmap getValidDocIdFromServerMatchingCrc(String 
tableNameWithType, String segmentName,
       String validDocIdsType, MinionContext minionContext, String expectedCrc, 
String comparisonModeStr) {
+    return getValidDocIdFromServerMatchingCrc(tableNameWithType, segmentName, 
validDocIdsType, minionContext,
+        expectedCrc, null, comparisonModeStr);
+  }
+
+  /// Variant that also matches on the expected data CRC; see [#crcMatches].
+  public static RoaringBitmap getValidDocIdFromServerMatchingCrc(String 
tableNameWithType, String segmentName,
+      String validDocIdsType, MinionContext minionContext, String expectedCrc, 
@Nullable String expectedDataCrc,
+      String comparisonModeStr) {
     MinionConstants.ValidDocIdsConsensusMode consensusMode = 
parseValidDocIdsConsensusMode(comparisonModeStr);
     String clusterName = minionContext.getHelixManager().getClusterName();
     HelixAdmin helixAdmin = 
minionContext.getHelixManager().getClusterManagmentTool();
@@ -352,13 +381,15 @@ public class MinionTaskUtils {
       }
 
       String crcFromValidDocIdsBitmap = 
validDocIdsBitmapResponse.getSegmentCrc();
+      long serverDataCrc = 
parseCrc(validDocIdsBitmapResponse.getSegmentDataCrc());
       // Check crc from the downloaded segment against the crc returned from 
the server along with the valid doc id
       // bitmap. If this doesn't match, this means that we are hitting the 
race condition where the segment has been
       // uploaded successfully while the server is still reloading the 
segment. Reloading can take a while when the
       // offheap upsert is used because we will need to delete & add all 
primary keys.
       // `BaseSingleSegmentConversionExecutor.executeTask()` already checks 
for the crc from the task generator
       // against the crc from the current segment zk metadata, so we don't 
need to check that here.
-      if (!expectedCrc.equals(crcFromValidDocIdsBitmap)) {
+      if (!crcMatches(parseCrc(expectedCrc), parseCrc(expectedDataCrc), 
parseCrc(crcFromValidDocIdsBitmap),
+          serverDataCrc)) {
         if (consensusMode == MinionConstants.ValidDocIdsConsensusMode.UNSAFE) {
           LOGGER.warn("CRC mismatch for segment: {} from endpoint {}, 
skipping", segmentName, endpoint);
           continue;
@@ -428,6 +459,122 @@ public class MinionTaskUtils {
     return maxCardinalityMap;
   }
 
+  /// Picks the replica whose validDocIds the generator should use, or null to 
skip the segment, applying the same
+  /// checks the executor would so bad segments aren't scheduled: the replica 
must match CRC (via [#crcMatches]) and
+  /// be on a healthy server. UNSAFE uses the first such replica, EQUAL 
requires all to report the same valid doc
+  /// count, and MOST_VALID_DOCS picks the highest.
+  @Nullable
+  public static ValidDocIdsMetadataInfo 
selectValidDocIdsMetadataForConsensus(String taskType,
+      SegmentZKMetadata segmentZKMetadata, @Nullable 
List<ValidDocIdsMetadataInfo> replicas, int expectedReplicaCount,
+      MinionConstants.ValidDocIdsConsensusMode consensusMode) {
+    String segmentName = segmentZKMetadata.getSegmentName();
+    if (CollectionUtils.isEmpty(replicas)) {
+      return null;
+    }
+    boolean unsafe = consensusMode == 
MinionConstants.ValidDocIdsConsensusMode.UNSAFE;
+    List<ValidDocIdsMetadataInfo> usableReplicas = new ArrayList<>();
+    for (ValidDocIdsMetadataInfo replica : replicas) {
+      // A CRC mismatch usually means the server is still reloading the 
segment, so its valid doc set can't be
+      // trusted. UNSAFE skips just this replica; stricter modes skip the 
whole segment.
+      long replicaCrc = parseCrc(replica.getSegmentCrc());
+      if (replicaCrc < 0) {
+        LOGGER.warn("Unparseable CRC '{}' for segment: {} from server: {}, 
skipping {} (mode={}) for {}",
+            replica.getSegmentCrc(), segmentName, replica.getInstanceId(), 
unsafe ? "replica" : "segment",
+            consensusMode, taskType);
+        if (unsafe) {
+          continue;
+        }
+        return null;
+      }
+      // -1 when this table doesn't use data CRC, so crcMatches falls back to 
the segment CRC.
+      long zkDataCrc = segmentZKMetadata.isUseDataCrc() ? 
segmentZKMetadata.getDataCrc() : -1;
+      if (!crcMatches(segmentZKMetadata.getCrc(), zkDataCrc, replicaCrc, 
parseCrc(replica.getSegmentDataCrc()))) {
+        LOGGER.warn("CRC mismatch for segment: {} (zkCrc={}, zkDataCrc={}, 
server={} reported crc={} dataCrc={}), "
+                + "skipping {} (mode={}) for {}", segmentName, 
segmentZKMetadata.getCrc(),
+            segmentZKMetadata.getDataCrc(), replica.getInstanceId(), 
replicaCrc, replica.getSegmentDataCrc(),
+            unsafe ? "replica" : "segment", consensusMode, taskType);
+        if (unsafe) {
+          continue;
+        }
+        return null;
+      }
+      // A non-GOOD server may still be mutating the segment, so its valid doc 
set is unreliable.
+      if (replica.getServerStatus() != null && replica.getServerStatus() != 
ServiceStatus.Status.GOOD) {
+        LOGGER.warn("Server {} is in {} state for segment: {}, skipping {} 
(mode={}) for {}", replica.getInstanceId(),
+            replica.getServerStatus(), segmentName, unsafe ? "replica" : 
"segment", consensusMode, taskType);
+        if (unsafe) {
+          continue;
+        }
+        return null;
+      }
+      if (unsafe) {
+        return replica;
+      }
+      usableReplicas.add(replica);
+    }
+
+    if (usableReplicas.isEmpty()) {
+      return null;
+    }
+
+    // Strict modes need every assigned replica to have responded; a short 
responder list means a server dropped out
+    // (network error, parse failure, or it no longer has the segment), so we 
can't confirm consensus across the full
+    // replica set and skip the segment.
+    if (usableReplicas.size() < expectedReplicaCount) {
+      LOGGER.warn("Only {} of {} replicas responded for segment: {}, cannot 
confirm consensus, skipping for {}",
+          usableReplicas.size(), expectedReplicaCount, segmentName, taskType);
+      return null;
+    }
+
+    if (consensusMode == MinionConstants.ValidDocIdsConsensusMode.EQUAL) {
+      // Require every replica to report the same valid doc count. Comparing 
counts (rather than the full bitmaps)
+      // keeps the generator cheap - it avoids serializing a bitmap per 
replica back to the controller.
+      ValidDocIdsMetadataInfo first = usableReplicas.get(0);
+      for (int i = 1; i < usableReplicas.size(); i++) {
+        if (usableReplicas.get(i).getTotalValidDocs() != 
first.getTotalValidDocs()) {
+          LOGGER.warn("Replicas disagree on valid doc count for segment: {}, 
skipping segment for {}", segmentName,
+              taskType);
+          return null;
+        }
+      }
+      return first;
+    }
+
+    // MOST_VALID_DOCS: pick the replica reporting the most valid docs.
+    ValidDocIdsMetadataInfo chosen = usableReplicas.get(0);
+    for (ValidDocIdsMetadataInfo replica : usableReplicas) {
+      if (replica.getTotalValidDocs() > chosen.getTotalValidDocs()) {
+        chosen = replica;
+      }
+    }
+    return chosen;
+  }
+
+  /// Whether two segment copies hold the same data. Matches on the full 
segment CRC, or - when those differ - on the
+  /// data CRC (a checksum over only the forward index and dictionary, so 
index/metadata-only changes don't affect
+  /// it) when both copies report one (`>= 0`). A negative data CRC means "not 
reported". Mirrors the logic of
+  /// `BaseTableDataManager.hasSameCRC` and is used by both the generator's 
pre-scheduling check and the executor's
+  /// per-server check.
+  @VisibleForTesting
+  static boolean crcMatches(long segmentCrc, long dataCrc, long 
otherSegmentCrc, long otherDataCrc) {
+    if (segmentCrc == otherSegmentCrc) {
+      return true;
+    }
+    return dataCrc >= 0 && otherDataCrc >= 0 && dataCrc == otherDataCrc;
+  }
+
+  /// Parses a CRC string, returning `-1` ("unavailable") when it is null or 
unparseable.
+  private static long parseCrc(@Nullable String crc) {
+    if (crc == null) {
+      return -1;
+    }
+    try {
+      return Long.parseLong(crc);
+    } catch (NumberFormatException e) {
+      return -1;
+    }
+  }
+
   public static String toUTCString(long epochMillis) {
     Date date = new Date(epochMillis);
     SimpleDateFormat isoFormat = new SimpleDateFormat(DATETIME_PATTERN);
diff --git 
a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/upsertcompaction/UpsertCompactionTaskExecutor.java
 
b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/upsertcompaction/UpsertCompactionTaskExecutor.java
index ce97909e58c..2d04d1edaec 100644
--- 
a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/upsertcompaction/UpsertCompactionTaskExecutor.java
+++ 
b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/upsertcompaction/UpsertCompactionTaskExecutor.java
@@ -78,13 +78,13 @@ public class UpsertCompactionTaskExecutor extends 
BaseSingleSegmentConversionExe
     // Executor-only: read comparison mode string from task config (no auth 
resolution or URL hits).
     Map<String, String> taskConfigs =
         tableConfig.getTaskConfig() != null ? 
tableConfig.getTaskConfig().getConfigsForTaskType(taskType) : null;
-    String consensusMode =
-        taskConfigs != null ? 
taskConfigs.getOrDefault(UpsertCompactionTask.VALID_DOC_IDS_CONSENSUS_MODE_KEY,
-            UpsertCompactionTask.DEFAULT_VALID_DOC_IDS_CONSENSUS_MODE)
-            : UpsertCompactionTask.DEFAULT_VALID_DOC_IDS_CONSENSUS_MODE;
+    String consensusMode = taskConfigs != null
+        ? 
taskConfigs.getOrDefault(MinionConstants.UpsertCompactionTask.VALID_DOC_IDS_CONSENSUS_MODE_KEY,
+            
MinionConstants.UpsertCompactionTask.DEFAULT_VALID_DOC_IDS_CONSENSUS_MODE)
+        : 
MinionConstants.UpsertCompactionTask.DEFAULT_VALID_DOC_IDS_CONSENSUS_MODE;
     RoaringBitmap validDocIds =
         MinionTaskUtils.getValidDocIdFromServerMatchingCrc(tableNameWithType, 
segmentName, validDocIdsTypeStr,
-            MINION_CONTEXT, originalSegmentCrcFromTaskGenerator, 
consensusMode);
+            MINION_CONTEXT, originalSegmentCrcFromTaskGenerator, 
segmentMetadata.getDataCrc(), consensusMode);
     if (validDocIds == null) {
       // no valid crc match found or no validDocIds obtained from all servers
       // error out the task instead of silently failing so that we can track 
it via task-error metrics
diff --git 
a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/upsertcompaction/UpsertCompactionTaskGenerator.java
 
b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/upsertcompaction/UpsertCompactionTaskGenerator.java
index 02e15b9d328..b9c077a1c7e 100644
--- 
a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/upsertcompaction/UpsertCompactionTaskGenerator.java
+++ 
b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/upsertcompaction/UpsertCompactionTaskGenerator.java
@@ -34,7 +34,6 @@ import 
org.apache.pinot.common.exception.InvalidConfigException;
 import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
 import org.apache.pinot.common.restlet.resources.ValidDocIdsMetadataInfo;
 import org.apache.pinot.common.restlet.resources.ValidDocIdsType;
-import org.apache.pinot.common.utils.ServiceStatus;
 import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
 import 
org.apache.pinot.controller.helix.core.minion.generator.BaseTaskGenerator;
 import 
org.apache.pinot.controller.helix.core.minion.generator.TaskGeneratorUtils;
@@ -146,21 +145,34 @@ public class UpsertCompactionTaskGenerator extends 
BaseTaskGenerator {
       ValidDocIdsType validDocIdsType = 
MinionTaskUtils.getValidDocIdsType(tableConfig.getUpsertConfig(), taskConfigs,
           UpsertCompactionTask.VALID_DOC_IDS_TYPE);
 
+      // Validate replicas before scheduling, matching the executor's checks, 
so inconsistent segments are never
+      // scheduled. With EXECUTOR_ONLY the generator skips these checks (the 
executor stays the gate).
+      MinionConstants.ValidDocIdsConsensusMode consensusMode = 
MinionTaskUtils.resolveGeneratorConsensusMode(
+          MinionTaskUtils.parseValidDocIdsConsensusMode(
+              
taskConfigs.getOrDefault(MinionConstants.UpsertCompactionTask.VALID_DOC_IDS_CONSENSUS_MODE_KEY,
+                  
MinionConstants.UpsertCompactionTask.DEFAULT_VALID_DOC_IDS_CONSENSUS_MODE)),
+          MinionTaskUtils.parseValidDocIdsValidationMode(
+              
taskConfigs.getOrDefault(MinionConstants.UpsertCompactionTask.VALID_DOC_IDS_VALIDATION_MODE_KEY,
+                  
MinionConstants.UpsertCompactionTask.DEFAULT_VALID_DOC_IDS_VALIDATION_MODE)));
+
       // Number of segments to query per server request. If a table has a lot 
of segments, then we might send a
       // huge payload to pinot-server in request. Batching the requests will 
help in reducing the payload size.
       int numSegmentsBatchPerServerRequest = Integer.parseInt(
           
taskConfigs.getOrDefault(UpsertCompactionTask.NUM_SEGMENTS_BATCH_PER_SERVER_REQUEST,
               String.valueOf(DEFAULT_NUM_SEGMENTS_BATCH_PER_SERVER_REQUEST)));
 
-      Map<String, List<ValidDocIdsMetadataInfo>> validDocIdsMetadataList =
+      ServerSegmentMetadataReader.ValidDocIdsMetadataResult 
validDocIdsMetadataResult =
           
serverSegmentMetadataReader.getSegmentToValidDocIdsMetadataFromServer(tableNameWithType,
 serverToSegments,
               serverToEndpoints, null, 60_000, validDocIdsType.toString(), 
numSegmentsBatchPerServerRequest);
+      Map<String, List<ValidDocIdsMetadataInfo>> validDocIdsMetadataList =
+          validDocIdsMetadataResult.getSegmentToMetadata();
 
       Map<String, SegmentZKMetadata> completedSegmentsMap =
           
completedSegments.stream().collect(Collectors.toMap(SegmentZKMetadata::getSegmentName,
 Function.identity()));
 
       SegmentSelectionResult segmentSelectionResult =
-          processValidDocIdsMetadata(taskConfigs, completedSegmentsMap, 
validDocIdsMetadataList);
+          processValidDocIdsMetadata(taskConfigs, completedSegmentsMap, 
validDocIdsMetadataList,
+              validDocIdsMetadataResult.getSegmentToExpectedReplicaCount(), 
consensusMode);
       int skippedSegmentsCount = validDocIdsMetadataList.size()
               - segmentSelectionResult.getSegmentsForCompaction().size()
               - segmentSelectionResult.getSegmentsForDeletion().size();
@@ -207,7 +219,8 @@ public class UpsertCompactionTaskGenerator extends 
BaseTaskGenerator {
   @VisibleForTesting
   public static SegmentSelectionResult processValidDocIdsMetadata(Map<String, 
String> taskConfigs,
       Map<String, SegmentZKMetadata> completedSegmentsMap,
-      Map<String, List<ValidDocIdsMetadataInfo>> validDocIdsMetadataInfoMap) {
+      Map<String, List<ValidDocIdsMetadataInfo>> validDocIdsMetadataInfoMap,
+      Map<String, Integer> segmentToReplicaCount, 
MinionConstants.ValidDocIdsConsensusMode consensusMode) {
     double invalidRecordsThresholdPercent = Double.parseDouble(
         
taskConfigs.getOrDefault(UpsertCompactionTask.INVALID_RECORDS_THRESHOLD_PERCENT,
             String.valueOf(DEFAULT_INVALID_RECORDS_THRESHOLD_PERCENT)));
@@ -223,43 +236,33 @@ public class UpsertCompactionTaskGenerator extends 
BaseTaskGenerator {
         continue;
       }
       SegmentZKMetadata segment = completedSegmentsMap.get(segmentName);
-      for (ValidDocIdsMetadataInfo validDocIdsMetadata : 
validDocIdsMetadataInfoMap.get(segmentName)) {
-        long totalInvalidDocs = validDocIdsMetadata.getTotalInvalidDocs();
-
-        // Skip segments if the crc from zk metadata and server does not 
match. They may be being reloaded.
-        if (segment.getCrc() != 
Long.parseLong(validDocIdsMetadata.getSegmentCrc())) {
-          LOGGER.warn("CRC mismatch for segment: {}, (segmentZKMetadata={}, 
validDocIdsMetadata={})", segmentName,
-              segment.getCrc(), validDocIdsMetadata.getSegmentCrc());
-          continue;
-        }
 
-        // skipping segments for which their servers are not in READY state. 
The bitmaps would be inconsistent when
-        // server is NOT READY as UPDATING segments might be updating the 
ONLINE segments
-        if (validDocIdsMetadata.getServerStatus() != null && 
!validDocIdsMetadata.getServerStatus()
-            .equals(ServiceStatus.Status.GOOD)) {
-          LOGGER.warn("Server {} is in {} state, skipping {} generation for 
segment: {}",
-              validDocIdsMetadata.getInstanceId(), 
validDocIdsMetadata.getServerStatus(),
-              MinionConstants.UpsertCompactionTask.TASK_TYPE, segmentName);
-          continue;
-        }
+      // Validate replicas (CRC match, server health, validDocIds consensus) 
before scheduling. Returns null when the
+      // segment should be skipped so we never schedule a task the executor 
would later reject.
+      List<ValidDocIdsMetadataInfo> replicas = 
validDocIdsMetadataInfoMap.get(segmentName);
+      ValidDocIdsMetadataInfo validDocIdsMetadata = 
MinionTaskUtils.selectValidDocIdsMetadataForConsensus(
+          MinionConstants.UpsertCompactionTask.TASK_TYPE, segment, replicas,
+          segmentToReplicaCount.getOrDefault(segmentName, replicas.size()), 
consensusMode);
+      if (validDocIdsMetadata == null) {
+        continue;
+      }
 
-        long totalDocs = validDocIdsMetadata.getTotalDocs();
-        double invalidRecordPercent = ((double) totalInvalidDocs / totalDocs) 
* 100;
-        if (totalInvalidDocs == totalDocs) {
-          LOGGER.debug("Segment {} contains only invalid records, adding it to 
the deletion list", segmentName);
-          segmentsForDeletion.add(segment.getSegmentName());
-        } else if (invalidRecordPercent >= invalidRecordsThresholdPercent
-            && totalInvalidDocs >= invalidRecordsThresholdCount) {
-          LOGGER.debug("Segment {} contains {} invalid records out of {} total 
records "
-                  + "(count threshold: {}, percent threshold: {}), adding it 
to the compaction list", segmentName,
-              totalInvalidDocs, totalDocs, invalidRecordsThresholdCount, 
invalidRecordsThresholdPercent);
-          segmentsForCompaction.add(Pair.of(segment, totalInvalidDocs));
-        } else {
-          LOGGER.debug("Segment {} contains {} invalid records out of {} total 
records "
-                  + "(count threshold: {}, percent threshold: {}), skipping it 
for compaction", segmentName,
-              totalInvalidDocs, totalDocs, invalidRecordsThresholdCount, 
invalidRecordsThresholdPercent);
-        }
-        break;
+      long totalInvalidDocs = validDocIdsMetadata.getTotalInvalidDocs();
+      long totalDocs = validDocIdsMetadata.getTotalDocs();
+      double invalidRecordPercent = ((double) totalInvalidDocs / totalDocs) * 
100;
+      if (totalInvalidDocs == totalDocs) {
+        LOGGER.debug("Segment {} contains only invalid records, adding it to 
the deletion list", segmentName);
+        segmentsForDeletion.add(segment.getSegmentName());
+      } else if (invalidRecordPercent >= invalidRecordsThresholdPercent
+          && totalInvalidDocs >= invalidRecordsThresholdCount) {
+        LOGGER.debug("Segment {} contains {} invalid records out of {} total 
records "
+                + "(count threshold: {}, percent threshold: {}), adding it to 
the compaction list", segmentName,
+            totalInvalidDocs, totalDocs, invalidRecordsThresholdCount, 
invalidRecordsThresholdPercent);
+        segmentsForCompaction.add(Pair.of(segment, totalInvalidDocs));
+      } else {
+        LOGGER.debug("Segment {} contains {} invalid records out of {} total 
records "
+                + "(count threshold: {}, percent threshold: {}), skipping it 
for compaction", segmentName,
+            totalInvalidDocs, totalDocs, invalidRecordsThresholdCount, 
invalidRecordsThresholdPercent);
       }
     }
     segmentsForCompaction.sort((o1, o2) -> {
diff --git 
a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/upsertcompactmerge/UpsertCompactMergeTaskExecutor.java
 
b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/upsertcompactmerge/UpsertCompactMergeTaskExecutor.java
index c6847ce321a..bc435c2ec01 100644
--- 
a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/upsertcompactmerge/UpsertCompactMergeTaskExecutor.java
+++ 
b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/upsertcompactmerge/UpsertCompactMergeTaskExecutor.java
@@ -118,7 +118,7 @@ public class UpsertCompactMergeTaskExecutor extends 
BaseMultipleSegmentsConversi
 
     List<RecordReader> recordReaders = segmentMetadataList.stream().map(x -> {
       RoaringBitmap validDocIds = 
MinionTaskUtils.getValidDocIdFromServerMatchingCrc(tableNameWithType, 
x.getName(),
-          ValidDocIdsType.SNAPSHOT.name(), MINION_CONTEXT, x.getCrc(), 
consensusMode);
+          ValidDocIdsType.SNAPSHOT.name(), MINION_CONTEXT, x.getCrc(), 
x.getDataCrc(), consensusMode);
       if (validDocIds == null) {
         // no valid crc match found or no validDocIds obtained from all servers
         // error out the task instead of silently failing so that we can track 
it via task-error metrics
diff --git 
a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/upsertcompactmerge/UpsertCompactMergeTaskGenerator.java
 
b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/upsertcompactmerge/UpsertCompactMergeTaskGenerator.java
index 1b9565aa24c..45e14405b62 100644
--- 
a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/upsertcompactmerge/UpsertCompactMergeTaskGenerator.java
+++ 
b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/upsertcompactmerge/UpsertCompactMergeTaskGenerator.java
@@ -178,15 +178,27 @@ public class UpsertCompactMergeTaskGenerator extends 
BaseTaskGenerator {
           new ServerSegmentMetadataReader(_clusterInfoAccessor.getExecutor(),
               _clusterInfoAccessor.getConnectionManager());
 
+      // Reuse the compaction task's consensus-mode and validation-mode config 
so both tasks behave the same. With
+      // EXECUTOR_ONLY the generator skips these checks (the executor stays 
the gate).
+      MinionConstants.ValidDocIdsConsensusMode consensusMode = 
MinionTaskUtils.resolveGeneratorConsensusMode(
+          MinionTaskUtils.parseValidDocIdsConsensusMode(
+              
taskConfigs.getOrDefault(MinionConstants.UpsertCompactionTask.VALID_DOC_IDS_CONSENSUS_MODE_KEY,
+                  
MinionConstants.UpsertCompactionTask.DEFAULT_VALID_DOC_IDS_CONSENSUS_MODE)),
+          MinionTaskUtils.parseValidDocIdsValidationMode(
+              
taskConfigs.getOrDefault(MinionConstants.UpsertCompactionTask.VALID_DOC_IDS_VALIDATION_MODE_KEY,
+                  
MinionConstants.UpsertCompactionTask.DEFAULT_VALID_DOC_IDS_VALIDATION_MODE)));
+
       // Number of segments to query per server request. If a table has a lot 
of segments, then we might send a
       // huge payload to pinot-server in request. Batching the requests will 
help in reducing the payload size.
       int numSegmentsBatchPerServerRequest = Integer.parseInt(
           
taskConfigs.getOrDefault(MinionConstants.UpsertCompactMergeTask.NUM_SEGMENTS_BATCH_PER_SERVER_REQUEST,
               String.valueOf(DEFAULT_NUM_SEGMENTS_BATCH_PER_SERVER_REQUEST)));
 
-      Map<String, List<ValidDocIdsMetadataInfo>> validDocIdsMetadataList =
+      ServerSegmentMetadataReader.ValidDocIdsMetadataResult 
validDocIdsMetadataResult =
           
serverSegmentMetadataReader.getSegmentToValidDocIdsMetadataFromServer(tableNameWithType,
 serverToSegments,
               serverToEndpoints, null, 60_000, 
ValidDocIdsType.SNAPSHOT.toString(), numSegmentsBatchPerServerRequest);
+      Map<String, List<ValidDocIdsMetadataInfo>> validDocIdsMetadataList =
+          validDocIdsMetadataResult.getSegmentToMetadata();
 
       Map<String, SegmentZKMetadata> candidateSegmentsMap =
           
candidateSegments.stream().collect(Collectors.toMap(SegmentZKMetadata::getSegmentName,
 Function.identity()));
@@ -195,7 +207,8 @@ public class UpsertCompactMergeTaskGenerator extends 
BaseTaskGenerator {
 
       SegmentSelectionResult segmentSelectionResult =
           processValidDocIdsMetadata(tableNameWithType, taskConfigs, 
candidateSegmentsMap, validDocIdsMetadataList,
-              alreadyMergedSegments, 
_clusterInfoAccessor.getControllerMetrics());
+              alreadyMergedSegments, 
validDocIdsMetadataResult.getSegmentToExpectedReplicaCount(), consensusMode,
+              _clusterInfoAccessor.getControllerMetrics());
 
       if (!segmentSelectionResult.getSegmentsForDeletion().isEmpty()) {
         pinotHelixResourceManager.deleteSegments(tableNameWithType, 
segmentSelectionResult.getSegmentsForDeletion(),
@@ -249,15 +262,15 @@ public class UpsertCompactMergeTaskGenerator extends 
BaseTaskGenerator {
   }
 
   /**
-   * Processes validDocIds metadata to determine segments eligible for 
deletion or compaction.
-   * Evaluates segments based on valid/invalid document counts, server 
readiness, and CRC consistency.
-   * Requires consensus across all replicas on validDoc counts before 
proceeding with any operations.
-   * Marks segments with zero valid documents for deletion and groups others 
by partition for compaction.
+   * Determines which segments are eligible for deletion or compact-merge. For 
each segment, replicas are validated
+   * via {@code consensusMode} (CRC match, server health, validDocIds 
agreement); segments that fail are skipped.
+   * Segments with zero valid docs are marked for deletion, the rest are 
grouped by partition for compaction.
    */
   @VisibleForTesting
   public static SegmentSelectionResult processValidDocIdsMetadata(String 
tableNameWithType,
       Map<String, String> taskConfigs, Map<String, SegmentZKMetadata> 
candidateSegmentsMap,
       Map<String, List<ValidDocIdsMetadataInfo>> validDocIdsMetadataInfoMap, 
Set<String> alreadyMergedSegments,
+      Map<String, Integer> segmentToReplicaCount, 
MinionConstants.ValidDocIdsConsensusMode consensusMode,
       ControllerMetrics controllerMetrics) {
     Map<Integer, List<SegmentMergerMetadata>> segmentsEligibleForCompactMerge 
= new HashMap<>();
     Set<String> segmentsForDeletion = new HashSet<>();
@@ -302,40 +315,38 @@ public class UpsertCompactMergeTaskGenerator extends 
BaseTaskGenerator {
       }
       SegmentZKMetadata segment = candidateSegmentsMap.get(segmentName);
 
-      // Process with existing logic using the first replica with matching CRC 
(since all have consensus)
-      for (ValidDocIdsMetadataInfo validDocIdsMetadata : 
validDocIdsMetadataInfoMap.get(segmentName)) {
-        long totalInvalidDocs = validDocIdsMetadata.getTotalInvalidDocs();
-        long totalValidDocs = validDocIdsMetadata.getTotalValidDocs();
-        long segmentSizeInBytes = validDocIdsMetadata.getSegmentSizeInBytes();
+      // Validate replicas (CRC match, server health, validDocIds consensus) 
before scheduling. Returns null when the
+      // segment should be skipped so we never schedule a task the executor 
would later reject.
+      List<ValidDocIdsMetadataInfo> replicas = 
validDocIdsMetadataInfoMap.get(segmentName);
+      ValidDocIdsMetadataInfo validDocIdsMetadata = 
MinionTaskUtils.selectValidDocIdsMetadataForConsensus(
+          MinionConstants.UpsertCompactMergeTask.TASK_TYPE, segment, replicas,
+          segmentToReplicaCount.getOrDefault(segmentName, replicas.size()), 
consensusMode);
+      if (validDocIdsMetadata == null) {
+        continue;
+      }
 
-        // Skip segments if the crc from zk metadata and server does not 
match. They may be getting reloaded.
-        if (segment.getCrc() != 
Long.parseLong(validDocIdsMetadata.getSegmentCrc())) {
-          LOGGER.warn("CRC mismatch for segment: {}, (segmentZKMetadata={}, 
validDocIdsMetadata={})", segmentName,
-              segment.getCrc(), validDocIdsMetadata.getSegmentCrc());
-          continue;
-        }
+      long totalInvalidDocs = validDocIdsMetadata.getTotalInvalidDocs();
+      long totalValidDocs = validDocIdsMetadata.getTotalValidDocs();
+      long segmentSizeInBytes = validDocIdsMetadata.getSegmentSizeInBytes();
+      long totalDocs = validDocIdsMetadata.getTotalDocs();
 
-        // segments eligible for deletion with no valid records
-        long totalDocs = validDocIdsMetadata.getTotalDocs();
-        if (totalInvalidDocs == totalDocs) {
-          segmentsForDeletion.add(segmentName);
-        } else if (alreadyMergedSegments.contains(segmentName)) {
-          LOGGER.debug("Segment {} already merged. Skipping it for {}", 
segmentName,
+      // Segments with no valid records can be deleted outright.
+      if (totalInvalidDocs == totalDocs) {
+        segmentsForDeletion.add(segmentName);
+      } else if (alreadyMergedSegments.contains(segmentName)) {
+        LOGGER.debug("Segment {} already merged. Skipping it for {}", 
segmentName,
+            MinionConstants.UpsertCompactMergeTask.TASK_TYPE);
+      } else {
+        Integer partitionID = 
SegmentUtils.getPartitionIdFromSegmentName(segmentName);
+        if (partitionID == null) {
+          LOGGER.warn("Partition ID not found for segment: {}, skipping it for 
{}", segmentName,
               MinionConstants.UpsertCompactMergeTask.TASK_TYPE);
-          break;
-        } else {
-          Integer partitionID = 
SegmentUtils.getPartitionIdFromSegmentName(segmentName);
-          if (partitionID == null) {
-            LOGGER.warn("Partition ID not found for segment: {}, skipping it 
for {}", segmentName,
-                MinionConstants.UpsertCompactMergeTask.TASK_TYPE);
-            continue;
-          }
-          double expectedSegmentSizeAfterCompaction = (segmentSizeInBytes * 
totalValidDocs * 1.0) / totalDocs;
-          segmentsEligibleForCompactMerge.computeIfAbsent(partitionID, k -> 
new ArrayList<>())
-              .add(new SegmentMergerMetadata(segment, totalValidDocs, 
totalInvalidDocs,
-                  expectedSegmentSizeAfterCompaction));
+          continue;
         }
-        break;
+        double expectedSegmentSizeAfterCompaction = (segmentSizeInBytes * 
totalValidDocs * 1.0) / totalDocs;
+        segmentsEligibleForCompactMerge.computeIfAbsent(partitionID, k -> new 
ArrayList<>())
+            .add(new SegmentMergerMetadata(segment, totalValidDocs, 
totalInvalidDocs,
+                expectedSegmentSizeAfterCompaction));
       }
     }
 
diff --git 
a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/MinionTaskUtilsTest.java
 
b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/MinionTaskUtilsTest.java
index 0b55986df46..b8518c0dd3d 100644
--- 
a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/MinionTaskUtilsTest.java
+++ 
b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/MinionTaskUtilsTest.java
@@ -65,6 +65,7 @@ import static org.mockito.Mockito.when;
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertFalse;
 import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertNull;
 import static org.testng.Assert.assertTrue;
 import static org.testng.Assert.expectThrows;
 
@@ -333,9 +334,68 @@ public class MinionTaskUtilsTest {
         () -> MinionTaskUtils.parseValidDocIdsConsensusMode("INVALID_MODE"));
   }
 
-  /**
-   * Builds a RoaringBitmap with {@code numDocs} valid doc ids (0..numDocs-1).
-   */
+  @Test
+  public void testParseValidDocIdsValidationMode() {
+    // Blank/null defaults to STRICT
+    assertEquals(MinionTaskUtils.parseValidDocIdsValidationMode(null),
+        MinionConstants.ValidDocIdsValidationMode.STRICT);
+    assertEquals(MinionTaskUtils.parseValidDocIdsValidationMode(""),
+        MinionConstants.ValidDocIdsValidationMode.STRICT);
+    assertEquals(MinionTaskUtils.parseValidDocIdsValidationMode("  "),
+        MinionConstants.ValidDocIdsValidationMode.STRICT);
+
+    // Case-insensitive parsing
+    assertEquals(MinionTaskUtils.parseValidDocIdsValidationMode("STRICT"),
+        MinionConstants.ValidDocIdsValidationMode.STRICT);
+    
assertEquals(MinionTaskUtils.parseValidDocIdsValidationMode("executor_only"),
+        MinionConstants.ValidDocIdsValidationMode.EXECUTOR_ONLY);
+    assertEquals(MinionTaskUtils.parseValidDocIdsValidationMode("  
EXECUTOR_ONLY  "),
+        MinionConstants.ValidDocIdsValidationMode.EXECUTOR_ONLY);
+
+    expectThrows(IllegalArgumentException.class,
+        () -> MinionTaskUtils.parseValidDocIdsValidationMode("INVALID_MODE"));
+  }
+
+  @Test
+  public void testResolveGeneratorConsensusMode() {
+    // EXECUTOR_ONLY downgrades the generator to UNSAFE regardless of the 
configured consensus mode.
+    for (MinionConstants.ValidDocIdsConsensusMode mode : 
MinionConstants.ValidDocIdsConsensusMode.values()) {
+      assertEquals(
+          MinionTaskUtils.resolveGeneratorConsensusMode(mode, 
MinionConstants.ValidDocIdsValidationMode.EXECUTOR_ONLY),
+          MinionConstants.ValidDocIdsConsensusMode.UNSAFE);
+      // STRICT keeps the configured mode.
+      assertEquals(
+          MinionTaskUtils.resolveGeneratorConsensusMode(mode, 
MinionConstants.ValidDocIdsValidationMode.STRICT), mode);
+    }
+  }
+
+  @Test
+  public void testCrcMatches() {
+    assertTrue(MinionTaskUtils.crcMatches(1000, 5000, 1000, 9999));
+    assertTrue(MinionTaskUtils.crcMatches(1000, 5000, 2000, 5000));
+    assertFalse(MinionTaskUtils.crcMatches(1000, 5000, 2000, 9999));
+    assertFalse(MinionTaskUtils.crcMatches(1000, 5000, 2000, -1));
+    assertFalse(MinionTaskUtils.crcMatches(1000, -1, 2000, 5000));
+    assertFalse(MinionTaskUtils.crcMatches(1000, -1, 2000, -1));
+  }
+
+  @Test
+  public void testExecutorDataCrcFallbackMatch() {
+    List<Object> responses = List.of(makeResponse("seg1", "2000", "5000", 
"server1", makeBitmap(4)));
+    RoaringBitmap result = 
getValidDocIdFromServerMatchingCrcWithMockedReader("myTable_REALTIME", "seg1", 
"1000",
+        "5000", "UNSAFE", responses, new String[]{"server1"}, this);
+    assertNotNull(result);
+    assertEquals(result.getCardinality(), 4);
+  }
+
+  @Test
+  public void testExecutorDataCrcMismatchSkips() {
+    List<Object> responses = List.of(makeResponse("seg1", "2000", "9999", 
"server1", makeBitmap(4)));
+    RoaringBitmap result = 
getValidDocIdFromServerMatchingCrcWithMockedReader("myTable_REALTIME", "seg1", 
"1000",
+        "5000", "UNSAFE", responses, new String[]{"server1"}, this);
+    assertNull(result);
+  }
+
   private static RoaringBitmap makeBitmap(int numDocs) {
     RoaringBitmap b = new RoaringBitmap();
     for (int i = 0; i < numDocs; i++) {
@@ -349,7 +409,13 @@ public class MinionTaskUtilsTest {
    */
   private static ValidDocIdsBitmapResponse makeResponse(String segmentName, 
String crc, String instanceId,
       RoaringBitmap bitmap) {
-    return new ValidDocIdsBitmapResponse(segmentName, crc, 
ValidDocIdsType.SNAPSHOT,
+    return new ValidDocIdsBitmapResponse(segmentName, crc, null, 
ValidDocIdsType.SNAPSHOT,
+        RoaringBitmapUtils.serialize(bitmap), instanceId, 
ServiceStatus.Status.GOOD);
+  }
+
+  private static ValidDocIdsBitmapResponse makeResponse(String segmentName, 
String crc, String dataCrc,
+      String instanceId, RoaringBitmap bitmap) {
+    return new ValidDocIdsBitmapResponse(segmentName, crc, dataCrc, 
ValidDocIdsType.SNAPSHOT,
         RoaringBitmapUtils.serialize(bitmap), instanceId, 
ServiceStatus.Status.GOOD);
   }
 
@@ -395,6 +461,13 @@ public class MinionTaskUtilsTest {
   private static RoaringBitmap 
getValidDocIdFromServerMatchingCrcWithMockedReader(String tableName,
       String segmentName, String expectedCrc, String consensusMode, 
List<Object> responseOrThrowByCallOrder,
       String[] servers, MinionTaskUtilsTest testInstance) {
+    return getValidDocIdFromServerMatchingCrcWithMockedReader(tableName, 
segmentName, expectedCrc, null, consensusMode,
+        responseOrThrowByCallOrder, servers, testInstance);
+  }
+
+  private static RoaringBitmap 
getValidDocIdFromServerMatchingCrcWithMockedReader(String tableName,
+      String segmentName, String expectedCrc, String expectedDataCrc, String 
consensusMode,
+      List<Object> responseOrThrowByCallOrder, String[] servers, 
MinionTaskUtilsTest testInstance) {
     testInstance.setupMinionContextWithServers(tableName, segmentName, 
servers);
     // Shared across all mock instances (production creates one reader per 
server).
     AtomicInteger callIndex = new AtomicInteger(0);
@@ -414,7 +487,7 @@ public class MinionTaskUtilsTest {
               });
         })) {
       return MinionTaskUtils.getValidDocIdFromServerMatchingCrc(tableName, 
segmentName,
-          ValidDocIdsType.SNAPSHOT.name(), MinionContext.getInstance(), 
expectedCrc, consensusMode);
+          ValidDocIdsType.SNAPSHOT.name(), MinionContext.getInstance(), 
expectedCrc, expectedDataCrc, consensusMode);
     }
   }
 
diff --git 
a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/upsertcompaction/UpsertCompactionTaskGeneratorTest.java
 
b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/upsertcompaction/UpsertCompactionTaskGeneratorTest.java
index 127f96a162f..33f8e4e3d7e 100644
--- 
a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/upsertcompaction/UpsertCompactionTaskGeneratorTest.java
+++ 
b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/upsertcompaction/UpsertCompactionTaskGeneratorTest.java
@@ -28,6 +28,8 @@ import java.util.concurrent.TimeUnit;
 import org.apache.helix.model.IdealState;
 import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
 import org.apache.pinot.common.restlet.resources.ValidDocIdsMetadataInfo;
+import org.apache.pinot.common.restlet.resources.ValidDocIdsType;
+import org.apache.pinot.common.utils.ServiceStatus;
 import org.apache.pinot.controller.helix.core.minion.ClusterInfoAccessor;
 import org.apache.pinot.core.common.MinionConstants;
 import org.apache.pinot.core.common.MinionConstants.UpsertCompactionTask;
@@ -232,13 +234,13 @@ public class UpsertCompactionTaskGeneratorTest {
     // no completed segments scenario, there shouldn't be any segment selected 
for compaction
     UpsertCompactionTaskGenerator.SegmentSelectionResult 
segmentSelectionResult =
         
UpsertCompactionTaskGenerator.processValidDocIdsMetadata(compactionConfigs, new 
HashMap<>(),
-            validDocIdsMetadataInfo);
+            validDocIdsMetadataInfo, Map.of(), 
MinionConstants.ValidDocIdsConsensusMode.UNSAFE);
     assertEquals(segmentSelectionResult.getSegmentsForCompaction().size(), 0);
 
     // test with valid crc and thresholds
     segmentSelectionResult =
         
UpsertCompactionTaskGenerator.processValidDocIdsMetadata(compactionConfigs, 
_completedSegmentsMap,
-            validDocIdsMetadataInfo);
+            validDocIdsMetadataInfo, Map.of(), 
MinionConstants.ValidDocIdsConsensusMode.UNSAFE);
     assertEquals(segmentSelectionResult.getSegmentsForCompaction().size(), 1);
     assertEquals(segmentSelectionResult.getSegmentsForDeletion().size(), 1);
     
assertEquals(segmentSelectionResult.getSegmentsForCompaction().get(0).getSegmentName(),
@@ -249,7 +251,7 @@ public class UpsertCompactionTaskGeneratorTest {
     compactionConfigs = getCompactionConfigs("60", "10");
     segmentSelectionResult =
         
UpsertCompactionTaskGenerator.processValidDocIdsMetadata(compactionConfigs, 
_completedSegmentsMap,
-            validDocIdsMetadataInfo);
+            validDocIdsMetadataInfo, Map.of(), 
MinionConstants.ValidDocIdsConsensusMode.UNSAFE);
     assertTrue(segmentSelectionResult.getSegmentsForCompaction().isEmpty());
     assertEquals(segmentSelectionResult.getSegmentsForDeletion().size(), 1);
     assertEquals(segmentSelectionResult.getSegmentsForDeletion().get(0), 
_completedSegment2.getSegmentName());
@@ -258,7 +260,7 @@ public class UpsertCompactionTaskGeneratorTest {
     compactionConfigs = getCompactionConfigs("0", "10");
     segmentSelectionResult =
         
UpsertCompactionTaskGenerator.processValidDocIdsMetadata(compactionConfigs, 
_completedSegmentsMap,
-            validDocIdsMetadataInfo);
+            validDocIdsMetadataInfo, Map.of(), 
MinionConstants.ValidDocIdsConsensusMode.UNSAFE);
     assertEquals(segmentSelectionResult.getSegmentsForDeletion().size(), 1);
     assertEquals(segmentSelectionResult.getSegmentsForCompaction().size(), 1);
     
assertEquals(segmentSelectionResult.getSegmentsForCompaction().get(0).getSegmentName(),
@@ -269,7 +271,7 @@ public class UpsertCompactionTaskGeneratorTest {
     compactionConfigs = getCompactionConfigs("30", "0");
     segmentSelectionResult =
         
UpsertCompactionTaskGenerator.processValidDocIdsMetadata(compactionConfigs, 
_completedSegmentsMap,
-            validDocIdsMetadataInfo);
+            validDocIdsMetadataInfo, Map.of(), 
MinionConstants.ValidDocIdsConsensusMode.UNSAFE);
     assertEquals(segmentSelectionResult.getSegmentsForDeletion().size(), 1);
     assertEquals(segmentSelectionResult.getSegmentsForCompaction().size(), 1);
     
assertEquals(segmentSelectionResult.getSegmentsForCompaction().get(0).getSegmentName(),
@@ -288,7 +290,7 @@ public class UpsertCompactionTaskGeneratorTest {
     });
     segmentSelectionResult =
         
UpsertCompactionTaskGenerator.processValidDocIdsMetadata(compactionConfigs, 
_completedSegmentsMap,
-            validDocIdsMetadataInfo);
+            validDocIdsMetadataInfo, Map.of(), 
MinionConstants.ValidDocIdsConsensusMode.UNSAFE);
 
     // completedSegment is supposed to be filtered out
     
Assert.assertEquals(segmentSelectionResult.getSegmentsForCompaction().size(), 
0);
@@ -313,7 +315,7 @@ public class UpsertCompactionTaskGeneratorTest {
     compactionConfigs = getCompactionConfigs("30", "0");
     segmentSelectionResult =
         
UpsertCompactionTaskGenerator.processValidDocIdsMetadata(compactionConfigs, 
_completedSegmentsMap,
-            validDocIdsMetadataInfo);
+            validDocIdsMetadataInfo, Map.of(), 
MinionConstants.ValidDocIdsConsensusMode.UNSAFE);
     
Assert.assertEquals(segmentSelectionResult.getSegmentsForCompaction().size(), 
2);
     
Assert.assertEquals(segmentSelectionResult.getSegmentsForDeletion().size(), 0);
     
assertEquals(segmentSelectionResult.getSegmentsForCompaction().get(0).getSegmentName(),
@@ -326,6 +328,99 @@ public class UpsertCompactionTaskGeneratorTest {
     
assertEquals(validDocIdsMetadataInfo.get("testTable__1").get(0).getSegmentCreationTimeMillis(),
 9876543210L);
   }
 
+  @Test
+  public void testProcessValidDocIdsMetadataConsensus() {
+    Map<String, String> compactionConfigs = getCompactionConfigs("1", "10");
+    String segmentName = _completedSegment.getSegmentName();
+    long crc = _completedSegment.getCrc();
+    Map<String, Integer> twoReplicas = Map.of(segmentName, 2);
+
+    Map<String, List<ValidDocIdsMetadataInfo>> equalReplicas = 
Map.of(segmentName, List.of(
+        meta(segmentName, 50, 50, 100, crc, ServiceStatus.Status.GOOD, 
"server1"),
+        meta(segmentName, 50, 50, 100, crc, ServiceStatus.Status.GOOD, 
"server2")));
+    UpsertCompactionTaskGenerator.SegmentSelectionResult result =
+        
UpsertCompactionTaskGenerator.processValidDocIdsMetadata(compactionConfigs, 
_completedSegmentsMap,
+            equalReplicas, twoReplicas, 
MinionConstants.ValidDocIdsConsensusMode.EQUAL);
+    assertEquals(result.getSegmentsForCompaction().size(), 1);
+
+    Map<String, List<ValidDocIdsMetadataInfo>> unequalReplicas = 
Map.of(segmentName, List.of(
+        meta(segmentName, 50, 50, 100, crc, ServiceStatus.Status.GOOD, 
"server1"),
+        meta(segmentName, 60, 40, 100, crc, ServiceStatus.Status.GOOD, 
"server2")));
+    result = 
UpsertCompactionTaskGenerator.processValidDocIdsMetadata(compactionConfigs, 
_completedSegmentsMap,
+        unequalReplicas, twoReplicas, 
MinionConstants.ValidDocIdsConsensusMode.EQUAL);
+    assertTrue(result.getSegmentsForCompaction().isEmpty());
+    assertTrue(result.getSegmentsForDeletion().isEmpty());
+
+    Map<String, List<ValidDocIdsMetadataInfo>> oneResponded = 
Map.of(segmentName, List.of(
+        meta(segmentName, 50, 50, 100, crc, ServiceStatus.Status.GOOD, 
"server1")));
+    result = 
UpsertCompactionTaskGenerator.processValidDocIdsMetadata(compactionConfigs, 
_completedSegmentsMap,
+        oneResponded, twoReplicas, 
MinionConstants.ValidDocIdsConsensusMode.EQUAL);
+    assertTrue(result.getSegmentsForCompaction().isEmpty());
+
+    Map<String, List<ValidDocIdsMetadataInfo>> crcMismatch = 
Map.of(segmentName, List.of(
+        meta(segmentName, 50, 50, 100, crc, ServiceStatus.Status.GOOD, 
"server1"),
+        meta(segmentName, 50, 50, 100, crc + 1, ServiceStatus.Status.GOOD, 
"server2")));
+    result = 
UpsertCompactionTaskGenerator.processValidDocIdsMetadata(compactionConfigs, 
_completedSegmentsMap,
+        crcMismatch, twoReplicas, 
MinionConstants.ValidDocIdsConsensusMode.EQUAL);
+    assertTrue(result.getSegmentsForCompaction().isEmpty());
+
+    Map<String, List<ValidDocIdsMetadataInfo>> unhealthy = Map.of(segmentName, 
List.of(
+        meta(segmentName, 50, 50, 100, crc, ServiceStatus.Status.GOOD, 
"server1"),
+        meta(segmentName, 50, 50, 100, crc, ServiceStatus.Status.STARTING, 
"server2")));
+    result = 
UpsertCompactionTaskGenerator.processValidDocIdsMetadata(compactionConfigs, 
_completedSegmentsMap,
+        unhealthy, twoReplicas, 
MinionConstants.ValidDocIdsConsensusMode.EQUAL);
+    assertTrue(result.getSegmentsForCompaction().isEmpty());
+
+    result = 
UpsertCompactionTaskGenerator.processValidDocIdsMetadata(compactionConfigs, 
_completedSegmentsMap,
+        crcMismatch, twoReplicas, 
MinionConstants.ValidDocIdsConsensusMode.UNSAFE);
+    assertEquals(result.getSegmentsForCompaction().size(), 1);
+
+    result = 
UpsertCompactionTaskGenerator.processValidDocIdsMetadata(compactionConfigs, 
_completedSegmentsMap,
+        crcMismatch, twoReplicas, 
MinionConstants.ValidDocIdsConsensusMode.MOST_VALID_DOCS);
+    assertTrue(result.getSegmentsForCompaction().isEmpty());
+
+    Map<String, List<ValidDocIdsMetadataInfo>> mostValidDocs = 
Map.of(segmentName, List.of(
+        meta(segmentName, 0, 100, 100, crc, ServiceStatus.Status.GOOD, 
"server1"),
+        meta(segmentName, 100, 0, 100, crc, ServiceStatus.Status.GOOD, 
"server2")));
+    result = 
UpsertCompactionTaskGenerator.processValidDocIdsMetadata(compactionConfigs, 
_completedSegmentsMap,
+        mostValidDocs, twoReplicas, 
MinionConstants.ValidDocIdsConsensusMode.MOST_VALID_DOCS);
+    assertTrue(result.getSegmentsForCompaction().isEmpty());
+    assertTrue(result.getSegmentsForDeletion().isEmpty());
+
+    SegmentZKMetadata dataCrcSegment = new SegmentZKMetadata(segmentName);
+    dataCrcSegment.setStatus(CommonConstants.Segment.Realtime.Status.DONE);
+    dataCrcSegment.setTotalDocs(100L);
+    dataCrcSegment.setCrc(1000);
+    dataCrcSegment.setUseDataCrc(true);
+    dataCrcSegment.setDataCrc(5000);
+    Map<String, SegmentZKMetadata> dataCrcMap = Map.of(segmentName, 
dataCrcSegment);
+    Map<String, List<ValidDocIdsMetadataInfo>> dataCrcMatch = 
Map.of(segmentName, List.of(
+        metaWithDataCrc(segmentName, 50, 50, 100, 2000, "5000", 
ServiceStatus.Status.GOOD, "server1"),
+        metaWithDataCrc(segmentName, 50, 50, 100, 2000, "5000", 
ServiceStatus.Status.GOOD, "server2")));
+    result = 
UpsertCompactionTaskGenerator.processValidDocIdsMetadata(compactionConfigs, 
dataCrcMap, dataCrcMatch,
+        twoReplicas, MinionConstants.ValidDocIdsConsensusMode.EQUAL);
+    assertEquals(result.getSegmentsForCompaction().size(), 1);
+
+    Map<String, List<ValidDocIdsMetadataInfo>> dataCrcMismatch = 
Map.of(segmentName, List.of(
+        metaWithDataCrc(segmentName, 50, 50, 100, 2000, "9999", 
ServiceStatus.Status.GOOD, "server1"),
+        metaWithDataCrc(segmentName, 50, 50, 100, 2000, "9999", 
ServiceStatus.Status.GOOD, "server2")));
+    result = 
UpsertCompactionTaskGenerator.processValidDocIdsMetadata(compactionConfigs, 
dataCrcMap, dataCrcMismatch,
+        twoReplicas, MinionConstants.ValidDocIdsConsensusMode.EQUAL);
+    assertTrue(result.getSegmentsForCompaction().isEmpty());
+  }
+
+  private static ValidDocIdsMetadataInfo meta(String segmentName, long 
validDocs, long invalidDocs, long totalDocs,
+      long crc, ServiceStatus.Status serverStatus, String instanceId) {
+    return new ValidDocIdsMetadataInfo(segmentName, validDocs, invalidDocs, 
totalDocs, String.valueOf(crc), null,
+        ValidDocIdsType.SNAPSHOT, 1000, System.currentTimeMillis(), 
instanceId, serverStatus);
+  }
+
+  private static ValidDocIdsMetadataInfo metaWithDataCrc(String segmentName, 
long validDocs, long invalidDocs,
+      long totalDocs, long crc, String dataCrc, ServiceStatus.Status 
serverStatus, String instanceId) {
+    return new ValidDocIdsMetadataInfo(segmentName, validDocs, invalidDocs, 
totalDocs, String.valueOf(crc), dataCrc,
+        ValidDocIdsType.SNAPSHOT, 1000, System.currentTimeMillis(), 
instanceId, serverStatus);
+  }
+
   @Test
   public void testUpsertCompactionTaskConfig() {
     Map<String, String> upsertCompactionTaskConfig =
diff --git 
a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/upsertcompactmerge/UpsertCompactMergeTaskGeneratorTest.java
 
b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/upsertcompactmerge/UpsertCompactMergeTaskGeneratorTest.java
index 1bc8ad96876..60104b47286 100644
--- 
a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/upsertcompactmerge/UpsertCompactMergeTaskGeneratorTest.java
+++ 
b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/upsertcompactmerge/UpsertCompactMergeTaskGeneratorTest.java
@@ -363,11 +363,11 @@ public class UpsertCompactMergeTaskGeneratorTest {
 
     Map<String, List<ValidDocIdsMetadataInfo>> validDocIdsMetadata = new 
HashMap<>();
     validDocIdsMetadata.put("testTable__0__0__12345", Arrays.asList(
-        new ValidDocIdsMetadataInfo("testTable__0__0__12345", 90, 10, 100, 
"1000",
+        new ValidDocIdsMetadataInfo("testTable__0__0__12345", 90, 10, 100, 
"1000", null,
             ValidDocIdsType.SNAPSHOT, 100000, System.currentTimeMillis(), 
"server1",
             ServiceStatus.Status.GOOD)));
     validDocIdsMetadata.put("testTable__0__1__12346", Arrays.asList(
-        new ValidDocIdsMetadataInfo("testTable__0__1__12346", 8, 2, 10, "2000",
+        new ValidDocIdsMetadataInfo("testTable__0__1__12346", 8, 2, 10, 
"2000", null,
             ValidDocIdsType.SNAPSHOT, 10000, System.currentTimeMillis(), 
"server1",
             ServiceStatus.Status.GOOD)));
 
@@ -375,7 +375,7 @@ public class UpsertCompactMergeTaskGeneratorTest {
 
     SegmentSelectionResult result = 
UpsertCompactMergeTaskGenerator.processValidDocIdsMetadata(
         RAW_TABLE_NAME + "_REALTIME", taskConfigs, candidateSegmentsMap,
-        validDocIdsMetadata, alreadyMergedSegments, null);
+        validDocIdsMetadata, alreadyMergedSegments, Map.of(), 
MinionConstants.ValidDocIdsConsensusMode.UNSAFE, null);
 
     Assert.assertNotNull(result);
     Assert.assertNotNull(result.getSegmentsForCompactMergeByPartition());
@@ -401,11 +401,11 @@ public class UpsertCompactMergeTaskGeneratorTest {
     Map<String, List<ValidDocIdsMetadataInfo>> validDocIdsMetadata = new 
HashMap<>();
     // Segment with 0 valid docs - should be marked for deletion
     validDocIdsMetadata.put("testTable__0__0__12345", Arrays.asList(
-        new ValidDocIdsMetadataInfo("testTable__0__0__12345", 0, 100, 100, 
"1000",
+        new ValidDocIdsMetadataInfo("testTable__0__0__12345", 0, 100, 100, 
"1000", null,
             ValidDocIdsType.SNAPSHOT, 100000, System.currentTimeMillis(), 
"server1",
             ServiceStatus.Status.GOOD)));
     validDocIdsMetadata.put("testTable__0__1__12346", Arrays.asList(
-        new ValidDocIdsMetadataInfo("testTable__0__1__12346", 8, 2, 10, "2000",
+        new ValidDocIdsMetadataInfo("testTable__0__1__12346", 8, 2, 10, 
"2000", null,
             ValidDocIdsType.SNAPSHOT, 10000, System.currentTimeMillis(), 
"server1",
             ServiceStatus.Status.GOOD)));
 
@@ -413,13 +413,83 @@ public class UpsertCompactMergeTaskGeneratorTest {
 
     SegmentSelectionResult result = 
UpsertCompactMergeTaskGenerator.processValidDocIdsMetadata(
         RAW_TABLE_NAME + "_REALTIME", taskConfigs, candidateSegmentsMap,
-        validDocIdsMetadata, alreadyMergedSegments, null);
+        validDocIdsMetadata, alreadyMergedSegments, Map.of(), 
MinionConstants.ValidDocIdsConsensusMode.UNSAFE, null);
 
     Assert.assertNotNull(result);
     Assert.assertEquals(result.getSegmentsForDeletion().size(), 1, "Should 
have one segment for deletion");
     
Assert.assertTrue(result.getSegmentsForDeletion().contains("testTable__0__0__12345"));
   }
 
+  /**
+   * Tests that replica consensus is enforced before a segment is selected. 
Uses the deletion path (a fully-invalid
+   * segment) as a clean signal: the segment is processed only when its 
replicas pass the consensus check.
+   */
+  @Test
+  public void testProcessValidDocIdsMetadataConsensus() {
+    Map<String, String> taskConfigs = new HashMap<>();
+    String segmentName = _completedSegment.getSegmentName();
+    long crc = _completedSegment.getCrc();
+    Map<String, SegmentZKMetadata> candidateSegmentsMap = Map.of(segmentName, 
_completedSegment);
+    Set<String> noMerged = Set.of();
+    Map<String, Integer> twoReplicas = Map.of(segmentName, 2);
+
+    Map<String, List<ValidDocIdsMetadataInfo>> agree = Map.of(segmentName, 
List.of(
+        meta(segmentName, 0, 100, 100, crc, ServiceStatus.Status.GOOD, 
"server1"),
+        meta(segmentName, 0, 100, 100, crc, ServiceStatus.Status.GOOD, 
"server2")));
+    SegmentSelectionResult result = 
UpsertCompactMergeTaskGenerator.processValidDocIdsMetadata(RAW_TABLE_NAME,
+        taskConfigs, candidateSegmentsMap, agree, noMerged, twoReplicas,
+        MinionConstants.ValidDocIdsConsensusMode.EQUAL, null);
+    Assert.assertTrue(result.getSegmentsForDeletion().contains(segmentName));
+
+    Map<String, List<ValidDocIdsMetadataInfo>> disagree = Map.of(segmentName, 
List.of(
+        meta(segmentName, 0, 100, 100, crc, ServiceStatus.Status.GOOD, 
"server1"),
+        meta(segmentName, 1, 99, 100, crc, ServiceStatus.Status.GOOD, 
"server2")));
+    result = 
UpsertCompactMergeTaskGenerator.processValidDocIdsMetadata(RAW_TABLE_NAME, 
taskConfigs,
+        candidateSegmentsMap, disagree, noMerged, twoReplicas, 
MinionConstants.ValidDocIdsConsensusMode.EQUAL, null);
+    Assert.assertTrue(result.getSegmentsForDeletion().isEmpty());
+
+    Map<String, List<ValidDocIdsMetadataInfo>> crcMismatch = 
Map.of(segmentName, List.of(
+        meta(segmentName, 0, 100, 100, crc, ServiceStatus.Status.GOOD, 
"server1"),
+        meta(segmentName, 0, 100, 100, crc + 1, ServiceStatus.Status.GOOD, 
"server2")));
+    result = 
UpsertCompactMergeTaskGenerator.processValidDocIdsMetadata(RAW_TABLE_NAME, 
taskConfigs,
+        candidateSegmentsMap, crcMismatch, noMerged, twoReplicas, 
MinionConstants.ValidDocIdsConsensusMode.EQUAL,
+        null);
+    Assert.assertTrue(result.getSegmentsForDeletion().isEmpty());
+
+    Map<String, List<ValidDocIdsMetadataInfo>> unhealthy = Map.of(segmentName, 
List.of(
+        meta(segmentName, 0, 100, 100, crc, ServiceStatus.Status.GOOD, 
"server1"),
+        meta(segmentName, 0, 100, 100, crc, ServiceStatus.Status.STARTING, 
"server2")));
+    result = 
UpsertCompactMergeTaskGenerator.processValidDocIdsMetadata(RAW_TABLE_NAME, 
taskConfigs,
+        candidateSegmentsMap, unhealthy, noMerged, twoReplicas, 
MinionConstants.ValidDocIdsConsensusMode.EQUAL, null);
+    Assert.assertTrue(result.getSegmentsForDeletion().isEmpty());
+
+    result = 
UpsertCompactMergeTaskGenerator.processValidDocIdsMetadata(RAW_TABLE_NAME, 
taskConfigs,
+        candidateSegmentsMap, crcMismatch, noMerged, twoReplicas, 
MinionConstants.ValidDocIdsConsensusMode.UNSAFE,
+        null);
+    Assert.assertTrue(result.getSegmentsForDeletion().contains(segmentName));
+
+    Map<String, List<ValidDocIdsMetadataInfo>> mostValidDocs = 
Map.of(segmentName, List.of(
+        meta(segmentName, 0, 100, 100, crc, ServiceStatus.Status.GOOD, 
"server1"),
+        meta(segmentName, 100, 0, 100, crc, ServiceStatus.Status.GOOD, 
"server2")));
+    result = 
UpsertCompactMergeTaskGenerator.processValidDocIdsMetadata(RAW_TABLE_NAME, 
taskConfigs,
+        candidateSegmentsMap, mostValidDocs, noMerged, twoReplicas,
+        MinionConstants.ValidDocIdsConsensusMode.MOST_VALID_DOCS, null);
+    Assert.assertTrue(result.getSegmentsForDeletion().isEmpty());
+
+    Map<String, List<ValidDocIdsMetadataInfo>> oneResponded = 
Map.of(segmentName, List.of(
+        meta(segmentName, 0, 100, 100, crc, ServiceStatus.Status.GOOD, 
"server1")));
+    result = 
UpsertCompactMergeTaskGenerator.processValidDocIdsMetadata(RAW_TABLE_NAME, 
taskConfigs,
+        candidateSegmentsMap, oneResponded, noMerged, twoReplicas, 
MinionConstants.ValidDocIdsConsensusMode.EQUAL,
+        null);
+    Assert.assertTrue(result.getSegmentsForDeletion().isEmpty());
+  }
+
+  private static ValidDocIdsMetadataInfo meta(String segmentName, long 
validDocs, long invalidDocs, long totalDocs,
+      long crc, ServiceStatus.Status serverStatus, String instanceId) {
+    return new ValidDocIdsMetadataInfo(segmentName, validDocs, invalidDocs, 
totalDocs, String.valueOf(crc), null,
+        ValidDocIdsType.SNAPSHOT, 1000, System.currentTimeMillis(), 
instanceId, serverStatus);
+  }
+
   /**
    * Tests getCandidateSegments with various edge cases.
    */
diff --git 
a/pinot-server/src/main/java/org/apache/pinot/server/api/resources/TablesResource.java
 
b/pinot-server/src/main/java/org/apache/pinot/server/api/resources/TablesResource.java
index 013e4a3588a..5f139668ea8 100644
--- 
a/pinot-server/src/main/java/org/apache/pinot/server/api/resources/TablesResource.java
+++ 
b/pinot-server/src/main/java/org/apache/pinot/server/api/resources/TablesResource.java
@@ -573,8 +573,8 @@ public class TablesResource {
       }
       byte[] validDocIdsBytes = 
RoaringBitmapUtils.serialize(validDocIdSnapshot);
       return new ValidDocIdsBitmapResponse(segmentName, 
indexSegment.getSegmentMetadata().getCrc(),
-          finalValidDocIdsType, validDocIdsBytes, 
_serverInstance.getInstanceDataManager().getInstanceId(),
-          status);
+          toReportableDataCrc(indexSegment.getSegmentMetadata().getDataCrc()), 
finalValidDocIdsType, validDocIdsBytes,
+          _serverInstance.getInstanceDataManager().getInstanceId(), status);
     } finally {
       tableDataManager.releaseSegment(segmentDataManager);
     }
@@ -747,6 +747,10 @@ public class TablesResource {
         validDocIdsMetadata.put("totalValidDocs", totalValidDocs);
         validDocIdsMetadata.put("totalInvalidDocs", totalInvalidDocs);
         validDocIdsMetadata.put("segmentCrc", 
indexSegment.getSegmentMetadata().getCrc());
+        String reportableDataCrc = 
toReportableDataCrc(indexSegment.getSegmentMetadata().getDataCrc());
+        if (reportableDataCrc != null) {
+          validDocIdsMetadata.put("segmentDataCrc", reportableDataCrc);
+        }
         validDocIdsMetadata.put("validDocIdsType", finalValidDocIdsType);
         validDocIdsMetadata.put("serverStatus", status);
         validDocIdsMetadata.put("instanceId", 
_serverInstance.getInstanceDataManager().getInstanceId());
@@ -774,6 +778,12 @@ public class TablesResource {
     }
   }
 
+  /// The segment's data CRC to report, or null when unavailable (negative).
+  @Nullable
+  private static String toReportableDataCrc(String dataCrc) {
+    return dataCrc != null && Long.parseLong(dataCrc) >= 0 ? dataCrc : null;
+  }
+
   private Pair<ValidDocIdsType, MutableRoaringBitmap> 
getValidDocIds(IndexSegment indexSegment,
       String validDocIdsTypeStr) {
     if (validDocIdsTypeStr == null) {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to