Re: [PR] [FLINK-35047][state] Shutdown StateExecutors when ForStKeyedStateBackend is disposed [flink]
ljz2051 commented on code in PR #24768: URL: https://github.com/apache/flink/pull/24768#discussion_r1604301920 ## flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStKeyedStateBackend.java: ## @@ -147,15 +160,30 @@ public S createState(@Nonnull StateDescriptor stateDes @Override @Nonnull public StateExecutor createStateExecutor() { -// TODO: Make io parallelism configurable -return new ForStStateExecutor(4, db, optionsContainer.getWriteOptions()); +synchronized (lock) { Review Comment: You means that `StateExecutor#shutdown `should be invoked in the `close` method, not the `dispose` method ? Maybe we need introduce aonther `closed` flag in ForStKeyedStateBackend to prevent "createStateExecutor after close". -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-35047][state] Shutdown StateExecutors when ForStKeyedStateBackend is disposed [flink]
ljz2051 commented on code in PR #24768: URL: https://github.com/apache/flink/pull/24768#discussion_r1604284072 ## flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStKeyedStateBackend.java: ## @@ -147,15 +160,30 @@ public S createState(@Nonnull StateDescriptor stateDes @Override @Nonnull public StateExecutor createStateExecutor() { -// TODO: Make io parallelism configurable -return new ForStStateExecutor(4, db, optionsContainer.getWriteOptions()); +synchronized (lock) { +if (disposed) { +throw new FlinkRuntimeException( +"Attempt to create StateExecutor after ForStKeyedStateBackend is disposed."); +} +// TODO: Make io parallelism configurable +StateExecutor stateExecutor = +new ForStStateExecutor(4, db, optionsContainer.getWriteOptions()); +managedStateExecutors.add(stateExecutor); +return stateExecutor; +} } /** Should only be called by one thread, and only after all accesses to the DB happened. */ @Override public void dispose() { -if (this.disposed) { -return; +synchronized (lock) { +if (disposed) { +return; +} +disposed = true; +for (StateExecutor executor : managedStateExecutors) { Review Comment: Based on the current implementation, it is indeed doable. However, I think that enclosing both `disposed` and `managedStateExecutors` within the protective scope of the `lock` constitutes a more safe practice. Additionally, the chance of a lock conflict between `createStateExecutor()` and `dispose()` is very minimal, so the impact in this place is virtually imperceptible. So I prefer to put this `for-loop` of `managedStateExecutors` inside the `lock` scope. WDYT? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-35047][state] Shutdown StateExecutors when ForStKeyedStateBackend is disposed [flink]
masteryhx commented on code in PR #24768: URL: https://github.com/apache/flink/pull/24768#discussion_r1597840604 ## flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStKeyedStateBackend.java: ## @@ -147,15 +160,30 @@ public S createState(@Nonnull StateDescriptor stateDes @Override @Nonnull public StateExecutor createStateExecutor() { -// TODO: Make io parallelism configurable -return new ForStStateExecutor(4, db, optionsContainer.getWriteOptions()); +synchronized (lock) { Review Comment: Also shutdown in the close method ? `close` is used to close resources created internally. `dispose` is used to dispose resources including native resources. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-35047][state] Shutdown StateExecutors when ForStKeyedStateBackend is disposed [flink]
fredia commented on code in PR #24768: URL: https://github.com/apache/flink/pull/24768#discussion_r1597330403 ## flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStKeyedStateBackend.java: ## @@ -147,15 +160,30 @@ public S createState(@Nonnull StateDescriptor stateDes @Override @Nonnull public StateExecutor createStateExecutor() { -// TODO: Make io parallelism configurable -return new ForStStateExecutor(4, db, optionsContainer.getWriteOptions()); +synchronized (lock) { +if (disposed) { +throw new FlinkRuntimeException( +"Attempt to create StateExecutor after ForStKeyedStateBackend is disposed."); +} +// TODO: Make io parallelism configurable +StateExecutor stateExecutor = +new ForStStateExecutor(4, db, optionsContainer.getWriteOptions()); +managedStateExecutors.add(stateExecutor); +return stateExecutor; +} } /** Should only be called by one thread, and only after all accesses to the DB happened. */ @Override public void dispose() { -if (this.disposed) { -return; +synchronized (lock) { +if (disposed) { +return; +} +disposed = true; +for (StateExecutor executor : managedStateExecutors) { Review Comment: Can this `for-loop` be placed outside the lock? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-35047][state] Shutdown StateExecutors when ForStKeyedStateBackend is disposed [flink]
flinkbot commented on PR #24768: URL: https://github.com/apache/flink/pull/24768#issuecomment-2103890499 ## CI report: * d364b97a7113d6ace4102d1d8e2acd672296bfb0 UNKNOWN Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run azure` re-run the last Azure build -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org