[FLINK-8454] [flip6] Remove JobExecutionResultCache from Dispatcher

With the introduction of the ArchivedExecutionGraphStore to the Dispatcher,
it is no longer necessary to store the JobResult separately. In order to
decrease complexity and state duplication, this commit removes the
JobExecutionResultCache and instead uses the ArchivedExecutionGraphStore
to serve completed job information. A side effect of this change is that the
JobExecutionResult is now available as long as the completed Flink job is stored
in the ArchivedExecutionGraphStore.

This closes #5311.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/a6d7f2d7
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/a6d7f2d7
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/a6d7f2d7

Branch: refs/heads/master
Commit: a6d7f2d72d47b268c0d6ffa402a59a6349c91d95
Parents: 8b817f0
Author: Till Rohrmann <[email protected]>
Authored: Fri Jan 19 14:33:08 2018 +0100
Committer: Till Rohrmann <[email protected]>
Committed: Fri Jan 26 13:50:20 2018 +0100

----------------------------------------------------------------------
 .../dispatcher/ArchivedExecutionGraphStore.java |  9 ++
 .../flink/runtime/dispatcher/Dispatcher.java    | 57 ++++--------
 .../FileArchivedExecutionGraphStore.java        |  6 ++
 .../dispatcher/JobExecutionResultCache.java     | 92 -------------------
 .../JobExecutionResultGoneException.java        | 36 --------
 .../handler/job/JobExecutionResultHandler.java  | 32 ++++---
 .../runtime/webmonitor/RestfulGateway.java      | 59 +++----------
 .../runtime/dispatcher/DispatcherTest.java      | 34 ++-----
 .../dispatcher/JobExecutionResultCacheTest.java | 93 --------------------
 .../MemoryArchivedExecutionGraphStore.java      | 12 +++
 .../job/JobExecutionResultHandlerTest.java      | 84 ++++++++----------
 .../webmonitor/TestingRestfulGateway.java       | 21 ++++-
 12 files changed, 143 insertions(+), 392 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/a6d7f2d7/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/ArchivedExecutionGraphStore.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/ArchivedExecutionGraphStore.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/ArchivedExecutionGraphStore.java
index 6f5df53..6e69833 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/ArchivedExecutionGraphStore.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/ArchivedExecutionGraphStore.java
@@ -71,4 +71,13 @@ public interface ArchivedExecutionGraphStore extends 
Closeable {
         * @return Collection of job details of all currently stored jobs
         */
        Collection<JobDetails> getAvailableJobDetails();
+
+       /**
+        * Return the {@link JobDetails}} for the given job.
+        *
+        * @param jobId identifying the job for which to retrieve the {@link 
JobDetails}
+        * @return {@link JobDetails} of the requested job or null if the job 
is not available
+        */
+       @Nullable
+       JobDetails getAvailableJobDetails(JobID jobId);
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/a6d7f2d7/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
index e930450..03eeaeb 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
@@ -38,12 +38,10 @@ import 
org.apache.flink.runtime.jobmanager.SubmittedJobGraph;
 import org.apache.flink.runtime.jobmanager.SubmittedJobGraphStore;
 import org.apache.flink.runtime.jobmaster.JobManagerRunner;
 import org.apache.flink.runtime.jobmaster.JobManagerServices;
-import org.apache.flink.runtime.jobmaster.JobResult;
 import org.apache.flink.runtime.leaderelection.LeaderContender;
 import org.apache.flink.runtime.leaderelection.LeaderElectionService;
 import org.apache.flink.runtime.messages.Acknowledge;
 import org.apache.flink.runtime.messages.FlinkJobNotFoundException;
-import org.apache.flink.runtime.messages.JobExecutionResultGoneException;
 import org.apache.flink.runtime.messages.webmonitor.ClusterOverview;
 import org.apache.flink.runtime.messages.webmonitor.JobDetails;
 import org.apache.flink.runtime.messages.webmonitor.JobsOverview;
@@ -62,7 +60,6 @@ import org.apache.flink.util.Preconditions;
 import javax.annotation.Nullable;
 
 import java.io.IOException;
-import java.lang.ref.SoftReference;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
@@ -100,8 +97,6 @@ public abstract class Dispatcher extends 
FencedRpcEndpoint<DispatcherId> impleme
 
        private final LeaderElectionService leaderElectionService;
 
-       private final JobExecutionResultCache jobExecutionResultCache = new 
JobExecutionResultCache();
-
        private final ArchivedExecutionGraphStore archivedExecutionGraphStore;
 
        @Nullable
@@ -344,7 +339,23 @@ public abstract class Dispatcher extends 
FencedRpcEndpoint<DispatcherId> impleme
                                allJobDetails.addAll(completedJobDetails);
                                return new MultipleJobsDetails(allJobDetails);
                        });
+       }
+
+       @Override
+       public CompletableFuture<JobStatus> requestJobStatus(JobID jobId, Time 
timeout) {
+               final JobManagerRunner jobManagerRunner = 
jobManagerRunners.get(jobId);
 
+               if (jobManagerRunner != null) {
+                       return 
jobManagerRunner.getJobManagerGateway().requestJobStatus(timeout);
+               } else {
+                       final JobDetails jobDetails = 
archivedExecutionGraphStore.getAvailableJobDetails(jobId);
+
+                       if (jobDetails != null) {
+                               return 
CompletableFuture.completedFuture(jobDetails.getStatus());
+                       } else {
+                               return FutureUtils.completedExceptionally(new 
FlinkJobNotFoundException(jobId));
+                       }
+               }
        }
 
        @Override
