[jira] [Commented] (FLINK-10056) Add testRequestNextInputSplit
[ https://issues.apache.org/jira/browse/FLINK-10056?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16580784#comment-16580784 ]

ASF GitHub Bot commented on FLINK-10056:

asfgit closed pull request #6490: [FLINK-10056] [test] Add JobMasterTest#testRequestNextInputSplit
URL: https://github.com/apache/flink/pull/6490

This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic):

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
index c47f4fd19ff..01cb2b6b099 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
@@ -1655,4 +1655,9 @@ public void reportPayload(ResourceID resourceID, Void payload) {
     RestartStrategy getRestartStrategy() {
         return restartStrategy;
     }
+
+    @VisibleForTesting
+    ExecutionGraph getExecutionGraph() {
+        return executionGraph;
+    }
 }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java
index 1d36fa5859a..0d603fc17b2 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java
@@ -97,13 +97,12 @@ //
     /**
-     * Waits until the job has reached a certain state.
+     * Waits until the Job has reached a certain state.
      *
      * This method is based on polling and might miss very fast state transitions!
      */
     public static void waitUntilJobStatus(ExecutionGraph eg, JobStatus status, long maxWaitMillis)
             throws TimeoutException {
-
         checkNotNull(eg);
         checkNotNull(status);
         checkArgument(maxWaitMillis >= 0);
@@ -118,7 +117,9 @@ public static void waitUntilJobStatus(ExecutionGraph eg, JobStatus status, long
         }

         if (System.nanoTime() >= deadline) {
-            throw new TimeoutException("The job did not reach status " + status + " in time. Current status is " + eg.getState() + '.');
+            throw new TimeoutException(
+                String.format("The job did not reach status %s in time. Current status is %s.",
+                    status, eg.getState()));
         }
     }

@@ -129,7 +130,6 @@ public static void waitUntilJobStatus(ExecutionGraph eg, JobStatus status, long
      */
     public static void waitUntilExecutionState(Execution execution, ExecutionState state, long maxWaitMillis)
             throws TimeoutException {
-
         checkNotNull(execution);
         checkNotNull(state);
         checkArgument(maxWaitMillis >= 0);
@@ -144,7 +144,47 @@ public static void waitUntilExecutionState(Execution execution, ExecutionState s
         }

         if (System.nanoTime() >= deadline) {
-            throw new TimeoutException();
+            throw new TimeoutException(
+                String.format("The execution did not reach state %s in time. Current state is %s.",
+                    state, execution.getState()));
+        }
+    }
+
+    /**
+     * Waits until the ExecutionVertex has reached a certain state.
+     *
+     * This method is based on polling and might miss very fast state transitions!
+     */
+    public static void waitUntilExecutionVertexState(ExecutionVertex executionVertex, ExecutionState state, long maxWaitMillis)
+            throws TimeoutException {
+        checkNotNull(executionVertex);
+        checkNotNull(state);
+        checkArgument(maxWaitMillis >= 0);
+
+        // this is a poor implementation - we may want to improve it eventually
+        final long deadline = maxWaitMillis == 0 ? Long.MAX_VALUE : System.nanoTime() + (maxWaitMillis * 1_000_000);
+
+        while (true) {
+            Execution execution = executionVertex.getCurrentExecutionAttempt();
+
+            if (execution == null || (execution.getState() != state && System.nanoTime() < deadline)) {
+                try {
+
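
The `waitUntilExecutionVertexState` helper in the diff above polls a state until it matches or a deadline passes. As a rough illustration of that pattern outside Flink, here is a minimal sketch in plain Java. The class `PollingSketch` and the method `waitUntilEquals` are hypothetical names, not part of the pull request. Unlike the diff, the sketch compares elapsed time instead of absolute `System.nanoTime()` values and propagates `InterruptedException` rather than ignoring it.

```java
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;

/** Hypothetical sketch, not part of Flink: poll a value until it equals the expected one. */
final class PollingSketch {

    /**
     * Polls {@code current} every 2 ms until it returns {@code expected}.
     * As in the diff above, maxWaitMillis == 0 means "wait without a deadline".
     */
    static <T> void waitUntilEquals(Supplier<T> current, T expected, long maxWaitMillis)
            throws TimeoutException, InterruptedException {
        if (expected == null || maxWaitMillis < 0) {
            throw new IllegalArgumentException("expected must be non-null and maxWaitMillis >= 0");
        }
        final long start = System.nanoTime();
        final long maxWaitNanos = maxWaitMillis * 1_000_000L;

        while (true) {
            // A null value (for example, no current attempt yet) counts as "not there yet".
            T value = current.get();
            if (expected.equals(value)) {
                return;
            }
            // Compare elapsed time rather than absolute nanoTime values.
            if (maxWaitMillis != 0 && System.nanoTime() - start >= maxWaitNanos) {
                throw new TimeoutException(String.format(
                    "Did not reach state %s in time. Current state is %s.", expected, value));
            }
            Thread.sleep(2);
        }
    }

    public static void main(String[] args) throws Exception {
        AtomicReference<String> state = new AtomicReference<>("CREATED");
        // Simulate an asynchronous state transition after a short delay.
        Thread updater = new Thread(() -> {
            try {
                Thread.sleep(20);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            state.set("SCHEDULED");
        });
        updater.start();

        waitUntilEquals(state::get, "SCHEDULED", 2000L);
        System.out.println("reached " + state.get());
        updater.join();
    }
}
```

Like the original, this can miss a state that is entered and left again between two polls, which is the caveat the Javadoc in the diff spells out.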
[jira] [Commented] (FLINK-10056) Add testRequestNextInputSplit
[ https://issues.apache.org/jira/browse/FLINK-10056?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16574243#comment-16574243 ]

ASF GitHub Bot commented on FLINK-10056:

TisonKun commented on issue #6490: [FLINK-10056] [test] Add JobMasterTest#testRequestNextInputSplit
URL: https://github.com/apache/flink/pull/6490#issuecomment-411627844

cc @aljoscha @zentol

This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

> Add testRequestNextInputSplit
> -
>
> Key: FLINK-10056
> URL: https://issues.apache.org/jira/browse/FLINK-10056
> Project: Flink
> Issue Type: Improvement
> Components: JobManager, Tests
> Affects Versions: 1.5.0
> Reporter: 陈梓立
> Assignee: 陈梓立
> Priority: Major
> Labels: pull-request-available
>
> Add testRequestNextInputSplit to make sure JobMaster#requestNextInputSplit
> works as expected.

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
[jira] [Commented] (FLINK-10056) Add testRequestNextInputSplit
[ https://issues.apache.org/jira/browse/FLINK-10056?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16569674#comment-16569674 ]

ASF GitHub Bot commented on FLINK-10056:

TisonKun commented on a change in pull request #6490: [FLINK-10056] [test] Add JobMasterTest#testRequestNextInputSplit
URL: https://github.com/apache/flink/pull/6490#discussion_r207763974

## File path: flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java ##
@@ -144,7 +146,48 @@ public static void waitUntilExecutionState(Execution execution, ExecutionState s
         }

         if (System.nanoTime() >= deadline) {
-            throw new TimeoutException();
+            throw new TimeoutException(
+                "The execution did not reach state " + state + " in time. " +
+                "Current status is " + execution.getState() + '.');
+        }
+    }
+
+    /**
+     * Waits until the Execution vertex has reached a certain state.
+     *
+     * This method is based on polling and might miss very fast state transitions!
+     */
+    public static void waitUntilExecutionVertexState(ExecutionVertex executionVertex, ExecutionState state, long maxWaitMillis)
+            throws TimeoutException {
+

Review comment:
Well, I modified the others so that they all drop the empty line.
[jira] [Commented] (FLINK-10056) Add testRequestNextInputSplit
[ https://issues.apache.org/jira/browse/FLINK-10056?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16569677#comment-16569677 ]

ASF GitHub Bot commented on FLINK-10056:

TisonKun commented on issue #6490: [FLINK-10056] [test] Add JobMasterTest#testRequestNextInputSplit
URL: https://github.com/apache/flink/pull/6490#issuecomment-410574707

cc @twalthr @tzulitai
[jira] [Commented] (FLINK-10056) Add testRequestNextInputSplit
[ https://issues.apache.org/jira/browse/FLINK-10056?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16569396#comment-16569396 ]

ASF GitHub Bot commented on FLINK-10056:

TisonKun commented on a change in pull request #6490: [FLINK-10056] [test] Add JobMasterTest#testRequestNextInputSplit
URL: https://github.com/apache/flink/pull/6490#discussion_r207727020

## File path: flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java ##
@@ -144,7 +146,48 @@ public static void waitUntilExecutionState(Execution execution, ExecutionState s
         }

         if (System.nanoTime() >= deadline) {
-            throw new TimeoutException();
+            throw new TimeoutException(
+                "The execution did not reach state " + state + " in time. " +
+                "Current status is " + execution.getState() + '.');
+        }
+    }
+
+    /**
+     * Waits until the Execution vertex has reached a certain state.
+     *
+     * This method is based on polling and might miss very fast state transitions!
+     */
+    public static void waitUntilExecutionVertexState(ExecutionVertex executionVertex, ExecutionState state, long maxWaitMillis)

Review comment:
I stand by my opinion. Using `waitUntilExecutionVertexStateChanged` or `waitUntilExecutionVertexStateReached` does not provide extra accuracy. Maybe a better solution would be `[ executionVertex waitUntilExecutionStateReach: state WithTimeOut: maxWaitMillis ]`, but we cannot. I think `waitUntilExecutionVertexState` is enough; a longer name does not provide extra accuracy.
[jira] [Commented] (FLINK-10056) Add testRequestNextInputSplit
[ https://issues.apache.org/jira/browse/FLINK-10056?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16569335#comment-16569335 ]

ASF GitHub Bot commented on FLINK-10056:

yanghua commented on a change in pull request #6490: [FLINK-10056] [test] Add JobMasterTest#testRequestNextInputSplit
URL: https://github.com/apache/flink/pull/6490#discussion_r207723699

## File path: flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java ##
@@ -144,7 +146,48 @@ public static void waitUntilExecutionState(Execution execution, ExecutionState s
         }

         if (System.nanoTime() >= deadline) {
-            throw new TimeoutException();
+            throw new TimeoutException(
+                "The execution did not reach state " + state + " in time. " +
+                "Current status is " + execution.getState() + '.');
+        }
+    }
+
+    /**
+     * Waits until the Execution vertex has reached a certain state.
+     *
+     * This method is based on polling and might miss very fast state transitions!
+     */
+    public static void waitUntilExecutionVertexState(ExecutionVertex executionVertex, ExecutionState state, long maxWaitMillis)

Review comment:
The 'timeout' I gave you in the example is also inappropriate. Here the timeout is an exceptional state, not the normal state you are actually waiting for. I know this method's logic, but I still think "until" should be followed by a **state** or **result**, not a noun. I think `waitUntilExecutionVertexStateChanged` would be better; there is a similar case in the Flink source, `ExecutionGraph#waitUntilTerminal`.
[jira] [Commented] (FLINK-10056) Add testRequestNextInputSplit
[ https://issues.apache.org/jira/browse/FLINK-10056?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16569248#comment-16569248 ]

ASF GitHub Bot commented on FLINK-10056:

TisonKun commented on issue #6490: [FLINK-10056] [test] Add JobMasterTest#testRequestNextInputSplit
URL: https://github.com/apache/flink/pull/6490#issuecomment-410464862

cc @zentol @tillrohrmann
[jira] [Commented] (FLINK-10056) Add testRequestNextInputSplit
[ https://issues.apache.org/jira/browse/FLINK-10056?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16569183#comment-16569183 ]

ASF GitHub Bot commented on FLINK-10056:

TisonKun commented on issue #6490: [FLINK-10056] [test] Add JobMasterTest#testRequestNextInputSplit
URL: https://github.com/apache/flink/pull/6490#issuecomment-410449409

@yanghua thanks for the review! I have addressed the comments and left my thoughts. Please review when you are free, thanks!
[jira] [Commented] (FLINK-10056) Add testRequestNextInputSplit
[ https://issues.apache.org/jira/browse/FLINK-10056?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16569182#comment-16569182 ]

ASF GitHub Bot commented on FLINK-10056:

TisonKun commented on a change in pull request #6490: [FLINK-10056] [test] Add JobMasterTest#testRequestNextInputSplit
URL: https://github.com/apache/flink/pull/6490#discussion_r207708631

## File path: flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java ##
@@ -144,7 +146,48 @@ public static void waitUntilExecutionState(Execution execution, ExecutionState s
         }

         if (System.nanoTime() >= deadline) {
-            throw new TimeoutException();
+            throw new TimeoutException(
+                "The execution did not reach state " + state + " in time. " +
+                "Current status is " + execution.getState() + '.');
+        }
+    }
+
+    /**
+     * Waits until the Execution vertex has reached a certain state.
+     *
+     * This method is based on polling and might miss very fast state transitions!
+     */
+    public static void waitUntilExecutionVertexState(ExecutionVertex executionVertex, ExecutionState state, long maxWaitMillis)

Review comment:
`waitUntilExecutionVertexState` means (as its comment/doc says) that we wait until the ExecutionVertex has reached a certain state; otherwise we get a `TimeoutException`. It is clear to me, so I would not consider renaming it.
[jira] [Commented] (FLINK-10056) Add testRequestNextInputSplit
[ https://issues.apache.org/jira/browse/FLINK-10056?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16569177#comment-16569177 ]

ASF GitHub Bot commented on FLINK-10056:

TisonKun commented on a change in pull request #6490: [FLINK-10056] [test] Add JobMasterTest#testRequestNextInputSplit
URL: https://github.com/apache/flink/pull/6490#discussion_r207708380

## File path: flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java ##
@@ -144,7 +146,48 @@ public static void waitUntilExecutionState(Execution execution, ExecutionState s
         }

         if (System.nanoTime() >= deadline) {
-            throw new TimeoutException();
+            throw new TimeoutException(
+                "The execution did not reach state " + state + " in time. " +
+                "Current status is " + execution.getState() + '.');
+        }
+    }
+
+    /**
+     * Waits until the Execution vertex has reached a certain state.
+     *
+     * This method is based on polling and might miss very fast state transitions!
+     */
+    public static void waitUntilExecutionVertexState(ExecutionVertex executionVertex, ExecutionState state, long maxWaitMillis)
+            throws TimeoutException {
+

Review comment:
This keeps it consistent with the similar method above.
[jira] [Commented] (FLINK-10056) Add testRequestNextInputSplit
[ https://issues.apache.org/jira/browse/FLINK-10056?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16569171#comment-16569171 ]

ASF GitHub Bot commented on FLINK-10056:

yanghua commented on a change in pull request #6490: [FLINK-10056] [test] Add JobMasterTest#testRequestNextInputSplit
URL: https://github.com/apache/flink/pull/6490#discussion_r207707425

## File path: flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java ##
@@ -678,6 +688,134 @@ public void testResourceManagerConnectionAfterRegainingLeadership() throws Excep
         }
     }
+    @Test
+    public void testRequestNextInputSplit() throws Exception {
+        final String resourceManagerAddress = "rm";
+        final ResourceManagerId resourceManagerId = ResourceManagerId.generate();
+        final ResourceID rmResourceId = new ResourceID(resourceManagerAddress);
+
+        final TestingResourceManagerGateway resourceManagerGateway = new TestingResourceManagerGateway(
+            resourceManagerId,
+            rmResourceId,
+            resourceManagerAddress,
+            "localhost");
+
+        rpcService.registerGateway(resourceManagerAddress, resourceManagerGateway);
+
+        // build one node JobGraph
+        InputSplitSource inputSplitSource = new TestingInputSplitSource();
+
+        JobVertex source = new JobVertex("vertex1");
+        source.setParallelism(1);
+        source.setInputSplitSource(inputSplitSource);
+        source.setInvokableClass(AbstractInvokable.class);
+
+        final JobGraph jobGraph = new JobGraph(source);
+        jobGraph.setAllowQueuedScheduling(true);
+
+        configuration.setLong(ConfigConstants.RESTART_STRATEGY_FIXED_DELAY_ATTEMPTS, 1);
+        configuration.setString(ConfigConstants.RESTART_STRATEGY_FIXED_DELAY_DELAY, "0 s");
+
+        final JobManagerSharedServices jobManagerSharedServices =
+            new TestingJobManagerSharedServicesBuilder()
+                .setRestartStrategyFactory(RestartStrategyFactory.createRestartStrategyFactory(configuration))
+                .build();
+
+        final JobMaster jobMaster = createJobMaster(
+            configuration,
+            jobGraph,
+            haServices,
+            jobManagerSharedServices);
+
+        CompletableFuture startFuture = jobMaster.start(jobMasterId, testingTimeout);
+
+        try {
+            // wait for the start to complete
+            startFuture.get(testingTimeout.toMilliseconds(), TimeUnit.MILLISECONDS);
+
+            final JobMasterGateway jobMasterGateway = jobMaster.getSelfGateway(JobMasterGateway.class);
+
+            ExecutionGraph eg = jobMaster.getExecutionGraph();
+            ExecutionVertex ev = eg.getAllExecutionVertices().iterator().next();
+
+            SerializedInputSplit serializedInputSplit1 = jobMasterGateway.requestNextInputSplit(
+                source.getID(),
+                ev.getCurrentExecutionAttempt().getAttemptId())
+                .get(1L, TimeUnit.SECONDS);
+            InputSplit inputSplit1 = InstantiationUtil.deserializeObject(
+                serializedInputSplit1.getInputSplitData(), ClassLoader.getSystemClassLoader());
+            assertEquals(0, inputSplit1.getSplitNumber());
+
+            SerializedInputSplit serializedInputSplit2 = jobMasterGateway.requestNextInputSplit(
+                source.getID(),
+                ev.getCurrentExecutionAttempt().getAttemptId())
+                .get(1L, TimeUnit.SECONDS);
+            InputSplit inputSplit2 = InstantiationUtil.deserializeObject(
+                serializedInputSplit2.getInputSplitData(), ClassLoader.getSystemClassLoader());
+            assertEquals(1, inputSplit2.getSplitNumber());
+
+            ExecutionGraphTestUtils.waitUntilExecutionVertexState(ev, ExecutionState.SCHEDULED, 2000L);
+
+            eg.failGlobal(new Exception("Testing exception"));
+
+            ExecutionGraphTestUtils.waitUntilExecutionVertexState(ev, ExecutionState.SCHEDULED, 2000L);
+
+            SerializedInputSplit serializedInputSplit3 = jobMasterGateway.requestNextInputSplit(
+                source.getID(),
+                ev.getCurrentExecutionAttempt().getAttemptId())
+                .get(1L, TimeUnit.SECONDS);
+            InputSplit inputSplit3 =
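
The test above receives each split as bytes (`getInputSplitData()`), turns them back into an object and then checks the split number. The sketch below shows that round trip with plain JDK serialization only. It assumes the bytes come from standard Java serialization, which this excerpt does not show, and `SplitRoundTripSketch` and `DemoSplit` are hypothetical stand-ins rather than Flink classes.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

/** Hypothetical sketch, not Flink code: serialize a split-like object and read it back. */
final class SplitRoundTripSketch {

    /** Stand-in for an input split; it only carries its split number. */
    static final class DemoSplit implements Serializable {
        private static final long serialVersionUID = 1L;
        private final int splitNumber;

        DemoSplit(int splitNumber) {
            this.splitNumber = splitNumber;
        }

        int getSplitNumber() {
            return splitNumber;
        }
    }

    /** Writes the object into a byte array using standard Java serialization. */
    static byte[] serialize(Serializable obj) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(obj);
        }
        return bytes.toByteArray();
    }

    /** Reads a single object back from the byte array. */
    static Object deserialize(byte[] data) throws IOException, ClassNotFoundException {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(data))) {
            return in.readObject();
        }
    }

    public static void main(String[] args) throws Exception {
        byte[] data = serialize(new DemoSplit(0));
        DemoSplit split = (DemoSplit) deserialize(data);
        System.out.println("split number: " + split.getSplitNumber());
    }
}
```

The test in the pull request additionally passes a class loader to the deserialization call; the sketch leaves that out and relies on the default class resolution of `ObjectInputStream`.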
[jira] [Commented] (FLINK-10056) Add testRequestNextInputSplit
[ https://issues.apache.org/jira/browse/FLINK-10056?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16569169#comment-16569169 ]

ASF GitHub Bot commented on FLINK-10056:

yanghua commented on a change in pull request #6490: [FLINK-10056] [test] Add JobMasterTest#testRequestNextInputSplit
URL: https://github.com/apache/flink/pull/6490#discussion_r207707318

## File path: flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java ##
@@ -144,7 +146,48 @@ public static void waitUntilExecutionState(Execution execution, ExecutionState s
         }

         if (System.nanoTime() >= deadline) {
-            throw new TimeoutException();
+            throw new TimeoutException(
+                "The execution did not reach state " + state + " in time. " +
+                "Current status is " + execution.getState() + '.');
+        }
+    }
+
+    /**
+     * Waits until the Execution vertex has reached a certain state.
+     *
+     * This method is based on polling and might miss very fast state transitions!
+     */
+    public static void waitUntilExecutionVertexState(ExecutionVertex executionVertex, ExecutionState state, long maxWaitMillis)
+            throws TimeoutException {
+
+        checkNotNull(executionVertex);
+        checkNotNull(state);
+        checkArgument(maxWaitMillis >= 0);
+
+        // this is a poor implementation - we may want to improve it eventually
+        final long deadline = maxWaitMillis == 0 ? Long.MAX_VALUE : System.nanoTime() + (maxWaitMillis * 1_000_000);
+
+        while (true) {
+            Execution execution = executionVertex.getCurrentExecutionAttempt();
+
+            if (execution == null || (execution.getState() != state && System.nanoTime() < deadline)) {
+                try {
+                    Thread.sleep(2);
+                } catch (InterruptedException ignored) { }
+            } else {
+                break;
+            }
+
+            if (System.nanoTime() >= deadline) {
+                if (execution != null) {
+                    throw new TimeoutException(
+                        "The execution vertex did not reach state " + state + " in time. " +
+                        "Current status is " + execution.getState() + '.');

Review comment:
same as above~
[jira] [Commented] (FLINK-10056) Add testRequestNextInputSplit
[ https://issues.apache.org/jira/browse/FLINK-10056?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16569168#comment-16569168 ] ASF GitHub Bot commented on FLINK-10056: yanghua commented on a change in pull request #6490: [FLINK-10056] [test] Add JobMasterTest#testRequestNextInputSplit URL: https://github.com/apache/flink/pull/6490#discussion_r207707407 ## File path: flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java ## @@ -678,6 +688,134 @@ public void testResourceManagerConnectionAfterRegainingLeadership() throws Excep } } + @Test + public void testRequestNextInputSplit() throws Exception { + final String resourceManagerAddress = "rm"; + final ResourceManagerId resourceManagerId = ResourceManagerId.generate(); + final ResourceID rmResourceId = new ResourceID(resourceManagerAddress); + + final TestingResourceManagerGateway resourceManagerGateway = new TestingResourceManagerGateway( + resourceManagerId, + rmResourceId, + resourceManagerAddress, + "localhost"); + + rpcService.registerGateway(resourceManagerAddress, resourceManagerGateway); + + // build one node JobGraph + InputSplitSource inputSplitSource = new TestingInputSplitSource(); + + JobVertex source = new JobVertex("vertex1"); + source.setParallelism(1); + source.setInputSplitSource(inputSplitSource); + source.setInvokableClass(AbstractInvokable.class); + + final JobGraph jobGraph = new JobGraph(source); + jobGraph.setAllowQueuedScheduling(true); + + configuration.setLong(ConfigConstants.RESTART_STRATEGY_FIXED_DELAY_ATTEMPTS, 1); + configuration.setString(ConfigConstants.RESTART_STRATEGY_FIXED_DELAY_DELAY, "0 s"); + + final JobManagerSharedServices jobManagerSharedServices = + new TestingJobManagerSharedServicesBuilder() + .setRestartStrategyFactory(RestartStrategyFactory.createRestartStrategyFactory(configuration)) + .build(); + + final JobMaster jobMaster = createJobMaster( + configuration, + jobGraph, + haServices, + jobManagerSharedServices); + + 
CompletableFuture startFuture = jobMaster.start(jobMasterId, testingTimeout); + + try { + // wait for the start to complete + startFuture.get(testingTimeout.toMilliseconds(), TimeUnit.MILLISECONDS); + + final JobMasterGateway jobMasterGateway = jobMaster.getSelfGateway(JobMasterGateway.class); + + ExecutionGraph eg = jobMaster.getExecutionGraph(); + ExecutionVertex ev = eg.getAllExecutionVertices().iterator().next(); + + SerializedInputSplit serializedInputSplit1 = jobMasterGateway.requestNextInputSplit( + source.getID(), + ev.getCurrentExecutionAttempt().getAttemptId()) + .get(1L, TimeUnit.SECONDS); + InputSplit inputSplit1 = InstantiationUtil.deserializeObject( + serializedInputSplit1.getInputSplitData(), ClassLoader.getSystemClassLoader()); + assertEquals(0, inputSplit1.getSplitNumber()); + + SerializedInputSplit serializedInputSplit2 = jobMasterGateway.requestNextInputSplit( + source.getID(), + ev.getCurrentExecutionAttempt().getAttemptId()) + .get(1L, TimeUnit.SECONDS); + InputSplit inputSplit2 = InstantiationUtil.deserializeObject( + serializedInputSplit2.getInputSplitData(), ClassLoader.getSystemClassLoader()); + assertEquals(1, inputSplit2.getSplitNumber()); + + ExecutionGraphTestUtils.waitUntilExecutionVertexState(ev, ExecutionState.SCHEDULED, 2000L); + + eg.failGlobal(new Exception("Testing exception")); + + ExecutionGraphTestUtils.waitUntilExecutionVertexState(ev, ExecutionState.SCHEDULED, 2000L); + + SerializedInputSplit serializedInputSplit3 = jobMasterGateway.requestNextInputSplit( + source.getID(), + ev.getCurrentExecutionAttempt().getAttemptId()) + .get(1L, TimeUnit.SECONDS); Review comment: the same as above
[jira] [Commented] (FLINK-10056) Add testRequestNextInputSplit
[ https://issues.apache.org/jira/browse/FLINK-10056?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16569166#comment-16569166 ] ASF GitHub Bot commented on FLINK-10056: yanghua commented on a change in pull request #6490: [FLINK-10056] [test] Add JobMasterTest#testRequestNextInputSplit URL: https://github.com/apache/flink/pull/6490#discussion_r207707414 ## File path: flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java ## @@ -678,6 +688,134 @@ public void testResourceManagerConnectionAfterRegainingLeadership() throws Excep } } + @Test + public void testRequestNextInputSplit() throws Exception { + final String resourceManagerAddress = "rm"; + final ResourceManagerId resourceManagerId = ResourceManagerId.generate(); + final ResourceID rmResourceId = new ResourceID(resourceManagerAddress); + + final TestingResourceManagerGateway resourceManagerGateway = new TestingResourceManagerGateway( + resourceManagerId, + rmResourceId, + resourceManagerAddress, + "localhost"); + + rpcService.registerGateway(resourceManagerAddress, resourceManagerGateway); + + // build one node JobGraph + InputSplitSource inputSplitSource = new TestingInputSplitSource(); + + JobVertex source = new JobVertex("vertex1"); + source.setParallelism(1); + source.setInputSplitSource(inputSplitSource); + source.setInvokableClass(AbstractInvokable.class); + + final JobGraph jobGraph = new JobGraph(source); + jobGraph.setAllowQueuedScheduling(true); + + configuration.setLong(ConfigConstants.RESTART_STRATEGY_FIXED_DELAY_ATTEMPTS, 1); + configuration.setString(ConfigConstants.RESTART_STRATEGY_FIXED_DELAY_DELAY, "0 s"); + + final JobManagerSharedServices jobManagerSharedServices = + new TestingJobManagerSharedServicesBuilder() + .setRestartStrategyFactory(RestartStrategyFactory.createRestartStrategyFactory(configuration)) + .build(); + + final JobMaster jobMaster = createJobMaster( + configuration, + jobGraph, + haServices, + jobManagerSharedServices); + + 
CompletableFuture startFuture = jobMaster.start(jobMasterId, testingTimeout); + + try { + // wait for the start to complete + startFuture.get(testingTimeout.toMilliseconds(), TimeUnit.MILLISECONDS); + + final JobMasterGateway jobMasterGateway = jobMaster.getSelfGateway(JobMasterGateway.class); + + ExecutionGraph eg = jobMaster.getExecutionGraph(); + ExecutionVertex ev = eg.getAllExecutionVertices().iterator().next(); + + SerializedInputSplit serializedInputSplit1 = jobMasterGateway.requestNextInputSplit( + source.getID(), + ev.getCurrentExecutionAttempt().getAttemptId()) + .get(1L, TimeUnit.SECONDS); + InputSplit inputSplit1 = InstantiationUtil.deserializeObject( + serializedInputSplit1.getInputSplitData(), ClassLoader.getSystemClassLoader()); + assertEquals(0, inputSplit1.getSplitNumber()); + + SerializedInputSplit serializedInputSplit2 = jobMasterGateway.requestNextInputSplit( + source.getID(), + ev.getCurrentExecutionAttempt().getAttemptId()) + .get(1L, TimeUnit.SECONDS); + InputSplit inputSplit2 = InstantiationUtil.deserializeObject( + serializedInputSplit2.getInputSplitData(), ClassLoader.getSystemClassLoader()); + assertEquals(1, inputSplit2.getSplitNumber()); + + ExecutionGraphTestUtils.waitUntilExecutionVertexState(ev, ExecutionState.SCHEDULED, 2000L); + + eg.failGlobal(new Exception("Testing exception")); + + ExecutionGraphTestUtils.waitUntilExecutionVertexState(ev, ExecutionState.SCHEDULED, 2000L); + + SerializedInputSplit serializedInputSplit3 = jobMasterGateway.requestNextInputSplit( + source.getID(), + ev.getCurrentExecutionAttempt().getAttemptId()) + .get(1L, TimeUnit.SECONDS); + InputSplit inputSplit3 =
[jira] [Commented] (FLINK-10056) Add testRequestNextInputSplit
[ https://issues.apache.org/jira/browse/FLINK-10056?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16569161#comment-16569161 ] ASF GitHub Bot commented on FLINK-10056: yanghua commented on a change in pull request #6490: [FLINK-10056] [test] Add JobMasterTest#testRequestNextInputSplit URL: https://github.com/apache/flink/pull/6490#discussion_r207707284 ## File path: flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java ## @@ -144,7 +146,48 @@ public static void waitUntilExecutionState(Execution execution, ExecutionState s } if (System.nanoTime() >= deadline) { - throw new TimeoutException(); + throw new TimeoutException( + "The execution did not reach state " + state + " in time. " + + "Current status is " + execution.getState() + '.'); + } + } + + /** +* Waits until the Execution vertex has reached a certain state. +* +* This method is based on polling and might miss very fast state transitions! +*/ + public static void waitUntilExecutionVertexState(ExecutionVertex executionVertex, ExecutionState state, long maxWaitMillis) Review comment: The method name `waitUntilExecutionVertexState` is not very accurate. Generally, `until` is followed by a state, like 'timeout' and so on. I think we should consider a new name, e.g. `waitExecutionVertexStateTransform`. That is just an example; you can think about it. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Add testRequestNextInputSplit > - > > Key: FLINK-10056 > URL: https://issues.apache.org/jira/browse/FLINK-10056 > Project: Flink > Issue Type: Improvement > Components: JobManager, Tests >Affects Versions: 1.5.0 >Reporter: 陈梓立 >Assignee: 陈梓立 >Priority: Major > Labels: pull-request-available > > Add testRequestNextInputSplit to make sure JobMaster#requestNextInputSplit > works as expected. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
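The Javadoc quoted in the diff above notes that the wait helper is based on polling and might miss very fast state transitions. A minimal stand-alone sketch of such a polling wait is given here for illustration; the class `PollingWait`, the method `waitUntilState`, the `Supplier`-based signature, and the 2 ms sleep interval are all assumptions made for this example and are not Flink's `ExecutionGraphTestUtils` API.

```java
import java.util.Objects;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Supplier;

/** Hypothetical helper for illustration only; not part of Flink. */
final class PollingWait {

    private PollingWait() {}

    /**
     * Repeatedly reads the current state until it equals {@code expected}
     * or {@code maxWaitMillis} has elapsed. Because the state is only sampled
     * between sleeps, a state that is entered and left again between two
     * samples is never observed.
     */
    static <S> void waitUntilState(Supplier<S> currentState, S expected, long maxWaitMillis)
            throws TimeoutException, InterruptedException {
        Objects.requireNonNull(currentState, "currentState");
        Objects.requireNonNull(expected, "expected");
        if (maxWaitMillis < 0) {
            throw new IllegalArgumentException("maxWaitMillis must be >= 0");
        }

        final long start = System.nanoTime();
        final long maxWaitNanos = TimeUnit.MILLISECONDS.toNanos(maxWaitMillis);

        while (true) {
            final S observed = currentState.get();
            if (expected.equals(observed)) {
                return;
            }
            // Compare elapsed time rather than absolute nanoTime values,
            // which is the overflow-safe way to use System.nanoTime().
            if (System.nanoTime() - start >= maxWaitNanos) {
                throw new TimeoutException(String.format(
                    "Did not reach state %s in time. Current state is %s.",
                    expected, observed));
            }
            Thread.sleep(2); // assumed polling interval
        }
    }
}
```

A caller would pass a state accessor as the supplier, for example `PollingWait.waitUntilState(() -> someVertex.getState(), expectedState, 2000L)`, where `someVertex` and `expectedState` stand in for whatever object is being observed.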
[jira] [Commented] (FLINK-10056) Add testRequestNextInputSplit
[ https://issues.apache.org/jira/browse/FLINK-10056?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16569162#comment-16569162 ] ASF GitHub Bot commented on FLINK-10056: yanghua commented on a change in pull request #6490: [FLINK-10056] [test] Add JobMasterTest#testRequestNextInputSplit URL: https://github.com/apache/flink/pull/6490#discussion_r207707125 ## File path: flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java ## @@ -118,7 +118,9 @@ public static void waitUntilJobStatus(ExecutionGraph eg, JobStatus status, long } if (System.nanoTime() >= deadline) { - throw new TimeoutException("The job did not reach status " + status + " in time. Current status is " + eg.getState() + '.'); + throw new TimeoutException( + "The job did not reach status " + status + " in time. " + + "Current status is " + eg.getState() + '.'); Review comment: Please use `""` consistently rather than `''`. Also, using a string format looks better to me.
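The review comment above prefers a format string over concatenation that mixes string literals with a `char` literal. The following sketch contrasts the two forms; the class name `TimeoutMessages` and both method names are hypothetical and exist only for this illustration, while the message text is taken from the diff.

```java
/** Hypothetical illustration of the two message-building styles. */
final class TimeoutMessages {

    private TimeoutMessages() {}

    /** Concatenation as in the reviewed diff, ending with the char literal '.'. */
    static String concatenated(Object expected, Object current) {
        return "The job did not reach status " + expected + " in time. "
            + "Current status is " + current + '.';
    }

    /** The same message built from a single format string. */
    static String formatted(Object expected, Object current) {
        return String.format(
            "The job did not reach status %s in time. Current status is %s.",
            expected, current);
    }
}
```

Both methods produce the same text; the format-string version keeps the whole sentence in one literal, which makes the wording easier to read and change.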
[jira] [Commented] (FLINK-10056) Add testRequestNextInputSplit
[ https://issues.apache.org/jira/browse/FLINK-10056?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16569163#comment-16569163 ] ASF GitHub Bot commented on FLINK-10056: yanghua commented on a change in pull request #6490: [FLINK-10056] [test] Add JobMasterTest#testRequestNextInputSplit URL: https://github.com/apache/flink/pull/6490#discussion_r207707152 ## File path: flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java ## @@ -144,7 +146,48 @@ public static void waitUntilExecutionState(Execution execution, ExecutionState s } if (System.nanoTime() >= deadline) { - throw new TimeoutException(); + throw new TimeoutException( + "The execution did not reach state " + state + " in time. " + + "Current status is " + execution.getState() + '.'); + } + } + + /** +* Waits until the Execution vertex has reached a certain state. +* +* This method is based on polling and might miss very fast state transitions! +*/ + public static void waitUntilExecutionVertexState(ExecutionVertex executionVertex, ExecutionState state, long maxWaitMillis) + throws TimeoutException { + Review comment: I think we could remove the empty line here.
[jira] [Commented] (FLINK-10056) Add testRequestNextInputSplit
[ https://issues.apache.org/jira/browse/FLINK-10056?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16569167#comment-16569167 ] ASF GitHub Bot commented on FLINK-10056: yanghua commented on a change in pull request #6490: [FLINK-10056] [test] Add JobMasterTest#testRequestNextInputSplit URL: https://github.com/apache/flink/pull/6490#discussion_r207707382 ## File path: flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java ## @@ -678,6 +688,134 @@ public void testResourceManagerConnectionAfterRegainingLeadership() throws Excep } } + @Test + public void testRequestNextInputSplit() throws Exception { + final String resourceManagerAddress = "rm"; + final ResourceManagerId resourceManagerId = ResourceManagerId.generate(); + final ResourceID rmResourceId = new ResourceID(resourceManagerAddress); + + final TestingResourceManagerGateway resourceManagerGateway = new TestingResourceManagerGateway( + resourceManagerId, + rmResourceId, + resourceManagerAddress, + "localhost"); + + rpcService.registerGateway(resourceManagerAddress, resourceManagerGateway); + + // build one node JobGraph + InputSplitSource inputSplitSource = new TestingInputSplitSource(); + + JobVertex source = new JobVertex("vertex1"); + source.setParallelism(1); + source.setInputSplitSource(inputSplitSource); + source.setInvokableClass(AbstractInvokable.class); + + final JobGraph jobGraph = new JobGraph(source); + jobGraph.setAllowQueuedScheduling(true); + + configuration.setLong(ConfigConstants.RESTART_STRATEGY_FIXED_DELAY_ATTEMPTS, 1); + configuration.setString(ConfigConstants.RESTART_STRATEGY_FIXED_DELAY_DELAY, "0 s"); + + final JobManagerSharedServices jobManagerSharedServices = + new TestingJobManagerSharedServicesBuilder() + .setRestartStrategyFactory(RestartStrategyFactory.createRestartStrategyFactory(configuration)) + .build(); + + final JobMaster jobMaster = createJobMaster( + configuration, + jobGraph, + haServices, + jobManagerSharedServices); + + 
CompletableFuture startFuture = jobMaster.start(jobMasterId, testingTimeout); + + try { + // wait for the start to complete + startFuture.get(testingTimeout.toMilliseconds(), TimeUnit.MILLISECONDS); + + final JobMasterGateway jobMasterGateway = jobMaster.getSelfGateway(JobMasterGateway.class); + + ExecutionGraph eg = jobMaster.getExecutionGraph(); + ExecutionVertex ev = eg.getAllExecutionVertices().iterator().next(); + + SerializedInputSplit serializedInputSplit1 = jobMasterGateway.requestNextInputSplit( + source.getID(), + ev.getCurrentExecutionAttempt().getAttemptId()) + .get(1L, TimeUnit.SECONDS); Review comment: The indentation is confusing. You could consider:
```
SerializedInputSplit serializedInputSplit1 = jobMasterGateway
    .requestNextInputSplit(
        source.getID(),
        ev.getCurrentExecutionAttempt().getAttemptId())
    .get(1L, TimeUnit.SECONDS);
```
[jira] [Commented] (FLINK-10056) Add testRequestNextInputSplit
[ https://issues.apache.org/jira/browse/FLINK-10056?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16569165#comment-16569165 ] ASF GitHub Bot commented on FLINK-10056: yanghua commented on a change in pull request #6490: [FLINK-10056] [test] Add JobMasterTest#testRequestNextInputSplit URL: https://github.com/apache/flink/pull/6490#discussion_r207707390 ## File path: flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java ## @@ -678,6 +688,134 @@ public void testResourceManagerConnectionAfterRegainingLeadership() throws Excep } } + @Test + public void testRequestNextInputSplit() throws Exception { + final String resourceManagerAddress = "rm"; + final ResourceManagerId resourceManagerId = ResourceManagerId.generate(); + final ResourceID rmResourceId = new ResourceID(resourceManagerAddress); + + final TestingResourceManagerGateway resourceManagerGateway = new TestingResourceManagerGateway( + resourceManagerId, + rmResourceId, + resourceManagerAddress, + "localhost"); + + rpcService.registerGateway(resourceManagerAddress, resourceManagerGateway); + + // build one node JobGraph + InputSplitSource inputSplitSource = new TestingInputSplitSource(); + + JobVertex source = new JobVertex("vertex1"); + source.setParallelism(1); + source.setInputSplitSource(inputSplitSource); + source.setInvokableClass(AbstractInvokable.class); + + final JobGraph jobGraph = new JobGraph(source); + jobGraph.setAllowQueuedScheduling(true); + + configuration.setLong(ConfigConstants.RESTART_STRATEGY_FIXED_DELAY_ATTEMPTS, 1); + configuration.setString(ConfigConstants.RESTART_STRATEGY_FIXED_DELAY_DELAY, "0 s"); + + final JobManagerSharedServices jobManagerSharedServices = + new TestingJobManagerSharedServicesBuilder() + .setRestartStrategyFactory(RestartStrategyFactory.createRestartStrategyFactory(configuration)) + .build(); + + final JobMaster jobMaster = createJobMaster( + configuration, + jobGraph, + haServices, + jobManagerSharedServices); + + 
CompletableFuture startFuture = jobMaster.start(jobMasterId, testingTimeout); + + try { + // wait for the start to complete + startFuture.get(testingTimeout.toMilliseconds(), TimeUnit.MILLISECONDS); + + final JobMasterGateway jobMasterGateway = jobMaster.getSelfGateway(JobMasterGateway.class); + + ExecutionGraph eg = jobMaster.getExecutionGraph(); + ExecutionVertex ev = eg.getAllExecutionVertices().iterator().next(); + + SerializedInputSplit serializedInputSplit1 = jobMasterGateway.requestNextInputSplit( + source.getID(), + ev.getCurrentExecutionAttempt().getAttemptId()) + .get(1L, TimeUnit.SECONDS); + InputSplit inputSplit1 = InstantiationUtil.deserializeObject( + serializedInputSplit1.getInputSplitData(), ClassLoader.getSystemClassLoader()); + assertEquals(0, inputSplit1.getSplitNumber()); + + SerializedInputSplit serializedInputSplit2 = jobMasterGateway.requestNextInputSplit( + source.getID(), + ev.getCurrentExecutionAttempt().getAttemptId()) + .get(1L, TimeUnit.SECONDS); Review comment: Same as above.
[jira] [Commented] (FLINK-10056) Add testRequestNextInputSplit
[ https://issues.apache.org/jira/browse/FLINK-10056?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16569164#comment-16569164 ] ASF GitHub Bot commented on FLINK-10056: yanghua commented on a change in pull request #6490: [FLINK-10056] [test] Add JobMasterTest#testRequestNextInputSplit URL: https://github.com/apache/flink/pull/6490#discussion_r207707131 ## File path: flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java ## @@ -144,7 +146,48 @@ public static void waitUntilExecutionState(Execution execution, ExecutionState s } if (System.nanoTime() >= deadline) { - throw new TimeoutException(); + throw new TimeoutException( + "The execution did not reach state " + state + " in time. " + + "Current status is " + execution.getState() + '.'); Review comment: Please use "" consistently rather than ''. Also, using a string format looks better to me.
[jira] [Commented] (FLINK-10056) Add testRequestNextInputSplit
[ https://issues.apache.org/jira/browse/FLINK-10056?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16569151#comment-16569151 ] ASF GitHub Bot commented on FLINK-10056: TisonKun opened a new pull request #6490: [FLINK-10056] [test] Add JobMasterTest#testRequestNextInputSplit URL: https://github.com/apache/flink/pull/6490

## Brief change log

Add `JobMasterTest#testRequestNextInputSplit` to make sure that `JobMaster#requestNextInputSplit` works as expected.

## Verifying this change

The code itself is the verification.

## Does this pull request potentially affect one of the following parts:

- Dependencies (does it add or upgrade a dependency): (no)
- The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no)
- The serializers: (no)
- The runtime per-record code paths (performance sensitive): (no)
- Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
- The S3 file system connector: (no)

## Documentation

- Does this pull request introduce a new feature? (no)
- If yes, how is the feature documented? (not applicable)