noob-se7en commented on code in PR #15673:
URL: https://github.com/apache/pinot/pull/15673#discussion_r2072699014
##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java:
##########
@@ -1208,33 +1208,45 @@ public void ensureAllPartitionsConsuming(TableConfig
tableConfig, List<StreamCon
Preconditions.checkState(!_isStopping, "Segment manager is stopping");
String realtimeTableName = tableConfig.getTableName();
- HelixHelper.updateIdealState(_helixManager, realtimeTableName, idealState
-> {
- assert idealState != null;
- boolean isTableEnabled = idealState.isEnabled();
- boolean isTablePaused = isTablePaused(idealState);
- boolean offsetsHaveToChange = offsetCriteria != null;
- if (isTableEnabled && !isTablePaused) {
- List<PartitionGroupConsumptionStatus>
currentPartitionGroupConsumptionStatusList =
- offsetsHaveToChange
- ? Collections.emptyList() // offsets from metadata are not
valid anymore; fetch for all partitions
- : getPartitionGroupConsumptionStatusList(idealState,
streamConfigs);
- // FIXME: Right now, we assume topics are sharing same offset criteria
- OffsetCriteria originalOffsetCriteria =
streamConfigs.get(0).getOffsetCriteria();
- // Read the smallest offset when a new partition is detected
- streamConfigs.stream()
- .forEach(streamConfig -> streamConfig.setOffsetCriteria(
- offsetsHaveToChange ? offsetCriteria :
OffsetCriteria.SMALLEST_OFFSET_CRITERIA));
- List<PartitionGroupMetadata> newPartitionGroupMetadataList =
- getNewPartitionGroupMetadataList(streamConfigs,
currentPartitionGroupConsumptionStatusList);
- streamConfigs.stream().forEach(streamConfig ->
streamConfig.setOffsetCriteria(originalOffsetCriteria));
- return ensureAllPartitionsConsuming(tableConfig, streamConfigs,
idealState, newPartitionGroupMetadataList,
- offsetCriteria);
- } else {
- LOGGER.info("Skipping LLC segments validation for table: {},
isTableEnabled: {}, isTablePaused: {}",
- realtimeTableName, isTableEnabled, isTablePaused);
- return idealState;
- }
- }, DEFAULT_RETRY_POLICY, true);
+ try {
+ HelixHelper.updateIdealState(_helixManager, realtimeTableName,
idealState -> {
+ assert idealState != null;
+ boolean isTableEnabled = idealState.isEnabled();
+ boolean isTablePaused = isTablePaused(idealState);
+ boolean offsetsHaveToChange = offsetCriteria != null;
+ if (isTableEnabled && !isTablePaused) {
+ List<PartitionGroupConsumptionStatus>
currentPartitionGroupConsumptionStatusList =
+ offsetsHaveToChange
+ ? Collections.emptyList() // offsets from metadata are not
valid anymore; fetch for all partitions
+ : getPartitionGroupConsumptionStatusList(idealState,
streamConfigs);
+ // FIXME: Right now, we assume topics are sharing same offset
criteria
+ OffsetCriteria originalOffsetCriteria =
streamConfigs.get(0).getOffsetCriteria();
+ // Read the smallest offset when a new partition is detected
+ streamConfigs.stream()
+ .forEach(streamConfig -> streamConfig.setOffsetCriteria(
+ offsetsHaveToChange ? offsetCriteria :
OffsetCriteria.SMALLEST_OFFSET_CRITERIA));
+ List<PartitionGroupMetadata> newPartitionGroupMetadataList;
+ try {
+ newPartitionGroupMetadataList =
+ getNewPartitionGroupMetadataList(streamConfigs,
currentPartitionGroupConsumptionStatusList);
+ } catch (Exception e) {
+ _controllerMetrics.addMeteredTableValue(realtimeTableName,
+ ControllerMeter.PARTITION_GROUP_METADATA_FETCH_ERROR, 1L);
Review Comment:
Moved it to `PinotTableIdealStateBuilder.getPartitionGroupMetadataList`
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]