[PR] [FLINK-34427][runtime] Adds state check to requirementsCheck logic [flink]

2024-02-13 Thread via GitHub


XComp opened a new pull request, #24309:
URL: https://github.com/apache/flink/pull/24309

   ## What is the purpose of the change
   
   Quoting @zentol from [FLINK-34427](https://issues.apache.org/jira/browse/FLINK-34427?focusedCommentId=17816969&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-17816969) here:
   > The problem is the use of scheduled executors in the 
FineGrainedSlotManager. It periodically tries to schedule actions 
unconditionally into the main thread, and this periodic action is also never 
cancelled.
   > If the rpc endpoint shuts down during the periodic delay the scheduled 
action can fire again before the rpc service (and thus scheduled executor) is 
shut down, running into this error.
   
   > This code is plain broken as it makes assumptions about the lifecycle of
the scheduled executor. The loop should be canceled when the FGSM is shut down, 
and as a safety rail any scheduled action should validate that the FGSM is not 
shut down yet before scheduling anything into the main thread.
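The two safety rails described in the quote above can be sketched in plain Java. This is a hypothetical, simplified stand-in: `PeriodicChecker` is not a Flink class, and a plain `Executor` plays the role of the RPC main thread. `close()` cancels the periodic loop, and each tick re-checks the component state before it hands work to the main thread.

```java
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

/**
 * Hypothetical sketch (not Flink code): a periodic check whose loop is cancelled on close()
 * and whose scheduled action re-checks the component state before touching the main thread.
 */
final class PeriodicChecker implements AutoCloseable {
    private final ScheduledExecutorService scheduler;
    private final Executor mainThreadExecutor;
    private final Runnable check;
    // Written by start()/close() and read by the scheduler thread, hence volatile.
    private volatile boolean running;
    private ScheduledFuture<?> loop; // only touched by the thread calling start()/close()

    PeriodicChecker(ScheduledExecutorService scheduler, Executor mainThreadExecutor, Runnable check) {
        this.scheduler = scheduler;
        this.mainThreadExecutor = mainThreadExecutor;
        this.check = check;
    }

    void start(long periodMillis) {
        running = true;
        loop = scheduler.scheduleWithFixedDelay(
                () -> {
                    // Safety rail: never hand work to the main thread of a closed component.
                    if (running) {
                        mainThreadExecutor.execute(
                                () -> {
                                    // Re-check on the main thread; close() may have run meanwhile.
                                    if (running) {
                                        check.run();
                                    }
                                });
                    }
                },
                periodMillis,
                periodMillis,
                TimeUnit.MILLISECONDS);
    }

    @Override
    public void close() {
        running = false;
        if (loop != null) {
            loop.cancel(false); // stop the periodic loop instead of letting it fire forever
        }
    }
}
```

The state check narrows the window but cannot close it alone. A tick that has already passed `if (running)` can still reach `execute` after shutdown, which is why cancelling the loop is needed as well.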
   
   ## Brief change log
   
   * Makes `ManuallyTriggeredScheduledExecutorService` more robust against 
exceptions
   * Adds state check to scheduled task
   
   ## Verifying this change
   
   * `FineGrainedSlotManagerTest#testCloseWithScheduledTask` was added
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): no
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: no
 - The serializers: no
 - The runtime per-record code paths (performance sensitive): no
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Kubernetes/Yarn, ZooKeeper: yes
 - The S3 file system connector: no
   
   ## Documentation
   
 - Does this pull request introduce a new feature? no
 - If yes, how is the feature documented? not applicable


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-34427][runtime] Adds state check to requirementsCheck logic [flink]

2024-02-13 Thread via GitHub


flinkbot commented on PR #24309:
URL: https://github.com/apache/flink/pull/24309#issuecomment-1941800355

   
   ## CI report:
   
   * 755cc0abbf3de6c5d6debc20e8a85e1f7fa220f8 UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   





Re: [PR] [FLINK-34427][runtime] Adds state check to requirementsCheck logic [flink]

2024-02-13 Thread via GitHub


zentol commented on code in PR #24309:
URL: https://github.com/apache/flink/pull/24309#discussion_r1488731829


##
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/FineGrainedSlotManager.java:
##
@@ -594,18 +597,19 @@ private void checkResourceRequirementsWithDelay() {
 if (requirementsCheckDelay.toMillis() <= 0) {
 checkResourceRequirements();
 } else {
-if (requirementsCheckFuture == null || requirementsCheckFuture.isDone()) {
-requirementsCheckFuture = new CompletableFuture<>();
-scheduledExecutor.schedule(
-() ->
-mainThreadExecutor.execute(
-() -> {
-checkResourceRequirements();
-Preconditions.checkNotNull(requirementsCheckFuture)
-.complete(null);
-}),
-requirementsCheckDelay.toMillis(),
-TimeUnit.MILLISECONDS);
+if (requirementsCheckFuture.isDone()) {
+requirementsCheckFuture =
+scheduledExecutor.schedule(
+() -> {
+if (started) {

Review Comment:
   This read is unsafe since `started` isn't volatile.
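One way to avoid the visibility problem without making the flag volatile is to confine every read and write of it to the main thread. The scheduler thread then only forwards work. The sketch below assumes a single-threaded `ExecutorService` standing in for the RPC main thread, and `MainThreadConfinedChecker` is hypothetical. It fixes visibility only: if the main-thread executor has already been shut down, `execute` can still reject the forwarded task.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

/**
 * Hypothetical sketch: the started flag is read and written only on the single-threaded
 * main executor, so the scheduler thread never reads it and no volatile is needed.
 */
final class MainThreadConfinedChecker {
    private final ScheduledExecutorService scheduler;
    private final ExecutorService mainThread; // single-threaded stand-in for the RPC main thread
    private boolean started; // confined to mainThread
    int checks; // confined to mainThread; counts executed checks

    MainThreadConfinedChecker(ScheduledExecutorService scheduler, ExecutorService mainThread) {
        this.scheduler = scheduler;
        this.mainThread = mainThread;
    }

    void start() {
        mainThread.execute(() -> started = true);
    }

    void stop() {
        mainThread.execute(() -> started = false);
    }

    ScheduledFuture<?> scheduleCheck(long delayMillis) {
        return scheduler.schedule(
                // The scheduler thread only forwards; the flag is read on the main thread.
                () -> mainThread.execute(() -> {
                    if (started) {
                        checks++;
                    }
                }),
                delayMillis,
                TimeUnit.MILLISECONDS);
    }
}
```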






Re: [PR] [FLINK-34427][runtime] Adds state check to requirementsCheck logic [flink]

2024-02-13 Thread via GitHub


zentol commented on code in PR #24309:
URL: https://github.com/apache/flink/pull/24309#discussion_r1488733497


##
flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/ManuallyTriggeredScheduledExecutorService.java:
##
@@ -318,11 +319,22 @@ public void triggerNonPeriodicScheduledTasks(Class taskClazz) {
 public void triggerPeriodicScheduledTasks() {
 for (ScheduledTask scheduledTask : periodicScheduledTasks) {
 if (!scheduledTask.isCancelled()) {
-scheduledTask.execute();
+executeScheduledTask(scheduledTask);
 }
 }
 }
 
+private static void executeScheduledTask(ScheduledTask scheduledTask) {
+scheduledTask.execute();
+try {
+// try to retrieve result of scheduled task to avoid swallowing any exceptions that
+// occurred
+scheduledTask.get();

Review Comment:
   This blocking call breaks a lot of stuff. I don't think we can make this 
change; too many tests rely on throwing something in the executor, and later 
triggering the completion once some condition is fulfilled.
   
   Maybe handle exceptions as fatal errors tho. Maybe that breaks fewer things.
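A non-blocking alternative along the lines of the suggestion above is to run the queued tasks when triggered and forward any `Throwable` to a fatal error handler, instead of calling `get()` on each task. `FatalErrorReportingManualExecutor` and its `triggerAll()` method are hypothetical. They are intended for single-threaded test use and are far simpler than `ManuallyTriggeredScheduledExecutorService`.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.Executor;
import java.util.function.Consumer;

/**
 * Hypothetical manually triggered executor: tasks run only when triggerAll() is called, and
 * failures are forwarded to a fatal error handler instead of being stored in a future.
 */
final class FatalErrorReportingManualExecutor implements Executor {
    private final Queue<Runnable> queue = new ArrayDeque<>();
    private final Consumer<Throwable> fatalErrorHandler;

    FatalErrorReportingManualExecutor(Consumer<Throwable> fatalErrorHandler) {
        this.fatalErrorHandler = fatalErrorHandler;
    }

    @Override
    public void execute(Runnable task) {
        queue.add(task); // nothing runs until a test triggers it
    }

    /** Runs all queued tasks (including ones they enqueue) without blocking on any result. */
    void triggerAll() {
        Runnable next;
        while ((next = queue.poll()) != null) {
            try {
                next.run();
            } catch (Throwable t) {
                // Report and keep going, so one failing task does not hide the others.
                fatalErrorHandler.accept(t);
            }
        }
    }
}
```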










Re: [PR] [FLINK-34427][runtime] Adds state check to requirementsCheck logic [flink]

2024-02-14 Thread via GitHub


zentol commented on code in PR #24309:
URL: https://github.com/apache/flink/pull/24309#discussion_r1489188467


##
flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/ManuallyTriggeredScheduledExecutorService.java:
##
@@ -318,11 +319,22 @@ public void triggerNonPeriodicScheduledTasks(Class taskClazz) {
 public void triggerPeriodicScheduledTasks() {
 for (ScheduledTask scheduledTask : periodicScheduledTasks) {
 if (!scheduledTask.isCancelled()) {
-scheduledTask.execute();
+executeScheduledTask(scheduledTask);
 }
 }
 }
 
+private static void executeScheduledTask(ScheduledTask scheduledTask) {
+scheduledTask.execute();
+try {
+// try to retrieve result of scheduled task to avoid swallowing any exceptions that
+// occurred
+scheduledTask.get();

Review Comment:
   This doesn't work for periodic tasks since the result future in the 
ScheduledTask never gets completed.
   
   I'm not sure if this change is correct in the first place. Every 
non-periodic task already throws exceptions when trigger is called, and you 
can't wait for non-periodic tasks.
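The plain JDK scheduler shows the same effect. A task registered through `scheduleAtFixedRate` only completes its future when it is cancelled, when the executor terminates, or when the task throws, so an unbounded `get()` on it would block forever. The sketch below uses a timeout to make this visible. `PeriodicFutureDemo` is a hypothetical helper and is not part of Flink's test utilities.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/** Hypothetical helper showing that a periodic task's future does not complete on its own. */
final class PeriodicFutureDemo {

    /** Returns true if get() on a running periodic task times out instead of returning. */
    static boolean periodicGetTimesOut() throws InterruptedException, ExecutionException {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        ScheduledFuture<?> periodic =
                scheduler.scheduleAtFixedRate(() -> {}, 0, 10, TimeUnit.MILLISECONDS);
        try {
            // Each run is reset and rescheduled, so the future stays incomplete.
            periodic.get(100, TimeUnit.MILLISECONDS);
            return false;
        } catch (TimeoutException expected) {
            return true;
        } finally {
            periodic.cancel(false);
            scheduler.shutdownNow();
        }
    }
}
```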






Re: [PR] [FLINK-34427][runtime] Adds state check to requirementsCheck logic [flink]

2024-02-14 Thread via GitHub


XComp commented on code in PR #24309:
URL: https://github.com/apache/flink/pull/24309#discussion_r1489646969


##
flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/ManuallyTriggeredScheduledExecutorService.java:
##
@@ -318,11 +319,22 @@ public void triggerNonPeriodicScheduledTasks(Class taskClazz) {
 public void triggerPeriodicScheduledTasks() {
 for (ScheduledTask scheduledTask : periodicScheduledTasks) {
 if (!scheduledTask.isCancelled()) {
-scheduledTask.execute();
+executeScheduledTask(scheduledTask);
 }
 }
 }
 
+private static void executeScheduledTask(ScheduledTask scheduledTask) {
+scheduledTask.execute();
+try {
+// try to retrieve result of scheduled task to avoid swallowing any exceptions that
+// occurred
+scheduledTask.get();

Review Comment:
   True, I missed that we're not retrieving the result in case of periodic 
tasks in 
[ScheduledTask#execute](https://github.com/apache/flink/blob/5405239dec0884dff746129c73955c90f455c465/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/ScheduledTask.java#L76).
   
   I changed the behavior: instead of collecting the error in the future, we're going to throw the error right away:
   > ScheduledTask only serves as a container for a Callable. The error 
handling should be done by an Executor (e.g. the main thread). Therefore, 
explicitly handling errors inside #execute is out of scope for ScheduledTask.
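The behavior described above can be sketched as follows: the task container just runs its `Callable` and lets failures surface in whoever triggered it. `CallableContainer` is a hypothetical simplification of `ScheduledTask`. Wrapping checked exceptions in `CompletionException` is an assumption, made so that `execute()` does not need a `throws` clause.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionException;

/**
 * Hypothetical container: it only runs the callable and lets failures propagate to whoever
 * triggered it, instead of stashing them in a result future.
 */
final class CallableContainer<T> {
    private final Callable<T> callable;

    CallableContainer(Callable<T> callable) {
        this.callable = callable;
    }

    T execute() {
        try {
            return callable.call();
        } catch (RuntimeException | Error e) {
            throw e; // unchecked: rethrow as-is so the executor sees it immediately
        } catch (Exception e) {
            throw new CompletionException(e); // checked: wrap to keep the signature unchecked
        }
    }
}
```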



##
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/FineGrainedSlotManager.java:
##
@@ -594,18 +597,19 @@ private void checkResourceRequirementsWithDelay() {
 if (requirementsCheckDelay.toMillis() <= 0) {
 checkResourceRequirements();
 } else {
-if (requirementsCheckFuture == null || requirementsCheckFuture.isDone()) {
-requirementsCheckFuture = new CompletableFuture<>();
-scheduledExecutor.schedule(
-() ->
-mainThreadExecutor.execute(
-() -> {
-checkResourceRequirements();
-Preconditions.checkNotNull(requirementsCheckFuture)
-.complete(null);
-}),
-requirementsCheckDelay.toMillis(),
-TimeUnit.MILLISECONDS);
+if (requirementsCheckFuture.isDone()) {
+requirementsCheckFuture =
+scheduledExecutor.schedule(
+() -> {
+if (started) {

Review Comment:
   Yeah, I had to revisit the PR. Yesterday I looked into the synchronization of `started` and was somewhat puzzled why we haven't had to synchronize the field until now. The reason is that all methods relying on the `started` field are called from within the ResourceManager's main thread. I then forgot to consider this when accessing the `SlotManager`'s state from another thread :facepalm:
   
   Anyway, I went over the PR again and added some assertions to make it clearer that the `FineGrainedSlotManager` should be handled from within the `ResourceManager`'s main thread. Please let me know if I came to the wrong conclusion or missed something here.
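A main-thread assertion like the one mentioned above could look roughly like this. The self-contained `MainThreadGuard` is a hypothetical JDK-only stand-in, not the Flink API. It records the thread that a single-threaded executor creates and rejects calls that come from any other thread.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
 * Hypothetical stand-in for a component main thread executor that can assert it is being
 * called from its own thread.
 */
final class MainThreadGuard {
    private volatile Thread mainThread;

    final ExecutorService executor =
            Executors.newSingleThreadExecutor(
                    runnable -> {
                        Thread thread = new Thread(runnable, "component-main-thread");
                        mainThread = thread; // remember which thread counts as "main"
                        return thread;
                    });

    void assertRunningInMainThread() {
        if (Thread.currentThread() != mainThread) {
            throw new IllegalStateException(
                    "Expected the main thread but was " + Thread.currentThread().getName());
        }
    }
}
```

A component method such as `suspend()` would call `guard.assertRunningInMainThread()` before it touches any state.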






Re: [PR] [FLINK-34427][runtime] Adds state check to requirementsCheck logic [flink]

2024-02-14 Thread via GitHub


XComp commented on PR #24309:
URL: https://github.com/apache/flink/pull/24309#issuecomment-1944265004

   I updated the PR but will wait until tomorrow before switching it from draft back to ready for review. I want CI to pass first, because reviewing the change doesn't make sense before that.





Re: [PR] [FLINK-34427][runtime] Adds state check to requirementsCheck logic [flink]

2024-02-15 Thread via GitHub


XComp commented on PR #24309:
URL: https://github.com/apache/flink/pull/24309#issuecomment-1945921834

   I looked into the missing synchronization for the `FineGrainedSlotManager#started` field: AFAIU, we're relying on all the methods that touch `#started` running in the same thread (the `ResourceManager`'s main thread). But as far as I can see, nothing documents this assumption.
   
   I'd like to improve the code in this regard to make this relationship clearer to code readers as part of FLINK-34427. I see two options:
   * Make the implementation more robust by requiring certain methods to be executed in the passed-in (i.e. RM) main thread.
   * Document in the JavaDoc that the `FineGrainedSlotManager` is not thread-safe and must run in a single thread so that state transitions happen sequentially.
   
   In the current version of this PR, I went for the first option. Two test failures popped up in the [first CI run](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=57529). `ResourceManagerTest.testDisconnectTaskManager` failed due to the main-thread change. The ActiveResourceManager test failure seems to be unrelated; I created FLINK-34447 for that one.





Re: [PR] [FLINK-34427][runtime] Adds state check to requirementsCheck logic [flink]

2024-02-22 Thread via GitHub


zentol commented on code in PR #24309:
URL: https://github.com/apache/flink/pull/24309#discussion_r1499093498


##
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/FineGrainedSlotManager.java:
##
@@ -890,13 +900,32 @@ public long getTaskManagerIdleSince(InstanceID instanceId) {
 // 
-
 
 private void checkInit() {
-Preconditions.checkState(started, "The slot manager has not been started.");
+assertRunsInMainThreadExecutor();
 Preconditions.checkNotNull(resourceManagerId);
-Preconditions.checkNotNull(mainThreadExecutor);
 Preconditions.checkNotNull(resourceAllocator);
 Preconditions.checkNotNull(resourceEventListener);
 }
 
+@GuardedBy("lock") // or executed in mainThreadExecutor
+private boolean isStarted() {
+return mainThreadExecutor != null;
+}

Review Comment:
   This never returns false after `start()` was called because the 
mainThreadExecutor isn't nulled anywhere.



##
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/FineGrainedSlotManager.java:
##
@@ -243,7 +253,7 @@ private void registerSlotManagerMetrics() {
 /** Suspends the component. This clears the internal state of the slot manager. */
 @Override
 public void suspend() {

Review Comment:
   Is this not called from the main thread? If not, then there are a bunch of other concurrency issues in here.



##
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/FineGrainedSlotManager.java:
##
@@ -108,27 +110,30 @@ public class FineGrainedSlotManager implements SlotManager {
 /** ResourceManager's id. */
 @Nullable private ResourceManagerId resourceManagerId;
 
-/** Executor for future callbacks which have to be "synchronized". */
-@Nullable private Executor mainThreadExecutor;
+private final Object lock = new Object();
+/**
+ * Executor for future callbacks which have to be "synchronized". This field being {@code null}
+ * indicates that the component isn't started.
+ */
+@GuardedBy("lock")
+@Nullable
+private volatile ComponentMainThreadExecutor mainThreadExecutor;

Review Comment:
   It's odd that this is both guarded by a lock and volatile.








Re: [PR] [FLINK-34427][runtime] Adds state check to requirementsCheck logic [flink]

2024-02-22 Thread via GitHub


zentol commented on code in PR #24309:
URL: https://github.com/apache/flink/pull/24309#discussion_r1499111780


##
flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/FineGrainedSlotManagerTest.java:
##
@@ -594,6 +601,68 @@ private void testNotificationAboutNotEnoughResources(boolean withNotificationGra
 };
 }
 
+@Test
+void testCloseWithRequirementsCheckBeingScheduled() throws Exception {
+testCallingCloseWhileHavingTasksScheduled(
+FineGrainedSlotManager::triggerResourceRequirementsCheck, 2);
+}
+
+@Test
+void testCloseWithNeededResourcesDeclarationBeingScheduled() throws Exception {
+testCallingCloseWhileHavingTasksScheduled(
+FineGrainedSlotManager::declareNeededResourcesWithDelay, 2);
+}
+
+@Test
+void testCloseWithClusterReconciliationCheckBeingScheduled() throws Exception {
+testCallingCloseWhileHavingTasksScheduled(
+slotManager -> {
+// nothing to do - resource allocation is scheduled during startup
+},
+1);
+}
+
+private void testCallingCloseWhileHavingTasksScheduled(
+Consumer triggerScheduledTask, int expectedScheduledTasks)
+throws Exception {
+new Context() {
+{
+final ManuallyTriggeredScheduledExecutorService testScheduledExecutorService =
+new ManuallyTriggeredScheduledExecutorService();
+final ScheduledExecutorService testMainThreadExecutorService =
+Executors.newSingleThreadScheduledExecutor();
+
+scheduledExecutor =
+new ScheduledExecutorServiceAdapter(testScheduledExecutorService);
+mainThreadExecutor =
+new TestingComponentMainThreadExecutor(
+ComponentMainThreadExecutorServiceAdapter.forSingleThreadExecutor(
+testMainThreadExecutorService));
+
+runTest(() -> runInMainThread(() -> triggerScheduledTask.accept(getSlotManager())));
+
+// waiting for the close call to complete ensures that the task was scheduled
+// because close is called on the main thread after scheduling the task
+FlinkAssertions.assertThatFuture(closeFuture)
+.as(
+"The test run should have been terminated before proceeding with the checks.")
+.eventuallySucceeds();
+
+assertThat(testScheduledExecutorService.getAllScheduledTasks())
+.as("The expected number of tasks should have been scheduled.")
+.hasSize(expectedScheduledTasks);
+
+// simulate shutting down the RpcEndpoint
+testMainThreadExecutorService.shutdown();
+
+assertThatNoException()
+.as(
+"There shouldn't occur any error due to the shutdown of the MainThreadExecutorService.")
+.isThrownBy(testScheduledExecutorService::triggerScheduledTasks);

Review Comment:
   I don't think you can use this to verify the fix.
   It's too _neat_: since we force the task to be in the queue when we close things, you eliminate concurrency. In real life, the executor may already be executing something while the shutdown is happening, in which case, afaik, the cancel doesn't do anything.
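The JDK alone can demonstrate the concern above. Calling `Future#cancel(false)` on a task that is already running marks the future as cancelled but does not stop the task body, so any side effect after that point still happens. This is why the scheduled action itself has to check the component state. `CancelRunningTaskDemo` is a hypothetical helper.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

/** Hypothetical helper: cancel(false) does not stop a task body that is already running. */
final class CancelRunningTaskDemo {

    static boolean bodyCompletesDespiteCancel() throws InterruptedException {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        CountDownLatch running = new CountDownLatch(1);
        CountDownLatch release = new CountDownLatch(1);
        CountDownLatch finished = new CountDownLatch(1);
        AtomicBoolean sideEffect = new AtomicBoolean();
        try {
            Future<?> future =
                    executor.submit(
                            () -> {
                                running.countDown();
                                try {
                                    release.await(); // simulate work in progress
                                } catch (InterruptedException e) {
                                    Thread.currentThread().interrupt();
                                }
                                // Stands in for "schedule into the main thread" in the real code.
                                sideEffect.set(true);
                                finished.countDown();
                            });
            running.await(); // the task body is now executing
            future.cancel(false); // marks the future cancelled; the body keeps going
            release.countDown();
            finished.await(5, TimeUnit.SECONDS);
            return future.isCancelled() && sideEffect.get();
        } finally {
            executor.shutdownNow();
        }
    }
}
```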




Re: [PR] [FLINK-34427][runtime] Adds state check to requirementsCheck logic [flink]

2024-02-22 Thread via GitHub


XComp commented on code in PR #24309:
URL: https://github.com/apache/flink/pull/24309#discussion_r1499457151


##
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/FineGrainedSlotManager.java:
##
@@ -243,7 +253,7 @@ private void registerSlotManagerMetrics() {
 /** Suspends the component. This clears the internal state of the slot manager. */
 @Override
 public void suspend() {

Review Comment:
   Yes, I missed the assert here.



##
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/FineGrainedSlotManager.java:
##
@@ -108,27 +110,30 @@ public class FineGrainedSlotManager implements SlotManager {
 /** ResourceManager's id. */
 @Nullable private ResourceManagerId resourceManagerId;
 
-/** Executor for future callbacks which have to be "synchronized". */
-@Nullable private Executor mainThreadExecutor;
+private final Object lock = new Object();
+/**
+ * Executor for future callbacks which have to be "synchronized". This field being {@code null}
+ * indicates that the component isn't started.
+ */
+@GuardedBy("lock")
+@Nullable
+private volatile ComponentMainThreadExecutor mainThreadExecutor;

Review Comment:
   Correct, the `volatile` doesn't make sense here. I have to protect the `mainThreadExecutor` with a lock because the `runInMainThreadIfStarted` method checks the main thread executor and executes the Runnable while holding the lock.
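The locking scheme described above can be sketched as follows, with a plain `Executor` in place of `ComponentMainThreadExecutor`. Every access to the field happens while holding `lock`, so the lock alone provides visibility and `volatile` adds nothing. Nulling the field in `suspend()` is what lets the component leave the started state, and that also makes a later `start()` behave like a fresh start. `LockGuardedComponent` is hypothetical.

```java
import java.util.concurrent.Executor;

/** Hypothetical component whose "started" state is the presence of its main-thread executor. */
final class LockGuardedComponent {
    private final Object lock = new Object();

    // Guarded by lock; null means "not started". Not volatile: every access holds the lock.
    private Executor mainThreadExecutor;

    void start(Executor executor) {
        synchronized (lock) {
            mainThreadExecutor = executor;
        }
    }

    void suspend() {
        synchronized (lock) {
            mainThreadExecutor = null; // leave the started state so scheduled work becomes a no-op
        }
    }

    boolean isStarted() {
        synchronized (lock) {
            return mainThreadExecutor != null;
        }
    }

    /** Hands the task to the main thread only while started; returns whether it was handed over. */
    boolean runInMainThreadIfStarted(Runnable task) {
        synchronized (lock) {
            if (mainThreadExecutor == null) {
                return false;
            }
            mainThreadExecutor.execute(task);
            return true;
        }
    }
}
```

Calling `execute` while holding the lock makes the check and the hand-over atomic with respect to `suspend()`. The trade-off is that a slow or blocking `execute` would hold up other callers.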






Re: [PR] [FLINK-34427][runtime] Adds state check to requirementsCheck logic [flink]

2024-02-22 Thread via GitHub


XComp commented on code in PR #24309:
URL: https://github.com/apache/flink/pull/24309#discussion_r1499457482


##
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/FineGrainedSlotManager.java:
##
@@ -890,13 +900,32 @@ public long getTaskManagerIdleSince(InstanceID instanceId) {
 // 
-
 
 private void checkInit() {
-Preconditions.checkState(started, "The slot manager has not been started.");
+assertRunsInMainThreadExecutor();
 Preconditions.checkNotNull(resourceManagerId);
-Preconditions.checkNotNull(mainThreadExecutor);
 Preconditions.checkNotNull(resourceAllocator);
 Preconditions.checkNotNull(resourceEventListener);
 }
 
+@GuardedBy("lock") // or executed in mainThreadExecutor
+private boolean isStarted() {
+return mainThreadExecutor != null;
+}

Review Comment:
   You're right. I missed unsetting the field again.
   
   > [...] but I don't think we fixed the original issue yet?
   > We never exit the started state again so all the scheduled executor stuff still keeps running.
   
   We still (partially) fixed FLINK-34427 because the ScheduledFutures are cancelled. But you're correct: there are still edge cases where the FLINK-34427 issue could have appeared because the field was never unset. I fixed that and added a test to check that restarting the SlotManager after suspending it doesn't cause any issues.


