reswqa commented on code in PR #24900: URL: https://github.com/apache/flink/pull/24900#discussion_r1634238153
########## flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/storage/TieredStorageConsumerClient.java: ########## @@ -148,12 +162,52 @@ public void close() throws IOException { // -------------------------------------------------------------------------------------------- private List<TierConsumerAgent> createTierConsumerAgents( - List<TieredStorageConsumerSpec> tieredStorageConsumerSpecs) { + List<TieredStorageConsumerSpec> tieredStorageConsumerSpecs, + List<List<TierShuffleDescriptor>> shuffleDescriptors) { ArrayList<TierConsumerAgent> tierConsumerAgents = new ArrayList<>(); + + List<List<TierShuffleDescriptor>> transformedTierShuffleDescriptors = + transformTierShuffleDescriptors(shuffleDescriptors); + // Each tier only requires one inner list of transformedTierShuffleDescriptors, so the size + // of transformedTierShuffleDescriptors and the size of tierFactories are the same. + checkState(transformedTierShuffleDescriptors.size() == tierFactories.size()); + int index = 0; Review Comment: Why not use a `for-i` loop if we do need the iteration index? 
########## flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/storage/TieredStorageConsumerClient.java: ########## @@ -148,12 +162,52 @@ public void close() throws IOException { // -------------------------------------------------------------------------------------------- private List<TierConsumerAgent> createTierConsumerAgents( - List<TieredStorageConsumerSpec> tieredStorageConsumerSpecs) { + List<TieredStorageConsumerSpec> tieredStorageConsumerSpecs, + List<List<TierShuffleDescriptor>> shuffleDescriptors) { ArrayList<TierConsumerAgent> tierConsumerAgents = new ArrayList<>(); + + List<List<TierShuffleDescriptor>> transformedTierShuffleDescriptors = + transformTierShuffleDescriptors(shuffleDescriptors); + // Each tier only requires one inner list of transformedTierShuffleDescriptors, so the size + // of transformedTierShuffleDescriptors and the size of tierFactories are the same. + checkState(transformedTierShuffleDescriptors.size() == tierFactories.size()); + int index = 0; for (TierFactory tierFactory : tierFactories) { tierConsumerAgents.add( - tierFactory.createConsumerAgent(tieredStorageConsumerSpecs, nettyService)); + tierFactory.createConsumerAgent( + tieredStorageConsumerSpecs, + transformedTierShuffleDescriptors.get(index++), + nettyService)); } return tierConsumerAgents; } + + /** + * Before transforming the shuffle descriptors, the number of tier shuffle descriptors is + * numPartitions * numTiers (That means shuffleDescriptors.size() is numPartitions, while the + * shuffleDescriptors.get(0).size() is numTiers). After transforming, the number of tier shuffle + * descriptors is numTiers * numPartitions (That means transformedList.size() is numTiers, while + * transformedList.get(0).size() is numPartitions). 
+ */ + private static List<List<TierShuffleDescriptor>> transformTierShuffleDescriptors( + List<List<TierShuffleDescriptor>> shuffleDescriptors) { + int numTiers = 0; + int numDescriptors = shuffleDescriptors.size(); Review Comment: This can be named `numPartitions`, according to the Javadoc. ########## flink-core/src/main/java/org/apache/flink/configuration/NettyShuffleEnvironmentOptions.java: ########## @@ -490,6 +490,24 @@ public enum CompressionCodec { + " is configured. The new mode is currently in an experimental phase. It can be set to false to fallback to the legacy mode " + " if something unexpected. Once the new mode reaches a stable state, the legacy mode as well as the option will be removed."); + /** The option to configure the tiered factory creator remote class name for hybrid shuffle. */ + @Documentation.Section(Documentation.Sections.ALL_TASK_MANAGER_NETWORK) + @Experimental + public static final ConfigOption<String> + NETWORK_HYBRID_SHUFFLE_EXTERNAL_REMOTE_TIER_FACTORY_CLASS_NAME = + key("taskmanager.network.hybrid-shuffle.external-remote-tier-factory.class") + .stringType() + .noDefaultValue() + .withDescription( + "The option configures the class that is responsible for creating an " + + "external remote tier factory for hybrid shuffle. Note that " + + "only Celeborn can be accepted as the remote shuffle tier " Review Comment: only Celeborn -> only Apache Celeborn. How do we make sure that Apache Celeborn is the only valid option? I didn't notice any corresponding validation. ########## flink-runtime/src/test/java/org/apache/flink/runtime/util/NettyShuffleDescriptorBuilder.java: ########## @@ -80,8 +83,14 @@ public NettyShuffleDescriptor buildRemote() { } public NettyShuffleDescriptor buildLocal() { + List<TierShuffleDescriptor> tierShuffleDescriptors = new ArrayList<>(); + tierShuffleDescriptors.add(NoOpTierShuffleDescriptor.INSTANCE); + tierShuffleDescriptors.add(NoOpTierShuffleDescriptor.INSTANCE); Review Comment: Why do we need this redundancy? 
-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org