Zakelly commented on code in PR #24681:
URL: https://github.com/apache/flink/pull/24681#discussion_r1571721125


##########
flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStValueState.java:
##########
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.state.forst;
+
+import org.apache.flink.api.common.state.v2.ValueState;
+import org.apache.flink.core.memory.DataInputDeserializer;
+import org.apache.flink.core.memory.DataOutputSerializer;
+import org.apache.flink.runtime.asyncprocessing.StateRequestHandler;
+import org.apache.flink.runtime.state.SerializedCompositeKeyBuilder;
+import org.apache.flink.runtime.state.v2.InternalValueState;
+import org.apache.flink.runtime.state.v2.ValueStateDescriptor;
+
+import org.rocksdb.ColumnFamilyHandle;
+
+import java.io.IOException;
+
+/**
+ * The {@link InternalValueState} implement for ForStDB.
+ *
+ * @param <K> The type of the key.
+ * @param <V> The type of the value.
+ */
+public class ForStValueState<K, V> extends InternalValueState<K, V>
+        implements ValueState<V>, ForStInnerTable<ContextKey<K>, V> {
+
+    /** The column family which this internal value state belongs to. */
+    private final ColumnFamilyHandle columnFamilyHandle;
+
+    /** The serialized key builder which need be thread-safe. */
+    private final ThreadLocal<SerializedCompositeKeyBuilder<K>> serializedKeyBuilder;
+
+    /** The data outputStream used for value serializer, which need be thread-safe. */
+    private final ThreadLocal<DataOutputSerializer> valueSerializerView;
+
+    /** The data inputStream used for value deserializer, which need be thread-safe. */
+    private final ThreadLocal<DataInputDeserializer> valueDeserializerView;
+
+    public ForStValueState(
+            StateRequestHandler stateRequestHandler,
+            ColumnFamilyHandle columnFamily,
+            ValueStateDescriptor<V> valueStateDescriptor,
+            ThreadLocal<SerializedCompositeKeyBuilder<K>> serializedKeyBuilder,
+            ThreadLocal<DataOutputSerializer> valueSerializerView,
+            ThreadLocal<DataInputDeserializer> valueDeserializerView) {
+        super(stateRequestHandler, valueStateDescriptor);
+        this.columnFamilyHandle = columnFamily;
+        this.serializedKeyBuilder = serializedKeyBuilder;
+        this.valueSerializerView = valueSerializerView;
+        this.valueDeserializerView = valueDeserializerView;
+    }
+
+    @Override
+    public ColumnFamilyHandle getColumnFamilyHandle() {
+        return columnFamilyHandle;
+    }
+
+    @Override
+    public byte[] serializeKey(ContextKey<K> contextKey) throws IOException {

Review Comment:
   Should `ContextKey` be thread-safe? It seems the 'read cache or serialize'
   logic should run only once for each context key.
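
One way to get the once-per-key behaviour asked about above is double-checked locking on a `volatile` field. The sketch below is only an illustration: `CachedKeySketch`, `getOrCreateSerializedKey`, and the `BiFunction` serializer parameter are hypothetical names, not the API in this PR, and checked `IOException` handling is left out for brevity.

```java
import java.util.function.BiFunction;

/** Hypothetical stand-in for ContextKey; names are illustrative, not from the PR. */
final class CachedKeySketch<K> {

    private final K rawKey;
    private final int keyGroup;

    // volatile so that a fully built array is safely published to other threads.
    private volatile byte[] serializedKey;

    CachedKeySketch(K rawKey, int keyGroup) {
        this.rawKey = rawKey;
        this.keyGroup = keyGroup;
    }

    /** Returns the cached bytes, invoking the serializer at most once per key. */
    byte[] getOrCreateSerializedKey(BiFunction<K, Integer, byte[]> serializer) {
        byte[] cached = serializedKey;
        if (cached == null) {
            synchronized (this) {
                // Re-check under the lock: another thread may have filled the cache.
                cached = serializedKey;
                if (cached == null) {
                    cached = serializer.apply(rawKey, keyGroup);
                    serializedKey = cached;
                }
            }
        }
        return cached;
    }
}
```

If a `ContextKey` is only ever touched by one thread at a time, a plain null check would be enough and the lock could be dropped.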



##########
flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStNativeMultiGetOperation.java:
##########
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.state.forst;
+
+import java.io.IOException;
+import java.util.List;
+
+/**
+ * The native multiGet operation implementation for ForStDB.
+ *
+ * @param <K> The type of key in get access request.
+ * @param <V> The type of value in get access request.
+ */
+public class ForStNativeMultiGetOperation<K, V> implements ForStDBOperation<List<V>> {

Review Comment:
   We can introduce this later 👀. Could it be excluded from this PR?



##########
flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ContextKey.java:
##########
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.state.forst;
+
+import javax.annotation.Nullable;
+
+/**
+ * The composite key which contains some context information.
+ *
+ * @param <K> The type of the raw key.
+ */
+public class ContextKey<K> {
+
+    private final K rawKey;
+
+    private final int keyGroup;
+
+    /**
+     * A record in user layer may access the state multiple times. The {@code serializedKey} can be
+     * used to cache the serialized key bytes after its first serialization, so that subsequent
+     * state accesses with the same key can avoid being serialized repeatedly.
+     */
+    private @Nullable byte[] serializedKey = null;
+
+    public ContextKey(K rawKey, int keyGroup) {

Review Comment:
   Since you have provided the static `of` method, could the constructor be
   private? That said, I'd prefer not to provide `of` at all if you have
   multiple constructors.
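
A minimal sketch of the first option (private constructor, `of` as the only entry point). `KeySketch` and its accessors are hypothetical and only mirror the two fields visible in the hunk above:

```java
/** Hypothetical stand-in for ContextKey showing a single static factory. */
final class KeySketch<K> {

    private final K rawKey;
    private final int keyGroup;

    // Private: callers must go through of(), so there is one way to build a key.
    private KeySketch(K rawKey, int keyGroup) {
        this.rawKey = rawKey;
        this.keyGroup = keyGroup;
    }

    /** The only public way to create an instance. */
    static <K> KeySketch<K> of(K rawKey, int keyGroup) {
        return new KeySketch<>(rawKey, keyGroup);
    }

    K getRawKey() {
        return rawKey;
    }

    int getKeyGroup() {
        return keyGroup;
    }
}
```

With several public constructors the factory adds little, which is why dropping `of` would be the simpler alternative in that case.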



##########
flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/StateRequestHandler.java:
##########
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.asyncprocessing;
+
+import org.apache.flink.api.common.state.v2.State;
+import org.apache.flink.core.state.InternalStateFuture;
+
+import javax.annotation.Nullable;
+
+/** The handler which can process {@link StateRequest}. */
+public interface StateRequestHandler {
+
+    /**
+     * Submit a {@link StateRequest} to this StateRequestHandler and trigger it if needed.

Review Comment:
   The phrase 'trigger it if needed' does not fit `StateRequestHandler`. I'd
   suggest keeping the original wording in `AEC` and adjusting this
   description.



##########
flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ContextKey.java:
##########
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.state.forst;
+
+import javax.annotation.Nullable;
+
+/**
+ * The composite key which contains some context information.
+ *
+ * @param <K> The type of the raw key.
+ */
+public class ContextKey<K> {

Review Comment:
   And IIUC, the `ContextKey` will be shared across state requests? Does that
   mean `ContextKey` should be attached to the `RecordContext` in the future?



##########
flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/StateRequestHandler.java:
##########
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.asyncprocessing;
+
+import org.apache.flink.api.common.state.v2.State;
+import org.apache.flink.core.state.InternalStateFuture;
+
+import javax.annotation.Nullable;
+
+/**
+ * The handler which can process {@link StateRequest}.
+ */
+public interface StateRequestHandler {

Review Comment:
   This should be annotated as `@Internal`.



##########
flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStDBOperation.java:
##########
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.state.forst;
+
+import org.apache.flink.annotation.Internal;
+
+import java.io.IOException;
+
+/**
+ * Data access operation to ForStDB. This interface is used to encapsulate the DB access operations
+ * formed after grouping state access. For more information about “Grouping state access”, please
+ * refer to FLIP-426.
+ *
+ * @param <OUT> The type of output for DB access.
+ */
+@Internal
+public interface ForStDBOperation<OUT> {
+
+    /**
+     * Process the ForStDB access requests.
+     *
+     * @return Processing result.
+     * @throws IOException Thrown if ForStDB access encountered an I/O related error.
+     */
+    OUT process() throws IOException;

Review Comment:
   Is this designed to be a synchronous method? Would it be better as an
   asynchronous method returning a `CompletableFuture`? I think that is more
   flexible, since no dedicated thread has to wait for the result.
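
To make the suggestion concrete, here is a rough sketch of what an asynchronous variant could look like. `AsyncDbOperationSketch`, `AsyncGetSketch`, and the `Supplier`/`Executor` constructor parameters are assumptions for illustration, not types from this PR or from ForSt:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.function.Supplier;

/** Hypothetical asynchronous counterpart of ForStDBOperation. */
interface AsyncDbOperationSketch<OUT> {

    /** Starts the DB access and returns immediately with a future of the result. */
    CompletableFuture<OUT> process();
}

/** Hypothetical operation that runs a blocking read on a caller-supplied executor. */
final class AsyncGetSketch<OUT> implements AsyncDbOperationSketch<OUT> {

    private final Supplier<OUT> blockingRead;
    private final Executor executor;

    AsyncGetSketch(Supplier<OUT> blockingRead, Executor executor) {
        this.blockingRead = blockingRead;
        this.executor = executor;
    }

    @Override
    public CompletableFuture<OUT> process() {
        // The caller chains callbacks on the future instead of blocking on the result.
        return CompletableFuture.supplyAsync(blockingRead, executor);
    }
}
```

A caller could then attach `thenAccept(...)` to the returned future, so completion is handled wherever the executor finishes the read rather than on a thread parked on the result.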



##########
flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStValueState.java:
##########
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.state.forst;
+
+import org.apache.flink.api.common.state.v2.ValueState;
+import org.apache.flink.core.memory.DataInputDeserializer;
+import org.apache.flink.core.memory.DataOutputSerializer;
+import org.apache.flink.runtime.asyncprocessing.StateRequestHandler;
+import org.apache.flink.runtime.state.SerializedCompositeKeyBuilder;
+import org.apache.flink.runtime.state.v2.InternalValueState;
+import org.apache.flink.runtime.state.v2.ValueStateDescriptor;
+
+import org.rocksdb.ColumnFamilyHandle;
+
+import java.io.IOException;
+
+/**
+ * The {@link InternalValueState} implement for ForStDB.
+ *
+ * @param <K> The type of the key.
+ * @param <V> The type of the value.
+ */
+public class ForStValueState<K, V> extends InternalValueState<K, V>
+        implements ValueState<V>, ForStInnerTable<ContextKey<K>, V> {
+
+    /** The column family which this internal value state belongs to. */
+    private final ColumnFamilyHandle columnFamilyHandle;
+
+    /** The serialized key builder which need be thread-safe. */

Review Comment:
   ```suggestion
       /** The serialized key builder which should be thread-safe. */
   ```


