[GitHub] yanghua commented on a change in pull request #6490: [FLINK-10056] [test] Add JobMasterTest#testRequestNextInputSplit
yanghua commented on a change in pull request #6490: [FLINK-10056] [test] Add JobMasterTest#testRequestNextInputSplit URL: https://github.com/apache/flink/pull/6490#discussion_r207723699 ## File path: flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java ## @@ -144,7 +146,48 @@ public static void waitUntilExecutionState(Execution execution, ExecutionState s } if (System.nanoTime() >= deadline) { - throw new TimeoutException(); + throw new TimeoutException( + "The execution did not reach state " + state + " in time. " + + "Current status is " + execution.getState() + '.'); + } + } + + /** +* Waits until the Execution vertex has reached a certain state. +* +* This method is based on polling and might miss very fast state transitions! +*/ + public static void waitUntilExecutionVertexState(ExecutionVertex executionVertex, ExecutionState state, long maxWaitMillis) Review comment: The 'timeout' I gave in my example is also inappropriate: here, timeout is an exceptional state, not the normal state you are actually waiting for. I understand this method's logic, but I still think "until" should be followed by a **state** or a **result**, not a noun. I think `waitUntilExecutionVertexStateChanged` would be better; there are similar cases in the Flink source, e.g. `ExecutionGraph#waitUntilTerminal`. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] yanghua commented on a change in pull request #6490: [FLINK-10056] [test] Add JobMasterTest#testRequestNextInputSplit
yanghua commented on a change in pull request #6490: [FLINK-10056] [test] Add JobMasterTest#testRequestNextInputSplit URL: https://github.com/apache/flink/pull/6490#discussion_r207707425 ## File path: flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java ## @@ -678,6 +688,134 @@ public void testResourceManagerConnectionAfterRegainingLeadership() throws Excep } } + @Test + public void testRequestNextInputSplit() throws Exception { + final String resourceManagerAddress = "rm"; + final ResourceManagerId resourceManagerId = ResourceManagerId.generate(); + final ResourceID rmResourceId = new ResourceID(resourceManagerAddress); + + final TestingResourceManagerGateway resourceManagerGateway = new TestingResourceManagerGateway( + resourceManagerId, + rmResourceId, + resourceManagerAddress, + "localhost"); + + rpcService.registerGateway(resourceManagerAddress, resourceManagerGateway); + + // build one node JobGraph + InputSplitSource inputSplitSource = new TestingInputSplitSource(); + + JobVertex source = new JobVertex("vertex1"); + source.setParallelism(1); + source.setInputSplitSource(inputSplitSource); + source.setInvokableClass(AbstractInvokable.class); + + final JobGraph jobGraph = new JobGraph(source); + jobGraph.setAllowQueuedScheduling(true); + + configuration.setLong(ConfigConstants.RESTART_STRATEGY_FIXED_DELAY_ATTEMPTS, 1); + configuration.setString(ConfigConstants.RESTART_STRATEGY_FIXED_DELAY_DELAY, "0 s"); + + final JobManagerSharedServices jobManagerSharedServices = + new TestingJobManagerSharedServicesBuilder() + .setRestartStrategyFactory(RestartStrategyFactory.createRestartStrategyFactory(configuration)) + .build(); + + final JobMaster jobMaster = createJobMaster( + configuration, + jobGraph, + haServices, + jobManagerSharedServices); + + CompletableFuture startFuture = jobMaster.start(jobMasterId, testingTimeout); + + try { + // wait for the start to complete + startFuture.get(testingTimeout.toMilliseconds(), 
TimeUnit.MILLISECONDS); + + final JobMasterGateway jobMasterGateway = jobMaster.getSelfGateway(JobMasterGateway.class); + + ExecutionGraph eg = jobMaster.getExecutionGraph(); + ExecutionVertex ev = eg.getAllExecutionVertices().iterator().next(); + + SerializedInputSplit serializedInputSplit1 = jobMasterGateway.requestNextInputSplit( + source.getID(), + ev.getCurrentExecutionAttempt().getAttemptId()) + .get(1L, TimeUnit.SECONDS); + InputSplit inputSplit1 = InstantiationUtil.deserializeObject( + serializedInputSplit1.getInputSplitData(), ClassLoader.getSystemClassLoader()); + assertEquals(0, inputSplit1.getSplitNumber()); + + SerializedInputSplit serializedInputSplit2 = jobMasterGateway.requestNextInputSplit( + source.getID(), + ev.getCurrentExecutionAttempt().getAttemptId()) + .get(1L, TimeUnit.SECONDS); + InputSplit inputSplit2 = InstantiationUtil.deserializeObject( + serializedInputSplit2.getInputSplitData(), ClassLoader.getSystemClassLoader()); + assertEquals(1, inputSplit2.getSplitNumber()); + + ExecutionGraphTestUtils.waitUntilExecutionVertexState(ev, ExecutionState.SCHEDULED, 2000L); + + eg.failGlobal(new Exception("Testing exception")); + + ExecutionGraphTestUtils.waitUntilExecutionVertexState(ev, ExecutionState.SCHEDULED, 2000L); + + SerializedInputSplit serializedInputSplit3 = jobMasterGateway.requestNextInputSplit( + source.getID(), + ev.getCurrentExecutionAttempt().getAttemptId()) + .get(1L, TimeUnit.SECONDS); + InputSplit inputSplit3 = InstantiationUtil.deserializeObject( + serializedInputSplit3.getInputSplitData(), ClassLoader.getSystemClassLoader()); + assertEquals(0, inputSplit3.getSplitNumber()); + +
[GitHub] yanghua commented on a change in pull request #6490: [FLINK-10056] [test] Add JobMasterTest#testRequestNextInputSplit
yanghua commented on a change in pull request #6490: [FLINK-10056] [test] Add JobMasterTest#testRequestNextInputSplit URL: https://github.com/apache/flink/pull/6490#discussion_r207707382 ## File path: flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java ## @@ -678,6 +688,134 @@ public void testResourceManagerConnectionAfterRegainingLeadership() throws Excep } } + @Test + public void testRequestNextInputSplit() throws Exception { + final String resourceManagerAddress = "rm"; + final ResourceManagerId resourceManagerId = ResourceManagerId.generate(); + final ResourceID rmResourceId = new ResourceID(resourceManagerAddress); + + final TestingResourceManagerGateway resourceManagerGateway = new TestingResourceManagerGateway( + resourceManagerId, + rmResourceId, + resourceManagerAddress, + "localhost"); + + rpcService.registerGateway(resourceManagerAddress, resourceManagerGateway); + + // build one node JobGraph + InputSplitSource inputSplitSource = new TestingInputSplitSource(); + + JobVertex source = new JobVertex("vertex1"); + source.setParallelism(1); + source.setInputSplitSource(inputSplitSource); + source.setInvokableClass(AbstractInvokable.class); + + final JobGraph jobGraph = new JobGraph(source); + jobGraph.setAllowQueuedScheduling(true); + + configuration.setLong(ConfigConstants.RESTART_STRATEGY_FIXED_DELAY_ATTEMPTS, 1); + configuration.setString(ConfigConstants.RESTART_STRATEGY_FIXED_DELAY_DELAY, "0 s"); + + final JobManagerSharedServices jobManagerSharedServices = + new TestingJobManagerSharedServicesBuilder() + .setRestartStrategyFactory(RestartStrategyFactory.createRestartStrategyFactory(configuration)) + .build(); + + final JobMaster jobMaster = createJobMaster( + configuration, + jobGraph, + haServices, + jobManagerSharedServices); + + CompletableFuture startFuture = jobMaster.start(jobMasterId, testingTimeout); + + try { + // wait for the start to complete + startFuture.get(testingTimeout.toMilliseconds(), 
TimeUnit.MILLISECONDS); + + final JobMasterGateway jobMasterGateway = jobMaster.getSelfGateway(JobMasterGateway.class); + + ExecutionGraph eg = jobMaster.getExecutionGraph(); + ExecutionVertex ev = eg.getAllExecutionVertices().iterator().next(); + + SerializedInputSplit serializedInputSplit1 = jobMasterGateway.requestNextInputSplit( + source.getID(), + ev.getCurrentExecutionAttempt().getAttemptId()) + .get(1L, TimeUnit.SECONDS); Review comment: the indentation is confusing, you can consider: ``` SerializedInputSplit serializedInputSplit1 = jobMasterGateway .requestNextInputSplit( source.getID(), ev.getCurrentExecutionAttempt().getAttemptId()) .get(1L, TimeUnit.SECONDS); ``` This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] yanghua commented on a change in pull request #6490: [FLINK-10056] [test] Add JobMasterTest#testRequestNextInputSplit
yanghua commented on a change in pull request #6490: [FLINK-10056] [test] Add JobMasterTest#testRequestNextInputSplit URL: https://github.com/apache/flink/pull/6490#discussion_r207707131 ## File path: flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java ## @@ -144,7 +146,48 @@ public static void waitUntilExecutionState(Execution execution, ExecutionState s } if (System.nanoTime() >= deadline) { - throw new TimeoutException(); + throw new TimeoutException( + "The execution did not reach state " + state + " in time. " + + "Current status is " + execution.getState() + '.'); Review comment: Please consistently use a string literal ("") rather than a char literal (''). Moreover, I think using String.format would look better here. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] yanghua commented on a change in pull request #6490: [FLINK-10056] [test] Add JobMasterTest#testRequestNextInputSplit
yanghua commented on a change in pull request #6490: [FLINK-10056] [test] Add JobMasterTest#testRequestNextInputSplit URL: https://github.com/apache/flink/pull/6490#discussion_r207707318 ## File path: flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java ## @@ -144,7 +146,48 @@ public static void waitUntilExecutionState(Execution execution, ExecutionState s } if (System.nanoTime() >= deadline) { - throw new TimeoutException(); + throw new TimeoutException( + "The execution did not reach state " + state + " in time. " + + "Current status is " + execution.getState() + '.'); + } + } + + /** +* Waits until the Execution vertex has reached a certain state. +* +* This method is based on polling and might miss very fast state transitions! +*/ + public static void waitUntilExecutionVertexState(ExecutionVertex executionVertex, ExecutionState state, long maxWaitMillis) + throws TimeoutException { + + checkNotNull(executionVertex); + checkNotNull(state); + checkArgument(maxWaitMillis >= 0); + + // this is a poor implementation - we may want to improve it eventually + final long deadline = maxWaitMillis == 0 ? Long.MAX_VALUE : System.nanoTime() + (maxWaitMillis * 1_000_000); + + while (true) { + Execution execution = executionVertex.getCurrentExecutionAttempt(); + + if (execution == null || (execution.getState() != state && System.nanoTime() < deadline)) { + try { + Thread.sleep(2); + } catch (InterruptedException ignored) { } + } else { + break; + } + + if (System.nanoTime() >= deadline) { + if (execution != null) { + throw new TimeoutException( + "The execution vertex did not reach state " + state + " in time. " + + "Current status is " + execution.getState() + '.'); Review comment: same as above~ This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] yanghua commented on a change in pull request #6490: [FLINK-10056] [test] Add JobMasterTest#testRequestNextInputSplit
yanghua commented on a change in pull request #6490: [FLINK-10056] [test] Add JobMasterTest#testRequestNextInputSplit URL: https://github.com/apache/flink/pull/6490#discussion_r207707425 ## File path: flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java ## @@ -678,6 +688,134 @@ public void testResourceManagerConnectionAfterRegainingLeadership() throws Excep } } + @Test + public void testRequestNextInputSplit() throws Exception { + final String resourceManagerAddress = "rm"; + final ResourceManagerId resourceManagerId = ResourceManagerId.generate(); + final ResourceID rmResourceId = new ResourceID(resourceManagerAddress); + + final TestingResourceManagerGateway resourceManagerGateway = new TestingResourceManagerGateway( + resourceManagerId, + rmResourceId, + resourceManagerAddress, + "localhost"); + + rpcService.registerGateway(resourceManagerAddress, resourceManagerGateway); + + // build one node JobGraph + InputSplitSource inputSplitSource = new TestingInputSplitSource(); + + JobVertex source = new JobVertex("vertex1"); + source.setParallelism(1); + source.setInputSplitSource(inputSplitSource); + source.setInvokableClass(AbstractInvokable.class); + + final JobGraph jobGraph = new JobGraph(source); + jobGraph.setAllowQueuedScheduling(true); + + configuration.setLong(ConfigConstants.RESTART_STRATEGY_FIXED_DELAY_ATTEMPTS, 1); + configuration.setString(ConfigConstants.RESTART_STRATEGY_FIXED_DELAY_DELAY, "0 s"); + + final JobManagerSharedServices jobManagerSharedServices = + new TestingJobManagerSharedServicesBuilder() + .setRestartStrategyFactory(RestartStrategyFactory.createRestartStrategyFactory(configuration)) + .build(); + + final JobMaster jobMaster = createJobMaster( + configuration, + jobGraph, + haServices, + jobManagerSharedServices); + + CompletableFuture startFuture = jobMaster.start(jobMasterId, testingTimeout); + + try { + // wait for the start to complete + startFuture.get(testingTimeout.toMilliseconds(), 
TimeUnit.MILLISECONDS); + + final JobMasterGateway jobMasterGateway = jobMaster.getSelfGateway(JobMasterGateway.class); + + ExecutionGraph eg = jobMaster.getExecutionGraph(); + ExecutionVertex ev = eg.getAllExecutionVertices().iterator().next(); + + SerializedInputSplit serializedInputSplit1 = jobMasterGateway.requestNextInputSplit( + source.getID(), + ev.getCurrentExecutionAttempt().getAttemptId()) + .get(1L, TimeUnit.SECONDS); + InputSplit inputSplit1 = InstantiationUtil.deserializeObject( + serializedInputSplit1.getInputSplitData(), ClassLoader.getSystemClassLoader()); + assertEquals(0, inputSplit1.getSplitNumber()); + + SerializedInputSplit serializedInputSplit2 = jobMasterGateway.requestNextInputSplit( + source.getID(), + ev.getCurrentExecutionAttempt().getAttemptId()) + .get(1L, TimeUnit.SECONDS); + InputSplit inputSplit2 = InstantiationUtil.deserializeObject( + serializedInputSplit2.getInputSplitData(), ClassLoader.getSystemClassLoader()); + assertEquals(1, inputSplit2.getSplitNumber()); + + ExecutionGraphTestUtils.waitUntilExecutionVertexState(ev, ExecutionState.SCHEDULED, 2000L); + + eg.failGlobal(new Exception("Testing exception")); + + ExecutionGraphTestUtils.waitUntilExecutionVertexState(ev, ExecutionState.SCHEDULED, 2000L); + + SerializedInputSplit serializedInputSplit3 = jobMasterGateway.requestNextInputSplit( + source.getID(), + ev.getCurrentExecutionAttempt().getAttemptId()) + .get(1L, TimeUnit.SECONDS); + InputSplit inputSplit3 = InstantiationUtil.deserializeObject( + serializedInputSplit3.getInputSplitData(), ClassLoader.getSystemClassLoader()); + assertEquals(0, inputSplit3.getSplitNumber()); + +
[GitHub] yanghua commented on a change in pull request #6490: [FLINK-10056] [test] Add JobMasterTest#testRequestNextInputSplit
yanghua commented on a change in pull request #6490: [FLINK-10056] [test] Add JobMasterTest#testRequestNextInputSplit URL: https://github.com/apache/flink/pull/6490#discussion_r207707152 ## File path: flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java ## @@ -144,7 +146,48 @@ public static void waitUntilExecutionState(Execution execution, ExecutionState s } if (System.nanoTime() >= deadline) { - throw new TimeoutException(); + throw new TimeoutException( + "The execution did not reach state " + state + " in time. " + + "Current status is " + execution.getState() + '.'); + } + } + + /** +* Waits until the Execution vertex has reached a certain state. +* +* This method is based on polling and might miss very fast state transitions! +*/ + public static void waitUntilExecutionVertexState(ExecutionVertex executionVertex, ExecutionState state, long maxWaitMillis) + throws TimeoutException { + Review comment: here I think we could remove the empty line. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] yanghua commented on a change in pull request #6490: [FLINK-10056] [test] Add JobMasterTest#testRequestNextInputSplit
yanghua commented on a change in pull request #6490: [FLINK-10056] [test] Add JobMasterTest#testRequestNextInputSplit URL: https://github.com/apache/flink/pull/6490#discussion_r207707414 ## File path: flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java ## @@ -678,6 +688,134 @@ public void testResourceManagerConnectionAfterRegainingLeadership() throws Excep } } + @Test + public void testRequestNextInputSplit() throws Exception { + final String resourceManagerAddress = "rm"; + final ResourceManagerId resourceManagerId = ResourceManagerId.generate(); + final ResourceID rmResourceId = new ResourceID(resourceManagerAddress); + + final TestingResourceManagerGateway resourceManagerGateway = new TestingResourceManagerGateway( + resourceManagerId, + rmResourceId, + resourceManagerAddress, + "localhost"); + + rpcService.registerGateway(resourceManagerAddress, resourceManagerGateway); + + // build one node JobGraph + InputSplitSource inputSplitSource = new TestingInputSplitSource(); + + JobVertex source = new JobVertex("vertex1"); + source.setParallelism(1); + source.setInputSplitSource(inputSplitSource); + source.setInvokableClass(AbstractInvokable.class); + + final JobGraph jobGraph = new JobGraph(source); + jobGraph.setAllowQueuedScheduling(true); + + configuration.setLong(ConfigConstants.RESTART_STRATEGY_FIXED_DELAY_ATTEMPTS, 1); + configuration.setString(ConfigConstants.RESTART_STRATEGY_FIXED_DELAY_DELAY, "0 s"); + + final JobManagerSharedServices jobManagerSharedServices = + new TestingJobManagerSharedServicesBuilder() + .setRestartStrategyFactory(RestartStrategyFactory.createRestartStrategyFactory(configuration)) + .build(); + + final JobMaster jobMaster = createJobMaster( + configuration, + jobGraph, + haServices, + jobManagerSharedServices); + + CompletableFuture startFuture = jobMaster.start(jobMasterId, testingTimeout); + + try { + // wait for the start to complete + startFuture.get(testingTimeout.toMilliseconds(), 
TimeUnit.MILLISECONDS); + + final JobMasterGateway jobMasterGateway = jobMaster.getSelfGateway(JobMasterGateway.class); + + ExecutionGraph eg = jobMaster.getExecutionGraph(); + ExecutionVertex ev = eg.getAllExecutionVertices().iterator().next(); + + SerializedInputSplit serializedInputSplit1 = jobMasterGateway.requestNextInputSplit( + source.getID(), + ev.getCurrentExecutionAttempt().getAttemptId()) + .get(1L, TimeUnit.SECONDS); + InputSplit inputSplit1 = InstantiationUtil.deserializeObject( + serializedInputSplit1.getInputSplitData(), ClassLoader.getSystemClassLoader()); + assertEquals(0, inputSplit1.getSplitNumber()); + + SerializedInputSplit serializedInputSplit2 = jobMasterGateway.requestNextInputSplit( + source.getID(), + ev.getCurrentExecutionAttempt().getAttemptId()) + .get(1L, TimeUnit.SECONDS); + InputSplit inputSplit2 = InstantiationUtil.deserializeObject( + serializedInputSplit2.getInputSplitData(), ClassLoader.getSystemClassLoader()); + assertEquals(1, inputSplit2.getSplitNumber()); + + ExecutionGraphTestUtils.waitUntilExecutionVertexState(ev, ExecutionState.SCHEDULED, 2000L); + + eg.failGlobal(new Exception("Testing exception")); + + ExecutionGraphTestUtils.waitUntilExecutionVertexState(ev, ExecutionState.SCHEDULED, 2000L); + + SerializedInputSplit serializedInputSplit3 = jobMasterGateway.requestNextInputSplit( + source.getID(), + ev.getCurrentExecutionAttempt().getAttemptId()) + .get(1L, TimeUnit.SECONDS); + InputSplit inputSplit3 = InstantiationUtil.deserializeObject( + serializedInputSplit3.getInputSplitData(), ClassLoader.getSystemClassLoader()); + assertEquals(0, inputSplit3.getSplitNumber()); + +
[GitHub] yanghua commented on a change in pull request #6490: [FLINK-10056] [test] Add JobMasterTest#testRequestNextInputSplit
yanghua commented on a change in pull request #6490: [FLINK-10056] [test] Add JobMasterTest#testRequestNextInputSplit URL: https://github.com/apache/flink/pull/6490#discussion_r207707125 ## File path: flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java ## @@ -118,7 +118,9 @@ public static void waitUntilJobStatus(ExecutionGraph eg, JobStatus status, long } if (System.nanoTime() >= deadline) { - throw new TimeoutException("The job did not reach status " + status + " in time. Current status is " + eg.getState() + '.'); + throw new TimeoutException( + "The job did not reach status " + status + " in time. " + + "Current status is " + eg.getState() + '.'); Review comment: Please consistently use a string literal (`""`) rather than a char literal (`''`). Moreover, I think using `String.format` would look better here. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] yanghua commented on a change in pull request #6490: [FLINK-10056] [test] Add JobMasterTest#testRequestNextInputSplit
yanghua commented on a change in pull request #6490: [FLINK-10056] [test] Add JobMasterTest#testRequestNextInputSplit URL: https://github.com/apache/flink/pull/6490#discussion_r207707390 ## File path: flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java ## @@ -678,6 +688,134 @@ public void testResourceManagerConnectionAfterRegainingLeadership() throws Excep } } + @Test + public void testRequestNextInputSplit() throws Exception { + final String resourceManagerAddress = "rm"; + final ResourceManagerId resourceManagerId = ResourceManagerId.generate(); + final ResourceID rmResourceId = new ResourceID(resourceManagerAddress); + + final TestingResourceManagerGateway resourceManagerGateway = new TestingResourceManagerGateway( + resourceManagerId, + rmResourceId, + resourceManagerAddress, + "localhost"); + + rpcService.registerGateway(resourceManagerAddress, resourceManagerGateway); + + // build one node JobGraph + InputSplitSource inputSplitSource = new TestingInputSplitSource(); + + JobVertex source = new JobVertex("vertex1"); + source.setParallelism(1); + source.setInputSplitSource(inputSplitSource); + source.setInvokableClass(AbstractInvokable.class); + + final JobGraph jobGraph = new JobGraph(source); + jobGraph.setAllowQueuedScheduling(true); + + configuration.setLong(ConfigConstants.RESTART_STRATEGY_FIXED_DELAY_ATTEMPTS, 1); + configuration.setString(ConfigConstants.RESTART_STRATEGY_FIXED_DELAY_DELAY, "0 s"); + + final JobManagerSharedServices jobManagerSharedServices = + new TestingJobManagerSharedServicesBuilder() + .setRestartStrategyFactory(RestartStrategyFactory.createRestartStrategyFactory(configuration)) + .build(); + + final JobMaster jobMaster = createJobMaster( + configuration, + jobGraph, + haServices, + jobManagerSharedServices); + + CompletableFuture startFuture = jobMaster.start(jobMasterId, testingTimeout); + + try { + // wait for the start to complete + startFuture.get(testingTimeout.toMilliseconds(), 
TimeUnit.MILLISECONDS); + + final JobMasterGateway jobMasterGateway = jobMaster.getSelfGateway(JobMasterGateway.class); + + ExecutionGraph eg = jobMaster.getExecutionGraph(); + ExecutionVertex ev = eg.getAllExecutionVertices().iterator().next(); + + SerializedInputSplit serializedInputSplit1 = jobMasterGateway.requestNextInputSplit( + source.getID(), + ev.getCurrentExecutionAttempt().getAttemptId()) + .get(1L, TimeUnit.SECONDS); + InputSplit inputSplit1 = InstantiationUtil.deserializeObject( + serializedInputSplit1.getInputSplitData(), ClassLoader.getSystemClassLoader()); + assertEquals(0, inputSplit1.getSplitNumber()); + + SerializedInputSplit serializedInputSplit2 = jobMasterGateway.requestNextInputSplit( + source.getID(), + ev.getCurrentExecutionAttempt().getAttemptId()) + .get(1L, TimeUnit.SECONDS); Review comment: the same as above This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] yanghua commented on a change in pull request #6490: [FLINK-10056] [test] Add JobMasterTest#testRequestNextInputSplit
yanghua commented on a change in pull request #6490: [FLINK-10056] [test] Add JobMasterTest#testRequestNextInputSplit URL: https://github.com/apache/flink/pull/6490#discussion_r207707284 ## File path: flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java ## @@ -144,7 +146,48 @@ public static void waitUntilExecutionState(Execution execution, ExecutionState s } if (System.nanoTime() >= deadline) { - throw new TimeoutException(); + throw new TimeoutException( + "The execution did not reach state " + state + " in time. " + + "Current status is " + execution.getState() + '.'); + } + } + + /** +* Waits until the Execution vertex has reached a certain state. +* +* This method is based on polling and might miss very fast state transitions! +*/ + public static void waitUntilExecutionVertexState(ExecutionVertex executionVertex, ExecutionState state, long maxWaitMillis) Review comment: The method name `waitUntilExecutionVertexState` is not very accurate. Generally, `until` is followed by a state, such as 'timeout'. I think we should consider a new name, e.g. `waitExecutionVertexStateTransform` — that is just an example; please think it over. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] yanghua commented on a change in pull request #6490: [FLINK-10056] [test] Add JobMasterTest#testRequestNextInputSplit
yanghua commented on a change in pull request #6490: [FLINK-10056] [test] Add JobMasterTest#testRequestNextInputSplit URL: https://github.com/apache/flink/pull/6490#discussion_r207707407 ## File path: flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java ## @@ -678,6 +688,134 @@ public void testResourceManagerConnectionAfterRegainingLeadership() throws Excep } } + @Test + public void testRequestNextInputSplit() throws Exception { + final String resourceManagerAddress = "rm"; + final ResourceManagerId resourceManagerId = ResourceManagerId.generate(); + final ResourceID rmResourceId = new ResourceID(resourceManagerAddress); + + final TestingResourceManagerGateway resourceManagerGateway = new TestingResourceManagerGateway( + resourceManagerId, + rmResourceId, + resourceManagerAddress, + "localhost"); + + rpcService.registerGateway(resourceManagerAddress, resourceManagerGateway); + + // build one node JobGraph + InputSplitSource inputSplitSource = new TestingInputSplitSource(); + + JobVertex source = new JobVertex("vertex1"); + source.setParallelism(1); + source.setInputSplitSource(inputSplitSource); + source.setInvokableClass(AbstractInvokable.class); + + final JobGraph jobGraph = new JobGraph(source); + jobGraph.setAllowQueuedScheduling(true); + + configuration.setLong(ConfigConstants.RESTART_STRATEGY_FIXED_DELAY_ATTEMPTS, 1); + configuration.setString(ConfigConstants.RESTART_STRATEGY_FIXED_DELAY_DELAY, "0 s"); + + final JobManagerSharedServices jobManagerSharedServices = + new TestingJobManagerSharedServicesBuilder() + .setRestartStrategyFactory(RestartStrategyFactory.createRestartStrategyFactory(configuration)) + .build(); + + final JobMaster jobMaster = createJobMaster( + configuration, + jobGraph, + haServices, + jobManagerSharedServices); + + CompletableFuture startFuture = jobMaster.start(jobMasterId, testingTimeout); + + try { + // wait for the start to complete + startFuture.get(testingTimeout.toMilliseconds(), 
TimeUnit.MILLISECONDS); + + final JobMasterGateway jobMasterGateway = jobMaster.getSelfGateway(JobMasterGateway.class); + + ExecutionGraph eg = jobMaster.getExecutionGraph(); + ExecutionVertex ev = eg.getAllExecutionVertices().iterator().next(); + + SerializedInputSplit serializedInputSplit1 = jobMasterGateway.requestNextInputSplit( + source.getID(), + ev.getCurrentExecutionAttempt().getAttemptId()) + .get(1L, TimeUnit.SECONDS); + InputSplit inputSplit1 = InstantiationUtil.deserializeObject( + serializedInputSplit1.getInputSplitData(), ClassLoader.getSystemClassLoader()); + assertEquals(0, inputSplit1.getSplitNumber()); + + SerializedInputSplit serializedInputSplit2 = jobMasterGateway.requestNextInputSplit( + source.getID(), + ev.getCurrentExecutionAttempt().getAttemptId()) + .get(1L, TimeUnit.SECONDS); + InputSplit inputSplit2 = InstantiationUtil.deserializeObject( + serializedInputSplit2.getInputSplitData(), ClassLoader.getSystemClassLoader()); + assertEquals(1, inputSplit2.getSplitNumber()); + + ExecutionGraphTestUtils.waitUntilExecutionVertexState(ev, ExecutionState.SCHEDULED, 2000L); + + eg.failGlobal(new Exception("Testing exception")); + + ExecutionGraphTestUtils.waitUntilExecutionVertexState(ev, ExecutionState.SCHEDULED, 2000L); + + SerializedInputSplit serializedInputSplit3 = jobMasterGateway.requestNextInputSplit( + source.getID(), + ev.getCurrentExecutionAttempt().getAttemptId()) + .get(1L, TimeUnit.SECONDS); Review comment: the same as above This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service,