[GitHub] yanghua commented on a change in pull request #6490: [FLINK-10056] [test] Add JobMasterTest#testRequestNextInputSplit

2018-08-04 Thread GitBox
yanghua commented on a change in pull request #6490: [FLINK-10056] [test] Add 
JobMasterTest#testRequestNextInputSplit
URL: https://github.com/apache/flink/pull/6490#discussion_r207723699
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java
 ##
 @@ -144,7 +146,48 @@ public static void waitUntilExecutionState(Execution 
execution, ExecutionState s
}
 
if (System.nanoTime() >= deadline) {
-   throw new TimeoutException();
+   throw new TimeoutException(
+   "The execution did not reach state " + state + 
" in time. " +
+   "Current status is " + execution.getState() + 
'.');
+   }
+   }
+
+   /**
+* Waits until the Execution vertex has reached a certain state.
+*
+* This method is based on polling and might miss very fast state 
transitions!
+*/
+   public static void waitUntilExecutionVertexState(ExecutionVertex 
executionVertex, ExecutionState state, long maxWaitMillis)
 
 Review comment:
   The 'timeout' I gave you in the example is also inappropriate. Here timeout 
is an exception state, and the normal state you are waiting for is not actually 
it. I know this method's logic, but I still think "until" should be followed 
with a **state** or  or **result** but not a noun. I think 
`waitUntilExecutionVertexStateChanged` would be better, similar cases in Flink 
source `ExecutionGraph#waitUntilTerminal`.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] yanghua commented on a change in pull request #6490: [FLINK-10056] [test] Add JobMasterTest#testRequestNextInputSplit

2018-08-04 Thread GitBox
yanghua commented on a change in pull request #6490: [FLINK-10056] [test] Add 
JobMasterTest#testRequestNextInputSplit
URL: https://github.com/apache/flink/pull/6490#discussion_r207707425
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
 ##
 @@ -678,6 +688,134 @@ public void 
testResourceManagerConnectionAfterRegainingLeadership() throws Excep
}
}
 