@@ -387,36 +398,6 @@ public abstract class Dispatcher extends 
FencedRpcEndpoint<DispatcherId> impleme
        }
 
        @Override
-       public CompletableFuture<JobResult> getJobExecutionResult(
-                       final JobID jobId,
-                       final Time timeout) {
-
-               final SoftReference<JobResult> jobResultRef = 
jobExecutionResultCache.get(jobId);
-               if (jobResultRef == null) {
-                       return FutureUtils.completedExceptionally(new 
FlinkJobNotFoundException(jobId));
-               } else {
-                       final JobResult jobResult = jobResultRef.get();
-                       if (jobResult == null) {
-                               return FutureUtils.completedExceptionally(new 
JobExecutionResultGoneException(jobId));
-                       } else {
-                               return 
CompletableFuture.completedFuture(jobResult);
-                       }
-               }
-       }
-
-       @Override
-       public CompletableFuture<Boolean> isJobExecutionResultPresent(
-                       final JobID jobId,
-                       final Time timeout) {
-
-               final boolean jobExecutionResultPresent = 
jobExecutionResultCache.contains(jobId);
-               if (!jobManagerRunners.containsKey(jobId) && 
!jobExecutionResultPresent) {
-                       return FutureUtils.completedExceptionally(new 
FlinkJobNotFoundException(jobId));
-               }
-               return 
CompletableFuture.completedFuture(jobExecutionResultPresent);
-       }
-
-       @Override
        public CompletableFuture<String> triggerSavepoint(
                        final JobID jobId,
                        final String targetDirectory,
@@ -515,10 +496,6 @@ public abstract class Dispatcher extends 
FencedRpcEndpoint<DispatcherId> impleme
                        
archivedExecutionGraph.getState().isGloballyTerminalState(),
                        "Job " + archivedExecutionGraph.getJobID() + " is in 
state " +
                                archivedExecutionGraph.getState() + " which is 
not globally terminal.");
-               final JobResult jobResult = 
JobResult.createFrom(archivedExecutionGraph);
-
-               jobExecutionResultCache.put(jobResult);
-               final JobID jobId = archivedExecutionGraph.getJobID();
 
                try {
                        archivedExecutionGraphStore.put(archivedExecutionGraph);
@@ -530,6 +507,8 @@ public abstract class Dispatcher extends 
FencedRpcEndpoint<DispatcherId> impleme
                                e);
                }
 
+               final JobID jobId = archivedExecutionGraph.getJobID();
+
                try {
                        removeJob(jobId, true);
                } catch (Exception e) {

http://git-wip-us.apache.org/repos/asf/flink/blob/a6d7f2d7/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/FileArchivedExecutionGraphStore.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/FileArchivedExecutionGraphStore.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/FileArchivedExecutionGraphStore.java
index 8db4fac..d2dbeb5 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/FileArchivedExecutionGraphStore.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/FileArchivedExecutionGraphStore.java
@@ -191,6 +191,12 @@ public class FileArchivedExecutionGraphStore implements 
ArchivedExecutionGraphSt
                return jobDetailsCache.asMap().values();
        }
 
