This is an automated email from the ASF dual-hosted git repository. zhuzh pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push: new 21e844b6c00 [FLINK-30624][runtime] Introduce a new HeapPriorityQueue for StatusWatermarkValve to avoid affecting the performance of memory state backend. 21e844b6c00 is described below commit 21e844b6c00b1796fdfc00136ca26d90e889b149 Author: Lijie Wang <wangdachui9...@gmail.com> AuthorDate: Mon Jan 30 00:00:45 2023 +0800 [FLINK-30624][runtime] Introduce a new HeapPriorityQueue for StatusWatermarkValve to avoid affecting the performance of memory state backend. This closes #21779. --- .../runtime/watermarkstatus/HeapPriorityQueue.java | 294 +++++++++++++++++++ .../watermarkstatus/StatusWatermarkValve.java | 3 +- .../watermarkstatus/HeapPriorityQueueTest.java | 316 +++++++++++++++++++++ 3 files changed, 611 insertions(+), 2 deletions(-) diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/watermarkstatus/HeapPriorityQueue.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/watermarkstatus/HeapPriorityQueue.java new file mode 100644 index 00000000000..550012baea5 --- /dev/null +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/watermarkstatus/HeapPriorityQueue.java @@ -0,0 +1,294 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.runtime.watermarkstatus; + +import javax.annotation.Nonnegative; +import javax.annotation.Nonnull; +import javax.annotation.Nullable; + +import java.util.Arrays; + +import static org.apache.flink.util.CollectionUtil.MAX_ARRAY_SIZE; + +/** + * This class provides functionality similar to {@link + * org.apache.flink.runtime.state.heap.HeapPriorityQueue}. It is introduced as a replacement for + * {@link org.apache.flink.runtime.state.heap.HeapPriorityQueue} to be used in {@link + * StatusWatermarkValve}, to avoid affecting the performance of the memory state backend. + * + * <p>The memory state backend's performance would be affected if we reused the {@link + * org.apache.flink.runtime.state.heap.HeapPriorityQueue}, because in some scenarios the {@link + * org.apache.flink.runtime.state.heap.HeapPriorityQueueElement} will only have one + * implementation (used by the memory state backend), which allows the JVM to inline its + * methods (getInternalIndex, setInternalIndex). If we reuse it in {@link StatusWatermarkValve}, it + * will gain multiple implementations. Once there are multiple implementations, its + * methods are hard for the JVM to inline, which results in poor performance of the memory + * state backend. + * + * @param <T> type of the contained elements. + */ +public class HeapPriorityQueue<T extends HeapPriorityQueue.HeapPriorityQueueElement> { + + /** The index of the head element in the array that represents the heap; slot 0 is unused. */ + private static final int QUEUE_HEAD_INDEX = 1; + + /** Comparator for the priority of contained elements. */ + @Nonnull private final PriorityComparator<T> elementPriorityComparator; + + /** The array that represents the heap-organized priority queue. */ + @Nonnull private T[] queue; + + /** The current size of the priority queue.
*/ + @Nonnegative private int size; + + /** + * Creates an empty {@link HeapPriorityQueue} with the requested initial capacity. + * + * @param elementPriorityComparator comparator for the priority of contained elements. + * @param minimumCapacity the minimum and initial capacity of this priority queue. + */ + @SuppressWarnings("unchecked") + public HeapPriorityQueue( + @Nonnull PriorityComparator<T> elementPriorityComparator, + @Nonnegative int minimumCapacity) { + this.queue = (T[]) new HeapPriorityQueueElement[getHeadElementIndex() + minimumCapacity]; + this.size = 0; + this.elementPriorityComparator = elementPriorityComparator; + } + + public void adjustModifiedElement(@Nonnull T element) { + final int elementIndex = element.getInternalIndex(); + if (element == queue[elementIndex]) { + adjustElementAtIndex(element, elementIndex); + } + } + + @Nullable + public T poll() { + return size() > 0 ? removeInternal(getHeadElementIndex()) : null; + } + + @Nullable + public T peek() { + // Slots of removed elements are reset to null, so this is null for an empty queue. + return queue[getHeadElementIndex()]; + } + + public boolean add(@Nonnull T toAdd) { + addInternal(toAdd); + return toAdd.getInternalIndex() == getHeadElementIndex(); + } + + public boolean remove(@Nonnull T toRemove) { + final int elementIndex = toRemove.getInternalIndex(); + removeInternal(elementIndex); + return elementIndex == getHeadElementIndex(); + } + + public boolean isEmpty() { + return size() == 0; + } + + public int size() { + return size; + } + + /** Clears the queue.
*/ + public void clear() { + final int arrayOffset = getHeadElementIndex(); + Arrays.fill(queue, arrayOffset, arrayOffset + size, null); + size = 0; + } + + private void resizeQueueArray(int desiredSize, int minRequiredSize) { + if (isValidArraySize(desiredSize)) { + queue = Arrays.copyOf(queue, desiredSize); + } else if (isValidArraySize(minRequiredSize)) { + queue = Arrays.copyOf(queue, MAX_ARRAY_SIZE); + } else { + throw new OutOfMemoryError( + "Required minimum heap size " + + minRequiredSize + + " exceeds maximum size of " + + MAX_ARRAY_SIZE + + "."); + } + } + + private void moveElementToIdx(T element, int idx) { + queue[idx] = element; + element.setInternalIndex(idx); + } + + private static boolean isValidArraySize(int size) { + return size >= 0 && size <= MAX_ARRAY_SIZE; + } + + private int getHeadElementIndex() { + return QUEUE_HEAD_INDEX; + } + + private void addInternal(@Nonnull T element) { + final int newSize = increaseSizeByOne(); + moveElementToIdx(element, newSize); + siftUp(newSize); + } + + private T removeInternal(int removeIdx) { + T[] heap = this.queue; + T removedValue = heap[removeIdx]; + + assert removedValue.getInternalIndex() == removeIdx; + + final int oldSize = size; + + if (removeIdx != oldSize) { + T element = heap[oldSize]; + moveElementToIdx(element, removeIdx); + adjustElementAtIndex(element, removeIdx); + } + + heap[oldSize] = null; + + --size; + return removedValue; + } + + private void adjustElementAtIndex(T element, int index) { + siftDown(index); + if (queue[index] == element) { + siftUp(index); + } + } + + private void siftUp(int idx) { + final T[] heap = this.queue; + final T currentElement = heap[idx]; + int parentIdx = idx >>> 1; + + while (parentIdx > 0 && isElementPriorityLessThen(currentElement, heap[parentIdx])) { + moveElementToIdx(heap[parentIdx], idx); + idx = parentIdx; + parentIdx >>>= 1; + } + + moveElementToIdx(currentElement, idx); + } + + private void siftDown(int idx) { + final T[] heap = this.queue; + final
int heapSize = this.size; + + final T currentElement = heap[idx]; + int firstChildIdx = idx << 1; + int secondChildIdx = firstChildIdx + 1; + + if (isElementIndexValid(secondChildIdx, heapSize) + && isElementPriorityLessThen(heap[secondChildIdx], heap[firstChildIdx])) { + firstChildIdx = secondChildIdx; + } + + while (isElementIndexValid(firstChildIdx, heapSize) + && isElementPriorityLessThen(heap[firstChildIdx], currentElement)) { + moveElementToIdx(heap[firstChildIdx], idx); + idx = firstChildIdx; + firstChildIdx = idx << 1; + secondChildIdx = firstChildIdx + 1; + + if (isElementIndexValid(secondChildIdx, heapSize) + && isElementPriorityLessThen(heap[secondChildIdx], heap[firstChildIdx])) { + firstChildIdx = secondChildIdx; + } + } + + moveElementToIdx(currentElement, idx); + } + + private boolean isElementIndexValid(int elementIndex, int heapSize) { + return elementIndex <= heapSize; + } + + private boolean isElementPriorityLessThen(T a, T b) { + return elementPriorityComparator.comparePriority(a, b) < 0; + } + + private int increaseSizeByOne() { + final int oldArraySize = queue.length; + final int minRequiredNewSize = ++size; + if (minRequiredNewSize >= oldArraySize) { + final int grow = (oldArraySize < 64) ? oldArraySize + 2 : oldArraySize >> 1; + resizeQueueArray(oldArraySize + grow, minRequiredNewSize); + } + // TODO implement shrinking as well? + return minRequiredNewSize; + } + + /** + * This interface works similarly to {@link Comparable} and is used to prioritize between two + * objects. The main difference from {@link Comparable} is that implementations are not + * required to follow the usual contract between {@link Comparable#compareTo(Object)} and + * {@link Object#equals(Object)}. The contract of this interface is: When two objects are equal, + * they indicate the same priority, but indicating the same priority does not require that both + * objects are equal. + * + * @param <T> type of the compared objects.
+ */ + interface PriorityComparator<T> { + + /** + * Compares two objects for priority. Returns a negative integer, zero, or a positive + * integer as the first argument has lower, equal to, or higher priority than the second. + * + * @param left left operand in the comparison by priority. + * @param right right operand in the comparison by priority. + * @return a negative integer, zero, or a positive integer as the first argument has lower, + * equal to, or higher priority than the second. + */ + int comparePriority(T left, T right); + } + + /** + * Interface for objects that can be managed by a {@link HeapPriorityQueue}. Such an object can + * be contained in at most one {@link HeapPriorityQueue} at a time. + */ + interface HeapPriorityQueueElement { + + /** + * The index that indicates that a {@link HeapPriorityQueueElement} object is not contained + * in and managed by any {@link HeapPriorityQueue}. We do not strictly enforce that internal + * indexes must be reset to this value when elements are removed from a {@link + * HeapPriorityQueue}. + */ + int NOT_CONTAINED = Integer.MIN_VALUE; + + /** + * Returns the current index of this object in the internal array of {@link + * HeapPriorityQueue}. + */ + int getInternalIndex(); + + /** + * Sets the current index of this object in the {@link HeapPriorityQueue} and should only be + * called by the owning {@link HeapPriorityQueue}. + * + * @param newIndex the new index in the internal array of the heap.
+ */ + void setInternalIndex(int newIndex); + } +} diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/watermarkstatus/StatusWatermarkValve.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/watermarkstatus/StatusWatermarkValve.java index c98befc311e..96cf0968af2 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/watermarkstatus/StatusWatermarkValve.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/watermarkstatus/StatusWatermarkValve.java @@ -20,10 +20,9 @@ package org.apache.flink.streaming.runtime.watermarkstatus; import org.apache.flink.annotation.Internal; import org.apache.flink.annotation.VisibleForTesting; -import org.apache.flink.runtime.state.heap.HeapPriorityQueue; -import org.apache.flink.runtime.state.heap.HeapPriorityQueueElement; import org.apache.flink.streaming.api.watermark.Watermark; import org.apache.flink.streaming.runtime.io.PushingAsyncDataInput.DataOutput; +import org.apache.flink.streaming.runtime.watermarkstatus.HeapPriorityQueue.HeapPriorityQueueElement; import org.apache.flink.util.Preconditions; import static org.apache.flink.util.Preconditions.checkArgument; diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/watermarkstatus/HeapPriorityQueueTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/watermarkstatus/HeapPriorityQueueTest.java new file mode 100644 index 00000000000..5bee93c66c7 --- /dev/null +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/watermarkstatus/HeapPriorityQueueTest.java @@ -0,0 +1,316 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership.
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.runtime.watermarkstatus; + +import org.junit.jupiter.api.Test; + +import java.util.Arrays; +import java.util.Comparator; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Objects; +import java.util.Set; +import java.util.concurrent.ThreadLocalRandom; + +import static org.assertj.core.api.Assertions.assertThat; + +/** Test for {@link HeapPriorityQueue}.
*/ +class HeapPriorityQueueTest { + private static final HeapPriorityQueue.PriorityComparator<TestElement> + TEST_ELEMENT_PRIORITY_COMPARATOR = + (left, right) -> Long.compare(left.getPriority(), right.getPriority()); + + @Test + void testPeekPollOrder() { + final int initialCapacity = 4; + final int testSize = 1000; + final Comparator<Long> comparator = getTestElementPriorityComparator(); + HeapPriorityQueue<TestElement> priorityQueue = newPriorityQueue(initialCapacity); + HashSet<TestElement> checkSet = new HashSet<>(testSize); + + insertRandomElements(priorityQueue, checkSet, testSize); + + long lastPriorityValue = getHighestPriorityValueForComparator(); + int lastSize = priorityQueue.size(); + assertThat(testSize).isEqualTo(lastSize); + TestElement testElement; + while ((testElement = priorityQueue.peek()) != null) { + assertThat(priorityQueue.isEmpty()).isFalse(); + assertThat(lastSize).isEqualTo(priorityQueue.size()); + assertThat(testElement).isEqualTo(priorityQueue.poll()); + assertThat(checkSet.remove(testElement)).isTrue(); + assertThat(comparator.compare(testElement.getPriority(), lastPriorityValue) >= 0) + .isTrue(); + lastPriorityValue = testElement.getPriority(); + --lastSize; + } + + assertThat(priorityQueue.isEmpty()).isTrue(); + assertThat(priorityQueue.size()).isZero(); + assertThat(checkSet).isEmpty(); + } + + @Test + void testRemoveInsertMixKeepsOrder() { + + HeapPriorityQueue<TestElement> priorityQueue = newPriorityQueue(3); + final Comparator<Long> comparator = getTestElementPriorityComparator(); + final ThreadLocalRandom random = ThreadLocalRandom.current(); + final int testSize = 300; + final int addCounterMax = testSize / 4; + int iterationsTillNextAdds = random.nextInt(addCounterMax); + HashSet<TestElement> checkSet = new HashSet<>(testSize); + + insertRandomElements(priorityQueue, checkSet, testSize); + + // repeatedly remove one element of checkSet, then drain the queue and verify poll order + while (!checkSet.isEmpty()) { + + final long highestPrioValue =
getHighestPriorityValueForComparator(); + + Iterator<TestElement> iterator = checkSet.iterator(); + TestElement element = iterator.next(); + iterator.remove(); + + final boolean removesHead = element.equals(priorityQueue.peek()); + + if (removesHead) { + assertThat(priorityQueue.remove(element)).isTrue(); + } else { + priorityQueue.remove(element); + } + + long currentPriorityWatermark; + + // start the order check from the removed head's priority, or from highestPrioValue otherwise + if (removesHead) { + currentPriorityWatermark = element.getPriority(); + } else { + currentPriorityWatermark = highestPrioValue; + } + + while ((element = priorityQueue.poll()) != null) { + assertThat(comparator.compare(element.getPriority(), currentPriorityWatermark) >= 0) + .isTrue(); + currentPriorityWatermark = element.getPriority(); + if (--iterationsTillNextAdds == 0) { + // occasionally add a few random elements that are not tracked in checkSet + iterationsTillNextAdds = random.nextInt(addCounterMax); + insertRandomElements( + priorityQueue, new HashSet<>(checkSet), 1 + random.nextInt(3)); + currentPriorityWatermark = priorityQueue.peek().getPriority(); + } + } + + assertThat(priorityQueue.isEmpty()).isTrue(); + + checkSet.forEach(priorityQueue::add); + } + } + + @Test + void testPoll() { + HeapPriorityQueue<TestElement> priorityQueue = newPriorityQueue(3); + final Comparator<Long> comparator = getTestElementPriorityComparator(); + + assertThat(priorityQueue.poll()).isNull(); + + final int testSize = 345; + HashSet<TestElement> checkSet = new HashSet<>(testSize); + insertRandomElements(priorityQueue, checkSet, testSize); + + long lastPriorityValue = getHighestPriorityValueForComparator(); + while (!priorityQueue.isEmpty()) { + TestElement removed = priorityQueue.poll(); + assertThat(removed).isNotNull(); + assertThat(checkSet.remove(removed)).isTrue(); + assertThat(comparator.compare(removed.getPriority(), lastPriorityValue) >= 0).isTrue(); + lastPriorityValue = removed.getPriority(); + } + assertThat(checkSet).isEmpty(); + + 
assertThat(priorityQueue.poll()).isNull(); + } +
@Test + void testIsEmpty() { + HeapPriorityQueue<TestElement> priorityQueue = newPriorityQueue(1); + + assertThat(priorityQueue.isEmpty()).isTrue(); + + assertThat(priorityQueue.add(new TestElement(4711L, 42L))).isTrue(); + assertThat(priorityQueue.isEmpty()).isFalse(); + + priorityQueue.poll(); + assertThat(priorityQueue.isEmpty()).isTrue(); + } + + @Test + void testAdd() { + HeapPriorityQueue<TestElement> priorityQueue = newPriorityQueue(1); + + final List<TestElement> testElements = + Arrays.asList(new TestElement(4711L, 42L), new TestElement(815L, 23L)); + + testElements.sort( + (l, r) -> getTestElementPriorityComparator().compare(r.priority, l.priority)); + + assertThat(priorityQueue.add(testElements.get(0))).isTrue(); + assertThat(priorityQueue.size()).isEqualTo(1); + assertThat(priorityQueue.add(testElements.get(1))).isTrue(); + assertThat(priorityQueue.size()).isEqualTo(2); + assertThat(priorityQueue.poll()).isEqualTo(testElements.get(1)); + assertThat(priorityQueue.size()).isEqualTo(1); + assertThat(priorityQueue.poll()).isEqualTo(testElements.get(0)); + assertThat(priorityQueue.size()).isZero(); + } + + @Test + void testRemove() { + HeapPriorityQueue<TestElement> priorityQueue = newPriorityQueue(1); + + final long key = 4711L; + final long priorityValue = 42L; + final TestElement testElement = new TestElement(key, priorityValue); + + assertThat(priorityQueue.add(testElement)).isTrue(); + assertThat(priorityQueue.remove(testElement)).isTrue(); + assertThat(priorityQueue.isEmpty()).isTrue(); + } + + @Test + void testClear() { + HeapPriorityQueue<TestElement> priorityQueueSet = newPriorityQueue(1); + + int count = 10; + HashSet<TestElement> checkSet = new HashSet<>(count); + insertRandomElements(priorityQueueSet, checkSet, count); + assertThat(priorityQueueSet.size()).isEqualTo(count); + priorityQueueSet.clear(); + assertThat(priorityQueueSet.size()).isZero(); + } + + private HeapPriorityQueue<TestElement> newPriorityQueue(int initialCapacity) { + return new
HeapPriorityQueue<>(TEST_ELEMENT_PRIORITY_COMPARATOR, initialCapacity); + } + + private Comparator<Long> getTestElementPriorityComparator() { + return Long::compareTo; + } + + private long getHighestPriorityValueForComparator() { + return getTestElementPriorityComparator().compare(-1L, 1L) > 0 + ? Long.MAX_VALUE + : Long.MIN_VALUE; + } + + private static void insertRandomElements( + HeapPriorityQueue<TestElement> priorityQueue, Set<TestElement> checkSet, int count) { + + ThreadLocalRandom localRandom = ThreadLocalRandom.current(); + + final int numUniqueKeys = Math.max(count / 4, 64); + + long duplicatePriority = Long.MIN_VALUE; + + final boolean checkEndSizes = priorityQueue.isEmpty(); + + for (int i = 0; i < count; ++i) { + TestElement element; + do { + long elementPriority; + if (duplicatePriority == Long.MIN_VALUE) { + elementPriority = localRandom.nextLong(); + } else { + elementPriority = duplicatePriority; + duplicatePriority = Long.MIN_VALUE; + } + element = new TestElement(localRandom.nextInt(numUniqueKeys), elementPriority); + } while (!checkSet.add(element)); + + if (localRandom.nextInt(10) == 0) { + duplicatePriority = element.getPriority(); + } + + final boolean headChangedIndicated = priorityQueue.add(element); + if (element.equals(priorityQueue.peek())) { + assertThat(headChangedIndicated).isTrue(); + } + } + + if (checkEndSizes) { + assertThat(count).isEqualTo(priorityQueue.size()); + } + } + + /** Test payload: equality uses key and priority, while queue order uses priority only.
*/ + private static class TestElement implements HeapPriorityQueue.HeapPriorityQueueElement { + + private final long key; + private final long priority; + private int internalIndex; + + public TestElement(long key, long priority) { + this.key = key; + this.priority = priority; + this.internalIndex = NOT_CONTAINED; + } + + public Long getKey() { + return key; + } + + public long getPriority() { + return priority; + } + + @Override + public int getInternalIndex() { + return internalIndex; + } + + @Override + public void setInternalIndex(int newIndex) { + internalIndex = newIndex; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + TestElement that = (TestElement) o; + return key == that.key && priority == that.priority; + } + + @Override + public int hashCode() { + return Objects.hash(getKey(), getPriority()); + } + + @Override + public String toString() { + return "TestElement{" + "key=" + key + ", priority=" + priority + '}'; + } + } +}