This is an automated email from the ASF dual-hosted git repository.
apkhmv pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 00a2189d837 IGNITE-26251 Fix event sink leak (#6455)
00a2189d837 is described below
commit 00a2189d8379b8ed8a41cb6868f20dc56b4eee4a
Author: Vadim Pakhnushev <[email protected]>
AuthorDate: Wed Aug 20 20:42:48 2025 +0300
IGNITE-26251 Fix event sink leak (#6455)
---
.../eventlog/impl/ConfigurationBasedSinkRegistry.java | 15 ++++++++-------
1 file changed, 8 insertions(+), 7 deletions(-)
diff --git a/modules/eventlog/src/main/java/org/apache/ignite/internal/eventlog/impl/ConfigurationBasedSinkRegistry.java b/modules/eventlog/src/main/java/org/apache/ignite/internal/eventlog/impl/ConfigurationBasedSinkRegistry.java
index 51159716d89..e98a57f097c 100644
--- a/modules/eventlog/src/main/java/org/apache/ignite/internal/eventlog/impl/ConfigurationBasedSinkRegistry.java
+++ b/modules/eventlog/src/main/java/org/apache/ignite/internal/eventlog/impl/ConfigurationBasedSinkRegistry.java
@@ -22,7 +22,6 @@ import static org.apache.ignite.configuration.notifications.ConfigurationListene
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
-import java.util.Map.Entry;
import java.util.Set;
import org.apache.ignite.configuration.NamedListView;
import org.apache.ignite.configuration.notifications.ConfigurationListener;
@@ -69,12 +68,16 @@ class ConfigurationBasedSinkRegistry implements SinkRegistry {
@Override
public void stop() {
cfg.sinks().stopListen(listener);
+
+ clearCache();
}
private void updateCache(@Nullable NamedListView<SinkView> newListValue) {
Map<String, Sink<?>> newCache = new HashMap<>();
Map<String, Set<Sink<?>>> newCacheByChannel = new HashMap<>();
+ clearCache();
+
if (newListValue != null) {
for (SinkView sinkView : newListValue) {
Sink<?> sink = sinkFactory.createSink(sinkView);
@@ -83,13 +86,11 @@ class ConfigurationBasedSinkRegistry implements SinkRegistry {
}
}
- for (Entry<String, Sink<?>> entry : cache.entrySet()) {
- if (!newCache.containsKey(entry.getKey())) {
- entry.getValue().stop();
- }
- }
-
cache = newCache;
cacheByChannel = newCacheByChannel;
}
+
+ private void clearCache() {
+ cache.values().forEach(Sink::stop);
+ }
}