This is an automated email from the ASF dual-hosted git repository. srichter pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 284e4486e57270c63b4f08d56599b1b4f97006c7 Author: Stefan Richter <s.rich...@data-artisans.com> AuthorDate: Tue Mar 19 15:01:06 2019 +0100 [FLINK-11980] Improve efficiency of iterating KeySelectionListener on notification KeySelectionListener was introduced for incremental TTL state cleanup as a driver of the cleanup process. Listeners are notified whenever the current key in the backend is set (i.e. for every event). The current implementation of the collection that holds the listener is a HashSet, iterated via forEach on each key change. This method comes with the overhead of creating temporaray objects, e.g. iterators, on every invocation and even if there is no listener registered. We should rather [...] This closes #8020. --- .../flink/runtime/state/AbstractKeyedStateBackend.java | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java index e28aeef..062092e 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java @@ -36,9 +36,8 @@ import org.apache.flink.util.Preconditions; import java.io.Closeable; import java.io.IOException; +import java.util.ArrayList; import java.util.HashMap; -import java.util.HashSet; -import java.util.Set; import java.util.stream.Stream; import static org.apache.flink.util.Preconditions.checkNotNull; @@ -62,7 +61,7 @@ public abstract class AbstractKeyedStateBackend<K> implements private K currentKey; /** Listeners to changes of keyed context ({@link #currentKey}). */ - private final Set<KeySelectionListener<K>> keySelectionListeners; + private final ArrayList<KeySelectionListener<K>> keySelectionListeners; /** The key group of the currently active key. */ private int currentKeyGroup; @@ -142,7 +141,7 @@ public abstract class AbstractKeyedStateBackend<K> implements this.executionConfig = executionConfig; this.keyGroupCompressionDecorator = keyGroupCompressionDecorator; this.ttlTimeProvider = Preconditions.checkNotNull(ttlTimeProvider); - this.keySelectionListeners = new HashSet<>(); + this.keySelectionListeners = new ArrayList<>(1); } private static StreamCompressionDecorator determineStreamCompression(ExecutionConfig executionConfig) { @@ -183,7 +182,10 @@ public abstract class AbstractKeyedStateBackend<K> implements } private void notifyKeySelected(K newKey) { - keySelectionListeners.forEach(listener -> listener.keySelected(newKey)); + // we prefer a for-loop over other iteration schemes for performance reasons here. + for (int i = 0; i < keySelectionListeners.size(); ++i) { + keySelectionListeners.get(i).keySelected(newKey); + } } @Override