This is an automated email from the ASF dual-hosted git repository.
jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 2f4c086ef51 Avoid Inconsistencies among replicas during Upsert
Compaction Tasks (#17696)
2f4c086ef51 is described below
commit 2f4c086ef517d848c3746a522ad14d5bff85af10
Author: Chaitanya Deepthi <[email protected]>
AuthorDate: Fri Mar 6 13:12:37 2026 -0800
Avoid Inconsistencies among replicas during Upsert Compaction Tasks (#17696)
---
.../apache/pinot/core/common/MinionConstants.java | 15 ++
.../pinot/plugin/minion/tasks/MinionTaskUtils.java | 154 ++++++-----
.../UpsertCompactionTaskExecutor.java | 10 +-
.../UpsertCompactMergeTaskExecutor.java | 16 +-
.../UpsertCompactMergeTaskGenerator.java | 18 +-
.../plugin/minion/tasks/MinionTaskUtilsTest.java | 285 +++++++++++++++++++++
6 files changed, 405 insertions(+), 93 deletions(-)
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/common/MinionConstants.java
b/pinot-core/src/main/java/org/apache/pinot/core/common/MinionConstants.java
index 1f784fb368b..9e059e98417 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/common/MinionConstants.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/common/MinionConstants.java
@@ -92,6 +92,11 @@ public class MinionConstants {
*/
public static final String SEGMENT_DOWNLOAD_PARALLELISM =
"segmentDownloadParallelism";
+ /** Valid doc ids consensus mode (executor-only). Task configs carry it as a
string; executors parse it into this enum. */
+ public enum ValidDocIdsConsensusMode {
+ UNSAFE, EQUAL, MOST_VALID_DOCS
+ }
+
// Purges rows inside segment that match chosen criteria
public static class PurgeTask {
public static final String TASK_TYPE = "PurgeTask";
@@ -257,6 +262,16 @@ public class MinionConstants {
* number of segments to query in one batch to fetch valid doc id
metadata, by default 500
*/
public static final String NUM_SEGMENTS_BATCH_PER_SERVER_REQUEST =
"numSegmentsBatchPerServerRequest";
+
+ /**
+ * Valid doc ids consensus mode used by the executor only (generator
unchanged). Values: UNSAFE, EQUAL,
+ * MOST_VALID_DOCS. UNSAFE = use first server with matching CRC and READY;
EQUAL = require all replicas
+ * to have the same valid doc set (default); MOST_VALID_DOCS = use replica
with most valid docs. In EQUAL and MOST_VALID_DOCS an unreachable, CRC-mismatched or not-READY replica fails the task.
+ */
+ public static final String VALID_DOC_IDS_CONSENSUS_MODE_KEY =
"validDocIdsConsensusMode";
+
+ /** Default: equal valid doc set consensus across replicas. */
+ public static final String DEFAULT_VALID_DOC_IDS_CONSENSUS_MODE = "EQUAL";
}
public static class UpsertCompactMergeTask {
diff --git
a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/MinionTaskUtils.java
b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/MinionTaskUtils.java
index ca197849210..396a91fc477 100644
---
a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/MinionTaskUtils.java
+++
b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/MinionTaskUtils.java
@@ -36,7 +36,6 @@ import org.apache.helix.model.InstanceConfig;
import org.apache.pinot.common.auth.AuthProviderUtils;
import org.apache.pinot.common.auth.NullAuthProvider;
import org.apache.pinot.common.restlet.resources.ValidDocIdsBitmapResponse;
-import org.apache.pinot.common.restlet.resources.ValidDocIdsMetadataInfo;
import org.apache.pinot.common.restlet.resources.ValidDocIdsType;
import org.apache.pinot.common.utils.RoaringBitmapUtils;
import org.apache.pinot.common.utils.ServiceStatus;
@@ -67,6 +66,14 @@ import org.slf4j.LoggerFactory;
public class MinionTaskUtils {
private static final Logger LOGGER =
LoggerFactory.getLogger(MinionTaskUtils.class);
+ /** Package-private for testing: parses the validDocIdsConsensusMode config
string (trimmed, case-insensitive; null/blank defaults to EQUAL). */
+ static MinionConstants.ValidDocIdsConsensusMode
parseValidDocIdsConsensusMode(String value) {
+ if (value == null || value.isBlank()) {
+ return MinionConstants.ValidDocIdsConsensusMode.EQUAL;
+ }
+ return
MinionConstants.ValidDocIdsConsensusMode.valueOf(value.toUpperCase().trim());
+ }
+
private static final String DEFAULT_DIR_PATH_TERMINATOR = "/";
public static final String DATETIME_PATTERN = "yyyy-MM-dd'T'HH:mm:ss.SSS'Z'";
@@ -281,22 +288,22 @@ public class MinionTaskUtils {
}
/**
- * Returns the validDocID bitmap from the server whose local segment crc
matches both crc of ZK metadata and
- * deepstore copy (expectedCrc).
+ * Returns the validDocIds bitmap from server(s). {@code comparisonModeStr} is
the task config value: UNSAFE,
+ * EQUAL (default), or MOST_VALID_DOCS. Returns null when no replica yields a usable bitmap.
*/
@Nullable
public static RoaringBitmap getValidDocIdFromServerMatchingCrc(String
tableNameWithType, String segmentName,
- String validDocIdsType, MinionContext minionContext, String expectedCrc)
{
+ String validDocIdsType, MinionContext minionContext, String expectedCrc,
String comparisonModeStr) {
+ MinionConstants.ValidDocIdsConsensusMode consensusMode =
parseValidDocIdsConsensusMode(comparisonModeStr);
String clusterName = minionContext.getHelixManager().getClusterName();
HelixAdmin helixAdmin =
minionContext.getHelixManager().getClusterManagmentTool();
- RoaringBitmap validDocIds = null;
List<String> servers = getServers(segmentName, tableNameWithType,
helixAdmin, clusterName);
+ List<RoaringBitmap> matchingBitmaps = new ArrayList<>();
+
for (String server : servers) {
InstanceConfig instanceConfig =
helixAdmin.getInstanceConfig(clusterName, server);
String endpoint = InstanceUtils.getServerAdminEndpoint(instanceConfig);
- // We only need aggregated table size and the total number of docs/rows.
Skipping column related stats, by
- // passing an empty list.
ServerSegmentMetadataReader serverSegmentMetadataReader = new
ServerSegmentMetadataReader();
ValidDocIdsBitmapResponse validDocIdsBitmapResponse;
try {
@@ -304,43 +311,91 @@ public class MinionTaskUtils {
serverSegmentMetadataReader.getValidDocIdsBitmapFromServer(tableNameWithType,
segmentName, endpoint,
validDocIdsType, 60_000);
} catch (Exception e) {
- LOGGER.warn("Unable to retrieve validDocIds bitmap for segment: " +
segmentName + " from endpoint: "
- + endpoint, e);
- continue;
+ if (consensusMode == MinionConstants.ValidDocIdsConsensusMode.UNSAFE) {
+ LOGGER.warn(
+ "Unable to retrieve validDocIds bitmap for segment: " +
segmentName + " from endpoint: " + endpoint, e);
+ continue;
+ } else {
+ throw new IllegalStateException(
+ "Unable to retrieve validDocIds bitmap for segment: " +
segmentName + " from endpoint: " + endpoint, e);
+ }
}
+ String crcFromValidDocIdsBitmap =
validDocIdsBitmapResponse.getSegmentCrc();
// Check crc from the downloaded segment against the crc returned from
the server along with the valid doc id
// bitmap. If this doesn't match, this means that we are hitting the
race condition where the segment has been
// uploaded successfully while the server is still reloading the
segment. Reloading can take a while when the
// offheap upsert is used because we will need to delete & add all
primary keys.
// `BaseSingleSegmentConversionExecutor.executeTask()` already checks
for the crc from the task generator
// against the crc from the current segment zk metadata, so we don't
need to check that here.
- String crcFromValidDocIdsBitmap =
validDocIdsBitmapResponse.getSegmentCrc();
if (!expectedCrc.equals(crcFromValidDocIdsBitmap)) {
- // In this scenario, we are hitting the other replica of the segment
which did not commit to ZK or deepstore.
- // We will skip processing this bitmap to query other server to
confirm if there is a valid matching CRC.
- String message = "CRC mismatch for segment: " + segmentName + ",
expected value based on task generator: "
- + expectedCrc + ", actual crc from validDocIdsBitmapResponse from
endpoint " + endpoint + ": "
- + crcFromValidDocIdsBitmap;
- LOGGER.warn(message);
- continue;
+ if (consensusMode == MinionConstants.ValidDocIdsConsensusMode.UNSAFE) {
+ LOGGER.warn("CRC mismatch for segment: {} from endpoint {},
skipping", segmentName, endpoint);
+ continue;
+ } else {
+ throw new IllegalStateException(
+ "CRC mismatch for segment: " + segmentName + ", expected: " +
expectedCrc + ", actual from endpoint "
+ + endpoint + ": " + crcFromValidDocIdsBitmap);
+ }
}
- // skipping servers which are not in READY state. The bitmaps would be
inconsistent when
- // server is NOT READY as UPDATING segments might be updating the ONLINE
segments
if (validDocIdsBitmapResponse.getServerStatus() != null &&
!validDocIdsBitmapResponse.getServerStatus()
.equals(ServiceStatus.Status.GOOD)) {
- String message = "Server " + validDocIdsBitmapResponse.getInstanceId()
+ " is in "
- + validDocIdsBitmapResponse.getServerStatus() + " state, skipping
it for execution for segment: "
- + validDocIdsBitmapResponse.getSegmentName() + ". Will try other
servers.";
- LOGGER.warn(message);
- continue;
+ if (consensusMode == MinionConstants.ValidDocIdsConsensusMode.UNSAFE) {
+ LOGGER.warn("Server {} not READY for segment {}, skipping",
validDocIdsBitmapResponse.getInstanceId(),
+ segmentName);
+ continue;
+ } else {
+ throw new IllegalStateException("Server " +
validDocIdsBitmapResponse.getInstanceId() + " is in "
+ + validDocIdsBitmapResponse.getServerStatus() + " state for
segment: " + segmentName
+ + ". Failing task to avoid inconsistency among replicas.");
+ }
+ }
+
+ RoaringBitmap bitmap =
RoaringBitmapUtils.deserialize(validDocIdsBitmapResponse.getBitmap());
+ int cardinality = bitmap.getCardinality();
+
+ if (consensusMode == MinionConstants.ValidDocIdsConsensusMode.UNSAFE) {
+ LOGGER.info("Using server {} with {} valid docs for segment {}
(mode=UNSAFE)", server, cardinality,
+ segmentName);
+ return bitmap;
}
- validDocIds =
RoaringBitmapUtils.deserialize(validDocIdsBitmapResponse.getBitmap());
- break;
+ matchingBitmaps.add(bitmap);
}
- return validDocIds;
+
+ if (matchingBitmaps.isEmpty()) {
+ return null;
+ }
+
+ if (consensusMode == MinionConstants.ValidDocIdsConsensusMode.EQUAL) {
+ RoaringBitmap consensusBitMap = matchingBitmaps.get(0);
+ for (RoaringBitmap b : matchingBitmaps) {
+ if (!b.equals(consensusBitMap)) {
+ throw new IllegalStateException("No consensus on validDocs across
replicas for segment: " + segmentName
+ + ". Failing task to avoid replica inconsistency.");
+ }
+ }
+ LOGGER.info("All {} servers have {} valid docs for segment {}",
servers.size(), consensusBitMap.getCardinality(),
+ segmentName);
+ return consensusBitMap;
+ }
+
+ // MOST_VALID_DOCS: explicitly pick the bitmap with the maximum valid doc
count (on ties, the first replica queried wins)
+ RoaringBitmap maxCardinalityMap = null;
+ int maxCard = -1;
+ for (RoaringBitmap b : matchingBitmaps) {
+ int card = b.getCardinality();
+ if (card > maxCard) {
+ maxCard = card;
+ maxCardinalityMap = b;
+ }
+ }
+ if (maxCardinalityMap != null) {
+ LOGGER.info("Selected server with {} valid docs for segment {}
(mode=MOST_VALID_DOCS, checked {} servers)",
+ maxCard, segmentName, servers.size());
+ }
+ return maxCardinalityMap;
}
public static String toUTCString(long epochMillis) {
@@ -396,47 +451,4 @@ public class MinionTaskUtils {
}
return validDocIdsType;
}
-
- /**
- * Checks if all replicas have consensus on validDoc counts for a segment.
- * SAFETY LOGIC:
- * 1. Only proceed with operations when ALL replicas agree on totalValidDocs
count
- * 2. Skip operations if ANY server hosting the segment is not in READY state
- * 3. Include all replicas (even those with CRC mismatches) in consensus for
safety
- *
- * @param segmentName the name of the segment being checked
- * @param replicaMetadataList list of metadata from all replicas of the
segment
- * @return true if all replicas have consensus on validDoc counts, false
otherwise
- */
- public static boolean hasValidDocConsensus(String segmentName,
- List<ValidDocIdsMetadataInfo> replicaMetadataList) {
-
- if (replicaMetadataList == null || replicaMetadataList.isEmpty()) {
- LOGGER.warn("No replica metadata available for segment: {}",
segmentName);
- return false;
- }
-
- // Check server readiness and validDoc consensus
- Long consensusValidDocs = null;
- for (ValidDocIdsMetadataInfo metadata : replicaMetadataList) {
- // Check server readiness - skip if ANY server is not ready
- if (metadata.getServerStatus() != null &&
!metadata.getServerStatus().equals(ServiceStatus.Status.GOOD)) {
- LOGGER.warn("Server {} is in {} state for segment: {}, skipping
consensus check",
- metadata.getInstanceId(), metadata.getServerStatus(), segmentName);
- return false;
- }
-
- // Check if all replicas have the same totalValidDocs count
- long validDocs = metadata.getTotalValidDocs();
- if (consensusValidDocs == null) {
- // First iteration, we record the value to compare against
- consensusValidDocs = validDocs;
- } else if (!consensusValidDocs.equals(validDocs)) {
- LOGGER.warn("Inconsistent validDoc counts across replicas for segment:
{}. Expected: {}, but found: {}",
- segmentName, consensusValidDocs, validDocs);
- return false;
- }
- }
- return true;
- }
}
diff --git
a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/upsertcompaction/UpsertCompactionTaskExecutor.java
b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/upsertcompaction/UpsertCompactionTaskExecutor.java
index f4497b69ebd..189072f5bc9 100644
---
a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/upsertcompaction/UpsertCompactionTaskExecutor.java
+++
b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/upsertcompaction/UpsertCompactionTaskExecutor.java
@@ -75,9 +75,17 @@ public class UpsertCompactionTaskExecutor extends
BaseSingleSegmentConversionExe
LOGGER.error(message);
throw new IllegalStateException(message);
}
+
+ // Executor-only: read the validDocIds consensus mode string from the task config (no auth
resolution or URL hits).
+ Map<String, String> taskConfigs =
+ tableConfig.getTaskConfig() != null ?
tableConfig.getTaskConfig().getConfigsForTaskType(taskType) : null;
+ String consensusMode =
+ taskConfigs != null ?
taskConfigs.getOrDefault(UpsertCompactionTask.VALID_DOC_IDS_CONSENSUS_MODE_KEY,
+ UpsertCompactionTask.DEFAULT_VALID_DOC_IDS_CONSENSUS_MODE)
+ : UpsertCompactionTask.DEFAULT_VALID_DOC_IDS_CONSENSUS_MODE;
RoaringBitmap validDocIds =
MinionTaskUtils.getValidDocIdFromServerMatchingCrc(tableNameWithType,
segmentName, validDocIdsTypeStr,
- MINION_CONTEXT, originalSegmentCrcFromTaskGenerator);
+ MINION_CONTEXT, originalSegmentCrcFromTaskGenerator,
consensusMode);
if (validDocIds == null) {
// no valid crc match found or no validDocIds obtained from all servers
// error out the task instead of silently failing so that we can track
it via task-error metrics
diff --git
a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/upsertcompactmerge/UpsertCompactMergeTaskExecutor.java
b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/upsertcompactmerge/UpsertCompactMergeTaskExecutor.java
index e5384ecca64..d148e9d26be 100644
---
a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/upsertcompactmerge/UpsertCompactMergeTaskExecutor.java
+++
b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/upsertcompactmerge/UpsertCompactMergeTaskExecutor.java
@@ -109,15 +109,23 @@ public class UpsertCompactMergeTaskExecutor extends
BaseMultipleSegmentsConversi
List.of(configs.get(MinionConstants.ORIGINAL_SEGMENT_CRC_KEY).split(","));
validateCRCForInputSegments(segmentMetadataList,
originalSegmentCrcFromTaskGenerator);
- // Fetch validDocID snapshot from server and get record-reader for
compacted reader.
+ // Executor-only: read the validDocIds consensus mode string from the task config (no auth
resolution or URL hits).
+ Map<String, String> taskConfigs =
+ tableConfig.getTaskConfig() != null ?
tableConfig.getTaskConfig().getConfigsForTaskType(taskType) : null;
+ String consensusMode = taskConfigs != null ? taskConfigs.getOrDefault(
+ MinionConstants.UpsertCompactionTask.VALID_DOC_IDS_CONSENSUS_MODE_KEY,
+
MinionConstants.UpsertCompactionTask.DEFAULT_VALID_DOC_IDS_CONSENSUS_MODE)
+ :
MinionConstants.UpsertCompactionTask.DEFAULT_VALID_DOC_IDS_CONSENSUS_MODE;
+
List<RecordReader> recordReaders = segmentMetadataList.stream().map(x -> {
RoaringBitmap validDocIds =
MinionTaskUtils.getValidDocIdFromServerMatchingCrc(tableNameWithType,
x.getName(),
- ValidDocIdsType.SNAPSHOT.name(), MINION_CONTEXT, x.getCrc());
+ ValidDocIdsType.SNAPSHOT.name(), MINION_CONTEXT, x.getCrc(),
consensusMode);
if (validDocIds == null) {
// no valid crc match found or no validDocIds obtained from all servers
// error out the task instead of silently failing so that we can track
it via task-error metrics
- String message = String.format("No validDocIds found from all servers.
They either failed to download "
- + "or did not match crc from segment copy obtained from deepstore
/ servers. " + "Expected crc: %s", "");
+ String message = "No validDocIds found from all servers for segment: "
+ x.getName()
+ + ". They either failed to download or did not match crc from
segment copy obtained from "
+ + "deepstore/servers. Expected crc: " + x.getCrc();
LOGGER.error(message);
throw new IllegalStateException(message);
}
diff --git
a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/upsertcompactmerge/UpsertCompactMergeTaskGenerator.java
b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/upsertcompactmerge/UpsertCompactMergeTaskGenerator.java
index 904f6c28c1b..3d72a7c6b8c 100644
---
a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/upsertcompactmerge/UpsertCompactMergeTaskGenerator.java
+++
b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/upsertcompactmerge/UpsertCompactMergeTaskGenerator.java
@@ -34,7 +34,6 @@ import org.apache.commons.lang3.StringUtils;
import org.apache.helix.task.TaskState;
import org.apache.pinot.common.exception.InvalidConfigException;
import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
-import org.apache.pinot.common.metrics.ControllerMeter;
import org.apache.pinot.common.metrics.ControllerMetrics;
import org.apache.pinot.common.restlet.resources.ValidDocIdsMetadataInfo;
import org.apache.pinot.common.restlet.resources.ValidDocIdsType;
@@ -45,7 +44,6 @@ import
org.apache.pinot.controller.helix.core.minion.generator.TaskGeneratorUtil
import org.apache.pinot.controller.util.ServerSegmentMetadataReader;
import org.apache.pinot.core.common.MinionConstants;
import org.apache.pinot.core.minion.PinotTaskConfig;
-import org.apache.pinot.plugin.minion.tasks.MinionTaskUtils;
import org.apache.pinot.spi.annotations.minion.TaskGenerator;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.config.table.TableType;
@@ -295,23 +293,9 @@ public class UpsertCompactMergeTaskGenerator extends
BaseTaskGenerator {
continue;
}
SegmentZKMetadata segment = candidateSegmentsMap.get(segmentName);
- List<ValidDocIdsMetadataInfo> replicaMetadataList =
validDocIdsMetadataInfoMap.get(segmentName);
-
- // Check consensus across all replicas before proceeding with any
operations
- if (!MinionTaskUtils.hasValidDocConsensus(segmentName,
replicaMetadataList)) {
- LOGGER.info("Skipping segment {} for table {} - no consensus on
validDoc counts across replicas",
- segmentName, tableNameWithType);
-
- // Emit metric to track segments skipped due to consensus failure
- if (controllerMetrics != null) {
- controllerMetrics.addMeteredTableValue(tableNameWithType,
-
ControllerMeter.UPSERT_COMPACT_MERGE_SEGMENT_SKIPPED_CONSENSUS_FAILURE, 1L);
- }
- continue;
- }
// Process with existing logic using the first replica with matching CRC
(since all have consensus)
- for (ValidDocIdsMetadataInfo validDocIdsMetadata : replicaMetadataList) {
+ for (ValidDocIdsMetadataInfo validDocIdsMetadata :
validDocIdsMetadataInfoMap.get(segmentName)) {
long totalInvalidDocs = validDocIdsMetadata.getTotalInvalidDocs();
long totalValidDocs = validDocIdsMetadata.getTotalValidDocs();
long segmentSizeInBytes = validDocIdsMetadata.getSegmentSizeInBytes();
diff --git
a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/MinionTaskUtilsTest.java
b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/MinionTaskUtilsTest.java
index 0ad879896f3..709118ba341 100644
---
a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/MinionTaskUtilsTest.java
+++
b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/MinionTaskUtilsTest.java
@@ -21,11 +21,22 @@ package org.apache.pinot.plugin.minion.tasks;
import java.net.URI;
import java.util.Collections;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.helix.HelixAdmin;
+import org.apache.helix.HelixManager;
+import org.apache.helix.model.ExternalView;
+import org.apache.helix.model.InstanceConfig;
+import org.apache.pinot.common.restlet.resources.ValidDocIdsBitmapResponse;
import org.apache.pinot.common.restlet.resources.ValidDocIdsType;
+import org.apache.pinot.common.utils.RoaringBitmapUtils;
+import org.apache.pinot.common.utils.ServiceStatus;
import org.apache.pinot.controller.helix.core.minion.ClusterInfoAccessor;
+import org.apache.pinot.controller.util.ServerSegmentMetadataReader;
import org.apache.pinot.core.common.MinionConstants;
import org.apache.pinot.core.common.MinionConstants.UpsertCompactionTask;
+import org.apache.pinot.minion.MinionContext;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.config.table.TableTaskConfig;
import org.apache.pinot.spi.config.table.TableType;
@@ -33,15 +44,24 @@ import org.apache.pinot.spi.config.table.UpsertConfig;
import org.apache.pinot.spi.filesystem.LocalPinotFS;
import org.apache.pinot.spi.filesystem.PinotFS;
import org.apache.pinot.spi.ingestion.batch.BatchConfigProperties;
+import org.apache.pinot.spi.utils.CommonConstants.Helix;
+import
org.apache.pinot.spi.utils.CommonConstants.Helix.StateModel.SegmentStateModel;
import org.apache.pinot.spi.utils.Enablement;
import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
+import org.mockito.MockedConstruction;
+import org.roaringbitmap.RoaringBitmap;
import org.testng.annotations.Test;
import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyInt;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.mockConstruction;
import static org.mockito.Mockito.when;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.expectThrows;
@@ -275,6 +295,271 @@ public class MinionTaskUtilsTest {
"'snapshot' must not be 'DISABLE' with validDocIdsType:
SNAPSHOT_WITH_DELETE");
}
+ @Test
+ public void testParseValidDocIdsConsensusMode() {
+ // Null or blank defaults to EQUAL
+ assertEquals(MinionTaskUtils.parseValidDocIdsConsensusMode(null),
+ MinionConstants.ValidDocIdsConsensusMode.EQUAL);
+ assertEquals(MinionTaskUtils.parseValidDocIdsConsensusMode(""),
+ MinionConstants.ValidDocIdsConsensusMode.EQUAL);
+ assertEquals(MinionTaskUtils.parseValidDocIdsConsensusMode(" "),
+ MinionConstants.ValidDocIdsConsensusMode.EQUAL);
+
+ // UNSAFE
+ assertEquals(MinionTaskUtils.parseValidDocIdsConsensusMode("UNSAFE"),
+ MinionConstants.ValidDocIdsConsensusMode.UNSAFE);
+ assertEquals(MinionTaskUtils.parseValidDocIdsConsensusMode("unsafe"),
+ MinionConstants.ValidDocIdsConsensusMode.UNSAFE);
+
+ // EQUAL
+ assertEquals(MinionTaskUtils.parseValidDocIdsConsensusMode("EQUAL"),
+ MinionConstants.ValidDocIdsConsensusMode.EQUAL);
+ assertEquals(MinionTaskUtils.parseValidDocIdsConsensusMode(" EQUAL "),
+ MinionConstants.ValidDocIdsConsensusMode.EQUAL);
+
+ // MOST_VALID_DOCS
+
assertEquals(MinionTaskUtils.parseValidDocIdsConsensusMode("MOST_VALID_DOCS"),
+ MinionConstants.ValidDocIdsConsensusMode.MOST_VALID_DOCS);
+
assertEquals(MinionTaskUtils.parseValidDocIdsConsensusMode("most_valid_docs"),
+ MinionConstants.ValidDocIdsConsensusMode.MOST_VALID_DOCS);
+
+ // Invalid value throws
+ expectThrows(IllegalArgumentException.class,
+ () -> MinionTaskUtils.parseValidDocIdsConsensusMode("INVALID_MODE"));
+ }
+
+ /**
+ * Builds a RoaringBitmap with {@code numDocs} valid doc ids (0..numDocs-1).
+ */
+ private static RoaringBitmap makeBitmap(int numDocs) {
+ RoaringBitmap b = new RoaringBitmap();
+ for (int i = 0; i < numDocs; i++) {
+ b.add(i);
+ }
+ return b;
+ }
+
+ /**
+ * Builds a ValidDocIdsBitmapResponse for testing: same segmentCrc and GOOD
status.
+ */
+ private static ValidDocIdsBitmapResponse makeResponse(String segmentName,
String crc, String instanceId,
+ RoaringBitmap bitmap) {
+ return new ValidDocIdsBitmapResponse(segmentName, crc,
ValidDocIdsType.SNAPSHOT,
+ RoaringBitmapUtils.serialize(bitmap), instanceId,
ServiceStatus.Status.GOOD);
+ }
+
+ /**
+ * Creates an InstanceConfig so that InstanceUtils.getServerAdminEndpoint()
returns a valid URL.
+ */
+ private static InstanceConfig makeInstanceConfig(String instanceId) {
+ InstanceConfig config = new InstanceConfig(instanceId);
+ config.setHostName("localhost");
+ config.getRecord().setIntField(Helix.Instance.ADMIN_PORT_KEY, 8098);
+ return config;
+ }
+
+ /**
+ * Sets up MinionContext with mock Helix so getServers() returns the given
server list.
+ */
+ private void setupMinionContextWithServers(String tableNameWithType, String
segmentName, String[] servers) {
+ ExternalView externalView = new ExternalView(tableNameWithType);
+ Map<String, String> assignment = new HashMap<>();
+ for (String s : servers) {
+ assignment.put(s, SegmentStateModel.ONLINE);
+ }
+ externalView.getRecord().getMapFields().put(segmentName, assignment);
+
+ HelixAdmin helixAdmin = mock(HelixAdmin.class);
+ when(helixAdmin.getResourceExternalView(anyString(),
eq(tableNameWithType))).thenReturn(externalView);
+ for (String server : servers) {
+ when(helixAdmin.getInstanceConfig(anyString(),
eq(server))).thenReturn(makeInstanceConfig(server));
+ }
+
+ HelixManager helixManager = mock(HelixManager.class);
+ when(helixManager.getClusterName()).thenReturn("testCluster");
+ when(helixManager.getClusterManagmentTool()).thenReturn(helixAdmin);
+
+ MinionContext.getInstance().setHelixManager(helixManager);
+ }
+
+ /**
+ * Calls getValidDocIdFromServerMatchingCrc with ServerSegmentMetadataReader
mocked. Each invocation of
+ * getValidDocIdsBitmapFromServer returns the next element of
responseOrThrowByCallOrder; if it is an Exception,
+ * that exception is thrown (simulating fetch failure).
+ */
+ private static RoaringBitmap
getValidDocIdFromServerMatchingCrcWithMockedReader(String tableName,
+ String segmentName, String expectedCrc, String consensusMode,
List<Object> responseOrThrowByCallOrder,
+ String[] servers, MinionTaskUtilsTest testInstance) {
+ testInstance.setupMinionContextWithServers(tableName, segmentName,
servers);
+ // Shared across all mock instances (production creates one reader per
server).
+ AtomicInteger callIndex = new AtomicInteger(0);
+ try (MockedConstruction<ServerSegmentMetadataReader> ignored =
mockConstruction(ServerSegmentMetadataReader.class,
+ (mock, context) -> {
+ when(mock.getValidDocIdsBitmapFromServer(anyString(), anyString(),
anyString(), anyString(), anyInt()))
+ .thenAnswer(inv -> {
+ int i = callIndex.getAndIncrement();
+ if (i >= responseOrThrowByCallOrder.size()) {
+ throw new IllegalStateException("Mock received more calls
than expected");
+ }
+ Object action = responseOrThrowByCallOrder.get(i);
+ if (action instanceof Exception) {
+ throw (Exception) action;
+ }
+ return (ValidDocIdsBitmapResponse) action;
+ });
+ })) {
+ return MinionTaskUtils.getValidDocIdFromServerMatchingCrc(tableName,
segmentName,
+ ValidDocIdsType.SNAPSHOT.name(), MinionContext.getInstance(),
expectedCrc, consensusMode);
+ }
+ }
+
+ @Test
+ public void testSameValidDocsEqualConsensus() {
+ String tableName = "myTable_REALTIME";
+ String segmentName = "seg1";
+ String expectedCrc = "crc1";
+ List<Object> responses = List.of(
+ makeResponse(segmentName, expectedCrc, "server1", makeBitmap(5)),
+ makeResponse(segmentName, expectedCrc, "server2", makeBitmap(5)),
+ makeResponse(segmentName, expectedCrc, "server3", makeBitmap(5)));
+ RoaringBitmap result =
getValidDocIdFromServerMatchingCrcWithMockedReader(tableName, segmentName,
expectedCrc,
+ "EQUAL", responses, new String[]{"server1", "server2", "server3"},
this);
+ assertNotNull(result);
+ assertEquals(result.getCardinality(), 5);
+ }
+
+ @Test
+ public void testSameValidDocsMaxValidDocs() {
+ String tableName = "myTable_REALTIME";
+ String segmentName = "seg1";
+ String expectedCrc = "crc1";
+ List<Object> responses = List.of(
+ makeResponse(segmentName, expectedCrc, "server1", makeBitmap(5)),
+ makeResponse(segmentName, expectedCrc, "server2", makeBitmap(5)),
+ makeResponse(segmentName, expectedCrc, "server3", makeBitmap(5)));
+ RoaringBitmap result =
getValidDocIdFromServerMatchingCrcWithMockedReader(tableName, segmentName,
expectedCrc,
+ "MOST_VALID_DOCS", responses, new String[]{"server1", "server2",
"server3"}, this);
+ assertNotNull(result);
+ assertEquals(result.getCardinality(), 5);
+ }
+
+ @Test
+ public void testSameValidDocsNone() {
+ String tableName = "myTable_REALTIME";
+ String segmentName = "seg1";
+ String expectedCrc = "crc1";
+ List<Object> responses = List.of(
+ makeResponse(segmentName, expectedCrc, "server1", makeBitmap(5)),
+ makeResponse(segmentName, expectedCrc, "server2", makeBitmap(5)),
+ makeResponse(segmentName, expectedCrc, "server3", makeBitmap(5)));
+ RoaringBitmap result =
getValidDocIdFromServerMatchingCrcWithMockedReader(tableName, segmentName,
expectedCrc,
+ "UNSAFE", responses, new String[]{"server1", "server2", "server3"},
this);
+ assertNotNull(result);
+ assertEquals(result.getCardinality(), 5);
+ }
+
+ @Test
+ public void testDifferentValidDocsMaxValidDocsMax() {
+ String tableName = "myTable_REALTIME";
+ String segmentName = "seg1";
+ String expectedCrc = "crc1";
+ List<Object> responses = List.of(
+ makeResponse(segmentName, expectedCrc, "server1", makeBitmap(5)),
+ makeResponse(segmentName, expectedCrc, "server2", makeBitmap(3)),
+ makeResponse(segmentName, expectedCrc, "server3", makeBitmap(4)));
+ RoaringBitmap result =
getValidDocIdFromServerMatchingCrcWithMockedReader(tableName, segmentName,
expectedCrc,
+ "MOST_VALID_DOCS", responses, new String[]{"server1", "server2",
"server3"}, this);
+ assertNotNull(result);
+ assertEquals(result.getCardinality(), 5);
+ }
+
+ @Test
+ public void testsomeServersNoValidDocsEqualConsensus() {
+ String tableName = "myTable_REALTIME";
+ String segmentName = "seg1";
+ String expectedCrc = "crc1";
+ List<Object> responses = List.of(
+ makeResponse(segmentName, expectedCrc, "server1", makeBitmap(0)),
+ makeResponse(segmentName, expectedCrc, "server2", makeBitmap(0)),
+ makeResponse(segmentName, expectedCrc, "server3", makeBitmap(3)));
+ expectThrows(IllegalStateException.class,
+ () -> getValidDocIdFromServerMatchingCrcWithMockedReader(tableName,
segmentName, expectedCrc,
+ "EQUAL", responses, new String[]{"server1", "server2", "server3"},
this));
+ }
+
+ @Test
+ public void testsomeServersNoValidDocsMaxValidDocs() {
+ String tableName = "myTable_REALTIME";
+ String segmentName = "seg1";
+ String expectedCrc = "crc1";
+ List<Object> responses = List.of(
+ makeResponse(segmentName, expectedCrc, "server1", makeBitmap(0)),
+ makeResponse(segmentName, expectedCrc, "server2", makeBitmap(0)),
+ makeResponse(segmentName, expectedCrc, "server3", makeBitmap(3)));
+ RoaringBitmap result =
getValidDocIdFromServerMatchingCrcWithMockedReader(tableName, segmentName,
expectedCrc,
+ "MOST_VALID_DOCS", responses, new String[]{"server1", "server2",
"server3"}, this);
+ assertNotNull(result);
+ assertEquals(result.getCardinality(), 3);
+ }
+
+ @Test
+ public void testSomeServersNoValidDocsNone() {
+ String tableName = "myTable_REALTIME";
+ String segmentName = "seg1";
+ String expectedCrc = "crc1";
+ List<Object> responses = List.of(
+ makeResponse(segmentName, expectedCrc, "server1", makeBitmap(0)),
+ makeResponse(segmentName, expectedCrc, "server2", makeBitmap(0)),
+ makeResponse(segmentName, expectedCrc, "server3", makeBitmap(3)));
+ RoaringBitmap result =
getValidDocIdFromServerMatchingCrcWithMockedReader(tableName, segmentName,
expectedCrc,
+ "UNSAFE", responses, new String[]{"server1", "server2", "server3"},
this);
+ assertNotNull(result);
+ assertEquals(result.getCardinality(), 0);
+ }
+
+ // --- one server fails (fetch throws): EQUAL and MOST_VALID_DOCS throw; UNSAFE
skips it and uses the next replica ---
+
+ @Test
+ public void testOneServerFailsEqualConsensus() {
+ String tableName = "myTable_REALTIME";
+ String segmentName = "seg1";
+ String expectedCrc = "crc1";
+ List<Object> responses = List.of(
+ makeResponse(segmentName, expectedCrc, "server1", makeBitmap(5)),
+ new RuntimeException("simulated fetch failure"),
+ makeResponse(segmentName, expectedCrc, "server3", makeBitmap(5)));
+ expectThrows(IllegalStateException.class,
+ () -> getValidDocIdFromServerMatchingCrcWithMockedReader(tableName,
segmentName, expectedCrc,
+ "EQUAL", responses, new String[]{"server1", "server2", "server3"},
this));
+ }
+
+ @Test
+ public void testOneServerFailsNone() {
+ String tableName = "myTable_REALTIME";
+ String segmentName = "seg1";
+ String expectedCrc = "crc1";
+ List<Object> responses = List.of(
+ new RuntimeException("simulated fetch failure"),
+ makeResponse(segmentName, expectedCrc, "server2", makeBitmap(3)),
+ makeResponse(segmentName, expectedCrc, "server3", makeBitmap(5)));
+ RoaringBitmap result =
getValidDocIdFromServerMatchingCrcWithMockedReader(tableName, segmentName,
expectedCrc,
+ "UNSAFE", responses, new String[]{"server1", "server2", "server3"},
this);
+ assertNotNull(result);
+ assertEquals(result.getCardinality(), 3);
+ }
+
+ @Test
+ public void testAllServersFailMostValidDocs() {
+ String tableName = "myTable_REALTIME";
+ String segmentName = "seg1";
+ String expectedCrc = "crc1";
+ List<Object> responses = List.of(new RuntimeException("simulated"), new
RuntimeException("simulated"),
+ new RuntimeException("simulated"));
+ expectThrows(IllegalStateException.class,
+ () -> getValidDocIdFromServerMatchingCrcWithMockedReader(tableName,
segmentName, expectedCrc, "MOST_VALID_DOCS",
+ responses, new String[]{"server1", "server2", "server3"}, this));
+ }
+
@Test
public void testGetPushTaskConfigNoConfig() {
Map<String, String> taskConfig = new HashMap<>();
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]