reswqa commented on code in PR #24900:
URL: https://github.com/apache/flink/pull/24900#discussion_r1634238153


##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/storage/TieredStorageConsumerClient.java:
##########
@@ -148,12 +162,52 @@ public void close() throws IOException {
    // --------------------------------------------------------------------------------------------
 
     private List<TierConsumerAgent> createTierConsumerAgents(
-            List<TieredStorageConsumerSpec> tieredStorageConsumerSpecs) {
+            List<TieredStorageConsumerSpec> tieredStorageConsumerSpecs,
+            List<List<TierShuffleDescriptor>> shuffleDescriptors) {
         ArrayList<TierConsumerAgent> tierConsumerAgents = new ArrayList<>();
+
+        List<List<TierShuffleDescriptor>> transformedTierShuffleDescriptors =
+                transformTierShuffleDescriptors(shuffleDescriptors);
+        // Each tier only requires one inner list of transformedTierShuffleDescriptors, so the size
+        // of transformedTierShuffleDescriptors and the size of tierFactories are the same.
+        checkState(transformedTierShuffleDescriptors.size() == tierFactories.size());
+        int index = 0;

Review Comment:
   Why not use a `for-i` loop if we do need the iteration index?
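
For illustration, a minimal sketch of the indexed `for-i` form the reviewer suggests, using the names from the diff; it assumes `tierFactories` is a `List<TierFactory>` that supports `get(int)`, so it is a sketch rather than the PR's final code:

```java
// Sketch of the suggested for-i loop: the index drives both tierFactories and
// transformedTierShuffleDescriptors, avoiding the separate mutable index++ counter.
// Assumes tierFactories is a List<TierFactory>; the other names come from the diff.
List<TierConsumerAgent> tierConsumerAgents = new ArrayList<>();
for (int i = 0; i < tierFactories.size(); i++) {
    tierConsumerAgents.add(
            tierFactories.get(i)
                    .createConsumerAgent(
                            tieredStorageConsumerSpecs,
                            transformedTierShuffleDescriptors.get(i),
                            nettyService));
}
```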



##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/storage/TieredStorageConsumerClient.java:
##########
@@ -148,12 +162,52 @@ public void close() throws IOException {
    // --------------------------------------------------------------------------------------------
 
     private List<TierConsumerAgent> createTierConsumerAgents(
-            List<TieredStorageConsumerSpec> tieredStorageConsumerSpecs) {
+            List<TieredStorageConsumerSpec> tieredStorageConsumerSpecs,
+            List<List<TierShuffleDescriptor>> shuffleDescriptors) {
         ArrayList<TierConsumerAgent> tierConsumerAgents = new ArrayList<>();
+
+        List<List<TierShuffleDescriptor>> transformedTierShuffleDescriptors =
+                transformTierShuffleDescriptors(shuffleDescriptors);
+        // Each tier only requires one inner list of transformedTierShuffleDescriptors, so the size
+        // of transformedTierShuffleDescriptors and the size of tierFactories are the same.
+        checkState(transformedTierShuffleDescriptors.size() == tierFactories.size());
+        int index = 0;
         for (TierFactory tierFactory : tierFactories) {
             tierConsumerAgents.add(
-                    tierFactory.createConsumerAgent(tieredStorageConsumerSpecs, nettyService));
+                    tierFactory.createConsumerAgent(
+                            tieredStorageConsumerSpecs,
+                            transformedTierShuffleDescriptors.get(index++),
+                            nettyService));
         }
         return tierConsumerAgents;
     }
+
+    /**
+     * Before transforming the shuffle descriptors, the number of tier shuffle descriptors is
+     * numPartitions * numTiers (That means shuffleDescriptors.size() is numPartitions, while the
+     * shuffleDescriptors.get(0).size() is numTiers). After transforming, the number of tier shuffle
+     * descriptors is numTiers * numPartitions (That means transformedList.size() is numTiers, while
+     * transformedList.get(0).size() is numPartitions).
+     */
+    private static List<List<TierShuffleDescriptor>> transformTierShuffleDescriptors(
+            List<List<TierShuffleDescriptor>> shuffleDescriptors) {
+        int numTiers = 0;
+        int numDescriptors = shuffleDescriptors.size();

Review Comment:
   Can be `numPartitions` according to the javadoc.
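
The transformation described in the javadoc above is essentially a transpose of the nested list: the outer index changes from partition to tier. A minimal, self-contained sketch of that reshaping (illustrative only, not the PR's actual `transformTierShuffleDescriptors`):

```java
import java.util.ArrayList;
import java.util.List;

final class NestedListTranspose {

    // Turns input.get(partition).get(tier) into output.get(tier).get(partition),
    // i.e. a numPartitions x numTiers layout becomes numTiers x numPartitions.
    static <T> List<List<T>> transpose(List<List<T>> byPartition) {
        List<List<T>> byTier = new ArrayList<>();
        if (byPartition.isEmpty()) {
            return byTier;
        }
        int numTiers = byPartition.get(0).size();
        for (int tier = 0; tier < numTiers; tier++) {
            List<T> descriptorsForTier = new ArrayList<>();
            for (List<T> partitionDescriptors : byPartition) {
                // Each partition is expected to contribute exactly one descriptor per tier.
                descriptorsForTier.add(partitionDescriptors.get(tier));
            }
            byTier.add(descriptorsForTier);
        }
        return byTier;
    }
}
```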



##########
flink-core/src/main/java/org/apache/flink/configuration/NettyShuffleEnvironmentOptions.java:
##########
@@ -490,6 +490,24 @@ public enum CompressionCodec {
                                     + " is configured. The new mode is 
currently in an experimental phase. It can be set to false to fallback to the 
legacy mode "
                                     + " if something unexpected. Once the new 
mode reaches a stable state, the legacy mode as well as the option will be 
removed.");
 
+    /** The option to configure the tiered factory creator remote class name for hybrid shuffle. */
+    @Documentation.Section(Documentation.Sections.ALL_TASK_MANAGER_NETWORK)
+    @Experimental
+    public static final ConfigOption<String>
+            NETWORK_HYBRID_SHUFFLE_EXTERNAL_REMOTE_TIER_FACTORY_CLASS_NAME =
+                    key("taskmanager.network.hybrid-shuffle.external-remote-tier-factory.class")
+                            .stringType()
+                            .noDefaultValue()
+                            .withDescription(
+                                    "The option configures the class that is 
responsible for creating an "
+                                            + "external remote tier factory 
for hybrid shuffle. Note that "
+                                            + "only Celeborn can be accepted 
as the remote shuffle tier "

Review Comment:
   only Celeborn -> only Apache Celeborn
   
   How do we make sure that Apache Celeborn is the only valid option? I didn't notice whether we have the corresponding validation.
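
A class-name option on its own cannot restrict the value to Apache Celeborn; at most, the loaded class can be checked against an expected supertype before use. A purely hypothetical sketch of such a check (the helper name and the choice of `TierFactory` as the expected supertype are assumptions, not something this PR defines):

```java
// Hypothetical validation sketch: resolve the configured class name and verify it
// is assignable to the expected factory supertype before instantiating it.
// Assumed usage: validateFactoryClass(configuredClassName, TierFactory.class).
static void validateFactoryClass(String className, Class<?> expectedSupertype) {
    try {
        Class<?> clazz = Class.forName(className);
        if (!expectedSupertype.isAssignableFrom(clazz)) {
            throw new IllegalArgumentException(
                    "Configured class " + className
                            + " does not implement " + expectedSupertype.getName() + ".");
        }
    } catch (ClassNotFoundException e) {
        throw new IllegalArgumentException(
                "Configured remote tier factory class not found: " + className, e);
    }
}
```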



##########
flink-runtime/src/test/java/org/apache/flink/runtime/util/NettyShuffleDescriptorBuilder.java:
##########
@@ -80,8 +83,14 @@ public NettyShuffleDescriptor buildRemote() {
     }
 
     public NettyShuffleDescriptor buildLocal() {
+        List<TierShuffleDescriptor> tierShuffleDescriptors = new ArrayList<>();
+        tierShuffleDescriptors.add(NoOpTierShuffleDescriptor.INSTANCE);
+        tierShuffleDescriptors.add(NoOpTierShuffleDescriptor.INSTANCE);

Review Comment:
   Why do we need this redundancy?