+       @Nullable
+       @Override
+       public JobDetails getAvailableJobDetails(JobID jobId) {
+               return jobDetailsCache.getIfPresent(jobId);
+       }
+
        @Override
        public void close() throws IOException {
                cleanupFuture.cancel(false);

http://git-wip-us.apache.org/repos/asf/flink/blob/a6d7f2d7/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/JobExecutionResultCache.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/JobExecutionResultCache.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/JobExecutionResultCache.java
deleted file mode 100644
index 6d3dc55..0000000
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/JobExecutionResultCache.java
+++ /dev/null
@@ -1,92 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.dispatcher;
-
-import org.apache.flink.api.common.JobID;
-import org.apache.flink.runtime.jobmaster.JobResult;
-
-import org.apache.flink.shaded.guava18.com.google.common.cache.Cache;
-import org.apache.flink.shaded.guava18.com.google.common.cache.CacheBuilder;
-
-import javax.annotation.Nullable;
-
-import java.lang.ref.SoftReference;
-import java.util.concurrent.TimeUnit;
-
-import static org.apache.flink.util.Preconditions.checkState;
-
-/**
- * Caches {@link JobResult}s by their job id.
- *
- * <p>Entries are cached for a finite time. However, the JobResults are 
wrapped in
- * {@link SoftReference}s so that the GC can free them according to memory 
demand.
- */
-class JobExecutionResultCache {
-
-       private static final int MAX_RESULT_CACHE_DURATION_SECONDS = 300;
-
-       private final Cache<JobID, SoftReference<JobResult>>
-               jobExecutionResultCache =
-               CacheBuilder.newBuilder()
-                       .expireAfterWrite(MAX_RESULT_CACHE_DURATION_SECONDS, 
TimeUnit.SECONDS)
-                       .build();
-
-       /**
-        * Adds a {@link JobResult} to the cache.
-        *
-        * @param result The entry to be added to the cache.
-        */
-       public void put(final JobResult result) {
-               assertJobExecutionResultNotCached(result.getJobId());
-               jobExecutionResultCache.put(result.getJobId(), new 
SoftReference<>(result));
-       }
-
-       /**
-        * Returns {@code true} if the cache contains a {@link JobResult} for 
the specified
-        * {@link JobID}.
-        *
-        * @param jobId The job id for which the presence of the {@link 
JobResult} should be tested.
-        * @return {@code true} if the cache contains an entry, {@code false} 
otherwise
-        */
-       public boolean contains(final JobID jobId) {
-               return jobExecutionResultCache.getIfPresent(jobId) != null;
-       }
-
-       /**
-        * Returns a {@link SoftReference} to the {@link JobResult} for the 
specified job, and removes
-        * the entry from the cache.
-        *
-        * @param jobId The job id of the {@link JobResult}.
-        * @return A {@link SoftReference} to the {@link JobResult} for the 
job, or {@code null} if the
-        * entry cannot be found in the cache.
-        */
-       @Nullable
-       public SoftReference<JobResult> get(final JobID jobId) {
-               final SoftReference<JobResult> jobResultRef = 
jobExecutionResultCache.getIfPresent(jobId);
-               jobExecutionResultCache.invalidate(jobId);
-               return jobResultRef;
-       }
-
-       private void assertJobExecutionResultNotCached(final JobID jobId) {
-               checkState(
-                       jobExecutionResultCache.getIfPresent(jobId) == null,
-                       "jobExecutionResultCache already contained entry for 
job %s", jobId);
-       }
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/a6d7f2d7/flink-runtime/src/main/java/org/apache/flink/runtime/messages/JobExecutionResultGoneException.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/messages/JobExecutionResultGoneException.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/messages/JobExecutionResultGoneException.java
deleted file mode 100644
index d73b3a5..0000000
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/messages/JobExecutionResultGoneException.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.messages;
-
-import org.apache.flink.api.common.JobID;
-import org.apache.flink.util.FlinkException;
-
-/**
- * Exception indicating that the required {@link 
org.apache.flink.runtime.jobmaster.JobResult} was
- * garbage collected.
- * @see org.apache.flink.runtime.dispatcher.JobExecutionResultCache
- */
-public class JobExecutionResultGoneException extends FlinkException {
-
-       private static final long serialVersionUID = 1L;
-
-       public JobExecutionResultGoneException(JobID jobId) {
-               super(String.format("Job execution result for job [%s] is 
gone.", jobId));
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/a6d7f2d7/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobExecutionResultHandler.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobExecutionResultHandler.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobExecutionResultHandler.java
index 5b6154c..9d2f953 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobExecutionResultHandler.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobExecutionResultHandler.java
@@ -20,8 +20,9 @@ package org.apache.flink.runtime.rest.handler.job;
 
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.jobgraph.JobStatus;
+import org.apache.flink.runtime.jobmaster.JobResult;
 import org.apache.flink.runtime.messages.FlinkJobNotFoundException;
-import org.apache.flink.runtime.messages.JobExecutionResultGoneException;
 import org.apache.flink.runtime.rest.handler.AbstractRestHandler;
 import org.apache.flink.runtime.rest.handler.HandlerRequest;
 import org.apache.flink.runtime.rest.handler.RestHandlerException;
@@ -67,25 +68,32 @@ public class JobExecutionResultHandler
                        @Nonnull final RestfulGateway gateway) throws 
RestHandlerException {
 
                final JobID jobId = 
request.getPathParameter(JobIDPathParameter.class);
-               return gateway.isJobExecutionResultPresent(jobId, 
timeout).thenCompose(present -> {
-                               if (!present) {
+
+               final CompletableFuture<JobStatus> jobStatusFuture = 
gateway.requestJobStatus(jobId, timeout);
+
+               return jobStatusFuture.thenCompose(
+                       jobStatus -> {
+                               if (jobStatus.isGloballyTerminalState()) {
+                                       return gateway
+                                               .requestJob(jobId, timeout)
+                                               .thenApply(
+                                                       executionGraph -> {
+                                                               final JobResult 
jobResult = JobResult.createFrom(executionGraph);
+                                                               return 
JobExecutionResultResponseBody.created(jobResult);
+                                                       });
+                               } else {
                                        return 
CompletableFuture.completedFuture(
                                                
JobExecutionResultResponseBody.inProgress());
-                               } else {
-                                       return 
gateway.getJobExecutionResult(jobId, timeout)
-                                               
.thenApply(JobExecutionResultResponseBody::created);
                                }
-                       }
-               ).exceptionally(throwable -> {
-                       throw propagateException(throwable);
-               });
+                       }).exceptionally(throwable -> {
+                               throw propagateException(throwable);
+                       });
        }
 
        private static CompletionException propagateException(final Throwable 
throwable) {
                final Throwable cause = 
ExceptionUtils.stripCompletionException(throwable);
 
-               if (cause instanceof JobExecutionResultGoneException
-                       || cause instanceof FlinkJobNotFoundException) {
+               if (cause instanceof FlinkJobNotFoundException) {
                        throw new CompletionException(new RestHandlerException(
                                throwable.getMessage(),
                                HttpResponseStatus.NOT_FOUND,

http://git-wip-us.apache.org/repos/asf/flink/blob/a6d7f2d7/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/RestfulGateway.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/RestfulGateway.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/RestfulGateway.java
index 4da3947..75c0546 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/RestfulGateway.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/RestfulGateway.java
@@ -24,9 +24,8 @@ import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.runtime.checkpoint.CompletedCheckpoint;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
-import org.apache.flink.runtime.jobmaster.JobResult;
+import org.apache.flink.runtime.jobgraph.JobStatus;
 import org.apache.flink.runtime.messages.FlinkJobNotFoundException;
-import org.apache.flink.runtime.messages.JobExecutionResultGoneException;
 import org.apache.flink.runtime.messages.webmonitor.ClusterOverview;
 import org.apache.flink.runtime.messages.webmonitor.MultipleJobsDetails;
 import org.apache.flink.runtime.metrics.dump.MetricQueryService;
@@ -97,49 +96,6 @@ public interface RestfulGateway extends RpcGateway {
        CompletableFuture<Collection<Tuple2<ResourceID, String>>> 
requestTaskManagerMetricQueryServicePaths(@RpcTimeout Time timeout);
 
        /**
-        * Returns the JobExecutionResult for a job, or in case the job failed, 
the failure cause.
-        *
-        * @param jobId ID of the job that we are interested in.
-        * @param timeout Timeout for the asynchronous operation.
-        *
-        * @see #isJobExecutionResultPresent(JobID, Time)
-        *
-        * @return CompletableFuture containing the JobExecutionResult. The 
future is completed
-        * exceptionally with:
-        * <ul>
-        *      <li>{@link FlinkJobNotFoundException} if there is no result, or 
if the result has
-        *      expired
-        *      <li>{@link JobExecutionResultGoneException} if the result was 
removed due to memory demand.
-        * </ul>
-        */
-       default CompletableFuture<JobResult> getJobExecutionResult(
-                       JobID jobId,
-                       @RpcTimeout Time timeout) {
-               throw new UnsupportedOperationException();
-       }
-
-       /**
-        * Tests if the {@link JobResult} is present.
-        *
-        * @param jobId ID of the job that we are interested in.
-        * @param timeout Timeout for the asynchronous operation.
-        *
-        * @see #getJobExecutionResult(JobID, Time)
-        *
-        * @return {@link CompletableFuture} containing {@code true} when then 
the
-        * {@link JobResult} is present. The future is completed exceptionally 
with:
-        * <ul>
-        *      <li>{@link FlinkJobNotFoundException} if there is no job 
running with the specified ID, or
-        *      if the result has expired
-        *      <li>{@link JobExecutionResultGoneException} if the result was 
removed due to memory demand.
-        * </ul>
-        */
-       default CompletableFuture<Boolean> isJobExecutionResultPresent(
-                       JobID jobId, @RpcTimeout Time timeout) {
-               throw new UnsupportedOperationException();
-       }
-
-       /**
         * Triggers a savepoint with the given savepoint directory as a target.
         *
         * @param jobId           ID of the job for which the savepoint should 
be triggered.
@@ -154,4 +110,17 @@ public interface RestfulGateway extends RpcGateway {
                        @RpcTimeout Time timeout) {
                throw new UnsupportedOperationException();
        }
+
+       /**
+        * Request the {@link JobStatus} of the given job.
+        *
+        * @param jobId identifying the job for which to retrieve the JobStatus
+        * @param timeout for the asynchronous operation
+        * @return A future to the {@link JobStatus} of the given job
+        */
+       default CompletableFuture<JobStatus> requestJobStatus(
+                       JobID jobId,
+                       @RpcTimeout Time timeout) {
+               throw new UnsupportedOperationException();
+       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/a6d7f2d7/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
index c9b9bfb..14729df 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
@@ -274,41 +274,21 @@ public class DispatcherTest extends TestLogger {
                final JobID failedJobId = new JobID();
                onCompletionActions = dispatcher.new 
DispatcherOnCompleteActions(failedJobId);
 
+               final JobStatus expectedState = JobStatus.FAILED;
                final ArchivedExecutionGraph failedExecutionGraph = new 
ArchivedExecutionGraphBuilder()
                        .setJobID(failedJobId)
-                       .setState(JobStatus.FAILED)
+                       .setState(expectedState)
                        .setFailureCause(new ErrorInfo(new 
RuntimeException("expected"), 1L))
                        .build();
 
                
onCompletionActions.jobReachedGloballyTerminalState(failedExecutionGraph);
 
                assertThat(
-                       
dispatcherGateway.isJobExecutionResultPresent(failedJobId, TIMEOUT).get(),
-                       equalTo(true));
+                       dispatcherGateway.requestJobStatus(failedJobId, 
TIMEOUT).get(),
+                       equalTo(expectedState));
                assertThat(
-                       dispatcherGateway.getJobExecutionResult(failedJobId, 
TIMEOUT)
-                               .get()
-                               .isSuccess(),
-                       equalTo(false));
-
-               final JobID successJobId = new JobID();
-               onCompletionActions = dispatcher.new 
DispatcherOnCompleteActions(successJobId);
-
-               final ArchivedExecutionGraph succeededExecutionGraph = new 
ArchivedExecutionGraphBuilder()
-                       .setJobID(successJobId)
-                       .setState(JobStatus.FINISHED)
-                       .build();
-
-               
onCompletionActions.jobReachedGloballyTerminalState(succeededExecutionGraph);
-
-               assertThat(
-                       
dispatcherGateway.isJobExecutionResultPresent(successJobId, TIMEOUT).get(),
-                       equalTo(true));
-               assertThat(
-                       dispatcherGateway.getJobExecutionResult(successJobId, 
TIMEOUT)
-                               .get()
-                               .isSuccess(),
-                       equalTo(true));
+                       dispatcherGateway.requestJob(failedJobId, 
TIMEOUT).get(),
+                       equalTo(failedExecutionGraph));
        }
 
        @Test
@@ -317,7 +297,7 @@ public class DispatcherTest extends TestLogger {
 
                final DispatcherGateway dispatcherGateway = 
dispatcher.getSelfGateway(DispatcherGateway.class);
                try {
-                       dispatcherGateway.getJobExecutionResult(new JobID(), 
TIMEOUT).get();
+                       dispatcherGateway.requestJob(new JobID(), 
TIMEOUT).get();
                } catch (ExecutionException e) {
                        final Throwable throwable = 
ExceptionUtils.stripExecutionException(e);
                        assertThat(throwable, 
instanceOf(FlinkJobNotFoundException.class));

http://git-wip-us.apache.org/repos/asf/flink/blob/a6d7f2d7/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/JobExecutionResultCacheTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/JobExecutionResultCacheTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/JobExecutionResultCacheTest.java
deleted file mode 100644
index dfc059c..0000000
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/JobExecutionResultCacheTest.java
+++ /dev/null
@@ -1,93 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.dispatcher;
-
-import org.apache.flink.api.common.JobID;
-import org.apache.flink.runtime.jobmaster.JobResult;
-import org.apache.flink.testutils.category.Flip6;
-import org.apache.flink.util.TestLogger;
-
-import org.junit.Before;
-import org.junit.Test;
-import org.junit.experimental.categories.Category;
-
-import java.lang.ref.SoftReference;
-
-import static org.hamcrest.Matchers.containsString;
-import static org.hamcrest.Matchers.equalTo;
-import static org.hamcrest.Matchers.notNullValue;
-import static org.hamcrest.Matchers.nullValue;
-import static org.hamcrest.Matchers.sameInstance;
-import static org.junit.Assert.assertThat;
-import static org.junit.Assert.fail;
-
-/**
- * Tests for {@link JobExecutionResultCache}.
- */
-@Category(Flip6.class)
-public class JobExecutionResultCacheTest extends TestLogger {
-
-       private JobExecutionResultCache jobExecutionResultCache;
-
-       @Before
-       public void setUp() {
-               jobExecutionResultCache = new JobExecutionResultCache();
-       }
-
-       @Test
-       public void testCacheResultUntilRetrieved() {
-               final JobID jobId = new JobID();
-               final JobResult jobResult = new JobResult.Builder()
-                       .jobId(jobId)
-                       .netRuntime(Long.MAX_VALUE)
-                       .build();
-               jobExecutionResultCache.put(jobResult);
-
-               assertThat(jobExecutionResultCache.contains(jobId), 
equalTo(true));
-
-               SoftReference<JobResult> jobResultRef;
-               jobResultRef = jobExecutionResultCache.get(jobId);
-
-               assertThat(jobResultRef, notNullValue());
-               assertThat(jobResultRef.get(), sameInstance(jobResult));
-
-               assertThat(jobExecutionResultCache.contains(jobId), 
equalTo(false));
-
-               jobResultRef = jobExecutionResultCache.get(jobId);
-               assertThat(jobResultRef, nullValue());
-       }
-
-       @Test
-       public void testThrowExceptionIfEntryAlreadyExists() {
-               final JobID jobId = new JobID();
-               final JobResult build = new JobResult.Builder()
-                       .jobId(jobId)
-                       .netRuntime(Long.MAX_VALUE)
-                       .build();
-               jobExecutionResultCache.put(build);
-
-               try {
-                       jobExecutionResultCache.put(build);
-                       fail("Expected exception not thrown.");
-               } catch (final IllegalStateException e) {
-                       assertThat(e.getMessage(), containsString("already 
contained entry for job"));
-               }
-       }
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/a6d7f2d7/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/MemoryArchivedExecutionGraphStore.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/MemoryArchivedExecutionGraphStore.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/MemoryArchivedExecutionGraphStore.java
index 9bfdbb3..bcb7df2 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/MemoryArchivedExecutionGraphStore.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/MemoryArchivedExecutionGraphStore.java
@@ -73,6 +73,18 @@ public class MemoryArchivedExecutionGraphStore implements 
ArchivedExecutionGraph
                        .collect(Collectors.toList());
        }
 
+       @Nullable
+       @Override
+       public JobDetails getAvailableJobDetails(JobID jobId) {
+               final ArchivedExecutionGraph archivedExecutionGraph = 
serializableExecutionGraphs.get(jobId);
+
+               if (archivedExecutionGraph != null) {
+                       return 
WebMonitorUtils.createDetailsForJob(archivedExecutionGraph);
+               } else {
+                       return null;
+               }
+       }
+
        @Override
        public void close() throws IOException {
                serializableExecutionGraphs.clear();

http://git-wip-us.apache.org/repos/asf/flink/blob/a6d7f2d7/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/JobExecutionResultHandlerTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/JobExecutionResultHandlerTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/JobExecutionResultHandlerTest.java
index b10f973..0861089 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/JobExecutionResultHandlerTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/JobExecutionResultHandlerTest.java
@@ -21,17 +21,17 @@ package org.apache.flink.runtime.rest.handler.job;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.runtime.concurrent.FutureUtils;
-import org.apache.flink.runtime.jobmaster.JobResult;
+import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
+import org.apache.flink.runtime.jobgraph.JobStatus;
 import org.apache.flink.runtime.messages.FlinkJobNotFoundException;
-import org.apache.flink.runtime.messages.JobExecutionResultGoneException;
 import org.apache.flink.runtime.rest.handler.HandlerRequest;
 import org.apache.flink.runtime.rest.handler.RestHandlerException;
+import 
org.apache.flink.runtime.rest.handler.legacy.utils.ArchivedExecutionGraphBuilder;
 import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
 import org.apache.flink.runtime.rest.messages.JobMessageParameters;
 import 
org.apache.flink.runtime.rest.messages.job.JobExecutionResultResponseBody;
 import org.apache.flink.runtime.rest.messages.queue.QueueStatus;
-import org.apache.flink.runtime.webmonitor.RestfulGateway;
-import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
+import org.apache.flink.runtime.webmonitor.TestingRestfulGateway;
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.TestLogger;
 
@@ -39,12 +39,9 @@ import 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseSt
 
 import org.junit.Before;
 import org.junit.Test;
-import org.mockito.Mock;
-import org.mockito.MockitoAnnotations;
 
 import java.util.Collections;
 import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.CompletionException;
 import java.util.concurrent.ExecutionException;
 
 import static org.hamcrest.Matchers.equalTo;
@@ -53,8 +50,6 @@ import static org.hamcrest.Matchers.not;
 import static org.hamcrest.Matchers.nullValue;
 import static org.junit.Assert.assertThat;
 import static org.junit.Assert.fail;
-import static org.mockito.Matchers.any;
-import static org.mockito.Mockito.when;
 
 /**
  * Tests for {@link JobExecutionResultHandler}.
@@ -65,23 +60,15 @@ public class JobExecutionResultHandlerTest extends 
TestLogger {
 
        private JobExecutionResultHandler jobExecutionResultHandler;
 
-       @Mock
-       private RestfulGateway mockRestfulGateway;
-
        private HandlerRequest<EmptyRequestBody, JobMessageParameters> 
testRequest;
 
        @Before
        public void setUp() throws Exception {
-               MockitoAnnotations.initMocks(this);
+               final TestingRestfulGateway testingRestfulGateway = 
TestingRestfulGateway.newBuilder().build();
 
                jobExecutionResultHandler = new JobExecutionResultHandler(
                        CompletableFuture.completedFuture("localhost:12345"),
-                       new GatewayRetriever<RestfulGateway>() {
-                               @Override
-                               public CompletableFuture<RestfulGateway> 
getFuture() {
-                                       return 
CompletableFuture.completedFuture(mockRestfulGateway);
-                               }
-                       },
+                       () -> 
CompletableFuture.completedFuture(testingRestfulGateway),
                        Time.seconds(10),
                        Collections.emptyMap());
 
@@ -94,12 +81,14 @@ public class JobExecutionResultHandlerTest extends 
TestLogger {
 
        @Test
        public void testResultInProgress() throws Exception {
-               
when(mockRestfulGateway.isJobExecutionResultPresent(any(JobID.class), 
any(Time.class)))
-                       .thenReturn(CompletableFuture.completedFuture(false));
+               final TestingRestfulGateway testingRestfulGateway = 
TestingRestfulGateway.newBuilder()
+                       .setRequestJobStatusFunction(
+                               jobId -> 
CompletableFuture.completedFuture(JobStatus.RUNNING))
+                       .build();
 
                final JobExecutionResultResponseBody responseBody = 
jobExecutionResultHandler.handleRequest(
                        testRequest,
-                       mockRestfulGateway).get();
+                       testingRestfulGateway).get();
 
                assertThat(
                        responseBody.getStatus().getId(),
@@ -108,18 +97,29 @@ public class JobExecutionResultHandlerTest extends 
TestLogger {
 
        @Test
        public void testCompletedResult() throws Exception {
-               
when(mockRestfulGateway.isJobExecutionResultPresent(any(JobID.class), 
any(Time.class)))
-                       .thenReturn(CompletableFuture.completedFuture(true));
-
-               when(mockRestfulGateway.getJobExecutionResult(any(JobID.class), 
any(Time.class)))
-                       .thenReturn(CompletableFuture.completedFuture(new 
JobResult.Builder()
-                               .jobId(TEST_JOB_ID)
-                               .netRuntime(Long.MAX_VALUE)
-                               .build()));
+               final JobStatus jobStatus = JobStatus.FINISHED;
+               final ArchivedExecutionGraph executionGraph = new 
ArchivedExecutionGraphBuilder()
+                       .setJobID(TEST_JOB_ID)
+                       .setState(jobStatus)
+                       .build();
+
+               final TestingRestfulGateway testingRestfulGateway = 
TestingRestfulGateway.newBuilder()
+                       .setRequestJobStatusFunction(
+                               jobId -> {
+                                       assertThat(jobId, equalTo(TEST_JOB_ID));
+                                       return 
CompletableFuture.completedFuture(jobStatus);
+                               })
+                       .setRequestJobFunction(
+                               jobId -> {
+                                       assertThat(jobId, equalTo(TEST_JOB_ID));
+                                       return 
CompletableFuture.completedFuture(executionGraph);
+                               }
+                       )
+                       .build();
 
                final JobExecutionResultResponseBody responseBody = 
jobExecutionResultHandler.handleRequest(
                        testRequest,
-                       mockRestfulGateway).get();
+                       testingRestfulGateway).get();
 
                assertThat(
                        responseBody.getStatus().getId(),
@@ -129,25 +129,16 @@ public class JobExecutionResultHandlerTest extends 
TestLogger {
 
        @Test
        public void 
testPropagateFlinkJobNotFoundExceptionAsRestHandlerException() throws Exception 
{
-               assertPropagateAsRestHandlerException(
-                       new CompletionException(new 
FlinkJobNotFoundException(new JobID())));
-       }
-
-       @Test
-       public void 
testPropagateJobExecutionResultGoneExceptionAsRestHandlerException() throws 
Exception {
-               assertPropagateAsRestHandlerException(
-                       new CompletionException(new 
JobExecutionResultGoneException(new JobID())));
-       }
-
-       private void assertPropagateAsRestHandlerException(final Exception 
exception) throws Exception {
-               
when(mockRestfulGateway.isJobExecutionResultPresent(any(JobID.class), 
any(Time.class)))
-                       .thenReturn(FutureUtils.completedExceptionally(
-                               exception));
+               final TestingRestfulGateway testingRestfulGateway = 
TestingRestfulGateway.newBuilder()
+                       .setRequestJobStatusFunction(
+                               jobId -> FutureUtils.completedExceptionally(new 
FlinkJobNotFoundException(jobId))
+                       )
+                       .build();
 
                try {
                        jobExecutionResultHandler.handleRequest(
                                testRequest,
-                               mockRestfulGateway).get();
+                               testingRestfulGateway).get();
                        fail("Expected exception not thrown");
                } catch (final ExecutionException e) {
                        final Throwable cause = 
ExceptionUtils.stripCompletionException(e.getCause());
@@ -157,5 +148,4 @@ public class JobExecutionResultHandlerTest extends 
TestLogger {
                                equalTo(HttpResponseStatus.NOT_FOUND));
                }
        }
-
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/a6d7f2d7/flink-runtime/src/test/java/org/apache/flink/runtime/webmonitor/TestingRestfulGateway.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/webmonitor/TestingRestfulGateway.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/webmonitor/TestingRestfulGateway.java
index 1e27d8e..1e86051 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/webmonitor/TestingRestfulGateway.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/webmonitor/TestingRestfulGateway.java
@@ -24,6 +24,7 @@ import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.concurrent.FutureUtils;
 import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
+import org.apache.flink.runtime.jobgraph.JobStatus;
 import org.apache.flink.runtime.messages.webmonitor.ClusterOverview;
 import org.apache.flink.runtime.messages.webmonitor.MultipleJobsDetails;
 
@@ -39,6 +40,7 @@ import java.util.function.Supplier;
 public class TestingRestfulGateway implements RestfulGateway {
 
        static final Function<JobID, CompletableFuture<? extends 
AccessExecutionGraph>> DEFAULT_REQUEST_JOB_FUNCTION = jobId -> 
FutureUtils.completedExceptionally(new UnsupportedOperationException());
+       static final Function<JobID, CompletableFuture<JobStatus>> 
DEFAULT_REQUEST_JOB_STATUS_FUNCTION = jobId -> 
FutureUtils.completedExceptionally(new UnsupportedOperationException());
        static final Supplier<CompletableFuture<MultipleJobsDetails>> 
DEFAULT_REQUEST_MULTIPLE_JOB_DETAILS_SUPPLIER = () -> 
CompletableFuture.completedFuture(new 
MultipleJobsDetails(Collections.emptyList()));
        static final Supplier<CompletableFuture<ClusterOverview>> 
DEFAULT_REQUEST_CLUSTER_OVERVIEW_SUPPLIER = () -> 
CompletableFuture.completedFuture(new ClusterOverview(0, 0, 0, 0, 0, 0, 0));
        static final Supplier<CompletableFuture<Collection<String>>> 
DEFAULT_REQUEST_METRIC_QUERY_SERVICE_PATHS_SUPPLIER = () -> 
CompletableFuture.completedFuture(Collections.emptyList());
@@ -53,6 +55,8 @@ public class TestingRestfulGateway implements RestfulGateway {
 
        protected Function<JobID, CompletableFuture<? extends 
AccessExecutionGraph>> requestJobFunction;
 
+       protected Function<JobID, CompletableFuture<JobStatus>> 
requestJobStatusFunction;
+
        protected Supplier<CompletableFuture<MultipleJobsDetails>> 
requestMultipleJobDetailsSupplier;
 
        protected Supplier<CompletableFuture<ClusterOverview>> 
requestClusterOverviewSupplier;
@@ -67,6 +71,7 @@ public class TestingRestfulGateway implements RestfulGateway {
                        LOCALHOST,
                        LOCALHOST,
                        DEFAULT_REQUEST_JOB_FUNCTION,
+                       DEFAULT_REQUEST_JOB_STATUS_FUNCTION,
                        DEFAULT_REQUEST_MULTIPLE_JOB_DETAILS_SUPPLIER,
                        DEFAULT_REQUEST_CLUSTER_OVERVIEW_SUPPLIER,
                        DEFAULT_REQUEST_METRIC_QUERY_SERVICE_PATHS_SUPPLIER,
@@ -78,6 +83,7 @@ public class TestingRestfulGateway implements RestfulGateway {
                        String hostname,
                        String restAddress,
                        Function<JobID, CompletableFuture<? extends 
AccessExecutionGraph>> requestJobFunction,
+                       Function<JobID, CompletableFuture<JobStatus>> 
requestJobStatusFunction,
                        Supplier<CompletableFuture<MultipleJobsDetails>> 
requestMultipleJobDetailsSupplier,
                        Supplier<CompletableFuture<ClusterOverview>> 
requestClusterOverviewSupplier,
                        Supplier<CompletableFuture<Collection<String>>> 
requestMetricQueryServicePathsSupplier,
@@ -86,6 +92,7 @@ public class TestingRestfulGateway implements RestfulGateway {
                this.hostname = hostname;
                this.restAddress = restAddress;
                this.requestJobFunction = requestJobFunction;
+               this.requestJobStatusFunction = requestJobStatusFunction;
                this.requestMultipleJobDetailsSupplier = 
requestMultipleJobDetailsSupplier;
                this.requestClusterOverviewSupplier = 
requestClusterOverviewSupplier;
                this.requestMetricQueryServicePathsSupplier = 
requestMetricQueryServicePathsSupplier;
@@ -103,6 +110,11 @@ public class TestingRestfulGateway implements 
RestfulGateway {
        }
 
        @Override
+       public CompletableFuture<JobStatus> requestJobStatus(JobID jobId, Time 
timeout) {
+               return requestJobStatusFunction.apply(jobId);
+       }
+
+       @Override
        public CompletableFuture<MultipleJobsDetails> 
requestMultipleJobDetails(Time timeout) {
                return requestMultipleJobDetailsSupplier.get();
        }
@@ -144,6 +156,7 @@ public class TestingRestfulGateway implements 
RestfulGateway {
                private String hostname = LOCALHOST;
                private String restAddress = LOCALHOST;
                private Function<JobID, CompletableFuture<? extends 
AccessExecutionGraph>> requestJobFunction;
+               private Function<JobID, CompletableFuture<JobStatus>> 
requestJobStatusFunction;
                private Supplier<CompletableFuture<MultipleJobsDetails>> 
requestMultipleJobDetailsSupplier;
                private Supplier<CompletableFuture<ClusterOverview>> 
requestClusterOverviewSupplier;
                private Supplier<CompletableFuture<Collection<String>>> 
requestMetricQueryServicePathsSupplier;
@@ -151,6 +164,7 @@ public class TestingRestfulGateway implements 
RestfulGateway {
 
                public Builder() {
                        requestJobFunction = DEFAULT_REQUEST_JOB_FUNCTION;
+                       requestJobStatusFunction = 
DEFAULT_REQUEST_JOB_STATUS_FUNCTION;
                        requestMultipleJobDetailsSupplier = 
DEFAULT_REQUEST_MULTIPLE_JOB_DETAILS_SUPPLIER;
                        requestClusterOverviewSupplier = 
DEFAULT_REQUEST_CLUSTER_OVERVIEW_SUPPLIER;
                        requestMetricQueryServicePathsSupplier = 
DEFAULT_REQUEST_METRIC_QUERY_SERVICE_PATHS_SUPPLIER;
@@ -177,6 +191,11 @@ public class TestingRestfulGateway implements 
RestfulGateway {
                        return this;
                }
 
+               public Builder setRequestJobStatusFunction(Function<JobID, 
CompletableFuture<JobStatus>> requestJobStatusFunction) {
+                       this.requestJobStatusFunction = 
requestJobStatusFunction;
+                       return this;
+               }
+
                public Builder 
setRequestMultipleJobDetailsSupplier(Supplier<CompletableFuture<MultipleJobsDetails>>
 requestMultipleJobDetailsSupplier) {
                        this.requestMultipleJobDetailsSupplier = 
requestMultipleJobDetailsSupplier;
                        return this;
@@ -198,7 +217,7 @@ public class TestingRestfulGateway implements 
RestfulGateway {
                }
 
                public TestingRestfulGateway build() {
-                       return new TestingRestfulGateway(address, hostname, 
restAddress, requestJobFunction, requestMultipleJobDetailsSupplier, 
requestClusterOverviewSupplier, requestMetricQueryServicePathsSupplier, 
requestTaskManagerMetricQueryServicePathsSupplier);
+                       return new TestingRestfulGateway(address, hostname, 
restAddress, requestJobFunction, requestJobStatusFunction, 
requestMultipleJobDetailsSupplier, requestClusterOverviewSupplier, 
requestMetricQueryServicePathsSupplier, 
requestTaskManagerMetricQueryServicePathsSupplier);
                }
        }
 }

Reply via email to