This is an automated email from the ASF dual-hosted git repository.

srichter pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 284e4486e57270c63b4f08d56599b1b4f97006c7
Author: Stefan Richter <s.rich...@data-artisans.com>
AuthorDate: Tue Mar 19 15:01:06 2019 +0100

    [FLINK-11980] Improve efficiency of iterating KeySelectionListener on 
notification
    
    KeySelectionListener was introduced for incremental TTL state cleanup as a 
driver of the cleanup process. Listeners are notified whenever the current key 
in the backend is set (i.e. for every event). The current implementation of the 
collection that holds the listener is a HashSet, iterated via forEach on each 
key change. This method comes with the overhead of creating temporaray objects, 
e.g. iterators, on every invocation and even if there is no listener 
registered. We should rather [...]
    
    This closes #8020.
---
 .../flink/runtime/state/AbstractKeyedStateBackend.java       | 12 +++++++-----
 1 file changed, 7 insertions(+), 5 deletions(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java
index e28aeef..062092e 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java
@@ -36,9 +36,8 @@ import org.apache.flink.util.Preconditions;
 
 import java.io.Closeable;
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Set;
 import java.util.stream.Stream;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -62,7 +61,7 @@ public abstract class AbstractKeyedStateBackend<K> implements
        private K currentKey;
 
        /** Listeners to changes of keyed context ({@link #currentKey}). */
-       private final Set<KeySelectionListener<K>> keySelectionListeners;
+       private final ArrayList<KeySelectionListener<K>> keySelectionListeners;
 
        /** The key group of the currently active key. */
        private int currentKeyGroup;
@@ -142,7 +141,7 @@ public abstract class AbstractKeyedStateBackend<K> 
implements
                this.executionConfig = executionConfig;
                this.keyGroupCompressionDecorator = 
keyGroupCompressionDecorator;
                this.ttlTimeProvider = 
Preconditions.checkNotNull(ttlTimeProvider);
-               this.keySelectionListeners = new HashSet<>();
+               this.keySelectionListeners = new ArrayList<>(1);
        }
 
        private static StreamCompressionDecorator 
determineStreamCompression(ExecutionConfig executionConfig) {
@@ -183,7 +182,10 @@ public abstract class AbstractKeyedStateBackend<K> 
implements
        }
 
        private void notifyKeySelected(K newKey) {
-               keySelectionListeners.forEach(listener -> 
listener.keySelected(newKey));
+               // we prefer a for-loop over other iteration schemes for 
performance reasons here.
+               for (int i = 0; i < keySelectionListeners.size(); ++i) {
+                       keySelectionListeners.get(i).keySelected(newKey);
+               }
        }
 
        @Override

Reply via email to