This is an automated email from the ASF dual-hosted git repository. dwysakowicz pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit a3436cb67866fddec45c9ac1bd760c24732ca32b Author: Dawid Wysakowicz <dwysakow...@apache.org> AuthorDate: Thu Jan 28 14:41:53 2021 +0100 [refactor] Move RocksDBCompositeKeyBuilder to a common package --- .../state/CompositeKeySerializationUtils.java | 7 +- .../state/SerializedCompositeKeyBuilder.java | 79 ++++++++++--- .../state/CompositeKeySerializationUtilsTest.java | 32 +++--- .../state/SerializedCompositeKeyBuilderTest.java | 128 +++++++++++++++------ .../streaming/state/AbstractRocksDBState.java | 7 +- .../state/RocksDBCachingPriorityQueueSet.java | 3 +- .../state/RocksDBIncrementalCheckpointUtils.java | 9 +- .../streaming/state/RocksDBKeyedStateBackend.java | 14 ++- .../state/RocksDBKeyedStateBackendBuilder.java | 8 +- .../contrib/streaming/state/RocksDBMapState.java | 5 +- .../iterator/AbstractRocksStateKeysIterator.java | 4 +- .../RocksStateKeysAndNamespaceIterator.java | 4 +- .../RocksDBIncrementalRestoreOperation.java | 6 +- ...rtitionedPriorityQueueWithRocksDBStoreTest.java | 4 +- .../RocksDBIncrementalCheckpointUtilsTest.java | 16 ++- ...sDBRocksStateKeysAndNamespacesIteratorTest.java | 5 +- .../state/RocksDBRocksStateKeysIteratorTest.java | 5 +- 17 files changed, 227 insertions(+), 109 deletions(-) diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeySerializationUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/CompositeKeySerializationUtils.java similarity index 96% rename from flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeySerializationUtils.java rename to flink-runtime/src/main/java/org/apache/flink/runtime/state/CompositeKeySerializationUtils.java index 95c7e0b..8088c19 100644 --- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeySerializationUtils.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/CompositeKeySerializationUtils.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.flink.contrib.streaming.state; +package org.apache.flink.runtime.state; import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.core.memory.DataInputDeserializer; @@ -28,9 +28,10 @@ import javax.annotation.Nonnull; import java.io.IOException; /** Utils for RocksDB state serialization and deserialization. */ -public class RocksDBKeySerializationUtils { +public class CompositeKeySerializationUtils { - static int readKeyGroup(int keyGroupPrefixBytes, DataInputView inputView) throws IOException { + public static int readKeyGroup(int keyGroupPrefixBytes, DataInputView inputView) + throws IOException { int keyGroup = 0; for (int i = 0; i < keyGroupPrefixBytes; ++i) { keyGroup <<= 8; diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBSerializedCompositeKeyBuilder.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/SerializedCompositeKeyBuilder.java similarity index 72% rename from flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBSerializedCompositeKeyBuilder.java rename to flink-runtime/src/main/java/org/apache/flink/runtime/state/SerializedCompositeKeyBuilder.java index 78397ac..382bf04 100644 --- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBSerializedCompositeKeyBuilder.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/SerializedCompositeKeyBuilder.java @@ -16,7 +16,7 @@ * limitations under the License. */ -package org.apache.flink.contrib.streaming.state; +package org.apache.flink.runtime.state; import org.apache.flink.annotation.Internal; import org.apache.flink.annotation.VisibleForTesting; @@ -38,7 +38,7 @@ import java.io.IOException; */ @NotThreadSafe @Internal -class RocksDBSerializedCompositeKeyBuilder<K> { +public final class SerializedCompositeKeyBuilder<K> { /** The serializer for the key. */ @Nonnull private final TypeSerializer<K> keySerializer; @@ -55,7 +55,9 @@ class RocksDBSerializedCompositeKeyBuilder<K> { /** Mark for the position after the serialized key. */ @Nonnegative private int afterKeyMark; - public RocksDBSerializedCompositeKeyBuilder( + @Nonnegative private int afterNamespaceMark; + + public SerializedCompositeKeyBuilder( @Nonnull TypeSerializer<K> keySerializer, @Nonnegative int keyGroupPrefixBytes, @Nonnegative int initialSize) { @@ -63,12 +65,12 @@ class RocksDBSerializedCompositeKeyBuilder<K> { keySerializer, new DataOutputSerializer(initialSize), keyGroupPrefixBytes, - RocksDBKeySerializationUtils.isSerializerTypeVariableSized(keySerializer), + CompositeKeySerializationUtils.isSerializerTypeVariableSized(keySerializer), 0); } @VisibleForTesting - RocksDBSerializedCompositeKeyBuilder( + SerializedCompositeKeyBuilder( @Nonnull TypeSerializer<K> keySerializer, @Nonnull DataOutputSerializer keyOutView, @Nonnegative int keyGroupPrefixBytes, @@ -96,6 +98,15 @@ class RocksDBSerializedCompositeKeyBuilder<K> { } } + public <N> void setNamespace( + @Nonnull N namespace, @Nonnull TypeSerializer<N> namespaceSerializer) { + try { + serializeNamespace(namespace, namespaceSerializer); + } catch (IOException shouldNeverHappen) { + throw new FlinkRuntimeException(shouldNeverHappen); + } + } + /** * Returns a serialized composite key, from the key and key-group provided in a previous call to * {@link #setKeyAndKeyGroup(Object, int)} and the given namespace. @@ -110,9 +121,7 @@ class RocksDBSerializedCompositeKeyBuilder<K> { @Nonnull N namespace, @Nonnull TypeSerializer<N> namespaceSerializer) { try { serializeNamespace(namespace, namespaceSerializer); - final byte[] result = keyOutView.getCopyOfBuffer(); - resetToKey(); - return result; + return keyOutView.getCopyOfBuffer(); } catch (IOException shouldNeverHappen) { throw new FlinkRuntimeException(shouldNeverHappen); } @@ -120,7 +129,7 @@ class RocksDBSerializedCompositeKeyBuilder<K> { /** * Returns a serialized composite key, from the key and key-group provided in a previous call to - * {@link #setKeyAndKeyGroup(Object, int)} and the given namespace, folloed by the given + * {@link #setKeyAndKeyGroup(Object, int)} and the given namespace, followed by the given * user-key. * * @param namespace the namespace to concatenate for the serialized composite key bytes. @@ -141,9 +150,35 @@ class RocksDBSerializedCompositeKeyBuilder<K> { throws IOException { serializeNamespace(namespace, namespaceSerializer); userKeySerializer.serialize(userKey, keyOutView); - byte[] result = keyOutView.getCopyOfBuffer(); - resetToKey(); - return result; + return keyOutView.getCopyOfBuffer(); + } + + /** + * Returns a serialized composite key, from the key and key-group provided in a previous call to + * {@link #setKeyAndKeyGroup(Object, int)} and the namespace provided in {@link + * #setNamespace(Object, TypeSerializer)}, followed by the given user-key. + * + * @param userKey the user-key to concatenate for the serialized composite key, after the + * namespace. + * @param userKeySerializer the serializer to obtain the serialized form of the user-key. + * @param <UK> the type of the user-key. + * @return the bytes for the serialized composite key of key-group, key, namespace. + */ + @Nonnull + public <UK> byte[] buildCompositeKeyUserKey( + @Nonnull UK userKey, @Nonnull TypeSerializer<UK> userKeySerializer) throws IOException { + // this should only be called when there is already a namespace written. + assert isNamespaceWritten(); + resetToNamespace(); + + userKeySerializer.serialize(userKey, keyOutView); + return keyOutView.getCopyOfBuffer(); + } + + /** Returns a serialized composite key, from whatever was set so far. */ + @Nonnull + public byte[] build() throws IOException { + return keyOutView.getCopyOfBuffer(); } private void serializeKeyGroupAndKey(K key, int keyGroupId) throws IOException { @@ -152,7 +187,7 @@ class RocksDBSerializedCompositeKeyBuilder<K> { resetFully(); // write key-group - RocksDBKeySerializationUtils.writeKeyGroup(keyGroupId, keyGroupPrefixBytes, keyOutView); + CompositeKeySerializationUtils.writeKeyGroup(keyGroupId, keyGroupPrefixBytes, keyOutView); // write key keySerializer.serialize(key, keyOutView); afterKeyMark = keyOutView.length(); @@ -165,33 +200,45 @@ class RocksDBSerializedCompositeKeyBuilder<K> { // this should only be called when there is already a key written so that we build the // composite. assert isKeyWritten(); + resetToKey(); final boolean ambiguousCompositeKeyPossible = isAmbiguousCompositeKeyPossible(namespaceSerializer); if (ambiguousCompositeKeyPossible) { - RocksDBKeySerializationUtils.writeVariableIntBytes( + CompositeKeySerializationUtils.writeVariableIntBytes( afterKeyMark - keyGroupPrefixBytes, keyOutView); } - RocksDBKeySerializationUtils.writeNameSpace( + CompositeKeySerializationUtils.writeNameSpace( namespace, namespaceSerializer, keyOutView, ambiguousCompositeKeyPossible); + afterNamespaceMark = keyOutView.length(); } private void resetFully() { afterKeyMark = 0; + afterNamespaceMark = 0; keyOutView.clear(); } private void resetToKey() { + afterNamespaceMark = 0; keyOutView.setPosition(afterKeyMark); } + private void resetToNamespace() { + keyOutView.setPosition(afterNamespaceMark); + } + private boolean isKeyWritten() { return afterKeyMark > 0; } + private boolean isNamespaceWritten() { + return afterNamespaceMark > 0; + } + @VisibleForTesting boolean isAmbiguousCompositeKeyPossible(TypeSerializer<?> namespaceSerializer) { return keySerializerTypeVariableSized - & RocksDBKeySerializationUtils.isSerializerTypeVariableSized(namespaceSerializer); + & CompositeKeySerializationUtils.isSerializerTypeVariableSized(namespaceSerializer); } } diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBKeySerializationUtilsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/CompositeKeySerializationUtilsTest.java similarity index 80% rename from flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBKeySerializationUtilsTest.java rename to flink-runtime/src/test/java/org/apache/flink/runtime/state/CompositeKeySerializationUtilsTest.java index 3bbb20b..65ba8ea 100644 --- a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBKeySerializationUtilsTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/CompositeKeySerializationUtilsTest.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.flink.contrib.streaming.state; +package org.apache.flink.runtime.state; import org.apache.flink.api.common.typeutils.base.IntSerializer; import org.apache.flink.api.common.typeutils.base.StringSerializer; @@ -30,17 +30,17 @@ import org.apache.flink.core.memory.DataOutputViewStreamWrapper; import org.junit.Assert; import org.junit.Test; -/** Tests for guarding {@link RocksDBKeySerializationUtils}. */ -public class RocksDBKeySerializationUtilsTest { +/** Tests for guarding {@link CompositeKeySerializationUtils}. */ +public class CompositeKeySerializationUtilsTest { @Test public void testIsAmbiguousKeyPossible() { Assert.assertFalse( - RocksDBKeySerializationUtils.isAmbiguousKeyPossible( + CompositeKeySerializationUtils.isAmbiguousKeyPossible( IntSerializer.INSTANCE, StringSerializer.INSTANCE)); Assert.assertTrue( - RocksDBKeySerializationUtils.isAmbiguousKeyPossible( + CompositeKeySerializationUtils.isAmbiguousKeyPossible( StringSerializer.INSTANCE, StringSerializer.INSTANCE)); } @@ -52,10 +52,10 @@ public class RocksDBKeySerializationUtilsTest { for (int keyGroupPrefixBytes = 1; keyGroupPrefixBytes <= 2; ++keyGroupPrefixBytes) { for (int orgKeyGroup = 0; orgKeyGroup < 128; ++orgKeyGroup) { outputStream.reset(); - RocksDBKeySerializationUtils.writeKeyGroup( + CompositeKeySerializationUtils.writeKeyGroup( orgKeyGroup, keyGroupPrefixBytes, outputView); int deserializedKeyGroup = - RocksDBKeySerializationUtils.readKeyGroup( + CompositeKeySerializationUtils.readKeyGroup( keyGroupPrefixBytes, new DataInputViewStreamWrapper( new ByteArrayInputStreamWithPos( @@ -73,17 +73,19 @@ public class RocksDBKeySerializationUtilsTest { // test for key for (int orgKey = 0; orgKey < 100; ++orgKey) { outputView.clear(); - RocksDBKeySerializationUtils.writeKey( + CompositeKeySerializationUtils.writeKey( orgKey, IntSerializer.INSTANCE, outputView, false); inputView.setBuffer(outputView.getCopyOfBuffer()); int deserializedKey = - RocksDBKeySerializationUtils.readKey(IntSerializer.INSTANCE, inputView, false); + CompositeKeySerializationUtils.readKey( + IntSerializer.INSTANCE, inputView, false); Assert.assertEquals(orgKey, deserializedKey); - RocksDBKeySerializationUtils.writeKey(orgKey, IntSerializer.INSTANCE, outputView, true); + CompositeKeySerializationUtils.writeKey( + orgKey, IntSerializer.INSTANCE, outputView, true); inputView.setBuffer(outputView.getCopyOfBuffer()); deserializedKey = - RocksDBKeySerializationUtils.readKey(IntSerializer.INSTANCE, inputView, true); + CompositeKeySerializationUtils.readKey(IntSerializer.INSTANCE, inputView, true); Assert.assertEquals(orgKey, deserializedKey); } } @@ -95,19 +97,19 @@ public class RocksDBKeySerializationUtilsTest { for (int orgNamespace = 0; orgNamespace < 100; ++orgNamespace) { outputView.clear(); - RocksDBKeySerializationUtils.writeNameSpace( + CompositeKeySerializationUtils.writeNameSpace( orgNamespace, IntSerializer.INSTANCE, outputView, false); inputView.setBuffer(outputView.getCopyOfBuffer()); int deserializedNamepsace = - RocksDBKeySerializationUtils.readNamespace( + CompositeKeySerializationUtils.readNamespace( IntSerializer.INSTANCE, inputView, false); Assert.assertEquals(orgNamespace, deserializedNamepsace); - RocksDBKeySerializationUtils.writeNameSpace( + CompositeKeySerializationUtils.writeNameSpace( orgNamespace, IntSerializer.INSTANCE, outputView, true); inputView.setBuffer(outputView.getCopyOfBuffer()); deserializedNamepsace = - RocksDBKeySerializationUtils.readNamespace( + CompositeKeySerializationUtils.readNamespace( IntSerializer.INSTANCE, inputView, true); Assert.assertEquals(orgNamespace, deserializedNamepsace); } diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBSerializedCompositeKeyBuilderTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/SerializedCompositeKeyBuilderTest.java similarity index 73% rename from flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBSerializedCompositeKeyBuilderTest.java rename to flink-runtime/src/test/java/org/apache/flink/runtime/state/SerializedCompositeKeyBuilderTest.java index 9e4a208..8c7ac96 100644 --- a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBSerializedCompositeKeyBuilderTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/SerializedCompositeKeyBuilderTest.java @@ -16,14 +16,13 @@ * limitations under the License. */ -package org.apache.flink.contrib.streaming.state; +package org.apache.flink.runtime.state; import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.api.common.typeutils.base.IntSerializer; import org.apache.flink.api.common.typeutils.base.StringSerializer; import org.apache.flink.core.memory.DataInputDeserializer; import org.apache.flink.core.memory.DataOutputSerializer; -import org.apache.flink.runtime.state.KeyGroupRangeAssignment; import org.junit.Assert; import org.junit.Before; @@ -33,8 +32,8 @@ import java.io.IOException; import java.util.Arrays; import java.util.Collection; -/** Test for @{@link RocksDBSerializedCompositeKeyBuilder}. */ -public class RocksDBSerializedCompositeKeyBuilderTest { +/** Test for @{@link SerializedCompositeKeyBuilder}. */ +public class SerializedCompositeKeyBuilderTest { private final DataOutputSerializer dataOutputSerializer = new DataOutputSerializer(128); @@ -57,36 +56,60 @@ public class RocksDBSerializedCompositeKeyBuilderTest { @Test public void testSetKeyNamespace() throws IOException { + testSetKeyNamespaceInternal(BuildKeyAndNamespaceType.BUILD); + } + + @Test + public void testSetKeyNamespaceWithSet() throws IOException { + testSetKeyNamespaceInternal(BuildKeyAndNamespaceType.SET_AND_BUILD); + } + + private void testSetKeyNamespaceInternal(BuildKeyAndNamespaceType buildKeyAndNamespaceType) + throws IOException { for (int parallelism : TEST_PARALLELISMS) { testSetKeyNamespaceInternal( IntSerializer.INSTANCE, IntSerializer.INSTANCE, TEST_INTS, TEST_INTS, - parallelism); + parallelism, + buildKeyAndNamespaceType); testSetKeyNamespaceInternal( IntSerializer.INSTANCE, StringSerializer.INSTANCE, TEST_INTS, TEST_STRINGS, - parallelism); + parallelism, + buildKeyAndNamespaceType); testSetKeyNamespaceInternal( StringSerializer.INSTANCE, IntSerializer.INSTANCE, TEST_STRINGS, TEST_INTS, - parallelism); + parallelism, + buildKeyAndNamespaceType); testSetKeyNamespaceInternal( StringSerializer.INSTANCE, StringSerializer.INSTANCE, TEST_STRINGS, TEST_STRINGS, - parallelism); + parallelism, + buildKeyAndNamespaceType); } } @Test public void testSetKeyNamespaceUserKey() throws IOException { + testSetKeyNamespaceUserKeyInternal(BuildKeyAndNamespaceType.BUILD); + } + + @Test + public void testSetKeyNamespaceUserKeyWithSet() throws IOException { + testSetKeyNamespaceUserKeyInternal(BuildKeyAndNamespaceType.SET_AND_BUILD); + } + + private void testSetKeyNamespaceUserKeyInternal( + BuildKeyAndNamespaceType buildKeyAndNamespaceType) throws IOException { for (int parallelism : TEST_PARALLELISMS) { testSetKeyNamespaceUserKeyInternal( IntSerializer.INSTANCE, @@ -95,7 +118,8 @@ public class RocksDBSerializedCompositeKeyBuilderTest { TEST_INTS, TEST_INTS, TEST_INTS, - parallelism); + parallelism, + buildKeyAndNamespaceType); testSetKeyNamespaceUserKeyInternal( IntSerializer.INSTANCE, StringSerializer.INSTANCE, @@ -103,7 +127,8 @@ public class RocksDBSerializedCompositeKeyBuilderTest { TEST_INTS, TEST_STRINGS, TEST_INTS, - parallelism); + parallelism, + buildKeyAndNamespaceType); testSetKeyNamespaceUserKeyInternal( StringSerializer.INSTANCE, IntSerializer.INSTANCE, @@ -111,7 +136,8 @@ public class RocksDBSerializedCompositeKeyBuilderTest { TEST_STRINGS, TEST_INTS, TEST_INTS, - parallelism); + parallelism, + buildKeyAndNamespaceType); testSetKeyNamespaceUserKeyInternal( StringSerializer.INSTANCE, StringSerializer.INSTANCE, @@ -119,7 +145,8 @@ public class RocksDBSerializedCompositeKeyBuilderTest { TEST_STRINGS, TEST_STRINGS, TEST_INTS, - parallelism); + parallelism, + buildKeyAndNamespaceType); testSetKeyNamespaceUserKeyInternal( IntSerializer.INSTANCE, IntSerializer.INSTANCE, @@ -127,7 +154,8 @@ public class RocksDBSerializedCompositeKeyBuilderTest { TEST_INTS, TEST_INTS, TEST_STRINGS, - parallelism); + parallelism, + buildKeyAndNamespaceType); testSetKeyNamespaceUserKeyInternal( IntSerializer.INSTANCE, StringSerializer.INSTANCE, @@ -135,7 +163,8 @@ public class RocksDBSerializedCompositeKeyBuilderTest { TEST_INTS, TEST_STRINGS, TEST_STRINGS, - parallelism); + parallelism, + buildKeyAndNamespaceType); testSetKeyNamespaceUserKeyInternal( StringSerializer.INSTANCE, IntSerializer.INSTANCE, @@ -143,7 +172,8 @@ public class RocksDBSerializedCompositeKeyBuilderTest { TEST_STRINGS, TEST_INTS, TEST_STRINGS, - parallelism); + parallelism, + buildKeyAndNamespaceType); testSetKeyNamespaceUserKeyInternal( StringSerializer.INSTANCE, StringSerializer.INSTANCE, @@ -151,7 +181,8 @@ public class RocksDBSerializedCompositeKeyBuilderTest { TEST_STRINGS, TEST_STRINGS, TEST_STRINGS, - parallelism); + parallelism, + buildKeyAndNamespaceType); } } @@ -159,7 +190,7 @@ public class RocksDBSerializedCompositeKeyBuilderTest { TypeSerializer<K> serializer, Collection<K> testKeys, int maxParallelism) throws IOException { final int prefixBytes = maxParallelism > Byte.MAX_VALUE ? 2 : 1; - RocksDBSerializedCompositeKeyBuilder<K> keyBuilder = + SerializedCompositeKeyBuilder<K> keyBuilder = createRocksDBSerializedCompositeKeyBuilder(serializer, prefixBytes); final DataInputDeserializer deserializer = new DataInputDeserializer(); @@ -172,16 +203,22 @@ public class RocksDBSerializedCompositeKeyBuilderTest { } } + enum BuildKeyAndNamespaceType { + BUILD, + SET_AND_BUILD + } + private <K, N> void testSetKeyNamespaceInternal( TypeSerializer<K> keySerializer, TypeSerializer<N> namespaceSerializer, Collection<K> testKeys, Collection<N> testNamespaces, - int maxParallelism) + int maxParallelism, + BuildKeyAndNamespaceType buildKeyAndNamespaceType) throws IOException { final int prefixBytes = maxParallelism > Byte.MAX_VALUE ? 2 : 1; - RocksDBSerializedCompositeKeyBuilder<K> keyBuilder = + SerializedCompositeKeyBuilder<K> keyBuilder = createRocksDBSerializedCompositeKeyBuilder(keySerializer, prefixBytes); final DataInputDeserializer deserializer = new DataInputDeserializer(); @@ -192,8 +229,15 @@ public class RocksDBSerializedCompositeKeyBuilderTest { for (K testKey : testKeys) { int keyGroup = setKeyAndReturnKeyGroup(keyBuilder, testKey, maxParallelism); for (N testNamespace : testNamespaces) { - byte[] compositeBytes = - keyBuilder.buildCompositeKeyNamespace(testNamespace, namespaceSerializer); + final byte[] compositeBytes; + if (buildKeyAndNamespaceType == BuildKeyAndNamespaceType.BUILD) { + compositeBytes = + keyBuilder.buildCompositeKeyNamespace( + testNamespace, namespaceSerializer); + } else { + keyBuilder.setNamespace(testNamespace, namespaceSerializer); + compositeBytes = keyBuilder.build(); + } deserializer.setBuffer(compositeBytes); assertKeyGroupKeyNamespaceBytes( testKey, @@ -216,11 +260,12 @@ public class RocksDBSerializedCompositeKeyBuilderTest { Collection<K> testKeys, Collection<N> testNamespaces, Collection<U> testUserKeys, - int maxParallelism) + int maxParallelism, + BuildKeyAndNamespaceType buildKeyAndNamespaceType) throws IOException { final int prefixBytes = maxParallelism > Byte.MAX_VALUE ? 2 : 1; - RocksDBSerializedCompositeKeyBuilder<K> keyBuilder = + SerializedCompositeKeyBuilder<K> keyBuilder = createRocksDBSerializedCompositeKeyBuilder(keySerializer, prefixBytes); final DataInputDeserializer deserializer = new DataInputDeserializer(); @@ -231,13 +276,22 @@ public class RocksDBSerializedCompositeKeyBuilderTest { for (K testKey : testKeys) { int keyGroup = setKeyAndReturnKeyGroup(keyBuilder, testKey, maxParallelism); for (N testNamespace : testNamespaces) { + if (buildKeyAndNamespaceType == BuildKeyAndNamespaceType.SET_AND_BUILD) { + keyBuilder.setNamespace(testNamespace, namespaceSerializer); + } for (U testUserKey : testUserKeys) { - byte[] compositeBytes = - keyBuilder.buildCompositeKeyNamesSpaceUserKey( - testNamespace, - namespaceSerializer, - testUserKey, - userKeySerializer); + final byte[] compositeBytes; + if (buildKeyAndNamespaceType == BuildKeyAndNamespaceType.BUILD) { + compositeBytes = + keyBuilder.buildCompositeKeyNamesSpaceUserKey( + testNamespace, + namespaceSerializer, + testUserKey, + userKeySerializer); + } else { + compositeBytes = + keyBuilder.buildCompositeKeyUserKey(testUserKey, userKeySerializer); + } deserializer.setBuffer(compositeBytes); assertKeyGroupKeyNamespaceUserKeyBytes( @@ -258,18 +312,16 @@ public class RocksDBSerializedCompositeKeyBuilderTest { } } - private <K> RocksDBSerializedCompositeKeyBuilder<K> createRocksDBSerializedCompositeKeyBuilder( + private <K> SerializedCompositeKeyBuilder<K> createRocksDBSerializedCompositeKeyBuilder( TypeSerializer<K> serializer, int prefixBytes) { final boolean variableSize = - RocksDBKeySerializationUtils.isSerializerTypeVariableSized(serializer); - return new RocksDBSerializedCompositeKeyBuilder<>( + CompositeKeySerializationUtils.isSerializerTypeVariableSized(serializer); + return new SerializedCompositeKeyBuilder<>( serializer, dataOutputSerializer, prefixBytes, variableSize, 0); } private <K> int setKeyAndReturnKeyGroup( - RocksDBSerializedCompositeKeyBuilder<K> compositeKeyBuilder, - K key, - int maxParallelism) { + SerializedCompositeKeyBuilder<K> compositeKeyBuilder, K key, int maxParallelism) { int keyGroup = KeyGroupRangeAssignment.assignKeyToParallelOperator( @@ -288,10 +340,10 @@ public class RocksDBSerializedCompositeKeyBuilderTest { throws IOException { Assert.assertEquals( - keyGroup, RocksDBKeySerializationUtils.readKeyGroup(prefixBytes, deserializer)); + keyGroup, CompositeKeySerializationUtils.readKeyGroup(prefixBytes, deserializer)); Assert.assertEquals( key, - RocksDBKeySerializationUtils.readKey( + CompositeKeySerializationUtils.readKey( typeSerializer, deserializer, ambiguousCompositeKeyPossible)); } @@ -313,7 +365,7 @@ public class RocksDBSerializedCompositeKeyBuilderTest { deserializer, ambiguousCompositeKeyPossible); N readNamespace = - RocksDBKeySerializationUtils.readNamespace( + CompositeKeySerializationUtils.readNamespace( namespaceSerializer, deserializer, ambiguousCompositeKeyPossible); Assert.assertEquals(namespace, readNamespace); } diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBState.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBState.java index c9a07cf..4ea9022 100644 --- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBState.java +++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBState.java @@ -24,6 +24,7 @@ import org.apache.flink.core.memory.DataInputDeserializer; import org.apache.flink.core.memory.DataOutputSerializer; import org.apache.flink.queryablestate.client.state.serialization.KvStateSerializer; import org.apache.flink.runtime.state.KeyGroupRangeAssignment; +import org.apache.flink.runtime.state.SerializedCompositeKeyBuilder; import org.apache.flink.runtime.state.internal.InternalKvState; import org.apache.flink.util.FlinkRuntimeException; import org.apache.flink.util.Preconditions; @@ -70,7 +71,7 @@ public abstract class AbstractRocksDBState<K, N, V> implements InternalKvState<K protected final DataInputDeserializer dataInputView; - private final RocksDBSerializedCompositeKeyBuilder<K> sharedKeyNamespaceSerializer; + private final SerializedCompositeKeyBuilder<K> sharedKeyNamespaceSerializer; /** * Creates a new RocksDB backed state. @@ -138,8 +139,8 @@ public abstract class AbstractRocksDBState<K, N, V> implements InternalKvState<K KeyGroupRangeAssignment.assignToKeyGroup( keyAndNamespace.f0, backend.getNumberOfKeyGroups()); - RocksDBSerializedCompositeKeyBuilder<K> keyBuilder = - new RocksDBSerializedCompositeKeyBuilder<>( + SerializedCompositeKeyBuilder<K> keyBuilder = + new SerializedCompositeKeyBuilder<>( safeKeySerializer, backend.getKeyGroupPrefixBytes(), 32); keyBuilder.setKeyAndKeyGroup(keyAndNamespace.f0, keyGroup); byte[] key = keyBuilder.buildCompositeKeyNamespace(keyAndNamespace.f1, namespaceSerializer); diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBCachingPriorityQueueSet.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBCachingPriorityQueueSet.java index d96f8e7..4ea2e16 100644 --- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBCachingPriorityQueueSet.java +++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBCachingPriorityQueueSet.java @@ -21,6 +21,7 @@ package org.apache.flink.contrib.streaming.state; import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.core.memory.DataInputDeserializer; import org.apache.flink.core.memory.DataOutputSerializer; +import org.apache.flink.runtime.state.CompositeKeySerializationUtils; import org.apache.flink.runtime.state.InternalPriorityQueue; import org.apache.flink.runtime.state.heap.HeapPriorityQueueElement; import org.apache.flink.util.CloseableIterator; @@ -357,7 +358,7 @@ public class RocksDBCachingPriorityQueueSet<E extends HeapPriorityQueueElement> outputView.clear(); try { - RocksDBKeySerializationUtils.writeKeyGroup(keyGroupId, numPrefixBytes, outputView); + CompositeKeySerializationUtils.writeKeyGroup(keyGroupId, numPrefixBytes, outputView); } catch (IOException e) { throw new FlinkRuntimeException("Could not write key-group bytes.", e); } diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBIncrementalCheckpointUtils.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBIncrementalCheckpointUtils.java index 5a76c08..958594f 100644 --- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBIncrementalCheckpointUtils.java +++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBIncrementalCheckpointUtils.java @@ -17,6 +17,7 @@ package org.apache.flink.contrib.streaming.state; +import org.apache.flink.runtime.state.CompositeKeySerializationUtils; import org.apache.flink.runtime.state.KeyGroupRange; import org.apache.flink.runtime.state.KeyedStateHandle; @@ -91,18 +92,18 @@ public class RocksDBIncrementalCheckpointUtils { final byte[] endKeyGroupBytes = new byte[keyGroupPrefixBytes]; if (currentKeyGroupRange.getStartKeyGroup() < targetKeyGroupRange.getStartKeyGroup()) { - RocksDBKeySerializationUtils.serializeKeyGroup( + CompositeKeySerializationUtils.serializeKeyGroup( currentKeyGroupRange.getStartKeyGroup(), beginKeyGroupBytes); - RocksDBKeySerializationUtils.serializeKeyGroup( + CompositeKeySerializationUtils.serializeKeyGroup( targetKeyGroupRange.getStartKeyGroup(), endKeyGroupBytes); deleteRange( db, columnFamilyHandles, beginKeyGroupBytes, endKeyGroupBytes, writeBatchSize); } if (currentKeyGroupRange.getEndKeyGroup() > targetKeyGroupRange.getEndKeyGroup()) { - RocksDBKeySerializationUtils.serializeKeyGroup( + CompositeKeySerializationUtils.serializeKeyGroup( targetKeyGroupRange.getEndKeyGroup() + 1, beginKeyGroupBytes); - RocksDBKeySerializationUtils.serializeKeyGroup( + CompositeKeySerializationUtils.serializeKeyGroup( currentKeyGroupRange.getEndKeyGroup() + 1, endKeyGroupBytes); deleteRange( db, columnFamilyHandles, beginKeyGroupBytes, endKeyGroupBytes, writeBatchSize); diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java index 72d1a8c..d6c7cff 100644 --- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java +++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java @@ -43,6 +43,7 @@ import org.apache.flink.runtime.checkpoint.CheckpointOptions; import org.apache.flink.runtime.query.TaskKvStateRegistry; import org.apache.flink.runtime.state.AbstractKeyedStateBackend; import org.apache.flink.runtime.state.CheckpointStreamFactory; +import org.apache.flink.runtime.state.CompositeKeySerializationUtils; import org.apache.flink.runtime.state.KeyGroupedInternalPriorityQueue; import org.apache.flink.runtime.state.Keyed; import org.apache.flink.runtime.state.KeyedStateHandle; @@ -50,6 +51,7 @@ import org.apache.flink.runtime.state.PriorityComparable; import org.apache.flink.runtime.state.PriorityQueueSetFactory; import org.apache.flink.runtime.state.RegisteredKeyValueStateBackendMetaInfo; import org.apache.flink.runtime.state.RegisteredStateMetaInfoBase; +import org.apache.flink.runtime.state.SerializedCompositeKeyBuilder; import org.apache.flink.runtime.state.SnapshotResult; import org.apache.flink.runtime.state.SnapshotStrategyRunner; import org.apache.flink.runtime.state.StateSnapshotTransformer.StateSnapshotTransformFactory; @@ -213,7 +215,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> { * Helper to build the byte arrays of composite keys to address data in RocksDB. Shared across * all states. */ - private final RocksDBSerializedCompositeKeyBuilder<K> sharedRocksKeyBuilder; + private final SerializedCompositeKeyBuilder<K> sharedRocksKeyBuilder; /** * Our RocksDB database, this is used by the actual subclasses of {@link AbstractRocksDBState} @@ -247,7 +249,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> { RocksDBWriteBatchWrapper writeBatchWrapper, ColumnFamilyHandle defaultColumnFamilyHandle, RocksDBNativeMetricMonitor nativeMetricMonitor, - RocksDBSerializedCompositeKeyBuilder<K> sharedRocksKeyBuilder, + SerializedCompositeKeyBuilder<K> sharedRocksKeyBuilder, PriorityQueueSetFactory priorityQueueFactory, RocksDbTtlCompactFiltersManager ttlCompactFiltersManager, InternalKeyContext<K> keyContext, @@ -306,11 +308,11 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> { registeredKeyValueStateBackendMetaInfo.getNamespaceSerializer(); final DataOutputSerializer namespaceOutputView = new DataOutputSerializer(8); boolean ambiguousKeyPossible = - RocksDBKeySerializationUtils.isAmbiguousKeyPossible( + CompositeKeySerializationUtils.isAmbiguousKeyPossible( getKeySerializer(), namespaceSerializer); final byte[] nameSpaceBytes; try { - RocksDBKeySerializationUtils.writeNameSpace( + CompositeKeySerializationUtils.writeNameSpace( namespace, namespaceSerializer, namespaceOutputView, ambiguousKeyPossible); nameSpaceBytes = namespaceOutputView.getCopyOfBuffer(); } catch (IOException ex) { @@ -352,7 +354,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> { final TypeSerializer<N> namespaceSerializer = registeredKeyValueStateBackendMetaInfo.getNamespaceSerializer(); boolean ambiguousKeyPossible = - RocksDBKeySerializationUtils.isAmbiguousKeyPossible( + CompositeKeySerializationUtils.isAmbiguousKeyPossible( getKeySerializer(), namespaceSerializer); RocksIteratorWrapper iterator = @@ -493,7 +495,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> { return readOptions; } - RocksDBSerializedCompositeKeyBuilder<K> getSharedRocksKeyBuilder() { + SerializedCompositeKeyBuilder<K> getSharedRocksKeyBuilder() { return sharedRocksKeyBuilder; } diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackendBuilder.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackendBuilder.java index a3349a1..5f6426c 100644 --- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackendBuilder.java +++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackendBuilder.java @@ -35,11 +35,13 @@ import org.apache.flink.metrics.MetricGroup; import org.apache.flink.runtime.query.TaskKvStateRegistry; import org.apache.flink.runtime.state.AbstractKeyedStateBackendBuilder; import org.apache.flink.runtime.state.BackendBuildingException; +import org.apache.flink.runtime.state.CompositeKeySerializationUtils; import org.apache.flink.runtime.state.IncrementalKeyedStateHandle; import org.apache.flink.runtime.state.KeyGroupRange; import org.apache.flink.runtime.state.KeyedStateHandle; import org.apache.flink.runtime.state.LocalRecoveryConfig; import org.apache.flink.runtime.state.PriorityQueueSetFactory; +import org.apache.flink.runtime.state.SerializedCompositeKeyBuilder; import org.apache.flink.runtime.state.StateHandleID; import org.apache.flink.runtime.state.StreamCompressionDecorator; import org.apache.flink.runtime.state.heap.HeapPriorityQueueSetFactory; @@ -255,10 +257,10 @@ public class RocksDBKeyedStateBackendBuilder<K> extends AbstractKeyedStateBacken ResourceGuard rocksDBResourceGuard = new ResourceGuard(); SnapshotStrategy<K> snapshotStrategy; PriorityQueueSetFactory priorityQueueFactory; - RocksDBSerializedCompositeKeyBuilder<K> sharedRocksKeyBuilder; + SerializedCompositeKeyBuilder<K> sharedRocksKeyBuilder; // Number of bytes required to prefix the key groups. int keyGroupPrefixBytes = - RocksDBKeySerializationUtils.computeRequiredBytesInKeyGroupPrefix( + CompositeKeySerializationUtils.computeRequiredBytesInKeyGroupPrefix( numberOfKeyGroups); try { // Variables for snapshot strategy when incremental checkpoint is enabled @@ -301,7 +303,7 @@ public class RocksDBKeyedStateBackendBuilder<K> extends AbstractKeyedStateBacken // serializer // only now we can be certain that the key serializer used in the builder is final. sharedRocksKeyBuilder = - new RocksDBSerializedCompositeKeyBuilder<>( + new SerializedCompositeKeyBuilder<>( keySerializerProvider.currentSchemaSerializer(), keyGroupPrefixBytes, 32); diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMapState.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMapState.java index 490b294..da68662 100644 --- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMapState.java +++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMapState.java @@ -29,6 +29,7 @@ import org.apache.flink.core.memory.DataOutputSerializer; import org.apache.flink.queryablestate.client.state.serialization.KvStateSerializer; import org.apache.flink.runtime.state.KeyGroupRangeAssignment; import org.apache.flink.runtime.state.RegisteredKeyValueStateBackendMetaInfo; +import org.apache.flink.runtime.state.SerializedCompositeKeyBuilder; import org.apache.flink.runtime.state.StateSnapshotTransformer; import org.apache.flink.runtime.state.internal.InternalMapState; import org.apache.flink.util.FlinkRuntimeException; @@ -328,8 +329,8 @@ class RocksDBMapState<K, N, UK, UV> extends AbstractRocksDBState<K, N, Map<UK, U KeyGroupRangeAssignment.assignToKeyGroup( keyAndNamespace.f0, backend.getNumberOfKeyGroups()); - RocksDBSerializedCompositeKeyBuilder<K> keyBuilder = - new RocksDBSerializedCompositeKeyBuilder<>( + SerializedCompositeKeyBuilder<K> keyBuilder = + new SerializedCompositeKeyBuilder<>( safeKeySerializer, backend.getKeyGroupPrefixBytes(), 32); keyBuilder.setKeyAndKeyGroup(keyAndNamespace.f0, keyGroup); diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/iterator/AbstractRocksStateKeysIterator.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/iterator/AbstractRocksStateKeysIterator.java index 582afe3..53f30ae 100644 --- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/iterator/AbstractRocksStateKeysIterator.java +++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/iterator/AbstractRocksStateKeysIterator.java @@ -20,9 +20,9 @@ package org.apache.flink.contrib.streaming.state.iterator; import org.apache.flink.annotation.Internal; import org.apache.flink.api.common.typeutils.TypeSerializer; -import org.apache.flink.contrib.streaming.state.RocksDBKeySerializationUtils; import org.apache.flink.contrib.streaming.state.RocksIteratorWrapper; import org.apache.flink.core.memory.DataInputDeserializer; +import org.apache.flink.runtime.state.CompositeKeySerializationUtils; import javax.annotation.Nonnull; @@ -64,7 +64,7 @@ public abstract class AbstractRocksStateKeysIterator<K> implements AutoCloseable protected K deserializeKey(byte[] keyBytes, DataInputDeserializer readView) throws IOException { readView.setBuffer(keyBytes, keyGroupPrefixBytes, keyBytes.length - keyGroupPrefixBytes); - return RocksDBKeySerializationUtils.readKey( + return CompositeKeySerializationUtils.readKey( keySerializer, byteArrayDataInputView, ambiguousKeyPossible); } diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/iterator/RocksStateKeysAndNamespaceIterator.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/iterator/RocksStateKeysAndNamespaceIterator.java index 96c10f1..386c33b 100644 --- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/iterator/RocksStateKeysAndNamespaceIterator.java +++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/iterator/RocksStateKeysAndNamespaceIterator.java @@ -20,8 +20,8 @@ package org.apache.flink.contrib.streaming.state.iterator; import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.api.java.tuple.Tuple2; -import org.apache.flink.contrib.streaming.state.RocksDBKeySerializationUtils; import org.apache.flink.contrib.streaming.state.RocksIteratorWrapper; +import org.apache.flink.runtime.state.CompositeKeySerializationUtils; import org.apache.flink.util.FlinkRuntimeException; import javax.annotation.Nonnull; @@ -64,7 +64,7 @@ public class RocksStateKeysAndNamespaceIterator<K, N> extends AbstractRocksState final byte[] keyBytes = iterator.key(); final K currentKey = deserializeKey(keyBytes, byteArrayDataInputView); final N currentNamespace = - RocksDBKeySerializationUtils.readNamespace( + CompositeKeySerializationUtils.readNamespace( namespaceSerializer, byteArrayDataInputView, ambiguousKeyPossible); nextKey = Tuple2.of(currentKey, currentNamespace); iterator.next(); diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBIncrementalRestoreOperation.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBIncrementalRestoreOperation.java index a17552d..6b66853 100644 --- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBIncrementalRestoreOperation.java +++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBIncrementalRestoreOperation.java @@ -19,7 +19,6 @@ package org.apache.flink.contrib.streaming.state.restore; import org.apache.flink.contrib.streaming.state.RocksDBIncrementalCheckpointUtils; -import org.apache.flink.contrib.streaming.state.RocksDBKeySerializationUtils; import org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.RocksDbKvStateInfo; import org.apache.flink.contrib.streaming.state.RocksDBNativeMetricOptions; import org.apache.flink.contrib.streaming.state.RocksDBOperationUtils; @@ -32,6 +31,7 @@ import org.apache.flink.core.memory.DataInputView; import org.apache.flink.core.memory.DataInputViewStreamWrapper; import org.apache.flink.metrics.MetricGroup; import org.apache.flink.runtime.state.BackendBuildingException; +import org.apache.flink.runtime.state.CompositeKeySerializationUtils; import org.apache.flink.runtime.state.DirectoryStateHandle; import org.apache.flink.runtime.state.IncrementalKeyedStateHandle; import org.apache.flink.runtime.state.IncrementalLocalKeyedStateHandle; @@ -304,11 +304,11 @@ public class RocksDBIncrementalRestoreOperation<K> extends AbstractRocksDBRestor // Transfer remaining key-groups from temporary instance into base DB byte[] startKeyGroupPrefixBytes = new byte[keyGroupPrefixBytes]; - RocksDBKeySerializationUtils.serializeKeyGroup( + CompositeKeySerializationUtils.serializeKeyGroup( keyGroupRange.getStartKeyGroup(), startKeyGroupPrefixBytes); byte[] stopKeyGroupPrefixBytes = new byte[keyGroupPrefixBytes]; - RocksDBKeySerializationUtils.serializeKeyGroup( + CompositeKeySerializationUtils.serializeKeyGroup( keyGroupRange.getEndKeyGroup() + 1, stopKeyGroupPrefixBytes); for (KeyedStateHandle rawStateHandle : restoreStateHandles) { diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/KeyGroupPartitionedPriorityQueueWithRocksDBStoreTest.java b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/KeyGroupPartitionedPriorityQueueWithRocksDBStoreTest.java index c29955e..45da654 100644 --- a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/KeyGroupPartitionedPriorityQueueWithRocksDBStoreTest.java +++ b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/KeyGroupPartitionedPriorityQueueWithRocksDBStoreTest.java @@ -20,6 +20,7 @@ package org.apache.flink.contrib.streaming.state; import org.apache.flink.core.memory.DataInputDeserializer; import org.apache.flink.core.memory.DataOutputSerializer; +import org.apache.flink.runtime.state.CompositeKeySerializationUtils; import org.apache.flink.runtime.state.InternalPriorityQueue; import org.apache.flink.runtime.state.InternalPriorityQueueTestBase; import org.apache.flink.runtime.state.heap.KeyGroupPartitionedPriorityQueue; @@ -58,7 +59,8 @@ public class KeyGroupPartitionedPriorityQueueWithRocksDBStoreTest DataOutputSerializer outputStreamWithPos = new DataOutputSerializer(128); DataInputDeserializer inputStreamWithPos = new DataInputDeserializer(); int keyGroupPrefixBytes = - RocksDBKeySerializationUtils.computeRequiredBytesInKeyGroupPrefix(numKeyGroups); + CompositeKeySerializationUtils.computeRequiredBytesInKeyGroupPrefix( + numKeyGroups); TreeOrderedSetCache orderedSetCache = new TreeOrderedSetCache(32); return new RocksDBCachingPriorityQueueSet<>( keyGroupId, diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBIncrementalCheckpointUtilsTest.java b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBIncrementalCheckpointUtilsTest.java index fc1627c..281aa4b 100644 --- a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBIncrementalCheckpointUtilsTest.java +++ b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBIncrementalCheckpointUtilsTest.java @@ -19,6 +19,7 @@ package org.apache.flink.contrib.streaming.state; import org.apache.flink.api.common.typeutils.base.IntSerializer; import org.apache.flink.core.memory.DataOutputSerializer; +import org.apache.flink.runtime.state.CompositeKeySerializationUtils; import org.apache.flink.runtime.state.KeyGroupRange; import org.apache.flink.runtime.state.KeyedStateHandle; import org.apache.flink.util.TestLogger; @@ -139,8 +140,9 @@ public class RocksDBIncrementalCheckpointUtilsTest extends TestLogger { for (int i = currentGroupRangeStart; i <= currentGroupRangeEnd; ++i) { for (int j = 0; j < 100; ++j) { outputView.clear(); - RocksDBKeySerializationUtils.writeKeyGroup(i, keyGroupPrefixBytes, outputView); - RocksDBKeySerializationUtils.writeKey( + CompositeKeySerializationUtils.writeKeyGroup( + i, keyGroupPrefixBytes, outputView); + CompositeKeySerializationUtils.writeKey( j, IntSerializer.INSTANCE, outputView, false); rocksDB.put( columnFamilyHandle, @@ -152,8 +154,9 @@ public class RocksDBIncrementalCheckpointUtilsTest extends TestLogger { for (int i = currentGroupRangeStart; i <= currentGroupRangeEnd; ++i) { for (int j = 0; j < 100; ++j) { outputView.clear(); - RocksDBKeySerializationUtils.writeKeyGroup(i, keyGroupPrefixBytes, outputView); - RocksDBKeySerializationUtils.writeKey( + CompositeKeySerializationUtils.writeKeyGroup( + i, keyGroupPrefixBytes, outputView); + CompositeKeySerializationUtils.writeKey( j, IntSerializer.INSTANCE, outputView, false); byte[] value = rocksDB.get(columnFamilyHandle, outputView.getCopyOfBuffer()); Assert.assertEquals(String.valueOf(j), new String(value)); @@ -171,8 +174,9 @@ public class RocksDBIncrementalCheckpointUtilsTest extends TestLogger { for (int i = currentGroupRangeStart; i <= currentGroupRangeEnd; ++i) { for (int j = 0; j < 100; ++j) { outputView.clear(); - RocksDBKeySerializationUtils.writeKeyGroup(i, keyGroupPrefixBytes, outputView); - RocksDBKeySerializationUtils.writeKey( + CompositeKeySerializationUtils.writeKeyGroup( + i, keyGroupPrefixBytes, outputView); + CompositeKeySerializationUtils.writeKey( j, IntSerializer.INSTANCE, outputView, false); byte[] value = rocksDB.get(columnFamilyHandle, outputView.getCopyOfBuffer()); if (targetGroupRange.contains(i)) { diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBRocksStateKeysAndNamespacesIteratorTest.java b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBRocksStateKeysAndNamespacesIteratorTest.java index 56949ac..b62e689 100644 --- a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBRocksStateKeysAndNamespacesIteratorTest.java +++ b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBRocksStateKeysAndNamespacesIteratorTest.java @@ -25,6 +25,7 @@ import org.apache.flink.api.common.typeutils.base.StringSerializer; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.contrib.streaming.state.iterator.RocksStateKeysAndNamespaceIterator; import org.apache.flink.core.memory.DataOutputSerializer; +import org.apache.flink.runtime.state.CompositeKeySerializationUtils; import org.junit.Assert; import org.junit.Rule; @@ -85,9 +86,9 @@ public class RocksDBRocksStateKeysAndNamespacesIteratorTest { DataOutputSerializer outputStream = new DataOutputSerializer(8); boolean ambiguousKeyPossible = - RocksDBKeySerializationUtils.isAmbiguousKeyPossible( + CompositeKeySerializationUtils.isAmbiguousKeyPossible( keySerializer, StringSerializer.INSTANCE); - RocksDBKeySerializationUtils.writeNameSpace( + CompositeKeySerializationUtils.writeNameSpace( namespace, StringSerializer.INSTANCE, outputStream, ambiguousKeyPossible); // already created with the state, should be closed with the backend diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBRocksStateKeysIteratorTest.java b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBRocksStateKeysIteratorTest.java index fdd53a3..6b9e7a4 100644 --- a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBRocksStateKeysIteratorTest.java +++ b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBRocksStateKeysIteratorTest.java @@ -24,6 +24,7 @@ import org.apache.flink.api.common.typeutils.base.IntSerializer; import org.apache.flink.api.common.typeutils.base.StringSerializer; import org.apache.flink.contrib.streaming.state.iterator.RocksStateKeysIterator; import org.apache.flink.core.memory.DataOutputSerializer; +import org.apache.flink.runtime.state.CompositeKeySerializationUtils; import org.junit.Assert; import org.junit.Rule; @@ -82,9 +83,9 @@ public class RocksDBRocksStateKeysIteratorTest { DataOutputSerializer outputStream = new DataOutputSerializer(8); boolean ambiguousKeyPossible = - RocksDBKeySerializationUtils.isAmbiguousKeyPossible( + CompositeKeySerializationUtils.isAmbiguousKeyPossible( keySerializer, StringSerializer.INSTANCE); - RocksDBKeySerializationUtils.writeNameSpace( + CompositeKeySerializationUtils.writeNameSpace( namespace, StringSerializer.INSTANCE, outputStream, ambiguousKeyPossible); byte[] nameSpaceBytes = outputStream.getCopyOfBuffer();