This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 0f1536c8a6f66110c50ef57cb88ca5c543aa2104
Author: Chesnay Schepler <ches...@apache.org>
AuthorDate: Mon Mar 27 12:55:11 2023 +0200

    [hotfix] Refactor JRR#findParallelism
    
    Fail if the vertex could not be found.
---
 .../flink/runtime/jobgraph/JobResourceRequirements.java       | 11 +++++++----
 1 file changed, 7 insertions(+), 4 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobResourceRequirements.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobResourceRequirements.java
index 47d99709a93..b282ddcb018 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobResourceRequirements.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobResourceRequirements.java
@@ -118,7 +118,7 @@ public class JobResourceRequirements implements Serializable {
                     Optional.ofNullable(maxParallelismPerVertex.get(jobVertexId));
             if (maybeMaxParallelism.isPresent()) {
                 final JobVertexResourceRequirements.Parallelism requestedParallelism =
-                        jobResourceRequirements.findParallelism(jobVertexId).get();
+                        jobResourceRequirements.getParallelism(jobVertexId);
                 int lowerBound =
                         requestedParallelism.getLowerBound() == -1
                                 ? 1
@@ -197,10 +197,13 @@ public class JobResourceRequirements implements Serializable {
                 Collections.unmodifiableMap(new HashMap<>(checkNotNull(vertexResources)));
     }
 
-    public Optional<JobVertexResourceRequirements.Parallelism> findParallelism(
-            JobVertexID jobVertexId) {
+    public JobVertexResourceRequirements.Parallelism getParallelism(JobVertexID jobVertexId) {
         return Optional.ofNullable(vertexResources.get(jobVertexId))
-                .map(JobVertexResourceRequirements::getParallelism);
+                .map(JobVertexResourceRequirements::getParallelism)
+                .orElseThrow(
+                        () ->
+                                new IllegalStateException(
+                                        "No requirement set for vertex " + jobVertexId));
     }
 
     public Set<JobVertexID> getJobVertices() {

Reply via email to