This is an automated email from the ASF dual-hosted git repository.

dwysakowicz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit f5fbb64dbfc0d872d5574a10cb7ae035f5d5405a
Author: Dawid Wysakowicz <dwysakow...@apache.org>
AuthorDate: Fri Feb 5 16:43:41 2021 +0100

    [refactor] Extract common interface for a single Rocks state
    
    This commit introduces an interface for iterating over a single state in
    RocksDB state backend. This is a prerequisite for storing heap timers
    along with other states from RocksDB.
---
 .../state/iterator/RocksSingleStateIterator.java   | 29 ++++++++++------
 .../RocksStatesPerKeyGroupMergeIterator.java       | 39 ++++++++++------------
 .../state/iterator/SingleStateIterator.java        | 37 ++++++++++++++++++++
 3 files changed, 73 insertions(+), 32 deletions(-)

diff --git 
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/iterator/RocksSingleStateIterator.java
 
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/iterator/RocksSingleStateIterator.java
index 3c0aa82..4608acb 100644
--- 
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/iterator/RocksSingleStateIterator.java
+++ 
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/iterator/RocksSingleStateIterator.java
@@ -23,13 +23,11 @@ import org.apache.flink.util.IOUtils;
 
 import javax.annotation.Nonnull;
 
-import java.io.Closeable;
-
 /**
  * Wraps a RocksDB iterator to cache it's current key and assigns an id for 
the key/value state to
  * the iterator. Used by {@link RocksStatesPerKeyGroupMergeIterator}.
  */
