[GitHub] [flink] xiangqiao123 commented on a change in pull request #18050: [FLINK-25034][runtime] Support flexible number of subpartitions in IntermediateResultPartition

2021-12-16 Thread GitBox


xiangqiao123 commented on a change in pull request #18050:
URL: https://github.com/apache/flink/pull/18050#discussion_r770484017



##
File path: flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/IntermediateResultPartitionTest.java
##
@@ -140,6 +150,127 @@ public void testBlockingPartitionResetting() throws Exception {
     assertFalse(consumedPartitionGroup.areAllPartitionsFinished());
 }
 
+@Test
+public void testGetNumberOfSubpartitionsForNonDynamicAllToAllGraph() throws Exception {
+    testGetNumberOfSubpartitions(7, DistributionPattern.ALL_TO_ALL, false, Arrays.asList(7, 7));
+}
+
+@Test
+public void testGetNumberOfSubpartitionsForNonDynamicPointWiseGraph() throws Exception {
+    testGetNumberOfSubpartitions(7, DistributionPattern.POINTWISE, false, Arrays.asList(4, 3));
+}
+
+@Test
+public void testGetNumberOfSubpartitionsFromConsumerParallelismForDynamicAllToAllGraph()
+        throws Exception {
+    testGetNumberOfSubpartitions(7, DistributionPattern.ALL_TO_ALL, true, Arrays.asList(7, 7));
+}
+
+@Test
+public void testGetNumberOfSubpartitionsFromConsumerParallelismForDynamicPointWiseGraph()
+        throws Exception {
+    testGetNumberOfSubpartitions(7, DistributionPattern.POINTWISE, true, Arrays.asList(4, 4));

Review comment:
   Will this scenario exist in a real job?
   `consumerParallelism != -1` and `isDynamicGraph = true`




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] xiangqiao123 commented on a change in pull request #18050: [FLINK-25034][runtime] Support flexible number of subpartitions in IntermediateResultPartition

2021-12-16 Thread GitBox


xiangqiao123 commented on a change in pull request #18050:
URL: https://github.com/apache/flink/pull/18050#discussion_r770483911



##
File path: flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/IntermediateResultPartitionTest.java
##
@@ -140,6 +150,127 @@ public void testBlockingPartitionResetting() throws Exception {
     assertFalse(consumedPartitionGroup.areAllPartitionsFinished());
 }
 
+@Test
+public void testGetNumberOfSubpartitionsForNonDynamicAllToAllGraph() throws Exception {
+    testGetNumberOfSubpartitions(7, DistributionPattern.ALL_TO_ALL, false, Arrays.asList(7, 7));
+}
+
+@Test
+public void testGetNumberOfSubpartitionsForNonDynamicPointWiseGraph() throws Exception {
+    testGetNumberOfSubpartitions(7, DistributionPattern.POINTWISE, false, Arrays.asList(4, 3));
+}
+
+@Test
+public void testGetNumberOfSubpartitionsFromConsumerParallelismForDynamicAllToAllGraph()
+        throws Exception {
+    testGetNumberOfSubpartitions(7, DistributionPattern.ALL_TO_ALL, true, Arrays.asList(7, 7));

Review comment:
   Will this scenario exist in a real job?
   `consumerParallelism != -1` and `isDynamicGraph = true`








[GitHub] [flink] xiangqiao123 commented on a change in pull request #18050: [FLINK-25034][runtime] Support flexible number of subpartitions in IntermediateResultPartition

2021-12-16 Thread GitBox


xiangqiao123 commented on a change in pull request #18050:
URL: https://github.com/apache/flink/pull/18050#discussion_r770484017



##
File path: flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/IntermediateResultPartitionTest.java
##
@@ -140,6 +150,127 @@ public void testBlockingPartitionResetting() throws Exception {
     assertFalse(consumedPartitionGroup.areAllPartitionsFinished());
 }
 
+@Test
+public void testGetNumberOfSubpartitionsForNonDynamicAllToAllGraph() throws Exception {
+    testGetNumberOfSubpartitions(7, DistributionPattern.ALL_TO_ALL, false, Arrays.asList(7, 7));
+}
+
+@Test
+public void testGetNumberOfSubpartitionsForNonDynamicPointWiseGraph() throws Exception {
+    testGetNumberOfSubpartitions(7, DistributionPattern.POINTWISE, false, Arrays.asList(4, 3));
+}
+
+@Test
+public void testGetNumberOfSubpartitionsFromConsumerParallelismForDynamicAllToAllGraph()
+        throws Exception {
+    testGetNumberOfSubpartitions(7, DistributionPattern.ALL_TO_ALL, true, Arrays.asList(7, 7));
+}
+
+@Test
+public void testGetNumberOfSubpartitionsFromConsumerParallelismForDynamicPointWiseGraph()
+        throws Exception {
+    testGetNumberOfSubpartitions(7, DistributionPattern.POINTWISE, true, Arrays.asList(4, 4));

Review comment:
   Will this scenario exist in a real job?

##
File path: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/InternalExecutionGraphAccessor.java
##
@@ -109,4 +110,8 @@ void notifySchedulerNgAboutInternalTaskFailure(
 void deleteBlobs(List blobKeys);
 
 void initializeJobVertex(ExecutionJobVertex ejv, long createTimestamp) throws JobException;
+
+ExecutionJobVertex getJobVertex(JobVertexID id);

Review comment:
   Is it better to use `getExecutionJobVertex` for the method name?

##
File path: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/IntermediateResultPartition.java
##
@@ -77,6 +85,69 @@ public ResultPartitionType getResultType() {
     return getEdgeManager().getConsumedPartitionGroupsById(partitionId);
 }
 
+public int getNumberOfSubpartitions() {
+    if (numberOfSubpartitions == UNKNOWN) {
+        numberOfSubpartitions = getOrComputeNumberOfSubpartitions();
+    }
+    checkState(
+            numberOfSubpartitions > 0,
+            "Number of subpartitions is an unexpected value: " + numberOfSubpartitions);
+
+    return numberOfSubpartitions;
+}
+
+private int getOrComputeNumberOfSubpartitions() {
+    if (!getProducer().getExecutionGraphAccessor().isDynamic()) {
+        // The produced data is partitioned among a number of subpartitions.
+        //
+        // If no consumers are known at this point, we use a single subpartition, otherwise we
+        // have one for each consuming sub task.
+        int numberOfSubpartitions = 1;
+        List consumerVertexGroups = getConsumerVertexGroups();
+        if (!consumerVertexGroups.isEmpty() && !consumerVertexGroups.get(0).isEmpty()) {
+            if (consumerVertexGroups.size() > 1) {
+                throw new IllegalStateException(
+                        "Currently, only a single consumer group per partition is supported.");
+            }
+            numberOfSubpartitions = consumerVertexGroups.get(0).size();
+        }
+
+        return numberOfSubpartitions;
+    } else {
+        if (totalResult.isBroadcast()) {
+            // For a dynamic graph and a broadcast result, we produce only one subpartition,
+            // and all the downstream vertices should consume this subpartition.
+            return 1;
+        } else {
+            return computeNumberOfMaxPossiblePartitionConsumers();
+        }
+    }
+}
+
+private int computeNumberOfMaxPossiblePartitionConsumers() {
+    final ExecutionJobVertex consumerJobVertex =
+            getIntermediateResult().getConsumerExecutionJobVertex();
+    final DistributionPattern distributionPattern =
+            getIntermediateResult().getConsumingDistributionPattern();
+
+    // Decide the max possible consumer job vertex parallelism.
+    int maxConsumerJobVertexParallelism = consumerJobVertex.getParallelism();

Review comment:
   It doesn't seem necessary, and may cause `return (int) Math.ceil(((double) maxConsumerJobVertexParallelism) / numberOfPartitions);` to be 0.
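The ceil division in the review comment above can be illustrated in isolation. This is a hypothetical standalone sketch (class and method names are illustrative, not from the PR) showing both the normal result and the zero result the reviewer warns about, which would later trip the `numberOfSubpartitions > 0` state check:

```java
// Illustrative sketch of the ceil-based computation discussed above;
// not the actual Flink implementation.
public class SubpartitionMath {

    // Max possible consumers per partition:
    // ceil(maxConsumerJobVertexParallelism / numberOfPartitions).
    static int maxPossibleConsumersPerPartition(
            int maxConsumerJobVertexParallelism, int numberOfPartitions) {
        return (int)
                Math.ceil(((double) maxConsumerJobVertexParallelism) / numberOfPartitions);
    }

    public static void main(String[] args) {
        // 7 consumer subtasks over 4 partitions -> ceil(7/4) = 2 subpartitions each.
        System.out.println(maxPossibleConsumersPerPartition(7, 4)); // 2

        // The edge case from the review comment: a consumer parallelism of 0
        // makes the result 0, which would violate the checkState in
        // getNumberOfSubpartitions().
        System.out.println(maxPossibleConsumersPerPartition(0, 4)); // 0
    }
}
```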
