This is an automated email from the ASF dual-hosted git repository. dwysakowicz pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit f5fbb64dbfc0d872d5574a10cb7ae035f5d5405a Author: Dawid Wysakowicz <dwysakow...@apache.org> AuthorDate: Fri Feb 5 16:43:41 2021 +0100 [refactor] Extract common interface for a single Rocks state This commit introduces an interface for iterating over a single state in RocksDB state backend. This is a prerequisite for storing heap timers along with other states from RocksDB. --- .../state/iterator/RocksSingleStateIterator.java | 29 ++++++++++------ .../RocksStatesPerKeyGroupMergeIterator.java | 39 ++++++++++------------ .../state/iterator/SingleStateIterator.java | 37 ++++++++++++++++++++ 3 files changed, 73 insertions(+), 32 deletions(-) diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/iterator/RocksSingleStateIterator.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/iterator/RocksSingleStateIterator.java index 3c0aa82..4608acb 100644 --- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/iterator/RocksSingleStateIterator.java +++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/iterator/RocksSingleStateIterator.java @@ -23,13 +23,11 @@ import org.apache.flink.util.IOUtils; import javax.annotation.Nonnull; -import java.io.Closeable; - /** * Wraps a RocksDB iterator to cache it's current key and assigns an id for the key/value state to * the iterator. Used by {@link RocksStatesPerKeyGroupMergeIterator}. */ -class RocksSingleStateIterator implements Closeable { +class RocksSingleStateIterator implements SingleStateIterator { /** * @param iterator underlying {@link RocksIteratorWrapper} @@ -45,19 +43,30 @@ class RocksSingleStateIterator implements Closeable { private byte[] currentKey; private final int kvStateId; - public byte[] getCurrentKey() { - return currentKey; + @Override + public void next() { + iterator.next(); + if (iterator.isValid()) { + currentKey = iterator.key(); + } + } + + @Override + public boolean isValid() { + return iterator.isValid(); } - public void setCurrentKey(byte[] currentKey) { - this.currentKey = currentKey; + @Override + public byte[] key() { + return currentKey; } - @Nonnull - public RocksIteratorWrapper getIterator() { - return iterator; + @Override + public byte[] value() { + return iterator.value(); } + @Override public int getKvStateId() { return kvStateId; } diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/iterator/RocksStatesPerKeyGroupMergeIterator.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/iterator/RocksStatesPerKeyGroupMergeIterator.java index 2f970c9..613d181 100644 --- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/iterator/RocksStatesPerKeyGroupMergeIterator.java +++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/iterator/RocksStatesPerKeyGroupMergeIterator.java @@ -40,14 +40,14 @@ import java.util.PriorityQueue; public class RocksStatesPerKeyGroupMergeIterator implements KeyValueStateIterator { private final CloseableRegistry closeableRegistry; - private final PriorityQueue<RocksSingleStateIterator> heap; + private final PriorityQueue<SingleStateIterator> heap; private final int keyGroupPrefixByteCount; private boolean newKeyGroup; private boolean newKVState; private boolean valid; - private RocksSingleStateIterator currentSubIterator; + private SingleStateIterator currentSubIterator; - private static final List<Comparator<RocksSingleStateIterator>> COMPARATORS; + private static final List<Comparator<SingleStateIterator>> COMPARATORS; static { int maxBytes = 2; @@ -57,8 +57,7 @@ public class RocksStatesPerKeyGroupMergeIterator implements KeyValueStateIterato COMPARATORS.add( (o1, o2) -> { int arrayCmpRes = - compareKeyGroupsForByteArrays( - o1.getCurrentKey(), o2.getCurrentKey(), currentBytes); + compareKeyGroupsForByteArrays(o1.key(), o2.key(), currentBytes); return arrayCmpRes == 0 ? o1.getKvStateId() - o2.getKvStateId() : arrayCmpRes; @@ -103,18 +102,14 @@ public class RocksStatesPerKeyGroupMergeIterator implements KeyValueStateIterato newKeyGroup = false; newKVState = false; - final RocksIteratorWrapper rocksIterator = currentSubIterator.getIterator(); - rocksIterator.next(); - - byte[] oldKey = currentSubIterator.getCurrentKey(); - if (rocksIterator.isValid()) { - - currentSubIterator.setCurrentKey(rocksIterator.key()); - - if (isDifferentKeyGroup(oldKey, currentSubIterator.getCurrentKey())) { + byte[] oldKey = currentSubIterator.key(); + currentSubIterator.next(); + if (currentSubIterator.isValid()) { + if (isDifferentKeyGroup(oldKey, currentSubIterator.key())) { + SingleStateIterator oldIterator = currentSubIterator; heap.offer(currentSubIterator); currentSubIterator = heap.remove(); - newKVState = currentSubIterator.getIterator() != rocksIterator; + newKVState = currentSubIterator != oldIterator; detectNewKeyGroup(oldKey); } } else { @@ -133,13 +128,13 @@ public class RocksStatesPerKeyGroupMergeIterator implements KeyValueStateIterato } } - private PriorityQueue<RocksSingleStateIterator> buildIteratorHeap( + private PriorityQueue<SingleStateIterator> buildIteratorHeap( List<Tuple2<RocksIteratorWrapper, Integer>> kvStateIterators) throws IOException { - Comparator<RocksSingleStateIterator> iteratorComparator = + Comparator<SingleStateIterator> iteratorComparator = COMPARATORS.get(keyGroupPrefixByteCount - 1); - PriorityQueue<RocksSingleStateIterator> iteratorPriorityQueue = + PriorityQueue<SingleStateIterator> iteratorPriorityQueue = new PriorityQueue<>(kvStateIterators.size(), iteratorComparator); for (Tuple2<RocksIteratorWrapper, Integer> rocksIteratorWithKVStateId : kvStateIterators) { @@ -165,14 +160,14 @@ public class RocksStatesPerKeyGroupMergeIterator implements KeyValueStateIterato } private void detectNewKeyGroup(byte[] oldKey) { - if (isDifferentKeyGroup(oldKey, currentSubIterator.getCurrentKey())) { + if (isDifferentKeyGroup(oldKey, currentSubIterator.key())) { newKeyGroup = true; } } @Override public int keyGroup() { - final byte[] currentKey = currentSubIterator.getCurrentKey(); + final byte[] currentKey = currentSubIterator.key(); int result = 0; // big endian decode for (int i = 0; i < keyGroupPrefixByteCount; ++i) { @@ -184,12 +179,12 @@ public class RocksStatesPerKeyGroupMergeIterator implements KeyValueStateIterato @Override public byte[] key() { - return currentSubIterator.getCurrentKey(); + return currentSubIterator.key(); } @Override public byte[] value() { - return currentSubIterator.getIterator().value(); + return currentSubIterator.value(); } @Override diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/iterator/SingleStateIterator.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/iterator/SingleStateIterator.java new file mode 100644 index 0000000..0238279 --- /dev/null +++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/iterator/SingleStateIterator.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.contrib.streaming.state.iterator; + +import java.io.Closeable; + +/** An interface for iterating over a single state in a RocksDB state backend. */ +public interface SingleStateIterator extends Closeable { + void next(); + + boolean isValid(); + + byte[] key(); + + byte[] value(); + + int getKvStateId(); + + @Override + void close(); +}