This is an automated email from the ASF dual-hosted git repository.

dmvk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit a84de2ca4a9239bccb0d28c9bb6f841de1879bf9
Author: David Moravek <d...@apache.org>
AuthorDate: Mon Feb 27 18:48:28 2023 +0100

    [FLINK-31317] Introduce JobResourceRequirements and JobVertexResourceRequirements data structures.
    
    Signed-off-by: David Moravek <d...@apache.org>
---
 .../runtime/jobgraph/JobResourceRequirements.java  | 235 +++++++++++++++++++++
 .../jobgraph/JobVertexResourceRequirements.java    | 119 +++++++++++
 .../jobgraph/JobResourceRequirementsTest.java      | 175 +++++++++++++++
 3 files changed, 529 insertions(+)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobResourceRequirements.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobResourceRequirements.java
new file mode 100644
index 00000000000..47d99709a93
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobResourceRequirements.java
@@ -0,0 +1,235 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobgraph;
+
+import org.apache.flink.util.InstantiationUtil;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/** Information about the parallelism of job vertices. */
+public class JobResourceRequirements implements Serializable {
+
+    private static final long serialVersionUID = 1L;
+
+    /**
+     * A key for an internal config option (intentionally prefixed with 
$internal to make this
+     * explicit), that we'll serialize the {@link JobResourceRequirements} 
into, when writing it to
+     * {@link JobGraph}.
+     */
+    private static final String JOB_RESOURCE_REQUIREMENTS_KEY =
+            "$internal.job-resource-requirements";
+
+    private static final JobResourceRequirements EMPTY =
+            new JobResourceRequirements(Collections.emptyMap());
+
+    /**
+     * Write {@link JobResourceRequirements resource requirements} into the 
configuration of a given
+     * {@link JobGraph}.
+     *
+     * @param jobGraph job graph to write requirements to
+     * @param jobResourceRequirements resource requirements to write
+     * @throws IOException in case we're not able to serialize requirements 
into the configuration
+     */
+    public static void writeToJobGraph(
+            JobGraph jobGraph, JobResourceRequirements 
jobResourceRequirements) throws IOException {
+        InstantiationUtil.writeObjectToConfig(
+                jobResourceRequirements,
+                jobGraph.getJobConfiguration(),
+                JOB_RESOURCE_REQUIREMENTS_KEY);
+    }
+
+    /**
+     * Read {@link JobResourceRequirements resource requirements} from the 
configuration of a given
+     * {@link JobGraph}.
+     *
+     * @param jobGraph job graph to read requirements from
+     * @throws IOException in case we're not able to deserialize requirements 
from the configuration
+     */
+    public static Optional<JobResourceRequirements> readFromJobGraph(JobGraph 
jobGraph)
+            throws IOException {
+        try {
+            return Optional.ofNullable(
+                    InstantiationUtil.readObjectFromConfig(
+                            jobGraph.getJobConfiguration(),
+                            JOB_RESOURCE_REQUIREMENTS_KEY,
+                            JobResourceRequirements.class.getClassLoader()));
+        } catch (ClassNotFoundException e) {
+            throw new IOException(
+                    "Unable to deserialize JobResourceRequirements due to 
missing classes. This might happen when the JobGraph was written from a 
different Flink version.",
+                    e);
+        }
+    }
+
+    /**
+     * This method validates that:
+     *
+     * <ul>
+     *   <li>The requested boundaries are less or equal than the max 
parallelism.
+     *   <li>The requested boundaries are greater than zero.
+     *   <li>The requested upper bound is greater than the lower bound.
+     *   <li>There are no unknown job vertex ids and that we're not missing 
any.
+     * </ul>
+     *
+     * In case any boundary is set to {@code -1}, it will be expanded to the 
default value ({@code
+     * 1} for the lower bound and the max parallelism for the upper bound), 
before the validation.
+     *
+     * @param jobResourceRequirements contains the new resources requirements 
for the job vertices
+     * @param maxParallelismPerVertex allows us to look up maximum possible 
parallelism for a job
+     *     vertex
+     * @return a list of validation errors
+     */
+    public static List<String> validate(
+            JobResourceRequirements jobResourceRequirements,
+            Map<JobVertexID, Integer> maxParallelismPerVertex) {
+        final List<String> errors = new ArrayList<>();
+        final Set<JobVertexID> missingJobVertexIds =
+                new HashSet<>(maxParallelismPerVertex.keySet());
+        for (JobVertexID jobVertexId : 
jobResourceRequirements.getJobVertices()) {
+            missingJobVertexIds.remove(jobVertexId);
+            final Optional<Integer> maybeMaxParallelism =
+                    
Optional.ofNullable(maxParallelismPerVertex.get(jobVertexId));
+            if (maybeMaxParallelism.isPresent()) {
+                final JobVertexResourceRequirements.Parallelism 
requestedParallelism =
+                        
jobResourceRequirements.findParallelism(jobVertexId).get();
+                int lowerBound =
+                        requestedParallelism.getLowerBound() == -1
+                                ? 1
+                                : requestedParallelism.getLowerBound();
+                int upperBound =
+                        requestedParallelism.getUpperBound() == -1
+                                ? maybeMaxParallelism.get()
+                                : requestedParallelism.getUpperBound();
+                if (lowerBound < 1 || upperBound < 1) {
+                    errors.add(
+                            String.format(
+                                    "Both, the requested lower bound [%d] and 
upper bound [%d] for job vertex [%s] must be greater than zero.",
+                                    lowerBound, upperBound, jobVertexId));
+                    // Don't validate this vertex any further to avoid 
additional noise.
+                    continue;
+                }
+                if (lowerBound > upperBound) {
+                    errors.add(
+                            String.format(
+                                    "The requested lower bound [%d] for job 
vertex [%s] is higher than the upper bound [%d].",
+                                    lowerBound, jobVertexId, upperBound));
+                }
+                if (maybeMaxParallelism.get() < upperBound) {
+                    errors.add(
+                            String.format(
+                                    "The newly requested parallelism %d for 
the job vertex %s exceeds its maximum parallelism %d.",
+                                    upperBound, jobVertexId, 
maybeMaxParallelism.get()));
+                }
+            } else {
+                errors.add(
+                        String.format(
+                                "Job vertex [%s] was not found in the 
JobGraph.", jobVertexId));
+            }
+        }
+        for (JobVertexID jobVertexId : missingJobVertexIds) {
+            errors.add(
+                    String.format(
+                            "The request is incomplete, missing job vertex 
[%s] resource requirements.",
+                            jobVertexId));
+        }
+        return errors;
+    }
+
+    public static JobResourceRequirements empty() {
+        return JobResourceRequirements.EMPTY;
+    }
+
+    public static Builder newBuilder() {
+        return new Builder();
+    }
+
+    public static final class Builder {
+
+        private final Map<JobVertexID, JobVertexResourceRequirements> 
vertexResources =
+                new HashMap<>();
+
+        public Builder setParallelismForJobVertex(
+                JobVertexID jobVertexId, int lowerBound, int upperBound) {
+            vertexResources.put(
+                    jobVertexId,
+                    new JobVertexResourceRequirements(
+                            new 
JobVertexResourceRequirements.Parallelism(lowerBound, upperBound)));
+            return this;
+        }
+
+        public JobResourceRequirements build() {
+            return new JobResourceRequirements(vertexResources);
+        }
+    }
+
+    private final Map<JobVertexID, JobVertexResourceRequirements> 
vertexResources;
+
+    public JobResourceRequirements(
+            Map<JobVertexID, JobVertexResourceRequirements> vertexResources) {
+        this.vertexResources =
+                Collections.unmodifiableMap(new 
HashMap<>(checkNotNull(vertexResources)));
+    }
+
+    public Optional<JobVertexResourceRequirements.Parallelism> findParallelism(
+            JobVertexID jobVertexId) {
+        return Optional.ofNullable(vertexResources.get(jobVertexId))
+                .map(JobVertexResourceRequirements::getParallelism);
+    }
+
+    public Set<JobVertexID> getJobVertices() {
+        return vertexResources.keySet();
+    }
+
+    public Map<JobVertexID, JobVertexResourceRequirements> 
getJobVertexParallelisms() {
+        return vertexResources;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+        final JobResourceRequirements that = (JobResourceRequirements) o;
+        return Objects.equals(vertexResources, that.vertexResources);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(vertexResources);
+    }
+
+    @Override
+    public String toString() {
+        return "JobResourceRequirements{" + "vertexResources=" + 
vertexResources + '}';
+    }
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobVertexResourceRequirements.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobVertexResourceRequirements.java
new file mode 100644
index 00000000000..3930501b002
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobVertexResourceRequirements.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobgraph;
+
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+
+import java.io.Serializable;
+import java.util.Objects;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/** Information about the parallelism of job vertices. */
+public class JobVertexResourceRequirements implements Serializable {
+
+    private static final long serialVersionUID = 1L;
+
+    private static final String FIELD_NAME_PARALLELISM = "parallelism";
+
+    public static class Parallelism implements Serializable {
+
+        private static final String FIELD_NAME_LOWER_BOUND = "lowerBound";
+        private static final String FIELD_NAME_UPPER_BOUND = "upperBound";
+
+        @JsonProperty(FIELD_NAME_LOWER_BOUND)
+        private final int lowerBound;
+
+        @JsonProperty(FIELD_NAME_UPPER_BOUND)
+        private final int upperBound;
+
+        @JsonCreator
+        public Parallelism(
+                @JsonProperty(FIELD_NAME_LOWER_BOUND) int lowerBound,
+                @JsonProperty(FIELD_NAME_UPPER_BOUND) int upperBound) {
+            this.lowerBound = lowerBound;
+            this.upperBound = upperBound;
+        }
+
+        public int getLowerBound() {
+            return lowerBound;
+        }
+
+        public int getUpperBound() {
+            return upperBound;
+        }
+
+        @Override
+        public boolean equals(Object o) {
+            if (this == o) {
+                return true;
+            }
+            if (o == null || getClass() != o.getClass()) {
+                return false;
+            }
+            final Parallelism that = (Parallelism) o;
+            return lowerBound == that.lowerBound && upperBound == 
that.upperBound;
+        }
+
+        @Override
+        public int hashCode() {
+            return Objects.hash(lowerBound, upperBound);
+        }
+
+        @Override
+        public String toString() {
+            return "Parallelism{" + "lowerBound=" + lowerBound + ", 
upperBound=" + upperBound + '}';
+        }
+    }
+
+    @JsonProperty(FIELD_NAME_PARALLELISM)
+    private final Parallelism parallelism;
+
+    public JobVertexResourceRequirements(
+            @JsonProperty(FIELD_NAME_PARALLELISM) Parallelism parallelism) {
+        this.parallelism = checkNotNull(parallelism);
+    }
+
+    public Parallelism getParallelism() {
+        return parallelism;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+        final JobVertexResourceRequirements that = 
(JobVertexResourceRequirements) o;
+        return parallelism.equals(that.parallelism);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(parallelism);
+    }
+
+    @Override
+    public String toString() {
+        return "JobVertexResourceRequirements{" + "parallelism=" + parallelism 
+ '}';
+    }
+}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/JobResourceRequirementsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/JobResourceRequirementsTest.java
new file mode 100644
index 00000000000..db39062420f
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/JobResourceRequirementsTest.java
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobgraph;
+
+import org.apache.flink.util.TestLoggerExtension;
+
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for {@link 
org.apache.flink.runtime.jobgraph.JobResourceRequirements}. */
+@ExtendWith(TestLoggerExtension.class)
+class JobResourceRequirementsTest {
+
+    private final JobVertexID firstVertexId = new JobVertexID();
+    private final JobVertexID secondVertexId = new JobVertexID();
+
+    @Test
+    void testSuccessfulValidation() {
+        final JobResourceRequirements jobResourceRequirements =
+                JobResourceRequirements.newBuilder()
+                        .setParallelismForJobVertex(firstVertexId, 1, 4)
+                        .setParallelismForJobVertex(secondVertexId, 1, 4)
+                        .build();
+        final Map<JobVertexID, Integer> maxParallelismPerVertex = new 
HashMap<>();
+        maxParallelismPerVertex.put(firstVertexId, 10);
+        maxParallelismPerVertex.put(secondVertexId, 10);
+        final List<String> validationErrors =
+                JobResourceRequirements.validate(jobResourceRequirements, 
maxParallelismPerVertex);
+        assertThat(validationErrors).isEmpty();
+    }
+
+    @Test
+    void testValidateVertexIdsNotFoundInJobGraph() {
+        final JobResourceRequirements jobResourceRequirements =
+                JobResourceRequirements.newBuilder()
+                        .setParallelismForJobVertex(firstVertexId, 1, 4)
+                        .setParallelismForJobVertex(secondVertexId, 1, 4)
+                        .build();
+        final List<String> validationErrors =
+                JobResourceRequirements.validate(jobResourceRequirements, 
Collections.emptyMap());
+        assertThat(validationErrors).hasSize(2);
+        for (String validationError : validationErrors) {
+            assertThat(validationError).contains("was not found in the 
JobGraph");
+        }
+    }
+
+    @Test
+    void testValidateUpperBoundHigherThanMaxParallelism() {
+        final JobResourceRequirements jobResourceRequirements =
+                JobResourceRequirements.newBuilder()
+                        .setParallelismForJobVertex(firstVertexId, 1, 10)
+                        .setParallelismForJobVertex(secondVertexId, 1, 5)
+                        .build();
+        final Map<JobVertexID, Integer> maxParallelismPerVertex = new 
HashMap<>();
+        maxParallelismPerVertex.put(firstVertexId, 5);
+        maxParallelismPerVertex.put(secondVertexId, 5);
+        final List<String> validationErrors =
+                JobResourceRequirements.validate(jobResourceRequirements, 
maxParallelismPerVertex);
+        assertThat(validationErrors).hasSize(1);
+        for (String validationError : validationErrors) {
+            assertThat(validationError).contains("exceeds its maximum 
parallelism");
+        }
+    }
+
+    @Test
+    void testValidateIncompleteRequirements() {
+        final JobResourceRequirements jobResourceRequirements =
+                JobResourceRequirements.newBuilder()
+                        .setParallelismForJobVertex(firstVertexId, 1, 10)
+                        .build();
+        final Map<JobVertexID, Integer> maxParallelismPerVertex = new 
HashMap<>();
+        maxParallelismPerVertex.put(firstVertexId, 10);
+        maxParallelismPerVertex.put(secondVertexId, 10);
+        final List<String> validationErrors =
+                JobResourceRequirements.validate(jobResourceRequirements, 
maxParallelismPerVertex);
+        assertThat(validationErrors).hasSize(1);
+        for (String validationError : validationErrors) {
+            assertThat(validationError).contains("request is incomplete");
+        }
+    }
+
+    @Test
+    void testValidateLowerBoundDoesNotExceedUpperBound() {
+        final JobResourceRequirements jobResourceRequirements =
+                JobResourceRequirements.newBuilder()
+                        .setParallelismForJobVertex(firstVertexId, 10, 9)
+                        .setParallelismForJobVertex(secondVertexId, 10, 10)
+                        .build();
+        final Map<JobVertexID, Integer> maxParallelismPerVertex = new 
HashMap<>();
+        maxParallelismPerVertex.put(firstVertexId, 10);
+        maxParallelismPerVertex.put(secondVertexId, 10);
+        final List<String> validationErrors =
+                JobResourceRequirements.validate(jobResourceRequirements, 
maxParallelismPerVertex);
+        assertThat(validationErrors).hasSize(1);
+        for (String validationError : validationErrors) {
+            assertThat(validationError).contains("is higher than the upper 
bound");
+        }
+    }
+
+    @Test
+    void testValidateLowerOrUpperBoundIsLowerThanOne() {
+        final JobResourceRequirements jobResourceRequirements =
+                JobResourceRequirements.newBuilder()
+                        .setParallelismForJobVertex(firstVertexId, 0, 10)
+                        .setParallelismForJobVertex(secondVertexId, 1, 0)
+                        .build();
+        final Map<JobVertexID, Integer> maxParallelismPerVertex = new 
HashMap<>();
+        maxParallelismPerVertex.put(firstVertexId, 10);
+        maxParallelismPerVertex.put(secondVertexId, 10);
+        final List<String> validationErrors =
+                JobResourceRequirements.validate(jobResourceRequirements, 
maxParallelismPerVertex);
+        assertThat(validationErrors).hasSize(2);
+        for (String validationError : validationErrors) {
+            assertThat(validationError).contains("must be greater than zero");
+        }
+    }
+
+    @Test
+    void testValidateDefaults() {
+        final JobResourceRequirements jobResourceRequirements =
+                JobResourceRequirements.newBuilder()
+                        .setParallelismForJobVertex(firstVertexId, -1, -1)
+                        .build();
+        final Map<JobVertexID, Integer> maxParallelismPerVertex = new 
HashMap<>();
+        maxParallelismPerVertex.put(firstVertexId, 10);
+        final List<String> validationErrors =
+                JobResourceRequirements.validate(jobResourceRequirements, 
maxParallelismPerVertex);
+        assertThat(validationErrors).isEmpty();
+    }
+
+    @Test
+    void testWriteToJobGraphAndReadFromJobGraph() throws IOException {
+        final JobGraph jobGraph = JobGraphTestUtils.emptyJobGraph();
+        final JobResourceRequirements jobResourceRequirements =
+                JobResourceRequirements.newBuilder()
+                        .setParallelismForJobVertex(firstVertexId, 10, 9)
+                        .setParallelismForJobVertex(secondVertexId, 10, 10)
+                        .build();
+
+        JobResourceRequirements.writeToJobGraph(jobGraph, 
jobResourceRequirements);
+        assertThat(JobResourceRequirements.readFromJobGraph(jobGraph))
+                .get()
+                .isEqualTo(jobResourceRequirements);
+    }
+
+    @Test
+    void testReadNonExistentResourceRequirementsFromJobGraph() throws 
IOException {
+        
assertThat(JobResourceRequirements.readFromJobGraph(JobGraphTestUtils.emptyJobGraph()))
+                .isEmpty();
+    }
+}

Reply via email to