This is an automated email from the ASF dual-hosted git repository.

apkhmv pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new 00a2189d837 IGNITE-26251 Fix event sink leak (#6455)
00a2189d837 is described below

commit 00a2189d8379b8ed8a41cb6868f20dc56b4eee4a
Author: Vadim Pakhnushev <[email protected]>
AuthorDate: Wed Aug 20 20:42:48 2025 +0300

    IGNITE-26251 Fix event sink leak (#6455)
---
 .../eventlog/impl/ConfigurationBasedSinkRegistry.java     | 15 ++++++++-------
 1 file changed, 8 insertions(+), 7 deletions(-)

diff --git a/modules/eventlog/src/main/java/org/apache/ignite/internal/eventlog/impl/ConfigurationBasedSinkRegistry.java b/modules/eventlog/src/main/java/org/apache/ignite/internal/eventlog/impl/ConfigurationBasedSinkRegistry.java
index 51159716d89..e98a57f097c 100644
--- a/modules/eventlog/src/main/java/org/apache/ignite/internal/eventlog/impl/ConfigurationBasedSinkRegistry.java
+++ b/modules/eventlog/src/main/java/org/apache/ignite/internal/eventlog/impl/ConfigurationBasedSinkRegistry.java
@@ -22,7 +22,6 @@ import static org.apache.ignite.configuration.notifications.ConfigurationListene
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Map;
-import java.util.Map.Entry;
 import java.util.Set;
 import org.apache.ignite.configuration.NamedListView;
 import org.apache.ignite.configuration.notifications.ConfigurationListener;
@@ -69,12 +68,16 @@ class ConfigurationBasedSinkRegistry implements SinkRegistry {
     @Override
     public void stop() {
         cfg.sinks().stopListen(listener);
+
+        clearCache();
     }
 
     private void updateCache(@Nullable NamedListView<SinkView> newListValue) {
         Map<String, Sink<?>> newCache = new HashMap<>();
         Map<String, Set<Sink<?>>> newCacheByChannel = new HashMap<>();
 
+        clearCache();
+
         if (newListValue != null) {
             for (SinkView sinkView : newListValue) {
                 Sink<?> sink = sinkFactory.createSink(sinkView);
@@ -83,13 +86,11 @@ class ConfigurationBasedSinkRegistry implements SinkRegistry {
             }
         }
 
-        for (Entry<String, Sink<?>> entry : cache.entrySet()) {
-            if (!newCache.containsKey(entry.getKey())) {
-                entry.getValue().stop();
-            }
-        }
-
         cache = newCache;
         cacheByChannel = newCacheByChannel;
     }
+
+    private void clearCache() {
+        cache.values().forEach(Sink::stop);
+    }
 }

Reply via email to