This is an automated email from the ASF dual-hosted git repository. sewen pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 9d45ad046742af9d1e8172720ffcd8347bb37ce8 Author: Stephan Ewen <se...@apache.org> AuthorDate: Thu Nov 7 14:58:22 2019 +0100 [FLINK-12697][state backends] (follow-up) Minor optimization that avoids extra memory segment wrapping. - the SkipListKeySerializer offers methods to directly return a MemorySegment instead of a byte[] to exploit implementations that thus avoid extra wrapping. --- .../state/heap/CopyOnWriteSkipListStateMap.java | 13 ++++------ .../runtime/state/heap/SkipListKeySerializer.java | 30 +++++++++++++++++----- 2 files changed, 28 insertions(+), 15 deletions(-) diff --git a/flink-state-backends/flink-statebackend-heap-spillable/src/main/java/org/apache/flink/runtime/state/heap/CopyOnWriteSkipListStateMap.java b/flink-state-backends/flink-statebackend-heap-spillable/src/main/java/org/apache/flink/runtime/state/heap/CopyOnWriteSkipListStateMap.java index 1069392..0ecd5e8 100644 --- a/flink-state-backends/flink-statebackend-heap-spillable/src/main/java/org/apache/flink/runtime/state/heap/CopyOnWriteSkipListStateMap.java +++ b/flink-state-backends/flink-statebackend-heap-spillable/src/main/java/org/apache/flink/runtime/state/heap/CopyOnWriteSkipListStateMap.java @@ -996,8 +996,7 @@ public final class CopyOnWriteSkipListStateMap<K, N, S> extends StateMap<K, N, S * @return the {@link MemorySegment} wrapping up the serialized key bytes. */ private MemorySegment getKeySegment(K key, N namespace) { - byte[] keyBytes = skipListKeySerializer.serialize(key, namespace); - return MemorySegmentFactory.wrap(keyBytes); + return skipListKeySerializer.serializeToSegment(key, namespace); } // Help methods --------------------------------------------------------------- @@ -1165,9 +1164,8 @@ public final class CopyOnWriteSkipListStateMap<K, N, S> extends StateMap<K, N, S @Override public Stream<K> getKeys(N namespace) { updateStat(); - byte[] namespaceBytes = skipListKeySerializer.serializeNamespace(namespace); - MemorySegment namespaceSegment = MemorySegmentFactory.wrap(namespaceBytes); - Iterator<Long> nodeIter = new NamespaceNodeIterator(namespaceSegment, 0, namespaceBytes.length); + MemorySegment namespaceSegment = skipListKeySerializer.serializeNamespaceToSegment(namespace); + Iterator<Long> nodeIter = new NamespaceNodeIterator(namespaceSegment, 0, namespaceSegment.size()); return StreamSupport.stream(Spliterators.spliteratorUnknownSize(nodeIter, 0), false) .map(this::helpGetKey); } @@ -1176,9 +1174,8 @@ public final class CopyOnWriteSkipListStateMap<K, N, S> extends StateMap<K, N, S @Override public int sizeOfNamespace(Object namespace) { updateStat(); - byte[] namespaceBytes = skipListKeySerializer.serializeNamespace((N) namespace); - MemorySegment namespaceSegment = MemorySegmentFactory.wrap(namespaceBytes); - Iterator<Long> nodeIter = new NamespaceNodeIterator(namespaceSegment, 0, namespaceBytes.length); + MemorySegment namespaceSegment = skipListKeySerializer.serializeNamespaceToSegment((N) namespace); + Iterator<Long> nodeIter = new NamespaceNodeIterator(namespaceSegment, 0, namespaceSegment.size()); int size = 0; while (nodeIter.hasNext()) { nodeIter.next(); diff --git a/flink-state-backends/flink-statebackend-heap-spillable/src/main/java/org/apache/flink/runtime/state/heap/SkipListKeySerializer.java b/flink-state-backends/flink-statebackend-heap-spillable/src/main/java/org/apache/flink/runtime/state/heap/SkipListKeySerializer.java index 56a09b6..46c5460 100644 --- a/flink-state-backends/flink-statebackend-heap-spillable/src/main/java/org/apache/flink/runtime/state/heap/SkipListKeySerializer.java +++ b/flink-state-backends/flink-statebackend-heap-spillable/src/main/java/org/apache/flink/runtime/state/heap/SkipListKeySerializer.java @@ -60,6 +60,19 @@ class SkipListKeySerializer<K, N> { * - byte[]: serialized key */ byte[] serialize(K key, N namespace) { + // we know that the segment contains a byte[], because it is created + // in the method below by wrapping a byte[] + return serializeToSegment(key, namespace).getArray(); + } + + /** + * Serialize the key and namespace to bytes. The format is + * - int: length of serialized namespace + * - byte[]: serialized namespace + * - int: length of serialized key + * - byte[]: serialized key + */ + MemorySegment serializeToSegment(K key, N namespace) { outputStream.reset(); try { // serialize namespace @@ -78,15 +91,14 @@ class SkipListKeySerializer<K, N> { throw new RuntimeException("Failed to serialize key", e); } - byte[] result = outputStream.toByteArray(); + final byte[] result = outputStream.toByteArray(); + final MemorySegment segment = MemorySegmentFactory.wrap(result); + // set length of namespace and key - int namespaceLen = keyStartPos - Integer.BYTES; - int keyLen = result.length - keyStartPos - Integer.BYTES; - MemorySegment segment = MemorySegmentFactory.wrap(result); - segment.putInt(0, namespaceLen); - segment.putInt(keyStartPos, keyLen); + segment.putInt(0, keyStartPos - Integer.BYTES); + segment.putInt(keyStartPos, result.length - keyStartPos - Integer.BYTES); - return result; + return segment; } /** @@ -162,4 +174,8 @@ class SkipListKeySerializer<K, N> { } return outputStream.toByteArray(); } + + MemorySegment serializeNamespaceToSegment(N namespace) { + return MemorySegmentFactory.wrap(serializeNamespace(namespace)); + } }