chenboat commented on code in PR #13790:
URL: https://github.com/apache/pinot/pull/13790#discussion_r1874178792
##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java:
##########
@@ -791,15 +780,48 @@ Set<Integer> getPartitionIds(StreamConfig streamConfig)
}
}
+ @VisibleForTesting
+ Set<Integer> getPartitionIds(List<StreamConfig> streamConfigs, IdealState idealState) {
+ Set<Integer> partitionIds = new HashSet<>();
+ boolean allPartitionIdsFetched = true;
+ for (int i = 0; i < streamConfigs.size(); i++) {
+ final int index = i;
+ try {
+ partitionIds.addAll(getPartitionIds(streamConfigs.get(index)).stream()
+ .map(partitionId -> IngestionConfigUtils.getPinotPartitionIdFromStreamPartitionId(partitionId, index))
+ .collect(Collectors.toSet()));
+ } catch (Exception e) {
+ allPartitionIdsFetched = false;
+ LOGGER.warn("Failed to fetch partition ids for stream: {}", streamConfigs.get(i).getTopicName(), e);
+ }
+ }
+
+ // Some fetches failed, so ensure we do not miss any partition ids
Review Comment:
Please explain at a high level how partial fetch failures are handled. From
the code, it looks like we reuse the existing partitions and then fetch only the
new ones, but it would be better to state clearly in the comment what is done here.
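
To make the question concrete, here is a minimal sketch of the behavior I am
inferring from the diff. It is not the PR's code: `PartialFetchSketch`,
`collectPartitionIds`, and the `knownIds` parameter (standing in for whatever
partition ids can be recovered from the ideal state) are hypothetical names, and
the fallback-by-union step is my assumption about the intent.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.function.Supplier;

// Hypothetical illustration only; none of these names exist in the PR.
final class PartialFetchSketch {

  // Each supplier stands in for one per-stream partition id fetch.
  // knownIds stands in for ids already known from existing state (assumption).
  static Set<Integer> collectPartitionIds(List<Supplier<Set<Integer>>> fetchers, Set<Integer> knownIds) {
    Set<Integer> partitionIds = new HashSet<>();
    boolean allFetched = true;
    for (Supplier<Set<Integer>> fetcher : fetchers) {
      try {
        partitionIds.addAll(fetcher.get());
      } catch (RuntimeException e) {
        // One stream failed; keep going so the other streams still contribute.
        allFetched = false;
      }
    }
    if (!allFetched) {
      // Assumed fallback: union with the already known ids so that a transient
      // fetch failure does not drop partitions that are currently being consumed.
      partitionIds.addAll(knownIds);
    }
    return partitionIds;
  }
}
```

If this matches the intent, a comment along these lines above the fallback
branch would answer the question: when every fetch succeeds the fetched ids are
used as is, and when any fetch fails the result is merged with the ids that are
already known.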
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]