[FLINK-8454] [flip6] Remove JobExecutionResultCache from Dispatcher

With the introduction of the ArchivedExecutionGraphStore to the Dispatcher, it is no longer necessary to store the JobResult separately. To reduce complexity and state duplication, this commit removes the JobExecutionResultCache and instead uses the ArchivedExecutionGraphStore to serve information about completed jobs. As a side effect, the job execution result (JobResult) of a completed Flink job is now available for as long as the job is stored in the ArchivedExecutionGraphStore. Previously, the JobExecutionResultCache kept it for at most 300 seconds, dropped it after the first retrieval, and held it only through a SoftReference that the GC could clear under memory pressure.
This closes #5311. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/a6d7f2d7 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/a6d7f2d7 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/a6d7f2d7 Branch: refs/heads/master Commit: a6d7f2d72d47b268c0d6ffa402a59a6349c91d95 Parents: 8b817f0 Author: Till Rohrmann <[email protected]> Authored: Fri Jan 19 14:33:08 2018 +0100 Committer: Till Rohrmann <[email protected]> Committed: Fri Jan 26 13:50:20 2018 +0100 ---------------------------------------------------------------------- .../dispatcher/ArchivedExecutionGraphStore.java | 9 ++ .../flink/runtime/dispatcher/Dispatcher.java | 57 ++++-------- .../FileArchivedExecutionGraphStore.java | 6 ++ .../dispatcher/JobExecutionResultCache.java | 92 ------------------- .../JobExecutionResultGoneException.java | 36 -------- .../handler/job/JobExecutionResultHandler.java | 32 ++++--- .../runtime/webmonitor/RestfulGateway.java | 59 +++---------- .../runtime/dispatcher/DispatcherTest.java | 34 ++----- .../dispatcher/JobExecutionResultCacheTest.java | 93 -------------------- .../MemoryArchivedExecutionGraphStore.java | 12 +++ .../job/JobExecutionResultHandlerTest.java | 84 ++++++++---------- .../webmonitor/TestingRestfulGateway.java | 21 ++++- 12 files changed, 143 insertions(+), 392 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/a6d7f2d7/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/ArchivedExecutionGraphStore.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/ArchivedExecutionGraphStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/ArchivedExecutionGraphStore.java index 6f5df53..6e69833 100644 --- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/ArchivedExecutionGraphStore.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/ArchivedExecutionGraphStore.java @@ -71,4 +71,13 @@ public interface ArchivedExecutionGraphStore extends Closeable { * @return Collection of job details of all currently stored jobs */ Collection<JobDetails> getAvailableJobDetails(); + + /** + * Return the {@link JobDetails} for the given job. + * + * @param jobId identifying the job for which to retrieve the {@link JobDetails} + * @return {@link JobDetails} of the requested job or null if the job is not available + */ + @Nullable + JobDetails getAvailableJobDetails(JobID jobId); } http://git-wip-us.apache.org/repos/asf/flink/blob/a6d7f2d7/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java index e930450..03eeaeb 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java @@ -38,12 +38,10 @@ import org.apache.flink.runtime.jobmanager.SubmittedJobGraph; import org.apache.flink.runtime.jobmanager.SubmittedJobGraphStore; import org.apache.flink.runtime.jobmaster.JobManagerRunner; import org.apache.flink.runtime.jobmaster.JobManagerServices; -import org.apache.flink.runtime.jobmaster.JobResult; import org.apache.flink.runtime.leaderelection.LeaderContender; import org.apache.flink.runtime.leaderelection.LeaderElectionService; import org.apache.flink.runtime.messages.Acknowledge; import org.apache.flink.runtime.messages.FlinkJobNotFoundException; -import org.apache.flink.runtime.messages.JobExecutionResultGoneException; import
org.apache.flink.runtime.messages.webmonitor.ClusterOverview; import org.apache.flink.runtime.messages.webmonitor.JobDetails; import org.apache.flink.runtime.messages.webmonitor.JobsOverview; @@ -62,7 +60,6 @@ import org.apache.flink.util.Preconditions; import javax.annotation.Nullable; import java.io.IOException; -import java.lang.ref.SoftReference; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; @@ -100,8 +97,6 @@ public abstract class Dispatcher extends FencedRpcEndpoint<DispatcherId> impleme private final LeaderElectionService leaderElectionService; - private final JobExecutionResultCache jobExecutionResultCache = new JobExecutionResultCache(); - private final ArchivedExecutionGraphStore archivedExecutionGraphStore; @Nullable @@ -344,7 +339,23 @@ public abstract class Dispatcher extends FencedRpcEndpoint<DispatcherId> impleme allJobDetails.addAll(completedJobDetails); return new MultipleJobsDetails(allJobDetails); }); + } + + @Override + public CompletableFuture<JobStatus> requestJobStatus(JobID jobId, Time timeout) { + final JobManagerRunner jobManagerRunner = jobManagerRunners.get(jobId); + if (jobManagerRunner != null) { + return jobManagerRunner.getJobManagerGateway().requestJobStatus(timeout); + } else { + final JobDetails jobDetails = archivedExecutionGraphStore.getAvailableJobDetails(jobId); + + if (jobDetails != null) { + return CompletableFuture.completedFuture(jobDetails.getStatus()); + } else { + return FutureUtils.completedExceptionally(new FlinkJobNotFoundException(jobId)); + } + } } @Override @@ -387,36 +398,6 @@ public abstract class Dispatcher extends FencedRpcEndpoint<DispatcherId> impleme } @Override - public CompletableFuture<JobResult> getJobExecutionResult( - final JobID jobId, - final Time timeout) { - - final SoftReference<JobResult> jobResultRef = jobExecutionResultCache.get(jobId); - if (jobResultRef == null) { - return FutureUtils.completedExceptionally(new FlinkJobNotFoundException(jobId)); - } 
else { - final JobResult jobResult = jobResultRef.get(); - if (jobResult == null) { - return FutureUtils.completedExceptionally(new JobExecutionResultGoneException(jobId)); - } else { - return CompletableFuture.completedFuture(jobResult); - } - } - } - - @Override - public CompletableFuture<Boolean> isJobExecutionResultPresent( - final JobID jobId, - final Time timeout) { - - final boolean jobExecutionResultPresent = jobExecutionResultCache.contains(jobId); - if (!jobManagerRunners.containsKey(jobId) && !jobExecutionResultPresent) { - return FutureUtils.completedExceptionally(new FlinkJobNotFoundException(jobId)); - } - return CompletableFuture.completedFuture(jobExecutionResultPresent); - } - - @Override public CompletableFuture<String> triggerSavepoint( final JobID jobId, final String targetDirectory, @@ -515,10 +496,6 @@ public abstract class Dispatcher extends FencedRpcEndpoint<DispatcherId> impleme archivedExecutionGraph.getState().isGloballyTerminalState(), "Job " + archivedExecutionGraph.getJobID() + " is in state " + archivedExecutionGraph.getState() + " which is not globally terminal."); - final JobResult jobResult = JobResult.createFrom(archivedExecutionGraph); - - jobExecutionResultCache.put(jobResult); - final JobID jobId = archivedExecutionGraph.getJobID(); try { archivedExecutionGraphStore.put(archivedExecutionGraph); @@ -530,6 +507,8 @@ public abstract class Dispatcher extends FencedRpcEndpoint<DispatcherId> impleme e); } + final JobID jobId = archivedExecutionGraph.getJobID(); + try { removeJob(jobId, true); } catch (Exception e) { http://git-wip-us.apache.org/repos/asf/flink/blob/a6d7f2d7/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/FileArchivedExecutionGraphStore.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/FileArchivedExecutionGraphStore.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/FileArchivedExecutionGraphStore.java index 8db4fac..d2dbeb5 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/FileArchivedExecutionGraphStore.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/FileArchivedExecutionGraphStore.java @@ -191,6 +191,12 @@ public class FileArchivedExecutionGraphStore implements ArchivedExecutionGraphSt return jobDetailsCache.asMap().values(); } + @Nullable + @Override + public JobDetails getAvailableJobDetails(JobID jobId) { + return jobDetailsCache.getIfPresent(jobId); + } + @Override public void close() throws IOException { cleanupFuture.cancel(false); http://git-wip-us.apache.org/repos/asf/flink/blob/a6d7f2d7/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/JobExecutionResultCache.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/JobExecutionResultCache.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/JobExecutionResultCache.java deleted file mode 100644 index 6d3dc55..0000000 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/JobExecutionResultCache.java +++ /dev/null @@ -1,92 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.runtime.dispatcher; - -import org.apache.flink.api.common.JobID; -import org.apache.flink.runtime.jobmaster.JobResult; - -import org.apache.flink.shaded.guava18.com.google.common.cache.Cache; -import org.apache.flink.shaded.guava18.com.google.common.cache.CacheBuilder; - -import javax.annotation.Nullable; - -import java.lang.ref.SoftReference; -import java.util.concurrent.TimeUnit; - -import static org.apache.flink.util.Preconditions.checkState; - -/** - * Caches {@link JobResult}s by their job id. - * - * <p>Entries are cached for a finite time. However, the JobResults are wrapped in - * {@link SoftReference}s so that the GC can free them according to memory demand. - */ -class JobExecutionResultCache { - - private static final int MAX_RESULT_CACHE_DURATION_SECONDS = 300; - - private final Cache<JobID, SoftReference<JobResult>> - jobExecutionResultCache = - CacheBuilder.newBuilder() - .expireAfterWrite(MAX_RESULT_CACHE_DURATION_SECONDS, TimeUnit.SECONDS) - .build(); - - /** - * Adds a {@link JobResult} to the cache. - * - * @param result The entry to be added to the cache. - */ - public void put(final JobResult result) { - assertJobExecutionResultNotCached(result.getJobId()); - jobExecutionResultCache.put(result.getJobId(), new SoftReference<>(result)); - } - - /** - * Returns {@code true} if the cache contains a {@link JobResult} for the specified - * {@link JobID}. - * - * @param jobId The job id for which the presence of the {@link JobResult} should be tested. 
- * @return {@code true} if the cache contains an entry, {@code false} otherwise - */ - public boolean contains(final JobID jobId) { - return jobExecutionResultCache.getIfPresent(jobId) != null; - } - - /** - * Returns a {@link SoftReference} to the {@link JobResult} for the specified job, and removes - * the entry from the cache. - * - * @param jobId The job id of the {@link JobResult}. - * @return A {@link SoftReference} to the {@link JobResult} for the job, or {@code null} if the - * entry cannot be found in the cache. - */ - @Nullable - public SoftReference<JobResult> get(final JobID jobId) { - final SoftReference<JobResult> jobResultRef = jobExecutionResultCache.getIfPresent(jobId); - jobExecutionResultCache.invalidate(jobId); - return jobResultRef; - } - - private void assertJobExecutionResultNotCached(final JobID jobId) { - checkState( - jobExecutionResultCache.getIfPresent(jobId) == null, - "jobExecutionResultCache already contained entry for job %s", jobId); - } - -} http://git-wip-us.apache.org/repos/asf/flink/blob/a6d7f2d7/flink-runtime/src/main/java/org/apache/flink/runtime/messages/JobExecutionResultGoneException.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/messages/JobExecutionResultGoneException.java b/flink-runtime/src/main/java/org/apache/flink/runtime/messages/JobExecutionResultGoneException.java deleted file mode 100644 index d73b3a5..0000000 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/messages/JobExecutionResultGoneException.java +++ /dev/null @@ -1,36 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.runtime.messages; - -import org.apache.flink.api.common.JobID; -import org.apache.flink.util.FlinkException; - -/** - * Exception indicating that the required {@link org.apache.flink.runtime.jobmaster.JobResult} was - * garbage collected. - * @see org.apache.flink.runtime.dispatcher.JobExecutionResultCache - */ -public class JobExecutionResultGoneException extends FlinkException { - - private static final long serialVersionUID = 1L; - - public JobExecutionResultGoneException(JobID jobId) { - super(String.format("Job execution result for job [%s] is gone.", jobId)); - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/a6d7f2d7/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobExecutionResultHandler.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobExecutionResultHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobExecutionResultHandler.java index 5b6154c..9d2f953 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobExecutionResultHandler.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobExecutionResultHandler.java @@ -20,8 +20,9 @@ package org.apache.flink.runtime.rest.handler.job; import 
org.apache.flink.api.common.JobID; import org.apache.flink.api.common.time.Time; +import org.apache.flink.runtime.jobgraph.JobStatus; +import org.apache.flink.runtime.jobmaster.JobResult; import org.apache.flink.runtime.messages.FlinkJobNotFoundException; -import org.apache.flink.runtime.messages.JobExecutionResultGoneException; import org.apache.flink.runtime.rest.handler.AbstractRestHandler; import org.apache.flink.runtime.rest.handler.HandlerRequest; import org.apache.flink.runtime.rest.handler.RestHandlerException; @@ -67,25 +68,32 @@ public class JobExecutionResultHandler @Nonnull final RestfulGateway gateway) throws RestHandlerException { final JobID jobId = request.getPathParameter(JobIDPathParameter.class); - return gateway.isJobExecutionResultPresent(jobId, timeout).thenCompose(present -> { - if (!present) { + + final CompletableFuture<JobStatus> jobStatusFuture = gateway.requestJobStatus(jobId, timeout); + + return jobStatusFuture.thenCompose( + jobStatus -> { + if (jobStatus.isGloballyTerminalState()) { + return gateway + .requestJob(jobId, timeout) + .thenApply( + executionGraph -> { + final JobResult jobResult = JobResult.createFrom(executionGraph); + return JobExecutionResultResponseBody.created(jobResult); + }); + } else { return CompletableFuture.completedFuture( JobExecutionResultResponseBody.inProgress()); - } else { - return gateway.getJobExecutionResult(jobId, timeout) - .thenApply(JobExecutionResultResponseBody::created); } - } - ).exceptionally(throwable -> { - throw propagateException(throwable); - }); + }).exceptionally(throwable -> { + throw propagateException(throwable); + }); } private static CompletionException propagateException(final Throwable throwable) { final Throwable cause = ExceptionUtils.stripCompletionException(throwable); - if (cause instanceof JobExecutionResultGoneException - || cause instanceof FlinkJobNotFoundException) { + if (cause instanceof FlinkJobNotFoundException) { throw new CompletionException(new 
RestHandlerException( throwable.getMessage(), HttpResponseStatus.NOT_FOUND, http://git-wip-us.apache.org/repos/asf/flink/blob/a6d7f2d7/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/RestfulGateway.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/RestfulGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/RestfulGateway.java index 4da3947..75c0546 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/RestfulGateway.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/RestfulGateway.java @@ -24,9 +24,8 @@ import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.runtime.checkpoint.CompletedCheckpoint; import org.apache.flink.runtime.clusterframework.types.ResourceID; import org.apache.flink.runtime.executiongraph.AccessExecutionGraph; -import org.apache.flink.runtime.jobmaster.JobResult; +import org.apache.flink.runtime.jobgraph.JobStatus; import org.apache.flink.runtime.messages.FlinkJobNotFoundException; -import org.apache.flink.runtime.messages.JobExecutionResultGoneException; import org.apache.flink.runtime.messages.webmonitor.ClusterOverview; import org.apache.flink.runtime.messages.webmonitor.MultipleJobsDetails; import org.apache.flink.runtime.metrics.dump.MetricQueryService; @@ -97,49 +96,6 @@ public interface RestfulGateway extends RpcGateway { CompletableFuture<Collection<Tuple2<ResourceID, String>>> requestTaskManagerMetricQueryServicePaths(@RpcTimeout Time timeout); /** - * Returns the JobExecutionResult for a job, or in case the job failed, the failure cause. - * - * @param jobId ID of the job that we are interested in. - * @param timeout Timeout for the asynchronous operation. - * - * @see #isJobExecutionResultPresent(JobID, Time) - * - * @return CompletableFuture containing the JobExecutionResult. 
The future is completed - * exceptionally with: - * <ul> - * <li>{@link FlinkJobNotFoundException} if there is no result, or if the result has - * expired - * <li>{@link JobExecutionResultGoneException} if the result was removed due to memory demand. - * </ul> - */ - default CompletableFuture<JobResult> getJobExecutionResult( - JobID jobId, - @RpcTimeout Time timeout) { - throw new UnsupportedOperationException(); - } - - /** - * Tests if the {@link JobResult} is present. - * - * @param jobId ID of the job that we are interested in. - * @param timeout Timeout for the asynchronous operation. - * - * @see #getJobExecutionResult(JobID, Time) - * - * @return {@link CompletableFuture} containing {@code true} when then the - * {@link JobResult} is present. The future is completed exceptionally with: - * <ul> - * <li>{@link FlinkJobNotFoundException} if there is no job running with the specified ID, or - * if the result has expired - * <li>{@link JobExecutionResultGoneException} if the result was removed due to memory demand. - * </ul> - */ - default CompletableFuture<Boolean> isJobExecutionResultPresent( - JobID jobId, @RpcTimeout Time timeout) { - throw new UnsupportedOperationException(); - } - - /** * Triggers a savepoint with the given savepoint directory as a target. * * @param jobId ID of the job for which the savepoint should be triggered. @@ -154,4 +110,17 @@ public interface RestfulGateway extends RpcGateway { @RpcTimeout Time timeout) { throw new UnsupportedOperationException(); } + + /** + * Request the {@link JobStatus} of the given job. 
+ * + * @param jobId identifying the job for which to retrieve the JobStatus + * @param timeout for the asynchronous operation + * @return A future to the {@link JobStatus} of the given job + */ + default CompletableFuture<JobStatus> requestJobStatus( + JobID jobId, + @RpcTimeout Time timeout) { + throw new UnsupportedOperationException(); + } } http://git-wip-us.apache.org/repos/asf/flink/blob/a6d7f2d7/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java index c9b9bfb..14729df 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java @@ -274,41 +274,21 @@ public class DispatcherTest extends TestLogger { final JobID failedJobId = new JobID(); onCompletionActions = dispatcher.new DispatcherOnCompleteActions(failedJobId); + final JobStatus expectedState = JobStatus.FAILED; final ArchivedExecutionGraph failedExecutionGraph = new ArchivedExecutionGraphBuilder() .setJobID(failedJobId) - .setState(JobStatus.FAILED) + .setState(expectedState) .setFailureCause(new ErrorInfo(new RuntimeException("expected"), 1L)) .build(); onCompletionActions.jobReachedGloballyTerminalState(failedExecutionGraph); assertThat( - dispatcherGateway.isJobExecutionResultPresent(failedJobId, TIMEOUT).get(), - equalTo(true)); + dispatcherGateway.requestJobStatus(failedJobId, TIMEOUT).get(), + equalTo(expectedState)); assertThat( - dispatcherGateway.getJobExecutionResult(failedJobId, TIMEOUT) - .get() - .isSuccess(), - equalTo(false)); - - final JobID successJobId = new JobID(); - onCompletionActions = dispatcher.new DispatcherOnCompleteActions(successJobId); - - final ArchivedExecutionGraph 
succeededExecutionGraph = new ArchivedExecutionGraphBuilder() - .setJobID(successJobId) - .setState(JobStatus.FINISHED) - .build(); - - onCompletionActions.jobReachedGloballyTerminalState(succeededExecutionGraph); - - assertThat( - dispatcherGateway.isJobExecutionResultPresent(successJobId, TIMEOUT).get(), - equalTo(true)); - assertThat( - dispatcherGateway.getJobExecutionResult(successJobId, TIMEOUT) - .get() - .isSuccess(), - equalTo(true)); + dispatcherGateway.requestJob(failedJobId, TIMEOUT).get(), + equalTo(failedExecutionGraph)); } @Test @@ -317,7 +297,7 @@ public class DispatcherTest extends TestLogger { final DispatcherGateway dispatcherGateway = dispatcher.getSelfGateway(DispatcherGateway.class); try { - dispatcherGateway.getJobExecutionResult(new JobID(), TIMEOUT).get(); + dispatcherGateway.requestJob(new JobID(), TIMEOUT).get(); } catch (ExecutionException e) { final Throwable throwable = ExceptionUtils.stripExecutionException(e); assertThat(throwable, instanceOf(FlinkJobNotFoundException.class)); http://git-wip-us.apache.org/repos/asf/flink/blob/a6d7f2d7/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/JobExecutionResultCacheTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/JobExecutionResultCacheTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/JobExecutionResultCacheTest.java deleted file mode 100644 index dfc059c..0000000 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/JobExecutionResultCacheTest.java +++ /dev/null @@ -1,93 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.runtime.dispatcher; - -import org.apache.flink.api.common.JobID; -import org.apache.flink.runtime.jobmaster.JobResult; -import org.apache.flink.testutils.category.Flip6; -import org.apache.flink.util.TestLogger; - -import org.junit.Before; -import org.junit.Test; -import org.junit.experimental.categories.Category; - -import java.lang.ref.SoftReference; - -import static org.hamcrest.Matchers.containsString; -import static org.hamcrest.Matchers.equalTo; -import static org.hamcrest.Matchers.notNullValue; -import static org.hamcrest.Matchers.nullValue; -import static org.hamcrest.Matchers.sameInstance; -import static org.junit.Assert.assertThat; -import static org.junit.Assert.fail; - -/** - * Tests for {@link JobExecutionResultCache}. 
- */ -@Category(Flip6.class) -public class JobExecutionResultCacheTest extends TestLogger { - - private JobExecutionResultCache jobExecutionResultCache; - - @Before - public void setUp() { - jobExecutionResultCache = new JobExecutionResultCache(); - } - - @Test - public void testCacheResultUntilRetrieved() { - final JobID jobId = new JobID(); - final JobResult jobResult = new JobResult.Builder() - .jobId(jobId) - .netRuntime(Long.MAX_VALUE) - .build(); - jobExecutionResultCache.put(jobResult); - - assertThat(jobExecutionResultCache.contains(jobId), equalTo(true)); - - SoftReference<JobResult> jobResultRef; - jobResultRef = jobExecutionResultCache.get(jobId); - - assertThat(jobResultRef, notNullValue()); - assertThat(jobResultRef.get(), sameInstance(jobResult)); - - assertThat(jobExecutionResultCache.contains(jobId), equalTo(false)); - - jobResultRef = jobExecutionResultCache.get(jobId); - assertThat(jobResultRef, nullValue()); - } - - @Test - public void testThrowExceptionIfEntryAlreadyExists() { - final JobID jobId = new JobID(); - final JobResult build = new JobResult.Builder() - .jobId(jobId) - .netRuntime(Long.MAX_VALUE) - .build(); - jobExecutionResultCache.put(build); - - try { - jobExecutionResultCache.put(build); - fail("Expected exception not thrown."); - } catch (final IllegalStateException e) { - assertThat(e.getMessage(), containsString("already contained entry for job")); - } - } - -} http://git-wip-us.apache.org/repos/asf/flink/blob/a6d7f2d7/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/MemoryArchivedExecutionGraphStore.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/MemoryArchivedExecutionGraphStore.java b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/MemoryArchivedExecutionGraphStore.java index 9bfdbb3..bcb7df2 100644 --- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/MemoryArchivedExecutionGraphStore.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/MemoryArchivedExecutionGraphStore.java @@ -73,6 +73,18 @@ public class MemoryArchivedExecutionGraphStore implements ArchivedExecutionGraph .collect(Collectors.toList()); } + @Nullable + @Override + public JobDetails getAvailableJobDetails(JobID jobId) { + final ArchivedExecutionGraph archivedExecutionGraph = serializableExecutionGraphs.get(jobId); + + if (archivedExecutionGraph != null) { + return WebMonitorUtils.createDetailsForJob(archivedExecutionGraph); + } else { + return null; + } + } + @Override public void close() throws IOException { serializableExecutionGraphs.clear(); http://git-wip-us.apache.org/repos/asf/flink/blob/a6d7f2d7/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/JobExecutionResultHandlerTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/JobExecutionResultHandlerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/JobExecutionResultHandlerTest.java index b10f973..0861089 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/JobExecutionResultHandlerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/JobExecutionResultHandlerTest.java @@ -21,17 +21,17 @@ package org.apache.flink.runtime.rest.handler.job; import org.apache.flink.api.common.JobID; import org.apache.flink.api.common.time.Time; import org.apache.flink.runtime.concurrent.FutureUtils; -import org.apache.flink.runtime.jobmaster.JobResult; +import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph; +import org.apache.flink.runtime.jobgraph.JobStatus; import org.apache.flink.runtime.messages.FlinkJobNotFoundException; -import 
org.apache.flink.runtime.messages.JobExecutionResultGoneException; import org.apache.flink.runtime.rest.handler.HandlerRequest; import org.apache.flink.runtime.rest.handler.RestHandlerException; +import org.apache.flink.runtime.rest.handler.legacy.utils.ArchivedExecutionGraphBuilder; import org.apache.flink.runtime.rest.messages.EmptyRequestBody; import org.apache.flink.runtime.rest.messages.JobMessageParameters; import org.apache.flink.runtime.rest.messages.job.JobExecutionResultResponseBody; import org.apache.flink.runtime.rest.messages.queue.QueueStatus; -import org.apache.flink.runtime.webmonitor.RestfulGateway; -import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever; +import org.apache.flink.runtime.webmonitor.TestingRestfulGateway; import org.apache.flink.util.ExceptionUtils; import org.apache.flink.util.TestLogger; @@ -39,12 +39,9 @@ import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseSt import org.junit.Before; import org.junit.Test; -import org.mockito.Mock; -import org.mockito.MockitoAnnotations; import java.util.Collections; import java.util.concurrent.CompletableFuture; -import java.util.concurrent.CompletionException; import java.util.concurrent.ExecutionException; import static org.hamcrest.Matchers.equalTo; @@ -53,8 +50,6 @@ import static org.hamcrest.Matchers.not; import static org.hamcrest.Matchers.nullValue; import static org.junit.Assert.assertThat; import static org.junit.Assert.fail; -import static org.mockito.Matchers.any; -import static org.mockito.Mockito.when; /** * Tests for {@link JobExecutionResultHandler}. 
@@ -65,23 +60,15 @@ public class JobExecutionResultHandlerTest extends TestLogger { private JobExecutionResultHandler jobExecutionResultHandler; - @Mock - private RestfulGateway mockRestfulGateway; - private HandlerRequest<EmptyRequestBody, JobMessageParameters> testRequest; @Before public void setUp() throws Exception { - MockitoAnnotations.initMocks(this); + final TestingRestfulGateway testingRestfulGateway = TestingRestfulGateway.newBuilder().build(); jobExecutionResultHandler = new JobExecutionResultHandler( CompletableFuture.completedFuture("localhost:12345"), - new GatewayRetriever<RestfulGateway>() { - @Override - public CompletableFuture<RestfulGateway> getFuture() { - return CompletableFuture.completedFuture(mockRestfulGateway); - } - }, + () -> CompletableFuture.completedFuture(testingRestfulGateway), Time.seconds(10), Collections.emptyMap()); @@ -94,12 +81,14 @@ public class JobExecutionResultHandlerTest extends TestLogger { @Test public void testResultInProgress() throws Exception { - when(mockRestfulGateway.isJobExecutionResultPresent(any(JobID.class), any(Time.class))) - .thenReturn(CompletableFuture.completedFuture(false)); + final TestingRestfulGateway testingRestfulGateway = TestingRestfulGateway.newBuilder() + .setRequestJobStatusFunction( + jobId -> CompletableFuture.completedFuture(JobStatus.RUNNING)) + .build(); final JobExecutionResultResponseBody responseBody = jobExecutionResultHandler.handleRequest( testRequest, - mockRestfulGateway).get(); + testingRestfulGateway).get(); assertThat( responseBody.getStatus().getId(), @@ -108,18 +97,29 @@ public class JobExecutionResultHandlerTest extends TestLogger { @Test public void testCompletedResult() throws Exception { - when(mockRestfulGateway.isJobExecutionResultPresent(any(JobID.class), any(Time.class))) - .thenReturn(CompletableFuture.completedFuture(true)); - - when(mockRestfulGateway.getJobExecutionResult(any(JobID.class), any(Time.class))) - .thenReturn(CompletableFuture.completedFuture(new 
JobResult.Builder() - .jobId(TEST_JOB_ID) - .netRuntime(Long.MAX_VALUE) - .build())); + final JobStatus jobStatus = JobStatus.FINISHED; + final ArchivedExecutionGraph executionGraph = new ArchivedExecutionGraphBuilder() + .setJobID(TEST_JOB_ID) + .setState(jobStatus) + .build(); + + final TestingRestfulGateway testingRestfulGateway = TestingRestfulGateway.newBuilder() + .setRequestJobStatusFunction( + jobId -> { + assertThat(jobId, equalTo(TEST_JOB_ID)); + return CompletableFuture.completedFuture(jobStatus); + }) + .setRequestJobFunction( + jobId -> { + assertThat(jobId, equalTo(TEST_JOB_ID)); + return CompletableFuture.completedFuture(executionGraph); + } + ) + .build(); final JobExecutionResultResponseBody responseBody = jobExecutionResultHandler.handleRequest( testRequest, - mockRestfulGateway).get(); + testingRestfulGateway).get(); assertThat( responseBody.getStatus().getId(), @@ -129,25 +129,16 @@ public class JobExecutionResultHandlerTest extends TestLogger { @Test public void testPropagateFlinkJobNotFoundExceptionAsRestHandlerException() throws Exception { - assertPropagateAsRestHandlerException( - new CompletionException(new FlinkJobNotFoundException(new JobID()))); - } - - @Test - public void testPropagateJobExecutionResultGoneExceptionAsRestHandlerException() throws Exception { - assertPropagateAsRestHandlerException( - new CompletionException(new JobExecutionResultGoneException(new JobID()))); - } - - private void assertPropagateAsRestHandlerException(final Exception exception) throws Exception { - when(mockRestfulGateway.isJobExecutionResultPresent(any(JobID.class), any(Time.class))) - .thenReturn(FutureUtils.completedExceptionally( - exception)); + final TestingRestfulGateway testingRestfulGateway = TestingRestfulGateway.newBuilder() + .setRequestJobStatusFunction( + jobId -> FutureUtils.completedExceptionally(new FlinkJobNotFoundException(jobId)) + ) + .build(); try { jobExecutionResultHandler.handleRequest( testRequest, - mockRestfulGateway).get(); + 
testingRestfulGateway).get(); fail("Expected exception not thrown"); } catch (final ExecutionException e) { final Throwable cause = ExceptionUtils.stripCompletionException(e.getCause()); @@ -157,5 +148,4 @@ public class JobExecutionResultHandlerTest extends TestLogger { equalTo(HttpResponseStatus.NOT_FOUND)); } } - } http://git-wip-us.apache.org/repos/asf/flink/blob/a6d7f2d7/flink-runtime/src/test/java/org/apache/flink/runtime/webmonitor/TestingRestfulGateway.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/webmonitor/TestingRestfulGateway.java b/flink-runtime/src/test/java/org/apache/flink/runtime/webmonitor/TestingRestfulGateway.java index 1e27d8e..1e86051 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/webmonitor/TestingRestfulGateway.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/webmonitor/TestingRestfulGateway.java @@ -24,6 +24,7 @@ import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.runtime.clusterframework.types.ResourceID; import org.apache.flink.runtime.concurrent.FutureUtils; import org.apache.flink.runtime.executiongraph.AccessExecutionGraph; +import org.apache.flink.runtime.jobgraph.JobStatus; import org.apache.flink.runtime.messages.webmonitor.ClusterOverview; import org.apache.flink.runtime.messages.webmonitor.MultipleJobsDetails; @@ -39,6 +40,7 @@ import java.util.function.Supplier; public class TestingRestfulGateway implements RestfulGateway { static final Function<JobID, CompletableFuture<? 
extends AccessExecutionGraph>> DEFAULT_REQUEST_JOB_FUNCTION = jobId -> FutureUtils.completedExceptionally(new UnsupportedOperationException()); + static final Function<JobID, CompletableFuture<JobStatus>> DEFAULT_REQUEST_JOB_STATUS_FUNCTION = jobId -> FutureUtils.completedExceptionally(new UnsupportedOperationException()); static final Supplier<CompletableFuture<MultipleJobsDetails>> DEFAULT_REQUEST_MULTIPLE_JOB_DETAILS_SUPPLIER = () -> CompletableFuture.completedFuture(new MultipleJobsDetails(Collections.emptyList())); static final Supplier<CompletableFuture<ClusterOverview>> DEFAULT_REQUEST_CLUSTER_OVERVIEW_SUPPLIER = () -> CompletableFuture.completedFuture(new ClusterOverview(0, 0, 0, 0, 0, 0, 0)); static final Supplier<CompletableFuture<Collection<String>>> DEFAULT_REQUEST_METRIC_QUERY_SERVICE_PATHS_SUPPLIER = () -> CompletableFuture.completedFuture(Collections.emptyList()); @@ -53,6 +55,8 @@ public class TestingRestfulGateway implements RestfulGateway { protected Function<JobID, CompletableFuture<? extends AccessExecutionGraph>> requestJobFunction; + protected Function<JobID, CompletableFuture<JobStatus>> requestJobStatusFunction; + protected Supplier<CompletableFuture<MultipleJobsDetails>> requestMultipleJobDetailsSupplier; protected Supplier<CompletableFuture<ClusterOverview>> requestClusterOverviewSupplier; @@ -67,6 +71,7 @@ public class TestingRestfulGateway implements RestfulGateway { LOCALHOST, LOCALHOST, DEFAULT_REQUEST_JOB_FUNCTION, + DEFAULT_REQUEST_JOB_STATUS_FUNCTION, DEFAULT_REQUEST_MULTIPLE_JOB_DETAILS_SUPPLIER, DEFAULT_REQUEST_CLUSTER_OVERVIEW_SUPPLIER, DEFAULT_REQUEST_METRIC_QUERY_SERVICE_PATHS_SUPPLIER, @@ -78,6 +83,7 @@ public class TestingRestfulGateway implements RestfulGateway { String hostname, String restAddress, Function<JobID, CompletableFuture<? 
extends AccessExecutionGraph>> requestJobFunction, + Function<JobID, CompletableFuture<JobStatus>> requestJobStatusFunction, Supplier<CompletableFuture<MultipleJobsDetails>> requestMultipleJobDetailsSupplier, Supplier<CompletableFuture<ClusterOverview>> requestClusterOverviewSupplier, Supplier<CompletableFuture<Collection<String>>> requestMetricQueryServicePathsSupplier, @@ -86,6 +92,7 @@ public class TestingRestfulGateway implements RestfulGateway { this.hostname = hostname; this.restAddress = restAddress; this.requestJobFunction = requestJobFunction; + this.requestJobStatusFunction = requestJobStatusFunction; this.requestMultipleJobDetailsSupplier = requestMultipleJobDetailsSupplier; this.requestClusterOverviewSupplier = requestClusterOverviewSupplier; this.requestMetricQueryServicePathsSupplier = requestMetricQueryServicePathsSupplier; @@ -103,6 +110,11 @@ public class TestingRestfulGateway implements RestfulGateway { } @Override + public CompletableFuture<JobStatus> requestJobStatus(JobID jobId, Time timeout) { + return requestJobStatusFunction.apply(jobId); + } + + @Override public CompletableFuture<MultipleJobsDetails> requestMultipleJobDetails(Time timeout) { return requestMultipleJobDetailsSupplier.get(); } @@ -144,6 +156,7 @@ public class TestingRestfulGateway implements RestfulGateway { private String hostname = LOCALHOST; private String restAddress = LOCALHOST; private Function<JobID, CompletableFuture<? 
extends AccessExecutionGraph>> requestJobFunction; + private Function<JobID, CompletableFuture<JobStatus>> requestJobStatusFunction; private Supplier<CompletableFuture<MultipleJobsDetails>> requestMultipleJobDetailsSupplier; private Supplier<CompletableFuture<ClusterOverview>> requestClusterOverviewSupplier; private Supplier<CompletableFuture<Collection<String>>> requestMetricQueryServicePathsSupplier; @@ -151,6 +164,7 @@ public class TestingRestfulGateway implements RestfulGateway { public Builder() { requestJobFunction = DEFAULT_REQUEST_JOB_FUNCTION; + requestJobStatusFunction = DEFAULT_REQUEST_JOB_STATUS_FUNCTION; requestMultipleJobDetailsSupplier = DEFAULT_REQUEST_MULTIPLE_JOB_DETAILS_SUPPLIER; requestClusterOverviewSupplier = DEFAULT_REQUEST_CLUSTER_OVERVIEW_SUPPLIER; requestMetricQueryServicePathsSupplier = DEFAULT_REQUEST_METRIC_QUERY_SERVICE_PATHS_SUPPLIER; @@ -177,6 +191,11 @@ public class TestingRestfulGateway implements RestfulGateway { return this; } + public Builder setRequestJobStatusFunction(Function<JobID, CompletableFuture<JobStatus>> requestJobStatusFunction) { + this.requestJobStatusFunction = requestJobStatusFunction; + return this; + } + public Builder setRequestMultipleJobDetailsSupplier(Supplier<CompletableFuture<MultipleJobsDetails>> requestMultipleJobDetailsSupplier) { this.requestMultipleJobDetailsSupplier = requestMultipleJobDetailsSupplier; return this; @@ -198,7 +217,7 @@ public class TestingRestfulGateway implements RestfulGateway { } public TestingRestfulGateway build() { - return new TestingRestfulGateway(address, hostname, restAddress, requestJobFunction, requestMultipleJobDetailsSupplier, requestClusterOverviewSupplier, requestMetricQueryServicePathsSupplier, requestTaskManagerMetricQueryServicePathsSupplier); + return new TestingRestfulGateway(address, hostname, restAddress, requestJobFunction, requestJobStatusFunction, requestMultipleJobDetailsSupplier, requestClusterOverviewSupplier, requestMetricQueryServicePathsSupplier, 
requestTaskManagerMetricQueryServicePathsSupplier); } } }
