Re: [PR] [FLINK-35047][state] Shutdown StateExecutors when ForStKeyedStateBackend is disposed [flink]

2024-05-16 Thread via GitHub


ljz2051 commented on code in PR #24768:
URL: https://github.com/apache/flink/pull/24768#discussion_r1604301920


##
flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStKeyedStateBackend.java:
##
@@ -147,15 +160,30 @@ public  S createState(@Nonnull StateDescriptor stateDes
     @Override
     @Nonnull
     public StateExecutor createStateExecutor() {
-        // TODO: Make io parallelism configurable
-        return new ForStStateExecutor(4, db, optionsContainer.getWriteOptions());
+        synchronized (lock) {

Review Comment:
   Do you mean that `StateExecutor#shutdown` should be invoked in the `close` 
method, not the `dispose` method?
   
   Maybe we need to introduce another `closed` flag in ForStKeyedStateBackend to 
prevent "createStateExecutor after close".
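
A minimal sketch of how such a `closed` flag could guard executor creation. This is not code from the PR: `ClosableBackendSketch` and `SketchExecutor` are hypothetical stand-ins for `ForStKeyedStateBackend` and `StateExecutor`, and `IllegalStateException` is used in place of `FlinkRuntimeException` so the example stays self-contained.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical stand-in for StateExecutor; only shutdown() is modeled. */
interface SketchExecutor {
    void shutdown();
}

/** Hypothetical, simplified stand-in for ForStKeyedStateBackend. */
class ClosableBackendSketch implements AutoCloseable {
    private final Object lock = new Object();
    private final List<SketchExecutor> managedStateExecutors = new ArrayList<>();
    private boolean closed; // guarded by lock

    /** Counts shutdown() calls so the behavior can be observed. */
    final AtomicInteger shutdownCount = new AtomicInteger();

    SketchExecutor createStateExecutor() {
        synchronized (lock) {
            if (closed) {
                // Assumption: the real code would throw FlinkRuntimeException here.
                throw new IllegalStateException(
                        "Attempt to create StateExecutor after backend is closed.");
            }
            SketchExecutor executor = shutdownCount::incrementAndGet;
            managedStateExecutors.add(executor);
            return executor;
        }
    }

    @Override
    public void close() {
        synchronized (lock) {
            if (closed) {
                return; // idempotent: a second close() does nothing
            }
            closed = true;
            for (SketchExecutor executor : managedStateExecutors) {
                executor.shutdown();
            }
            managedStateExecutors.clear();
        }
    }
}
```

Because the flag is checked and set under the same lock, an executor is either registered before `close()` runs (and gets shut down) or rejected afterwards.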
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-35047][state] Shutdown StateExecutors when ForStKeyedStateBackend is disposed [flink]

2024-05-16 Thread via GitHub


ljz2051 commented on code in PR #24768:
URL: https://github.com/apache/flink/pull/24768#discussion_r1604284072


##
flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStKeyedStateBackend.java:
##
@@ -147,15 +160,30 @@ public  S createState(@Nonnull StateDescriptor stateDes
     @Override
     @Nonnull
     public StateExecutor createStateExecutor() {
-        // TODO: Make io parallelism configurable
-        return new ForStStateExecutor(4, db, optionsContainer.getWriteOptions());
+        synchronized (lock) {
+            if (disposed) {
+                throw new FlinkRuntimeException(
+                        "Attempt to create StateExecutor after ForStKeyedStateBackend is disposed.");
+            }
+            // TODO: Make io parallelism configurable
+            StateExecutor stateExecutor =
+                    new ForStStateExecutor(4, db, optionsContainer.getWriteOptions());
+            managedStateExecutors.add(stateExecutor);
+            return stateExecutor;
+        }
     }
 
     /** Should only be called by one thread, and only after all accesses to the DB happened. */
     @Override
     public void dispose() {
-        if (this.disposed) {
-            return;
+        synchronized (lock) {
+            if (disposed) {
+                return;
+            }
+            disposed = true;
+            for (StateExecutor executor : managedStateExecutors) {

Review Comment:
   Based on the current implementation, it is indeed doable. However, I think 
that keeping both `disposed` and `managedStateExecutors` within the protective 
scope of the `lock` is the safer practice. Additionally, the chance of lock 
contention between `createStateExecutor()` and `dispose()` is minimal, so the 
impact here is virtually imperceptible.
   
   So I prefer to keep this `for` loop over `managedStateExecutors` inside the 
`lock` scope.
   
   WDYT?
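
To make the two options concrete, here is a rough sketch of both disposal variants. It is an illustration only: `DisposeOrderingSketch` is a hypothetical class, executors are modeled as plain `Runnable` shutdown hooks, and `IllegalStateException` stands in for `FlinkRuntimeException`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical, simplified model of the dispose path; not Flink code. */
class DisposeOrderingSketch {
    private final Object lock = new Object();
    private final List<Runnable> managedStateExecutors = new ArrayList<>();
    private boolean disposed; // guarded by lock

    /** Counts executed shutdown hooks so the behavior can be observed. */
    final AtomicInteger shutdowns = new AtomicInteger();

    Runnable createStateExecutor() {
        synchronized (lock) {
            if (disposed) {
                throw new IllegalStateException("backend is disposed");
            }
            Runnable shutdownHook = shutdowns::incrementAndGet;
            managedStateExecutors.add(shutdownHook);
            return shutdownHook;
        }
    }

    /** Variant shown in the diff: shut down while still holding the lock. */
    void disposeInsideLock() {
        synchronized (lock) {
            if (disposed) {
                return;
            }
            disposed = true;
            for (Runnable executor : managedStateExecutors) {
                executor.run();
            }
            managedStateExecutors.clear();
        }
    }

    /** Alternative raised in review: set the flag under the lock, shut down outside it. */
    void disposeOutsideLock() {
        List<Runnable> toShutdown;
        synchronized (lock) {
            if (disposed) {
                return;
            }
            disposed = true;
            // Copy the list so iteration no longer needs the lock.
            toShutdown = new ArrayList<>(managedStateExecutors);
            managedStateExecutors.clear();
        }
        for (Runnable executor : toShutdown) {
            executor.run();
        }
    }
}
```

In this sketch both variants shut every registered executor down exactly once, because `disposed` is set under the lock before any new executor can be added. The second variant only shortens the time the lock is held, at the cost of an extra list copy.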
   






Re: [PR] [FLINK-35047][state] Shutdown StateExecutors when ForStKeyedStateBackend is disposed [flink]

2024-05-12 Thread via GitHub


masteryhx commented on code in PR #24768:
URL: https://github.com/apache/flink/pull/24768#discussion_r1597840604


##
flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStKeyedStateBackend.java:
##
@@ -147,15 +160,30 @@ public  S createState(@Nonnull StateDescriptor stateDes
     @Override
     @Nonnull
     public StateExecutor createStateExecutor() {
-        // TODO: Make io parallelism configurable
-        return new ForStStateExecutor(4, db, optionsContainer.getWriteOptions());
+        synchronized (lock) {

Review Comment:
   Should the executors also be shut down in the `close` method?
   `close` is used to close resources created internally.
   `dispose` is used to dispose of resources, including native resources.






Re: [PR] [FLINK-35047][state] Shutdown StateExecutors when ForStKeyedStateBackend is disposed [flink]

2024-05-10 Thread via GitHub


fredia commented on code in PR #24768:
URL: https://github.com/apache/flink/pull/24768#discussion_r1597330403


##
flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStKeyedStateBackend.java:
##
@@ -147,15 +160,30 @@ public  S createState(@Nonnull StateDescriptor stateDes
     @Override
     @Nonnull
     public StateExecutor createStateExecutor() {
-        // TODO: Make io parallelism configurable
-        return new ForStStateExecutor(4, db, optionsContainer.getWriteOptions());
+        synchronized (lock) {
+            if (disposed) {
+                throw new FlinkRuntimeException(
+                        "Attempt to create StateExecutor after ForStKeyedStateBackend is disposed.");
+            }
+            // TODO: Make io parallelism configurable
+            StateExecutor stateExecutor =
+                    new ForStStateExecutor(4, db, optionsContainer.getWriteOptions());
+            managedStateExecutors.add(stateExecutor);
+            return stateExecutor;
+        }
     }
 
     /** Should only be called by one thread, and only after all accesses to the DB happened. */
     @Override
     public void dispose() {
-        if (this.disposed) {
-            return;
+        synchronized (lock) {
+            if (disposed) {
+                return;
+            }
+            disposed = true;
+            for (StateExecutor executor : managedStateExecutors) {

Review Comment:
   Can this `for-loop` be placed outside the lock?






Re: [PR] [FLINK-35047][state] Shutdown StateExecutors when ForStKeyedStateBackend is disposed [flink]

2024-05-09 Thread via GitHub


flinkbot commented on PR #24768:
URL: https://github.com/apache/flink/pull/24768#issuecomment-2103890499

   
   ## CI report:
   
   * d364b97a7113d6ace4102d1d8e2acd672296bfb0 UNKNOWN
   
   
   Bot commands
   The @flinkbot bot supports the following commands:
   
   - `@flinkbot run azure` re-run the last Azure build
   