+   @Test
+   public void testRequestNextInputSplit() throws Exception {
+   final String resourceManagerAddress = "rm";
+   final ResourceManagerId resourceManagerId = 
ResourceManagerId.generate();
+   final ResourceID rmResourceId = new 
ResourceID(resourceManagerAddress);
+
+   final TestingResourceManagerGateway resourceManagerGateway = 
new TestingResourceManagerGateway(
+   resourceManagerId,
+   rmResourceId,
+   resourceManagerAddress,
+   "localhost");
+
+   rpcService.registerGateway(resourceManagerAddress, 
resourceManagerGateway);
+
+   // build one node JobGraph
+   InputSplitSource inputSplitSource = new 
TestingInputSplitSource();
+
+   JobVertex source = new JobVertex("vertex1");
+   source.setParallelism(1);
+   source.setInputSplitSource(inputSplitSource);
+   source.setInvokableClass(AbstractInvokable.class);
+
+   final JobGraph jobGraph = new JobGraph(source);
+   jobGraph.setAllowQueuedScheduling(true);
+
+   
configuration.setLong(ConfigConstants.RESTART_STRATEGY_FIXED_DELAY_ATTEMPTS, 1);
+   
configuration.setString(ConfigConstants.RESTART_STRATEGY_FIXED_DELAY_DELAY, "0 
s");
+
+   final JobManagerSharedServices jobManagerSharedServices =
+   new TestingJobManagerSharedServicesBuilder()
+   
.setRestartStrategyFactory(RestartStrategyFactory.createRestartStrategyFactory(configuration))
+   .build();
+
+   final JobMaster jobMaster = createJobMaster(
+   configuration,
+   jobGraph,
+   haServices,
+   jobManagerSharedServices);
+
+   CompletableFuture startFuture = 
jobMaster.start(jobMasterId, testingTimeout);
+
+   try {
+   // wait for the start to complete
+   startFuture.get(testingTimeout.toMilliseconds(), 
TimeUnit.MILLISECONDS);
+
+   final JobMasterGateway jobMasterGateway = 
jobMaster.getSelfGateway(JobMasterGateway.class);
+
+   ExecutionGraph eg = jobMaster.getExecutionGraph();
+   ExecutionVertex ev = 
eg.getAllExecutionVertices().iterator().next();
+
+   SerializedInputSplit serializedInputSplit1 = 
jobMasterGateway.requestNextInputSplit(
+   source.getID(),
+   ev.getCurrentExecutionAttempt().getAttemptId())
+   .get(1L, TimeUnit.SECONDS);
+   InputSplit inputSplit1 = 
InstantiationUtil.deserializeObject(
+   serializedInputSplit1.getInputSplitData(), 
ClassLoader.getSystemClassLoader());
+   assertEquals(0, inputSplit1.getSplitNumber());
+
+   SerializedInputSplit serializedInputSplit2 = 
jobMasterGateway.requestNextInputSplit(
+   source.getID(),
+   ev.getCurrentExecutionAttempt().getAttemptId())
+   .get(1L, TimeUnit.SECONDS);
+   InputSplit inputSplit2 = 
InstantiationUtil.deserializeObject(
+   serializedInputSplit2.getInputSplitData(), 
ClassLoader.getSystemClassLoader());
+   assertEquals(1, inputSplit2.getSplitNumber());
+
+   
ExecutionGraphTestUtils.waitUntilExecutionVertexState(ev, 
ExecutionState.SCHEDULED, 2000L);
+
+   eg.failGlobal(new Exception("Testing exception"));
+
+   
ExecutionGraphTestUtils.waitUntilExecutionVertexState(ev, 
ExecutionState.SCHEDULED, 2000L);
+
+   SerializedInputSplit serializedInputSplit3 = 
jobMasterGateway.requestNextInputSplit(
+   source.getID(),
+   ev.getCurrentExecutionAttempt().getAttemptId())
+   .get(1L, TimeUnit.SECONDS);
+   InputSplit inputSplit3 = 
InstantiationUtil.deserializeObject(
+   serializedInputSplit3.getInputSplitData(), 
ClassLoader.getSystemClassLoader());
+   assertEquals(0, inputSplit3.getSplitNumber());
+
+   

[GitHub] yanghua commented on a change in pull request #6490: [FLINK-10056] [test] Add JobMasterTest#testRequestNextInputSplit

2018-08-04 Thread GitBox
yanghua commented on a change in pull request #6490: [FLINK-10056] [test] Add 
JobMasterTest#testRequestNextInputSplit
URL: https://github.com/apache/flink/pull/6490#discussion_r207707382
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
 ##
 @@ -678,6 +688,134 @@ public void 
testResourceManagerConnectionAfterRegainingLeadership() throws Excep
}
}
 
+   @Test
+   public void testRequestNextInputSplit() throws Exception {
+   final String resourceManagerAddress = "rm";
+   final ResourceManagerId resourceManagerId = 
ResourceManagerId.generate();
+   final ResourceID rmResourceId = new 
ResourceID(resourceManagerAddress);
+
+   final TestingResourceManagerGateway resourceManagerGateway = 
new TestingResourceManagerGateway(
+   resourceManagerId,
+   rmResourceId,
+   resourceManagerAddress,
+   "localhost");
+
+   rpcService.registerGateway(resourceManagerAddress, 
resourceManagerGateway);
+
+   // build one node JobGraph
+   InputSplitSource inputSplitSource = new 
TestingInputSplitSource();
+
+   JobVertex source = new JobVertex("vertex1");
+   source.setParallelism(1);
+   source.setInputSplitSource(inputSplitSource);
+   source.setInvokableClass(AbstractInvokable.class);
+
+   final JobGraph jobGraph = new JobGraph(source);
+   jobGraph.setAllowQueuedScheduling(true);
+
+   
configuration.setLong(ConfigConstants.RESTART_STRATEGY_FIXED_DELAY_ATTEMPTS, 1);
+   
configuration.setString(ConfigConstants.RESTART_STRATEGY_FIXED_DELAY_DELAY, "0 
s");
+
+   final JobManagerSharedServices jobManagerSharedServices =
+   new TestingJobManagerSharedServicesBuilder()
+   
.setRestartStrategyFactory(RestartStrategyFactory.createRestartStrategyFactory(configuration))
+   .build();
+
+   final JobMaster jobMaster = createJobMaster(
+   configuration,
+   jobGraph,
+   haServices,
+   jobManagerSharedServices);
+
+   CompletableFuture startFuture = 
jobMaster.start(jobMasterId, testingTimeout);
+
+   try {
+   // wait for the start to complete
+   startFuture.get(testingTimeout.toMilliseconds(), 
TimeUnit.MILLISECONDS);
+
+   final JobMasterGateway jobMasterGateway = 
jobMaster.getSelfGateway(JobMasterGateway.class);
+
+   ExecutionGraph eg = jobMaster.getExecutionGraph();
+   ExecutionVertex ev = 
eg.getAllExecutionVertices().iterator().next();
+
+   SerializedInputSplit serializedInputSplit1 = 
jobMasterGateway.requestNextInputSplit(
+   source.getID(),
+   ev.getCurrentExecutionAttempt().getAttemptId())
+   .get(1L, TimeUnit.SECONDS);
 
 Review comment:
   the indentation is confusing, you can consider: 
   
   ```
   SerializedInputSplit serializedInputSplit1 = jobMasterGateway
   .requestNextInputSplit(
source.getID(),
ev.getCurrentExecutionAttempt().getAttemptId())
.get(1L, TimeUnit.SECONDS);
   ```


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] yanghua commented on a change in pull request #6490: [FLINK-10056] [test] Add JobMasterTest#testRequestNextInputSplit

2018-08-04 Thread GitBox
yanghua commented on a change in pull request #6490: [FLINK-10056] [test] Add 
JobMasterTest#testRequestNextInputSplit
URL: https://github.com/apache/flink/pull/6490#discussion_r207707131
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java
 ##
 @@ -144,7 +146,48 @@ public static void waitUntilExecutionState(Execution 
execution, ExecutionState s
}
 
if (System.nanoTime() >= deadline) {
-   throw new TimeoutException();
+   throw new TimeoutException(
+   "The execution did not reach state " + state + 
" in time. " +
+   "Current status is " + execution.getState() + 
'.');
 
 Review comment:
   please use unified "" not ''. what's more, use string format looks better to 
me


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] yanghua commented on a change in pull request #6490: [FLINK-10056] [test] Add JobMasterTest#testRequestNextInputSplit

2018-08-04 Thread GitBox
yanghua commented on a change in pull request #6490: [FLINK-10056] [test] Add 
JobMasterTest#testRequestNextInputSplit
URL: https://github.com/apache/flink/pull/6490#discussion_r207707318
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java
 ##
 @@ -144,7 +146,48 @@ public static void waitUntilExecutionState(Execution 
execution, ExecutionState s
}
 
if (System.nanoTime() >= deadline) {
-   throw new TimeoutException();
+   throw new TimeoutException(
+   "The execution did not reach state " + state + 
" in time. " +
+   "Current status is " + execution.getState() + 
'.');
+   }
+   }
+
+   /**
+* Waits until the Execution vertex has reached a certain state.
+*
+* This method is based on polling and might miss very fast state 
transitions!
+*/
+   public static void waitUntilExecutionVertexState(ExecutionVertex 
executionVertex, ExecutionState state, long maxWaitMillis)
+   throws TimeoutException {
+
+   checkNotNull(executionVertex);
+   checkNotNull(state);
+   checkArgument(maxWaitMillis >= 0);
+
+   // this is a poor implementation - we may want to improve it 
eventually
+   final long deadline = maxWaitMillis == 0 ? Long.MAX_VALUE : 
System.nanoTime() + (maxWaitMillis * 1_000_000);
+
+   while (true) {
+   Execution execution = 
executionVertex.getCurrentExecutionAttempt();
+
+   if (execution == null || (execution.getState() != state 
&& System.nanoTime() < deadline)) {
+   try {
+   Thread.sleep(2);
+   } catch (InterruptedException ignored) { }
+   } else {
+   break;
+   }
+
+   if (System.nanoTime() >= deadline) {
+   if (execution != null) {
+   throw new TimeoutException(
+   "The execution vertex did not 
reach state " + state + " in time. " +
+   "Current status is " + 
execution.getState() + '.');
 
 Review comment:
   same as above~


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] yanghua commented on a change in pull request #6490: [FLINK-10056] [test] Add JobMasterTest#testRequestNextInputSplit

2018-08-04 Thread GitBox
yanghua commented on a change in pull request #6490: [FLINK-10056] [test] Add 
JobMasterTest#testRequestNextInputSplit
URL: https://github.com/apache/flink/pull/6490#discussion_r207707425
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
 ##
 @@ -678,6 +688,134 @@ public void 
testResourceManagerConnectionAfterRegainingLeadership() throws Excep
}
}
 
+   @Test
+   public void testRequestNextInputSplit() throws Exception {
+   final String resourceManagerAddress = "rm";
+   final ResourceManagerId resourceManagerId = 
ResourceManagerId.generate();
+   final ResourceID rmResourceId = new 
ResourceID(resourceManagerAddress);
+
+   final TestingResourceManagerGateway resourceManagerGateway = 
new TestingResourceManagerGateway(
+   resourceManagerId,
+   rmResourceId,
+   resourceManagerAddress,
+   "localhost");
+
+   rpcService.registerGateway(resourceManagerAddress, 
resourceManagerGateway);
+
+   // build one node JobGraph
+   InputSplitSource inputSplitSource = new 
TestingInputSplitSource();
+
+   JobVertex source = new JobVertex("vertex1");
+   source.setParallelism(1);
+   source.setInputSplitSource(inputSplitSource);
+   source.setInvokableClass(AbstractInvokable.class);
+
+   final JobGraph jobGraph = new JobGraph(source);
+   jobGraph.setAllowQueuedScheduling(true);
+
+   
configuration.setLong(ConfigConstants.RESTART_STRATEGY_FIXED_DELAY_ATTEMPTS, 1);
+   
configuration.setString(ConfigConstants.RESTART_STRATEGY_FIXED_DELAY_DELAY, "0 
s");
+
+   final JobManagerSharedServices jobManagerSharedServices =
+   new TestingJobManagerSharedServicesBuilder()
+   
.setRestartStrategyFactory(RestartStrategyFactory.createRestartStrategyFactory(configuration))
+   .build();
+
+   final JobMaster jobMaster = createJobMaster(
+   configuration,
+   jobGraph,
+   haServices,
+   jobManagerSharedServices);
+
+   CompletableFuture startFuture = 
jobMaster.start(jobMasterId, testingTimeout);
+
+   try {
+   // wait for the start to complete
+   startFuture.get(testingTimeout.toMilliseconds(), 
TimeUnit.MILLISECONDS);
+
+   final JobMasterGateway jobMasterGateway = 
jobMaster.getSelfGateway(JobMasterGateway.class);
+
+   ExecutionGraph eg = jobMaster.getExecutionGraph();
+   ExecutionVertex ev = 
eg.getAllExecutionVertices().iterator().next();
+
+   SerializedInputSplit serializedInputSplit1 = 
jobMasterGateway.requestNextInputSplit(
+   source.getID(),
+   ev.getCurrentExecutionAttempt().getAttemptId())
+   .get(1L, TimeUnit.SECONDS);
+   InputSplit inputSplit1 = 
InstantiationUtil.deserializeObject(
+   serializedInputSplit1.getInputSplitData(), 
ClassLoader.getSystemClassLoader());
+   assertEquals(0, inputSplit1.getSplitNumber());
+
+   SerializedInputSplit serializedInputSplit2 = 
jobMasterGateway.requestNextInputSplit(
+   source.getID(),
+   ev.getCurrentExecutionAttempt().getAttemptId())
+   .get(1L, TimeUnit.SECONDS);
+   InputSplit inputSplit2 = 
InstantiationUtil.deserializeObject(
+   serializedInputSplit2.getInputSplitData(), 
ClassLoader.getSystemClassLoader());
+   assertEquals(1, inputSplit2.getSplitNumber());
+
+   
ExecutionGraphTestUtils.waitUntilExecutionVertexState(ev, 
ExecutionState.SCHEDULED, 2000L);
+
+   eg.failGlobal(new Exception("Testing exception"));
+
+   
ExecutionGraphTestUtils.waitUntilExecutionVertexState(ev, 
ExecutionState.SCHEDULED, 2000L);
+
+   SerializedInputSplit serializedInputSplit3 = 
jobMasterGateway.requestNextInputSplit(
+   source.getID(),
+   ev.getCurrentExecutionAttempt().getAttemptId())
+   .get(1L, TimeUnit.SECONDS);
+   InputSplit inputSplit3 = 
InstantiationUtil.deserializeObject(
+   serializedInputSplit3.getInputSplitData(), 
ClassLoader.getSystemClassLoader());
+   assertEquals(0, inputSplit3.getSplitNumber());
+
+   

[GitHub] yanghua commented on a change in pull request #6490: [FLINK-10056] [test] Add JobMasterTest#testRequestNextInputSplit

2018-08-04 Thread GitBox
yanghua commented on a change in pull request #6490: [FLINK-10056] [test] Add 
JobMasterTest#testRequestNextInputSplit
URL: https://github.com/apache/flink/pull/6490#discussion_r207707152
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java
 ##
 @@ -144,7 +146,48 @@ public static void waitUntilExecutionState(Execution 
execution, ExecutionState s
}
 
if (System.nanoTime() >= deadline) {
-   throw new TimeoutException();
+   throw new TimeoutException(
+   "The execution did not reach state " + state + 
" in time. " +
+   "Current status is " + execution.getState() + 
'.');
+   }
+   }
+
+   /**
+* Waits until the Execution vertex has reached a certain state.
+*
+* This method is based on polling and might miss very fast state 
transitions!
+*/
+   public static void waitUntilExecutionVertexState(ExecutionVertex 
executionVertex, ExecutionState state, long maxWaitMillis)
+   throws TimeoutException {
+
 
 Review comment:
   here I think we could remove the empty line.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] yanghua commented on a change in pull request #6490: [FLINK-10056] [test] Add JobMasterTest#testRequestNextInputSplit

2018-08-04 Thread GitBox
yanghua commented on a change in pull request #6490: [FLINK-10056] [test] Add 
JobMasterTest#testRequestNextInputSplit
URL: https://github.com/apache/flink/pull/6490#discussion_r207707414
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
 ##
 @@ -678,6 +688,134 @@ public void 
testResourceManagerConnectionAfterRegainingLeadership() throws Excep
}
}
 
+   @Test
+   public void testRequestNextInputSplit() throws Exception {
+   final String resourceManagerAddress = "rm";
+   final ResourceManagerId resourceManagerId = 
ResourceManagerId.generate();
+   final ResourceID rmResourceId = new 
ResourceID(resourceManagerAddress);
+
+   final TestingResourceManagerGateway resourceManagerGateway = 
new TestingResourceManagerGateway(
+   resourceManagerId,
+   rmResourceId,
+   resourceManagerAddress,
+   "localhost");
+
+   rpcService.registerGateway(resourceManagerAddress, 
resourceManagerGateway);
+
+   // build one node JobGraph
+   InputSplitSource inputSplitSource = new 
TestingInputSplitSource();
+
+   JobVertex source = new JobVertex("vertex1");
+   source.setParallelism(1);
+   source.setInputSplitSource(inputSplitSource);
+   source.setInvokableClass(AbstractInvokable.class);
+
+   final JobGraph jobGraph = new JobGraph(source);
+   jobGraph.setAllowQueuedScheduling(true);
+
+   
configuration.setLong(ConfigConstants.RESTART_STRATEGY_FIXED_DELAY_ATTEMPTS, 1);
+   
configuration.setString(ConfigConstants.RESTART_STRATEGY_FIXED_DELAY_DELAY, "0 
s");
+
+   final JobManagerSharedServices jobManagerSharedServices =
+   new TestingJobManagerSharedServicesBuilder()
+   
.setRestartStrategyFactory(RestartStrategyFactory.createRestartStrategyFactory(configuration))
+   .build();
+
+   final JobMaster jobMaster = createJobMaster(
+   configuration,
+   jobGraph,
+   haServices,
+   jobManagerSharedServices);
+
+   CompletableFuture startFuture = 
jobMaster.start(jobMasterId, testingTimeout);
+
+   try {
+   // wait for the start to complete
+   startFuture.get(testingTimeout.toMilliseconds(), 
TimeUnit.MILLISECONDS);
+
+   final JobMasterGateway jobMasterGateway = 
jobMaster.getSelfGateway(JobMasterGateway.class);
+
+   ExecutionGraph eg = jobMaster.getExecutionGraph();
+   ExecutionVertex ev = 
eg.getAllExecutionVertices().iterator().next();
+
+   SerializedInputSplit serializedInputSplit1 = 
jobMasterGateway.requestNextInputSplit(
+   source.getID(),
+   ev.getCurrentExecutionAttempt().getAttemptId())
+   .get(1L, TimeUnit.SECONDS);
+   InputSplit inputSplit1 = 
InstantiationUtil.deserializeObject(
+   serializedInputSplit1.getInputSplitData(), 
ClassLoader.getSystemClassLoader());
+   assertEquals(0, inputSplit1.getSplitNumber());
+
+   SerializedInputSplit serializedInputSplit2 = 
jobMasterGateway.requestNextInputSplit(
+   source.getID(),
+   ev.getCurrentExecutionAttempt().getAttemptId())
+   .get(1L, TimeUnit.SECONDS);
+   InputSplit inputSplit2 = 
InstantiationUtil.deserializeObject(
+   serializedInputSplit2.getInputSplitData(), 
ClassLoader.getSystemClassLoader());
+   assertEquals(1, inputSplit2.getSplitNumber());
+
+   
ExecutionGraphTestUtils.waitUntilExecutionVertexState(ev, 
ExecutionState.SCHEDULED, 2000L);
+
+   eg.failGlobal(new Exception("Testing exception"));
+
+   
ExecutionGraphTestUtils.waitUntilExecutionVertexState(ev, 
ExecutionState.SCHEDULED, 2000L);
+
+   SerializedInputSplit serializedInputSplit3 = 
jobMasterGateway.requestNextInputSplit(
+   source.getID(),
+   ev.getCurrentExecutionAttempt().getAttemptId())
+   .get(1L, TimeUnit.SECONDS);
+   InputSplit inputSplit3 = 
InstantiationUtil.deserializeObject(
+   serializedInputSplit3.getInputSplitData(), 
ClassLoader.getSystemClassLoader());
+   assertEquals(0, inputSplit3.getSplitNumber());
+
+   

[GitHub] yanghua commented on a change in pull request #6490: [FLINK-10056] [test] Add JobMasterTest#testRequestNextInputSplit

2018-08-04 Thread GitBox
yanghua commented on a change in pull request #6490: [FLINK-10056] [test] Add 
JobMasterTest#testRequestNextInputSplit
URL: https://github.com/apache/flink/pull/6490#discussion_r207707125
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java
 ##
 @@ -118,7 +118,9 @@ public static void waitUntilJobStatus(ExecutionGraph eg, 
JobStatus status, long
}
 
if (System.nanoTime() >= deadline) {
-   throw new TimeoutException("The job did not reach 
status " + status + " in time. Current status is " + eg.getState() + '.');
+   throw new TimeoutException(
+   "The job did not reach status " + status + " in 
time. " +
+   "Current status is " + eg.getState() + '.');
 
 Review comment:
   please use unified `""` not `''`.  what's more, use string format looks 
better to me


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] yanghua commented on a change in pull request #6490: [FLINK-10056] [test] Add JobMasterTest#testRequestNextInputSplit

2018-08-04 Thread GitBox
yanghua commented on a change in pull request #6490: [FLINK-10056] [test] Add 
JobMasterTest#testRequestNextInputSplit
URL: https://github.com/apache/flink/pull/6490#discussion_r207707390
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
 ##
 @@ -678,6 +688,134 @@ public void 
testResourceManagerConnectionAfterRegainingLeadership() throws Excep
}
}
 
+   @Test
+   public void testRequestNextInputSplit() throws Exception {
+   final String resourceManagerAddress = "rm";
+   final ResourceManagerId resourceManagerId = 
ResourceManagerId.generate();
+   final ResourceID rmResourceId = new 
ResourceID(resourceManagerAddress);
+
+   final TestingResourceManagerGateway resourceManagerGateway = 
new TestingResourceManagerGateway(
+   resourceManagerId,
+   rmResourceId,
+   resourceManagerAddress,
+   "localhost");
+
+   rpcService.registerGateway(resourceManagerAddress, 
resourceManagerGateway);
+
+   // build one node JobGraph
+   InputSplitSource inputSplitSource = new 
TestingInputSplitSource();
+
+   JobVertex source = new JobVertex("vertex1");
+   source.setParallelism(1);
+   source.setInputSplitSource(inputSplitSource);
+   source.setInvokableClass(AbstractInvokable.class);
+
+   final JobGraph jobGraph = new JobGraph(source);
+   jobGraph.setAllowQueuedScheduling(true);
+
+   
configuration.setLong(ConfigConstants.RESTART_STRATEGY_FIXED_DELAY_ATTEMPTS, 1);
+   
configuration.setString(ConfigConstants.RESTART_STRATEGY_FIXED_DELAY_DELAY, "0 
s");
+
+   final JobManagerSharedServices jobManagerSharedServices =
+   new TestingJobManagerSharedServicesBuilder()
+   
.setRestartStrategyFactory(RestartStrategyFactory.createRestartStrategyFactory(configuration))
+   .build();
+
+   final JobMaster jobMaster = createJobMaster(
+   configuration,
+   jobGraph,
+   haServices,
+   jobManagerSharedServices);
+
+   CompletableFuture startFuture = 
jobMaster.start(jobMasterId, testingTimeout);
+
+   try {
+   // wait for the start to complete
+   startFuture.get(testingTimeout.toMilliseconds(), 
TimeUnit.MILLISECONDS);
+
+   final JobMasterGateway jobMasterGateway = 
jobMaster.getSelfGateway(JobMasterGateway.class);
+
+   ExecutionGraph eg = jobMaster.getExecutionGraph();
+   ExecutionVertex ev = 
eg.getAllExecutionVertices().iterator().next();
+
+   SerializedInputSplit serializedInputSplit1 = 
jobMasterGateway.requestNextInputSplit(
+   source.getID(),
+   ev.getCurrentExecutionAttempt().getAttemptId())
+   .get(1L, TimeUnit.SECONDS);
+   InputSplit inputSplit1 = 
InstantiationUtil.deserializeObject(
+   serializedInputSplit1.getInputSplitData(), 
ClassLoader.getSystemClassLoader());
+   assertEquals(0, inputSplit1.getSplitNumber());
+
+   SerializedInputSplit serializedInputSplit2 = 
jobMasterGateway.requestNextInputSplit(
+   source.getID(),
+   ev.getCurrentExecutionAttempt().getAttemptId())
+   .get(1L, TimeUnit.SECONDS);
 
 Review comment:
   the same as above


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] yanghua commented on a change in pull request #6490: [FLINK-10056] [test] Add JobMasterTest#testRequestNextInputSplit

2018-08-04 Thread GitBox
yanghua commented on a change in pull request #6490: [FLINK-10056] [test] Add 
JobMasterTest#testRequestNextInputSplit
URL: https://github.com/apache/flink/pull/6490#discussion_r207707284
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java
 ##
 @@ -144,7 +146,48 @@ public static void waitUntilExecutionState(Execution 
execution, ExecutionState s
}
 
if (System.nanoTime() >= deadline) {
-   throw new TimeoutException();
+   throw new TimeoutException(
+   "The execution did not reach state " + state + 
" in time. " +
+   "Current status is " + execution.getState() + 
'.');
+   }
+   }
+
+   /**
+* Waits until the Execution vertex has reached a certain state.
+*
+* This method is based on polling and might miss very fast state 
transitions!
+*/
+   public static void waitUntilExecutionVertexState(ExecutionVertex 
executionVertex, ExecutionState state, long maxWaitMillis)
 
 Review comment:
   the method name `waitUntilExecutionVertexState` is not very accurate. 
generally `until` will follow a state, like 'timeout' and so on~ I think we 
consider a new name, E.g. `waitExecutionVertexStateTransform`, just a example, 
you can think about it


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] yanghua commented on a change in pull request #6490: [FLINK-10056] [test] Add JobMasterTest#testRequestNextInputSplit

2018-08-04 Thread GitBox
yanghua commented on a change in pull request #6490: [FLINK-10056] [test] Add 
JobMasterTest#testRequestNextInputSplit
URL: https://github.com/apache/flink/pull/6490#discussion_r207707407
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
 ##
 @@ -678,6 +688,134 @@ public void 
testResourceManagerConnectionAfterRegainingLeadership() throws Excep
}
}
 
+   @Test
+   public void testRequestNextInputSplit() throws Exception {
+   final String resourceManagerAddress = "rm";
+   final ResourceManagerId resourceManagerId = 
ResourceManagerId.generate();
+   final ResourceID rmResourceId = new 
ResourceID(resourceManagerAddress);
+
+   final TestingResourceManagerGateway resourceManagerGateway = 
new TestingResourceManagerGateway(
+   resourceManagerId,
+   rmResourceId,
+   resourceManagerAddress,
+   "localhost");
+
+   rpcService.registerGateway(resourceManagerAddress, 
resourceManagerGateway);
+
+   // build one node JobGraph
+   InputSplitSource inputSplitSource = new 
TestingInputSplitSource();
+
+   JobVertex source = new JobVertex("vertex1");
+   source.setParallelism(1);
+   source.setInputSplitSource(inputSplitSource);
+   source.setInvokableClass(AbstractInvokable.class);
+
+   final JobGraph jobGraph = new JobGraph(source);
+   jobGraph.setAllowQueuedScheduling(true);
+
+   
configuration.setLong(ConfigConstants.RESTART_STRATEGY_FIXED_DELAY_ATTEMPTS, 1);
+   
configuration.setString(ConfigConstants.RESTART_STRATEGY_FIXED_DELAY_DELAY, "0 
s");
+
+   final JobManagerSharedServices jobManagerSharedServices =
+   new TestingJobManagerSharedServicesBuilder()
+   
.setRestartStrategyFactory(RestartStrategyFactory.createRestartStrategyFactory(configuration))
+   .build();
+
+   final JobMaster jobMaster = createJobMaster(
+   configuration,
+   jobGraph,
+   haServices,
+   jobManagerSharedServices);
+
+   CompletableFuture startFuture = 
jobMaster.start(jobMasterId, testingTimeout);
+
+   try {
+   // wait for the start to complete
+   startFuture.get(testingTimeout.toMilliseconds(), 
TimeUnit.MILLISECONDS);
+
+   final JobMasterGateway jobMasterGateway = 
jobMaster.getSelfGateway(JobMasterGateway.class);
+
+   ExecutionGraph eg = jobMaster.getExecutionGraph();
+   ExecutionVertex ev = 
eg.getAllExecutionVertices().iterator().next();
+
+   SerializedInputSplit serializedInputSplit1 = 
jobMasterGateway.requestNextInputSplit(
+   source.getID(),
+   ev.getCurrentExecutionAttempt().getAttemptId())
+   .get(1L, TimeUnit.SECONDS);
+   InputSplit inputSplit1 = 
InstantiationUtil.deserializeObject(
+   serializedInputSplit1.getInputSplitData(), 
ClassLoader.getSystemClassLoader());
+   assertEquals(0, inputSplit1.getSplitNumber());
+
+   SerializedInputSplit serializedInputSplit2 = 
jobMasterGateway.requestNextInputSplit(
+   source.getID(),
+   ev.getCurrentExecutionAttempt().getAttemptId())
+   .get(1L, TimeUnit.SECONDS);
+   InputSplit inputSplit2 = 
InstantiationUtil.deserializeObject(
+   serializedInputSplit2.getInputSplitData(), 
ClassLoader.getSystemClassLoader());
+   assertEquals(1, inputSplit2.getSplitNumber());
+
+   
ExecutionGraphTestUtils.waitUntilExecutionVertexState(ev, 
ExecutionState.SCHEDULED, 2000L);
+
+   eg.failGlobal(new Exception("Testing exception"));
+
+   
ExecutionGraphTestUtils.waitUntilExecutionVertexState(ev, 
ExecutionState.SCHEDULED, 2000L);
+
+   SerializedInputSplit serializedInputSplit3 = 
jobMasterGateway.requestNextInputSplit(
+   source.getID(),
+   ev.getCurrentExecutionAttempt().getAttemptId())
+   .get(1L, TimeUnit.SECONDS);
 
 Review comment:
   the same as above


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service,