[ 
https://issues.apache.org/jira/browse/FLINK-8450?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16339499#comment-16339499
 ] 

ASF GitHub Bot commented on FLINK-8450:
---------------------------------------

Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5309#discussion_r163902431
  
    --- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/ExecutionGraphCacheTest.java
 ---
    @@ -30,62 +30,69 @@
     import org.apache.flink.runtime.executiongraph.ExecutionGraph;
     import org.apache.flink.runtime.jobgraph.JobStatus;
     import org.apache.flink.runtime.jobmanager.JobManager;
    -import org.apache.flink.runtime.jobmaster.JobManagerGateway;
     import org.apache.flink.runtime.messages.FlinkJobNotFoundException;
    +import 
org.apache.flink.runtime.rest.handler.legacy.utils.ArchivedExecutionGraphBuilder;
    +import org.apache.flink.runtime.webmonitor.RestfulGateway;
    +import org.apache.flink.runtime.webmonitor.TestingRestfulGateway;
     import org.apache.flink.util.ExecutorUtils;
     import org.apache.flink.util.FlinkException;
    +import org.apache.flink.util.Preconditions;
     import org.apache.flink.util.TestLogger;
     
    +import org.hamcrest.Matchers;
    +import org.junit.BeforeClass;
     import org.junit.Test;
     
     import java.util.ArrayList;
    +import java.util.Arrays;
     import java.util.Collection;
     import java.util.Collections;
     import java.util.concurrent.CompletableFuture;
    +import java.util.concurrent.ConcurrentLinkedQueue;
     import java.util.concurrent.ExecutionException;
     import java.util.concurrent.ExecutorService;
     import java.util.concurrent.TimeUnit;
    +import java.util.concurrent.atomic.AtomicInteger;
     import java.util.function.Function;
     
     import static org.junit.Assert.assertEquals;
    +import static org.junit.Assert.assertThat;
     import static org.junit.Assert.assertTrue;
     import static org.junit.Assert.fail;
    -import static org.mockito.Matchers.any;
    -import static org.mockito.Matchers.eq;
    -import static org.mockito.Mockito.mock;
    -import static org.mockito.Mockito.times;
    -import static org.mockito.Mockito.verify;
    -import static org.mockito.Mockito.when;
     
     /**
      * Tests for the {@link ExecutionGraphCache}.
      */
     public class ExecutionGraphCacheTest extends TestLogger {
     
    +   private static ArchivedExecutionGraph expectedExecutionGraph;
    +   private static final JobID expectedJobId = new JobID();
    +
    +   @BeforeClass
    +   public static void setup() {
    +           expectedExecutionGraph = new 
ArchivedExecutionGraphBuilder().build();
    +   }
    +
        /**
         * Tests that we can cache AccessExecutionGraphs over multiple accesses.
         */
        @Test
        public void testExecutionGraphCaching() throws Exception {
                final Time timeout = Time.milliseconds(100L);
                final Time timeToLive = Time.hours(1L);
    -           final JobID jobId = new JobID();
    -           final AccessExecutionGraph accessExecutionGraph = 
mock(AccessExecutionGraph.class);
     
    -           final JobManagerGateway jobManagerGateway = 
mock(JobManagerGateway.class);
    -           when(jobManagerGateway.requestJob(eq(jobId), 
any(Time.class))).thenReturn(CompletableFuture.completedFuture(accessExecutionGraph));
    +           final CountingRestfulGateway restfulGateway = 
createCountingRestfulGateway(expectedJobId, 
CompletableFuture.completedFuture(expectedExecutionGraph));
     
                try (ExecutionGraphCache executionGraphCache = new 
ExecutionGraphCache(timeout, timeToLive)) {
    -                   CompletableFuture<AccessExecutionGraph> 
accessExecutionGraphFuture = executionGraphCache.getExecutionGraph(jobId, 
jobManagerGateway);
    +                   CompletableFuture<AccessExecutionGraph> 
accessExecutionGraphFuture = 
executionGraphCache.getExecutionGraph(expectedJobId, restfulGateway);
     
    -                   assertEquals(accessExecutionGraph, 
accessExecutionGraphFuture.get());
    +                   assertEquals(expectedExecutionGraph, 
accessExecutionGraphFuture.get());
     
    -                   CompletableFuture<AccessExecutionGraph> 
accessExecutionGraphFuture2 = executionGraphCache.getExecutionGraph(jobId, 
jobManagerGateway);
    +                   CompletableFuture<AccessExecutionGraph> 
accessExecutionGraphFuture2 = 
executionGraphCache.getExecutionGraph(expectedJobId, restfulGateway);
     
    -                   assertEquals(accessExecutionGraph, 
accessExecutionGraphFuture2.get());
    +                   assertEquals(expectedExecutionGraph, 
accessExecutionGraphFuture2.get());
    --- End diff --
    
    True, will remove it.


> Make JobMaster/DispatcherGateway#requestJob type safe
> -----------------------------------------------------
>
>                 Key: FLINK-8450
>                 URL: https://issues.apache.org/jira/browse/FLINK-8450
>             Project: Flink
>          Issue Type: Improvement
>          Components: Distributed Coordination
>    Affects Versions: 1.5.0
>            Reporter: Till Rohrmann
>            Assignee: Till Rohrmann
>            Priority: Minor
>              Labels: flip-6
>             Fix For: 1.5.0
>
>
> Currently, {{RestfulGateway#requestJob}} returns a 
> {{CompletableFuture<AccessExecutionGraph>}}. Since {{AccessExecutionGraph}} 
> is not serializable, the call could fail if we execute this RPC from a remote 
> system. In order to make it type safe, we should change its signature to use 
> {{SerializableExecutionGraph}} instead.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to