GSharayu commented on code in PR #9309: URL: https://github.com/apache/pinot/pull/9309#discussion_r969860379
########## pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/SegmentAssignmentUtils.java: ########## @@ -388,4 +395,89 @@ public Map<String, Map<String, String>> getNonTierSegmentAssignment() { return _nonTierSegmentAssignment; } } + + /** + * Returns a partition id for offline table + */ + public static int getOfflineSegmentPartitionId(String segmentName, String offlineTableName, + HelixManager helixManager, @Nullable String partitionColumn) { + SegmentZKMetadata segmentZKMetadata = + ZKMetadataProvider.getSegmentZKMetadata(helixManager.getHelixPropertyStore(), offlineTableName, segmentName); + Preconditions.checkState(segmentZKMetadata != null, + "Failed to find segment ZK metadata for segment: %s of table: %s", segmentName, offlineTableName); + return getPartitionId(segmentZKMetadata, offlineTableName, partitionColumn); + } + + private static int getPartitionId(SegmentZKMetadata segmentZKMetadata, + String offlineTableName, @Nullable String partitionColumn) { + String segmentName = segmentZKMetadata.getSegmentName(); + ColumnPartitionMetadata partitionMetadata = + segmentZKMetadata.getPartitionMetadata().getColumnPartitionMap().get(partitionColumn); + Preconditions.checkState(partitionMetadata != null, + "Segment ZK metadata for segment: %s of table: %s does not contain partition metadata for column: %s", + segmentName, offlineTableName, partitionColumn); + Set<Integer> partitions = partitionMetadata.getPartitions(); + Preconditions.checkState(partitions.size() == 1, + "Segment ZK metadata for segment: %s of table: %s contains multiple partitions for column: %s", segmentName, + offlineTableName, partitionColumn); + return partitions.iterator().next(); + } + + /** + * Returns map of instance partition id to segments for offline tables + */ + public static Map<Integer, List<String>> getOfflineInstancePartitionIdToSegmentsMap(Set<String> segments, + int numInstancePartitions, String offlineTableName, + HelixManager 
helixManager, @Nullable String partitionColumn) { + // Fetch partition id from segment ZK metadata + List<SegmentZKMetadata> segmentsZKMetadata = + ZKMetadataProvider.getSegmentsZKMetadata(helixManager.getHelixPropertyStore(), offlineTableName); + + Map<Integer, List<String>> instancePartitionIdToSegmentsMap = new HashMap<>(); + Set<String> segmentsWithoutZKMetadata = new HashSet<>(segments); + for (SegmentZKMetadata segmentZKMetadata : segmentsZKMetadata) { + String segmentName = segmentZKMetadata.getSegmentName(); + if (segmentsWithoutZKMetadata.remove(segmentName)) { + int partitionId = getPartitionId(segmentZKMetadata, offlineTableName, partitionColumn); + int instancePartitionId = partitionId % numInstancePartitions; + instancePartitionIdToSegmentsMap.computeIfAbsent(instancePartitionId, k -> new ArrayList<>()).add(segmentName); + } + } + Preconditions.checkState(segmentsWithoutZKMetadata.isEmpty(), "Failed to find ZK metadata for segments: %s", + segmentsWithoutZKMetadata); + + return instancePartitionIdToSegmentsMap; + } + + /** + * Returns a partition id for realtime table + */ + public static int getRealtimeSegmentPartitionId(String segmentName, String realtimeTableName, + HelixManager helixManager, @Nullable String partitionColumn) { + Integer segmentPartitionId = + SegmentUtils.getRealtimeSegmentPartitionId(segmentName, realtimeTableName, helixManager, partitionColumn); + if (segmentPartitionId == null) { + // This case is for the uploaded segments for which there's no partition information. + // A random, but consistent, partition id is calculated based on the hash code of the segment name. + // Note that '% 10K' is used to prevent having partition ids with large value which will be problematic later in + // instance assignment formula. 
+ segmentPartitionId = Math.abs(segmentName.hashCode() % 10_000); Review Comment: I will scope this comment out of this PR, as the change was already discussed in https://github.com/apache/pinot/pull/8584 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org