This is an automated email from the ASF dual-hosted git repository.

sewen pushed a commit to branch release-1.9
in repository https://gitbox.apache.org/repos/asf/flink.git

commit a7407730f039238b7f43c69ef83f3c0374e54123
Author: Gao Yun <yungao...@alibaba-inc.com>
AuthorDate: Tue Jun 25 23:37:05 2019 +0800

    [FLINK-12765][coordinator] Disable jobs where some, but not all, vertices 
have configured resource profiles
---
 .../flink/runtime/dispatcher/Dispatcher.java       | 25 ++++++++++++++
 .../flink/runtime/dispatcher/DispatcherTest.java   | 40 ++++++++++++++++++++++
 2 files changed, 65 insertions(+)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
index 6f8f27d..4909885 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.dispatcher;
 
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.operators.ResourceSpec;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.configuration.Configuration;
@@ -35,6 +36,7 @@ import 
org.apache.flink.runtime.highavailability.HighAvailabilityServices;
 import org.apache.flink.runtime.highavailability.RunningJobsRegistry;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.JobStatus;
+import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobmanager.SubmittedJobGraph;
 import org.apache.flink.runtime.jobmanager.SubmittedJobGraphStore;
@@ -265,6 +267,10 @@ public abstract class Dispatcher extends 
FencedRpcEndpoint<DispatcherId> impleme
                        if (isDuplicateJob(jobGraph.getJobID())) {
                                return FutureUtils.completedExceptionally(
                                        new 
JobSubmissionException(jobGraph.getJobID(), "Job has already been submitted."));
+                       } else if (isPartialResourceConfigured(jobGraph)) {
+                               return FutureUtils.completedExceptionally(
+                                       new 
JobSubmissionException(jobGraph.getJobID(), "Currently jobs is not supported if 
parts of the vertices have " +
+                                                       "resources configured. 
The limitation will be removed in future versions."));
                        } else {
                                return internalSubmitJob(jobGraph);
                        }
@@ -292,6 +298,25 @@ public abstract class Dispatcher extends 
FencedRpcEndpoint<DispatcherId> impleme
                return jobSchedulingStatus == 
RunningJobsRegistry.JobSchedulingStatus.DONE || 
jobManagerRunnerFutures.containsKey(jobId);
        }
 
+       private boolean isPartialResourceConfigured(JobGraph jobGraph) {
+               boolean hasVerticesWithUnknownResource = false;
+               boolean hasVerticesWithConfiguredResource = false;
+
+               for (JobVertex jobVertex : jobGraph.getVertices()) {
+                       if (jobVertex.getMinResources() == 
ResourceSpec.UNKNOWN) {
+                               hasVerticesWithUnknownResource = true;
+                       } else {
+                               hasVerticesWithConfiguredResource = true;
+                       }
+
+                       if (hasVerticesWithUnknownResource && 
hasVerticesWithConfiguredResource) {
+                               return true;
+                       }
+               }
+
+               return false;
+       }
+
        private CompletableFuture<Acknowledge> internalSubmitJob(JobGraph 
jobGraph) {
                log.info("Submitting job {} ({}).", jobGraph.getJobID(), 
jobGraph.getName());
 
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
old mode 100644
new mode 100755
index 554e7a0..714c0b2
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
@@ -19,6 +19,7 @@
 package org.apache.flink.runtime.dispatcher;
 
 import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.operators.ResourceSpec;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.configuration.BlobServerOptions;
 import org.apache.flink.configuration.Configuration;
@@ -28,6 +29,7 @@ import org.apache.flink.runtime.blob.VoidBlobStore;
 import org.apache.flink.runtime.checkpoint.Checkpoints;
 import org.apache.flink.runtime.checkpoint.StandaloneCheckpointRecoveryFactory;
 import org.apache.flink.runtime.checkpoint.savepoint.SavepointV2;
+import org.apache.flink.runtime.client.JobSubmissionException;
 import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
 import org.apache.flink.runtime.executiongraph.ErrorInfo;
 import org.apache.flink.runtime.heartbeat.HeartbeatServices;
@@ -270,6 +272,44 @@ public class DispatcherTest extends TestLogger {
        }
 
        /**
+        * Tests that the Dispatcher rejects the submission of a job in which
+        * only some of the vertices have resources configured.
+        */
+       @Test
+       public void testJobSubmissionWithPartialResourceConfigured() throws 
Exception {
+               dispatcher = createAndStartDispatcher(heartbeatServices, 
haServices, new ExpectedJobIdJobManagerRunnerFactory(TEST_JOB_ID, 
createdJobManagerRunnerLatch));
+
+               CompletableFuture<UUID> leaderFuture = 
dispatcherLeaderElectionService.isLeader(UUID.randomUUID());
+
+               // wait for the leader to be elected
+               leaderFuture.get();
+
+               DispatcherGateway dispatcherGateway = 
dispatcher.getSelfGateway(DispatcherGateway.class);
+
+               ResourceSpec resourceSpec = 
ResourceSpec.newBuilder().setCpuCores(2).build();
+
+               final JobVertex firstVertex = new JobVertex("firstVertex");
+               firstVertex.setInvokableClass(NoOpInvokable.class);
+               firstVertex.setResources(resourceSpec, resourceSpec);
+
+               final JobVertex secondVertex = new JobVertex("secondVertex");
+               secondVertex.setInvokableClass(NoOpInvokable.class);
+
+               JobGraph jobGraphWithTwoVertices = new JobGraph(TEST_JOB_ID, 
"twoVerticesJob", firstVertex, secondVertex);
+               jobGraphWithTwoVertices.setAllowQueuedScheduling(true);
+
+               CompletableFuture<Acknowledge> acknowledgeFuture = 
dispatcherGateway.submitJob(jobGraphWithTwoVertices, TIMEOUT);
+
+               try {
+                       acknowledgeFuture.get();
+                       fail("job submission should have failed");
+               }
+               catch (ExecutionException e) {
+                       assertTrue(ExceptionUtils.findThrowable(e, 
JobSubmissionException.class).isPresent());
+               }
+       }
+
+       /**
         * Tests that the dispatcher takes part in the leader election.
         */
        @Test

Reply via email to