[ https://issues.apache.org/jira/browse/FLINK-8450?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16339499#comment-16339499 ]
ASF GitHub Bot commented on FLINK-8450: --------------------------------------- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/5309#discussion_r163902431 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/ExecutionGraphCacheTest.java --- @@ -30,62 +30,69 @@ import org.apache.flink.runtime.executiongraph.ExecutionGraph; import org.apache.flink.runtime.jobgraph.JobStatus; import org.apache.flink.runtime.jobmanager.JobManager; -import org.apache.flink.runtime.jobmaster.JobManagerGateway; import org.apache.flink.runtime.messages.FlinkJobNotFoundException; +import org.apache.flink.runtime.rest.handler.legacy.utils.ArchivedExecutionGraphBuilder; +import org.apache.flink.runtime.webmonitor.RestfulGateway; +import org.apache.flink.runtime.webmonitor.TestingRestfulGateway; import org.apache.flink.util.ExecutorUtils; import org.apache.flink.util.FlinkException; +import org.apache.flink.util.Preconditions; import org.apache.flink.util.TestLogger; +import org.hamcrest.Matchers; +import org.junit.BeforeClass; import org.junit.Test; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collection; import java.util.Collections; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Function; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertThat; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; -import static org.mockito.Matchers.any; -import static org.mockito.Matchers.eq; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.times; -import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.when; /** * Tests for the {@link ExecutionGraphCache}. */ public class ExecutionGraphCacheTest extends TestLogger { + private static ArchivedExecutionGraph expectedExecutionGraph; + private static final JobID expectedJobId = new JobID(); + + @BeforeClass + public static void setup() { + expectedExecutionGraph = new ArchivedExecutionGraphBuilder().build(); + } + /** * Tests that we can cache AccessExecutionGraphs over multiple accesses. */ @Test public void testExecutionGraphCaching() throws Exception { final Time timeout = Time.milliseconds(100L); final Time timeToLive = Time.hours(1L); - final JobID jobId = new JobID(); - final AccessExecutionGraph accessExecutionGraph = mock(AccessExecutionGraph.class); - final JobManagerGateway jobManagerGateway = mock(JobManagerGateway.class); - when(jobManagerGateway.requestJob(eq(jobId), any(Time.class))).thenReturn(CompletableFuture.completedFuture(accessExecutionGraph)); + final CountingRestfulGateway restfulGateway = createCountingRestfulGateway(expectedJobId, CompletableFuture.completedFuture(expectedExecutionGraph)); try (ExecutionGraphCache executionGraphCache = new ExecutionGraphCache(timeout, timeToLive)) { - CompletableFuture<AccessExecutionGraph> accessExecutionGraphFuture = executionGraphCache.getExecutionGraph(jobId, jobManagerGateway); + CompletableFuture<AccessExecutionGraph> accessExecutionGraphFuture = executionGraphCache.getExecutionGraph(expectedJobId, restfulGateway); - assertEquals(accessExecutionGraph, accessExecutionGraphFuture.get()); + assertEquals(expectedExecutionGraph, accessExecutionGraphFuture.get()); - CompletableFuture<AccessExecutionGraph> accessExecutionGraphFuture2 = executionGraphCache.getExecutionGraph(jobId, jobManagerGateway); + CompletableFuture<AccessExecutionGraph> accessExecutionGraphFuture2 = executionGraphCache.getExecutionGraph(expectedJobId, restfulGateway); - assertEquals(accessExecutionGraph, accessExecutionGraphFuture2.get()); + assertEquals(expectedExecutionGraph, accessExecutionGraphFuture2.get()); --- End diff -- True, will remove it. > Make JobMaster/DispatcherGateway#requestJob type safe > ----------------------------------------------------- > > Key: FLINK-8450 > URL: https://issues.apache.org/jira/browse/FLINK-8450 > Project: Flink > Issue Type: Improvement > Components: Distributed Coordination > Affects Versions: 1.5.0 > Reporter: Till Rohrmann > Assignee: Till Rohrmann > Priority: Minor > Labels: flip-6 > Fix For: 1.5.0 > > > Currently, the {{RestfulGateway#requestJob}} returns a > {{CompletableFuture<AccessExecutionGraph>}}. Since {{AccessExecutionGraph}} > is non serializable it could fail if we execute this RPC from a remote > system. In order to make it typesafe we should change its signature to > {{SerializableExecutionGraph}}. -- This message was sent by Atlassian JIRA (v7.6.3#76005)