This is an automated email from the ASF dual-hosted git repository. hangxiang pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit e05701029d533c169e4d572914ad1b4ba69ca051 Author: Jinzhong Li <lijinzhong2...@gmail.com> AuthorDate: Thu Apr 18 16:29:28 2024 +0800 [FLINK-35125][state] Implement ValueState for ForStStateBackend --- .../asyncprocessing/AsyncExecutionController.java | 5 +- .../flink/runtime/asyncprocessing/ContextKey.java | 108 +++++++++++++++++++++ .../asyncprocessing/StateRequestHandler.java | 42 ++++++++ .../flink/runtime/state/v2/InternalKeyedState.java | 10 +- .../flink/runtime/state/v2/InternalValueState.java | 6 +- .../apache/flink/state/forst/ForStInnerTable.java | 66 +++++++++++++ .../apache/flink/state/forst/ForStValueState.java | 99 +++++++++++++++++++ .../state/forst/ForStDBOperationTestBase.java | 105 ++++++++++++++++++++ 8 files changed, 431 insertions(+), 10 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/AsyncExecutionController.java b/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/AsyncExecutionController.java index d0291aea79c..1744a8c36b0 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/AsyncExecutionController.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/AsyncExecutionController.java @@ -47,7 +47,7 @@ import java.util.concurrent.atomic.AtomicInteger; * * @param <K> the type of the key */ -public class AsyncExecutionController<K> { +public class AsyncExecutionController<K> implements StateRequestHandler { private static final Logger LOG = LoggerFactory.getLogger(AsyncExecutionController.class); @@ -176,7 +176,7 @@ public class AsyncExecutionController<K> { } /** - * Submit a {@link StateRequest} to this AEC and trigger if needed. + * Submit a {@link StateRequest} to this AsyncExecutionController and trigger it if needed. * * @param state the state to request. Could be {@code null} if the type is {@link * StateRequestType#SYNC_POINT}. 
@@ -184,6 +184,7 @@ public class AsyncExecutionController<K> { * @param payload the payload input for this request. * @return the state future. */ + @Override public <IN, OUT> InternalStateFuture<OUT> handleRequest( @Nullable State state, StateRequestType type, @Nullable IN payload) { // Step 1: build state future & assign context. diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/ContextKey.java b/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/ContextKey.java new file mode 100644 index 00000000000..ed7e31ea90e --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/ContextKey.java @@ -0,0 +1,108 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.asyncprocessing; + +import org.apache.flink.util.function.FunctionWithException; + +import javax.annotation.Nullable; +import javax.annotation.concurrent.ThreadSafe; + +import java.io.IOException; +import java.util.Objects; + +/** + * The composite key which contains some context information, such as keyGroup, etc. + * + * @param <K> The type of the raw key. 
+ */ +@ThreadSafe +public class ContextKey<K> { + + private final K rawKey; + + private final int keyGroup; + + /** + * A record in the user layer may access the state multiple times. The {@code serializedKey} is + * used to cache the serialized key bytes after its first serialization, so that subsequent + * state accesses with the same key can avoid serializing the key repeatedly. + */ + private @Nullable volatile byte[] serializedKey = null; + + public ContextKey(K rawKey, int keyGroup) { + this.rawKey = rawKey; + this.keyGroup = keyGroup; + this.serializedKey = null; + } + + public ContextKey(K rawKey, int keyGroup, byte[] serializedKey) { + this.rawKey = rawKey; + this.keyGroup = keyGroup; + this.serializedKey = serializedKey; + } + + public K getRawKey() { + return rawKey; + } + + public int getKeyGroup() { + return keyGroup; + } + + /** + * Get the serialized key. If the cached serialized key is null, the provided serialization + * function will be called, and the serialization result will be cached by ContextKey. + * + * @param serializeKeyFunc the provided serialization function for this contextKey. + * @return the serialized bytes. 
+ */ + public byte[] getOrCreateSerializedKey( + FunctionWithException<ContextKey<K>, byte[], IOException> serializeKeyFunc) + throws IOException { + if (serializedKey != null) { + return serializedKey; + } + synchronized (this) { + if (serializedKey == null) { + this.serializedKey = serializeKeyFunc.apply(this); + } + } + return serializedKey; + } + + @Override + public int hashCode() { + return Objects.hash(rawKey, keyGroup); + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + ContextKey<?> that = (ContextKey<?>) o; + if (!Objects.equals(rawKey, that.rawKey)) { + return false; + } + return Objects.equals(keyGroup, that.keyGroup); + } +} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/StateRequestHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/StateRequestHandler.java new file mode 100644 index 00000000000..1288ad8a01b --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/StateRequestHandler.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.asyncprocessing; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.api.common.state.v2.State; +import org.apache.flink.core.state.InternalStateFuture; + +import javax.annotation.Nullable; + +/** The handler which can process {@link StateRequest}. */ +@Internal +public interface StateRequestHandler { + + /** + * Submit a {@link StateRequest} to this StateRequestHandler. + * + * @param state the state to request. Could be {@code null} if the type is {@link + * StateRequestType#SYNC_POINT}. + * @param type the type of this request. + * @param payload the payload input for this request. + * @return the state future. + */ + <IN, OUT> InternalStateFuture<OUT> handleRequest( + @Nullable State state, StateRequestType type, @Nullable IN payload); +} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/InternalKeyedState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/InternalKeyedState.java index b75c91676cb..2b1be2a6764 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/InternalKeyedState.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/InternalKeyedState.java @@ -22,6 +22,7 @@ import org.apache.flink.api.common.state.v2.State; import org.apache.flink.api.common.state.v2.StateFuture; import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.runtime.asyncprocessing.AsyncExecutionController; +import org.apache.flink.runtime.asyncprocessing.StateRequestHandler; import org.apache.flink.runtime.asyncprocessing.StateRequestType; /** @@ -38,7 +39,7 @@ import org.apache.flink.runtime.asyncprocessing.StateRequestType; @Internal public abstract class InternalKeyedState<K, V> implements State { - private final AsyncExecutionController<K> asyncExecutionController; + private final StateRequestHandler stateRequestHandler; private final StateDescriptor<V> stateDescriptor; @@ -46,9 +47,8 @@ public abstract 
class InternalKeyedState<K, V> implements State { * Creates a new InternalKeyedState with the given asyncExecutionController and stateDescriptor. */ public InternalKeyedState( - AsyncExecutionController<K> asyncExecutionController, - StateDescriptor<V> stateDescriptor) { - this.asyncExecutionController = asyncExecutionController; + StateRequestHandler stateRequestHandler, StateDescriptor<V> stateDescriptor) { + this.stateRequestHandler = stateRequestHandler; this.stateDescriptor = stateDescriptor; } @@ -61,7 +61,7 @@ public abstract class InternalKeyedState<K, V> implements State { */ protected final <IN, OUT> StateFuture<OUT> handleRequest( StateRequestType stateRequestType, IN payload) { - return asyncExecutionController.handleRequest(this, stateRequestType, payload); + return stateRequestHandler.handleRequest(this, stateRequestType, payload); } @Override diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/InternalValueState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/InternalValueState.java index 72122bf4cf2..2a9c643d17a 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/InternalValueState.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/InternalValueState.java @@ -20,6 +20,7 @@ package org.apache.flink.runtime.state.v2; import org.apache.flink.api.common.state.v2.StateFuture; import org.apache.flink.api.common.state.v2.ValueState; import org.apache.flink.runtime.asyncprocessing.AsyncExecutionController; +import org.apache.flink.runtime.asyncprocessing.StateRequestHandler; import org.apache.flink.runtime.asyncprocessing.StateRequestType; /** @@ -32,9 +33,8 @@ import org.apache.flink.runtime.asyncprocessing.StateRequestType; public class InternalValueState<K, V> extends InternalKeyedState<K, V> implements ValueState<V> { public InternalValueState( - AsyncExecutionController<K> asyncExecutionController, - ValueStateDescriptor<V> valueStateDescriptor) { - 
super(asyncExecutionController, valueStateDescriptor); + StateRequestHandler stateRequestHandler, ValueStateDescriptor<V> valueStateDescriptor) { + super(stateRequestHandler, valueStateDescriptor); } @Override diff --git a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStInnerTable.java b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStInnerTable.java new file mode 100644 index 00000000000..8a3ac9172ad --- /dev/null +++ b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStInnerTable.java @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.state.forst; + +import org.rocksdb.ColumnFamilyHandle; + +import java.io.IOException; + +/** + * The concept of an abstracted table oriented towards ForStDB, and each ForStInnerTable can be + * mapped to a ForSt internal State. + * + * <p>The mapping between ForStInnerTable and ForStDB's columnFamily can be one-to-one or + * many-to-one. + * + * @param <K> The key type of the table. + * @param <V> The value type of the table. 
+ */ +public interface ForStInnerTable<K, V> { + + /** Get the columnFamily handle corresponding to this table. */ + ColumnFamilyHandle getColumnFamilyHandle(); + + /** + * Serialize the given key to bytes. + * + * @param key the key to be serialized. + * @return the key bytes + * @throws IOException Thrown if the serialization encountered an I/O related error. + */ + byte[] serializeKey(K key) throws IOException; + + /** + * Serialize the given value to bytes. + * + * @param value the value to be serialized. + * @return the value bytes + * @throws IOException Thrown if the serialization encountered an I/O related error. + */ + byte[] serializeValue(V value) throws IOException; + + /** + * Deserialize the given value bytes to a value object. + * + * @param value the value bytes to be deserialized. + * @return the deserialized value + * @throws IOException Thrown if the deserialization encountered an I/O related error. + */ + V deserializeValue(byte[] value) throws IOException; +} diff --git a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStValueState.java b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStValueState.java new file mode 100644 index 00000000000..910b75eab33 --- /dev/null +++ b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStValueState.java @@ -0,0 +1,99 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.state.forst; + +import org.apache.flink.api.common.state.v2.ValueState; +import org.apache.flink.core.memory.DataInputDeserializer; +import org.apache.flink.core.memory.DataOutputSerializer; +import org.apache.flink.runtime.asyncprocessing.ContextKey; +import org.apache.flink.runtime.asyncprocessing.StateRequestHandler; +import org.apache.flink.runtime.state.SerializedCompositeKeyBuilder; +import org.apache.flink.runtime.state.v2.InternalValueState; +import org.apache.flink.runtime.state.v2.ValueStateDescriptor; + +import org.rocksdb.ColumnFamilyHandle; + +import java.io.IOException; +import java.util.function.Supplier; + +/** + * The {@link InternalValueState} implementation for ForStDB. + * + * @param <K> The type of the key. + * @param <V> The type of the value. + */ +public class ForStValueState<K, V> extends InternalValueState<K, V> + implements ValueState<V>, ForStInnerTable<ContextKey<K>, V> { + + /** The column family which this internal value state belongs to. */ + private final ColumnFamilyHandle columnFamilyHandle; + + /** The serialized key builder which should be thread-safe. */ + private final ThreadLocal<SerializedCompositeKeyBuilder<K>> serializedKeyBuilder; + + /** The data outputStream used for value serializer, which should be thread-safe. */ + private final ThreadLocal<DataOutputSerializer> valueSerializerView; + + /** The data inputStream used for value deserializer, which should be thread-safe. 
*/ + private final ThreadLocal<DataInputDeserializer> valueDeserializerView; + + public ForStValueState( + StateRequestHandler stateRequestHandler, + ColumnFamilyHandle columnFamily, + ValueStateDescriptor<V> valueStateDescriptor, + Supplier<SerializedCompositeKeyBuilder<K>> serializedKeyBuilderInitializer, + Supplier<DataOutputSerializer> valueSerializerViewInitializer, + Supplier<DataInputDeserializer> valueDeserializerViewInitializer) { + super(stateRequestHandler, valueStateDescriptor); + this.columnFamilyHandle = columnFamily; + this.serializedKeyBuilder = ThreadLocal.withInitial(serializedKeyBuilderInitializer); + this.valueSerializerView = ThreadLocal.withInitial(valueSerializerViewInitializer); + this.valueDeserializerView = ThreadLocal.withInitial(valueDeserializerViewInitializer); + } + + @Override + public ColumnFamilyHandle getColumnFamilyHandle() { + return columnFamilyHandle; + } + + @Override + public byte[] serializeKey(ContextKey<K> contextKey) throws IOException { + return contextKey.getOrCreateSerializedKey( + ctxKey -> { + SerializedCompositeKeyBuilder<K> builder = serializedKeyBuilder.get(); + builder.setKeyAndKeyGroup(ctxKey.getRawKey(), ctxKey.getKeyGroup()); + return builder.build(); + }); + } + + @Override + public byte[] serializeValue(V value) throws IOException { + DataOutputSerializer outputView = valueSerializerView.get(); + outputView.clear(); + getValueSerializer().serialize(value, outputView); + return outputView.getCopyOfBuffer(); + } + + @Override + public V deserializeValue(byte[] valueBytes) throws IOException { + DataInputDeserializer inputView = valueDeserializerView.get(); + inputView.setBuffer(valueBytes); + return getValueSerializer().deserialize(inputView); + } +} diff --git a/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/ForStDBOperationTestBase.java b/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/ForStDBOperationTestBase.java new file 
mode 100644 index 00000000000..f70c50a4782 --- /dev/null +++ b/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/ForStDBOperationTestBase.java @@ -0,0 +1,105 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.state.forst; + +import org.apache.flink.api.common.state.v2.State; +import org.apache.flink.api.common.typeinfo.BasicTypeInfo; +import org.apache.flink.api.common.typeutils.base.IntSerializer; +import org.apache.flink.configuration.ConfigConstants; +import org.apache.flink.core.memory.DataInputDeserializer; +import org.apache.flink.core.memory.DataOutputSerializer; +import org.apache.flink.core.state.InternalStateFuture; +import org.apache.flink.runtime.asyncprocessing.ContextKey; +import org.apache.flink.runtime.asyncprocessing.StateRequestHandler; +import org.apache.flink.runtime.asyncprocessing.StateRequestType; +import org.apache.flink.runtime.state.KeyGroupRangeAssignment; +import org.apache.flink.runtime.state.SerializedCompositeKeyBuilder; +import org.apache.flink.runtime.state.v2.ValueStateDescriptor; + +import org.jetbrains.annotations.Nullable; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.io.TempDir; +import org.rocksdb.ColumnFamilyDescriptor; +import org.rocksdb.ColumnFamilyHandle; +import org.rocksdb.ColumnFamilyOptions; +import org.rocksdb.RocksDB; + +import java.nio.file.Path; +import java.util.function.Supplier; + +/** Base class for {@link ForStDBOperation} tests. 
*/ +public class ForStDBOperationTestBase { + @TempDir private Path tmpDbDir; + protected RocksDB db; + + @BeforeEach + public void setUp() throws Exception { + db = RocksDB.open(tmpDbDir.toAbsolutePath().toString()); + } + + @AfterEach + public void tearDown() { + if (db != null) { + db.close(); + } + } + + protected ColumnFamilyHandle createColumnFamilyHandle(String columnFamilyName) + throws Exception { + byte[] nameBytes = columnFamilyName.getBytes(ConfigConstants.DEFAULT_CHARSET); + ColumnFamilyDescriptor columnFamilyDescriptor = + new ColumnFamilyDescriptor(nameBytes, new ColumnFamilyOptions()); + return db.createColumnFamily(columnFamilyDescriptor); + } + + StateRequestHandler buildMockStateRequestHandler() { + + return new StateRequestHandler() { + @Override + public <IN, OUT> InternalStateFuture<OUT> handleRequest( + @Nullable State state, StateRequestType type, @Nullable IN payload) { + throw new UnsupportedOperationException(); + } + }; + } + + protected ContextKey<Integer> buildContextKey(int i) { + return new ContextKey<>(i, KeyGroupRangeAssignment.assignToKeyGroup(i, 128)); + } + + protected ForStValueState<Integer, String> buildForStValueState(String stateName) + throws Exception { + ColumnFamilyHandle cf = createColumnFamilyHandle(stateName); + ValueStateDescriptor<String> valueStateDescriptor = + new ValueStateDescriptor<>(stateName, BasicTypeInfo.STRING_TYPE_INFO); + Supplier<SerializedCompositeKeyBuilder<Integer>> serializedKeyBuilder = + () -> new SerializedCompositeKeyBuilder<>(IntSerializer.INSTANCE, 2, 32); + Supplier<DataOutputSerializer> valueSerializerView = () -> new DataOutputSerializer(32); + Supplier<DataInputDeserializer> valueDeserializerView = + () -> new DataInputDeserializer(new byte[128]); + return new ForStValueState<>( + buildMockStateRequestHandler(), + cf, + valueStateDescriptor, + serializedKeyBuilder, + valueSerializerView, + valueDeserializerView); + } +}