xiangqiao123 commented on a change in pull request #18050:
URL: https://github.com/apache/flink/pull/18050#discussion_r770484017
##
File path:
flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/IntermediateResultPartitionTest.java
##
@@ -140,6 +150,127 @@ public void testBlockingPartitionResetting() throws Exception {
         assertFalse(consumedPartitionGroup.areAllPartitionsFinished());
     }
+    @Test
+    public void testGetNumberOfSubpartitionsForNonDynamicAllToAllGraph() throws Exception {
+        testGetNumberOfSubpartitions(7, DistributionPattern.ALL_TO_ALL, false, Arrays.asList(7, 7));
+    }
+
+    @Test
+    public void testGetNumberOfSubpartitionsForNonDynamicPointWiseGraph() throws Exception {
+        testGetNumberOfSubpartitions(7, DistributionPattern.POINTWISE, false, Arrays.asList(4, 3));
+    }
+
+    @Test
+    public void testGetNumberOfSubpartitionsFromConsumerParallelismForDynamicAllToAllGraph()
+            throws Exception {
+        testGetNumberOfSubpartitions(7, DistributionPattern.ALL_TO_ALL, true, Arrays.asList(7, 7));
+    }
+
+    @Test
+    public void testGetNumberOfSubpartitionsFromConsumerParallelismForDynamicPointWiseGraph()
+            throws Exception {
+        testGetNumberOfSubpartitions(7, DistributionPattern.POINTWISE, true, Arrays.asList(4, 4));
Review comment:
Will this scenario exist in a real job?
##
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/InternalExecutionGraphAccessor.java
##
@@ -109,4 +110,8 @@ void notifySchedulerNgAboutInternalTaskFailure(
     void deleteBlobs(List blobKeys);
     void initializeJobVertex(ExecutionJobVertex ejv, long createTimestamp) throws JobException;
+
+    ExecutionJobVertex getJobVertex(JobVertexID id);
Review comment:
Would `getExecutionJobVertex` be a better method name?
##
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/IntermediateResultPartition.java
##
@@ -77,6 +85,69 @@ public ResultPartitionType getResultType() {
         return getEdgeManager().getConsumedPartitionGroupsById(partitionId);
     }
+    public int getNumberOfSubpartitions() {
+        if (numberOfSubpartitions == UNKNOWN) {
+            numberOfSubpartitions = getOrComputeNumberOfSubpartitions();
+        }
+        checkState(
+                numberOfSubpartitions > 0,
+                "Number of subpartitions is an unexpected value: " + numberOfSubpartitions);
+
+        return numberOfSubpartitions;
+    }
+
+    private int getOrComputeNumberOfSubpartitions() {
+        if (!getProducer().getExecutionGraphAccessor().isDynamic()) {
+            // The produced data is partitioned among a number of subpartitions.
+            //
+            // If no consumers are known at this point, we use a single subpartition, otherwise we
+            // have one for each consuming sub task.
+            int numberOfSubpartitions = 1;
+            List consumerVertexGroups = getConsumerVertexGroups();
+            if (!consumerVertexGroups.isEmpty() && !consumerVertexGroups.get(0).isEmpty()) {
+                if (consumerVertexGroups.size() > 1) {
+                    throw new IllegalStateException(
+                            "Currently, only a single consumer group per partition is supported.");
+                }
+                numberOfSubpartitions = consumerVertexGroups.get(0).size();
+            }
+
+            return numberOfSubpartitions;
+        } else {
+            if (totalResult.isBroadcast()) {
+                // for dynamic graph and broadcast result, we only produced one subpartition,
+                // and all the downstream vertices should consume this subpartition.
+                return 1;
+            } else {
+                return computeNumberOfMaxPossiblePartitionConsumers();
+            }
+        }
+    }
+
+    private int computeNumberOfMaxPossiblePartitionConsumers() {
+        final ExecutionJobVertex consumerJobVertex =
+                getIntermediateResult().getConsumerExecutionJobVertex();
+        final DistributionPattern distributionPattern =
+                getIntermediateResult().getConsumingDistributionPattern();
+
+        // decide the max possible consumer job vertex parallelism
+        int maxConsumerJobVertexParallelism = consumerJobVertex.getParallelism();
Review comment:
This doesn't seem necessary, and it may cause `return (int)
Math.ceil(((double) maxConsumerJobVertexParallelism) / numberOfPartitions);`
to return 0.
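
To make the concern concrete, here is a minimal standalone sketch of only the quoted expression. The class and method names are hypothetical, and it assumes the parallelism can still be non-positive at that point (for example an undecided value such as -1), which the diff shown here does not confirm:

```java
public class CeilDivisionSketch {

    // Hypothetical helper that wraps the expression quoted in the comment above.
    // The parameter names follow the quoted code; nothing here is Flink API.
    static int maxPossibleConsumers(int maxConsumerJobVertexParallelism, int numberOfPartitions) {
        return (int) Math.ceil(((double) maxConsumerJobVertexParallelism) / numberOfPartitions);
    }

    public static void main(String[] args) {
        // Positive parallelism: ceil(7 / 2.0) = ceil(3.5) = 4
        System.out.println(maxPossibleConsumers(7, 2));

        // Zero parallelism: ceil(0.0) = 0
        System.out.println(maxPossibleConsumers(0, 2));

        // Assumed "undecided" parallelism of -1: ceil(-0.5) = -0.0, and the int cast gives 0
        System.out.println(maxPossibleConsumers(-1, 2));
    }
}
```

With a result of 0, the `checkState(numberOfSubpartitions > 0, ...)` call in `getNumberOfSubpartitions()` would fail, so the expression is only safe when the parallelism it receives is at least 1.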
##
File path:
flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/IntermediateResultPartitionTest.java
##
@@ -140,6 +150,127 @@ public void testBlockingPartitionResetting() throws Exception {
         assertFalse(consumedPartitionGroup.areAllPartitionsFinished());
     }
+@Test
+public void testGetNumberOfSubpartitionsForNonDynamicAllToAllGraph()