This is an automated email from the ASF dual-hosted git repository.

srichter pushed a commit to branch release-1.8
in repository https://gitbox.apache.org/repos/asf/flink.git

commit ae91fd3bb5d1d132885399e7cc259a59459b8c6e
Author: Stefan Richter <s.rich...@data-artisans.com>
AuthorDate: Tue Mar 19 15:01:06 2019 +0100

    [FLINK-11980] Improve efficiency of iterating KeySelectionListener on 
notification
    
    KeySelectionListener was introduced for incremental TTL state cleanup as
     a driver of the cleanup process. Listeners are notified whenever the
     current key in the backend is set (i.e. for every event). The current
    implementation of the collection that holds the listener is a HashSet,
    iterated via forEach on each key change. This method comes with the
    overhead of creating temporaray objects, e.g. iterators, on every
    invocation and even if there is no listener registered. We should rather
    use an ArrayList with for-loop iteration in this hot code path to i)
    minimize overhead and ii) minimize costs for the very likely case that
    there is no listener at all.
    
    This closes #8020.
    
    (cherry picked from commit 284e4486e57270c63b4f08d56599b1b4f97006c7)
---
 .../flink/runtime/state/AbstractKeyedStateBackend.java       | 12 +++++++-----
 1 file changed, 7 insertions(+), 5 deletions(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java
index e28aeef..062092e 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java
@@ -36,9 +36,8 @@ import org.apache.flink.util.Preconditions;
 
 import java.io.Closeable;
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Set;
 import java.util.stream.Stream;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -62,7 +61,7 @@ public abstract class AbstractKeyedStateBackend<K> implements
        private K currentKey;
 
        /** Listeners to changes of keyed context ({@link #currentKey}). */
-       private final Set<KeySelectionListener<K>> keySelectionListeners;
+       private final ArrayList<KeySelectionListener<K>> keySelectionListeners;
 
        /** The key group of the currently active key. */
        private int currentKeyGroup;
@@ -142,7 +141,7 @@ public abstract class AbstractKeyedStateBackend<K> 
implements
                this.executionConfig = executionConfig;
                this.keyGroupCompressionDecorator = 
keyGroupCompressionDecorator;
                this.ttlTimeProvider = 
Preconditions.checkNotNull(ttlTimeProvider);
-               this.keySelectionListeners = new HashSet<>();
+               this.keySelectionListeners = new ArrayList<>(1);
        }
 
        private static StreamCompressionDecorator 
determineStreamCompression(ExecutionConfig executionConfig) {
@@ -183,7 +182,10 @@ public abstract class AbstractKeyedStateBackend<K> 
implements
        }
 
        private void notifyKeySelected(K newKey) {
-               keySelectionListeners.forEach(listener -> 
listener.keySelected(newKey));
+               // we prefer a for-loop over other iteration schemes for 
performance reasons here.
+               for (int i = 0; i < keySelectionListeners.size(); ++i) {
+                       keySelectionListeners.get(i).keySelected(newKey);
+               }
        }
 
        @Override

Reply via email to