This is an automated email from the ASF dual-hosted git repository.

hangxiang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit e05701029d533c169e4d572914ad1b4ba69ca051
Author: Jinzhong Li <lijinzhong2...@gmail.com>
AuthorDate: Thu Apr 18 16:29:28 2024 +0800

    [FLINK-35125][state] Implement ValueState for ForStStateBackend
---
 .../asyncprocessing/AsyncExecutionController.java  |   5 +-
 .../flink/runtime/asyncprocessing/ContextKey.java  | 108 +++++++++++++++++++++
 .../asyncprocessing/StateRequestHandler.java       |  42 ++++++++
 .../flink/runtime/state/v2/InternalKeyedState.java |  10 +-
 .../flink/runtime/state/v2/InternalValueState.java |   6 +-
 .../apache/flink/state/forst/ForStInnerTable.java  |  66 +++++++++++++
 .../apache/flink/state/forst/ForStValueState.java  |  99 +++++++++++++++++++
 .../state/forst/ForStDBOperationTestBase.java      | 105 ++++++++++++++++++++
 8 files changed, 431 insertions(+), 10 deletions(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/AsyncExecutionController.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/AsyncExecutionController.java
index d0291aea79c..1744a8c36b0 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/AsyncExecutionController.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/AsyncExecutionController.java
@@ -47,7 +47,7 @@ import java.util.concurrent.atomic.AtomicInteger;
  *
  * @param <K> the type of the key
  */
-public class AsyncExecutionController<K> {
+public class AsyncExecutionController<K> implements StateRequestHandler {
 
     private static final Logger LOG = 
LoggerFactory.getLogger(AsyncExecutionController.class);
 
@@ -176,7 +176,7 @@ public class AsyncExecutionController<K> {
     }
 
     /**
-     * Submit a {@link StateRequest} to this AEC and trigger if needed.
+     * Submit a {@link StateRequest} to this AsyncExecutionController and 
trigger it if needed.
      *
      * @param state the state to request. Could be {@code null} if the type is 
{@link
      *     StateRequestType#SYNC_POINT}.
@@ -184,6 +184,7 @@ public class AsyncExecutionController<K> {
      * @param payload the payload input for this request.
      * @return the state future.
      */
+    @Override
     public <IN, OUT> InternalStateFuture<OUT> handleRequest(
             @Nullable State state, StateRequestType type, @Nullable IN 
payload) {
         // Step 1: build state future & assign context.
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/ContextKey.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/ContextKey.java
new file mode 100644
index 00000000000..ed7e31ea90e
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/ContextKey.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.asyncprocessing;
+
+import org.apache.flink.util.function.FunctionWithException;
+
+import javax.annotation.Nullable;
+import javax.annotation.concurrent.ThreadSafe;
+
+import java.io.IOException;
+import java.util.Objects;
+
+/**
+ * The composite key which contains some context information, such as keyGroup, etc.
+ *
+ * <p>Equality and hash code are defined by the raw key and the key group only; the cached
+ * serialized bytes take part in neither.
+ *
+ * @param <K> The type of the raw key.
+ */
+@ThreadSafe
+public class ContextKey<K> {
+
+    private final K rawKey;
+
+    private final int keyGroup;
+
+    /**
+     * A record in user layer may access the state multiple times. The {@code serializedKey} can be
+     * used to cache the serialized key bytes after its first serialization, so that subsequent
+     * state accesses with the same key can avoid being serialized repeatedly.
+     */
+    private @Nullable volatile byte[] serializedKey;
+
+    /**
+     * Creates a context key whose serialized form has not been computed yet.
+     *
+     * @param rawKey the raw key.
+     * @param keyGroup the key group the raw key belongs to.
+     */
+    public ContextKey(K rawKey, int keyGroup) {
+        this(rawKey, keyGroup, null);
+    }
+
+    /**
+     * Creates a context key with an optional, already computed serialized form.
+     *
+     * @param rawKey the raw key.
+     * @param keyGroup the key group the raw key belongs to.
+     * @param serializedKey the serialized key bytes to cache, or {@code null} if they should be
+     *     computed lazily by {@link #getOrCreateSerializedKey(FunctionWithException)}. The array
+     *     is stored as-is, without copying.
+     */
+    public ContextKey(K rawKey, int keyGroup, @Nullable byte[] serializedKey) {
+        this.rawKey = rawKey;
+        this.keyGroup = keyGroup;
+        this.serializedKey = serializedKey;
+    }
+
+    public K getRawKey() {
+        return rawKey;
+    }
+
+    public int getKeyGroup() {
+        return keyGroup;
+    }
+
+    /**
+     * Get the serialized key. If the cached serialized key is null, the provided serialization
+     * function will be called, and the serialization result will be cached by ContextKey.
+     *
+     * <p>The function is invoked while holding this object's monitor. The returned array is the
+     * cached instance itself, not a copy.
+     *
+     * @param serializeKeyFunc the provided serialization function for this contextKey.
+     * @return the serialized bytes.
+     * @throws IOException Thrown if the serialization function fails.
+     */
+    public byte[] getOrCreateSerializedKey(
+            FunctionWithException<ContextKey<K>, byte[], IOException> serializeKeyFunc)
+            throws IOException {
+        // Copy the volatile field into a local so the common (already cached) path performs a
+        // single volatile read and returns exactly the value that was checked.
+        byte[] result = serializedKey;
+        if (result == null) {
+            synchronized (this) {
+                result = serializedKey;
+                if (result == null) {
+                    result = serializeKeyFunc.apply(this);
+                    serializedKey = result;
+                }
+            }
+        }
+        return result;
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(rawKey, keyGroup);
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+        ContextKey<?> that = (ContextKey<?>) o;
+        // Compare the primitive key group directly instead of boxing it via Objects.equals.
+        return keyGroup == that.keyGroup && Objects.equals(rawKey, that.rawKey);
+    }
+}
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/StateRequestHandler.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/StateRequestHandler.java
new file mode 100644
index 00000000000..1288ad8a01b
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/StateRequestHandler.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.asyncprocessing;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.state.v2.State;
+import org.apache.flink.core.state.InternalStateFuture;
+
+import javax.annotation.Nullable;
+
+/** The handler which can process {@link StateRequest}. */
+@Internal
+public interface StateRequestHandler {
+
+    /**
+     * Submit a {@link StateRequest} to this StateRequestHandler.
+     *
+     * @param state the state to request. Could be {@code null} if the type is {@link
+     *     StateRequestType#SYNC_POINT}.
+     * @param type the type of this request.
+     * @param payload the payload input for this request.
+     * @param <IN> the type of the request payload.
+     * @param <OUT> the type of the result carried by the returned future.
+     * @return the state future.
+     */
+    <IN, OUT> InternalStateFuture<OUT> handleRequest(
+            @Nullable State state, StateRequestType type, @Nullable IN 
payload);
+}
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/InternalKeyedState.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/InternalKeyedState.java
index b75c91676cb..2b1be2a6764 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/InternalKeyedState.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/InternalKeyedState.java
@@ -22,6 +22,7 @@ import org.apache.flink.api.common.state.v2.State;
 import org.apache.flink.api.common.state.v2.StateFuture;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.runtime.asyncprocessing.AsyncExecutionController;
+import org.apache.flink.runtime.asyncprocessing.StateRequestHandler;
 import org.apache.flink.runtime.asyncprocessing.StateRequestType;
 
 /**
@@ -38,7 +39,7 @@ import 
org.apache.flink.runtime.asyncprocessing.StateRequestType;
 @Internal
 public abstract class InternalKeyedState<K, V> implements State {
 
-    private final AsyncExecutionController<K> asyncExecutionController;
+    private final StateRequestHandler stateRequestHandler;
 
     private final StateDescriptor<V> stateDescriptor;
 
@@ -46,9 +47,8 @@ public abstract class InternalKeyedState<K, V> implements 
State {
      * Creates a new InternalKeyedState with the given 
asyncExecutionController and stateDescriptor.
      */
     public InternalKeyedState(
-            AsyncExecutionController<K> asyncExecutionController,
-            StateDescriptor<V> stateDescriptor) {
-        this.asyncExecutionController = asyncExecutionController;
+            StateRequestHandler stateRequestHandler, StateDescriptor<V> 
stateDescriptor) {
+        this.stateRequestHandler = stateRequestHandler;
         this.stateDescriptor = stateDescriptor;
     }
 
@@ -61,7 +61,7 @@ public abstract class InternalKeyedState<K, V> implements 
State {
      */
     protected final <IN, OUT> StateFuture<OUT> handleRequest(
             StateRequestType stateRequestType, IN payload) {
-        return asyncExecutionController.handleRequest(this, stateRequestType, 
payload);
+        return stateRequestHandler.handleRequest(this, stateRequestType, 
payload);
     }
 
     @Override
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/InternalValueState.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/InternalValueState.java
index 72122bf4cf2..2a9c643d17a 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/InternalValueState.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/InternalValueState.java
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.state.v2;
 import org.apache.flink.api.common.state.v2.StateFuture;
 import org.apache.flink.api.common.state.v2.ValueState;
 import org.apache.flink.runtime.asyncprocessing.AsyncExecutionController;
+import org.apache.flink.runtime.asyncprocessing.StateRequestHandler;
 import org.apache.flink.runtime.asyncprocessing.StateRequestType;
 
 /**
@@ -32,9 +33,8 @@ import 
org.apache.flink.runtime.asyncprocessing.StateRequestType;
 public class InternalValueState<K, V> extends InternalKeyedState<K, V> 
implements ValueState<V> {
 
     public InternalValueState(
-            AsyncExecutionController<K> asyncExecutionController,
-            ValueStateDescriptor<V> valueStateDescriptor) {
-        super(asyncExecutionController, valueStateDescriptor);
+            StateRequestHandler stateRequestHandler, ValueStateDescriptor<V> 
valueStateDescriptor) {
+        super(stateRequestHandler, valueStateDescriptor);
     }
 
     @Override
diff --git 
a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStInnerTable.java
 
b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStInnerTable.java
new file mode 100644
index 00000000000..8a3ac9172ad
--- /dev/null
+++ 
b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStInnerTable.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.state.forst;
+
+import org.rocksdb.ColumnFamilyHandle;
+
+import java.io.IOException;
+
+/**
+ * The concept of an abstracted table oriented towards ForStDB, and each ForStInnerTable can be
+ * mapped to a ForSt internal State.
+ *
+ * <p>The mapping between ForStInnerTable and ForStDB's columnFamily can be one-to-one or
+ * many-to-one.
+ *
+ * @param <K> The key type of the table.
+ * @param <V> The value type of the table.
+ */
+public interface ForStInnerTable<K, V> {
+
+    /** Get the columnFamily handle corresponding to this table. */
+    ColumnFamilyHandle getColumnFamilyHandle();
+
+    /**
+     * Serialize the given key to bytes.
+     *
+     * @param key the key to be serialized.
+     * @return the key bytes
+     * @throws IOException Thrown if the serialization encountered an I/O related error.
+     */
+    byte[] serializeKey(K key) throws IOException;
+
+    /**
+     * Serialize the given value to bytes.
+     *
+     * @param value the value to be serialized.
+     * @return the value bytes
+     * @throws IOException Thrown if the serialization encountered an I/O related error.
+     */
+    byte[] serializeValue(V value) throws IOException;
+
+    /**
+     * Deserialize the given value bytes back into a value object.
+     *
+     * @param value the value bytes to be deserialized.
+     * @return the deserialized value
+     * @throws IOException Thrown if the deserialization encountered an I/O related error.
+     */
+    V deserializeValue(byte[] value) throws IOException;
+}
diff --git 
a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStValueState.java
 
b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStValueState.java
new file mode 100644
index 00000000000..910b75eab33
--- /dev/null
+++ 
b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStValueState.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.state.forst;
+
+import org.apache.flink.api.common.state.v2.ValueState;
+import org.apache.flink.core.memory.DataInputDeserializer;
+import org.apache.flink.core.memory.DataOutputSerializer;
+import org.apache.flink.runtime.asyncprocessing.ContextKey;
+import org.apache.flink.runtime.asyncprocessing.StateRequestHandler;
+import org.apache.flink.runtime.state.SerializedCompositeKeyBuilder;
+import org.apache.flink.runtime.state.v2.InternalValueState;
+import org.apache.flink.runtime.state.v2.ValueStateDescriptor;
+
+import org.rocksdb.ColumnFamilyHandle;
+
+import java.io.IOException;
+import java.util.function.Supplier;
+
+/**
+ * The ForStDB implementation of {@link InternalValueState}.
+ *
+ * @param <K> The type of the key.
+ * @param <V> The type of the value.
+ */
+public class ForStValueState<K, V> extends InternalValueState<K, V>
+        implements ValueState<V>, ForStInnerTable<ContextKey<K>, V> {
+
+    /** The column family which this internal value state belongs to. */
+    private final ColumnFamilyHandle columnFamilyHandle;
+
+    /** Per-thread builder used to produce the serialized composite key. */
+    private final ThreadLocal<SerializedCompositeKeyBuilder<K>> serializedKeyBuilder;
+
+    /** Per-thread output view that value serialization writes into. */
+    private final ThreadLocal<DataOutputSerializer> valueSerializerView;
+
+    /** Per-thread input view that value deserialization reads from. */
+    private final ThreadLocal<DataInputDeserializer> valueDeserializerView;
+
+    /**
+     * Creates the value state; each supplier is used to lazily create one instance per accessing
+     * thread.
+     */
+    public ForStValueState(
+            StateRequestHandler stateRequestHandler,
+            ColumnFamilyHandle columnFamily,
+            ValueStateDescriptor<V> valueStateDescriptor,
+            Supplier<SerializedCompositeKeyBuilder<K>> serializedKeyBuilderInitializer,
+            Supplier<DataOutputSerializer> valueSerializerViewInitializer,
+            Supplier<DataInputDeserializer> valueDeserializerViewInitializer) {
+        super(stateRequestHandler, valueStateDescriptor);
+        this.valueDeserializerView = ThreadLocal.withInitial(valueDeserializerViewInitializer);
+        this.valueSerializerView = ThreadLocal.withInitial(valueSerializerViewInitializer);
+        this.serializedKeyBuilder = ThreadLocal.withInitial(serializedKeyBuilderInitializer);
+        this.columnFamilyHandle = columnFamily;
+    }
+
+    @Override
+    public ColumnFamilyHandle getColumnFamilyHandle() {
+        return columnFamilyHandle;
+    }
+
+    /** Returns the key bytes cached on the context key, building and caching them if absent. */
+    @Override
+    public byte[] serializeKey(ContextKey<K> contextKey) throws IOException {
+        return contextKey.getOrCreateSerializedKey(this::buildCompositeKey);
+    }
+
+    /** Builds the composite key bytes with the calling thread's key builder. */
+    private byte[] buildCompositeKey(ContextKey<K> ctxKey) throws IOException {
+        final SerializedCompositeKeyBuilder<K> keyBuilder = serializedKeyBuilder.get();
+        keyBuilder.setKeyAndKeyGroup(ctxKey.getRawKey(), ctxKey.getKeyGroup());
+        return keyBuilder.build();
+    }
+
+    @Override
+    public byte[] serializeValue(V value) throws IOException {
+        final DataOutputSerializer out = valueSerializerView.get();
+        // The view is reused per thread, so reset it before writing the new value.
+        out.clear();
+        getValueSerializer().serialize(value, out);
+        return out.getCopyOfBuffer();
+    }
+
+    @Override
+    public V deserializeValue(byte[] valueBytes) throws IOException {
+        final DataInputDeserializer in = valueDeserializerView.get();
+        in.setBuffer(valueBytes);
+        return getValueSerializer().deserialize(in);
+    }
+}
diff --git 
a/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/ForStDBOperationTestBase.java
 
b/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/ForStDBOperationTestBase.java
new file mode 100644
index 00000000000..f70c50a4782
--- /dev/null
+++ 
b/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/ForStDBOperationTestBase.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.state.forst;
+
+import org.apache.flink.api.common.state.v2.State;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeutils.base.IntSerializer;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.core.memory.DataInputDeserializer;
+import org.apache.flink.core.memory.DataOutputSerializer;
+import org.apache.flink.core.state.InternalStateFuture;
+import org.apache.flink.runtime.asyncprocessing.ContextKey;
+import org.apache.flink.runtime.asyncprocessing.StateRequestHandler;
+import org.apache.flink.runtime.asyncprocessing.StateRequestType;
+import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
+import org.apache.flink.runtime.state.SerializedCompositeKeyBuilder;
+import org.apache.flink.runtime.state.v2.ValueStateDescriptor;
+
+import org.jetbrains.annotations.Nullable;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.io.TempDir;
+import org.rocksdb.ColumnFamilyDescriptor;
+import org.rocksdb.ColumnFamilyHandle;
+import org.rocksdb.ColumnFamilyOptions;
+import org.rocksdb.RocksDB;
+
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.function.Supplier;
+
+/** Base class for {@link ForStDBOperation} tests. */
+public class ForStDBOperationTestBase {
+    @TempDir private Path tmpDbDir;
+    protected RocksDB db;
+
+    /** Column family handles created during a test; closed in {@link #tearDown()}. */
+    private final List<ColumnFamilyHandle> createdColumnFamilyHandles = new ArrayList<>();
+
+    /** Column family options created during a test; closed in {@link #tearDown()}. */
+    private final List<ColumnFamilyOptions> createdColumnFamilyOptions = new ArrayList<>();
+
+    @BeforeEach
+    public void setUp() throws Exception {
+        db = RocksDB.open(tmpDbDir.toAbsolutePath().toString());
+    }
+
+    @AfterEach
+    public void tearDown() {
+        // Release the native column family handles before the database that owns them,
+        // and the options objects once nothing refers to them any more.
+        createdColumnFamilyHandles.forEach(ColumnFamilyHandle::close);
+        createdColumnFamilyHandles.clear();
+        if (db != null) {
+            db.close();
+        }
+        createdColumnFamilyOptions.forEach(ColumnFamilyOptions::close);
+        createdColumnFamilyOptions.clear();
+    }
+
+    /**
+     * Creates a column family with default options in the test database.
+     *
+     * <p>The returned handle and its options are tracked and closed in {@link #tearDown()}, so
+     * callers do not need to close them.
+     *
+     * @param columnFamilyName the name of the column family to create.
+     * @return the handle of the newly created column family.
+     */
+    protected ColumnFamilyHandle createColumnFamilyHandle(String columnFamilyName)
+            throws Exception {
+        byte[] nameBytes = columnFamilyName.getBytes(ConfigConstants.DEFAULT_CHARSET);
+        ColumnFamilyOptions options = new ColumnFamilyOptions();
+        createdColumnFamilyOptions.add(options);
+        ColumnFamilyHandle handle =
+                db.createColumnFamily(new ColumnFamilyDescriptor(nameBytes, options));
+        createdColumnFamilyHandles.add(handle);
+        return handle;
+    }
+
+    /** Builds a handler that rejects every request with {@link UnsupportedOperationException}. */
+    StateRequestHandler buildMockStateRequestHandler() {
+
+        return new StateRequestHandler() {
+            @Override
+            public <IN, OUT> InternalStateFuture<OUT> handleRequest(
+                    @Nullable State state, StateRequestType type, @Nullable IN payload) {
+                throw new UnsupportedOperationException();
+            }
+        };
+    }
+
+    /** Builds a context key for the given integer, assigning its key group out of 128. */
+    protected ContextKey<Integer> buildContextKey(int i) {
+        return new ContextKey<>(i, KeyGroupRangeAssignment.assignToKeyGroup(i, 128));
+    }
+
+    /**
+     * Builds a {@code ForStValueState<Integer, String>} backed by a freshly created column family
+     * named after the state.
+     *
+     * @param stateName the name used for both the state descriptor and the column family.
+     */
+    protected ForStValueState<Integer, String> buildForStValueState(String stateName)
+            throws Exception {
+        ColumnFamilyHandle cf = createColumnFamilyHandle(stateName);
+        ValueStateDescriptor<String> valueStateDescriptor =
+                new ValueStateDescriptor<>(stateName, BasicTypeInfo.STRING_TYPE_INFO);
+        Supplier<SerializedCompositeKeyBuilder<Integer>> serializedKeyBuilder =
+                () -> new SerializedCompositeKeyBuilder<>(IntSerializer.INSTANCE, 2, 32);
+        Supplier<DataOutputSerializer> valueSerializerView = () -> new DataOutputSerializer(32);
+        Supplier<DataInputDeserializer> valueDeserializerView =
+                () -> new DataInputDeserializer(new byte[128]);
+        return new ForStValueState<>(
+                buildMockStateRequestHandler(),
+                cf,
+                valueStateDescriptor,
+                serializedKeyBuilder,
+                valueSerializerView,
+                valueDeserializerView);
+    }
+}

Reply via email to