masteryhx commented on code in PR #24812:
URL: https://github.com/apache/flink/pull/24812#discussion_r1616045622


##########
flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStIterateOperation.java:
##########
@@ -0,0 +1,226 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.state.forst;
+
+import org.apache.flink.runtime.asyncprocessing.StateRequestType;
+
+import org.rocksdb.RocksDB;
+import org.rocksdb.RocksIterator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * The iterate operation implementation for ForStDB, which leverages rocksdb's 
iterator directly.
+ */
+public class ForStIterateOperation implements ForStDBOperation {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(ForStIterateOperation.class);
+
+    public static final int CACHE_SIZE_LIMIT = 128;
+
+    private final RocksDB db;
+
+    private final List<ForStDBIterRequest<?>> batchRequest;
+
+    private final Executor executor;
+
+    ForStIterateOperation(RocksDB db, List<ForStDBIterRequest<?>> 
batchRequest, Executor executor) {
+        this.db = db;
+        this.batchRequest = batchRequest;
+        this.executor = executor;
+    }
+
+    @Override
+    public CompletableFuture<Void> process() {
+        CompletableFuture<Void> future = new CompletableFuture<>();
+
+        AtomicInteger counter = new AtomicInteger(batchRequest.size());
+        for (int i = 0; i < batchRequest.size(); i++) {
+            ForStDBIterRequest<?> request = batchRequest.get(i);
+            executor.execute(
+                    () -> {
+                        // todo: config read options
+                        try (RocksIterator iter = 
db.newIterator(request.getColumnFamilyHandle())) {

Review Comment:
   This logic seems a bit complex.
   Could you add some comments describing the key steps, or split them into separate methods?



##########
flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStDBPutRequest.java:
##########
@@ -34,23 +35,33 @@
  */
 public class ForStDBPutRequest<K, V> {
 
-    private final K key;
-    @Nullable private final V value;
-    private final ForStInnerTable<K, V> table;
-    private final InternalStateFuture<Void> future;
+    protected final K key;
 
-    private ForStDBPutRequest(
+    @Nullable protected final V value;
+
+    protected final ForStInnerTable<K, V> table;
+
+    protected final InternalStateFuture<Void> future;
+
+    protected final boolean tableIsMap;
+
+    protected ForStDBPutRequest(
             K key, V value, ForStInnerTable<K, V> table, 
InternalStateFuture<Void> future) {
         this.key = key;
         this.value = value;
         this.table = table;
         this.future = future;
+        this.tableIsMap = table instanceof ForStMapState;
     }
 
     public boolean valueIsNull() {
         return value == null;
     }
 
+    public boolean valueIsMap() {

Review Comment:
   Same as `GetRequest`.
   Maybe we could give this a clearer structure.



##########
flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStIterateOperation.java:
##########
@@ -0,0 +1,226 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.state.forst;
+
+import org.apache.flink.runtime.asyncprocessing.StateRequestType;
+
+import org.rocksdb.RocksDB;
+import org.rocksdb.RocksIterator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * The iterate operation implementation for ForStDB, which leverages rocksdb's 
iterator directly.
+ */
+public class ForStIterateOperation implements ForStDBOperation {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(ForStIterateOperation.class);
+
+    public static final int CACHE_SIZE_LIMIT = 128;
+
+    private final RocksDB db;
+
+    private final List<ForStDBIterRequest<?>> batchRequest;
+
+    private final Executor executor;
+
+    ForStIterateOperation(RocksDB db, List<ForStDBIterRequest<?>> 
batchRequest, Executor executor) {
+        this.db = db;
+        this.batchRequest = batchRequest;
+        this.executor = executor;
+    }
+
+    @Override
+    public CompletableFuture<Void> process() {
+        CompletableFuture<Void> future = new CompletableFuture<>();
+
+        AtomicInteger counter = new AtomicInteger(batchRequest.size());
+        for (int i = 0; i < batchRequest.size(); i++) {
+            ForStDBIterRequest<?> request = batchRequest.get(i);
+            executor.execute(
+                    () -> {
+                        // todo: config read options
+                        try (RocksIterator iter = 
db.newIterator(request.getColumnFamilyHandle())) {
+                            byte[] prefix = request.getKeyPrefixBytes();
+                            int userKeyOffset = prefix.length;
+                            if (request.getToSeekBytes() != null) {
+                                iter.seek(request.getToSeekBytes());
+                            } else {
+                                iter.seek(prefix);
+                            }
+                            List<RawEntry> entries = new 
ArrayList<>(CACHE_SIZE_LIMIT);
+                            boolean encounterEnd = false;
+                            byte[] nextSeek = prefix;
+                            while (iter.isValid() && entries.size() < 
CACHE_SIZE_LIMIT) {
+                                byte[] key = iter.key();
+                                if (startWithKeyPrefix(
+                                        prefix, key, 
request.getKeyGroupPrefixBytes())) {
+                                    entries.add(new RawEntry(key, 
iter.value()));
+                                    nextSeek = key;
+                                } else {
+                                    encounterEnd = true;
+                                    break;
+                                }
+                                iter.next();
+                            }
+                            if (iter.isValid()) {
+                                nextSeek = iter.key();
+                            }
+                            if (entries.size() < CACHE_SIZE_LIMIT) {
+                                encounterEnd = true;
+                            }
+                            Collection<Object> deserializedEntries =
+                                    new ArrayList<>(entries.size());
+                            switch (request.getResultType()) {
+                                case KEY:
+                                    {
+                                        for (RawEntry en : entries) {
+                                            deserializedEntries.add(
+                                                    request.deserializeUserKey(
+                                                            en.rawKeyBytes, 
userKeyOffset));
+                                        }
+                                        break;
+                                    }
+                                case VALUE:
+                                    {
+                                        for (RawEntry en : entries) {
+                                            deserializedEntries.add(
+                                                    
request.deserializeUserValue(en.rawValueBytes));
+                                        }
+                                        break;
+                                    }
+                                case ENTRY:
+                                    {
+                                        for (RawEntry en : entries) {
+                                            deserializedEntries.add(
+                                                    new MapEntry(
+                                                            
request.deserializeUserKey(
+                                                                    
en.rawKeyBytes, userKeyOffset),
+                                                            
request.deserializeUserValue(
+                                                                    
en.rawValueBytes)));
+                                        }
+                                        break;
+                                    }
+                                default:
+                                    throw new IllegalStateException(
+                                            "Unknown result type: " + 
request.getResultType());
+                            }
+                            ForStMapIterator stateIterator =
+                                    new ForStMapIterator<>(
+                                            request.getState(),
+                                            request.getResultType(),
+                                            StateRequestType.ITERATOR_LOADING,
+                                            request.getStateRequestHandler(),
+                                            deserializedEntries,
+                                            encounterEnd,
+                                            nextSeek);
+
+                            request.completeStateFuture(stateIterator);
+                        } catch (Exception e) {
+                            LOG.warn("Error when process iterate operation for 
forStDB", e);
+                            future.completeExceptionally(e);

Review Comment:
   Remember to also call `completeExceptionally` on the request's `InternalStateFuture`, not only on the batch future.



##########
flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStDBGetRequest.java:
##########
@@ -34,12 +34,35 @@ public class ForStDBGetRequest<K, V> {
 
     private final K key;
     private final ForStInnerTable<K, V> table;
-    private final InternalStateFuture<V> future;
+    private final InternalStateFuture future;
 
-    private ForStDBGetRequest(K key, ForStInnerTable<K, V> table, 
InternalStateFuture<V> future) {
+    private final boolean toBoolean;

Review Comment:
   Could this be moved into a subclass to make the structure clearer?



##########
flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStWriteBatchOperation.java:
##########
@@ -55,22 +54,44 @@ public class ForStWriteBatchOperation implements 
ForStDBOperation {
     public CompletableFuture<Void> process() {
         return CompletableFuture.runAsync(
                 () -> {
-                    try (WriteBatch writeBatch =
-                            new WriteBatch(batchRequest.size() * 
PER_RECORD_ESTIMATE_BYTES)) {
+                    try (ForStDBWriteBatchWrapper writeBatch =
+                            new ForStDBWriteBatchWrapper(db, writeOptions, 
batchRequest.size())) {
                         for (ForStDBPutRequest<?, ?> request : batchRequest) {
+                            ColumnFamilyHandle cf = 
request.getColumnFamilyHandle();
                             if (request.valueIsNull()) {
-                                // put(key, null) == delete(key)
-                                writeBatch.delete(
-                                        request.getColumnFamilyHandle(),
-                                        request.buildSerializedKey());
+                                if (request instanceof ForStDBBunchPutRequest) 
{
+                                    ForStDBBunchPutRequest<?> bunchPutRequest =
+                                            (ForStDBBunchPutRequest<?>) 
request;
+                                    byte[] primaryKey = 
bunchPutRequest.buildSerializedKey(null);
+                                    byte[] endKey = 
ForStDBBunchPutRequest.nextBytes(primaryKey);

Review Comment:
   I may have missed something.
   What is the logic behind calculating the range and using `deleteRange`?



##########
flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStStateRequestClassifier.java:
##########
@@ -68,13 +75,58 @@ private void 
convertStateRequestsToForStDBRequests(StateRequest<?, ?, ?> stateRe
                     
dbPutRequests.add(forStValueState.buildDBPutRequest(stateRequest));
                     return;
                 }
+            case MAP_GET:
+            case MAP_CONTAINS:
+                {
+                    ForStMapState<?, ?, ?> forStMapState =
+                            (ForStMapState<?, ?, ?>) stateRequest.getState();
+                    
dbGetRequests.add(forStMapState.buildDBGetRequest(stateRequest));
+                    return;
+                }
+            case MAP_PUT:
+            case MAP_REMOVE:
+                {
+                    ForStMapState<?, ?, ?> forStMapState =
+                            (ForStMapState<?, ?, ?>) stateRequest.getState();
+                    
dbPutRequests.add(forStMapState.buildDBPutRequest(stateRequest));
+                    return;
+                }
+            case MAP_ITER:
+            case MAP_ITER_KEY:
+            case MAP_ITER_VALUE:
+            case ITERATOR_LOADING:
+                {
+                    ForStMapState<?, ?, ?> forStMapState =
+                            (ForStMapState<?, ?, ?>) stateRequest.getState();
+                    
dbIterRequests.add(forStMapState.buildDBIterRequest(stateRequest));
+                    return;
+                }
+            case MAP_PUT_ALL:
+                {
+                    ForStMapState<?, ?, ?> forStMapState =
+                            (ForStMapState<?, ?, ?>) stateRequest.getState();
+                    
dbPutRequests.add(forStMapState.buildDBBunchPutRequest(stateRequest));
+                    return;
+                }
+            case MAP_IS_EMPTY:

Review Comment:
   nit: Same as `MAP_GET`?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to