This is an automated email from the ASF dual-hosted git repository. dwysakowicz pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit aae75bfe3cc16c56aa3a35b21b3698cbf7d7d209 Author: Dawid Wysakowicz <dwysakow...@apache.org> AuthorDate: Thu Jan 28 14:04:06 2021 +0100 [refactor] Extract common logic for serializing delimited list --- .../runtime/state/ListDelimitedSerializer.java | 93 ++++++++++++++++++++++ .../streaming/state/AbstractRocksDBState.java | 22 ----- .../contrib/streaming/state/RocksDBListState.java | 81 +++++-------------- 3 files changed, 111 insertions(+), 85 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/ListDelimitedSerializer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/ListDelimitedSerializer.java new file mode 100644 index 0000000..ddfb916 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/ListDelimitedSerializer.java @@ -0,0 +1,93 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.state; + +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.core.memory.DataInputDeserializer; +import org.apache.flink.core.memory.DataOutputSerializer; +import org.apache.flink.util.FlinkRuntimeException; +import org.apache.flink.util.Preconditions; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +/** + * Encapsulates a logic of serialization and deserialization of a list with a delimiter. Used in the + * savepoint format. + */ +public final class ListDelimitedSerializer { + + private static final byte DELIMITER = ','; + + private final DataInputDeserializer dataInputView = new DataInputDeserializer(); + private final DataOutputSerializer dataOutputView = new DataOutputSerializer(128); + + public <T> List<T> deserializeList(byte[] valueBytes, TypeSerializer<T> elementSerializer) { + if (valueBytes == null) { + return null; + } + + dataInputView.setBuffer(valueBytes); + + List<T> result = new ArrayList<>(); + T next; + while ((next = deserializeNextElement(dataInputView, elementSerializer)) != null) { + result.add(next); + } + return result; + } + + public <T> byte[] serializeList(List<T> valueList, TypeSerializer<T> elementSerializer) + throws IOException { + + dataOutputView.clear(); + boolean first = true; + + for (T value : valueList) { + Preconditions.checkNotNull(value, "You cannot add null to a value list."); + + if (first) { + first = false; + } else { + dataOutputView.write(DELIMITER); + } + elementSerializer.serialize(value, dataOutputView); + } + + return dataOutputView.getCopyOfBuffer(); + } + + /** Deserializes a single element from a serialized list. */ + public static <T> T deserializeNextElement( + DataInputDeserializer in, TypeSerializer<T> elementSerializer) { + try { + if (in.available() > 0) { + T element = elementSerializer.deserialize(in); + if (in.available() > 0) { + in.readByte(); + } + return element; + } + } catch (IOException e) { + throw new FlinkRuntimeException("Unexpected list element deserialization failure", e); + } + return null; + } +} diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBState.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBState.java index 621e93a..c9a07cf 100644 --- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBState.java +++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBState.java @@ -34,7 +34,6 @@ import org.rocksdb.RocksDBException; import org.rocksdb.WriteOptions; import java.io.IOException; -import java.util.List; /** * Base class for {@link State} implementations that store state in a RocksDB database. @@ -180,27 +179,6 @@ public abstract class AbstractRocksDBState<K, N, V> implements InternalKvState<K return serializeValueInternal(value, serializer); } - <T> byte[] serializeValueList( - List<T> valueList, TypeSerializer<T> elementSerializer, byte delimiter) - throws IOException { - - dataOutputView.clear(); - boolean first = true; - - for (T value : valueList) { - Preconditions.checkNotNull(value, "You cannot add null to a value list."); - - if (first) { - first = false; - } else { - dataOutputView.write(delimiter); - } - elementSerializer.serialize(value, dataOutputView); - } - - return dataOutputView.getCopyOfBuffer(); - } - public void migrateSerializedValue( DataInputDeserializer serializedOldValueInput, DataOutputSerializer serializedMigratedValueOutput, diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBListState.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBListState.java index 7e8353a..8bcecc7 100644 --- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBListState.java +++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBListState.java @@ -26,6 +26,7 @@ import org.apache.flink.api.common.typeutils.base.ListSerializer; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.core.memory.DataInputDeserializer; import org.apache.flink.core.memory.DataOutputSerializer; +import org.apache.flink.runtime.state.ListDelimitedSerializer; import org.apache.flink.runtime.state.RegisteredKeyValueStateBackendMetaInfo; import org.apache.flink.runtime.state.StateSnapshotTransformer; import org.apache.flink.runtime.state.internal.InternalListState; @@ -63,6 +64,8 @@ class RocksDBListState<K, N, V> extends AbstractRocksDBState<K, N, List<V>> /** Serializer for the values. */ private final TypeSerializer<V> elementSerializer; + private final ListDelimitedSerializer listSerializer; + /** Separator of StringAppendTestOperator in RocksDB. */ private static final byte DELIMITER = ','; @@ -86,6 +89,7 @@ class RocksDBListState<K, N, V> extends AbstractRocksDBState<K, N, List<V>> ListSerializer<V> castedListSerializer = (ListSerializer<V>) valueSerializer; this.elementSerializer = castedListSerializer.getElementSerializer(); + this.listSerializer = new ListDelimitedSerializer(); } @Override @@ -113,43 +117,12 @@ class RocksDBListState<K, N, V> extends AbstractRocksDBState<K, N, List<V>> try { byte[] key = serializeCurrentKeyWithGroupAndNamespace(); byte[] valueBytes = backend.db.get(columnFamily, key); - return deserializeList(valueBytes); + return listSerializer.deserializeList(valueBytes, elementSerializer); } catch (RocksDBException e) { throw new FlinkRuntimeException("Error while retrieving data from RocksDB", e); } } - private List<V> deserializeList(byte[] valueBytes) { - if (valueBytes == null) { - return null; - } - - dataInputView.setBuffer(valueBytes); - - List<V> result = new ArrayList<>(); - V next; - while ((next = deserializeNextElement(dataInputView, elementSerializer)) != null) { - result.add(next); - } - return result; - } - - private static <V> V deserializeNextElement( - DataInputDeserializer in, TypeSerializer<V> elementSerializer) { - try { - if (in.available() > 0) { - V element = elementSerializer.deserialize(in); - if (in.available() > 0) { - in.readByte(); - } - return element; - } - } catch (IOException e) { - throw new FlinkRuntimeException("Unexpected list element deserialization failure", e); - } - return null; - } - @Override public void add(V value) { Preconditions.checkNotNull(value, "You cannot add null to a ListState."); @@ -210,7 +183,7 @@ class RocksDBListState<K, N, V> extends AbstractRocksDBState<K, N, List<V>> columnFamily, writeOptions, serializeCurrentKeyWithGroupAndNamespace(), - serializeValueList(values, elementSerializer, DELIMITER)); + listSerializer.serializeList(values, elementSerializer)); } catch (IOException | RocksDBException e) { throw new FlinkRuntimeException("Error while updating data to RocksDB", e); } @@ -229,7 +202,7 @@ class RocksDBListState<K, N, V> extends AbstractRocksDBState<K, N, List<V>> columnFamily, writeOptions, serializeCurrentKeyWithGroupAndNamespace(), - serializeValueList(values, elementSerializer, DELIMITER)); + listSerializer.serializeList(values, elementSerializer)); } catch (IOException | RocksDBException e) { throw new FlinkRuntimeException("Error while updating data to RocksDB", e); } @@ -255,7 +228,9 @@ class RocksDBListState<K, N, V> extends AbstractRocksDBState<K, N, List<V>> try { while (serializedOldValueInput.available() > 0) { - V element = deserializeNextElement(serializedOldValueInput, priorElementSerializer); + V element = + ListDelimitedSerializer.deserializeNextElement( + serializedOldValueInput, priorElementSerializer); newElementSerializer.serialize(element, serializedMigratedValueOutput); if (serializedOldValueInput.available() > 0) { serializedMigratedValueOutput.write(DELIMITER); @@ -285,17 +260,19 @@ class RocksDBListState<K, N, V> extends AbstractRocksDBState<K, N, List<V>> static class StateSnapshotTransformerWrapper<T> implements StateSnapshotTransformer<byte[]> { private final StateSnapshotTransformer<T> elementTransformer; private final TypeSerializer<T> elementSerializer; - private final DataOutputSerializer out = new DataOutputSerializer(128); private final CollectionStateSnapshotTransformer.TransformStrategy transformStrategy; + private final ListDelimitedSerializer listSerializer; + private final DataInputDeserializer in = new DataInputDeserializer(); StateSnapshotTransformerWrapper( StateSnapshotTransformer<T> elementTransformer, TypeSerializer<T> elementSerializer) { this.elementTransformer = elementTransformer; this.elementSerializer = elementSerializer; + this.listSerializer = new ListDelimitedSerializer(); this.transformStrategy = elementTransformer instanceof CollectionStateSnapshotTransformer - ? ((CollectionStateSnapshotTransformer) elementTransformer) + ? ((CollectionStateSnapshotTransformer<?>) elementTransformer) .getFilterStrategy() : CollectionStateSnapshotTransformer.TransformStrategy.TRANSFORM_ALL; } @@ -307,10 +284,11 @@ class RocksDBListState<K, N, V> extends AbstractRocksDBState<K, N, List<V>> return null; } List<T> result = new ArrayList<>(); - DataInputDeserializer in = new DataInputDeserializer(value); + in.setBuffer(value); T next; int prevPosition = 0; - while ((next = deserializeNextElement(in, elementSerializer)) != null) { + while ((next = ListDelimitedSerializer.deserializeNextElement(in, elementSerializer)) + != null) { T transformedElement = elementTransformer.filterOrTransform(next); if (transformedElement != null) { if (transformStrategy == STOP_ON_FIRST_INCLUDED) { @@ -324,33 +302,10 @@ class RocksDBListState<K, N, V> extends AbstractRocksDBState<K, N, List<V>> try { return result.isEmpty() ? null - : serializeValueList(result, elementSerializer, DELIMITER); + : listSerializer.serializeList(result, elementSerializer); } catch (IOException e) { throw new FlinkRuntimeException("Failed to serialize transformed list", e); } } - - byte[] serializeValueList( - List<T> valueList, - TypeSerializer<T> elementSerializer, - @SuppressWarnings("SameParameterValue") byte delimiter) - throws IOException { - - out.clear(); - boolean first = true; - - for (T value : valueList) { - Preconditions.checkNotNull(value, "You cannot add null to a value list."); - - if (first) { - first = false; - } else { - out.write(delimiter); - } - elementSerializer.serialize(value, out); - } - - return out.getCopyOfBuffer(); - } } }