This is an automated email from the ASF dual-hosted git repository.
kharekartik pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 5550c24f1e Disable reingestion for Pauseless dedup (#15383)
5550c24f1e is described below
commit 5550c24f1e92a40df904af27853f61b5a9098428
Author: Kartik Khare <[email protected]>
AuthorDate: Fri Mar 28 12:45:46 2025 +0530
Disable reingestion for Pauseless dedup (#15383)
* Disable reingestion for Pauseless dedup
* Emit a metric that tracks how many errored segments were detected
* Add new metric for unrecoverable errors
* Add comment on metric
---------
Co-authored-by: KKCorps <[email protected]>
---
.../pinot/common/metrics/ControllerGauge.java | 10 +++++++-
.../realtime/PinotLLCRealtimeSegmentManager.java | 28 ++++++++++++++++++++--
.../RealtimeSegmentValidationManager.java | 3 ++-
3 files changed, 37 insertions(+), 4 deletions(-)
diff --git
a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ControllerGauge.java
b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ControllerGauge.java
index cd6215228c..6418aca8d1 100644
---
a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ControllerGauge.java
+++
b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ControllerGauge.java
@@ -192,7 +192,15 @@ public enum ControllerGauge implements
AbstractMetrics.Gauge {
UNTRACKED_SEGMENTS_COUNT("untrackedSegmentsCount", false),
// Metric used to track errors during the periodic table retention management
- RETENTION_MANAGER_ERROR("retentionManagerError", false);
+ RETENTION_MANAGER_ERROR("retentionManagerError", false),
+
+ // Metric used to track when segments in error state are detected for a pauseless table
+ PAUSELESS_SEGMENTS_IN_ERROR_COUNT("pauselessSegmentsInErrorCount", false),
+
+ // Metric used to track when segments in error state are detected for a pauseless table that needs
+ // manual intervention for repair
+
PAUSELESS_SEGMENTS_IN_UNRECOVERABLE_ERROR_COUNT("pauselessSegmentsInUnrecoverableErrorCount",
false);
+
private final String _gaugeName;
private final String _unit;
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
index 84afaf6fdf..9e8bff5f5d 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
@@ -108,6 +108,7 @@ import org.apache.pinot.spi.config.table.PauseState;
import org.apache.pinot.spi.config.table.SegmentPartitionConfig;
import org.apache.pinot.spi.config.table.SegmentsValidationAndRetentionConfig;
import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.table.UpsertConfig;
import org.apache.pinot.spi.config.table.assignment.InstancePartitionsType;
import org.apache.pinot.spi.filesystem.PinotFS;
import org.apache.pinot.spi.filesystem.PinotFSFactory;
@@ -2359,9 +2360,10 @@ public class PinotLLCRealtimeSegmentManager {
* Request body (JSON):
*
* If a segment is in ERROR state in only a few replicas but has a download URL, we instead trigger a segment reset
- * @param realtimeTableName The table name with type, e.g. "myTable_REALTIME"
+ * @param tableConfig The table config
*/
- public void repairSegmentsInErrorStateForPauselessConsumption(String
realtimeTableName) {
+ public void repairSegmentsInErrorStateForPauselessConsumption(TableConfig
tableConfig) {
+ String realtimeTableName = tableConfig.getTableName();
// Fetch ideal state and external view
IdealState idealState = getIdealState(realtimeTableName);
ExternalView externalView =
_helixResourceManager.getTableExternalView(realtimeTableName);
@@ -2425,7 +2427,12 @@ public class PinotLLCRealtimeSegmentManager {
}
}
+
if (segmentsInErrorStateInAtLeastOneReplica.isEmpty()) {
+ _controllerMetrics.setOrUpdateTableGauge(realtimeTableName,
+ ControllerGauge.PAUSELESS_SEGMENTS_IN_ERROR_COUNT, 0);
+ _controllerMetrics.setOrUpdateTableGauge(realtimeTableName,
+ ControllerGauge.PAUSELESS_SEGMENTS_IN_UNRECOVERABLE_ERROR_COUNT, 0);
return;
}
@@ -2434,6 +2441,23 @@ public class PinotLLCRealtimeSegmentManager {
segmentsInErrorStateInAtLeastOneReplica.size(),
segmentsInErrorStateInAtLeastOneReplica,
segmentsInErrorStateInAllReplicas.size(),
segmentsInErrorStateInAllReplicas, realtimeTableName);
+ boolean isPartialUpsertEnabled =
+ tableConfig.getUpsertConfig() != null &&
tableConfig.getUpsertConfig().getMode() == UpsertConfig.Mode.PARTIAL;
+ boolean isDedupEnabled = tableConfig.getDedupConfig() != null &&
tableConfig.getDedupConfig().isDedupEnabled();
+ if ((isPartialUpsertEnabled || isDedupEnabled)) {
+ // We do not run reingestion for dedup and partial upsert tables in
pauseless as it can
+ // lead to data inconsistencies
+ _controllerMetrics.setOrUpdateTableGauge(realtimeTableName,
+ ControllerGauge.PAUSELESS_SEGMENTS_IN_UNRECOVERABLE_ERROR_COUNT,
segmentsInErrorStateInAllReplicas.size());
+ LOGGER.error("Skipping repair for errored segments in table: {} because
dedup or partial upsert is enabled.",
+ realtimeTableName);
+ return;
+ } else {
+ _controllerMetrics.setOrUpdateTableGauge(realtimeTableName,
+ ControllerGauge.PAUSELESS_SEGMENTS_IN_ERROR_COUNT,
segmentsInErrorStateInAllReplicas.size());
+ }
+
+
for (String segmentName : segmentsInErrorStateInAtLeastOneReplica) {
SegmentZKMetadata segmentZKMetadata =
getSegmentZKMetadata(realtimeTableName, segmentName);
if (segmentZKMetadata == null) {
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/validation/RealtimeSegmentValidationManager.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/validation/RealtimeSegmentValidationManager.java
index 9e9f6ba110..2fcb669335 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/validation/RealtimeSegmentValidationManager.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/validation/RealtimeSegmentValidationManager.java
@@ -127,7 +127,8 @@ public class RealtimeSegmentValidationManager extends
ControllerPeriodicTask<Rea
boolean isPauselessConsumptionEnabled =
PauselessConsumptionUtils.isPauselessEnabled(tableConfig);
if (isPauselessConsumptionEnabled) {
-
_llcRealtimeSegmentManager.repairSegmentsInErrorStateForPauselessConsumption(tableConfig.getTableName());
+ // Repair segments in error state for pauseless tables; the repair itself is skipped for dedup and partial upsert tables
+
_llcRealtimeSegmentManager.repairSegmentsInErrorStateForPauselessConsumption(tableConfig);
} else if (_segmentAutoResetOnErrorAtValidation) {
// Reset for pauseless tables is already handled in
repairSegmentsInErrorStateForPauselessConsumption method with
// additional checks for pauseless consumption
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]