XComp commented on code in PR #25027:
URL: https://github.com/apache/flink/pull/25027#discussion_r1828597077


##########
flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/OnMainThreadJobManagerRunnerRegistry.java:
##########
@@ -84,9 +84,10 @@ public Collection<JobManagerRunner> getJobManagerRunners() {
     }
 
     @Override
-    public CompletableFuture<Void> localCleanupAsync(JobID jobId, Executor 
executor) {
-        mainThreadExecutor.assertRunningInMainThread();
-        return delegate.localCleanupAsync(jobId, executor);
+    public CompletableFuture<Void> localCleanupAsync(
+            JobID jobId, Executor cleanupExecutor, ComponentMainThreadExecutor 
mainThreadExecutor) {
+        this.mainThreadExecutor.assertRunningInMainThread();

Review Comment:
   ```suggestion
           Preconditions.checkState(
                   mainThreadExecutor == this.mainThreadExecutor,
                   "A single main thread executor should be used.");
           this.mainThreadExecutor.assertRunningInMainThread();
   ```
   Let's make sure that we use the same main thread everywhere.
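
A minimal sketch of such a guard, assuming a hypothetical `GuardedRegistry` and a plain `Executor` in place of Flink's `ComponentMainThreadExecutor`. The hand-written check throws `IllegalStateException`, the same exception type that `Preconditions.checkState` raises. None of the names below are taken from the PR.

```java
import java.util.concurrent.Executor;

public class SameExecutorCheckSketch {

    /** Hypothetical executor that runs tasks on the calling thread. */
    static final class DirectExecutor implements Executor {
        @Override
        public void execute(Runnable command) {
            command.run();
        }
    }

    /** Hypothetical decorator that is bound to one main-thread executor at construction. */
    static final class GuardedRegistry {
        private final Executor mainThreadExecutor;

        GuardedRegistry(Executor mainThreadExecutor) {
            this.mainThreadExecutor = mainThreadExecutor;
        }

        void localCleanup(Executor mainThreadExecutor) {
            // Identity comparison: the caller must pass the very same instance.
            if (mainThreadExecutor != this.mainThreadExecutor) {
                throw new IllegalStateException("A single main thread executor should be used.");
            }
            // The actual cleanup would follow here.
        }
    }

    public static void main(String[] args) {
        final Executor expected = new DirectExecutor();
        final GuardedRegistry registry = new GuardedRegistry(expected);

        registry.localCleanup(expected);
        System.out.println("same instance accepted");

        try {
            registry.localCleanup(new DirectExecutor());
        } catch (IllegalStateException e) {
            System.out.println("different instance rejected: " + e.getMessage());
        }
    }
}
```

The check compares by identity rather than `equals`, so a second executor is rejected even if it behaves identically.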



##########
flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DefaultJobManagerRunnerRegistry.java:
##########
@@ -83,9 +84,16 @@ public Collection<JobManagerRunner> getJobManagerRunners() {
     }
 
     @Override
-    public CompletableFuture<Void> localCleanupAsync(JobID jobId, Executor 
unusedExecutor) {
+    public CompletableFuture<Void> localCleanupAsync(
+            JobID jobId, Executor ignoredExecutor, ComponentMainThreadExecutor 
mainThreadExecutor) {

Review Comment:
   ```suggestion
               JobID jobId, Executor ignoredCleanupExecutor, ComponentMainThreadExecutor mainThreadExecutor) {
   ```
   nit: let's add the purpose to the parameter name here as well for better
   code readability.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/cleanup/LocallyCleanableResourceWithMainThread.java:
##########
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.dispatcher.cleanup;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+
+/**
+ * {@code LocallyCleanableResourceWithMainThread} is an extension of the {@link
+ * LocallyCleanableResource} interface. It allows the {@link 
DispatcherResourceCleanerFactory} to
+ * inject the main thread as part of the cleanup procedure.
+ *
+ * <p>See {@code LocallyCleanableResource} for further context on the proper 
contract of the two
+ * interfaces.
+ *
+ * @see org.apache.flink.runtime.dispatcher.cleanup.LocallyCleanableResource

Review Comment:
   ```suggestion
   ```
   nit: I guess the link can be removed because we're already linking to this
   interface in the text.



##########
flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/cleanup/TestingResourceCleanerFactory.java:
##########
@@ -26,27 +26,41 @@
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.concurrent.Executor;
+import java.util.stream.Collectors;
 
 /** {@code TestingResourceCleanerFactory} for adding custom {@link 
ResourceCleaner} creation. */
 public class TestingResourceCleanerFactory implements ResourceCleanerFactory {
 
     private final Collection<LocallyCleanableResource> 
locallyCleanableResources;
+    private Collection<LocallyCleanableResourceWithMainThread>

Review Comment:
   ```suggestion
       private final Collection<LocallyCleanableResourceWithMainThread>
   ```
   The fields of the actual test instance are immutable.



##########
flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DefaultJobManagerRunnerRegistryTest.java:
##########
@@ -157,25 +144,33 @@ void testFailingLocalCleanup() {
 
         assertThatFuture(
                         testInstance.localCleanupAsync(
-                                jobManagerRunner.getJobID(), 
Executors.directExecutor()))
+                                jobManagerRunner.getJobID(),
+                                Executors.directExecutor(),
+                                new 
TestComponentMainThreadExecutor(Thread.currentThread())))
                 .isCompletedExceptionally()
                 .eventuallyFailsWith(ExecutionException.class)
                 .extracting(FlinkAssertions::chainOfCauses, 
FlinkAssertions.STREAM_THROWABLE)
                 .hasExactlyElementsOfTypes(ExecutionException.class, 
expectedException.getClass())
                 .last()
                 .isEqualTo(expectedException);
-        
assertThat(testInstance.isRegistered(jobManagerRunner.getJobID())).isFalse();
+        assertThat(testInstance.isRegistered(jobManagerRunner.getJobID()))
+                .as(
+                        "Since the cleanup failed, the JobManagerRunner is 
expected to not have been unregistered.")
+                .isTrue();
     }
 
     @Test
-    void testSuccessfulLocalCleanupAsync() {

Review Comment:
   If we overload the method `registerTestingJobManagerRunner`:
   ```java
       private TestingJobManagerRunner registerTestingJobManagerRunner() {
           final TestingJobManagerRunner jobManagerRunner =
                   TestingJobManagerRunner.newBuilder().build();
           registerTestingJobManagerRunner(jobManagerRunner);
   
           return jobManagerRunner;
       }
   
       private void registerTestingJobManagerRunner(
               TestingJobManagerRunner jobManagerRunner) {
           testInstance.register(jobManagerRunner);
   
           assertThat(testInstance.isRegistered(jobManagerRunner.getJobID())).isTrue();
           assertThat(jobManagerRunner.getTerminationFuture()).isNotDone();
       }
   ```
   we can make this test even more reactive:
   
   ```java
       @RegisterExtension
       static final TestExecutorExtension<ScheduledExecutorService> EXECUTOR_EXTENSION =
               TestingUtils.defaultExecutorExtension();
       // [...]
       @Test
       void testSuccessfulLocalCleanupWaitsForDeregistration() {
           final ManuallyTriggeredComponentMainThreadExecutor mainThreadExecutor =
                   new ManuallyTriggeredComponentMainThreadExecutor(Thread.currentThread());
           final TestingJobManagerRunner jobManagerRunner = registerTestingJobManagerRunner();
           final CompletableFuture<Void> cleanupResult =
                   testInstance.localCleanupAsync(
                           jobManagerRunner.getJobID(),
                           Executors.directExecutor(),
                           mainThreadExecutor);
   
           jobManagerRunner.getTerminationFuture().complete(null);
   
           FlinkAssertions.assertThatFuture(cleanupResult)
                   .as("Wait for the de-registration to complete")
                   .isNotDone();
   
           mainThreadExecutor.triggerAll();
   
           FlinkAssertions.assertThatFuture(cleanupResult).eventuallySucceeds();
       }
   
       @Test
       void testSuccessfulLocalCleanupReliesOnJobManagerRunnerClose() {
           final TestingJobManagerRunner jobManagerRunner =
                   TestingJobManagerRunner.newBuilder().setBlockingTermination(true).build();
           registerTestingJobManagerRunner(jobManagerRunner);
           final CompletableFuture<Void> cleanupResult =
                   CompletableFuture.runAsync(
                           () ->
                                   testInstance.localCleanupAsync(
                                           jobManagerRunner.getJobID(),
                                           Executors.directExecutor(),
                                           // execute de-registration right-away w/o any additional
                                           // blocking
                                           ComponentMainThreadExecutorServiceAdapter.forMainThread()),
                           // TestingJobManagerRunner doesn't close asynchronously, so we have to run
                           // the close call in a separate thread to unblock the test main thread
                           EXECUTOR_EXTENSION.getExecutor());
   
           FlinkAssertions.assertThatFuture(cleanupResult)
                   .as("Wait for close call to complete")
                   .isNotDone();
   
           jobManagerRunner.getTerminationFuture().complete(null);
   
           FlinkAssertions.assertThatFuture(cleanupResult).eventuallySucceeds();
       }
   ```
   
   I didn't run the code. So it might need some adjustment.
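
The idea behind the first test can be illustrated without any Flink classes. The following is only a sketch: `ManualExecutor` is a hypothetical, heavily simplified stand-in for `ManuallyTriggeredComponentMainThreadExecutor`, and the no-op `Runnable` stands in for the de-registration step.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;

public class ManualTriggerSketch {

    /** Hypothetical stand-in: queues submitted tasks until triggerAll() is called. */
    static final class ManualExecutor implements Executor {
        private final Queue<Runnable> queued = new ArrayDeque<>();

        @Override
        public void execute(Runnable command) {
            queued.add(command);
        }

        void triggerAll() {
            Runnable next;
            while ((next = queued.poll()) != null) {
                next.run();
            }
        }
    }

    public static void main(String[] args) {
        final ManualExecutor mainThread = new ManualExecutor();
        final CompletableFuture<Void> terminationFuture = new CompletableFuture<>();

        // The dependent stage (de-registration in the real code) is bound to the manual executor.
        final CompletableFuture<Void> cleanupResult =
                terminationFuture.thenRunAsync(() -> {}, mainThread);

        terminationFuture.complete(null);
        // The stage is only queued at this point, so the result is still pending.
        System.out.println("done before trigger: " + cleanupResult.isDone()); // false

        mainThread.triggerAll();
        System.out.println("done after trigger: " + cleanupResult.isDone()); // true
    }
}
```

Because the executor never runs anything on its own, the test decides exactly when the "main thread" step happens and can assert on the state in between.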



##########
flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/TestComponentMainThreadExecutor.java:
##########
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.dispatcher;
+
+import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
+import org.apache.flink.runtime.operators.coordination.EventReceivingTasks;
+
+public class TestComponentMainThreadExecutor

Review Comment:
   This is already available with
   [ComponentMainThreadExecutorServiceAdapter#forMainThread()](https://github.com/apache/flink/blob/b15a704b4f70590a6cefcf3bc6fcd412941717f9/flink-runtime/src/test/java/org/apache/flink/runtime/concurrent/ComponentMainThreadExecutorServiceAdapter.java#L60)



##########
flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/cleanup/TestingResourceCleanerFactory.java:
##########
@@ -89,21 +103,30 @@ public static Builder builder() {
     /** {@code Builder} for creating {@code TestingResourceCleanerFactory} 
instances. */
     public static class Builder {
 
-        private Collection<LocallyCleanableResource> locallyCleanableResources 
= new ArrayList<>();
+        private Collection<LocallyCleanableResource> locallyCleanableResource 
= new ArrayList<>();
+        private Collection<LocallyCleanableResourceWithMainThread>
+                locallyCleanableResourceWithMainThreads = new ArrayList<>();

Review Comment:
   ```suggestion
                   locallyCleanableResourcesWithMainThread = new ArrayList<>();
   ```
   nit: moving the `s` to the right place - there is only one main thread but
   possibly multiple resources.



##########
flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/cleanup/DispatcherResourceCleanerFactoryTest.java:
##########
@@ -96,7 +96,7 @@ private JobManagerRunnerRegistry 
createJobManagerRunnerRegistry() {
 
         return TestingJobManagerRunnerRegistry.builder()
                 .withLocalCleanupAsyncFunction(
-                        (jobId, executor) -> {
+                        (jobId, executor, mainThreadExecutor) -> {

Review Comment:
   ```suggestion
                           (jobId, cleanupExecutor, mainThreadExecutor) -> {
   ```
   nit



##########
flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherCleanupITCase.java:
##########
@@ -220,7 +220,8 @@ public void testCleanupNotCancellable() throws Exception {
         final JobManagerRunnerRegistry jobManagerRunnerRegistry =
                 
TestingJobManagerRunnerRegistry.newSingleJobBuilder(jobManagerRunnerEntry)
                         .withLocalCleanupAsyncFunction(
-                                (actualJobId, executor) -> 
jobManagerRunnerCleanupFuture)
+                                (actualJobId, executor, mainThreadExector) ->

Review Comment:
   ```suggestion
                                   (actualJobId, cleanupExecutor, mainThreadExecutor) ->
   ```
   nit: just to differentiate the two more explicitly



##########
flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/cleanup/TestingResourceCleanerFactory.java:
##########
@@ -89,21 +103,30 @@ public static Builder builder() {
     /** {@code Builder} for creating {@code TestingResourceCleanerFactory} 
instances. */
     public static class Builder {
 
-        private Collection<LocallyCleanableResource> locallyCleanableResources 
= new ArrayList<>();
+        private Collection<LocallyCleanableResource> locallyCleanableResource 
= new ArrayList<>();
+        private Collection<LocallyCleanableResourceWithMainThread>
+                locallyCleanableResourceWithMainThreads = new ArrayList<>();
         private Collection<GloballyCleanableResource> 
globallyCleanableResources =
                 new ArrayList<>();
 
         private Executor cleanupExecutor = Executors.directExecutor();
 
-        public Builder setLocallyCleanableResources(
-                Collection<LocallyCleanableResource> 
locallyCleanableResources) {
-            this.locallyCleanableResources = locallyCleanableResources;
+        public Builder setLocallyCleanableInMainThreadResource(
+                Collection<LocallyCleanableResourceWithMainThread>
+                        locallyCleanableResourceWithMainThread) {
+            this.locallyCleanableResource = locallyCleanableResource;
             return this;
         }
 
         public Builder withLocallyCleanableResource(
                 LocallyCleanableResource locallyCleanableResource) {
-            this.locallyCleanableResources.add(locallyCleanableResource);
+            this.locallyCleanableResource.add(locallyCleanableResource);
+            return this;
+        }
+
+        public Builder withLocallyCleanableInMainThreadResource(

Review Comment:
   ```suggestion
           public Builder withLocallyCleanableResourceInMainThreadResource(
   ```



##########
flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/cleanup/TestingResourceCleanerFactory.java:
##########
@@ -89,21 +103,30 @@ public static Builder builder() {
     /** {@code Builder} for creating {@code TestingResourceCleanerFactory} 
instances. */
     public static class Builder {
 
-        private Collection<LocallyCleanableResource> locallyCleanableResources 
= new ArrayList<>();
+        private Collection<LocallyCleanableResource> locallyCleanableResource 
= new ArrayList<>();

Review Comment:
   ```suggestion
           private Collection<LocallyCleanableResource> locallyCleanableResources = new ArrayList<>();
   ```
   What's the purpose of renaming this field? Keeping the `s` also makes the
   diff of this PR slightly smaller.



##########
flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/cleanup/TestingResourceCleanerFactory.java:
##########
@@ -89,21 +103,30 @@ public static Builder builder() {
     /** {@code Builder} for creating {@code TestingResourceCleanerFactory} 
instances. */
     public static class Builder {
 
-        private Collection<LocallyCleanableResource> locallyCleanableResources 
= new ArrayList<>();
+        private Collection<LocallyCleanableResource> locallyCleanableResource 
= new ArrayList<>();
+        private Collection<LocallyCleanableResourceWithMainThread>
+                locallyCleanableResourceWithMainThreads = new ArrayList<>();
         private Collection<GloballyCleanableResource> 
globallyCleanableResources =
                 new ArrayList<>();
 
         private Executor cleanupExecutor = Executors.directExecutor();
 
-        public Builder setLocallyCleanableResources(
-                Collection<LocallyCleanableResource> 
locallyCleanableResources) {
-            this.locallyCleanableResources = locallyCleanableResources;
+        public Builder setLocallyCleanableInMainThreadResource(

Review Comment:
   ```suggestion
           public Builder setLocallyCleanableResourcesInMainThreadResource(
   ```
   nit: to keep the naming consistent



##########
flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/EventReceivingTasks.java:
##########


Review Comment:
   That change is not necessary if we use
   `ComponentMainThreadExecutorServiceAdapter#forMainThread`.



##########
flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/cleanup/TestingResourceCleanerFactory.java:
##########
@@ -26,27 +26,41 @@
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.concurrent.Executor;
+import java.util.stream.Collectors;
 
 /** {@code TestingResourceCleanerFactory} for adding custom {@link 
ResourceCleaner} creation. */
 public class TestingResourceCleanerFactory implements ResourceCleanerFactory {
 
     private final Collection<LocallyCleanableResource> 
locallyCleanableResources;
+    private Collection<LocallyCleanableResourceWithMainThread>
+            locallyCleanableResourceWithMainThreads;

Review Comment:
   ```suggestion
               locallyCleanableResourcesWithMainThreads;
   ```
   nit



##########
flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/TestingJobManagerRunnerRegistry.java:
##########
@@ -50,7 +51,8 @@ public class TestingJobManagerRunnerRegistry implements 
JobManagerRunnerRegistry
     private final Supplier<Set<JobID>> getRunningJobIdsSupplier;
     private final Supplier<Collection<JobManagerRunner>> 
getJobManagerRunnersSupplier;
     private final Function<JobID, JobManagerRunner> unregisterFunction;
-    private final BiFunction<JobID, Executor, CompletableFuture<Void>> 
localCleanupAsyncFunction;
+    private final TriFunction<JobID, Executor, Executor, 
CompletableFuture<Void>>

Review Comment:
   ```suggestion
       private final TriFunction<JobID, Executor, ComponentMainThreadExecutor, CompletableFuture<Void>>
   ```
   Let's make it match the actual signature to avoid running into problems down
   the road.
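
A rough sketch of why the narrower type matters, using hypothetical local stand-ins (`TriFunction`, `MainThreadExecutor`, `DirectMainThreadExecutor`) rather than the Flink types: once the third type argument matches the real parameter type, a test lambda can call the main-thread-specific methods without a cast.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;

public class TriFunctionSignatureSketch {

    /** Hypothetical three-argument function type, declared locally for this sketch. */
    @FunctionalInterface
    interface TriFunction<A, B, C, R> {
        R apply(A a, B b, C c);
    }

    /** Hypothetical narrower executor type with one extra method. */
    interface MainThreadExecutor extends Executor {
        void assertRunningInMainThread();
    }

    /** Hypothetical implementation that runs tasks directly and counts assertion calls. */
    static final class DirectMainThreadExecutor implements MainThreadExecutor {
        int assertions = 0;

        @Override
        public void execute(Runnable command) {
            command.run();
        }

        @Override
        public void assertRunningInMainThread() {
            assertions++;
        }
    }

    /** Mirrors a three-parameter cleanup call; the third type is the narrow one. */
    static CompletableFuture<Void> runCleanup(
            TriFunction<String, Executor, MainThreadExecutor, CompletableFuture<Void>> cleanup,
            MainThreadExecutor mainThreadExecutor) {
        return cleanup.apply("job-1", Runnable::run, mainThreadExecutor);
    }

    public static void main(String[] args) {
        final DirectMainThreadExecutor mainThread = new DirectMainThreadExecutor();

        runCleanup(
                        (jobId, cleanupExecutor, mainThreadExecutor) -> {
                            // Compiles only because the third parameter has the narrow type.
                            mainThreadExecutor.assertRunningInMainThread();
                            return CompletableFuture.runAsync(() -> {}, mainThreadExecutor);
                        },
                        mainThread)
                .join();

        System.out.println("assertion calls: " + mainThread.assertions); // 1
    }
}
```

With `Executor` as the third type argument, the call to `assertRunningInMainThread()` would not compile, so the mismatch would only surface later as a cast or a refactoring.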



##########
flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherResourceCleanupTest.java:
##########
@@ -191,7 +206,7 @@ private TestingDispatcher.Builder 
createTestingDispatcherBuilder() {
                                 // JobManagerRunnerRegistry needs to be added 
explicitly
                                 // because cleaning it will trigger the 
closeAsync latch
                                 // provided by TestingJobManagerRunner
-                                
.withLocallyCleanableResource(jobManagerRunnerRegistry)
+                                
.withLocallyCleanableInMainThreadResource(jobManagerRunnerRegistry)

Review Comment:
   Why are all the other changes in this class necessary? Isn't this change
   good enough? 🤔



##########
flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/TestingJobManagerRunnerRegistry.java:
##########
@@ -188,10 +192,10 @@ public static class Builder {
         private Supplier<Collection<JobManagerRunner>> 
getJobManagerRunnersSupplier =
                 Collections::emptyList;
         private Function<JobID, JobManagerRunner> unregisterFunction = 
ignoredJobId -> null;
-        private BiFunction<JobID, Executor, CompletableFuture<Void>> 
localCleanupAsyncFunction =
-                (ignoredJobId, ignoredExecutor) -> 
FutureUtils.completedVoidFuture();
-        private BiFunction<JobID, Executor, CompletableFuture<Void>> 
globalCleanupAsyncFunction =
-                (ignoredJobId, ignoredExecutor) -> 
FutureUtils.completedVoidFuture();
+        private TriFunction<JobID, Executor, Executor, CompletableFuture<Void>>
+                localCleanupAsyncFunction =
+                        (ignoredJobId, ignoredExecutor, mainThreadExecutor) ->

Review Comment:
   ```suggestion
                           (ignoredJobId, ignoredCleanupExecutor, mainThreadExecutor) ->
   ```
   nit



##########
flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/cleanup/TestingResourceCleanerFactory.java:
##########
@@ -26,27 +26,41 @@
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.concurrent.Executor;
+import java.util.stream.Collectors;
 
 /** {@code TestingResourceCleanerFactory} for adding custom {@link 
ResourceCleaner} creation. */
 public class TestingResourceCleanerFactory implements ResourceCleanerFactory {
 
     private final Collection<LocallyCleanableResource> 
locallyCleanableResources;
+    private Collection<LocallyCleanableResourceWithMainThread>
+            locallyCleanableResourceWithMainThreads;
     private final Collection<GloballyCleanableResource> 
globallyCleanableResources;
 
     private final Executor cleanupExecutor;
 
     private TestingResourceCleanerFactory(
             Collection<LocallyCleanableResource> locallyCleanableResources,
+            Collection<LocallyCleanableResourceWithMainThread>
+                    locallyCleanableResourceWithMainThreads,

Review Comment:
   ```suggestion
                       locallyCleanableResourcesWithMainThreads,
   ```



##########
flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DefaultJobManagerRunnerRegistryTest.java:
##########
@@ -157,25 +144,33 @@ void testFailingLocalCleanup() {
 
         assertThatFuture(
                         testInstance.localCleanupAsync(
-                                jobManagerRunner.getJobID(), 
Executors.directExecutor()))
+                                jobManagerRunner.getJobID(),
+                                Executors.directExecutor(),
+                                new 
TestComponentMainThreadExecutor(Thread.currentThread())))
                 .isCompletedExceptionally()
                 .eventuallyFailsWith(ExecutionException.class)
                 .extracting(FlinkAssertions::chainOfCauses, 
FlinkAssertions.STREAM_THROWABLE)
                 .hasExactlyElementsOfTypes(ExecutionException.class, 
expectedException.getClass())
                 .last()
                 .isEqualTo(expectedException);
-        
assertThat(testInstance.isRegistered(jobManagerRunner.getJobID())).isFalse();
+        assertThat(testInstance.isRegistered(jobManagerRunner.getJobID()))
+                .as(
+                        "Since the cleanup failed, the JobManagerRunner is 
expected to not have been unregistered.")
+                .isTrue();
     }
 
     @Test
-    void testSuccessfulLocalCleanupAsync() {
+    void testSuccessfulLocalCleanupAsync() throws InterruptedException, 
ExecutionException {

Review Comment:
   ```suggestion
       void testSuccessfulLocalCleanupAsync() {
   ```



##########
flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DefaultJobManagerRunnerRegistry.java:
##########
@@ -83,9 +83,16 @@ public Collection<JobManagerRunner> getJobManagerRunners() {
     }
 
     @Override
-    public CompletableFuture<Void> localCleanupAsync(JobID jobId, Executor 
unusedExecutor) {
+    public CompletableFuture<Void> localCleanupAsync(
+            JobID jobId, Executor ignoredExecutor, Executor 
mainThreadExecutor) {
         if (isRegistered(jobId)) {
-            return unregister(jobId).closeAsync();
+            CompletableFuture<Void> resultFuture = 
this.jobManagerRunners.get(jobId).closeAsync();
+
+            return resultFuture.thenApplyAsync(
+                    result -> {
+                        mainThreadExecutor.execute(() -> unregister(jobId));
+                        return result;
+                    });

Review Comment:
   The jobId is coming from the calling context (i.e. `localCleanupAsync`), not
   from the future. The future returns `Void`, which is actually just `null` if
   I'm not mistaken. So, it doesn't add any value. You can go ahead and use
   `.thenRunAsync(() -> unregister(jobId), mainThreadExecutor);` here.
   
   You might want to play around a bit with the `CompletableFuture` utility
   methods to get a better understanding of how these future chains behave.
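
A small self-contained sketch of the suggested chain, under stated assumptions: the `RUNNERS` map of close futures and the `String` job id are hypothetical stand-ins for the registry's internals, and `Runnable::run` plays the role of a main thread executor that runs tasks immediately.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;

public class ThenRunAsyncSketch {

    /** Hypothetical stand-in for the registry: job id -> future of the runner's close call. */
    static final Map<String, CompletableFuture<Void>> RUNNERS = new ConcurrentHashMap<>();

    /** Waits for the close future, then removes the entry on the given executor. */
    static CompletableFuture<Void> localCleanupAsync(String jobId, Executor mainThreadExecutor) {
        final CompletableFuture<Void> closeFuture = RUNNERS.get(jobId);
        if (closeFuture == null) {
            return CompletableFuture.completedFuture(null);
        }
        // The Void result carries no information, so a Runnable stage is sufficient.
        // Passing the executor as the second argument means the returned future
        // completes only after the removal has run on that executor.
        return closeFuture.thenRunAsync(() -> RUNNERS.remove(jobId), mainThreadExecutor);
    }

    public static void main(String[] args) {
        final CompletableFuture<Void> closeFuture = new CompletableFuture<>();
        RUNNERS.put("job-1", closeFuture);

        final CompletableFuture<Void> result = localCleanupAsync("job-1", Runnable::run);
        System.out.println("registered before close: " + RUNNERS.containsKey("job-1")); // true

        closeFuture.complete(null);
        result.join();
        System.out.println("registered after close: " + RUNNERS.containsKey("job-1")); // false
    }
}
```

In contrast, calling `mainThreadExecutor.execute(...)` inside a `thenApplyAsync` callback only schedules the removal; the returned future can complete before the removal has actually happened.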



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
