zhuzhurk commented on code in PR #25790:
URL: https://github.com/apache/flink/pull/25790#discussion_r1897206373


##########
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/VertexInputInfoComputationUtils.java:
##########
@@ -207,16 +214,21 @@ static IndexRange computeConsumedSubpartitionRange(
             int numConsumers,
             Supplier<Integer> numOfSubpartitionsSupplier,
             boolean isDynamicGraph,
-            boolean isBroadcast) {
+            boolean isBroadcast,
+            boolean isSingleSubpartitionContainsAllData) {
         int consumerIndex = consumerSubtaskIndex % numConsumers;
         if (!isDynamicGraph) {
             return new IndexRange(consumerIndex, consumerIndex);
         } else {
             int numSubpartitions = numOfSubpartitionsSupplier.get();
             if (isBroadcast) {
-                // broadcast results have only one subpartition, and be consumed multiple times.
-                checkArgument(numSubpartitions == 1);
-                return new IndexRange(0, 0);
+                if (isSingleSubpartitionContainsAllData) {
+                    // broadcast results have only one subpartition, and be consumed multiple times.

Review Comment:
   broadcast results -> early decided broadcast results
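
   For illustration, a minimal sketch of how the visible branches read with the suggested comment wording applied. The class name `ConsumedRangeSketch`, the stand-in `IndexRange` type, and the parameter order are assumptions for the sketch; the branches that the quoted hunk does not show are deliberately left unimplemented rather than guessed.

```java
import java.util.function.Supplier;

/** Hypothetical sketch; not the actual VertexInputInfoComputationUtils code. */
final class ConsumedRangeSketch {

    /** Minimal stand-in for Flink's IndexRange (inclusive start and end). */
    static final class IndexRange {
        final int start;
        final int end;

        IndexRange(int start, int end) {
            this.start = start;
            this.end = end;
        }
    }

    static IndexRange computeConsumedSubpartitionRange(
            int consumerSubtaskIndex,
            int numConsumers,
            Supplier<Integer> numOfSubpartitionsSupplier,
            boolean isDynamicGraph,
            boolean isBroadcast,
            boolean isSingleSubpartitionContainsAllData) {
        int consumerIndex = consumerSubtaskIndex % numConsumers;
        if (!isDynamicGraph) {
            return new IndexRange(consumerIndex, consumerIndex);
        }
        int numSubpartitions = numOfSubpartitionsSupplier.get();
        if (isBroadcast && isSingleSubpartitionContainsAllData) {
            // Early decided broadcast results have only one subpartition,
            // which is consumed multiple times.
            if (numSubpartitions != 1) {
                throw new IllegalArgumentException("expected exactly one subpartition");
            }
            return new IndexRange(0, 0);
        }
        // The remaining branches are not visible in the quoted hunk.
        throw new UnsupportedOperationException("branch not shown in the quoted hunk");
    }
}
```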



##########
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/AllToAllBlockingResultInfo.java:
##########
@@ -139,7 +153,14 @@ public void resetPartitionInfo(int partitionIndex) {
     }
 
     public List<Long> getAggregatedSubpartitionBytes() {
-        checkState(aggregatedSubpartitionBytes != null, "Not all partition infos are ready");
-        return Collections.unmodifiableList(aggregatedSubpartitionBytes);
+        checkState(
+                aggregatedSubpartitionBytes != null
+                        || subpartitionBytesByPartitionIndex.size() == numOfPartitions,
+                "Not all partition infos are ready");
+        if (aggregatedSubpartitionBytes == null) {
+            return getAggregatedSubpartitionBytesInternal();

Review Comment:
   Maybe explain in which case the aggregated result will be needed while 
the fine-grained stats should not be aggregated.
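
   To make the case concrete, here is a minimal sketch of the pattern in the hunk: the aggregate is served from a cache if it exists, and otherwise computed on the fly from the fine-grained stats without discarding them. The class name `AggregatedBytesSketch`, the `aggregate()` method, and the element-wise sum in `computeAggregate()` are assumptions; the body of `getAggregatedSubpartitionBytesInternal()` is not shown in the hunk.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical stand-in for AllToAllBlockingResultInfo; not the actual Flink class. */
final class AggregatedBytesSketch {
    private final int numOfPartitions;
    private final int numOfSubpartitions;
    // Fine-grained stats: partition index -> bytes of each subpartition.
    private final Map<Integer, long[]> subpartitionBytesByPartitionIndex = new HashMap<>();
    // Cached aggregate; stays null while the fine-grained stats are retained.
    private List<Long> aggregatedSubpartitionBytes;

    AggregatedBytesSketch(int numOfPartitions, int numOfSubpartitions) {
        this.numOfPartitions = numOfPartitions;
        this.numOfSubpartitions = numOfSubpartitions;
    }

    void recordPartitionInfo(int partitionIndex, long[] subpartitionBytes) {
        if (subpartitionBytes.length != numOfSubpartitions) {
            throw new IllegalArgumentException("unexpected number of subpartitions");
        }
        subpartitionBytesByPartitionIndex.put(partitionIndex, subpartitionBytes.clone());
    }

    /** Assumed behaviour: collapse the fine-grained stats into the cached aggregate. */
    void aggregate() {
        if (subpartitionBytesByPartitionIndex.size() != numOfPartitions) {
            throw new IllegalStateException("Not all partition infos are ready");
        }
        aggregatedSubpartitionBytes = computeAggregate();
        subpartitionBytesByPartitionIndex.clear();
    }

    List<Long> getAggregatedSubpartitionBytes() {
        if (aggregatedSubpartitionBytes == null
                && subpartitionBytesByPartitionIndex.size() != numOfPartitions) {
            throw new IllegalStateException("Not all partition infos are ready");
        }
        if (aggregatedSubpartitionBytes == null) {
            // Fine-grained stats are kept; derive the aggregate without caching it.
            return Collections.unmodifiableList(computeAggregate());
        }
        return Collections.unmodifiableList(aggregatedSubpartitionBytes);
    }

    // Assumption: the aggregate is the per-subpartition sum over all partitions.
    private List<Long> computeAggregate() {
        long[] sums = new long[numOfSubpartitions];
        for (long[] bytes : subpartitionBytesByPartitionIndex.values()) {
            for (int i = 0; i < numOfSubpartitions; i++) {
                sums[i] += bytes[i];
            }
        }
        List<Long> result = new ArrayList<>(numOfSubpartitions);
        for (long sum : sums) {
            result.add(sum);
        }
        return result;
    }
}
```

   In this sketch the getter returns the same values before and after `aggregate()` is called; only the retention of the per-partition stats differs, which is the distinction the comment should spell out.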



##########
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/IntermediateResult.java:
##########
@@ -207,6 +220,15 @@ public boolean isForward() {
         return intermediateDataSet.isForward();
     }
 
+    /**
+     * Checks if a single subpartition contains all the produced data.

Review Comment:
   Comments are needed to explain its relationship with broadcast, i.e. if the 
result is decided to be consumed in a broadcast way before its data is produced, 
... If it is decided to be consumed in a broadcast way after its data is 
produced, ...



##########
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/AdaptiveBatchScheduler.java:
##########
@@ -482,15 +497,30 @@ private void updateResultPartitionBytesMetrics(
                             result.getId(),
                             (ignored, resultInfo) -> {
                                 if (resultInfo == null) {
-                                    resultInfo = createFromIntermediateResult(result);
+                                    resultInfo =
+                                            createFromIntermediateResult(result, new HashMap<>());
                                 }
                                 resultInfo.recordPartitionInfo(
                                         partitionId.getPartitionNumber(), partitionBytes);
+                                maybeAggregateSubpartitionBytes(resultInfo);
                                 return resultInfo;
                             });
                 });
     }
 
+    private void maybeAggregateSubpartitionBytes(BlockingResultInfo resultInfo) {

Review Comment:
   It's better to add some comments to explain why the bytes should and can be 
aggregated and in which case this method should be invoked.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
