This is an automated email from the ASF dual-hosted git repository.
1996fanrui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 80be7e48b82 [hotfix][state/tests] Dispose keyed state backend in
testMapStateWithNullValue
80be7e48b82 is described below
commit 80be7e48b82ec0711e9c3988d9ebea92374c2a4e
Author: Rui Fan <[email protected]>
AuthorDate: Fri May 15 13:31:23 2026 +0200
[hotfix][state/tests] Dispose keyed state backend in
testMapStateWithNullValue
The test creates a CheckpointableKeyedStateBackend via createKeyedBackend
but never closes/disposes it, leaking native resources held by the
backend. For most state backends in the existing suite this is harmless,
but cloud-native backends that own long-lived background tasks (and
poll remote state) keep file handles open under the JUnit @TempDir and
fail class teardown.
Wrap the test body in try/finally with IOUtils.closeQuietly(keyedBackend)
and keyedBackend.dispose(), matching the cleanup pattern used by every
other test in this file.
---
.../flink/runtime/state/StateBackendTestBase.java | 36 +++++++++++++---------
1 file changed, 22 insertions(+), 14 deletions(-)
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
index 92468cb6880..98e467961fd 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
@@ -1166,6 +1166,7 @@ public abstract class StateBackendTestBase<B extends
AbstractStateBackend> {
} finally {
// ensure that native resources are also released in case of
exception
if (backend != null) {
+ IOUtils.closeQuietly(backend);
backend.dispose();
}
}
@@ -1297,6 +1298,7 @@ public abstract class StateBackendTestBase<B extends
AbstractStateBackend> {
snapshot.discardState();
} finally {
+ IOUtils.closeQuietly(backend);
backend.dispose();
}
}
@@ -1399,6 +1401,7 @@ public abstract class StateBackendTestBase<B extends
AbstractStateBackend> {
snapshot.discardState();
} finally {
+ IOUtils.closeQuietly(backend);
backend.dispose();
}
}
@@ -5443,20 +5446,25 @@ public abstract class StateBackendTestBase<B extends
AbstractStateBackend> {
public void testMapStateWithNullValue() throws Exception {
CheckpointableKeyedStateBackend<String> keyedBackend =
createKeyedBackend(new NullUnsafeTypeSerializer());
- MapStateDescriptor<Integer, String> kvId =
- new MapStateDescriptor<>(
- "id", IntSerializer.INSTANCE, new
NullUnsafeTypeSerializer());
- MapState<Integer, String> state =
- keyedBackend.getPartitionedState(
- VoidNamespace.INSTANCE,
VoidNamespaceSerializer.INSTANCE, kvId);
-
- String key = "key";
- int userKey = 1;
- keyedBackend.setCurrentKey(key);
- // this should not throw an exception
- state.put(userKey, null);
- assertThat(state.contains(userKey)).isTrue();
- assertThat(state.get(userKey)).isNull();
+ try {
+ MapStateDescriptor<Integer, String> kvId =
+ new MapStateDescriptor<>(
+ "id", IntSerializer.INSTANCE, new
NullUnsafeTypeSerializer());
+ MapState<Integer, String> state =
+ keyedBackend.getPartitionedState(
+ VoidNamespace.INSTANCE,
VoidNamespaceSerializer.INSTANCE, kvId);
+
+ String key = "key";
+ int userKey = 1;
+ keyedBackend.setCurrentKey(key);
+ // this should not throw an exception
+ state.put(userKey, null);
+ assertThat(state.contains(userKey)).isTrue();
+ assertThat(state.get(userKey)).isNull();
+ } finally {
+ IOUtils.closeQuietly(keyedBackend);
+ keyedBackend.dispose();
+ }
}
/**