This is an automated email from the ASF dual-hosted git repository.
Jackie-Jiang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new f0329f33762 Enforce validDocIds consensus in upsert task generators
and add includeBitmaps to validDocIdsMetadata API (#18853)
f0329f33762 is described below
commit f0329f33762de8afbe1b2901952c7129d9ca7a85
Author: Chaitanya Deepthi <[email protected]>
AuthorDate: Tue Jun 30 17:10:06 2026 -0700
Enforce validDocIds consensus in upsert task generators and add
includeBitmaps to validDocIdsMetadata API (#18853)
---
.../resources/ValidDocIdsBitmapResponse.java | 22 ++-
.../restlet/resources/ValidDocIdsMetadataInfo.java | 19 ++-
.../util/ServerSegmentMetadataReader.java | 37 ++++-
.../apache/pinot/core/common/MinionConstants.java | 24 ++--
.../pinot/plugin/minion/tasks/MinionTaskUtils.java | 155 ++++++++++++++++++++-
.../UpsertCompactionTaskExecutor.java | 10 +-
.../UpsertCompactionTaskGenerator.java | 81 +++++------
.../UpsertCompactMergeTaskExecutor.java | 2 +-
.../UpsertCompactMergeTaskGenerator.java | 83 ++++++-----
.../plugin/minion/tasks/MinionTaskUtilsTest.java | 83 ++++++++++-
.../UpsertCompactionTaskGeneratorTest.java | 109 ++++++++++++++-
.../UpsertCompactMergeTaskGeneratorTest.java | 82 ++++++++++-
.../pinot/server/api/resources/TablesResource.java | 14 +-
13 files changed, 600 insertions(+), 121 deletions(-)
diff --git
a/pinot-common/src/main/java/org/apache/pinot/common/restlet/resources/ValidDocIdsBitmapResponse.java
b/pinot-common/src/main/java/org/apache/pinot/common/restlet/resources/ValidDocIdsBitmapResponse.java
index 2cb8d661318..d925c69d072 100644
---
a/pinot-common/src/main/java/org/apache/pinot/common/restlet/resources/ValidDocIdsBitmapResponse.java
+++
b/pinot-common/src/main/java/org/apache/pinot/common/restlet/resources/ValidDocIdsBitmapResponse.java
@@ -18,24 +18,35 @@
*/
package org.apache.pinot.common.restlet.resources;
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.annotation.JsonProperty;
+import javax.annotation.Nullable;
import org.apache.pinot.common.utils.ServiceStatus;
+@JsonIgnoreProperties(ignoreUnknown = true)
public class ValidDocIdsBitmapResponse {
private final String _segmentName;
private final String _segmentCrc;
+ // Server's data CRC (forward index + dictionary checksum); null when the server doesn't report it.
+ @Nullable
+ private final String _segmentDataCrc;
private final ValidDocIdsType _validDocIdsType;
private final byte[] _bitmap;
private final String _instanceId;
private final ServiceStatus.Status _serverStatus;
+ @JsonCreator
public ValidDocIdsBitmapResponse(@JsonProperty("segmentName") String
segmentName,
- @JsonProperty("segmentCrc") String crc, @JsonProperty("validDocIdsType")
ValidDocIdsType validDocIdsType,
- @JsonProperty("bitmap") byte[] bitmap, @JsonProperty("instanceId")
String instanceId,
+ @JsonProperty("segmentCrc") String crc, @JsonProperty("segmentDataCrc")
@Nullable String segmentDataCrc,
+ @JsonProperty("validDocIdsType") ValidDocIdsType validDocIdsType,
@JsonProperty("bitmap") byte[] bitmap,
+ @JsonProperty("instanceId") String instanceId,
@JsonProperty("serverStatus") ServiceStatus.Status serverStatus) {
_segmentName = segmentName;
_segmentCrc = crc;
+ _segmentDataCrc = segmentDataCrc;
_validDocIdsType = validDocIdsType;
_bitmap = bitmap;
_instanceId = instanceId;
@@ -50,6 +61,13 @@ public class ValidDocIdsBitmapResponse {
return _segmentCrc;
}
+ /// Server's data CRC, or null if not reported. Omitted from the payload when null.
+ @JsonInclude(JsonInclude.Include.NON_NULL)
+ @Nullable
+ public String getSegmentDataCrc() {
+ return _segmentDataCrc;
+ }
+
public ValidDocIdsType getValidDocIdsType() {
return _validDocIdsType;
}
diff --git
a/pinot-common/src/main/java/org/apache/pinot/common/restlet/resources/ValidDocIdsMetadataInfo.java
b/pinot-common/src/main/java/org/apache/pinot/common/restlet/resources/ValidDocIdsMetadataInfo.java
index 6fd42c86f65..185d8b053bc 100644
---
a/pinot-common/src/main/java/org/apache/pinot/common/restlet/resources/ValidDocIdsMetadataInfo.java
+++
b/pinot-common/src/main/java/org/apache/pinot/common/restlet/resources/ValidDocIdsMetadataInfo.java
@@ -18,8 +18,11 @@
*/
package org.apache.pinot.common.restlet.resources;
+import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.annotation.JsonProperty;
+import javax.annotation.Nullable;
import org.apache.pinot.common.utils.ServiceStatus;
@@ -30,24 +33,31 @@ public class ValidDocIdsMetadataInfo {
private final long _totalInvalidDocs;
private final long _totalDocs;
private final String _segmentCrc;
+ // Server's data CRC (forward index + dictionary checksum); null when the server doesn't report it.
+ @Nullable
+ private final String _segmentDataCrc;
private final ValidDocIdsType _validDocIdsType;
private final long _segmentSizeInBytes;
private final long _segmentCreationTimeMillis;
private final String _instanceId;
private final ServiceStatus.Status _serverStatus;
+ @JsonCreator
public ValidDocIdsMetadataInfo(@JsonProperty("segmentName") String
segmentName,
@JsonProperty("totalValidDocs") long totalValidDocs,
@JsonProperty("totalInvalidDocs") long totalInvalidDocs,
@JsonProperty("totalDocs") long totalDocs, @JsonProperty("segmentCrc")
String segmentCrc,
+ @JsonProperty("segmentDataCrc") @Nullable String segmentDataCrc,
@JsonProperty("validDocIdsType") ValidDocIdsType validDocIdsType,
@JsonProperty("segmentSizeInBytes") long segmentSizeInBytes,
@JsonProperty("segmentCreationTimeMillis") long
segmentCreationTimeMillis,
- @JsonProperty("instanceId") String instanceId,
@JsonProperty("serverStatus") ServiceStatus.Status serverStatus) {
+ @JsonProperty("instanceId") String instanceId,
+ @JsonProperty("serverStatus") ServiceStatus.Status serverStatus) {
_segmentName = segmentName;
_totalValidDocs = totalValidDocs;
_totalInvalidDocs = totalInvalidDocs;
_totalDocs = totalDocs;
_segmentCrc = segmentCrc;
+ _segmentDataCrc = segmentDataCrc;
_validDocIdsType = validDocIdsType;
_segmentSizeInBytes = segmentSizeInBytes;
_segmentCreationTimeMillis = segmentCreationTimeMillis;
@@ -75,6 +85,13 @@ public class ValidDocIdsMetadataInfo {
return _segmentCrc;
}
+ /// Server's data CRC, or null if not reported. Omitted from the payload when null.
+ @JsonInclude(JsonInclude.Include.NON_NULL)
+ @Nullable
+ public String getSegmentDataCrc() {
+ return _segmentDataCrc;
+ }
+
public ValidDocIdsType getValidDocIdsType() {
return _validDocIdsType;
}
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/util/ServerSegmentMetadataReader.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/util/ServerSegmentMetadataReader.java
index a1669a2882b..a079f6af06e 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/util/ServerSegmentMetadataReader.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/util/ServerSegmentMetadataReader.java
@@ -267,8 +267,9 @@ public class ServerSegmentMetadataReader {
@Nullable List<String> segmentNames, int timeoutMs, String
validDocIdsType,
int numSegmentsBatchPerServerRequest) {
return getSegmentToValidDocIdsMetadataFromServer(tableNameWithType,
serverToSegmentsMap, serverToEndpoints,
- segmentNames, timeoutMs, validDocIdsType,
numSegmentsBatchPerServerRequest).values().stream()
- .filter(list -> list != null && !list.isEmpty()).map(list ->
list.get(0)).collect(Collectors.toList());
+ segmentNames, timeoutMs, validDocIdsType,
numSegmentsBatchPerServerRequest).getSegmentToMetadata().values()
+ .stream().filter(list -> list != null && !list.isEmpty()).map(list ->
list.get(0))
+ .collect(Collectors.toList());
}
/**
@@ -276,13 +277,14 @@ public class ServerSegmentMetadataReader {
* This method will pick all servers that hosts the target segment and fetch
the segment metadata result and
* return as a list.
*
- * @return map of segment name to list of valid doc id metadata where each
element is every server's metadata.
+ * @return the per-segment metadata from every responding server, plus the
expected replica count per segment.
*/
- public Map<String, List<ValidDocIdsMetadataInfo>>
getSegmentToValidDocIdsMetadataFromServer(String tableNameWithType,
+ public ValidDocIdsMetadataResult
getSegmentToValidDocIdsMetadataFromServer(String tableNameWithType,
Map<String, List<String>> serverToSegmentsMap, BiMap<String, String>
serverToEndpoints,
@Nullable List<String> segmentNames, int timeoutMs, String
validDocIdsType,
int numSegmentsBatchPerServerRequest) {
List<Pair<String, String>> serverURLsAndBodies = new ArrayList<>();
+ Map<String, Integer> segmentToExpectedReplicaCount = new HashMap<>();
for (Map.Entry<String, List<String>> serverToSegments :
serverToSegmentsMap.entrySet()) {
List<String> segmentsForServer = serverToSegments.getValue();
List<String> segmentsToQuery = new ArrayList<>();
@@ -296,6 +298,9 @@ public class ServerSegmentMetadataReader {
}
}
}
+ for (String segment : segmentsToQuery) {
+ segmentToExpectedReplicaCount.merge(segment, 1, Integer::sum);
+ }
// Number of segments to query per server request. If a table has a lot
of segments, then we might send a
// huge payload to pinot-server in request. Batching the requests will
help in reducing the payload size.
@@ -352,7 +357,29 @@ public class ServerSegmentMetadataReader {
LOGGER.info("Retrieved validDocIds metadata for {} segments from {} server
requests.",
validDocIdsMetadataInfos.size(), returnedServerRequestsCount);
- return validDocIdsMetadataInfos;
+ return new ValidDocIdsMetadataResult(validDocIdsMetadataInfos,
segmentToExpectedReplicaCount);
+ }
+
+ /// Result of [#getSegmentToValidDocIdsMetadataFromServer]: the per-segment metadata from every responding server,
+ /// plus the expected replica count per segment (how many servers host it). The count lets callers detect a replica
+ /// that did not respond, since a missing replica is simply absent from the metadata.
+ public static class ValidDocIdsMetadataResult {
+ private final Map<String, List<ValidDocIdsMetadataInfo>>
_segmentToMetadata;
+ private final Map<String, Integer> _segmentToExpectedReplicaCount;
+
+ public ValidDocIdsMetadataResult(Map<String,
List<ValidDocIdsMetadataInfo>> segmentToMetadata,
+ Map<String, Integer> segmentToExpectedReplicaCount) {
+ _segmentToMetadata = segmentToMetadata;
+ _segmentToExpectedReplicaCount = segmentToExpectedReplicaCount;
+ }
+
+ public Map<String, List<ValidDocIdsMetadataInfo>> getSegmentToMetadata() {
+ return _segmentToMetadata;
+ }
+
+ public Map<String, Integer> getSegmentToExpectedReplicaCount() {
+ return _segmentToExpectedReplicaCount;
+ }
}
/**
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/common/MinionConstants.java
b/pinot-core/src/main/java/org/apache/pinot/core/common/MinionConstants.java
index 976216d06a9..007fe183738 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/common/MinionConstants.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/common/MinionConstants.java
@@ -92,11 +92,18 @@ public class MinionConstants {
*/
public static final String SEGMENT_DOWNLOAD_PARALLELISM =
"segmentDownloadParallelism";
- /** Valid doc ids consensus mode (executor-only). Kept internal; executors
pass config string. */
+ /// Valid doc ids consensus mode enforced by both the task generators
(pre-scheduling) and the executors.
public enum ValidDocIdsConsensusMode {
UNSAFE, EQUAL, MOST_VALID_DOCS
}
+ /// Where validDocIds consensus is enforced. STRICT (default) runs the checks in both the task generator
+ /// (pre-scheduling) and the executor; EXECUTOR_ONLY skips the generator-side checks and leaves the executor as the
+ /// sole gate.
+ public enum ValidDocIdsValidationMode {
+ STRICT, EXECUTOR_ONLY
+ }
+
// Purges rows inside segment that match chosen criteria
public static class PurgeTask {
public static final String TASK_TYPE = "PurgeTask";
@@ -268,15 +275,16 @@ public class MinionConstants {
*/
public static final String NUM_SEGMENTS_BATCH_PER_SERVER_REQUEST =
"numSegmentsBatchPerServerRequest";
- /**
- * Valid doc ids consensus mode used by the executor only (generator
unchanged). Values: UNSAFE, EQUAL,
- * MOST_VALID_DOCS. UNSAFE = use first server with matching CRC and READY;
EQUAL = require all replicas
- * to have the same valid doc set (default); MOST_VALID_DOCS = use replica
with most valid docs.
- */
+ /// Valid doc ids consensus mode used by both the task generators (pre-scheduling) and the executors. UNSAFE =
+ /// first server with matching CRC and GOOD status; EQUAL (default) = all replicas must agree; MOST_VALID_DOCS =
+ /// the replica with the most valid docs. Shared by UpsertCompactionTask and UpsertCompactMergeTask.
public static final String VALID_DOC_IDS_CONSENSUS_MODE_KEY =
"validDocIdsConsensusMode";
-
- /** Default: equal valid doc set consensus across replicas. */
public static final String DEFAULT_VALID_DOC_IDS_CONSENSUS_MODE = "EQUAL";
+
+ /// Whether the consensus checks also run in the generator. STRICT (default) = generator + executor;
+ /// EXECUTOR_ONLY = executor only. Shared by UpsertCompactionTask and UpsertCompactMergeTask.
+ public static final String VALID_DOC_IDS_VALIDATION_MODE_KEY =
"validDocIdsValidationMode";
+ public static final String DEFAULT_VALID_DOC_IDS_VALIDATION_MODE =
"STRICT";
}
public static class UpsertCompactMergeTask {
diff --git
a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/MinionTaskUtils.java
b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/MinionTaskUtils.java
index ea2150644c3..3667d731570 100644
---
a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/MinionTaskUtils.java
+++
b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/MinionTaskUtils.java
@@ -18,6 +18,7 @@
*/
package org.apache.pinot.plugin.minion.tasks;
+import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import java.net.URI;
import java.text.SimpleDateFormat;
@@ -30,6 +31,7 @@ import java.util.Map;
import java.util.TimeZone;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nullable;
+import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.helix.HelixAdmin;
import org.apache.helix.model.ExternalView;
@@ -38,6 +40,7 @@ import org.apache.pinot.common.auth.AuthProviderUtils;
import org.apache.pinot.common.auth.NullAuthProvider;
import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
import org.apache.pinot.common.restlet.resources.ValidDocIdsBitmapResponse;
+import org.apache.pinot.common.restlet.resources.ValidDocIdsMetadataInfo;
import org.apache.pinot.common.restlet.resources.ValidDocIdsType;
import org.apache.pinot.common.utils.RetentionUtils;
import org.apache.pinot.common.utils.RoaringBitmapUtils;
@@ -72,14 +75,32 @@ import org.slf4j.LoggerFactory;
public class MinionTaskUtils {
private static final Logger LOGGER =
LoggerFactory.getLogger(MinionTaskUtils.class);
- /** Package-private for testing: parses validDocIdsComparisonMode config
string. */
- static MinionConstants.ValidDocIdsConsensusMode
parseValidDocIdsConsensusMode(String value) {
- if (value == null || value.isBlank()) {
+ /// Parses the validDocIdsConsensusMode config string. Blank/null defaults
to `EQUAL`.
+ public static MinionConstants.ValidDocIdsConsensusMode
parseValidDocIdsConsensusMode(String value) {
+ if (StringUtils.isBlank(value)) {
return MinionConstants.ValidDocIdsConsensusMode.EQUAL;
}
return
MinionConstants.ValidDocIdsConsensusMode.valueOf(value.toUpperCase().trim());
}
+ /// Parses the validDocIdsValidationMode config string. Blank/null defaults
to `STRICT`.
+ public static MinionConstants.ValidDocIdsValidationMode
parseValidDocIdsValidationMode(String value) {
+ if (StringUtils.isBlank(value)) {
+ return MinionConstants.ValidDocIdsValidationMode.STRICT;
+ }
+ return
MinionConstants.ValidDocIdsValidationMode.valueOf(value.toUpperCase().trim());
+ }
+
+ /// Resolves the consensus mode the generator should apply, given the configured consensus mode and validation
+ /// mode. EXECUTOR_ONLY downgrades the generator to UNSAFE (lenient pick, no bitmaps, no cross-replica enforcement)
+ /// so the executor stays the sole gate; STRICT keeps the configured mode.
+ public static MinionConstants.ValidDocIdsConsensusMode
resolveGeneratorConsensusMode(
+ MinionConstants.ValidDocIdsConsensusMode consensusMode,
+ MinionConstants.ValidDocIdsValidationMode validationMode) {
+ return validationMode ==
MinionConstants.ValidDocIdsValidationMode.EXECUTOR_ONLY
+ ? MinionConstants.ValidDocIdsConsensusMode.UNSAFE : consensusMode;
+ }
+
private static final String DEFAULT_DIR_PATH_TERMINATOR = "/";
public static final String DATETIME_PATTERN = "yyyy-MM-dd'T'HH:mm:ss.SSS'Z'";
@@ -324,6 +345,14 @@ public class MinionTaskUtils {
@Nullable
public static RoaringBitmap getValidDocIdFromServerMatchingCrc(String
tableNameWithType, String segmentName,
String validDocIdsType, MinionContext minionContext, String expectedCrc,
String comparisonModeStr) {
+ return getValidDocIdFromServerMatchingCrc(tableNameWithType, segmentName,
validDocIdsType, minionContext,
+ expectedCrc, null, comparisonModeStr);
+ }
+
+ /// Variant that also matches on the expected data CRC; see [#crcMatches].
+ public static RoaringBitmap getValidDocIdFromServerMatchingCrc(String
tableNameWithType, String segmentName,
+ String validDocIdsType, MinionContext minionContext, String expectedCrc,
@Nullable String expectedDataCrc,
+ String comparisonModeStr) {
MinionConstants.ValidDocIdsConsensusMode consensusMode =
parseValidDocIdsConsensusMode(comparisonModeStr);
String clusterName = minionContext.getHelixManager().getClusterName();
HelixAdmin helixAdmin =
minionContext.getHelixManager().getClusterManagmentTool();
@@ -352,13 +381,15 @@ public class MinionTaskUtils {
}
String crcFromValidDocIdsBitmap =
validDocIdsBitmapResponse.getSegmentCrc();
+ long serverDataCrc =
parseCrc(validDocIdsBitmapResponse.getSegmentDataCrc());
// Check crc from the downloaded segment against the crc returned from
the server along with the valid doc id
// bitmap. If this doesn't match, this means that we are hitting the
race condition where the segment has been
// uploaded successfully while the server is still reloading the
segment. Reloading can take a while when the
// offheap upsert is used because we will need to delete & add all
primary keys.
// `BaseSingleSegmentConversionExecutor.executeTask()` already checks
for the crc from the task generator
// against the crc from the current segment zk metadata, so we don't
need to check that here.
- if (!expectedCrc.equals(crcFromValidDocIdsBitmap)) {
+ if (!crcMatches(parseCrc(expectedCrc), parseCrc(expectedDataCrc),
parseCrc(crcFromValidDocIdsBitmap),
+ serverDataCrc)) {
if (consensusMode == MinionConstants.ValidDocIdsConsensusMode.UNSAFE) {
LOGGER.warn("CRC mismatch for segment: {} from endpoint {},
skipping", segmentName, endpoint);
continue;
@@ -428,6 +459,122 @@ public class MinionTaskUtils {
return maxCardinalityMap;
}
+ /// Picks the replica whose validDocIds the generator should use, or null to skip the segment, applying the same
+ /// checks the executor would so bad segments aren't scheduled: the replica must match CRC (via [#crcMatches]) and
+ /// be on a healthy server. UNSAFE uses the first such replica, EQUAL requires all to report the same valid doc
+ /// count, and MOST_VALID_DOCS picks the highest.
+ @Nullable
+ public static ValidDocIdsMetadataInfo
selectValidDocIdsMetadataForConsensus(String taskType,
+ SegmentZKMetadata segmentZKMetadata, @Nullable
List<ValidDocIdsMetadataInfo> replicas, int expectedReplicaCount,
+ MinionConstants.ValidDocIdsConsensusMode consensusMode) {
+ String segmentName = segmentZKMetadata.getSegmentName();
+ if (CollectionUtils.isEmpty(replicas)) {
+ return null;
+ }
+ boolean unsafe = consensusMode ==
MinionConstants.ValidDocIdsConsensusMode.UNSAFE;
+ List<ValidDocIdsMetadataInfo> usableReplicas = new ArrayList<>();
+ for (ValidDocIdsMetadataInfo replica : replicas) {
+ // A CRC mismatch usually means the server is still reloading the segment, so its valid doc set can't be
+ // trusted. UNSAFE skips just this replica; stricter modes skip the whole segment.
+ long replicaCrc = parseCrc(replica.getSegmentCrc());
+ if (replicaCrc < 0) {
+ LOGGER.warn("Unparseable CRC '{}' for segment: {} from server: {},
skipping {} (mode={}) for {}",
+ replica.getSegmentCrc(), segmentName, replica.getInstanceId(),
unsafe ? "replica" : "segment",
+ consensusMode, taskType);
+ if (unsafe) {
+ continue;
+ }
+ return null;
+ }
+ // -1 when this table doesn't use data CRC, so crcMatches falls back to
the segment CRC.
+ long zkDataCrc = segmentZKMetadata.isUseDataCrc() ?
segmentZKMetadata.getDataCrc() : -1;
+ if (!crcMatches(segmentZKMetadata.getCrc(), zkDataCrc, replicaCrc,
parseCrc(replica.getSegmentDataCrc()))) {
+ LOGGER.warn("CRC mismatch for segment: {} (zkCrc={}, zkDataCrc={},
server={} reported crc={} dataCrc={}), "
+ + "skipping {} (mode={}) for {}", segmentName,
segmentZKMetadata.getCrc(),
+ segmentZKMetadata.getDataCrc(), replica.getInstanceId(),
replicaCrc, replica.getSegmentDataCrc(),
+ unsafe ? "replica" : "segment", consensusMode, taskType);
+ if (unsafe) {
+ continue;
+ }
+ return null;
+ }
+ // A non-GOOD server may still be mutating the segment, so its valid doc
set is unreliable.
+ if (replica.getServerStatus() != null && replica.getServerStatus() !=
ServiceStatus.Status.GOOD) {
+ LOGGER.warn("Server {} is in {} state for segment: {}, skipping {}
(mode={}) for {}", replica.getInstanceId(),
+ replica.getServerStatus(), segmentName, unsafe ? "replica" :
"segment", consensusMode, taskType);
+ if (unsafe) {
+ continue;
+ }
+ return null;
+ }
+ if (unsafe) {
+ return replica;
+ }
+ usableReplicas.add(replica);
+ }
+
+ if (usableReplicas.isEmpty()) {
+ return null;
+ }
+
+ // Strict modes need every assigned replica to have responded; a short responder list means a server dropped out
+ // (network error, parse failure, or it no longer has the segment), so we can't confirm consensus across the full
+ // replica set and skip the segment.
+ if (usableReplicas.size() < expectedReplicaCount) {
+ LOGGER.warn("Only {} of {} replicas responded for segment: {}, cannot
confirm consensus, skipping for {}",
+ usableReplicas.size(), expectedReplicaCount, segmentName, taskType);
+ return null;
+ }
+
+ if (consensusMode == MinionConstants.ValidDocIdsConsensusMode.EQUAL) {
+ // Require every replica to report the same valid doc count. Comparing counts (rather than the full bitmaps)
+ // keeps the generator cheap - it avoids serializing a bitmap per replica back to the controller.
+ ValidDocIdsMetadataInfo first = usableReplicas.get(0);
+ for (int i = 1; i < usableReplicas.size(); i++) {
+ if (usableReplicas.get(i).getTotalValidDocs() !=
first.getTotalValidDocs()) {
+ LOGGER.warn("Replicas disagree on valid doc count for segment: {},
skipping segment for {}", segmentName,
+ taskType);
+ return null;
+ }
+ }
+ return first;
+ }
+
+ // MOST_VALID_DOCS: pick the replica reporting the most valid docs.
+ ValidDocIdsMetadataInfo chosen = usableReplicas.get(0);
+ for (ValidDocIdsMetadataInfo replica : usableReplicas) {
+ if (replica.getTotalValidDocs() > chosen.getTotalValidDocs()) {
+ chosen = replica;
+ }
+ }
+ return chosen;
+ }
+
+ /// Whether two segment copies hold the same data. Matches on the full segment CRC, or - when those differ - on the
+ /// data CRC (a checksum over only the forward index and dictionary, so index/metadata-only changes don't affect
+ /// it) when both copies report one (`>= 0`). A negative data CRC means "not reported". Mirrors the logic of
+ /// `BaseTableDataManager.hasSameCRC` and is used by both the generator's pre-scheduling check and the executor's
+ /// per-server check.
+ @VisibleForTesting
+ static boolean crcMatches(long segmentCrc, long dataCrc, long
otherSegmentCrc, long otherDataCrc) {
+ if (segmentCrc == otherSegmentCrc) {
+ return true;
+ }
+ return dataCrc >= 0 && otherDataCrc >= 0 && dataCrc == otherDataCrc;
+ }
+
+ /// Parses a CRC string, returning `-1` ("unavailable") when it is null or unparseable.
+ private static long parseCrc(@Nullable String crc) {
+ if (crc == null) {
+ return -1;
+ }
+ try {
+ return Long.parseLong(crc);
+ } catch (NumberFormatException e) {
+ return -1;
+ }
+ }
+
public static String toUTCString(long epochMillis) {
Date date = new Date(epochMillis);
SimpleDateFormat isoFormat = new SimpleDateFormat(DATETIME_PATTERN);
diff --git
a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/upsertcompaction/UpsertCompactionTaskExecutor.java
b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/upsertcompaction/UpsertCompactionTaskExecutor.java
index ce97909e58c..2d04d1edaec 100644
---
a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/upsertcompaction/UpsertCompactionTaskExecutor.java
+++
b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/upsertcompaction/UpsertCompactionTaskExecutor.java
@@ -78,13 +78,13 @@ public class UpsertCompactionTaskExecutor extends
BaseSingleSegmentConversionExe
// Executor-only: read comparison mode string from task config (no auth
resolution or URL hits).
Map<String, String> taskConfigs =
tableConfig.getTaskConfig() != null ?
tableConfig.getTaskConfig().getConfigsForTaskType(taskType) : null;
- String consensusMode =
- taskConfigs != null ?
taskConfigs.getOrDefault(UpsertCompactionTask.VALID_DOC_IDS_CONSENSUS_MODE_KEY,
- UpsertCompactionTask.DEFAULT_VALID_DOC_IDS_CONSENSUS_MODE)
- : UpsertCompactionTask.DEFAULT_VALID_DOC_IDS_CONSENSUS_MODE;
+ String consensusMode = taskConfigs != null
+ ?
taskConfigs.getOrDefault(MinionConstants.UpsertCompactionTask.VALID_DOC_IDS_CONSENSUS_MODE_KEY,
+
MinionConstants.UpsertCompactionTask.DEFAULT_VALID_DOC_IDS_CONSENSUS_MODE)
+ :
MinionConstants.UpsertCompactionTask.DEFAULT_VALID_DOC_IDS_CONSENSUS_MODE;
RoaringBitmap validDocIds =
MinionTaskUtils.getValidDocIdFromServerMatchingCrc(tableNameWithType,
segmentName, validDocIdsTypeStr,
- MINION_CONTEXT, originalSegmentCrcFromTaskGenerator,
consensusMode);
+ MINION_CONTEXT, originalSegmentCrcFromTaskGenerator,
segmentMetadata.getDataCrc(), consensusMode);
if (validDocIds == null) {
// no valid crc match found or no validDocIds obtained from all servers
// error out the task instead of silently failing so that we can track
it via task-error metrics
diff --git
a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/upsertcompaction/UpsertCompactionTaskGenerator.java
b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/upsertcompaction/UpsertCompactionTaskGenerator.java
index 02e15b9d328..b9c077a1c7e 100644
---
a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/upsertcompaction/UpsertCompactionTaskGenerator.java
+++
b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/upsertcompaction/UpsertCompactionTaskGenerator.java
@@ -34,7 +34,6 @@ import
org.apache.pinot.common.exception.InvalidConfigException;
import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
import org.apache.pinot.common.restlet.resources.ValidDocIdsMetadataInfo;
import org.apache.pinot.common.restlet.resources.ValidDocIdsType;
-import org.apache.pinot.common.utils.ServiceStatus;
import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
import
org.apache.pinot.controller.helix.core.minion.generator.BaseTaskGenerator;
import
org.apache.pinot.controller.helix.core.minion.generator.TaskGeneratorUtils;
@@ -146,21 +145,34 @@ public class UpsertCompactionTaskGenerator extends
BaseTaskGenerator {
ValidDocIdsType validDocIdsType =
MinionTaskUtils.getValidDocIdsType(tableConfig.getUpsertConfig(), taskConfigs,
UpsertCompactionTask.VALID_DOC_IDS_TYPE);
+ // Validate replicas before scheduling, matching the executor's checks, so inconsistent segments are never
+ // scheduled. With EXECUTOR_ONLY the generator skips these checks (the executor stays the gate).
+ MinionConstants.ValidDocIdsConsensusMode consensusMode =
MinionTaskUtils.resolveGeneratorConsensusMode(
+ MinionTaskUtils.parseValidDocIdsConsensusMode(
+
taskConfigs.getOrDefault(MinionConstants.UpsertCompactionTask.VALID_DOC_IDS_CONSENSUS_MODE_KEY,
+
MinionConstants.UpsertCompactionTask.DEFAULT_VALID_DOC_IDS_CONSENSUS_MODE)),
+ MinionTaskUtils.parseValidDocIdsValidationMode(
+
taskConfigs.getOrDefault(MinionConstants.UpsertCompactionTask.VALID_DOC_IDS_VALIDATION_MODE_KEY,
+
MinionConstants.UpsertCompactionTask.DEFAULT_VALID_DOC_IDS_VALIDATION_MODE)));
+
// Number of segments to query per server request. If a table has a lot
of segments, then we might send a
// huge payload to pinot-server in request. Batching the requests will
help in reducing the payload size.
int numSegmentsBatchPerServerRequest = Integer.parseInt(
taskConfigs.getOrDefault(UpsertCompactionTask.NUM_SEGMENTS_BATCH_PER_SERVER_REQUEST,
String.valueOf(DEFAULT_NUM_SEGMENTS_BATCH_PER_SERVER_REQUEST)));
- Map<String, List<ValidDocIdsMetadataInfo>> validDocIdsMetadataList =
+ ServerSegmentMetadataReader.ValidDocIdsMetadataResult
validDocIdsMetadataResult =
serverSegmentMetadataReader.getSegmentToValidDocIdsMetadataFromServer(tableNameWithType,
serverToSegments,
serverToEndpoints, null, 60_000, validDocIdsType.toString(),
numSegmentsBatchPerServerRequest);
+ Map<String, List<ValidDocIdsMetadataInfo>> validDocIdsMetadataList =
+ validDocIdsMetadataResult.getSegmentToMetadata();
Map<String, SegmentZKMetadata> completedSegmentsMap =
completedSegments.stream().collect(Collectors.toMap(SegmentZKMetadata::getSegmentName,
Function.identity()));
SegmentSelectionResult segmentSelectionResult =
- processValidDocIdsMetadata(taskConfigs, completedSegmentsMap,
validDocIdsMetadataList);
+ processValidDocIdsMetadata(taskConfigs, completedSegmentsMap,
validDocIdsMetadataList,
+ validDocIdsMetadataResult.getSegmentToExpectedReplicaCount(),
consensusMode);
int skippedSegmentsCount = validDocIdsMetadataList.size()
- segmentSelectionResult.getSegmentsForCompaction().size()
- segmentSelectionResult.getSegmentsForDeletion().size();
@@ -207,7 +219,8 @@ public class UpsertCompactionTaskGenerator extends
BaseTaskGenerator {
@VisibleForTesting
public static SegmentSelectionResult processValidDocIdsMetadata(Map<String,
String> taskConfigs,
Map<String, SegmentZKMetadata> completedSegmentsMap,
- Map<String, List<ValidDocIdsMetadataInfo>> validDocIdsMetadataInfoMap) {
+ Map<String, List<ValidDocIdsMetadataInfo>> validDocIdsMetadataInfoMap,
+ Map<String, Integer> segmentToReplicaCount,
MinionConstants.ValidDocIdsConsensusMode consensusMode) {
double invalidRecordsThresholdPercent = Double.parseDouble(
taskConfigs.getOrDefault(UpsertCompactionTask.INVALID_RECORDS_THRESHOLD_PERCENT,
String.valueOf(DEFAULT_INVALID_RECORDS_THRESHOLD_PERCENT)));
@@ -223,43 +236,33 @@ public class UpsertCompactionTaskGenerator extends
BaseTaskGenerator {
continue;
}
SegmentZKMetadata segment = completedSegmentsMap.get(segmentName);
- for (ValidDocIdsMetadataInfo validDocIdsMetadata :
validDocIdsMetadataInfoMap.get(segmentName)) {
- long totalInvalidDocs = validDocIdsMetadata.getTotalInvalidDocs();
-
- // Skip segments if the crc from zk metadata and server does not
match. They may be being reloaded.
- if (segment.getCrc() !=
Long.parseLong(validDocIdsMetadata.getSegmentCrc())) {
- LOGGER.warn("CRC mismatch for segment: {}, (segmentZKMetadata={},
validDocIdsMetadata={})", segmentName,
- segment.getCrc(), validDocIdsMetadata.getSegmentCrc());
- continue;
- }
- // skipping segments for which their servers are not in READY state.
The bitmaps would be inconsistent when
- // server is NOT READY as UPDATING segments might be updating the
ONLINE segments
- if (validDocIdsMetadata.getServerStatus() != null &&
!validDocIdsMetadata.getServerStatus()
- .equals(ServiceStatus.Status.GOOD)) {
- LOGGER.warn("Server {} is in {} state, skipping {} generation for
segment: {}",
- validDocIdsMetadata.getInstanceId(),
validDocIdsMetadata.getServerStatus(),
- MinionConstants.UpsertCompactionTask.TASK_TYPE, segmentName);
- continue;
- }
+ // Validate replicas (CRC match, server health, validDocIds consensus)
before scheduling. Returns null when the
+ // segment should be skipped so we never schedule a task the executor
would later reject.
+ List<ValidDocIdsMetadataInfo> replicas =
validDocIdsMetadataInfoMap.get(segmentName);
+ ValidDocIdsMetadataInfo validDocIdsMetadata =
MinionTaskUtils.selectValidDocIdsMetadataForConsensus(
+ MinionConstants.UpsertCompactionTask.TASK_TYPE, segment, replicas,
+ segmentToReplicaCount.getOrDefault(segmentName, replicas.size()),
consensusMode);
+ if (validDocIdsMetadata == null) {
+ continue;
+ }
- long totalDocs = validDocIdsMetadata.getTotalDocs();
- double invalidRecordPercent = ((double) totalInvalidDocs / totalDocs)
* 100;
- if (totalInvalidDocs == totalDocs) {
- LOGGER.debug("Segment {} contains only invalid records, adding it to
the deletion list", segmentName);
- segmentsForDeletion.add(segment.getSegmentName());
- } else if (invalidRecordPercent >= invalidRecordsThresholdPercent
- && totalInvalidDocs >= invalidRecordsThresholdCount) {
- LOGGER.debug("Segment {} contains {} invalid records out of {} total
records "
- + "(count threshold: {}, percent threshold: {}), adding it
to the compaction list", segmentName,
- totalInvalidDocs, totalDocs, invalidRecordsThresholdCount,
invalidRecordsThresholdPercent);
- segmentsForCompaction.add(Pair.of(segment, totalInvalidDocs));
- } else {
- LOGGER.debug("Segment {} contains {} invalid records out of {} total
records "
- + "(count threshold: {}, percent threshold: {}), skipping it
for compaction", segmentName,
- totalInvalidDocs, totalDocs, invalidRecordsThresholdCount,
invalidRecordsThresholdPercent);
- }
- break;
+ long totalInvalidDocs = validDocIdsMetadata.getTotalInvalidDocs();
+ long totalDocs = validDocIdsMetadata.getTotalDocs();
+ double invalidRecordPercent = ((double) totalInvalidDocs / totalDocs) *
100;
+ if (totalInvalidDocs == totalDocs) {
+ LOGGER.debug("Segment {} contains only invalid records, adding it to
the deletion list", segmentName);
+ segmentsForDeletion.add(segment.getSegmentName());
+ } else if (invalidRecordPercent >= invalidRecordsThresholdPercent
+ && totalInvalidDocs >= invalidRecordsThresholdCount) {
+ LOGGER.debug("Segment {} contains {} invalid records out of {} total
records "
+ + "(count threshold: {}, percent threshold: {}), adding it to
the compaction list", segmentName,
+ totalInvalidDocs, totalDocs, invalidRecordsThresholdCount,
invalidRecordsThresholdPercent);
+ segmentsForCompaction.add(Pair.of(segment, totalInvalidDocs));
+ } else {
+ LOGGER.debug("Segment {} contains {} invalid records out of {} total
records "
+ + "(count threshold: {}, percent threshold: {}), skipping it
for compaction", segmentName,
+ totalInvalidDocs, totalDocs, invalidRecordsThresholdCount,
invalidRecordsThresholdPercent);
}
}
segmentsForCompaction.sort((o1, o2) -> {
diff --git
a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/upsertcompactmerge/UpsertCompactMergeTaskExecutor.java
b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/upsertcompactmerge/UpsertCompactMergeTaskExecutor.java
index c6847ce321a..bc435c2ec01 100644
---
a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/upsertcompactmerge/UpsertCompactMergeTaskExecutor.java
+++
b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/upsertcompactmerge/UpsertCompactMergeTaskExecutor.java
@@ -118,7 +118,7 @@ public class UpsertCompactMergeTaskExecutor extends
BaseMultipleSegmentsConversi
List<RecordReader> recordReaders = segmentMetadataList.stream().map(x -> {
RoaringBitmap validDocIds =
MinionTaskUtils.getValidDocIdFromServerMatchingCrc(tableNameWithType,
x.getName(),
- ValidDocIdsType.SNAPSHOT.name(), MINION_CONTEXT, x.getCrc(),
consensusMode);
+ ValidDocIdsType.SNAPSHOT.name(), MINION_CONTEXT, x.getCrc(),
x.getDataCrc(), consensusMode);
if (validDocIds == null) {
// no valid crc match found or no validDocIds obtained from all servers
// error out the task instead of silently failing so that we can track
it via task-error metrics
diff --git
a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/upsertcompactmerge/UpsertCompactMergeTaskGenerator.java
b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/upsertcompactmerge/UpsertCompactMergeTaskGenerator.java
index 1b9565aa24c..45e14405b62 100644
---
a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/upsertcompactmerge/UpsertCompactMergeTaskGenerator.java
+++
b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/upsertcompactmerge/UpsertCompactMergeTaskGenerator.java
@@ -178,15 +178,27 @@ public class UpsertCompactMergeTaskGenerator extends
BaseTaskGenerator {
new ServerSegmentMetadataReader(_clusterInfoAccessor.getExecutor(),
_clusterInfoAccessor.getConnectionManager());
+ // Reuse the compaction task's consensus-mode and validation-mode config
so both tasks behave the same. With
+ // EXECUTOR_ONLY the generator skips these checks (the executor stays
the gate).
+ MinionConstants.ValidDocIdsConsensusMode consensusMode =
MinionTaskUtils.resolveGeneratorConsensusMode(
+ MinionTaskUtils.parseValidDocIdsConsensusMode(
+
taskConfigs.getOrDefault(MinionConstants.UpsertCompactionTask.VALID_DOC_IDS_CONSENSUS_MODE_KEY,
+
MinionConstants.UpsertCompactionTask.DEFAULT_VALID_DOC_IDS_CONSENSUS_MODE)),
+ MinionTaskUtils.parseValidDocIdsValidationMode(
+
taskConfigs.getOrDefault(MinionConstants.UpsertCompactionTask.VALID_DOC_IDS_VALIDATION_MODE_KEY,
+
MinionConstants.UpsertCompactionTask.DEFAULT_VALID_DOC_IDS_VALIDATION_MODE)));
+
// Number of segments to query per server request. If a table has a lot
of segments, then we might send a
// huge payload to pinot-server in request. Batching the requests will
help in reducing the payload size.
int numSegmentsBatchPerServerRequest = Integer.parseInt(
taskConfigs.getOrDefault(MinionConstants.UpsertCompactMergeTask.NUM_SEGMENTS_BATCH_PER_SERVER_REQUEST,
String.valueOf(DEFAULT_NUM_SEGMENTS_BATCH_PER_SERVER_REQUEST)));
- Map<String, List<ValidDocIdsMetadataInfo>> validDocIdsMetadataList =
+ ServerSegmentMetadataReader.ValidDocIdsMetadataResult
validDocIdsMetadataResult =
serverSegmentMetadataReader.getSegmentToValidDocIdsMetadataFromServer(tableNameWithType,
serverToSegments,
serverToEndpoints, null, 60_000,
ValidDocIdsType.SNAPSHOT.toString(), numSegmentsBatchPerServerRequest);
+ Map<String, List<ValidDocIdsMetadataInfo>> validDocIdsMetadataList =
+ validDocIdsMetadataResult.getSegmentToMetadata();
Map<String, SegmentZKMetadata> candidateSegmentsMap =
candidateSegments.stream().collect(Collectors.toMap(SegmentZKMetadata::getSegmentName,
Function.identity()));
@@ -195,7 +207,8 @@ public class UpsertCompactMergeTaskGenerator extends
BaseTaskGenerator {
SegmentSelectionResult segmentSelectionResult =
processValidDocIdsMetadata(tableNameWithType, taskConfigs,
candidateSegmentsMap, validDocIdsMetadataList,
- alreadyMergedSegments,
_clusterInfoAccessor.getControllerMetrics());
+ alreadyMergedSegments,
validDocIdsMetadataResult.getSegmentToExpectedReplicaCount(), consensusMode,
+ _clusterInfoAccessor.getControllerMetrics());
if (!segmentSelectionResult.getSegmentsForDeletion().isEmpty()) {
pinotHelixResourceManager.deleteSegments(tableNameWithType,
segmentSelectionResult.getSegmentsForDeletion(),
@@ -249,15 +262,15 @@ public class UpsertCompactMergeTaskGenerator extends
BaseTaskGenerator {
}
/**
- * Processes validDocIds metadata to determine segments eligible for
deletion or compaction.
- * Evaluates segments based on valid/invalid document counts, server
readiness, and CRC consistency.
- * Requires consensus across all replicas on validDoc counts before
proceeding with any operations.
- * Marks segments with zero valid documents for deletion and groups others
by partition for compaction.
+ * Determines which segments are eligible for deletion or compact-merge. For
each segment, replicas are validated
+ * via {@code consensusMode} (CRC match, server health, validDocIds
agreement); segments that fail are skipped.
+ * Segments with zero valid docs are marked for deletion, the rest are
grouped by partition for compaction.
*/
@VisibleForTesting
public static SegmentSelectionResult processValidDocIdsMetadata(String
tableNameWithType,
Map<String, String> taskConfigs, Map<String, SegmentZKMetadata>
candidateSegmentsMap,
Map<String, List<ValidDocIdsMetadataInfo>> validDocIdsMetadataInfoMap,
Set<String> alreadyMergedSegments,
+ Map<String, Integer> segmentToReplicaCount,
MinionConstants.ValidDocIdsConsensusMode consensusMode,
ControllerMetrics controllerMetrics) {
Map<Integer, List<SegmentMergerMetadata>> segmentsEligibleForCompactMerge
= new HashMap<>();
Set<String> segmentsForDeletion = new HashSet<>();
@@ -302,40 +315,38 @@ public class UpsertCompactMergeTaskGenerator extends
BaseTaskGenerator {
}
SegmentZKMetadata segment = candidateSegmentsMap.get(segmentName);
- // Process with existing logic using the first replica with matching CRC
(since all have consensus)
- for (ValidDocIdsMetadataInfo validDocIdsMetadata :
validDocIdsMetadataInfoMap.get(segmentName)) {
- long totalInvalidDocs = validDocIdsMetadata.getTotalInvalidDocs();
- long totalValidDocs = validDocIdsMetadata.getTotalValidDocs();
- long segmentSizeInBytes = validDocIdsMetadata.getSegmentSizeInBytes();
+ // Validate replicas (CRC match, server health, validDocIds consensus)
before scheduling. Returns null when the
+ // segment should be skipped so we never schedule a task the executor
would later reject.
+ List<ValidDocIdsMetadataInfo> replicas =
validDocIdsMetadataInfoMap.get(segmentName);
+ ValidDocIdsMetadataInfo validDocIdsMetadata =
MinionTaskUtils.selectValidDocIdsMetadataForConsensus(
+ MinionConstants.UpsertCompactMergeTask.TASK_TYPE, segment, replicas,
+ segmentToReplicaCount.getOrDefault(segmentName, replicas.size()),
consensusMode);
+ if (validDocIdsMetadata == null) {
+ continue;
+ }
- // Skip segments if the crc from zk metadata and server does not
match. They may be getting reloaded.
- if (segment.getCrc() !=
Long.parseLong(validDocIdsMetadata.getSegmentCrc())) {
- LOGGER.warn("CRC mismatch for segment: {}, (segmentZKMetadata={},
validDocIdsMetadata={})", segmentName,
- segment.getCrc(), validDocIdsMetadata.getSegmentCrc());
- continue;
- }
+ long totalInvalidDocs = validDocIdsMetadata.getTotalInvalidDocs();
+ long totalValidDocs = validDocIdsMetadata.getTotalValidDocs();
+ long segmentSizeInBytes = validDocIdsMetadata.getSegmentSizeInBytes();
+ long totalDocs = validDocIdsMetadata.getTotalDocs();
- // segments eligible for deletion with no valid records
- long totalDocs = validDocIdsMetadata.getTotalDocs();
- if (totalInvalidDocs == totalDocs) {
- segmentsForDeletion.add(segmentName);
- } else if (alreadyMergedSegments.contains(segmentName)) {
- LOGGER.debug("Segment {} already merged. Skipping it for {}",
segmentName,
+ // Segments with no valid records can be deleted outright.
+ if (totalInvalidDocs == totalDocs) {
+ segmentsForDeletion.add(segmentName);
+ } else if (alreadyMergedSegments.contains(segmentName)) {
+ LOGGER.debug("Segment {} already merged. Skipping it for {}",
segmentName,
+ MinionConstants.UpsertCompactMergeTask.TASK_TYPE);
+ } else {
+ Integer partitionID =
SegmentUtils.getPartitionIdFromSegmentName(segmentName);
+ if (partitionID == null) {
+ LOGGER.warn("Partition ID not found for segment: {}, skipping it for
{}", segmentName,
MinionConstants.UpsertCompactMergeTask.TASK_TYPE);
- break;
- } else {
- Integer partitionID =
SegmentUtils.getPartitionIdFromSegmentName(segmentName);
- if (partitionID == null) {
- LOGGER.warn("Partition ID not found for segment: {}, skipping it
for {}", segmentName,
- MinionConstants.UpsertCompactMergeTask.TASK_TYPE);
- continue;
- }
- double expectedSegmentSizeAfterCompaction = (segmentSizeInBytes *
totalValidDocs * 1.0) / totalDocs;
- segmentsEligibleForCompactMerge.computeIfAbsent(partitionID, k ->
new ArrayList<>())
- .add(new SegmentMergerMetadata(segment, totalValidDocs,
totalInvalidDocs,
- expectedSegmentSizeAfterCompaction));
+ continue;
}
- break;
+ double expectedSegmentSizeAfterCompaction = (segmentSizeInBytes *
totalValidDocs * 1.0) / totalDocs;
+ segmentsEligibleForCompactMerge.computeIfAbsent(partitionID, k -> new
ArrayList<>())
+ .add(new SegmentMergerMetadata(segment, totalValidDocs,
totalInvalidDocs,
+ expectedSegmentSizeAfterCompaction));
}
}
diff --git
a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/MinionTaskUtilsTest.java
b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/MinionTaskUtilsTest.java
index 0b55986df46..b8518c0dd3d 100644
---
a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/MinionTaskUtilsTest.java
+++
b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/MinionTaskUtilsTest.java
@@ -65,6 +65,7 @@ import static org.mockito.Mockito.when;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertNull;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.expectThrows;
@@ -333,9 +334,68 @@ public class MinionTaskUtilsTest {
() -> MinionTaskUtils.parseValidDocIdsConsensusMode("INVALID_MODE"));
}
- /**
- * Builds a RoaringBitmap with {@code numDocs} valid doc ids (0..numDocs-1).
- */
+ @Test
+ public void testParseValidDocIdsValidationMode() {
+ // Blank/null defaults to STRICT
+ assertEquals(MinionTaskUtils.parseValidDocIdsValidationMode(null),
+ MinionConstants.ValidDocIdsValidationMode.STRICT);
+ assertEquals(MinionTaskUtils.parseValidDocIdsValidationMode(""),
+ MinionConstants.ValidDocIdsValidationMode.STRICT);
+ assertEquals(MinionTaskUtils.parseValidDocIdsValidationMode(" "),
+ MinionConstants.ValidDocIdsValidationMode.STRICT);
+
+ // Case-insensitive parsing
+ assertEquals(MinionTaskUtils.parseValidDocIdsValidationMode("STRICT"),
+ MinionConstants.ValidDocIdsValidationMode.STRICT);
+
assertEquals(MinionTaskUtils.parseValidDocIdsValidationMode("executor_only"),
+ MinionConstants.ValidDocIdsValidationMode.EXECUTOR_ONLY);
+ assertEquals(MinionTaskUtils.parseValidDocIdsValidationMode("
EXECUTOR_ONLY "),
+ MinionConstants.ValidDocIdsValidationMode.EXECUTOR_ONLY);
+
+ expectThrows(IllegalArgumentException.class,
+ () -> MinionTaskUtils.parseValidDocIdsValidationMode("INVALID_MODE"));
+ }
+
+ @Test
+ public void testResolveGeneratorConsensusMode() {
+ // EXECUTOR_ONLY downgrades the generator to UNSAFE regardless of the
configured consensus mode.
+ for (MinionConstants.ValidDocIdsConsensusMode mode :
MinionConstants.ValidDocIdsConsensusMode.values()) {
+ assertEquals(
+ MinionTaskUtils.resolveGeneratorConsensusMode(mode,
MinionConstants.ValidDocIdsValidationMode.EXECUTOR_ONLY),
+ MinionConstants.ValidDocIdsConsensusMode.UNSAFE);
+ // STRICT keeps the configured mode.
+ assertEquals(
+ MinionTaskUtils.resolveGeneratorConsensusMode(mode,
MinionConstants.ValidDocIdsValidationMode.STRICT), mode);
+ }
+ }
+
+ @Test
+ public void testCrcMatches() {
+ assertTrue(MinionTaskUtils.crcMatches(1000, 5000, 1000, 9999));
+ assertTrue(MinionTaskUtils.crcMatches(1000, 5000, 2000, 5000));
+ assertFalse(MinionTaskUtils.crcMatches(1000, 5000, 2000, 9999));
+ assertFalse(MinionTaskUtils.crcMatches(1000, 5000, 2000, -1));
+ assertFalse(MinionTaskUtils.crcMatches(1000, -1, 2000, 5000));
+ assertFalse(MinionTaskUtils.crcMatches(1000, -1, 2000, -1));
+ }
+
+ @Test
+ public void testExecutorDataCrcFallbackMatch() {
+ List<Object> responses = List.of(makeResponse("seg1", "2000", "5000",
"server1", makeBitmap(4)));
+ RoaringBitmap result =
getValidDocIdFromServerMatchingCrcWithMockedReader("myTable_REALTIME", "seg1",
"1000",
+ "5000", "UNSAFE", responses, new String[]{"server1"}, this);
+ assertNotNull(result);
+ assertEquals(result.getCardinality(), 4);
+ }
+
+ @Test
+ public void testExecutorDataCrcMismatchSkips() {
+ List<Object> responses = List.of(makeResponse("seg1", "2000", "9999",
"server1", makeBitmap(4)));
+ RoaringBitmap result =
getValidDocIdFromServerMatchingCrcWithMockedReader("myTable_REALTIME", "seg1",
"1000",
+ "5000", "UNSAFE", responses, new String[]{"server1"}, this);
+ assertNull(result);
+ }
+
private static RoaringBitmap makeBitmap(int numDocs) {
RoaringBitmap b = new RoaringBitmap();
for (int i = 0; i < numDocs; i++) {
@@ -349,7 +409,13 @@ public class MinionTaskUtilsTest {
*/
private static ValidDocIdsBitmapResponse makeResponse(String segmentName,
String crc, String instanceId,
RoaringBitmap bitmap) {
- return new ValidDocIdsBitmapResponse(segmentName, crc,
ValidDocIdsType.SNAPSHOT,
+ return new ValidDocIdsBitmapResponse(segmentName, crc, null,
ValidDocIdsType.SNAPSHOT,
+ RoaringBitmapUtils.serialize(bitmap), instanceId,
ServiceStatus.Status.GOOD);
+ }
+
+ private static ValidDocIdsBitmapResponse makeResponse(String segmentName,
String crc, String dataCrc,
+ String instanceId, RoaringBitmap bitmap) {
+ return new ValidDocIdsBitmapResponse(segmentName, crc, dataCrc,
ValidDocIdsType.SNAPSHOT,
RoaringBitmapUtils.serialize(bitmap), instanceId,
ServiceStatus.Status.GOOD);
}
@@ -395,6 +461,13 @@ public class MinionTaskUtilsTest {
private static RoaringBitmap
getValidDocIdFromServerMatchingCrcWithMockedReader(String tableName,
String segmentName, String expectedCrc, String consensusMode,
List<Object> responseOrThrowByCallOrder,
String[] servers, MinionTaskUtilsTest testInstance) {
+ return getValidDocIdFromServerMatchingCrcWithMockedReader(tableName,
segmentName, expectedCrc, null, consensusMode,
+ responseOrThrowByCallOrder, servers, testInstance);
+ }
+
+ private static RoaringBitmap
getValidDocIdFromServerMatchingCrcWithMockedReader(String tableName,
+ String segmentName, String expectedCrc, String expectedDataCrc, String
consensusMode,
+ List<Object> responseOrThrowByCallOrder, String[] servers,
MinionTaskUtilsTest testInstance) {
testInstance.setupMinionContextWithServers(tableName, segmentName,
servers);
// Shared across all mock instances (production creates one reader per
server).
AtomicInteger callIndex = new AtomicInteger(0);
@@ -414,7 +487,7 @@ public class MinionTaskUtilsTest {
});
})) {
return MinionTaskUtils.getValidDocIdFromServerMatchingCrc(tableName,
segmentName,
- ValidDocIdsType.SNAPSHOT.name(), MinionContext.getInstance(),
expectedCrc, consensusMode);
+ ValidDocIdsType.SNAPSHOT.name(), MinionContext.getInstance(),
expectedCrc, expectedDataCrc, consensusMode);
}
}
diff --git
a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/upsertcompaction/UpsertCompactionTaskGeneratorTest.java
b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/upsertcompaction/UpsertCompactionTaskGeneratorTest.java
index 127f96a162f..33f8e4e3d7e 100644
---
a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/upsertcompaction/UpsertCompactionTaskGeneratorTest.java
+++
b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/upsertcompaction/UpsertCompactionTaskGeneratorTest.java
@@ -28,6 +28,8 @@ import java.util.concurrent.TimeUnit;
import org.apache.helix.model.IdealState;
import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
import org.apache.pinot.common.restlet.resources.ValidDocIdsMetadataInfo;
+import org.apache.pinot.common.restlet.resources.ValidDocIdsType;
+import org.apache.pinot.common.utils.ServiceStatus;
import org.apache.pinot.controller.helix.core.minion.ClusterInfoAccessor;
import org.apache.pinot.core.common.MinionConstants;
import org.apache.pinot.core.common.MinionConstants.UpsertCompactionTask;
@@ -232,13 +234,13 @@ public class UpsertCompactionTaskGeneratorTest {
// no completed segments scenario, there shouldn't be any segment selected
for compaction
UpsertCompactionTaskGenerator.SegmentSelectionResult
segmentSelectionResult =
UpsertCompactionTaskGenerator.processValidDocIdsMetadata(compactionConfigs, new
HashMap<>(),
- validDocIdsMetadataInfo);
+ validDocIdsMetadataInfo, Map.of(),
MinionConstants.ValidDocIdsConsensusMode.UNSAFE);
assertEquals(segmentSelectionResult.getSegmentsForCompaction().size(), 0);
// test with valid crc and thresholds
segmentSelectionResult =
UpsertCompactionTaskGenerator.processValidDocIdsMetadata(compactionConfigs,
_completedSegmentsMap,
- validDocIdsMetadataInfo);
+ validDocIdsMetadataInfo, Map.of(),
MinionConstants.ValidDocIdsConsensusMode.UNSAFE);
assertEquals(segmentSelectionResult.getSegmentsForCompaction().size(), 1);
assertEquals(segmentSelectionResult.getSegmentsForDeletion().size(), 1);
assertEquals(segmentSelectionResult.getSegmentsForCompaction().get(0).getSegmentName(),
@@ -249,7 +251,7 @@ public class UpsertCompactionTaskGeneratorTest {
compactionConfigs = getCompactionConfigs("60", "10");
segmentSelectionResult =
UpsertCompactionTaskGenerator.processValidDocIdsMetadata(compactionConfigs,
_completedSegmentsMap,
- validDocIdsMetadataInfo);
+ validDocIdsMetadataInfo, Map.of(),
MinionConstants.ValidDocIdsConsensusMode.UNSAFE);
assertTrue(segmentSelectionResult.getSegmentsForCompaction().isEmpty());
assertEquals(segmentSelectionResult.getSegmentsForDeletion().size(), 1);
assertEquals(segmentSelectionResult.getSegmentsForDeletion().get(0),
_completedSegment2.getSegmentName());
@@ -258,7 +260,7 @@ public class UpsertCompactionTaskGeneratorTest {
compactionConfigs = getCompactionConfigs("0", "10");
segmentSelectionResult =
UpsertCompactionTaskGenerator.processValidDocIdsMetadata(compactionConfigs,
_completedSegmentsMap,
- validDocIdsMetadataInfo);
+ validDocIdsMetadataInfo, Map.of(),
MinionConstants.ValidDocIdsConsensusMode.UNSAFE);
assertEquals(segmentSelectionResult.getSegmentsForDeletion().size(), 1);
assertEquals(segmentSelectionResult.getSegmentsForCompaction().size(), 1);
assertEquals(segmentSelectionResult.getSegmentsForCompaction().get(0).getSegmentName(),
@@ -269,7 +271,7 @@ public class UpsertCompactionTaskGeneratorTest {
compactionConfigs = getCompactionConfigs("30", "0");
segmentSelectionResult =
UpsertCompactionTaskGenerator.processValidDocIdsMetadata(compactionConfigs,
_completedSegmentsMap,
- validDocIdsMetadataInfo);
+ validDocIdsMetadataInfo, Map.of(),
MinionConstants.ValidDocIdsConsensusMode.UNSAFE);
assertEquals(segmentSelectionResult.getSegmentsForDeletion().size(), 1);
assertEquals(segmentSelectionResult.getSegmentsForCompaction().size(), 1);
assertEquals(segmentSelectionResult.getSegmentsForCompaction().get(0).getSegmentName(),
@@ -288,7 +290,7 @@ public class UpsertCompactionTaskGeneratorTest {
});
segmentSelectionResult =
UpsertCompactionTaskGenerator.processValidDocIdsMetadata(compactionConfigs,
_completedSegmentsMap,
- validDocIdsMetadataInfo);
+ validDocIdsMetadataInfo, Map.of(),
MinionConstants.ValidDocIdsConsensusMode.UNSAFE);
// completedSegment is supposed to be filtered out
Assert.assertEquals(segmentSelectionResult.getSegmentsForCompaction().size(),
0);
@@ -313,7 +315,7 @@ public class UpsertCompactionTaskGeneratorTest {
compactionConfigs = getCompactionConfigs("30", "0");
segmentSelectionResult =
UpsertCompactionTaskGenerator.processValidDocIdsMetadata(compactionConfigs,
_completedSegmentsMap,
- validDocIdsMetadataInfo);
+ validDocIdsMetadataInfo, Map.of(),
MinionConstants.ValidDocIdsConsensusMode.UNSAFE);
Assert.assertEquals(segmentSelectionResult.getSegmentsForCompaction().size(),
2);
Assert.assertEquals(segmentSelectionResult.getSegmentsForDeletion().size(), 0);
assertEquals(segmentSelectionResult.getSegmentsForCompaction().get(0).getSegmentName(),
@@ -326,6 +328,99 @@ public class UpsertCompactionTaskGeneratorTest {
assertEquals(validDocIdsMetadataInfo.get("testTable__1").get(0).getSegmentCreationTimeMillis(),
9876543210L);
}
+ @Test
+ public void testProcessValidDocIdsMetadataConsensus() {
+ Map<String, String> compactionConfigs = getCompactionConfigs("1", "10");
+ String segmentName = _completedSegment.getSegmentName();
+ long crc = _completedSegment.getCrc();
+ Map<String, Integer> twoReplicas = Map.of(segmentName, 2);
+
+ Map<String, List<ValidDocIdsMetadataInfo>> equalReplicas =
Map.of(segmentName, List.of(
+ meta(segmentName, 50, 50, 100, crc, ServiceStatus.Status.GOOD,
"server1"),
+ meta(segmentName, 50, 50, 100, crc, ServiceStatus.Status.GOOD,
"server2")));
+ UpsertCompactionTaskGenerator.SegmentSelectionResult result =
+
UpsertCompactionTaskGenerator.processValidDocIdsMetadata(compactionConfigs,
_completedSegmentsMap,
+ equalReplicas, twoReplicas,
MinionConstants.ValidDocIdsConsensusMode.EQUAL);
+ assertEquals(result.getSegmentsForCompaction().size(), 1);
+
+ Map<String, List<ValidDocIdsMetadataInfo>> unequalReplicas =
Map.of(segmentName, List.of(
+ meta(segmentName, 50, 50, 100, crc, ServiceStatus.Status.GOOD,
"server1"),
+ meta(segmentName, 60, 40, 100, crc, ServiceStatus.Status.GOOD,
"server2")));
+ result =
UpsertCompactionTaskGenerator.processValidDocIdsMetadata(compactionConfigs,
_completedSegmentsMap,
+ unequalReplicas, twoReplicas,
MinionConstants.ValidDocIdsConsensusMode.EQUAL);
+ assertTrue(result.getSegmentsForCompaction().isEmpty());
+ assertTrue(result.getSegmentsForDeletion().isEmpty());
+
+ Map<String, List<ValidDocIdsMetadataInfo>> oneResponded =
Map.of(segmentName, List.of(
+ meta(segmentName, 50, 50, 100, crc, ServiceStatus.Status.GOOD,
"server1")));
+ result =
UpsertCompactionTaskGenerator.processValidDocIdsMetadata(compactionConfigs,
_completedSegmentsMap,
+ oneResponded, twoReplicas,
MinionConstants.ValidDocIdsConsensusMode.EQUAL);
+ assertTrue(result.getSegmentsForCompaction().isEmpty());
+
+ Map<String, List<ValidDocIdsMetadataInfo>> crcMismatch =
Map.of(segmentName, List.of(
+ meta(segmentName, 50, 50, 100, crc, ServiceStatus.Status.GOOD,
"server1"),
+ meta(segmentName, 50, 50, 100, crc + 1, ServiceStatus.Status.GOOD,
"server2")));
+ result =
UpsertCompactionTaskGenerator.processValidDocIdsMetadata(compactionConfigs,
_completedSegmentsMap,
+ crcMismatch, twoReplicas,
MinionConstants.ValidDocIdsConsensusMode.EQUAL);
+ assertTrue(result.getSegmentsForCompaction().isEmpty());
+
+ Map<String, List<ValidDocIdsMetadataInfo>> unhealthy = Map.of(segmentName,
List.of(
+ meta(segmentName, 50, 50, 100, crc, ServiceStatus.Status.GOOD,
"server1"),
+ meta(segmentName, 50, 50, 100, crc, ServiceStatus.Status.STARTING,
"server2")));
+ result =
UpsertCompactionTaskGenerator.processValidDocIdsMetadata(compactionConfigs,
_completedSegmentsMap,
+ unhealthy, twoReplicas,
MinionConstants.ValidDocIdsConsensusMode.EQUAL);
+ assertTrue(result.getSegmentsForCompaction().isEmpty());
+
+ result =
UpsertCompactionTaskGenerator.processValidDocIdsMetadata(compactionConfigs,
_completedSegmentsMap,
+ crcMismatch, twoReplicas,
MinionConstants.ValidDocIdsConsensusMode.UNSAFE);
+ assertEquals(result.getSegmentsForCompaction().size(), 1);
+
+ result =
UpsertCompactionTaskGenerator.processValidDocIdsMetadata(compactionConfigs,
_completedSegmentsMap,
+ crcMismatch, twoReplicas,
MinionConstants.ValidDocIdsConsensusMode.MOST_VALID_DOCS);
+ assertTrue(result.getSegmentsForCompaction().isEmpty());
+
+ Map<String, List<ValidDocIdsMetadataInfo>> mostValidDocs =
Map.of(segmentName, List.of(
+ meta(segmentName, 0, 100, 100, crc, ServiceStatus.Status.GOOD,
"server1"),
+ meta(segmentName, 100, 0, 100, crc, ServiceStatus.Status.GOOD,
"server2")));
+ result =
UpsertCompactionTaskGenerator.processValidDocIdsMetadata(compactionConfigs,
_completedSegmentsMap,
+ mostValidDocs, twoReplicas,
MinionConstants.ValidDocIdsConsensusMode.MOST_VALID_DOCS);
+ assertTrue(result.getSegmentsForCompaction().isEmpty());
+ assertTrue(result.getSegmentsForDeletion().isEmpty());
+
+ SegmentZKMetadata dataCrcSegment = new SegmentZKMetadata(segmentName);
+ dataCrcSegment.setStatus(CommonConstants.Segment.Realtime.Status.DONE);
+ dataCrcSegment.setTotalDocs(100L);
+ dataCrcSegment.setCrc(1000);
+ dataCrcSegment.setUseDataCrc(true);
+ dataCrcSegment.setDataCrc(5000);
+ Map<String, SegmentZKMetadata> dataCrcMap = Map.of(segmentName,
dataCrcSegment);
+ Map<String, List<ValidDocIdsMetadataInfo>> dataCrcMatch =
Map.of(segmentName, List.of(
+ metaWithDataCrc(segmentName, 50, 50, 100, 2000, "5000",
ServiceStatus.Status.GOOD, "server1"),
+ metaWithDataCrc(segmentName, 50, 50, 100, 2000, "5000",
ServiceStatus.Status.GOOD, "server2")));
+ result =
UpsertCompactionTaskGenerator.processValidDocIdsMetadata(compactionConfigs,
dataCrcMap, dataCrcMatch,
+ twoReplicas, MinionConstants.ValidDocIdsConsensusMode.EQUAL);
+ assertEquals(result.getSegmentsForCompaction().size(), 1);
+
+ Map<String, List<ValidDocIdsMetadataInfo>> dataCrcMismatch =
Map.of(segmentName, List.of(
+ metaWithDataCrc(segmentName, 50, 50, 100, 2000, "9999",
ServiceStatus.Status.GOOD, "server1"),
+ metaWithDataCrc(segmentName, 50, 50, 100, 2000, "9999",
ServiceStatus.Status.GOOD, "server2")));
+ result =
UpsertCompactionTaskGenerator.processValidDocIdsMetadata(compactionConfigs,
dataCrcMap, dataCrcMismatch,
+ twoReplicas, MinionConstants.ValidDocIdsConsensusMode.EQUAL);
+ assertTrue(result.getSegmentsForCompaction().isEmpty());
+ }
+
+ private static ValidDocIdsMetadataInfo meta(String segmentName, long
validDocs, long invalidDocs, long totalDocs,
+ long crc, ServiceStatus.Status serverStatus, String instanceId) {
+ return new ValidDocIdsMetadataInfo(segmentName, validDocs, invalidDocs,
totalDocs, String.valueOf(crc), null,
+ ValidDocIdsType.SNAPSHOT, 1000, System.currentTimeMillis(),
instanceId, serverStatus);
+ }
+
+ private static ValidDocIdsMetadataInfo metaWithDataCrc(String segmentName,
long validDocs, long invalidDocs,
+ long totalDocs, long crc, String dataCrc, ServiceStatus.Status
serverStatus, String instanceId) {
+ return new ValidDocIdsMetadataInfo(segmentName, validDocs, invalidDocs,
totalDocs, String.valueOf(crc), dataCrc,
+ ValidDocIdsType.SNAPSHOT, 1000, System.currentTimeMillis(),
instanceId, serverStatus);
+ }
+
@Test
public void testUpsertCompactionTaskConfig() {
Map<String, String> upsertCompactionTaskConfig =
diff --git
a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/upsertcompactmerge/UpsertCompactMergeTaskGeneratorTest.java
b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/upsertcompactmerge/UpsertCompactMergeTaskGeneratorTest.java
index 1bc8ad96876..60104b47286 100644
---
a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/upsertcompactmerge/UpsertCompactMergeTaskGeneratorTest.java
+++
b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/upsertcompactmerge/UpsertCompactMergeTaskGeneratorTest.java
@@ -363,11 +363,11 @@ public class UpsertCompactMergeTaskGeneratorTest {
Map<String, List<ValidDocIdsMetadataInfo>> validDocIdsMetadata = new
HashMap<>();
validDocIdsMetadata.put("testTable__0__0__12345", Arrays.asList(
- new ValidDocIdsMetadataInfo("testTable__0__0__12345", 90, 10, 100,
"1000",
+ new ValidDocIdsMetadataInfo("testTable__0__0__12345", 90, 10, 100,
"1000", null,
ValidDocIdsType.SNAPSHOT, 100000, System.currentTimeMillis(),
"server1",
ServiceStatus.Status.GOOD)));
validDocIdsMetadata.put("testTable__0__1__12346", Arrays.asList(
- new ValidDocIdsMetadataInfo("testTable__0__1__12346", 8, 2, 10, "2000",
+ new ValidDocIdsMetadataInfo("testTable__0__1__12346", 8, 2, 10,
"2000", null,
ValidDocIdsType.SNAPSHOT, 10000, System.currentTimeMillis(),
"server1",
ServiceStatus.Status.GOOD)));
@@ -375,7 +375,7 @@ public class UpsertCompactMergeTaskGeneratorTest {
SegmentSelectionResult result =
UpsertCompactMergeTaskGenerator.processValidDocIdsMetadata(
RAW_TABLE_NAME + "_REALTIME", taskConfigs, candidateSegmentsMap,
- validDocIdsMetadata, alreadyMergedSegments, null);
+ validDocIdsMetadata, alreadyMergedSegments, Map.of(),
MinionConstants.ValidDocIdsConsensusMode.UNSAFE, null);
Assert.assertNotNull(result);
Assert.assertNotNull(result.getSegmentsForCompactMergeByPartition());
@@ -401,11 +401,11 @@ public class UpsertCompactMergeTaskGeneratorTest {
Map<String, List<ValidDocIdsMetadataInfo>> validDocIdsMetadata = new
HashMap<>();
// Segment with 0 valid docs - should be marked for deletion
validDocIdsMetadata.put("testTable__0__0__12345", Arrays.asList(
- new ValidDocIdsMetadataInfo("testTable__0__0__12345", 0, 100, 100,
"1000",
+ new ValidDocIdsMetadataInfo("testTable__0__0__12345", 0, 100, 100,
"1000", null,
ValidDocIdsType.SNAPSHOT, 100000, System.currentTimeMillis(),
"server1",
ServiceStatus.Status.GOOD)));
validDocIdsMetadata.put("testTable__0__1__12346", Arrays.asList(
- new ValidDocIdsMetadataInfo("testTable__0__1__12346", 8, 2, 10, "2000",
+ new ValidDocIdsMetadataInfo("testTable__0__1__12346", 8, 2, 10,
"2000", null,
ValidDocIdsType.SNAPSHOT, 10000, System.currentTimeMillis(),
"server1",
ServiceStatus.Status.GOOD)));
@@ -413,13 +413,83 @@ public class UpsertCompactMergeTaskGeneratorTest {
SegmentSelectionResult result =
UpsertCompactMergeTaskGenerator.processValidDocIdsMetadata(
RAW_TABLE_NAME + "_REALTIME", taskConfigs, candidateSegmentsMap,
- validDocIdsMetadata, alreadyMergedSegments, null);
+ validDocIdsMetadata, alreadyMergedSegments, Map.of(),
MinionConstants.ValidDocIdsConsensusMode.UNSAFE, null);
Assert.assertNotNull(result);
Assert.assertEquals(result.getSegmentsForDeletion().size(), 1, "Should
have one segment for deletion");
Assert.assertTrue(result.getSegmentsForDeletion().contains("testTable__0__0__12345"));
}
+ /**
+ * Tests that replica consensus is enforced before a segment is selected.
Uses the deletion path (a fully-invalid
+ * segment) as a clean signal: the segment is processed only when its
replicas pass the consensus check.
+ */
+ @Test
+ public void testProcessValidDocIdsMetadataConsensus() {
+ Map<String, String> taskConfigs = new HashMap<>();
+ String segmentName = _completedSegment.getSegmentName();
+ long crc = _completedSegment.getCrc();
+ Map<String, SegmentZKMetadata> candidateSegmentsMap = Map.of(segmentName,
_completedSegment);
+ Set<String> noMerged = Set.of();
+ Map<String, Integer> twoReplicas = Map.of(segmentName, 2); // segment name -> 2; presumably the expected replica count for the segment (confirm against the generator).
+
+ Map<String, List<ValidDocIdsMetadataInfo>> agree = Map.of(segmentName,
List.of(
+ meta(segmentName, 0, 100, 100, crc, ServiceStatus.Status.GOOD,
"server1"),
+ meta(segmentName, 0, 100, 100, crc, ServiceStatus.Status.GOOD,
"server2")));
+ SegmentSelectionResult result =
UpsertCompactMergeTaskGenerator.processValidDocIdsMetadata(RAW_TABLE_NAME,
+ taskConfigs, candidateSegmentsMap, agree, noMerged, twoReplicas,
+ MinionConstants.ValidDocIdsConsensusMode.EQUAL, null);
+ Assert.assertTrue(result.getSegmentsForDeletion().contains(segmentName)); // EQUAL: both entries report identical counts and CRC, so the segment is selected for deletion.
+
+ Map<String, List<ValidDocIdsMetadataInfo>> disagree = Map.of(segmentName,
List.of(
+ meta(segmentName, 0, 100, 100, crc, ServiceStatus.Status.GOOD,
"server1"),
+ meta(segmentName, 1, 99, 100, crc, ServiceStatus.Status.GOOD,
"server2")));
+ result =
UpsertCompactMergeTaskGenerator.processValidDocIdsMetadata(RAW_TABLE_NAME,
taskConfigs,
+ candidateSegmentsMap, disagree, noMerged, twoReplicas,
MinionConstants.ValidDocIdsConsensusMode.EQUAL, null);
+ Assert.assertTrue(result.getSegmentsForDeletion().isEmpty()); // EQUAL: valid-doc counts differ (0 vs 1), so nothing is selected.
+
+ Map<String, List<ValidDocIdsMetadataInfo>> crcMismatch =
Map.of(segmentName, List.of(
+ meta(segmentName, 0, 100, 100, crc, ServiceStatus.Status.GOOD,
"server1"),
+ meta(segmentName, 0, 100, 100, crc + 1, ServiceStatus.Status.GOOD,
"server2")));
+ result =
UpsertCompactMergeTaskGenerator.processValidDocIdsMetadata(RAW_TABLE_NAME,
taskConfigs,
+ candidateSegmentsMap, crcMismatch, noMerged, twoReplicas,
MinionConstants.ValidDocIdsConsensusMode.EQUAL,
+ null);
+ Assert.assertTrue(result.getSegmentsForDeletion().isEmpty()); // EQUAL: server2 reports a different CRC (crc + 1), so nothing is selected.
+
+ Map<String, List<ValidDocIdsMetadataInfo>> unhealthy = Map.of(segmentName,
List.of(
+ meta(segmentName, 0, 100, 100, crc, ServiceStatus.Status.GOOD,
"server1"),
+ meta(segmentName, 0, 100, 100, crc, ServiceStatus.Status.STARTING,
"server2")));
+ result =
UpsertCompactMergeTaskGenerator.processValidDocIdsMetadata(RAW_TABLE_NAME,
taskConfigs,
+ candidateSegmentsMap, unhealthy, noMerged, twoReplicas,
MinionConstants.ValidDocIdsConsensusMode.EQUAL, null);
+ Assert.assertTrue(result.getSegmentsForDeletion().isEmpty()); // EQUAL: server2 is STARTING rather than GOOD, so nothing is selected.
+
+ result =
UpsertCompactMergeTaskGenerator.processValidDocIdsMetadata(RAW_TABLE_NAME,
taskConfigs,
+ candidateSegmentsMap, crcMismatch, noMerged, twoReplicas,
MinionConstants.ValidDocIdsConsensusMode.UNSAFE,
+ null);
+ Assert.assertTrue(result.getSegmentsForDeletion().contains(segmentName)); // UNSAFE: the same CRC-mismatch input is accepted and the segment is selected for deletion.
+
+ Map<String, List<ValidDocIdsMetadataInfo>> mostValidDocs =
Map.of(segmentName, List.of(
+ meta(segmentName, 0, 100, 100, crc, ServiceStatus.Status.GOOD,
"server1"),
+ meta(segmentName, 100, 0, 100, crc, ServiceStatus.Status.GOOD,
"server2")));
+ result =
UpsertCompactMergeTaskGenerator.processValidDocIdsMetadata(RAW_TABLE_NAME,
taskConfigs,
+ candidateSegmentsMap, mostValidDocs, noMerged, twoReplicas,
+ MinionConstants.ValidDocIdsConsensusMode.MOST_VALID_DOCS, null);
+ Assert.assertTrue(result.getSegmentsForDeletion().isEmpty()); // MOST_VALID_DOCS: entries report 0 and 100 valid docs; the segment must not be deleted.
+
+ Map<String, List<ValidDocIdsMetadataInfo>> oneResponded =
Map.of(segmentName, List.of(
+ meta(segmentName, 0, 100, 100, crc, ServiceStatus.Status.GOOD,
"server1")));
+ result =
UpsertCompactMergeTaskGenerator.processValidDocIdsMetadata(RAW_TABLE_NAME,
taskConfigs,
+ candidateSegmentsMap, oneResponded, noMerged, twoReplicas,
MinionConstants.ValidDocIdsConsensusMode.EQUAL,
+ null);
+ Assert.assertTrue(result.getSegmentsForDeletion().isEmpty()); // EQUAL: only one metadata entry for a segment mapped to 2 in twoReplicas, so nothing is selected.
+ }
+
+ private static ValidDocIdsMetadataInfo meta(String segmentName, long
validDocs, long invalidDocs, long totalDocs,
+ long crc, ServiceStatus.Status serverStatus, String instanceId) { // Test fixture: one server's validDocIds metadata entry for a segment.
+ return new ValidDocIdsMetadataInfo(segmentName, validDocs, invalidDocs,
totalDocs, String.valueOf(crc), null,
+ ValidDocIdsType.SNAPSHOT, 1000, System.currentTimeMillis(),
instanceId, serverStatus); // crc is passed as a decimal string; the null argument is presumably the segment data CRC (confirm against ValidDocIdsMetadataInfo).
+ }
+
/**
* Tests getCandidateSegments with various edge cases.
*/
diff --git
a/pinot-server/src/main/java/org/apache/pinot/server/api/resources/TablesResource.java
b/pinot-server/src/main/java/org/apache/pinot/server/api/resources/TablesResource.java
index 013e4a3588a..5f139668ea8 100644
---
a/pinot-server/src/main/java/org/apache/pinot/server/api/resources/TablesResource.java
+++
b/pinot-server/src/main/java/org/apache/pinot/server/api/resources/TablesResource.java
@@ -573,8 +573,8 @@ public class TablesResource {
}
byte[] validDocIdsBytes =
RoaringBitmapUtils.serialize(validDocIdSnapshot);
return new ValidDocIdsBitmapResponse(segmentName,
indexSegment.getSegmentMetadata().getCrc(),
- finalValidDocIdsType, validDocIdsBytes,
_serverInstance.getInstanceDataManager().getInstanceId(),
- status);
+ toReportableDataCrc(indexSegment.getSegmentMetadata().getDataCrc()),
finalValidDocIdsType, validDocIdsBytes,
+ _serverInstance.getInstanceDataManager().getInstanceId(), status);
} finally {
tableDataManager.releaseSegment(segmentDataManager);
}
@@ -747,6 +747,10 @@ public class TablesResource {
validDocIdsMetadata.put("totalValidDocs", totalValidDocs);
validDocIdsMetadata.put("totalInvalidDocs", totalInvalidDocs);
validDocIdsMetadata.put("segmentCrc",
indexSegment.getSegmentMetadata().getCrc());
+ String reportableDataCrc =
toReportableDataCrc(indexSegment.getSegmentMetadata().getDataCrc());
+ if (reportableDataCrc != null) {
+ validDocIdsMetadata.put("segmentDataCrc", reportableDataCrc);
+ }
validDocIdsMetadata.put("validDocIdsType", finalValidDocIdsType);
validDocIdsMetadata.put("serverStatus", status);
validDocIdsMetadata.put("instanceId",
_serverInstance.getInstanceDataManager().getInstanceId());
@@ -774,6 +778,12 @@ public class TablesResource {
}
}
+ /// The segment's data CRC to report: `dataCrc` unchanged when it is non-null and parses as a non-negative long, otherwise null (input is null or negative).
+ @Nullable
+ private static String toReportableDataCrc(String dataCrc) {
+ return dataCrc != null && Long.parseLong(dataCrc) >= 0 ? dataCrc : null; // NOTE(review): Long.parseLong throws NumberFormatException on a non-numeric string; confirm getDataCrc() always yields a decimal long.
+ }
+
private Pair<ValidDocIdsType, MutableRoaringBitmap>
getValidDocIds(IndexSegment indexSegment,
String validDocIdsTypeStr) {
if (validDocIdsTypeStr == null) {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]