Jackie-Jiang commented on code in PR #17706:
URL: https://github.com/apache/pinot/pull/17706#discussion_r2819085356
##########
pinot-controller/src/main/java/org/apache/pinot/controller/validation/RealtimeSegmentValidationManager.java:
##########
@@ -217,8 +217,8 @@ boolean shouldEnsureConsuming(String tableNameWithType) {
private void runSegmentLevelValidation(TableConfig tableConfig) {
String realtimeTableName = tableConfig.getTableName();
-
- List<SegmentZKMetadata> segmentsZKMetadata = _pinotHelixResourceManager.getSegmentsZKMetadata(realtimeTableName);
+ List<SegmentZKMetadata> segmentsZKMetadata = new ArrayList<>();
+ _pinotHelixResourceManager.forEachSegmentsZKMetadata(realtimeTableName, segmentsZKMetadata::add);
Review Comment:
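This is not reducing the memory footprint: every `SegmentZKMetadata` is still added to `segmentsZKMetadata`, so the full list is held in memory just as before.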
##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/generator/BaseTaskGenerator.java:
##########
@@ -161,32 +162,58 @@ public int getMaxAttemptsPerTask(String minionTag) {
* @return the list of segment zk metadata for available segments in the table.
*/
public List<SegmentZKMetadata> getSegmentsZKMetadataForTable(String tableNameWithType) {
+ return getSegmentsZKMetadataInIdealState(tableNameWithType, null);
+ }
+
+ public List<SegmentZKMetadata> getNonConsumingSegmentsZKMetadataForRealtimeTable(String tableNameWithType) {
IdealState idealState = _clusterInfoAccessor.getIdealState(tableNameWithType);
+ if (idealState == null) {
+ return new ArrayList<>();
+ }
+ return getSegmentsZKMetadataInIdealState(tableNameWithType, segmentZKMetadata -> {
+ String segmentName = segmentZKMetadata.getSegmentName();
+ Map<String, String> instanceStateMap = idealState.getInstanceStateMap(segmentName);
+ return segmentZKMetadata.getStatus().isCompleted() // skip consuming segments
+ && instanceStateMap != null && !instanceStateMap.containsValue(SegmentStateModel.CONSUMING);
+ // The last check is for an edge case where
+ // 1. SegmentZKMetadata was updated to DONE in segment commit protocol, but
+ // 2. IdealState for the segment was not updated to ONLINE due to some issue in the controller.
+ // We avoid picking up such segments to allow RealtimeSegmentValidationManager to fix them.
+ });
+ }
+
+ private List<SegmentZKMetadata> getSegmentsZKMetadataInIdealState(String tableNameWithType,
+ Predicate<SegmentZKMetadata> segmentMetadataFilter) {
+ IdealState idealState = _clusterInfoAccessor.getIdealState(tableNameWithType);
+ if (idealState == null) {
+ return new ArrayList<>();
+ }
Set<String> segmentsForTable = idealState.getPartitionSet();
- List<SegmentZKMetadata> segmentZKMetadataList = _clusterInfoAccessor.getSegmentsZKMetadata(tableNameWithType);
+ if (segmentsForTable == null || segmentsForTable.isEmpty()) {
+ return new ArrayList<>();
+ }
+
List<SegmentZKMetadata> selectedSegmentZKMetadataList = new ArrayList<>();
- for (SegmentZKMetadata segmentZKMetadata : segmentZKMetadataList) {
- if (segmentsForTable.contains(segmentZKMetadata.getSegmentName())) {
+ boolean[] callbackInvoked = new boolean[1];
Review Comment:
Why do we need to track whether the callback was invoked (`callbackInvoked`)?
##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/generator/BaseTaskGenerator.java:
##########
@@ -161,32 +162,58 @@ public int getMaxAttemptsPerTask(String minionTag) {
* @return the list of segment zk metadata for available segments in the table.
*/
public List<SegmentZKMetadata> getSegmentsZKMetadataForTable(String tableNameWithType) {
+ return getSegmentsZKMetadataInIdealState(tableNameWithType, null);
+ }
+
+ public List<SegmentZKMetadata> getNonConsumingSegmentsZKMetadataForRealtimeTable(String tableNameWithType) {
IdealState idealState = _clusterInfoAccessor.getIdealState(tableNameWithType);
+ if (idealState == null) {
+ return new ArrayList<>();
+ }
+ return getSegmentsZKMetadataInIdealState(tableNameWithType, segmentZKMetadata -> {
+ String segmentName = segmentZKMetadata.getSegmentName();
+ Map<String, String> instanceStateMap = idealState.getInstanceStateMap(segmentName);
+ return segmentZKMetadata.getStatus().isCompleted() // skip consuming segments
+ && instanceStateMap != null && !instanceStateMap.containsValue(SegmentStateModel.CONSUMING);
+ // The last check is for an edge case where
+ // 1. SegmentZKMetadata was updated to DONE in segment commit protocol, but
+ // 2. IdealState for the segment was not updated to ONLINE due to some issue in the controller.
+ // We avoid picking up such segments to allow RealtimeSegmentValidationManager to fix them.
+ });
+ }
+
+ private List<SegmentZKMetadata> getSegmentsZKMetadataInIdealState(String tableNameWithType,
+ Predicate<SegmentZKMetadata> segmentMetadataFilter) {
+ IdealState idealState = _clusterInfoAccessor.getIdealState(tableNameWithType);
+ if (idealState == null) {
+ return new ArrayList<>();
+ }
Set<String> segmentsForTable = idealState.getPartitionSet();
- List<SegmentZKMetadata> segmentZKMetadataList = _clusterInfoAccessor.getSegmentsZKMetadata(tableNameWithType);
+ if (segmentsForTable == null || segmentsForTable.isEmpty()) {
+ return new ArrayList<>();
+ }
+
List<SegmentZKMetadata> selectedSegmentZKMetadataList = new ArrayList<>();
- for (SegmentZKMetadata segmentZKMetadata : segmentZKMetadataList) {
- if (segmentsForTable.contains(segmentZKMetadata.getSegmentName())) {
+ boolean[] callbackInvoked = new boolean[1];
+ _clusterInfoAccessor.forEachSegmentsZKMetadata(tableNameWithType, segmentZKMetadata -> {
+ callbackInvoked[0] = true;
+ if (segmentsForTable.contains(segmentZKMetadata.getSegmentName())
+ && (segmentMetadataFilter == null || segmentMetadataFilter.test(segmentZKMetadata))) {
selectedSegmentZKMetadataList.add(segmentZKMetadata);
Review Comment:
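This is not reducing the memory footprint: every matching `SegmentZKMetadata` is still added to `selectedSegmentZKMetadataList`, so the result list is fully materialized in memory just as before.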
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]