-class RocksSingleStateIterator implements Closeable {
+class RocksSingleStateIterator implements SingleStateIterator {
 
     /**
      * @param iterator underlying {@link RocksIteratorWrapper}
@@ -45,19 +43,30 @@ class RocksSingleStateIterator implements Closeable {
     private byte[] currentKey;
     private final int kvStateId;
 
-    public byte[] getCurrentKey() {
-        return currentKey;
+    @Override
+    public void next() {
+        iterator.next();
+        if (iterator.isValid()) {
+            currentKey = iterator.key();
+        }
+    }
+
+    @Override
+    public boolean isValid() {
+        return iterator.isValid();
     }
 
-    public void setCurrentKey(byte[] currentKey) {
-        this.currentKey = currentKey;
+    @Override
+    public byte[] key() {
+        return currentKey;
     }
 
-    @Nonnull
-    public RocksIteratorWrapper getIterator() {
-        return iterator;
+    @Override
+    public byte[] value() {
+        return iterator.value();
     }
 
+    @Override
     public int getKvStateId() {
         return kvStateId;
     }
diff --git 
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/iterator/RocksStatesPerKeyGroupMergeIterator.java
 
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/iterator/RocksStatesPerKeyGroupMergeIterator.java
index 2f970c9..613d181 100644
--- 
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/iterator/RocksStatesPerKeyGroupMergeIterator.java
+++ 
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/iterator/RocksStatesPerKeyGroupMergeIterator.java
@@ -40,14 +40,14 @@ import java.util.PriorityQueue;
 public class RocksStatesPerKeyGroupMergeIterator implements 
KeyValueStateIterator {
 
     private final CloseableRegistry closeableRegistry;
-    private final PriorityQueue<RocksSingleStateIterator> heap;
+    private final PriorityQueue<SingleStateIterator> heap;
     private final int keyGroupPrefixByteCount;
     private boolean newKeyGroup;
     private boolean newKVState;
     private boolean valid;
-    private RocksSingleStateIterator currentSubIterator;
+    private SingleStateIterator currentSubIterator;
 
-    private static final List<Comparator<RocksSingleStateIterator>> 
COMPARATORS;
+    private static final List<Comparator<SingleStateIterator>> COMPARATORS;
 
     static {
         int maxBytes = 2;
@@ -57,8 +57,7 @@ public class RocksStatesPerKeyGroupMergeIterator implements 
KeyValueStateIterato
             COMPARATORS.add(
                     (o1, o2) -> {
                         int arrayCmpRes =
-                                compareKeyGroupsForByteArrays(
-                                        o1.getCurrentKey(), 
o2.getCurrentKey(), currentBytes);
+                                compareKeyGroupsForByteArrays(o1.key(), 
o2.key(), currentBytes);
                         return arrayCmpRes == 0
                                 ? o1.getKvStateId() - o2.getKvStateId()
                                 : arrayCmpRes;
@@ -103,18 +102,14 @@ public class RocksStatesPerKeyGroupMergeIterator 
implements KeyValueStateIterato
         newKeyGroup = false;
         newKVState = false;
 
-        final RocksIteratorWrapper rocksIterator = 
currentSubIterator.getIterator();
-        rocksIterator.next();
-
-        byte[] oldKey = currentSubIterator.getCurrentKey();
-        if (rocksIterator.isValid()) {
-
-            currentSubIterator.setCurrentKey(rocksIterator.key());
-
-            if (isDifferentKeyGroup(oldKey, 
currentSubIterator.getCurrentKey())) {
+        byte[] oldKey = currentSubIterator.key();
+        currentSubIterator.next();
+        if (currentSubIterator.isValid()) {
+            if (isDifferentKeyGroup(oldKey, currentSubIterator.key())) {
+                SingleStateIterator oldIterator = currentSubIterator;
                 heap.offer(currentSubIterator);
                 currentSubIterator = heap.remove();
-                newKVState = currentSubIterator.getIterator() != rocksIterator;
+                newKVState = currentSubIterator != oldIterator;
                 detectNewKeyGroup(oldKey);
             }
         } else {
@@ -133,13 +128,13 @@ public class RocksStatesPerKeyGroupMergeIterator 
implements KeyValueStateIterato
         }
     }
 
-    private PriorityQueue<RocksSingleStateIterator> buildIteratorHeap(
+    private PriorityQueue<SingleStateIterator> buildIteratorHeap(
             List<Tuple2<RocksIteratorWrapper, Integer>> kvStateIterators) 
throws IOException {
 
-        Comparator<RocksSingleStateIterator> iteratorComparator =
+        Comparator<SingleStateIterator> iteratorComparator =
                 COMPARATORS.get(keyGroupPrefixByteCount - 1);
 
-        PriorityQueue<RocksSingleStateIterator> iteratorPriorityQueue =
+        PriorityQueue<SingleStateIterator> iteratorPriorityQueue =
                 new PriorityQueue<>(kvStateIterators.size(), 
iteratorComparator);
 
         for (Tuple2<RocksIteratorWrapper, Integer> rocksIteratorWithKVStateId 
: kvStateIterators) {
@@ -165,14 +160,14 @@ public class RocksStatesPerKeyGroupMergeIterator 
implements KeyValueStateIterato
     }
 
     private void detectNewKeyGroup(byte[] oldKey) {
-        if (isDifferentKeyGroup(oldKey, currentSubIterator.getCurrentKey())) {
+        if (isDifferentKeyGroup(oldKey, currentSubIterator.key())) {
             newKeyGroup = true;
         }
     }
 
     @Override
     public int keyGroup() {
-        final byte[] currentKey = currentSubIterator.getCurrentKey();
+        final byte[] currentKey = currentSubIterator.key();
         int result = 0;
         // big endian decode
         for (int i = 0; i < keyGroupPrefixByteCount; ++i) {
@@ -184,12 +179,12 @@ public class RocksStatesPerKeyGroupMergeIterator 
implements KeyValueStateIterato
 
     @Override
     public byte[] key() {
-        return currentSubIterator.getCurrentKey();
+        return currentSubIterator.key();
     }
 
     @Override
     public byte[] value() {
-        return currentSubIterator.getIterator().value();
+        return currentSubIterator.value();
     }
 
     @Override
diff --git 
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/iterator/SingleStateIterator.java
 
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/iterator/SingleStateIterator.java
new file mode 100644
index 0000000..0238279
--- /dev/null
+++ 
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/iterator/SingleStateIterator.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.streaming.state.iterator;
+
+import java.io.Closeable;
+
+/** An interface for iterating over a single state in a RocksDB state backend. 
*/
+public interface SingleStateIterator extends Closeable {
+    void next();
+
+    boolean isValid();
+
+    byte[] key();
+
+    byte[] value();
+
+    int getKvStateId();
+
+    @Override
+    void close();
+}

Reply via email to