This is an automated email from the ASF dual-hosted git repository.
jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 91e929adf3 Allow RealtimeSegmentValidationManager to fix error
segments for partial upsert and dedup tables. (#15987)
91e929adf3 is described below
commit 91e929adf382dc3705aa4a89718f99cecdde5874
Author: 9aman <[email protected]>
AuthorDate: Sat Jun 7 04:01:57 2025 +0530
Allow RealtimeSegmentValidationManager to fix error segments for partial
upsert and dedup tables. (#15987)
---
.../realtime/PinotLLCRealtimeSegmentManager.java | 10 ++++++++--
.../validation/RealtimeSegmentValidationManager.java | 20 +++++++++++++++++++-
2 files changed, 27 insertions(+), 3 deletions(-)
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
index a7b04e2766..012ef8d2d5 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
@@ -2434,7 +2434,8 @@ public class PinotLLCRealtimeSegmentManager {
* If segment is in ERROR state in only few replicas but has download URL,
we instead trigger a segment reset
* @param tableConfig The table config
*/
- public void repairSegmentsInErrorStateForPauselessConsumption(TableConfig tableConfig) {
+ public void repairSegmentsInErrorStateForPauselessConsumption(TableConfig tableConfig,
+ boolean repairErrorSegmentsForPartialUpsertOrDedup) {
String realtimeTableName = tableConfig.getTableName();
// Fetch ideal state and external view
IdealState idealState = getIdealState(realtimeTableName);
@@ -2516,7 +2517,7 @@ public class PinotLLCRealtimeSegmentManager {
boolean isPartialUpsertEnabled =
tableConfig.getUpsertConfig() != null &&
tableConfig.getUpsertConfig().getMode() == UpsertConfig.Mode.PARTIAL;
boolean isDedupEnabled = tableConfig.getDedupConfig() != null &&
tableConfig.getDedupConfig().isDedupEnabled();
- if ((isPartialUpsertEnabled || isDedupEnabled)) {
+ if ((isPartialUpsertEnabled || isDedupEnabled) && !repairErrorSegmentsForPartialUpsertOrDedup) {
// We do not run reingestion for dedup and partial upsert tables in
pauseless as it can
// lead to data inconsistencies
_controllerMetrics.setOrUpdateTableGauge(realtimeTableName,
@@ -2525,6 +2526,11 @@ public class PinotLLCRealtimeSegmentManager {
realtimeTableName);
return;
} else {
+ if ((isPartialUpsertEnabled || isDedupEnabled)) {
+ LOGGER.info(
+ "Repairing error segments in table: {} as repairErrorSegmentForPartialUpsertOrDedup is set to true",
+ realtimeTableName);
+ }
_controllerMetrics.setOrUpdateTableGauge(realtimeTableName,
ControllerGauge.PAUSELESS_SEGMENTS_IN_ERROR_COUNT,
segmentsInErrorStateInAllReplicas.size());
}
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/validation/RealtimeSegmentValidationManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/validation/RealtimeSegmentValidationManager.java
index 9fa236ecde..21d3d42802 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/validation/RealtimeSegmentValidationManager.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/validation/RealtimeSegmentValidationManager.java
@@ -67,6 +67,8 @@ public class RealtimeSegmentValidationManager extends
ControllerPeriodicTask<Rea
public static final String OFFSET_CRITERIA = "offsetCriteria";
public static final String RUN_SEGMENT_LEVEL_VALIDATION =
"runSegmentLevelValidation";
+ public static final String REPAIR_ERROR_SEGMENTS_FOR_PARTIAL_UPSERT_OR_DEDUP =
+ "repairErrorSegmentsForPartialUpsertOrDedup";
public RealtimeSegmentValidationManager(ControllerConf config,
PinotHelixResourceManager pinotHelixResourceManager,
LeadControllerManager leadControllerManager,
PinotLLCRealtimeSegmentManager llcRealtimeSegmentManager,
@@ -96,6 +98,8 @@ public class RealtimeSegmentValidationManager extends
ControllerPeriodicTask<Rea
context._runSegmentLevelValidation = true;
_lastSegmentLevelValidationRunTimeMs = currentTimeMs;
}
+ context._repairErrorSegmentsForPartialUpsertOrDedup =
+ shouldRepairErrorSegmentsForPartialUpsertOrDedup(periodicTaskProperties);
String offsetCriteriaStr =
periodicTaskProperties.getProperty(OFFSET_CRITERIA);
if (offsetCriteriaStr != null) {
context._offsetCriteria = new
OffsetCriteria.OffsetCriteriaBuilder().withOffsetString(offsetCriteriaStr);
@@ -129,7 +133,8 @@ public class RealtimeSegmentValidationManager extends
ControllerPeriodicTask<Rea
boolean isPauselessConsumptionEnabled =
PauselessConsumptionUtils.isPauselessEnabled(tableConfig);
if (isPauselessConsumptionEnabled) {
// For pauseless tables without dedup or partial upsert, repair segments
in error state
- _llcRealtimeSegmentManager.repairSegmentsInErrorStateForPauselessConsumption(tableConfig);
+ _llcRealtimeSegmentManager.repairSegmentsInErrorStateForPauselessConsumption(tableConfig,
+ context._repairErrorSegmentsForPartialUpsertOrDedup);
} else if (_segmentAutoResetOnErrorAtValidation) {
// Reset for pauseless tables is already handled in
repairSegmentsInErrorStateForPauselessConsumption method with
// additional checks for pauseless consumption
@@ -261,6 +266,18 @@ public class RealtimeSegmentValidationManager extends
ControllerPeriodicTask<Rea
return runValidation || timeThresholdMet;
}
+ private boolean shouldRepairErrorSegmentsForPartialUpsertOrDedup(Properties periodicTaskProperties) {
+ return Optional.ofNullable(periodicTaskProperties.getProperty(REPAIR_ERROR_SEGMENTS_FOR_PARTIAL_UPSERT_OR_DEDUP))
+ .map(value -> {
+ try {
+ return Boolean.parseBoolean(value);
+ } catch (Exception e) {
+ return false;
+ }
+ })
+ .orElse(false);
+ }
+
@Override
protected void nonLeaderCleanup(List<String> tableNamesWithType) {
for (String tableNameWithType : tableNamesWithType) {
@@ -288,6 +305,7 @@ public class RealtimeSegmentValidationManager extends
ControllerPeriodicTask<Rea
public static final class Context {
private boolean _runSegmentLevelValidation;
+ private boolean _repairErrorSegmentsForPartialUpsertOrDedup;
private OffsetCriteria _offsetCriteria;
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]