[ 
https://issues.apache.org/jira/browse/FLINK-10386?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16638389#comment-16638389
 ] 

ASF GitHub Bot commented on FLINK-10386:
----------------------------------------

TisonKun commented on a change in pull request #6729: [FLINK-10386] [taskmanager] Remove legacy class TaskExecutionStateListener
URL: https://github.com/apache/flink/pull/6729#discussion_r222721303
 
 

 ##########
 File path: flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
 ##########
 @@ -181,43 +175,26 @@
         */
        @Test
        public void testEarlyCanceling() throws Exception {
-               Deadline deadline = Deadline.fromNow(Duration.ofMinutes(2));
-               StreamConfig cfg = new StreamConfig(new Configuration());
+               final StreamConfig cfg = new StreamConfig(new Configuration());
                cfg.setOperatorID(new OperatorID(4711L, 42L));
                cfg.setStreamOperator(new SlowlyDeserializingOperator());
                cfg.setTimeCharacteristic(TimeCharacteristic.ProcessingTime);
 
-               Task task = createTask(SourceStreamTask.class, cfg, new Configuration());
+               final TaskManagerActions taskManagerActions = mock(TaskManagerActions.class);
+               final Task task = createTask(SourceStreamTask.class, cfg, new Configuration(), taskManagerActions);
 
-               TestingExecutionStateListener testingExecutionStateListener = new TestingExecutionStateListener();
+               final TaskExecutionState state = new TaskExecutionState(
+                       task.getJobID(), task.getExecutionId(), ExecutionState.RUNNING);
 
-               task.registerExecutionListener(testingExecutionStateListener);
                task.startTaskThread();
 
-               Future<ExecutionState> running = testingExecutionStateListener.notifyWhenExecutionState(ExecutionState.RUNNING);
-
-               // wait until the task thread reached state RUNNING
-               ExecutionState executionState = running.get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS);
-
-               // make sure the task is really running
-               if (executionState != ExecutionState.RUNNING) {
-                       fail("Task entered state " + task.getExecutionState() + " with error "
-                                       + ExceptionUtils.stringifyException(task.getFailureCause()));
-               }
+               verify(taskManagerActions, timeout(2000L)).updateTaskExecutionState(eq(state));
 
                // send a cancel. because the operator takes a long time to deserialize, this should
                // hit the task before the operator is deserialized
                task.cancelExecution();
 
-               Future<ExecutionState> canceling = testingExecutionStateListener.notifyWhenExecutionState(ExecutionState.CANCELING);
-
-               executionState = canceling.get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS);
-
-               // the task should reach state canceled eventually
-               assertTrue(executionState == ExecutionState.CANCELING ||
-                               executionState == ExecutionState.CANCELED);
-
-               task.getExecutingThread().join(deadline.timeLeft().toMillis());
 
 Review comment:
   Use `task.getExecutingThread().join();` to wait for the task to transition into a
   final state. After the join we can, and should, assert that the task is in the
   `CANCELED` state, so that the test is not weakened.
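
   To illustrate the join-then-assert pattern outside of Flink, here is a minimal,
   self-contained sketch. `FakeTask` and its `State` enum are hypothetical stand-ins
   for `Task` and `ExecutionState`, not Flink classes; the real test would call
   `task.getExecutingThread().join()` and then check `task.getExecutionState()`.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicReference;

class JoinThenAssertSketch {

    /** Hypothetical stand-in for ExecutionState; only the states needed here. */
    enum State { CREATED, RUNNING, CANCELING, CANCELED }

    /** Hypothetical stand-in for Task: runs until it is asked to cancel. */
    static final class FakeTask implements Runnable {
        private final AtomicReference<State> state = new AtomicReference<>(State.CREATED);
        private final CountDownLatch started = new CountDownLatch(1);
        private final Thread executingThread = new Thread(this, "fake-task");

        void startTaskThread() { executingThread.start(); }

        void awaitStarted() throws InterruptedException { started.await(); }

        void cancelExecution() {
            state.set(State.CANCELING);
            executingThread.interrupt(); // wake the task thread if it is sleeping
        }

        Thread getExecutingThread() { return executingThread; }

        State getExecutionState() { return state.get(); }

        @Override
        public void run() {
            // Only move to RUNNING if nobody canceled before the thread started.
            state.compareAndSet(State.CREATED, State.RUNNING);
            started.countDown();
            try {
                while (state.get() == State.RUNNING) {
                    Thread.sleep(10);
                }
            } catch (InterruptedException ignored) {
                // Interruption is the cancellation signal in this sketch.
            } finally {
                state.set(State.CANCELED); // final state, set before the thread exits
            }
        }
    }

    /** Starts the task, cancels it, joins its thread, and returns the final state. */
    static State runScenario() throws InterruptedException {
        FakeTask task = new FakeTask();
        task.startTaskThread();
        task.awaitStarted();
        task.cancelExecution();
        // join() returns only after run() has finished, so the state read below is final.
        task.getExecutingThread().join();
        return task.getExecutionState();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(runScenario());
    }
}
```

   Because `join()` returns only after the task thread has terminated, the state read
   afterwards cannot be an intermediate one such as `CANCELING`, which is why the
   assertion can be strict instead of accepting either state.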



> Remove legacy class TaskExecutionStateListener
> ----------------------------------------------
>
>                 Key: FLINK-10386
>                 URL: https://issues.apache.org/jira/browse/FLINK-10386
>             Project: Flink
>          Issue Type: Sub-task
>          Components: TaskManager
>    Affects Versions: 1.7.0
>            Reporter: tison
>            Assignee: tison
>            Priority: Major
>              Labels: pull-request-available
>             Fix For: 1.7.0
>
>
> After a discussion 
> [here|https://github.com/apache/flink/commit/0735b5b935b0c0757943e2d58047afcfb9949560#commitcomment-30584257]
>  with [~trohrm...@apache.org], I started to analyze the usage of 
> {{ActorGatewayTaskExecutionStateListener}} and {{TaskExecutionStateListener}}.
> In conclusion, we have abandoned the {{TaskExecutionStateListener}} strategy 
> and no component relies on it any longer. Instead, we introduced 
> {{TaskManagerActions}} to handle the communication between {{Task}} and 
> {{TaskManager}}. No one except {{TaskManager}} should communicate directly 
> with {{Task}}, so the legacy class {{TaskExecutionStateListener}} can be 
> safely removed.



