Github user sihuazhou commented on a diff in the pull request: https://github.com/apache/flink/pull/6062#discussion_r190279440 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimerHeap.java --- @@ -0,0 +1,504 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api.operators; + +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.runtime.state.KeyGroupRange; +import org.apache.flink.runtime.state.KeyGroupRangeAssignment; + +import javax.annotation.Nonnegative; +import javax.annotation.Nonnull; +import javax.annotation.Nullable; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.Comparator; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.NoSuchElementException; +import java.util.Queue; +import java.util.Set; + +import static org.apache.flink.util.Preconditions.checkArgument; + +/** + * A heap-based priority queue for internal timers. This heap is supported by hash sets for fast contains + * (de-duplication) and deletes. The heap implementation is a simple binary tree stored inside an array. Element indexes + * in the heap array start at 1 instead of 0 to make array index computations a bit simpler in the hot methods. + * + * <p>Possible future improvements: + * <ul> + * <li>We could also implement shrinking for the heap and the deduplication maps.</li> + * <li>We could replace the deduplication maps with more efficient custom implementations. In particular, a hash set + * would be enough if it could return existing elements on unsuccessful adding, etc..</li> + * </ul> + * + * @param <K> type of the key of the internal timers managed by this priority queue. + * @param <N> type of the namespace of the internal timers managed by this priority queue. + */ +public class InternalTimerHeap<K, N> implements Queue<TimerHeapInternalTimer<K, N>>, Set<TimerHeapInternalTimer<K, N>> { + + /** + * A safe maximum size for arrays in the JVM. + */ + private static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8; + + /** + * Comparator for {@link TimerHeapInternalTimer}, based on the timestamp in ascending order. + */ + private static final Comparator<TimerHeapInternalTimer<?, ?>> COMPARATOR = + (o1, o2) -> Long.compare(o1.getTimestamp(), o2.getTimestamp()); + + /** + * This array contains one hash set per key-group. The sets are used for fast de-duplication and deletes of timers. + */ + private final HashMap<TimerHeapInternalTimer<K, N>, TimerHeapInternalTimer<K, N>>[] deduplicationMapsByKeyGroup; + + /** + * The array that represents the heap-organized priority queue. + */ + private TimerHeapInternalTimer<K, N>[] queue; + + /** + * The current size of the priority queue. + */ + private int size; + + /** + * The key-group range of timers that are managed by this queue. + */ + private final KeyGroupRange keyGroupRange; + + /** + * The total number of key-groups of the job. + */ + private final int totalNumberOfKeyGroups; + + + /** + * Creates an empty {@link InternalTimerHeap} with the requested initial capacity. + * + * @param minimumCapacity the minimum and initial capacity of this priority queue. + */ + @SuppressWarnings("unchecked") + InternalTimerHeap( + @Nonnegative int minimumCapacity, + @Nonnull KeyGroupRange keyGroupRange, + @Nonnegative int totalNumberOfKeyGroups) { + + this.totalNumberOfKeyGroups = totalNumberOfKeyGroups; + this.keyGroupRange = keyGroupRange; + + final int keyGroupsInLocalRange = keyGroupRange.getNumberOfKeyGroups(); + final int deduplicationSetSize = 1 + minimumCapacity / keyGroupsInLocalRange; + this.deduplicationMapsByKeyGroup = new HashMap[keyGroupsInLocalRange]; + for (int i = 0; i < keyGroupsInLocalRange; ++i) { + deduplicationMapsByKeyGroup[i] = new HashMap<>(deduplicationSetSize); + } + + this.queue = new TimerHeapInternalTimer[1 + minimumCapacity]; + } + + /** + * @see Set#add(Object) + */ + @Override + public boolean add(@Nonnull TimerHeapInternalTimer<K, N> timer) { + + if (getDedupMapForKeyGroup(timer).putIfAbsent(timer, timer) == null) { + final int newSize = ++this.size; + checkCapacity(newSize); + moveElementToIdx(timer, newSize); + siftUp(newSize); + return true; + } else { + return false; + } + } + + /** + * This behaves like {@link #add(TimerHeapInternalTimer)}. + */ + @Override + public boolean offer(@Nonnull TimerHeapInternalTimer<K, N> k) { + return add(k); + } + + @Nullable + @Override + public TimerHeapInternalTimer<K, N> poll() { + return size() > 0 ? removeElementAtIndex(1) : null; + } + + @Nonnull + @Override + public TimerHeapInternalTimer<K, N> remove() { + TimerHeapInternalTimer<K, N> pollResult = poll(); + if (pollResult != null) { + return pollResult; + } else { + throw new NoSuchElementException("InternalTimerPriorityQueue is empty."); + } + } + + @Nullable + @Override + public TimerHeapInternalTimer<K, N> peek() { + return size() > 0 ? queue[1] : null; + } + + @Nonnull + @Override + public TimerHeapInternalTimer<K, N> element() { + TimerHeapInternalTimer<K, N> peekResult = peek(); + if (peekResult != null) { + return peekResult; + } else { + throw new NoSuchElementException("InternalTimerPriorityQueue is empty."); + } + } + + @Override + public boolean isEmpty() { + return size() == 0; + } + + @Override + public boolean contains(@Nullable Object o) { + return (o instanceof TimerHeapInternalTimer) + && getDedupMapForKeyGroup((TimerHeapInternalTimer<?, ?>) o).containsKey(o); + } + + @Override + public boolean remove(@Nullable Object o) { + if (o instanceof TimerHeapInternalTimer) { + return removeInternal((TimerHeapInternalTimer<?, ?>) o); + } + return false; + } + + @Override + public boolean addAll(@Nullable Collection<? extends TimerHeapInternalTimer<K, N>> timers) { + + if (timers == null) { + return true; + } + + if (timers.size() > queue.length) { + checkCapacity(timers.size()); + } + + for (TimerHeapInternalTimer<K, N> k : timers) { + add(k); + } + + return true; + } + + @Nonnull + @Override + public Object[] toArray() { + return Arrays.copyOfRange(queue, 1, size + 1); + } + + @SuppressWarnings({"unchecked", "SuspiciousSystemArraycopy"}) + @Nonnull + @Override + public <T> T[] toArray(@Nonnull T[] array) { + if (array.length >= size) { + System.arraycopy(queue, 1, array, 0, size); + return array; + } + return (T[]) Arrays.copyOfRange(queue, 1, size + 1, array.getClass()); + } + + @Override + public boolean removeAll(@Nullable Collection<?> toRemove) { + + if (toRemove == null) { + return false; + } + + int oldSize = size(); + for (Object o : toRemove) { + remove(o); + } + return size() == oldSize; --- End diff -- Should this be `size() != oldSize`?
---