This is an automated email from the ASF dual-hosted git repository.
jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new b206ccd076 Log IS update failures in ensureAllPartitionsConsuming
(#15673)
b206ccd076 is described below
commit b206ccd076a5590c54dc607ee4b1286b0045c2b1
Author: NOOB <[email protected]>
AuthorDate: Wed May 7 05:23:54 2025 +0530
Log IS update failures in ensureAllPartitionsConsuming (#15673)
---
.../pinot/common/metrics/ControllerMeter.java | 4 +-
.../common/utils/helix/IdealStateGroupCommit.java | 3 +-
.../helix/core/PinotTableIdealStateBuilder.java | 8 ++-
.../realtime/PinotLLCRealtimeSegmentManager.java | 58 ++++++++++++----------
4 files changed, 43 insertions(+), 30 deletions(-)
diff --git
a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ControllerMeter.java
b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ControllerMeter.java
index 5f934561ad..45ae4c76db 100644
---
a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ControllerMeter.java
+++
b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ControllerMeter.java
@@ -73,7 +73,9 @@ public enum ControllerMeter implements AbstractMetrics.Meter {
// Total Bytes read from deep store
DEEP_STORE_READ_BYTES_COMPLETED("deepStoreReadBytesCompleted", true),
// Total Bytes written to deep store
- DEEP_STORE_WRITE_BYTES_COMPLETED("deepStoreWriteBytesCompleted", true);
+ DEEP_STORE_WRITE_BYTES_COMPLETED("deepStoreWriteBytesCompleted", true),
+ // Tracks failures encountered while fetching partition group metadata
+ PARTITION_GROUP_METADATA_FETCH_ERROR("failures", true);
private final String _brokerMeterName;
private final String _unit;
diff --git
a/pinot-common/src/main/java/org/apache/pinot/common/utils/helix/IdealStateGroupCommit.java
b/pinot-common/src/main/java/org/apache/pinot/common/utils/helix/IdealStateGroupCommit.java
index f7e7981a1a..0b55fd9faf 100644
---
a/pinot-common/src/main/java/org/apache/pinot/common/utils/helix/IdealStateGroupCommit.java
+++
b/pinot-common/src/main/java/org/apache/pinot/common/utils/helix/IdealStateGroupCommit.java
@@ -105,7 +105,8 @@ public class IdealStateGroupCommit {
* @param helixManager helixManager with the ability to pull from the
current data\
* @param resourceName the resource name to be updated
* @param updater the idealState updater to be applied
- * @return IdealState if the update is successful, null if not
+ * @return IdealState if the update is successful, or null if interrupted while committing the change; an exception
+ * is thrown if the update fails
*/
public IdealState commit(HelixManager helixManager, String resourceName,
Function<IdealState, IdealState> updater,
RetryPolicy retryPolicy, boolean noChangeOk) {
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotTableIdealStateBuilder.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotTableIdealStateBuilder.java
index 244f7853d8..8bc9ea442f 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotTableIdealStateBuilder.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotTableIdealStateBuilder.java
@@ -21,6 +21,8 @@ package org.apache.pinot.controller.helix.core;
import java.util.List;
import org.apache.helix.model.IdealState;
import org.apache.helix.model.builder.CustomModeISBuilder;
+import org.apache.pinot.common.metrics.ControllerMeter;
+import org.apache.pinot.common.metrics.ControllerMetrics;
import org.apache.pinot.spi.stream.PartitionGroupConsumptionStatus;
import org.apache.pinot.spi.stream.PartitionGroupMetadata;
import org.apache.pinot.spi.stream.PartitionGroupMetadataFetcher;
@@ -96,9 +98,13 @@ public class PinotTableIdealStateBuilder {
return partitionGroupMetadataFetcher.getPartitionGroupMetadataList();
} catch (Exception e) {
Exception fetcherException =
partitionGroupMetadataFetcher.getException();
+ String tableNameWithType = streamConfigs.get(0).getTableNameWithType();
LOGGER.error("Could not get PartitionGroupMetadata for topic: {} of
table: {}",
streamConfigs.stream().map(streamConfig ->
streamConfig.getTopicName()).reduce((a, b) -> a + "," + b),
- streamConfigs.get(0).getTableNameWithType(), fetcherException);
+ tableNameWithType, fetcherException);
+ ControllerMetrics controllerMetrics = ControllerMetrics.get();
+ controllerMetrics.addMeteredTableValue(tableNameWithType,
ControllerMeter.PARTITION_GROUP_METADATA_FETCH_ERROR,
+ 1L);
throw new RuntimeException(fetcherException);
}
}
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
index 765b25852a..d2445f3234 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
@@ -1208,33 +1208,37 @@ public class PinotLLCRealtimeSegmentManager {
Preconditions.checkState(!_isStopping, "Segment manager is stopping");
String realtimeTableName = tableConfig.getTableName();
- HelixHelper.updateIdealState(_helixManager, realtimeTableName, idealState
-> {
- assert idealState != null;
- boolean isTableEnabled = idealState.isEnabled();
- boolean isTablePaused = isTablePaused(idealState);
- boolean offsetsHaveToChange = offsetCriteria != null;
- if (isTableEnabled && !isTablePaused) {
- List<PartitionGroupConsumptionStatus>
currentPartitionGroupConsumptionStatusList =
- offsetsHaveToChange
- ? Collections.emptyList() // offsets from metadata are not
valid anymore; fetch for all partitions
- : getPartitionGroupConsumptionStatusList(idealState,
streamConfigs);
- // FIXME: Right now, we assume topics are sharing same offset criteria
- OffsetCriteria originalOffsetCriteria =
streamConfigs.get(0).getOffsetCriteria();
- // Read the smallest offset when a new partition is detected
- streamConfigs.stream()
- .forEach(streamConfig -> streamConfig.setOffsetCriteria(
- offsetsHaveToChange ? offsetCriteria :
OffsetCriteria.SMALLEST_OFFSET_CRITERIA));
- List<PartitionGroupMetadata> newPartitionGroupMetadataList =
- getNewPartitionGroupMetadataList(streamConfigs,
currentPartitionGroupConsumptionStatusList);
- streamConfigs.stream().forEach(streamConfig ->
streamConfig.setOffsetCriteria(originalOffsetCriteria));
- return ensureAllPartitionsConsuming(tableConfig, streamConfigs,
idealState, newPartitionGroupMetadataList,
- offsetCriteria);
- } else {
- LOGGER.info("Skipping LLC segments validation for table: {},
isTableEnabled: {}, isTablePaused: {}",
- realtimeTableName, isTableEnabled, isTablePaused);
- return idealState;
- }
- }, DEFAULT_RETRY_POLICY, true);
+ try {
+ HelixHelper.updateIdealState(_helixManager, realtimeTableName,
idealState -> {
+ assert idealState != null;
+ boolean isTableEnabled = idealState.isEnabled();
+ boolean isTablePaused = isTablePaused(idealState);
+ boolean offsetsHaveToChange = offsetCriteria != null;
+ if (isTableEnabled && !isTablePaused) {
+ List<PartitionGroupConsumptionStatus>
currentPartitionGroupConsumptionStatusList =
+ offsetsHaveToChange
+ ? Collections.emptyList() // offsets from metadata are not
valid anymore; fetch for all partitions
+ : getPartitionGroupConsumptionStatusList(idealState,
streamConfigs);
+ // FIXME: Right now, we assume topics are sharing same offset
criteria
+ OffsetCriteria originalOffsetCriteria =
streamConfigs.get(0).getOffsetCriteria();
+ // Read the smallest offset when a new partition is detected
+ streamConfigs.stream()
+ .forEach(streamConfig -> streamConfig.setOffsetCriteria(
+ offsetsHaveToChange ? offsetCriteria :
OffsetCriteria.SMALLEST_OFFSET_CRITERIA));
+ List<PartitionGroupMetadata> newPartitionGroupMetadataList =
+ getNewPartitionGroupMetadataList(streamConfigs,
currentPartitionGroupConsumptionStatusList);
+ streamConfigs.stream().forEach(streamConfig ->
streamConfig.setOffsetCriteria(originalOffsetCriteria));
+ return ensureAllPartitionsConsuming(tableConfig, streamConfigs,
idealState, newPartitionGroupMetadataList,
+ offsetCriteria);
+ } else {
+ LOGGER.info("Skipping LLC segments validation for table: {},
isTableEnabled: {}, isTablePaused: {}",
+ realtimeTableName, isTableEnabled, isTablePaused);
+ return idealState;
+ }
+ }, DEFAULT_RETRY_POLICY, true);
+ } catch (Exception e) {
+ throw new RuntimeException("Failed to update ideal state during
ensureAllPartitionsConsuming.", e);
+ }
}
/**
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]