This is an automated email from the ASF dual-hosted git repository. sewen pushed a commit to branch release-1.9 in repository https://gitbox.apache.org/repos/asf/flink.git
commit a7407730f039238b7f43c69ef83f3c0374e54123 Author: Gao Yun <yungao...@alibaba-inc.com> AuthorDate: Tue Jun 25 23:37:05 2019 +0800 [FLINK-12765][coordinator] Disable jobs where some, but not all, vertices have configured resource profiles --- .../flink/runtime/dispatcher/Dispatcher.java | 25 ++++++++++++++ .../flink/runtime/dispatcher/DispatcherTest.java | 40 ++++++++++++++++++++++ 2 files changed, 65 insertions(+) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java index 6f8f27d..4909885 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java @@ -20,6 +20,7 @@ package org.apache.flink.runtime.dispatcher; import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.operators.ResourceSpec; import org.apache.flink.api.common.time.Time; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.configuration.Configuration; @@ -35,6 +36,7 @@ import org.apache.flink.runtime.highavailability.HighAvailabilityServices; import org.apache.flink.runtime.highavailability.RunningJobsRegistry; import org.apache.flink.runtime.jobgraph.JobGraph; import org.apache.flink.runtime.jobgraph.JobStatus; +import org.apache.flink.runtime.jobgraph.JobVertex; import org.apache.flink.runtime.jobgraph.JobVertexID; import org.apache.flink.runtime.jobmanager.SubmittedJobGraph; import org.apache.flink.runtime.jobmanager.SubmittedJobGraphStore; @@ -265,6 +267,10 @@ public abstract class Dispatcher extends FencedRpcEndpoint<DispatcherId> impleme if (isDuplicateJob(jobGraph.getJobID())) { return FutureUtils.completedExceptionally( new JobSubmissionException(jobGraph.getJobID(), "Job has already been submitted.")); + } else if (isPartialResourceConfigured(jobGraph)) { +
return FutureUtils.completedExceptionally( + new JobSubmissionException(jobGraph.getJobID(), "Currently jobs is not supported if parts of the vertices have " + + "resources configured. The limitation will be removed in future versions.")); } else { return internalSubmitJob(jobGraph); } @@ -292,6 +298,25 @@ public abstract class Dispatcher extends FencedRpcEndpoint<DispatcherId> impleme return jobSchedulingStatus == RunningJobsRegistry.JobSchedulingStatus.DONE || jobManagerRunnerFutures.containsKey(jobId); } + private boolean isPartialResourceConfigured(JobGraph jobGraph) { + boolean hasVerticesWithUnknownResource = false; + boolean hasVerticesWithConfiguredResource = false; + + for (JobVertex jobVertex : jobGraph.getVertices()) { + if (jobVertex.getMinResources() == ResourceSpec.UNKNOWN) { + hasVerticesWithUnknownResource = true; + } else { + hasVerticesWithConfiguredResource = true; + } + + if (hasVerticesWithUnknownResource && hasVerticesWithConfiguredResource) { + return true; + } + } + + return false; + } + private CompletableFuture<Acknowledge> internalSubmitJob(JobGraph jobGraph) { log.info("Submitting job {} ({}).", jobGraph.getJobID(), jobGraph.getName()); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java old mode 100644 new mode 100755 index 554e7a0..714c0b2 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java @@ -19,6 +19,7 @@ package org.apache.flink.runtime.dispatcher; import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.operators.ResourceSpec; import org.apache.flink.api.common.time.Time; import org.apache.flink.configuration.BlobServerOptions; import org.apache.flink.configuration.Configuration; @@ -28,6 +29,7 @@ import org.apache.flink.runtime.blob.VoidBlobStore; import 
org.apache.flink.runtime.checkpoint.Checkpoints; import org.apache.flink.runtime.checkpoint.StandaloneCheckpointRecoveryFactory; import org.apache.flink.runtime.checkpoint.savepoint.SavepointV2; +import org.apache.flink.runtime.client.JobSubmissionException; import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph; import org.apache.flink.runtime.executiongraph.ErrorInfo; import org.apache.flink.runtime.heartbeat.HeartbeatServices; @@ -270,6 +272,44 @@ public class DispatcherTest extends TestLogger { } /** + * Tests that submitting a job in which only some of the vertices have + * configured resources is rejected with a JobSubmissionException. + */ + @Test + public void testJobSubmissionWithPartialResourceConfigured() throws Exception { + dispatcher = createAndStartDispatcher(heartbeatServices, haServices, new ExpectedJobIdJobManagerRunnerFactory(TEST_JOB_ID, createdJobManagerRunnerLatch)); + + CompletableFuture<UUID> leaderFuture = dispatcherLeaderElectionService.isLeader(UUID.randomUUID()); + + // wait for the leader to be elected + leaderFuture.get(); + + DispatcherGateway dispatcherGateway = dispatcher.getSelfGateway(DispatcherGateway.class); + + ResourceSpec resourceSpec = ResourceSpec.newBuilder().setCpuCores(2).build(); + + final JobVertex firstVertex = new JobVertex("firstVertex"); + firstVertex.setInvokableClass(NoOpInvokable.class); + firstVertex.setResources(resourceSpec, resourceSpec); + + final JobVertex secondVertex = new JobVertex("secondVertex"); + secondVertex.setInvokableClass(NoOpInvokable.class); + + JobGraph jobGraphWithTwoVertices = new JobGraph(TEST_JOB_ID, "twoVerticesJob", firstVertex, secondVertex); + jobGraphWithTwoVertices.setAllowQueuedScheduling(true); + + CompletableFuture<Acknowledge> acknowledgeFuture = dispatcherGateway.submitJob(jobGraphWithTwoVertices, TIMEOUT); + + try { + acknowledgeFuture.get(); + fail("job submission should have failed"); + } + catch (ExecutionException e) { + assertTrue(ExceptionUtils.findThrowable(e,
JobSubmissionException.class).isPresent()); + } + } + + /** * Tests that the dispatcher takes part in the leader election. */ @Test