This is an automated email from the ASF dual-hosted git repository. dmvk pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit a84de2ca4a9239bccb0d28c9bb6f841de1879bf9 Author: David Moravek <d...@apache.org> AuthorDate: Mon Feb 27 18:48:28 2023 +0100 [FLINK-31317] Introduce JobResourceRequirements and JobVertexResourceRequirements data structures. Signed-off-by: David Moravek <d...@apache.org> --- .../runtime/jobgraph/JobResourceRequirements.java | 235 +++++++++++++++++++++ .../jobgraph/JobVertexResourceRequirements.java | 119 +++++++++++ .../jobgraph/JobResourceRequirementsTest.java | 175 +++++++++++++++ 3 files changed, 529 insertions(+) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobResourceRequirements.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobResourceRequirements.java new file mode 100644 index 00000000000..47d99709a93 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobResourceRequirements.java @@ -0,0 +1,235 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.jobgraph; + +import org.apache.flink.util.InstantiationUtil; + +import java.io.IOException; +import java.io.Serializable; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Optional; +import java.util.Set; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** Information about the parallelism of job vertices. */ +public class JobResourceRequirements implements Serializable { + + private static final long serialVersionUID = 1L; + + /** + * A key for an internal config option (intentionally prefixed with $internal to make this + * explicit), that we'll serialize the {@link JobResourceRequirements} into, when writing it to + * {@link JobGraph}. + */ + private static final String JOB_RESOURCE_REQUIREMENTS_KEY = + "$internal.job-resource-requirements"; + + private static final JobResourceRequirements EMPTY = + new JobResourceRequirements(Collections.emptyMap()); + + /** + * Write {@link JobResourceRequirements resource requirements} into the configuration of a given + * {@link JobGraph}. + * + * @param jobGraph job graph to write requirements to + * @param jobResourceRequirements resource requirements to write + * @throws IOException in case we're not able to serialize requirements into the configuration + */ + public static void writeToJobGraph( + JobGraph jobGraph, JobResourceRequirements jobResourceRequirements) throws IOException { + InstantiationUtil.writeObjectToConfig( + jobResourceRequirements, + jobGraph.getJobConfiguration(), + JOB_RESOURCE_REQUIREMENTS_KEY); + } + + /** + * Read {@link JobResourceRequirements resource requirements} from the configuration of a given + * {@link JobGraph}. + * + * @param jobGraph job graph to read requirements from + * @throws IOException in case we're not able to deserialize requirements from the configuration + */ + public static Optional<JobResourceRequirements> readFromJobGraph(JobGraph jobGraph) + throws IOException { + try { + return Optional.ofNullable( + InstantiationUtil.readObjectFromConfig( + jobGraph.getJobConfiguration(), + JOB_RESOURCE_REQUIREMENTS_KEY, + JobResourceRequirements.class.getClassLoader())); + } catch (ClassNotFoundException e) { + throw new IOException( + "Unable to deserialize JobResourceRequirements due to missing classes. This might happen when the JobGraph was written from a different Flink version.", + e); + } + } + + /** + * This method validates that: + * + * <ul> + * <li>The requested boundaries are less or equal than the max parallelism. + * <li>The requested boundaries are greater than zero. + * <li>The requested upper bound is greater than the lower bound. + * <li>There are no unknown job vertex ids and that we're not missing any. + * </ul> + * + * In case any boundary is set to {@code -1}, it will be expanded to the default value ({@code + * 1} for the lower bound and the max parallelism for the upper bound), before the validation. + * + * @param jobResourceRequirements contains the new resources requirements for the job vertices + * @param maxParallelismPerVertex allows us to look up maximum possible parallelism for a job + * vertex + * @return a list of validation errors + */ + public static List<String> validate( + JobResourceRequirements jobResourceRequirements, + Map<JobVertexID, Integer> maxParallelismPerVertex) { + final List<String> errors = new ArrayList<>(); + final Set<JobVertexID> missingJobVertexIds = + new HashSet<>(maxParallelismPerVertex.keySet()); + for (JobVertexID jobVertexId : jobResourceRequirements.getJobVertices()) { + missingJobVertexIds.remove(jobVertexId); + final Optional<Integer> maybeMaxParallelism = + Optional.ofNullable(maxParallelismPerVertex.get(jobVertexId)); + if (maybeMaxParallelism.isPresent()) { + final JobVertexResourceRequirements.Parallelism requestedParallelism = + jobResourceRequirements.findParallelism(jobVertexId).get(); + int lowerBound = + requestedParallelism.getLowerBound() == -1 + ? 1 + : requestedParallelism.getLowerBound(); + int upperBound = + requestedParallelism.getUpperBound() == -1 + ? maybeMaxParallelism.get() + : requestedParallelism.getUpperBound(); + if (lowerBound < 1 || upperBound < 1) { + errors.add( + String.format( + "Both, the requested lower bound [%d] and upper bound [%d] for job vertex [%s] must be greater than zero.", + lowerBound, upperBound, jobVertexId)); + // Don't validate this vertex any further to avoid additional noise. + continue; + } + if (lowerBound > upperBound) { + errors.add( + String.format( + "The requested lower bound [%d] for job vertex [%s] is higher than the upper bound [%d].", + lowerBound, jobVertexId, upperBound)); + } + if (maybeMaxParallelism.get() < upperBound) { + errors.add( + String.format( + "The newly requested parallelism %d for the job vertex %s exceeds its maximum parallelism %d.", + upperBound, jobVertexId, maybeMaxParallelism.get())); + } + } else { + errors.add( + String.format( + "Job vertex [%s] was not found in the JobGraph.", jobVertexId)); + } + } + for (JobVertexID jobVertexId : missingJobVertexIds) { + errors.add( + String.format( + "The request is incomplete, missing job vertex [%s] resource requirements.", + jobVertexId)); + } + return errors; + } + + public static JobResourceRequirements empty() { + return JobResourceRequirements.EMPTY; + } + + public static Builder newBuilder() { + return new Builder(); + } + + public static final class Builder { + + private final Map<JobVertexID, JobVertexResourceRequirements> vertexResources = + new HashMap<>(); + + public Builder setParallelismForJobVertex( + JobVertexID jobVertexId, int lowerBound, int upperBound) { + vertexResources.put( + jobVertexId, + new JobVertexResourceRequirements( + new JobVertexResourceRequirements.Parallelism(lowerBound, upperBound))); + return this; + } + + public JobResourceRequirements build() { + return new JobResourceRequirements(vertexResources); + } + } + + private final Map<JobVertexID, JobVertexResourceRequirements> vertexResources; + + public JobResourceRequirements( + Map<JobVertexID, JobVertexResourceRequirements> vertexResources) { + this.vertexResources = + Collections.unmodifiableMap(new HashMap<>(checkNotNull(vertexResources))); + } + + public Optional<JobVertexResourceRequirements.Parallelism> findParallelism( + JobVertexID jobVertexId) { + return Optional.ofNullable(vertexResources.get(jobVertexId)) + .map(JobVertexResourceRequirements::getParallelism); + } + + public Set<JobVertexID> getJobVertices() { + return vertexResources.keySet(); + } + + public Map<JobVertexID, JobVertexResourceRequirements> getJobVertexParallelisms() { + return vertexResources; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + final JobResourceRequirements that = (JobResourceRequirements) o; + return Objects.equals(vertexResources, that.vertexResources); + } + + @Override + public int hashCode() { + return Objects.hash(vertexResources); + } + + @Override + public String toString() { + return "JobResourceRequirements{" + "vertexResources=" + vertexResources + '}'; + } +} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobVertexResourceRequirements.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobVertexResourceRequirements.java new file mode 100644 index 00000000000..3930501b002 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobVertexResourceRequirements.java @@ -0,0 +1,119 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.jobgraph; + +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty; + +import java.io.Serializable; +import java.util.Objects; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** Information about the parallelism of job vertices. */ +public class JobVertexResourceRequirements implements Serializable { + + private static final long serialVersionUID = 1L; + + private static final String FIELD_NAME_PARALLELISM = "parallelism"; + + public static class Parallelism implements Serializable { + + private static final String FIELD_NAME_LOWER_BOUND = "lowerBound"; + private static final String FIELD_NAME_UPPER_BOUND = "upperBound"; + + @JsonProperty(FIELD_NAME_LOWER_BOUND) + private final int lowerBound; + + @JsonProperty(FIELD_NAME_UPPER_BOUND) + private final int upperBound; + + @JsonCreator + public Parallelism( + @JsonProperty(FIELD_NAME_LOWER_BOUND) int lowerBound, + @JsonProperty(FIELD_NAME_UPPER_BOUND) int upperBound) { + this.lowerBound = lowerBound; + this.upperBound = upperBound; + } + + public int getLowerBound() { + return lowerBound; + } + + public int getUpperBound() { + return upperBound; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + final Parallelism that = (Parallelism) o; + return lowerBound == that.lowerBound && upperBound == that.upperBound; + } + + @Override + public int hashCode() { + return Objects.hash(lowerBound, upperBound); + } + + @Override + public String toString() { + return "Parallelism{" + "lowerBound=" + lowerBound + ", upperBound=" + upperBound + '}'; + } + } + + @JsonProperty(FIELD_NAME_PARALLELISM) + private final Parallelism parallelism; + + public JobVertexResourceRequirements( + @JsonProperty(FIELD_NAME_PARALLELISM) Parallelism parallelism) { + this.parallelism = checkNotNull(parallelism); + } + + public Parallelism getParallelism() { + return parallelism; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + final JobVertexResourceRequirements that = (JobVertexResourceRequirements) o; + return parallelism.equals(that.parallelism); + } + + @Override + public int hashCode() { + return Objects.hash(parallelism); + } + + @Override + public String toString() { + return "JobVertexResourceRequirements{" + "parallelism=" + parallelism + '}'; + } +} diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/JobResourceRequirementsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/JobResourceRequirementsTest.java new file mode 100644 index 00000000000..db39062420f --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/JobResourceRequirementsTest.java @@ -0,0 +1,175 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.jobgraph; + +import org.apache.flink.util.TestLoggerExtension; + +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; + +import java.io.IOException; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import static org.assertj.core.api.Assertions.assertThat; + +/** Tests for {@link org.apache.flink.runtime.jobgraph.JobResourceRequirements}. */ +@ExtendWith(TestLoggerExtension.class) +class JobResourceRequirementsTest { + + private final JobVertexID firstVertexId = new JobVertexID(); + private final JobVertexID secondVertexId = new JobVertexID(); + + @Test + void testSuccessfulValidation() { + final JobResourceRequirements jobResourceRequirements = + JobResourceRequirements.newBuilder() + .setParallelismForJobVertex(firstVertexId, 1, 4) + .setParallelismForJobVertex(secondVertexId, 1, 4) + .build(); + final Map<JobVertexID, Integer> maxParallelismPerVertex = new HashMap<>(); + maxParallelismPerVertex.put(firstVertexId, 10); + maxParallelismPerVertex.put(secondVertexId, 10); + final List<String> validationErrors = + JobResourceRequirements.validate(jobResourceRequirements, maxParallelismPerVertex); + assertThat(validationErrors).isEmpty(); + } + + @Test + void testValidateVertexIdsNotFoundInJobGraph() { + final JobResourceRequirements jobResourceRequirements = + JobResourceRequirements.newBuilder() + .setParallelismForJobVertex(firstVertexId, 1, 4) + .setParallelismForJobVertex(secondVertexId, 1, 4) + .build(); + final List<String> validationErrors = + JobResourceRequirements.validate(jobResourceRequirements, Collections.emptyMap()); + assertThat(validationErrors).hasSize(2); + for (String validationError : validationErrors) { + assertThat(validationError).contains("was not found in the JobGraph"); + } + } + + @Test + void testValidateUpperBoundHigherThanMaxParallelism() { + final JobResourceRequirements jobResourceRequirements = + JobResourceRequirements.newBuilder() + .setParallelismForJobVertex(firstVertexId, 1, 10) + .setParallelismForJobVertex(secondVertexId, 1, 5) + .build(); + final Map<JobVertexID, Integer> maxParallelismPerVertex = new HashMap<>(); + maxParallelismPerVertex.put(firstVertexId, 5); + maxParallelismPerVertex.put(secondVertexId, 5); + final List<String> validationErrors = + JobResourceRequirements.validate(jobResourceRequirements, maxParallelismPerVertex); + assertThat(validationErrors).hasSize(1); + for (String validationError : validationErrors) { + assertThat(validationError).contains("exceeds its maximum parallelism"); + } + } + + @Test + void testValidateIncompleteRequirements() { + final JobResourceRequirements jobResourceRequirements = + JobResourceRequirements.newBuilder() + .setParallelismForJobVertex(firstVertexId, 1, 10) + .build(); + final Map<JobVertexID, Integer> maxParallelismPerVertex = new HashMap<>(); + maxParallelismPerVertex.put(firstVertexId, 10); + maxParallelismPerVertex.put(secondVertexId, 10); + final List<String> validationErrors = + JobResourceRequirements.validate(jobResourceRequirements, maxParallelismPerVertex); + assertThat(validationErrors).hasSize(1); + for (String validationError : validationErrors) { + assertThat(validationError).contains("request is incomplete"); + } + } + + @Test + void testValidateLowerBoundDoesNotExceedUpperBound() { + final JobResourceRequirements jobResourceRequirements = + JobResourceRequirements.newBuilder() + .setParallelismForJobVertex(firstVertexId, 10, 9) + .setParallelismForJobVertex(secondVertexId, 10, 10) + .build(); + final Map<JobVertexID, Integer> maxParallelismPerVertex = new HashMap<>(); + maxParallelismPerVertex.put(firstVertexId, 10); + maxParallelismPerVertex.put(secondVertexId, 10); + final List<String> validationErrors = + JobResourceRequirements.validate(jobResourceRequirements, maxParallelismPerVertex); + assertThat(validationErrors).hasSize(1); + for (String validationError : validationErrors) { + assertThat(validationError).contains("is higher than the upper bound"); + } + } + + @Test + void testValidateLowerOrUpperBoundIsLowerThanOne() { + final JobResourceRequirements jobResourceRequirements = + JobResourceRequirements.newBuilder() + .setParallelismForJobVertex(firstVertexId, 0, 10) + .setParallelismForJobVertex(secondVertexId, 1, 0) + .build(); + final Map<JobVertexID, Integer> maxParallelismPerVertex = new HashMap<>(); + maxParallelismPerVertex.put(firstVertexId, 10); + maxParallelismPerVertex.put(secondVertexId, 10); + final List<String> validationErrors = + JobResourceRequirements.validate(jobResourceRequirements, maxParallelismPerVertex); + assertThat(validationErrors).hasSize(2); + for (String validationError : validationErrors) { + assertThat(validationError).contains("must be greater than zero"); + } + } + + @Test + void testValidateDefaults() { + final JobResourceRequirements jobResourceRequirements = + JobResourceRequirements.newBuilder() + .setParallelismForJobVertex(firstVertexId, -1, -1) + .build(); + final Map<JobVertexID, Integer> maxParallelismPerVertex = new HashMap<>(); + maxParallelismPerVertex.put(firstVertexId, 10); + final List<String> validationErrors = + JobResourceRequirements.validate(jobResourceRequirements, maxParallelismPerVertex); + assertThat(validationErrors).isEmpty(); + } + + @Test + void testWriteToJobGraphAndReadFromJobGraph() throws IOException { + final JobGraph jobGraph = JobGraphTestUtils.emptyJobGraph(); + final JobResourceRequirements jobResourceRequirements = + JobResourceRequirements.newBuilder() + .setParallelismForJobVertex(firstVertexId, 10, 9) + .setParallelismForJobVertex(secondVertexId, 10, 10) + .build(); + + JobResourceRequirements.writeToJobGraph(jobGraph, jobResourceRequirements); + assertThat(JobResourceRequirements.readFromJobGraph(jobGraph)) + .get() + .isEqualTo(jobResourceRequirements); + } + + @Test + void testReadNonExistentResourceRequirementsFromJobGraph() throws IOException { + assertThat(JobResourceRequirements.readFromJobGraph(JobGraphTestUtils.emptyJobGraph())) + .isEmpty(); + } +}