XComp commented on code in PR #25027:
URL: https://github.com/apache/flink/pull/25027#discussion_r1828597077
##########
flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/OnMainThreadJobManagerRunnerRegistry.java:
##########
@@ -84,9 +84,10 @@ public Collection<JobManagerRunner> getJobManagerRunners() {
}
@Override
- public CompletableFuture<Void> localCleanupAsync(JobID jobId, Executor executor) {
- mainThreadExecutor.assertRunningInMainThread();
- return delegate.localCleanupAsync(jobId, executor);
+ public CompletableFuture<Void> localCleanupAsync(
+ JobID jobId, Executor cleanupExecutor, ComponentMainThreadExecutor mainThreadExecutor) {
+ this.mainThreadExecutor.assertRunningInMainThread();
Review Comment:
```suggestion
Preconditions.checkState(
mainThreadExecutor == this.mainThreadExecutor,
"A single main thread executor should be used.");
this.mainThreadExecutor.assertRunningInMainThread();
```
Let's make sure that we use the same main thread everywhere.
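The intent of that guard can be sketched with JDK types only. This is an illustration under assumptions, not the PR's code: `MainThreadGuard`, `DirectExecutor`, and `cleanup` are hypothetical names, a plain `Executor` stands in for `ComponentMainThreadExecutor`, and an explicit `IllegalStateException` stands in for `Preconditions.checkState`.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;

final class MainThreadGuard {

    /** Hypothetical executor that runs every task on the calling thread. */
    static final class DirectExecutor implements Executor {
        @Override
        public void execute(Runnable command) {
            command.run();
        }
    }

    private final Executor mainThreadExecutor;

    MainThreadGuard(Executor mainThreadExecutor) {
        this.mainThreadExecutor = mainThreadExecutor;
    }

    /** Hypothetical stand-in for localCleanupAsync: fails fast on a foreign executor. */
    CompletableFuture<Void> cleanup(Executor passedMainThreadExecutor) {
        // Reference equality on purpose: the caller must pass the very same instance.
        if (passedMainThreadExecutor != this.mainThreadExecutor) {
            throw new IllegalStateException("A single main thread executor should be used.");
        }
        return CompletableFuture.completedFuture(null);
    }

    static boolean acceptsSameInstance() {
        Executor main = new DirectExecutor();
        return new MainThreadGuard(main).cleanup(main).isDone();
    }

    static boolean rejectsDifferentInstance() {
        MainThreadGuard guard = new MainThreadGuard(new DirectExecutor());
        try {
            guard.cleanup(new DirectExecutor());
            return false;
        } catch (IllegalStateException expected) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println(acceptsSameInstance());
        System.out.println(rejectsDifferentInstance());
    }
}
```

Failing fast here turns a silent threading mix-up into an immediate error at the call site.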
##########
flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DefaultJobManagerRunnerRegistry.java:
##########
@@ -83,9 +84,16 @@ public Collection<JobManagerRunner> getJobManagerRunners() {
}
@Override
- public CompletableFuture<Void> localCleanupAsync(JobID jobId, Executor unusedExecutor) {
+ public CompletableFuture<Void> localCleanupAsync(
+ JobID jobId, Executor ignoredExecutor, ComponentMainThreadExecutor mainThreadExecutor) {
Review Comment:
```suggestion
JobID jobId, Executor ignoredCleanupExecutor,
ComponentMainThreadExecutor mainThreadExecutor) {
```
nit: let's add the purpose to the parameter name here as well for better
code readability.
##########
flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/cleanup/LocallyCleanableResourceWithMainThread.java:
##########
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.dispatcher.cleanup;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+
+/**
+ * {@code LocallyCleanableResourceWithMainThread} is an extension of the {@link
+ * LocallyCleanableResource} interface. It allows the {@link DispatcherResourceCleanerFactory} to
+ * inject the main thread as part of the cleanup procedure.
+ *
+ * <p>See {@code LocallyCleanableResource} for further context on the proper contract of the two
+ * interfaces.
+ *
+ * @see org.apache.flink.runtime.dispatcher.cleanup.LocallyCleanableResource
Review Comment:
```suggestion
```
nit: I guess the link can be removed because we're already linking to this
interface in the text.
##########
flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/cleanup/TestingResourceCleanerFactory.java:
##########
@@ -26,27 +26,41 @@
import java.util.ArrayList;
import java.util.Collection;
import java.util.concurrent.Executor;
+import java.util.stream.Collectors;
/** {@code TestingResourceCleanerFactory} for adding custom {@link
ResourceCleaner} creation. */
public class TestingResourceCleanerFactory implements ResourceCleanerFactory {
private final Collection<LocallyCleanableResource>
locallyCleanableResources;
+ private Collection<LocallyCleanableResourceWithMainThread>
Review Comment:
```suggestion
private final Collection<LocallyCleanableResourceWithMainThread>
```
The fields of the actual test instance are immutable.
##########
flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DefaultJobManagerRunnerRegistryTest.java:
##########
@@ -157,25 +144,33 @@ void testFailingLocalCleanup() {
assertThatFuture(
testInstance.localCleanupAsync(
- jobManagerRunner.getJobID(), Executors.directExecutor()))
+ jobManagerRunner.getJobID(),
+ Executors.directExecutor(),
+ new TestComponentMainThreadExecutor(Thread.currentThread())))
.isCompletedExceptionally()
.eventuallyFailsWith(ExecutionException.class)
.extracting(FlinkAssertions::chainOfCauses, FlinkAssertions.STREAM_THROWABLE)
.hasExactlyElementsOfTypes(ExecutionException.class, expectedException.getClass())
.last()
.isEqualTo(expectedException);
- assertThat(testInstance.isRegistered(jobManagerRunner.getJobID())).isFalse();
+ assertThat(testInstance.isRegistered(jobManagerRunner.getJobID()))
+ .as(
+ "Since the cleanup failed, the JobManagerRunner is expected to not have been unregistered.")
+ .isTrue();
}
@Test
- void testSuccessfulLocalCleanupAsync() {
Review Comment:
If we overload the method `registerTestingJobManagerRunner`:
```java
private TestingJobManagerRunner registerTestingJobManagerRunner() {
    final TestingJobManagerRunner jobManagerRunner =
            TestingJobManagerRunner.newBuilder().build();
    registerTestingJobManagerRunner(jobManagerRunner);

    return jobManagerRunner;
}

private void registerTestingJobManagerRunner(TestingJobManagerRunner jobManagerRunner) {
    testInstance.register(jobManagerRunner);

    assertThat(testInstance.isRegistered(jobManagerRunner.getJobID())).isTrue();
    assertThat(jobManagerRunner.getTerminationFuture()).isNotDone();
}
```
we can make this test even more reactive:
```java
@RegisterExtension
static final TestExecutorExtension<ScheduledExecutorService> EXECUTOR_EXTENSION =
        TestingUtils.defaultExecutorExtension();

// [...]

@Test
void testSuccessfulLocalCleanupWaitsForDeregistration() {
    final ManuallyTriggeredComponentMainThreadExecutor mainThreadExecutor =
            new ManuallyTriggeredComponentMainThreadExecutor(Thread.currentThread());
    final TestingJobManagerRunner jobManagerRunner = registerTestingJobManagerRunner();

    final CompletableFuture<Void> cleanupResult =
            testInstance.localCleanupAsync(
                    jobManagerRunner.getJobID(),
                    Executors.directExecutor(),
                    mainThreadExecutor);

    jobManagerRunner.getTerminationFuture().complete(null);

    FlinkAssertions.assertThatFuture(cleanupResult)
            .as("Wait for the de-registration to complete")
            .isNotDone();

    mainThreadExecutor.triggerAll();

    FlinkAssertions.assertThatFuture(cleanupResult).eventuallySucceeds();
}

@Test
void testSuccessfulLocalCleanupReliesOnJobManagerRunnerClose() {
    final TestingJobManagerRunner jobManagerRunner =
            TestingJobManagerRunner.newBuilder().setBlockingTermination(true).build();
    registerTestingJobManagerRunner(jobManagerRunner);

    final CompletableFuture<Void> cleanupResult =
            CompletableFuture.runAsync(
                    () ->
                            testInstance.localCleanupAsync(
                                    jobManagerRunner.getJobID(),
                                    Executors.directExecutor(),
                                    // execute de-registration right-away w/o any additional
                                    // blocking
                                    ComponentMainThreadExecutorServiceAdapter.forMainThread()),
                    // TestingJobManagerRunner doesn't close asynchronously, so we have to run
                    // the close call in a separate thread to unblock the test main thread
                    EXECUTOR_EXTENSION.getExecutor());

    FlinkAssertions.assertThatFuture(cleanupResult)
            .as("Wait for close call to complete")
            .isNotDone();

    jobManagerRunner.getTerminationFuture().complete(null);

    FlinkAssertions.assertThatFuture(cleanupResult).eventuallySucceeds();
}
```
I didn't run the code. So it might need some adjustment.
##########
flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/TestComponentMainThreadExecutor.java:
##########
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.dispatcher;
+
+import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
+import org.apache.flink.runtime.operators.coordination.EventReceivingTasks;
+
+public class TestComponentMainThreadExecutor
Review Comment:
This is already available with
[ComponentMainThreadExecutorServiceAdapter#forMainThread()](https://github.com/apache/flink/blob/b15a704b4f70590a6cefcf3bc6fcd412941717f9/flink-runtime/src/test/java/org/apache/flink/runtime/concurrent/ComponentMainThreadExecutorServiceAdapter.java#L60)
##########
flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/cleanup/TestingResourceCleanerFactory.java:
##########
@@ -89,21 +103,30 @@ public static Builder builder() {
/** {@code Builder} for creating {@code TestingResourceCleanerFactory}
instances. */
public static class Builder {
- private Collection<LocallyCleanableResource> locallyCleanableResources
= new ArrayList<>();
+ private Collection<LocallyCleanableResource> locallyCleanableResource
= new ArrayList<>();
+ private Collection<LocallyCleanableResourceWithMainThread>
+ locallyCleanableResourceWithMainThreads = new ArrayList<>();
Review Comment:
```suggestion
locallyCleanableResourcesWithMainThread = new ArrayList<>();
```
nit: moving the `s` to the right place - there is only one main thread but
possibly multiple resources.
##########
flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/cleanup/DispatcherResourceCleanerFactoryTest.java:
##########
@@ -96,7 +96,7 @@ private JobManagerRunnerRegistry
createJobManagerRunnerRegistry() {
return TestingJobManagerRunnerRegistry.builder()
.withLocalCleanupAsyncFunction(
- (jobId, executor) -> {
+ (jobId, executor, mainThreadExecutor) -> {
Review Comment:
```suggestion
(jobId, cleanupExecutor, mainThreadExecutor) -> {
```
nit
##########
flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherCleanupITCase.java:
##########
@@ -220,7 +220,8 @@ public void testCleanupNotCancellable() throws Exception {
final JobManagerRunnerRegistry jobManagerRunnerRegistry =
TestingJobManagerRunnerRegistry.newSingleJobBuilder(jobManagerRunnerEntry)
.withLocalCleanupAsyncFunction(
- (actualJobId, executor) ->
jobManagerRunnerCleanupFuture)
+ (actualJobId, executor, mainThreadExector) ->
Review Comment:
```suggestion
(actualJobId, cleanupExecutor,
mainThreadExector) ->
```
nit: just to differentiate the two more explicitly
##########
flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/cleanup/TestingResourceCleanerFactory.java:
##########
@@ -89,21 +103,30 @@ public static Builder builder() {
/** {@code Builder} for creating {@code TestingResourceCleanerFactory}
instances. */
public static class Builder {
- private Collection<LocallyCleanableResource> locallyCleanableResources
= new ArrayList<>();
+ private Collection<LocallyCleanableResource> locallyCleanableResource
= new ArrayList<>();
+ private Collection<LocallyCleanableResourceWithMainThread>
+ locallyCleanableResourceWithMainThreads = new ArrayList<>();
private Collection<GloballyCleanableResource>
globallyCleanableResources =
new ArrayList<>();
private Executor cleanupExecutor = Executors.directExecutor();
- public Builder setLocallyCleanableResources(
- Collection<LocallyCleanableResource>
locallyCleanableResources) {
- this.locallyCleanableResources = locallyCleanableResources;
+ public Builder setLocallyCleanableInMainThreadResource(
+ Collection<LocallyCleanableResourceWithMainThread>
+ locallyCleanableResourceWithMainThread) {
+ this.locallyCleanableResource = locallyCleanableResource;
return this;
}
public Builder withLocallyCleanableResource(
LocallyCleanableResource locallyCleanableResource) {
- this.locallyCleanableResources.add(locallyCleanableResource);
+ this.locallyCleanableResource.add(locallyCleanableResource);
+ return this;
+ }
+
+ public Builder withLocallyCleanableInMainThreadResource(
Review Comment:
```suggestion
public Builder withLocallyCleanableResourceInMainThreadResource(
```
##########
flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/cleanup/TestingResourceCleanerFactory.java:
##########
@@ -89,21 +103,30 @@ public static Builder builder() {
/** {@code Builder} for creating {@code TestingResourceCleanerFactory}
instances. */
public static class Builder {
- private Collection<LocallyCleanableResource> locallyCleanableResources
= new ArrayList<>();
+ private Collection<LocallyCleanableResource> locallyCleanableResource
= new ArrayList<>();
Review Comment:
```suggestion
private Collection<LocallyCleanableResource>
locallyCleanableResources = new ArrayList<>();
```
What's the purpose of renaming this field? Keeping the `s` also makes the
diff of this PR slightly smaller.
##########
flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/cleanup/TestingResourceCleanerFactory.java:
##########
@@ -89,21 +103,30 @@ public static Builder builder() {
/** {@code Builder} for creating {@code TestingResourceCleanerFactory}
instances. */
public static class Builder {
- private Collection<LocallyCleanableResource> locallyCleanableResources
= new ArrayList<>();
+ private Collection<LocallyCleanableResource> locallyCleanableResource
= new ArrayList<>();
+ private Collection<LocallyCleanableResourceWithMainThread>
+ locallyCleanableResourceWithMainThreads = new ArrayList<>();
private Collection<GloballyCleanableResource>
globallyCleanableResources =
new ArrayList<>();
private Executor cleanupExecutor = Executors.directExecutor();
- public Builder setLocallyCleanableResources(
- Collection<LocallyCleanableResource>
locallyCleanableResources) {
- this.locallyCleanableResources = locallyCleanableResources;
+ public Builder setLocallyCleanableInMainThreadResource(
Review Comment:
```suggestion
public Builder setLocallyCleanableResourcesInMainThreadResource(
```
nit: to keep the naming consistent
##########
flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/EventReceivingTasks.java:
##########
Review Comment:
That change is not necessary if we use
`ComponentMainThreadExecutorServiceAdapter#forMainThread`.
##########
flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/cleanup/TestingResourceCleanerFactory.java:
##########
@@ -26,27 +26,41 @@
import java.util.ArrayList;
import java.util.Collection;
import java.util.concurrent.Executor;
+import java.util.stream.Collectors;
/** {@code TestingResourceCleanerFactory} for adding custom {@link
ResourceCleaner} creation. */
public class TestingResourceCleanerFactory implements ResourceCleanerFactory {
private final Collection<LocallyCleanableResource>
locallyCleanableResources;
+ private Collection<LocallyCleanableResourceWithMainThread>
+ locallyCleanableResourceWithMainThreads;
Review Comment:
```suggestion
locallyCleanableResourcesWithMainThreads;
```
nit
##########
flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/TestingJobManagerRunnerRegistry.java:
##########
@@ -50,7 +51,8 @@ public class TestingJobManagerRunnerRegistry implements
JobManagerRunnerRegistry
private final Supplier<Set<JobID>> getRunningJobIdsSupplier;
private final Supplier<Collection<JobManagerRunner>>
getJobManagerRunnersSupplier;
private final Function<JobID, JobManagerRunner> unregisterFunction;
- private final BiFunction<JobID, Executor, CompletableFuture<Void>>
localCleanupAsyncFunction;
+ private final TriFunction<JobID, Executor, Executor,
CompletableFuture<Void>>
Review Comment:
```suggestion
private final TriFunction<JobID, Executor, ComponentMainThreadExecutor,
CompletableFuture<Void>>
```
Let's make it match the actual signature to avoid running into problems down
the road.
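A rough sketch of why the narrower type helps, using JDK types only. Everything here is an assumption made for illustration: the local `TriFunction` and `MainThreadExecutor` interfaces are hypothetical stand-ins for Flink's `TriFunction` and `ComponentMainThreadExecutor`, and a `String` stands in for `JobID`.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;

final class SignatureMatchSketch {

    /** Hypothetical stand-in for a three-argument function type. */
    @FunctionalInterface
    interface TriFunction<A, B, C, R> {
        R apply(A a, B b, C c);
    }

    /** Hypothetical stand-in for ComponentMainThreadExecutor: an Executor with a thread check. */
    interface MainThreadExecutor extends Executor {
        void assertRunningInMainThread();
    }

    /** Runs tasks directly and remembers the thread that created it. */
    static final class DirectMainThreadExecutor implements MainThreadExecutor {
        private final Thread owner = Thread.currentThread();

        @Override
        public void execute(Runnable command) {
            command.run();
        }

        @Override
        public void assertRunningInMainThread() {
            if (Thread.currentThread() != owner) {
                throw new IllegalStateException("Not running in the main thread.");
            }
        }
    }

    // With the third type parameter narrowed to MainThreadExecutor, the function can use
    // the main-thread API without a cast, and a plain Executor argument no longer compiles.
    static final TriFunction<String, Executor, MainThreadExecutor, CompletableFuture<Void>>
            LOCAL_CLEANUP =
                    (jobId, ignoredCleanupExecutor, mainThreadExecutor) -> {
                        mainThreadExecutor.assertRunningInMainThread();
                        return CompletableFuture.runAsync(() -> {}, mainThreadExecutor);
                    };

    static boolean cleanupCompletesOnMainThread() {
        return LOCAL_CLEANUP
                .apply("job-1", Runnable::run, new DirectMainThreadExecutor())
                .isDone();
    }

    public static void main(String[] args) {
        System.out.println(cleanupCompletesOnMainThread());
    }
}
```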
##########
flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherResourceCleanupTest.java:
##########
@@ -191,7 +206,7 @@ private TestingDispatcher.Builder
createTestingDispatcherBuilder() {
// JobManagerRunnerRegistry needs to be added
explicitly
// because cleaning it will trigger the
closeAsync latch
// provided by TestingJobManagerRunner
-
.withLocallyCleanableResource(jobManagerRunnerRegistry)
+
.withLocallyCleanableInMainThreadResource(jobManagerRunnerRegistry)
Review Comment:
Why are all the other changes in this class necessary? Isn't this change
good enough? 🤔
##########
flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/TestingJobManagerRunnerRegistry.java:
##########
@@ -188,10 +192,10 @@ public static class Builder {
private Supplier<Collection<JobManagerRunner>>
getJobManagerRunnersSupplier =
Collections::emptyList;
private Function<JobID, JobManagerRunner> unregisterFunction =
ignoredJobId -> null;
- private BiFunction<JobID, Executor, CompletableFuture<Void>>
localCleanupAsyncFunction =
- (ignoredJobId, ignoredExecutor) ->
FutureUtils.completedVoidFuture();
- private BiFunction<JobID, Executor, CompletableFuture<Void>>
globalCleanupAsyncFunction =
- (ignoredJobId, ignoredExecutor) ->
FutureUtils.completedVoidFuture();
+ private TriFunction<JobID, Executor, Executor, CompletableFuture<Void>>
+ localCleanupAsyncFunction =
+ (ignoredJobId, ignoredExecutor, mainThreadExecutor) ->
Review Comment:
```suggestion
(ignoredJobId, ignoredCleanupExecutor,
mainThreadExecutor) ->
```
nit
##########
flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/cleanup/TestingResourceCleanerFactory.java:
##########
@@ -26,27 +26,41 @@
import java.util.ArrayList;
import java.util.Collection;
import java.util.concurrent.Executor;
+import java.util.stream.Collectors;
/** {@code TestingResourceCleanerFactory} for adding custom {@link
ResourceCleaner} creation. */
public class TestingResourceCleanerFactory implements ResourceCleanerFactory {
private final Collection<LocallyCleanableResource>
locallyCleanableResources;
+ private Collection<LocallyCleanableResourceWithMainThread>
+ locallyCleanableResourceWithMainThreads;
private final Collection<GloballyCleanableResource>
globallyCleanableResources;
private final Executor cleanupExecutor;
private TestingResourceCleanerFactory(
Collection<LocallyCleanableResource> locallyCleanableResources,
+ Collection<LocallyCleanableResourceWithMainThread>
+ locallyCleanableResourceWithMainThreads,
Review Comment:
```suggestion
locallyCleanableResourcesWithMainThreads,
```
##########
flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DefaultJobManagerRunnerRegistryTest.java:
##########
@@ -157,25 +144,33 @@ void testFailingLocalCleanup() {
assertThatFuture(
testInstance.localCleanupAsync(
- jobManagerRunner.getJobID(), Executors.directExecutor()))
+ jobManagerRunner.getJobID(),
+ Executors.directExecutor(),
+ new TestComponentMainThreadExecutor(Thread.currentThread())))
.isCompletedExceptionally()
.eventuallyFailsWith(ExecutionException.class)
.extracting(FlinkAssertions::chainOfCauses, FlinkAssertions.STREAM_THROWABLE)
.hasExactlyElementsOfTypes(ExecutionException.class, expectedException.getClass())
.last()
.isEqualTo(expectedException);
- assertThat(testInstance.isRegistered(jobManagerRunner.getJobID())).isFalse();
+ assertThat(testInstance.isRegistered(jobManagerRunner.getJobID()))
+ .as(
+ "Since the cleanup failed, the JobManagerRunner is expected to not have been unregistered.")
+ .isTrue();
}
@Test
- void testSuccessfulLocalCleanupAsync() {
+ void testSuccessfulLocalCleanupAsync() throws InterruptedException, ExecutionException {
Review Comment:
```suggestion
void testSuccessfulLocalCleanupAsync() {
```
##########
flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DefaultJobManagerRunnerRegistry.java:
##########
@@ -83,9 +83,16 @@ public Collection<JobManagerRunner> getJobManagerRunners() {
}
@Override
- public CompletableFuture<Void> localCleanupAsync(JobID jobId, Executor unusedExecutor) {
+ public CompletableFuture<Void> localCleanupAsync(
+ JobID jobId, Executor ignoredExecutor, Executor mainThreadExecutor) {
if (isRegistered(jobId)) {
- return unregister(jobId).closeAsync();
+ CompletableFuture<Void> resultFuture = this.jobManagerRunners.get(jobId).closeAsync();
+
+ return resultFuture.thenApplyAsync(
+ result -> {
+ mainThreadExecutor.execute(() -> unregister(jobId));
+ return result;
+ });
Review Comment:
The jobId is coming from the calling context (i.e. `localCleanupAsync`), not
from the future. The future returns `Void` which is actually just `null` if I'm
not mistaken. So, it doesn't add any value. You can go ahead and use
`.thenRunAsync(() -> unregister(jobId), mainThreadExecutor);` here.
You might want to play around a bit with the `CompletableFuture` utility
methods to get a better understanding of how these future chains behave.
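To make the difference concrete, here is a small JDK-only sketch. It is an illustration under assumptions rather than the PR's code: `ManualExecutor` is a hypothetical stand-in for a manually triggered main thread executor, a `Set<String>` stands in for the registry, and the nested variant uses a direct executor (`Runnable::run`) instead of the common pool so that the outcome is deterministic.

```java
import java.util.ArrayDeque;
import java.util.HashSet;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;

final class CleanupChainSketch {

    /** Hypothetical main thread stand-in: queued tasks run only when triggered. */
    static final class ManualExecutor implements Executor {
        private final Queue<Runnable> tasks = new ArrayDeque<>();

        @Override
        public void execute(Runnable task) {
            tasks.add(task);
        }

        void triggerAll() {
            Runnable task;
            while ((task = tasks.poll()) != null) {
                task.run();
            }
        }
    }

    /** thenRunAsync: the returned future completes only after the unregister step ran. */
    static boolean thenRunAsyncWaitsForUnregister() {
        Set<String> registry = new HashSet<>();
        registry.add("job-1");
        ManualExecutor mainThread = new ManualExecutor();
        CompletableFuture<Void> closeFuture = new CompletableFuture<>();

        CompletableFuture<Void> cleanup =
                closeFuture.thenRunAsync(() -> registry.remove("job-1"), mainThread);

        closeFuture.complete(null);
        boolean pendingBeforeTrigger = !cleanup.isDone() && registry.contains("job-1");
        mainThread.triggerAll();
        boolean doneAfterTrigger = cleanup.isDone() && !registry.contains("job-1");
        return pendingBeforeTrigger && doneAfterTrigger;
    }

    /** Nested execute: the returned future completes while the job is still registered. */
    static boolean nestedExecuteCompletesTooEarly() {
        Set<String> registry = new HashSet<>();
        registry.add("job-1");
        ManualExecutor mainThread = new ManualExecutor();
        CompletableFuture<Void> closeFuture = new CompletableFuture<>();

        CompletableFuture<Void> cleanup =
                closeFuture.thenApplyAsync(
                        result -> {
                            // Only schedules the unregister step; nothing waits for it.
                            mainThread.execute(() -> registry.remove("job-1"));
                            return result;
                        },
                        Runnable::run);

        closeFuture.complete(null);
        return cleanup.isDone() && registry.contains("job-1");
    }

    public static void main(String[] args) {
        System.out.println(thenRunAsyncWaitsForUnregister());
        System.out.println(nestedExecuteCompletesTooEarly());
    }
}
```

In the nested form the caller sees a completed cleanup future before the de-registration has happened, which is the gap `thenRunAsync(..., mainThreadExecutor)` closes.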