[ 
https://issues.apache.org/jira/browse/FLINK-10056?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16569171#comment-16569171
 ] 

ASF GitHub Bot commented on FLINK-10056:
----------------------------------------

yanghua commented on a change in pull request #6490: [FLINK-10056] [test] Add JobMasterTest#testRequestNextInputSplit
URL: https://github.com/apache/flink/pull/6490#discussion_r207707425
 
 

 ##########
 File path: flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
 ##########
 @@ -678,6 +688,134 @@ public void testResourceManagerConnectionAfterRegainingLeadership() throws Excep
                }
        }
 
+       @Test
+       public void testRequestNextInputSplit() throws Exception {
+               final String resourceManagerAddress = "rm";
+               final ResourceManagerId resourceManagerId = ResourceManagerId.generate();
+               final ResourceID rmResourceId = new ResourceID(resourceManagerAddress);
+
+               final TestingResourceManagerGateway resourceManagerGateway = new TestingResourceManagerGateway(
+                       resourceManagerId,
+                       rmResourceId,
+                       resourceManagerAddress,
+                       "localhost");
+
+               rpcService.registerGateway(resourceManagerAddress, resourceManagerGateway);
+
+               // build one node JobGraph
+               InputSplitSource<TestingInputSplit> inputSplitSource = new TestingInputSplitSource();
+
+               JobVertex source = new JobVertex("vertex1");
+               source.setParallelism(1);
+               source.setInputSplitSource(inputSplitSource);
+               source.setInvokableClass(AbstractInvokable.class);
+
+               final JobGraph jobGraph = new JobGraph(source);
+               jobGraph.setAllowQueuedScheduling(true);
+
+               configuration.setLong(ConfigConstants.RESTART_STRATEGY_FIXED_DELAY_ATTEMPTS, 1);
+               configuration.setString(ConfigConstants.RESTART_STRATEGY_FIXED_DELAY_DELAY, "0 s");
+
+               final JobManagerSharedServices jobManagerSharedServices =
+                       new TestingJobManagerSharedServicesBuilder()
+                               .setRestartStrategyFactory(RestartStrategyFactory.createRestartStrategyFactory(configuration))
+                               .build();
+
+               final JobMaster jobMaster = createJobMaster(
+                       configuration,
+                       jobGraph,
+                       haServices,
+                       jobManagerSharedServices);
+
+               CompletableFuture<Acknowledge> startFuture = jobMaster.start(jobMasterId, testingTimeout);
+
+               try {
+                       // wait for the start to complete
+                       startFuture.get(testingTimeout.toMilliseconds(), TimeUnit.MILLISECONDS);
+
+                       final JobMasterGateway jobMasterGateway = jobMaster.getSelfGateway(JobMasterGateway.class);
+
+                       ExecutionGraph eg = jobMaster.getExecutionGraph();
+                       ExecutionVertex ev = eg.getAllExecutionVertices().iterator().next();
+
+                       SerializedInputSplit serializedInputSplit1 = jobMasterGateway.requestNextInputSplit(
+                               source.getID(),
+                               ev.getCurrentExecutionAttempt().getAttemptId())
+                               .get(1L, TimeUnit.SECONDS);
+                       InputSplit inputSplit1 = InstantiationUtil.deserializeObject(
+                               serializedInputSplit1.getInputSplitData(), ClassLoader.getSystemClassLoader());
+                       assertEquals(0, inputSplit1.getSplitNumber());
+
+                       SerializedInputSplit serializedInputSplit2 = jobMasterGateway.requestNextInputSplit(
+                               source.getID(),
+                               ev.getCurrentExecutionAttempt().getAttemptId())
+                               .get(1L, TimeUnit.SECONDS);
+                       InputSplit inputSplit2 = InstantiationUtil.deserializeObject(
+                               serializedInputSplit2.getInputSplitData(), ClassLoader.getSystemClassLoader());
+                       assertEquals(1, inputSplit2.getSplitNumber());
+
+                       ExecutionGraphTestUtils.waitUntilExecutionVertexState(ev, ExecutionState.SCHEDULED, 2000L);
+
+                       eg.failGlobal(new Exception("Testing exception"));
+
+                       ExecutionGraphTestUtils.waitUntilExecutionVertexState(ev, ExecutionState.SCHEDULED, 2000L);
+
+                       SerializedInputSplit serializedInputSplit3 = jobMasterGateway.requestNextInputSplit(
+                               source.getID(),
+                               ev.getCurrentExecutionAttempt().getAttemptId())
+                               .get(1L, TimeUnit.SECONDS);
+                       InputSplit inputSplit3 = InstantiationUtil.deserializeObject(
+                               serializedInputSplit3.getInputSplitData(), ClassLoader.getSystemClassLoader());
+                       assertEquals(0, inputSplit3.getSplitNumber());
+
+                       SerializedInputSplit serializedInputSplit4 = jobMasterGateway.requestNextInputSplit(
+                               source.getID(),
+                               ev.getCurrentExecutionAttempt().getAttemptId())
+                               .get(1L, TimeUnit.SECONDS);
+                       InputSplit inputSplit4 = InstantiationUtil.deserializeObject(
+                               serializedInputSplit4.getInputSplitData(), ClassLoader.getSystemClassLoader());
+                       assertEquals(1, inputSplit4.getSplitNumber());
+
 
 Review comment:
   This empty line may be unnecessary.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Add testRequestNextInputSplit
> -----------------------------
>
>                 Key: FLINK-10056
>                 URL: https://issues.apache.org/jira/browse/FLINK-10056
>             Project: Flink
>          Issue Type: Improvement
>          Components: JobManager, Tests
>    Affects Versions: 1.5.0
>            Reporter: 陈梓立
>            Assignee: 陈梓立
>            Priority: Major
>              Labels: pull-request-available
>
> Add testRequestNextInputSplit to make sure JobMaster#requestNextInputSplit works as expected.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
