[jira] [Commented] (FLINK-10056) Add testRequestNextInputSplit

2018-08-15 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10056?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16580784#comment-16580784
 ] 

ASF GitHub Bot commented on FLINK-10056:


asfgit closed pull request #6490: [FLINK-10056] [test] Add 
JobMasterTest#testRequestNextInputSplit
URL: https://github.com/apache/flink/pull/6490
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:


diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
index c47f4fd19ff..01cb2b6b099 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
@@ -1655,4 +1655,9 @@ public void reportPayload(ResourceID resourceID, Void payload) {
     RestartStrategy getRestartStrategy() {
         return restartStrategy;
     }
+
+    @VisibleForTesting
+    ExecutionGraph getExecutionGraph() {
+        return executionGraph;
+    }
 }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java
index 1d36fa5859a..0d603fc17b2 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java
@@ -97,13 +97,12 @@
     //

     /**
-     * Waits until the job has reached a certain state.
+     * Waits until the Job has reached a certain state.
      *
      * This method is based on polling and might miss very fast state transitions!
      */
     public static void waitUntilJobStatus(ExecutionGraph eg, JobStatus status, long maxWaitMillis)
             throws TimeoutException {
-
         checkNotNull(eg);
         checkNotNull(status);
         checkArgument(maxWaitMillis >= 0);
@@ -118,7 +117,9 @@ public static void waitUntilJobStatus(ExecutionGraph eg, JobStatus status, long
         }

         if (System.nanoTime() >= deadline) {
-            throw new TimeoutException("The job did not reach status " + status + " in time. Current status is " + eg.getState() + '.');
+            throw new TimeoutException(
+                String.format("The job did not reach status %s in time. Current status is %s.",
+                    status, eg.getState()));
         }
     }

@@ -129,7 +130,6 @@ public static void waitUntilJobStatus(ExecutionGraph eg, JobStatus status, long
      */
     public static void waitUntilExecutionState(Execution execution, ExecutionState state, long maxWaitMillis)
             throws TimeoutException {
-
         checkNotNull(execution);
         checkNotNull(state);
         checkArgument(maxWaitMillis >= 0);
@@ -144,7 +144,47 @@ public static void waitUntilExecutionState(Execution execution, ExecutionState s
         }

         if (System.nanoTime() >= deadline) {
-            throw new TimeoutException();
+            throw new TimeoutException(
+                String.format("The execution did not reach state %s in time. Current state is %s.",
+                    state, execution.getState()));
+        }
+    }
+
+    /**
+     * Waits until the ExecutionVertex has reached a certain state.
+     *
+     * This method is based on polling and might miss very fast state transitions!
+     */
+    public static void waitUntilExecutionVertexState(ExecutionVertex executionVertex, ExecutionState state, long maxWaitMillis)
+        throws TimeoutException {
+        checkNotNull(executionVertex);
+        checkNotNull(state);
+        checkArgument(maxWaitMillis >= 0);
+
+        // this is a poor implementation - we may want to improve it eventually
+        final long deadline = maxWaitMillis == 0 ? Long.MAX_VALUE : System.nanoTime() + (maxWaitMillis * 1_000_000);
+
+        while (true) {
+            Execution execution = executionVertex.getCurrentExecutionAttempt();
+
+            if (execution == null || (execution.getState() != state && System.nanoTime() < deadline)) {
+                try {
+   

[jira] [Commented] (FLINK-10056) Add testRequestNextInputSplit

2018-08-08 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10056?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16574243#comment-16574243
 ] 

ASF GitHub Bot commented on FLINK-10056:


TisonKun commented on issue #6490: [FLINK-10056] [test] Add 
JobMasterTest#testRequestNextInputSplit
URL: https://github.com/apache/flink/pull/6490#issuecomment-411627844
 
 
   cc @aljoscha @zentol 


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Add testRequestNextInputSplit
> -
>
> Key: FLINK-10056
> URL: https://issues.apache.org/jira/browse/FLINK-10056
> Project: Flink
>  Issue Type: Improvement
>  Components: JobManager, Tests
>Affects Versions: 1.5.0
>Reporter: 陈梓立
>Assignee: 陈梓立
>Priority: Major
>  Labels: pull-request-available
>
> Add testRequestNextInputSplit to make sure JobMaster#requestNextInputSplit 
> works as expected.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10056) Add testRequestNextInputSplit

2018-08-05 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10056?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16569674#comment-16569674
 ] 

ASF GitHub Bot commented on FLINK-10056:


TisonKun commented on a change in pull request #6490: [FLINK-10056] [test] Add 
JobMasterTest#testRequestNextInputSplit
URL: https://github.com/apache/flink/pull/6490#discussion_r207763974
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java
 ##
 @@ -144,7 +146,48 @@ public static void waitUntilExecutionState(Execution execution, ExecutionState s
         }

         if (System.nanoTime() >= deadline) {
-            throw new TimeoutException();
+            throw new TimeoutException(
+                "The execution did not reach state " + state + " in time. " +
+                "Current status is " + execution.getState() + '.');
+        }
+    }
+
+    /**
+     * Waits until the Execution vertex has reached a certain state.
+     *
+     * This method is based on polling and might miss very fast state transitions!
+     */
+    public static void waitUntilExecutionVertexState(ExecutionVertex executionVertex, ExecutionState state, long maxWaitMillis)
+        throws TimeoutException {
+
 
 Review comment:
   Well, I have modified the others too, so the empty line is now removed from all of them.




[jira] [Commented] (FLINK-10056) Add testRequestNextInputSplit

2018-08-05 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10056?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16569677#comment-16569677
 ] 

ASF GitHub Bot commented on FLINK-10056:


TisonKun commented on issue #6490: [FLINK-10056] [test] Add 
JobMasterTest#testRequestNextInputSplit
URL: https://github.com/apache/flink/pull/6490#issuecomment-410574707
 
 
   cc @twalthr @tzulitai 




[jira] [Commented] (FLINK-10056) Add testRequestNextInputSplit

2018-08-05 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10056?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16569396#comment-16569396
 ] 

ASF GitHub Bot commented on FLINK-10056:


TisonKun commented on a change in pull request #6490: [FLINK-10056] [test] Add 
JobMasterTest#testRequestNextInputSplit
URL: https://github.com/apache/flink/pull/6490#discussion_r207727020
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java
 ##
 @@ -144,7 +146,48 @@ public static void waitUntilExecutionState(Execution execution, ExecutionState s
         }

         if (System.nanoTime() >= deadline) {
-            throw new TimeoutException();
+            throw new TimeoutException(
+                "The execution did not reach state " + state + " in time. " +
+                "Current status is " + execution.getState() + '.');
+        }
+    }
+
+    /**
+     * Waits until the Execution vertex has reached a certain state.
+     *
+     * This method is based on polling and might miss very fast state transitions!
+     */
+    public static void waitUntilExecutionVertexState(ExecutionVertex executionVertex, ExecutionState state, long maxWaitMillis)
 
 Review comment:
   I stand by my opinion. Using `waitUntilExecutionVertexStateChanged` or `waitUntilExecutionVertexStateReached` does not provide extra accuracy.
   
   Maybe a better solution would be `[ executionVertex waitUntilExecutionStateReach: state WithTimeOut: maxWaitMillis ]`, but we cannot write that here. I think `waitUntilExecutionVertexState` is enough; a longer name does not provide extra accuracy.




[jira] [Commented] (FLINK-10056) Add testRequestNextInputSplit

2018-08-05 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10056?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16569395#comment-16569395
 ] 

ASF GitHub Bot commented on FLINK-10056:


TisonKun commented on a change in pull request #6490: [FLINK-10056] [test] Add 
JobMasterTest#testRequestNextInputSplit
URL: https://github.com/apache/flink/pull/6490#discussion_r207727020
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java
 ##
 @@ -144,7 +146,48 @@ public static void waitUntilExecutionState(Execution execution, ExecutionState s
         }

         if (System.nanoTime() >= deadline) {
-            throw new TimeoutException();
+            throw new TimeoutException(
+                "The execution did not reach state " + state + " in time. " +
+                "Current status is " + execution.getState() + '.');
+        }
+    }
+
+    /**
+     * Waits until the Execution vertex has reached a certain state.
+     *
+     * This method is based on polling and might miss very fast state transitions!
+     */
+    public static void waitUntilExecutionVertexState(ExecutionVertex executionVertex, ExecutionState state, long maxWaitMillis)
 
 Review comment:
   I stand by my opinion. Using `waitUntilExecutionVertexStateChanged` or `waitUntilExecutionVertexStateReached` does not provide extra accuracy.
   
   Maybe a better solution would be `[ executionVertex waitUntilStateReach: state WithTimeOut: maxWaitMillis ]`, but we cannot write that here. I think `waitUntilExecutionVertexState` is enough; a longer name does not provide extra accuracy.




[jira] [Commented] (FLINK-10056) Add testRequestNextInputSplit

2018-08-04 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10056?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16569335#comment-16569335
 ] 

ASF GitHub Bot commented on FLINK-10056:


yanghua commented on a change in pull request #6490: [FLINK-10056] [test] Add 
JobMasterTest#testRequestNextInputSplit
URL: https://github.com/apache/flink/pull/6490#discussion_r207723699
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java
 ##
 @@ -144,7 +146,48 @@ public static void waitUntilExecutionState(Execution execution, ExecutionState s
         }

         if (System.nanoTime() >= deadline) {
-            throw new TimeoutException();
+            throw new TimeoutException(
+                "The execution did not reach state " + state + " in time. " +
+                "Current status is " + execution.getState() + '.');
+        }
+    }
+
+    /**
+     * Waits until the Execution vertex has reached a certain state.
+     *
+     * This method is based on polling and might miss very fast state transitions!
+     */
+    public static void waitUntilExecutionVertexState(ExecutionVertex executionVertex, ExecutionState state, long maxWaitMillis)
 
 Review comment:
   The 'timeout' I gave you in the example is also inappropriate. Here the timeout is an exceptional state, and it is not the normal state you are actually waiting for. I understand this method's logic, but I still think "until" should be followed by a **state** or a **result**, not a noun. I think `waitUntilExecutionVertexStateChanged` would be better; there are similar cases in the Flink source, such as `ExecutionGraph#waitUntilTerminal`.




[jira] [Commented] (FLINK-10056) Add testRequestNextInputSplit

2018-08-04 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10056?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16569248#comment-16569248
 ] 

ASF GitHub Bot commented on FLINK-10056:


TisonKun commented on issue #6490: [FLINK-10056] [test] Add 
JobMasterTest#testRequestNextInputSplit
URL: https://github.com/apache/flink/pull/6490#issuecomment-410464862
 
 
   cc @zentol @tillrohrmann 




[jira] [Commented] (FLINK-10056) Add testRequestNextInputSplit

2018-08-04 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10056?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16569183#comment-16569183
 ] 

ASF GitHub Bot commented on FLINK-10056:


TisonKun commented on issue #6490: [FLINK-10056] [test] Add 
JobMasterTest#testRequestNextInputSplit
URL: https://github.com/apache/flink/pull/6490#issuecomment-410449409
 
 
   @yanghua thanks for the review!
   I have addressed the comments and left my thoughts.
   Please review again when you are free, thanks!




[jira] [Commented] (FLINK-10056) Add testRequestNextInputSplit

2018-08-04 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10056?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16569182#comment-16569182
 ] 

ASF GitHub Bot commented on FLINK-10056:


TisonKun commented on a change in pull request #6490: [FLINK-10056] [test] Add 
JobMasterTest#testRequestNextInputSplit
URL: https://github.com/apache/flink/pull/6490#discussion_r207708631
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java
 ##
 @@ -144,7 +146,48 @@ public static void waitUntilExecutionState(Execution execution, ExecutionState s
         }

         if (System.nanoTime() >= deadline) {
-            throw new TimeoutException();
+            throw new TimeoutException(
+                "The execution did not reach state " + state + " in time. " +
+                "Current status is " + execution.getState() + '.');
+        }
+    }
+
+    /**
+     * Waits until the Execution vertex has reached a certain state.
+     *
+     * This method is based on polling and might miss very fast state transitions!
+     */
+    public static void waitUntilExecutionVertexState(ExecutionVertex executionVertex, ExecutionState state, long maxWaitMillis)
 
 Review comment:
   `waitUntilExecutionVertexState` means (as its Javadoc says) that we wait until the ExecutionVertex has reached a certain state; otherwise we get a `TimeoutException`. That is clear to me, so I would not consider renaming it.
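
   The contract described in this comment (poll until the target state is reached, otherwise fail with a `TimeoutException`) can be illustrated with a small standalone sketch. This is not the Flink helper: `PollingWait` and `waitUntilState` are hypothetical names, and the state is read through a plain `Supplier` so the example compiles without Flink on the classpath. It follows the diff in treating `maxWaitMillis == 0` as "wait indefinitely" and in sleeping 2 ms between polls.

```java
import java.util.Objects;
import java.util.concurrent.TimeoutException;
import java.util.function.Supplier;

/** Hypothetical, Flink-free illustration of a polling "wait until state" helper. */
public final class PollingWait {

    private PollingWait() {}

    /**
     * Polls {@code currentState} until it equals {@code expected}.
     * A {@code maxWaitMillis} of 0 means "no time limit", mirroring the helper in the diff.
     * Like any polling approach, this can miss very fast state transitions.
     */
    public static <T> void waitUntilState(Supplier<T> currentState, T expected, long maxWaitMillis)
            throws TimeoutException, InterruptedException {
        Objects.requireNonNull(currentState);
        Objects.requireNonNull(expected);
        if (maxWaitMillis < 0) {
            throw new IllegalArgumentException("maxWaitMillis must be >= 0");
        }

        final long start = System.nanoTime();
        final long maxWaitNanos = maxWaitMillis * 1_000_000L;

        while (true) {
            final T state = currentState.get();
            if (expected.equals(state)) {
                return; // target state reached
            }
            // Compare elapsed time rather than absolute nanoTime values.
            if (maxWaitMillis != 0 && System.nanoTime() - start >= maxWaitNanos) {
                throw new TimeoutException(String.format(
                    "The target did not reach state %s in time. Current state is %s.",
                    expected, state));
            }
            Thread.sleep(2); // short pause between polls
        }
    }
}
```

   A call such as `PollingWait.waitUntilState(() -> vertexState.get(), "SCHEDULED", 2000L)` would then read much like the call sites in the test, where `vertexState` is an assumed holder for the current state. Two deliberate differences from the code under review: the sketch propagates `InterruptedException` instead of ignoring it, and it measures elapsed time instead of comparing against an absolute deadline.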




[jira] [Commented] (FLINK-10056) Add testRequestNextInputSplit

2018-08-04 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10056?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16569181#comment-16569181
 ] 

ASF GitHub Bot commented on FLINK-10056:


TisonKun commented on a change in pull request #6490: [FLINK-10056] [test] Add 
JobMasterTest#testRequestNextInputSplit
URL: https://github.com/apache/flink/pull/6490#discussion_r207708631
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java
 ##
 @@ -144,7 +146,48 @@ public static void waitUntilExecutionState(Execution execution, ExecutionState s
         }

         if (System.nanoTime() >= deadline) {
-            throw new TimeoutException();
+            throw new TimeoutException(
+                "The execution did not reach state " + state + " in time. " +
+                "Current status is " + execution.getState() + '.');
+        }
+    }
+
+    /**
+     * Waits until the Execution vertex has reached a certain state.
+     *
+     * This method is based on polling and might miss very fast state transitions!
+     */
+    public static void waitUntilExecutionVertexState(ExecutionVertex executionVertex, ExecutionState state, long maxWaitMillis)
 
 Review comment:
   `waitUntilExecutionVertexState` means that we wait until the ExecutionVertex has reached a certain state; otherwise we get a `TimeoutException`. That is clear to me, so I would not consider renaming it.




[jira] [Commented] (FLINK-10056) Add testRequestNextInputSplit

2018-08-04 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10056?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16569177#comment-16569177
 ] 

ASF GitHub Bot commented on FLINK-10056:


TisonKun commented on a change in pull request #6490: [FLINK-10056] [test] Add 
JobMasterTest#testRequestNextInputSplit
URL: https://github.com/apache/flink/pull/6490#discussion_r207708380
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java
 ##
 @@ -144,7 +146,48 @@ public static void waitUntilExecutionState(Execution execution, ExecutionState s
         }

         if (System.nanoTime() >= deadline) {
-            throw new TimeoutException();
+            throw new TimeoutException(
+                "The execution did not reach state " + state + " in time. " +
+                "Current status is " + execution.getState() + '.');
+        }
+    }
+
+    /**
+     * Waits until the Execution vertex has reached a certain state.
+     *
+     * This method is based on polling and might miss very fast state transitions!
+     */
+    public static void waitUntilExecutionVertexState(ExecutionVertex executionVertex, ExecutionState state, long maxWaitMillis)
+        throws TimeoutException {
+
 
 Review comment:
   This keeps it consistent with the similar methods above.




[jira] [Commented] (FLINK-10056) Add testRequestNextInputSplit

2018-08-04 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10056?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16569171#comment-16569171
 ] 

ASF GitHub Bot commented on FLINK-10056:


yanghua commented on a change in pull request #6490: [FLINK-10056] [test] Add 
JobMasterTest#testRequestNextInputSplit
URL: https://github.com/apache/flink/pull/6490#discussion_r207707425
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
 ##
 @@ -678,6 +688,134 @@ public void testResourceManagerConnectionAfterRegainingLeadership() throws Excep
         }
     }

+    @Test
+    public void testRequestNextInputSplit() throws Exception {
+        final String resourceManagerAddress = "rm";
+        final ResourceManagerId resourceManagerId = ResourceManagerId.generate();
+        final ResourceID rmResourceId = new ResourceID(resourceManagerAddress);
+
+        final TestingResourceManagerGateway resourceManagerGateway = new TestingResourceManagerGateway(
+            resourceManagerId,
+            rmResourceId,
+            resourceManagerAddress,
+            "localhost");
+
+        rpcService.registerGateway(resourceManagerAddress, resourceManagerGateway);
+
+        // build one node JobGraph
+        InputSplitSource inputSplitSource = new TestingInputSplitSource();
+
+        JobVertex source = new JobVertex("vertex1");
+        source.setParallelism(1);
+        source.setInputSplitSource(inputSplitSource);
+        source.setInvokableClass(AbstractInvokable.class);
+
+        final JobGraph jobGraph = new JobGraph(source);
+        jobGraph.setAllowQueuedScheduling(true);
+
+        configuration.setLong(ConfigConstants.RESTART_STRATEGY_FIXED_DELAY_ATTEMPTS, 1);
+        configuration.setString(ConfigConstants.RESTART_STRATEGY_FIXED_DELAY_DELAY, "0 s");
+
+        final JobManagerSharedServices jobManagerSharedServices =
+            new TestingJobManagerSharedServicesBuilder()
+                .setRestartStrategyFactory(RestartStrategyFactory.createRestartStrategyFactory(configuration))
+                .build();
+
+        final JobMaster jobMaster = createJobMaster(
+            configuration,
+            jobGraph,
+            haServices,
+            jobManagerSharedServices);
+
+        CompletableFuture startFuture = jobMaster.start(jobMasterId, testingTimeout);
+
+        try {
+            // wait for the start to complete
+            startFuture.get(testingTimeout.toMilliseconds(), TimeUnit.MILLISECONDS);
+
+            final JobMasterGateway jobMasterGateway = jobMaster.getSelfGateway(JobMasterGateway.class);
+
+            ExecutionGraph eg = jobMaster.getExecutionGraph();
+            ExecutionVertex ev = eg.getAllExecutionVertices().iterator().next();
+
+            SerializedInputSplit serializedInputSplit1 = jobMasterGateway.requestNextInputSplit(
+                source.getID(),
+                ev.getCurrentExecutionAttempt().getAttemptId())
+                .get(1L, TimeUnit.SECONDS);
+            InputSplit inputSplit1 = InstantiationUtil.deserializeObject(
+                serializedInputSplit1.getInputSplitData(), ClassLoader.getSystemClassLoader());
+            assertEquals(0, inputSplit1.getSplitNumber());
+
+            SerializedInputSplit serializedInputSplit2 = jobMasterGateway.requestNextInputSplit(
+                source.getID(),
+                ev.getCurrentExecutionAttempt().getAttemptId())
+                .get(1L, TimeUnit.SECONDS);
+            InputSplit inputSplit2 = InstantiationUtil.deserializeObject(
+                serializedInputSplit2.getInputSplitData(), ClassLoader.getSystemClassLoader());
+            assertEquals(1, inputSplit2.getSplitNumber());
+
+            ExecutionGraphTestUtils.waitUntilExecutionVertexState(ev, ExecutionState.SCHEDULED, 2000L);
+
+            eg.failGlobal(new Exception("Testing exception"));
+
+            ExecutionGraphTestUtils.waitUntilExecutionVertexState(ev, ExecutionState.SCHEDULED, 2000L);
+
+            SerializedInputSplit serializedInputSplit3 = jobMasterGateway.requestNextInputSplit(
+                source.getID(),
+                ev.getCurrentExecutionAttempt().getAttemptId())
+                .get(1L, TimeUnit.SECONDS);
+            InputSplit inputSplit3 =

[jira] [Commented] (FLINK-10056) Add testRequestNextInputSplit

2018-08-04 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10056?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16569169#comment-16569169
 ] 

ASF GitHub Bot commented on FLINK-10056:


yanghua commented on a change in pull request #6490: [FLINK-10056] [test] Add 
JobMasterTest#testRequestNextInputSplit
URL: https://github.com/apache/flink/pull/6490#discussion_r207707318
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java
 ##
 @@ -144,7 +146,48 @@ public static void waitUntilExecutionState(Execution execution, ExecutionState s
         }

         if (System.nanoTime() >= deadline) {
-            throw new TimeoutException();
+            throw new TimeoutException(
+                "The execution did not reach state " + state + " in time. " +
+                "Current status is " + execution.getState() + '.');
+        }
+    }
+
+    /**
+     * Waits until the Execution vertex has reached a certain state.
+     *
+     * This method is based on polling and might miss very fast state transitions!
+     */
+    public static void waitUntilExecutionVertexState(ExecutionVertex executionVertex, ExecutionState state, long maxWaitMillis)
+        throws TimeoutException {
+
+        checkNotNull(executionVertex);
+        checkNotNull(state);
+        checkArgument(maxWaitMillis >= 0);
+
+        // this is a poor implementation - we may want to improve it eventually
+        final long deadline = maxWaitMillis == 0 ? Long.MAX_VALUE : System.nanoTime() + (maxWaitMillis * 1_000_000);
+
+        while (true) {
+            Execution execution = executionVertex.getCurrentExecutionAttempt();
+
+            if (execution == null || (execution.getState() != state && System.nanoTime() < deadline)) {
+                try {
+                    Thread.sleep(2);
+                } catch (InterruptedException ignored) { }
+            } else {
+                break;
+            }
+
+            if (System.nanoTime() >= deadline) {
+                if (execution != null) {
+                    throw new TimeoutException(
+                        "The execution vertex did not reach state " + state + " in time. " +
+                        "Current status is " + execution.getState() + '.');
 
 Review comment:
   same as above~




[jira] [Commented] (FLINK-10056) Add testRequestNextInputSplit

2018-08-04 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10056?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16569170#comment-16569170
 ] 

ASF GitHub Bot commented on FLINK-10056:


yanghua commented on a change in pull request #6490: [FLINK-10056] [test] Add 
JobMasterTest#testRequestNextInputSplit
URL: https://github.com/apache/flink/pull/6490#discussion_r207707425
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
 ##
 @@ -678,6 +688,134 @@ public void 
testResourceManagerConnectionAfterRegainingLeadership() throws Excep
}
}
 
+   @Test
+   public void testRequestNextInputSplit() throws Exception {
+   final String resourceManagerAddress = "rm";
+   final ResourceManagerId resourceManagerId = 
ResourceManagerId.generate();
+   final ResourceID rmResourceId = new 
ResourceID(resourceManagerAddress);
+
+   final TestingResourceManagerGateway resourceManagerGateway = 
new TestingResourceManagerGateway(
+   resourceManagerId,
+   rmResourceId,
+   resourceManagerAddress,
+   "localhost");
+
+   rpcService.registerGateway(resourceManagerAddress, 
resourceManagerGateway);
+
+   // build one node JobGraph
+   InputSplitSource inputSplitSource = new 
TestingInputSplitSource();
+
+   JobVertex source = new JobVertex("vertex1");
+   source.setParallelism(1);
+   source.setInputSplitSource(inputSplitSource);
+   source.setInvokableClass(AbstractInvokable.class);
+
+   final JobGraph jobGraph = new JobGraph(source);
+   jobGraph.setAllowQueuedScheduling(true);
+
+   
configuration.setLong(ConfigConstants.RESTART_STRATEGY_FIXED_DELAY_ATTEMPTS, 1);
+   
configuration.setString(ConfigConstants.RESTART_STRATEGY_FIXED_DELAY_DELAY, "0 
s");
+
+   final JobManagerSharedServices jobManagerSharedServices =
+   new TestingJobManagerSharedServicesBuilder()
+   
.setRestartStrategyFactory(RestartStrategyFactory.createRestartStrategyFactory(configuration))
+   .build();
+
+   final JobMaster jobMaster = createJobMaster(
+   configuration,
+   jobGraph,
+   haServices,
+   jobManagerSharedServices);
+
+   CompletableFuture startFuture = 
jobMaster.start(jobMasterId, testingTimeout);
+
+   try {
+   // wait for the start to complete
+   startFuture.get(testingTimeout.toMilliseconds(), 
TimeUnit.MILLISECONDS);
+
+   final JobMasterGateway jobMasterGateway = 
jobMaster.getSelfGateway(JobMasterGateway.class);
+
+   ExecutionGraph eg = jobMaster.getExecutionGraph();
+   ExecutionVertex ev = 
eg.getAllExecutionVertices().iterator().next();
+
+   SerializedInputSplit serializedInputSplit1 = 
jobMasterGateway.requestNextInputSplit(
+   source.getID(),
+   ev.getCurrentExecutionAttempt().getAttemptId())
+   .get(1L, TimeUnit.SECONDS);
+   InputSplit inputSplit1 = 
InstantiationUtil.deserializeObject(
+   serializedInputSplit1.getInputSplitData(), 
ClassLoader.getSystemClassLoader());
+   assertEquals(0, inputSplit1.getSplitNumber());
+
+   SerializedInputSplit serializedInputSplit2 = 
jobMasterGateway.requestNextInputSplit(
+   source.getID(),
+   ev.getCurrentExecutionAttempt().getAttemptId())
+   .get(1L, TimeUnit.SECONDS);
+   InputSplit inputSplit2 = 
InstantiationUtil.deserializeObject(
+   serializedInputSplit2.getInputSplitData(), 
ClassLoader.getSystemClassLoader());
+   assertEquals(1, inputSplit2.getSplitNumber());
+
+   
ExecutionGraphTestUtils.waitUntilExecutionVertexState(ev, 
ExecutionState.SCHEDULED, 2000L);
+
+   eg.failGlobal(new Exception("Testing exception"));
+
+   
ExecutionGraphTestUtils.waitUntilExecutionVertexState(ev, 
ExecutionState.SCHEDULED, 2000L);
+
+   SerializedInputSplit serializedInputSplit3 = 
jobMasterGateway.requestNextInputSplit(
+   source.getID(),
+   ev.getCurrentExecutionAttempt().getAttemptId())
+   .get(1L, TimeUnit.SECONDS);
+   InputSplit inputSplit3 = 

[jira] [Commented] (FLINK-10056) Add testRequestNextInputSplit

2018-08-04 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10056?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16569168#comment-16569168
 ] 

ASF GitHub Bot commented on FLINK-10056:


yanghua commented on a change in pull request #6490: [FLINK-10056] [test] Add 
JobMasterTest#testRequestNextInputSplit
URL: https://github.com/apache/flink/pull/6490#discussion_r207707407
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
 ##
 @@ -678,6 +688,134 @@ public void 
testResourceManagerConnectionAfterRegainingLeadership() throws Excep
}
}
 
+   @Test
+   public void testRequestNextInputSplit() throws Exception {
+   final String resourceManagerAddress = "rm";
+   final ResourceManagerId resourceManagerId = 
ResourceManagerId.generate();
+   final ResourceID rmResourceId = new 
ResourceID(resourceManagerAddress);
+
+   final TestingResourceManagerGateway resourceManagerGateway = 
new TestingResourceManagerGateway(
+   resourceManagerId,
+   rmResourceId,
+   resourceManagerAddress,
+   "localhost");
+
+   rpcService.registerGateway(resourceManagerAddress, 
resourceManagerGateway);
+
+   // build one node JobGraph
+   InputSplitSource inputSplitSource = new 
TestingInputSplitSource();
+
+   JobVertex source = new JobVertex("vertex1");
+   source.setParallelism(1);
+   source.setInputSplitSource(inputSplitSource);
+   source.setInvokableClass(AbstractInvokable.class);
+
+   final JobGraph jobGraph = new JobGraph(source);
+   jobGraph.setAllowQueuedScheduling(true);
+
+   
configuration.setLong(ConfigConstants.RESTART_STRATEGY_FIXED_DELAY_ATTEMPTS, 1);
+   
configuration.setString(ConfigConstants.RESTART_STRATEGY_FIXED_DELAY_DELAY, "0 
s");
+
+   final JobManagerSharedServices jobManagerSharedServices =
+   new TestingJobManagerSharedServicesBuilder()
+   
.setRestartStrategyFactory(RestartStrategyFactory.createRestartStrategyFactory(configuration))
+   .build();
+
+   final JobMaster jobMaster = createJobMaster(
+   configuration,
+   jobGraph,
+   haServices,
+   jobManagerSharedServices);
+
+   CompletableFuture startFuture = 
jobMaster.start(jobMasterId, testingTimeout);
+
+   try {
+   // wait for the start to complete
+   startFuture.get(testingTimeout.toMilliseconds(), 
TimeUnit.MILLISECONDS);
+
+   final JobMasterGateway jobMasterGateway = 
jobMaster.getSelfGateway(JobMasterGateway.class);
+
+   ExecutionGraph eg = jobMaster.getExecutionGraph();
+   ExecutionVertex ev = 
eg.getAllExecutionVertices().iterator().next();
+
+   SerializedInputSplit serializedInputSplit1 = 
jobMasterGateway.requestNextInputSplit(
+   source.getID(),
+   ev.getCurrentExecutionAttempt().getAttemptId())
+   .get(1L, TimeUnit.SECONDS);
+   InputSplit inputSplit1 = 
InstantiationUtil.deserializeObject(
+   serializedInputSplit1.getInputSplitData(), 
ClassLoader.getSystemClassLoader());
+   assertEquals(0, inputSplit1.getSplitNumber());
+
+   SerializedInputSplit serializedInputSplit2 = 
jobMasterGateway.requestNextInputSplit(
+   source.getID(),
+   ev.getCurrentExecutionAttempt().getAttemptId())
+   .get(1L, TimeUnit.SECONDS);
+   InputSplit inputSplit2 = 
InstantiationUtil.deserializeObject(
+   serializedInputSplit2.getInputSplitData(), 
ClassLoader.getSystemClassLoader());
+   assertEquals(1, inputSplit2.getSplitNumber());
+
+   
ExecutionGraphTestUtils.waitUntilExecutionVertexState(ev, 
ExecutionState.SCHEDULED, 2000L);
+
+   eg.failGlobal(new Exception("Testing exception"));
+
+   
ExecutionGraphTestUtils.waitUntilExecutionVertexState(ev, 
ExecutionState.SCHEDULED, 2000L);
+
+   SerializedInputSplit serializedInputSplit3 = 
jobMasterGateway.requestNextInputSplit(
+   source.getID(),
+   ev.getCurrentExecutionAttempt().getAttemptId())
+   .get(1L, TimeUnit.SECONDS);
 
 Review comment:
   Same as above.


[jira] [Commented] (FLINK-10056) Add testRequestNextInputSplit

2018-08-04 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10056?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16569166#comment-16569166
 ] 

ASF GitHub Bot commented on FLINK-10056:


yanghua commented on a change in pull request #6490: [FLINK-10056] [test] Add 
JobMasterTest#testRequestNextInputSplit
URL: https://github.com/apache/flink/pull/6490#discussion_r207707414
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
 ##
 @@ -678,6 +688,134 @@ public void 
testResourceManagerConnectionAfterRegainingLeadership() throws Excep
}
}
 
+   @Test
+   public void testRequestNextInputSplit() throws Exception {
+   final String resourceManagerAddress = "rm";
+   final ResourceManagerId resourceManagerId = 
ResourceManagerId.generate();
+   final ResourceID rmResourceId = new 
ResourceID(resourceManagerAddress);
+
+   final TestingResourceManagerGateway resourceManagerGateway = 
new TestingResourceManagerGateway(
+   resourceManagerId,
+   rmResourceId,
+   resourceManagerAddress,
+   "localhost");
+
+   rpcService.registerGateway(resourceManagerAddress, 
resourceManagerGateway);
+
+   // build one node JobGraph
+   InputSplitSource inputSplitSource = new 
TestingInputSplitSource();
+
+   JobVertex source = new JobVertex("vertex1");
+   source.setParallelism(1);
+   source.setInputSplitSource(inputSplitSource);
+   source.setInvokableClass(AbstractInvokable.class);
+
+   final JobGraph jobGraph = new JobGraph(source);
+   jobGraph.setAllowQueuedScheduling(true);
+
+   
configuration.setLong(ConfigConstants.RESTART_STRATEGY_FIXED_DELAY_ATTEMPTS, 1);
+   
configuration.setString(ConfigConstants.RESTART_STRATEGY_FIXED_DELAY_DELAY, "0 
s");
+
+   final JobManagerSharedServices jobManagerSharedServices =
+   new TestingJobManagerSharedServicesBuilder()
+   
.setRestartStrategyFactory(RestartStrategyFactory.createRestartStrategyFactory(configuration))
+   .build();
+
+   final JobMaster jobMaster = createJobMaster(
+   configuration,
+   jobGraph,
+   haServices,
+   jobManagerSharedServices);
+
+   CompletableFuture startFuture = 
jobMaster.start(jobMasterId, testingTimeout);
+
+   try {
+   // wait for the start to complete
+   startFuture.get(testingTimeout.toMilliseconds(), 
TimeUnit.MILLISECONDS);
+
+   final JobMasterGateway jobMasterGateway = 
jobMaster.getSelfGateway(JobMasterGateway.class);
+
+   ExecutionGraph eg = jobMaster.getExecutionGraph();
+   ExecutionVertex ev = 
eg.getAllExecutionVertices().iterator().next();
+
+   SerializedInputSplit serializedInputSplit1 = 
jobMasterGateway.requestNextInputSplit(
+   source.getID(),
+   ev.getCurrentExecutionAttempt().getAttemptId())
+   .get(1L, TimeUnit.SECONDS);
+   InputSplit inputSplit1 = 
InstantiationUtil.deserializeObject(
+   serializedInputSplit1.getInputSplitData(), 
ClassLoader.getSystemClassLoader());
+   assertEquals(0, inputSplit1.getSplitNumber());
+
+   SerializedInputSplit serializedInputSplit2 = 
jobMasterGateway.requestNextInputSplit(
+   source.getID(),
+   ev.getCurrentExecutionAttempt().getAttemptId())
+   .get(1L, TimeUnit.SECONDS);
+   InputSplit inputSplit2 = 
InstantiationUtil.deserializeObject(
+   serializedInputSplit2.getInputSplitData(), 
ClassLoader.getSystemClassLoader());
+   assertEquals(1, inputSplit2.getSplitNumber());
+
+   
ExecutionGraphTestUtils.waitUntilExecutionVertexState(ev, 
ExecutionState.SCHEDULED, 2000L);
+
+   eg.failGlobal(new Exception("Testing exception"));
+
+   
ExecutionGraphTestUtils.waitUntilExecutionVertexState(ev, 
ExecutionState.SCHEDULED, 2000L);
+
+   SerializedInputSplit serializedInputSplit3 = 
jobMasterGateway.requestNextInputSplit(
+   source.getID(),
+   ev.getCurrentExecutionAttempt().getAttemptId())
+   .get(1L, TimeUnit.SECONDS);
+   InputSplit inputSplit3 = 

[jira] [Commented] (FLINK-10056) Add testRequestNextInputSplit

2018-08-04 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10056?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16569161#comment-16569161
 ] 

ASF GitHub Bot commented on FLINK-10056:


yanghua commented on a change in pull request #6490: [FLINK-10056] [test] Add 
JobMasterTest#testRequestNextInputSplit
URL: https://github.com/apache/flink/pull/6490#discussion_r207707284
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java
 ##
 @@ -144,7 +146,48 @@ public static void waitUntilExecutionState(Execution execution, ExecutionState s
}
 
if (System.nanoTime() >= deadline) {
-   throw new TimeoutException();
+   throw new TimeoutException(
+   "The execution did not reach state " + state + " in time. " +
+   "Current status is " + execution.getState() + '.');
+   }
+   }
+
+   /**
+* Waits until the Execution vertex has reached a certain state.
+*
+* This method is based on polling and might miss very fast state transitions!
+*/
+   public static void waitUntilExecutionVertexState(ExecutionVertex executionVertex, ExecutionState state, long maxWaitMillis)
 
 Review comment:
   The method name `waitUntilExecutionVertexState` is not very accurate. 
Generally, `until` is followed by a condition, such as a timeout. I think we 
should consider a new name, e.g. `waitExecutionVertexStateTransform`. That is 
just an example; feel free to come up with a better one.
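
For context, the Javadoc quoted above says the method polls and may miss very fast state transitions. Below is a minimal standalone sketch of that deadline-based polling pattern. The names `PollingWait` and `awaitValue` are hypothetical and not part of Flink or this pull request. Unlike the quoted diff, the sketch measures elapsed time instead of comparing absolute `System.nanoTime()` values, and it does not swallow interrupts.

```java
import java.util.Objects;
import java.util.concurrent.TimeoutException;
import java.util.function.Supplier;

// Hypothetical helper for illustration only; not part of Flink.
final class PollingWait {

    /**
     * Polls {@code current} until it equals {@code expected} or the wait time elapses.
     * A {@code maxWaitMillis} of 0 means "no time limit", as in the quoted diff.
     */
    static <T> void awaitValue(Supplier<T> current, T expected, long maxWaitMillis)
            throws TimeoutException {
        Objects.requireNonNull(current);
        Objects.requireNonNull(expected);
        if (maxWaitMillis < 0) {
            throw new IllegalArgumentException("maxWaitMillis must be >= 0");
        }

        final long start = System.nanoTime();
        final long maxWaitNanos = maxWaitMillis * 1_000_000L;

        while (true) {
            // The supplier may return null, e.g. before a first attempt exists.
            T value = current.get();
            if (expected.equals(value)) {
                return;
            }
            // Compare elapsed time rather than absolute nanoTime values.
            if (maxWaitMillis != 0 && System.nanoTime() - start >= maxWaitNanos) {
                throw new TimeoutException(String.format(
                    "Did not reach state %s in time. Current state is %s.", expected, value));
            }
            try {
                // The state is only sampled every few milliseconds, so a value
                // that appears and disappears between two samples goes unnoticed.
                Thread.sleep(2);
            } catch (InterruptedException e) {
                // Restore the interrupt flag and stop waiting.
                Thread.currentThread().interrupt();
                throw new IllegalStateException("Interrupted while waiting", e);
            }
        }
    }

    public static void main(String[] args) throws TimeoutException {
        // Returns immediately because the supplier already reports the expected value.
        awaitValue(() -> "SCHEDULED", "SCHEDULED", 100L);

        try {
            awaitValue(() -> "CREATED", "SCHEDULED", 20L);
        } catch (TimeoutException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

Taking a `Supplier` keeps the sketch independent of `ExecutionVertex`; in the quoted method the polled value would be the state of the vertex's current execution attempt.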




[jira] [Commented] (FLINK-10056) Add testRequestNextInputSplit

2018-08-04 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10056?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16569162#comment-16569162
 ] 

ASF GitHub Bot commented on FLINK-10056:


yanghua commented on a change in pull request #6490: [FLINK-10056] [test] Add 
JobMasterTest#testRequestNextInputSplit
URL: https://github.com/apache/flink/pull/6490#discussion_r207707125
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java
 ##
 @@ -118,7 +118,9 @@ public static void waitUntilJobStatus(ExecutionGraph eg, JobStatus status, long
}
 
if (System.nanoTime() >= deadline) {
-   throw new TimeoutException("The job did not reach status " + status + " in time. Current status is " + eg.getState() + '.');
+   throw new TimeoutException(
+   "The job did not reach status " + status + " in time. " +
+   "Current status is " + eg.getState() + '.');
 
 Review comment:
   Please use `""` consistently instead of `''`. Also, a string format would 
look better to me.
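
A minimal sketch of what the suggested string-format variant could look like. The class and method names (`TimeoutMessages`, `jobStatusTimeoutMessage`) are hypothetical and not part of the pull request; only `String.format` and `TimeoutException` are standard Java.

```java
import java.util.concurrent.TimeoutException;

// Hypothetical helper for illustration only; not part of the pull request.
final class TimeoutMessages {

    /** Builds the timeout message with String.format instead of string concatenation. */
    static String jobStatusTimeoutMessage(Object expected, Object current) {
        return String.format(
            "The job did not reach status %s in time. Current status is %s.",
            expected, current);
    }

    public static void main(String[] args) {
        TimeoutException e =
            new TimeoutException(jobStatusTimeoutMessage("RUNNING", "CREATED"));
        // prints "The job did not reach status RUNNING in time. Current status is CREATED."
        System.out.println(e.getMessage());
    }
}
```

With a format string the trailing period lives inside the template, so the mixed `""` and `''` literals mentioned in the comment no longer arise.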




[jira] [Commented] (FLINK-10056) Add testRequestNextInputSplit

2018-08-04 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10056?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16569163#comment-16569163
 ] 

ASF GitHub Bot commented on FLINK-10056:


yanghua commented on a change in pull request #6490: [FLINK-10056] [test] Add 
JobMasterTest#testRequestNextInputSplit
URL: https://github.com/apache/flink/pull/6490#discussion_r207707152
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java
 ##
 @@ -144,7 +146,48 @@ public static void waitUntilExecutionState(Execution execution, ExecutionState s
}
 
if (System.nanoTime() >= deadline) {
-   throw new TimeoutException();
+   throw new TimeoutException(
+   "The execution did not reach state " + state + " in time. " +
+   "Current status is " + execution.getState() + '.');
+   }
+   }
+
+   /**
+* Waits until the Execution vertex has reached a certain state.
+*
+* This method is based on polling and might miss very fast state transitions!
+*/
+   public static void waitUntilExecutionVertexState(ExecutionVertex executionVertex, ExecutionState state, long maxWaitMillis)
+   throws TimeoutException {
+
 
 Review comment:
   I think we could remove the empty line here.




[jira] [Commented] (FLINK-10056) Add testRequestNextInputSplit

2018-08-04 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10056?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16569167#comment-16569167
 ] 

ASF GitHub Bot commented on FLINK-10056:


yanghua commented on a change in pull request #6490: [FLINK-10056] [test] Add 
JobMasterTest#testRequestNextInputSplit
URL: https://github.com/apache/flink/pull/6490#discussion_r207707382
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
 ##
 @@ -678,6 +688,134 @@ public void 
testResourceManagerConnectionAfterRegainingLeadership() throws Excep
}
}
 
+   @Test
+   public void testRequestNextInputSplit() throws Exception {
+   final String resourceManagerAddress = "rm";
+   final ResourceManagerId resourceManagerId = 
ResourceManagerId.generate();
+   final ResourceID rmResourceId = new 
ResourceID(resourceManagerAddress);
+
+   final TestingResourceManagerGateway resourceManagerGateway = 
new TestingResourceManagerGateway(
+   resourceManagerId,
+   rmResourceId,
+   resourceManagerAddress,
+   "localhost");
+
+   rpcService.registerGateway(resourceManagerAddress, 
resourceManagerGateway);
+
+   // build one node JobGraph
+   InputSplitSource inputSplitSource = new 
TestingInputSplitSource();
+
+   JobVertex source = new JobVertex("vertex1");
+   source.setParallelism(1);
+   source.setInputSplitSource(inputSplitSource);
+   source.setInvokableClass(AbstractInvokable.class);
+
+   final JobGraph jobGraph = new JobGraph(source);
+   jobGraph.setAllowQueuedScheduling(true);
+
+   
configuration.setLong(ConfigConstants.RESTART_STRATEGY_FIXED_DELAY_ATTEMPTS, 1);
+   
configuration.setString(ConfigConstants.RESTART_STRATEGY_FIXED_DELAY_DELAY, "0 
s");
+
+   final JobManagerSharedServices jobManagerSharedServices =
+   new TestingJobManagerSharedServicesBuilder()
+   
.setRestartStrategyFactory(RestartStrategyFactory.createRestartStrategyFactory(configuration))
+   .build();
+
+   final JobMaster jobMaster = createJobMaster(
+   configuration,
+   jobGraph,
+   haServices,
+   jobManagerSharedServices);
+
+   CompletableFuture startFuture = 
jobMaster.start(jobMasterId, testingTimeout);
+
+   try {
+   // wait for the start to complete
+   startFuture.get(testingTimeout.toMilliseconds(), 
TimeUnit.MILLISECONDS);
+
+   final JobMasterGateway jobMasterGateway = 
jobMaster.getSelfGateway(JobMasterGateway.class);
+
+   ExecutionGraph eg = jobMaster.getExecutionGraph();
+   ExecutionVertex ev = 
eg.getAllExecutionVertices().iterator().next();
+
+   SerializedInputSplit serializedInputSplit1 = 
jobMasterGateway.requestNextInputSplit(
+   source.getID(),
+   ev.getCurrentExecutionAttempt().getAttemptId())
+   .get(1L, TimeUnit.SECONDS);
 
 Review comment:
   The indentation is confusing. You could consider:
   
   ```
   SerializedInputSplit serializedInputSplit1 = jobMasterGateway
       .requestNextInputSplit(
           source.getID(),
           ev.getCurrentExecutionAttempt().getAttemptId())
       .get(1L, TimeUnit.SECONDS);
   ```




[jira] [Commented] (FLINK-10056) Add testRequestNextInputSplit

2018-08-04 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10056?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16569165#comment-16569165
 ] 

ASF GitHub Bot commented on FLINK-10056:


yanghua commented on a change in pull request #6490: [FLINK-10056] [test] Add 
JobMasterTest#testRequestNextInputSplit
URL: https://github.com/apache/flink/pull/6490#discussion_r207707390
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
 ##
 @@ -678,6 +688,134 @@ public void 
testResourceManagerConnectionAfterRegainingLeadership() throws Excep
}
}
 
+   @Test
+   public void testRequestNextInputSplit() throws Exception {
+   final String resourceManagerAddress = "rm";
+   final ResourceManagerId resourceManagerId = 
ResourceManagerId.generate();
+   final ResourceID rmResourceId = new 
ResourceID(resourceManagerAddress);
+
+   final TestingResourceManagerGateway resourceManagerGateway = 
new TestingResourceManagerGateway(
+   resourceManagerId,
+   rmResourceId,
+   resourceManagerAddress,
+   "localhost");
+
+   rpcService.registerGateway(resourceManagerAddress, 
resourceManagerGateway);
+
+   // build one node JobGraph
+   InputSplitSource inputSplitSource = new 
TestingInputSplitSource();
+
+   JobVertex source = new JobVertex("vertex1");
+   source.setParallelism(1);
+   source.setInputSplitSource(inputSplitSource);
+   source.setInvokableClass(AbstractInvokable.class);
+
+   final JobGraph jobGraph = new JobGraph(source);
+   jobGraph.setAllowQueuedScheduling(true);
+
+   
configuration.setLong(ConfigConstants.RESTART_STRATEGY_FIXED_DELAY_ATTEMPTS, 1);
+   
configuration.setString(ConfigConstants.RESTART_STRATEGY_FIXED_DELAY_DELAY, "0 
s");
+
+   final JobManagerSharedServices jobManagerSharedServices =
+   new TestingJobManagerSharedServicesBuilder()
+   
.setRestartStrategyFactory(RestartStrategyFactory.createRestartStrategyFactory(configuration))
+   .build();
+
+   final JobMaster jobMaster = createJobMaster(
+   configuration,
+   jobGraph,
+   haServices,
+   jobManagerSharedServices);
+
+   CompletableFuture startFuture = 
jobMaster.start(jobMasterId, testingTimeout);
+
+   try {
+   // wait for the start to complete
+   startFuture.get(testingTimeout.toMilliseconds(), 
TimeUnit.MILLISECONDS);
+
+   final JobMasterGateway jobMasterGateway = 
jobMaster.getSelfGateway(JobMasterGateway.class);
+
+   ExecutionGraph eg = jobMaster.getExecutionGraph();
+   ExecutionVertex ev = 
eg.getAllExecutionVertices().iterator().next();
+
+   SerializedInputSplit serializedInputSplit1 = 
jobMasterGateway.requestNextInputSplit(
+   source.getID(),
+   ev.getCurrentExecutionAttempt().getAttemptId())
+   .get(1L, TimeUnit.SECONDS);
+   InputSplit inputSplit1 = 
InstantiationUtil.deserializeObject(
+   serializedInputSplit1.getInputSplitData(), 
ClassLoader.getSystemClassLoader());
+   assertEquals(0, inputSplit1.getSplitNumber());
+
+   SerializedInputSplit serializedInputSplit2 = 
jobMasterGateway.requestNextInputSplit(
+   source.getID(),
+   ev.getCurrentExecutionAttempt().getAttemptId())
+   .get(1L, TimeUnit.SECONDS);
 
 Review comment:
   Same as above.



[jira] [Commented] (FLINK-10056) Add testRequestNextInputSplit

2018-08-04 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10056?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16569164#comment-16569164
 ] 

ASF GitHub Bot commented on FLINK-10056:


yanghua commented on a change in pull request #6490: [FLINK-10056] [test] Add 
JobMasterTest#testRequestNextInputSplit
URL: https://github.com/apache/flink/pull/6490#discussion_r207707131
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java
 ##
 @@ -144,7 +146,48 @@ public static void waitUntilExecutionState(Execution execution, ExecutionState s
}
 
if (System.nanoTime() >= deadline) {
-   throw new TimeoutException();
+   throw new TimeoutException(
+   "The execution did not reach state " + state + " in time. " +
+   "Current status is " + execution.getState() + '.');
 
 Review comment:
   Please use "" consistently instead of ''. Also, a string format would look 
better to me.




[jira] [Commented] (FLINK-10056) Add testRequestNextInputSplit

2018-08-04 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10056?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16569151#comment-16569151
 ] 

ASF GitHub Bot commented on FLINK-10056:


TisonKun opened a new pull request #6490: [FLINK-10056] [test] Add 
JobMasterTest#testRequestNextInputSplit
URL: https://github.com/apache/flink/pull/6490
 
 
   ## Brief change log
   
   Add `JobMasterTest#testRequestNextInputSplit` to make sure that 
`JobMaster#requestNextInputSplit` works as expected.
   
   
   ## Verifying this change
   
   The added test itself serves as the verification.
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (no)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
 - The serializers: (no)
 - The runtime per-record code paths (performance sensitive): (no)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
 - The S3 file system connector: (no)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (no)
 - If yes, how is the feature documented? (not applicable)
   

