akalash commented on a change in pull request #16885:
URL: https://github.com/apache/flink/pull/16885#discussion_r692114692



##########
File path: flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/output/BoundedStreamTask.java
##########
@@ -116,7 +116,7 @@ protected void processInput(MailboxDefaultAction.Controller controller) throws E
     protected void cancelTask() {}
 
     @Override
-    protected void cleanup() throws Exception {
+    protected void cleanUpInternal() throws Exception {

Review comment:
       There are too many methods with `cleanUp` in the name. Could we rename 
this one to something different, such as `closeResources` (analogous to 
`closeAllOperators`)?

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
##########
@@ -762,17 +762,21 @@ private void doRun() {
             executingThread.setContextClassLoader(userCodeClassLoader.asClassLoader());
 
             AbstractInvokable finalInvokable = invokable;
-            runWithSystemExitMonitoring(finalInvokable::restore);
+            try {
+                runWithSystemExitMonitoring(finalInvokable::restore);
 
-            if (!transitionState(ExecutionState.INITIALIZING, ExecutionState.RUNNING)) {
-                throw new CancelTaskException();
-            }
+                if (!transitionState(ExecutionState.INITIALIZING, ExecutionState.RUNNING)) {
+                    throw new CancelTaskException();
+                }
 
-            // notify everyone that we switched to running
-            taskManagerActions.updateTaskExecutionState(
-                    new TaskExecutionState(executionId, ExecutionState.RUNNING));
+                // notify everyone that we switched to running
+                taskManagerActions.updateTaskExecutionState(
+                        new TaskExecutionState(executionId, ExecutionState.RUNNING));
 
-            runWithSystemExitMonitoring(finalInvokable::invoke);
+                runWithSystemExitMonitoring(finalInvokable::invoke);
+            } finally {
+                runWithSystemExitMonitoring(finalInvokable::cleanUp);
+            }

Review comment:
       I want to share some thoughts about these methods. We now have three 
tightly coupled methods (`restore`, `invoke`, `cleanUp`), but no real contract 
for them, only some explanation in the Javadoc. I first noticed this problem 
when I implemented `restore`. My proposal back then was an extra interface 
containing only these two (now three) methods:
   ```
   InvokableExecutor executor = invokable.executor();
   try {
     executor.restore();
     executor.invoke();
   } finally {
     executor.cleanUp();
   }
   ```
   It is still not the best contract, but at least it separates these dependent 
methods from the other methods in `AbstractInvokable`. Unfortunately, the idea 
is not easy to implement on top of the current code, so at the time we decided 
it was not worth it. Now that there is a third dependent method, I want to 
revisit the discussion. I still think the idea above does not fit the current 
PR (it is too invasive and has no clear benefit here), but I would like your 
opinion. Do you think that having three dependent methods mixed in with 
unrelated methods is a real problem? If so, do you have any ideas for 
improving it?
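
To make the idea concrete, here is a minimal, self-contained sketch of such a 
segregated contract. It is not Flink code: the interface name 
`InvokableExecutor`, the class `LifecycleSketch`, and the `trace` helper are 
all hypothetical and exist only to show the call order the contract implies.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch, not part of Flink.
final class LifecycleSketch {

    /** Hypothetical interface grouping only the three dependent lifecycle methods. */
    interface InvokableExecutor {
        void restore() throws Exception;

        void invoke() throws Exception;

        void cleanUp() throws Exception;
    }

    /** Drives the lifecycle: restore, then invoke, and cleanUp in every case. */
    static void run(InvokableExecutor executor) throws Exception {
        try {
            executor.restore();
            executor.invoke();
        } finally {
            executor.cleanUp();
        }
    }

    /** Records the calls that happen, optionally failing inside restore. */
    static List<String> trace(boolean failInRestore) {
        List<String> calls = new ArrayList<>();
        InvokableExecutor executor =
                new InvokableExecutor() {
                    @Override
                    public void restore() throws Exception {
                        calls.add("restore");
                        if (failInRestore) {
                            throw new Exception("restore failed");
                        }
                    }

                    @Override
                    public void invoke() {
                        calls.add("invoke");
                    }

                    @Override
                    public void cleanUp() {
                        calls.add("cleanUp");
                    }
                };
        try {
            run(executor);
        } catch (Exception e) {
            // The finally block has already run cleanUp at this point.
            calls.add("failed");
        }
        return calls;
    }

    public static void main(String[] args) {
        System.out.println(trace(false)); // [restore, invoke, cleanUp]
        System.out.println(trace(true)); // [restore, cleanUp, failed]
    }
}
```

With a shape like this, the caller depends only on the three lifecycle methods 
and their order, and the rest of the invokable's API stays out of the contract.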

##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
##########
@@ -726,8 +720,6 @@ private void ensureNotCanceled() {
     @Override
     public final void invoke() throws Exception {
         runWithCleanUpOnFail(this::executeInvoke);

Review comment:
       I think the name `runWithCleanUpOnFail` is no longer accurate (we don't 
do any clean-up there), so maybe we should also rename it to something like 
`runWithCancelOnFail`. The difference is small, but I think it would be less 
confusing.

##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
##########
@@ -793,8 +785,6 @@ private void runWithCleanUpOnFail(RunnableWithException run) throws Exception {
                         invokeException = firstOrSuppressed(ex, invokeException);
                     }
                 }
-
-                cleanUpInvoke();
             }
             // TODO: investigate why Throwable instead of Exception is used here.
             catch (Throwable cleanUpException) {

Review comment:
       Please take a closer look at this method; I believe it can be 
simplified now. At the very least, this `catch` block no longer makes sense: 
since you removed `cleanUpInvoke`, nothing after the previous `catch` can 
throw a new exception.
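
As a rough illustration of what the simplified method could look like, here is 
a standalone sketch. It is not the actual `StreamTask` code: the class 
`CancelOnFailSketch`, the `cancelAction` field, the `rethrow` helper, and the 
`demo` method are hypothetical stand-ins, and the suppression logic only 
approximates what `firstOrSuppressed` appears to do in the hunk above.

```java
// Hypothetical sketch, not part of Flink.
final class CancelOnFailSketch {

    /** Stand-in for a runnable that may throw a checked exception. */
    interface RunnableWithException {
        void run() throws Exception;
    }

    private final RunnableWithException cancelAction;
    private boolean canceled;
    private boolean failing;

    CancelOnFailSketch(RunnableWithException cancelAction) {
        this.cancelAction = cancelAction;
    }

    /** Runs the action; on failure cancels once and rethrows the original error. */
    void runWithCancelOnFail(RunnableWithException run) throws Exception {
        try {
            run.run();
        } catch (Throwable invokeException) {
            failing = !canceled;
            if (!canceled) {
                try {
                    cancelAction.run();
                } catch (Throwable ex) {
                    // Keep the original failure primary; attach the cancel failure.
                    if (ex != invokeException) {
                        invokeException.addSuppressed(ex);
                    }
                }
            }
            // Nothing else can throw here, so no outer catch block is needed.
            rethrow(invokeException);
        }
    }

    private static void rethrow(Throwable t) throws Exception {
        if (t instanceof Error) {
            throw (Error) t;
        }
        if (t instanceof Exception) {
            throw (Exception) t;
        }
        throw new Exception(t);
    }

    /** Small demonstration: both the action and the cancel step fail. */
    static String demo() {
        CancelOnFailSketch sketch =
                new CancelOnFailSketch(
                        () -> {
                            throw new IllegalStateException("cancel failed");
                        });
        try {
            sketch.runWithCancelOnFail(
                    () -> {
                        throw new java.io.IOException("invoke failed");
                    });
            return "no failure";
        } catch (Exception e) {
            return e.getMessage() + " / suppressed: " + e.getSuppressed()[0].getMessage();
        }
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

The point of the sketch is only that a single `try`/`catch` around the cancel 
call is enough once no clean-up runs inside this method.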




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org