This is an automated email from the ASF dual-hosted git repository.

sewen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 9d45ad046742af9d1e8172720ffcd8347bb37ce8
Author: Stephan Ewen <se...@apache.org>
AuthorDate: Thu Nov 7 14:58:22 2019 +0100

    [FLINK-12697][state backends] (follow-up) Minor optimization that avoids extra memory segment wrapping.
    
      - the SkipListKeySerializer offers methods to directly return a MemorySegment instead of a byte[]
        to exploit implementations that thus avoid extra wrapping.
---
 .../state/heap/CopyOnWriteSkipListStateMap.java    | 13 ++++------
 .../runtime/state/heap/SkipListKeySerializer.java  | 30 +++++++++++++++++-----
 2 files changed, 28 insertions(+), 15 deletions(-)

diff --git a/flink-state-backends/flink-statebackend-heap-spillable/src/main/java/org/apache/flink/runtime/state/heap/CopyOnWriteSkipListStateMap.java b/flink-state-backends/flink-statebackend-heap-spillable/src/main/java/org/apache/flink/runtime/state/heap/CopyOnWriteSkipListStateMap.java
index 1069392..0ecd5e8 100644
--- a/flink-state-backends/flink-statebackend-heap-spillable/src/main/java/org/apache/flink/runtime/state/heap/CopyOnWriteSkipListStateMap.java
+++ b/flink-state-backends/flink-statebackend-heap-spillable/src/main/java/org/apache/flink/runtime/state/heap/CopyOnWriteSkipListStateMap.java
@@ -996,8 +996,7 @@ public final class CopyOnWriteSkipListStateMap<K, N, S> 
extends StateMap<K, N, S
         * @return the {@link MemorySegment} wrapping up the serialized key 
bytes.
         */
        private MemorySegment getKeySegment(K key, N namespace) {
-               byte[] keyBytes = skipListKeySerializer.serialize(key, 
namespace);
-               return MemorySegmentFactory.wrap(keyBytes);
+               return skipListKeySerializer.serializeToSegment(key, namespace);
        }
 
        // Help methods 
---------------------------------------------------------------
@@ -1165,9 +1164,8 @@ public final class CopyOnWriteSkipListStateMap<K, N, S> 
extends StateMap<K, N, S
        @Override
        public Stream<K> getKeys(N namespace) {
                updateStat();
-               byte[] namespaceBytes = 
skipListKeySerializer.serializeNamespace(namespace);
-               MemorySegment namespaceSegment = 
MemorySegmentFactory.wrap(namespaceBytes);
-               Iterator<Long> nodeIter = new 
NamespaceNodeIterator(namespaceSegment, 0, namespaceBytes.length);
+               MemorySegment namespaceSegment = 
skipListKeySerializer.serializeNamespaceToSegment(namespace);
+               Iterator<Long> nodeIter = new 
NamespaceNodeIterator(namespaceSegment, 0, namespaceSegment.size());
                return 
StreamSupport.stream(Spliterators.spliteratorUnknownSize(nodeIter, 0), false)
                        .map(this::helpGetKey);
        }
@@ -1176,9 +1174,8 @@ public final class CopyOnWriteSkipListStateMap<K, N, S> 
extends StateMap<K, N, S
        @Override
        public int sizeOfNamespace(Object namespace) {
                updateStat();
-               byte[] namespaceBytes = 
skipListKeySerializer.serializeNamespace((N) namespace);
-               MemorySegment namespaceSegment = 
MemorySegmentFactory.wrap(namespaceBytes);
-               Iterator<Long> nodeIter = new 
NamespaceNodeIterator(namespaceSegment, 0, namespaceBytes.length);
+               MemorySegment namespaceSegment = 
skipListKeySerializer.serializeNamespaceToSegment((N) namespace);
+               Iterator<Long> nodeIter = new 
NamespaceNodeIterator(namespaceSegment, 0, namespaceSegment.size());
                int size = 0;
                while (nodeIter.hasNext()) {
                        nodeIter.next();
diff --git a/flink-state-backends/flink-statebackend-heap-spillable/src/main/java/org/apache/flink/runtime/state/heap/SkipListKeySerializer.java b/flink-state-backends/flink-statebackend-heap-spillable/src/main/java/org/apache/flink/runtime/state/heap/SkipListKeySerializer.java
index 56a09b6..46c5460 100644
--- a/flink-state-backends/flink-statebackend-heap-spillable/src/main/java/org/apache/flink/runtime/state/heap/SkipListKeySerializer.java
+++ b/flink-state-backends/flink-statebackend-heap-spillable/src/main/java/org/apache/flink/runtime/state/heap/SkipListKeySerializer.java
@@ -60,6 +60,19 @@ class SkipListKeySerializer<K, N> {
         *      - byte[]: serialized key
         */
        byte[] serialize(K key, N namespace) {
+               // we know that the segment contains a byte[], because it is 
created
+               // in the method below by wrapping a byte[]
+               return serializeToSegment(key, namespace).getArray();
+       }
+
+       /**
+        * Serialize the key and namespace to bytes. The format is
+        *      - int:    length of serialized namespace
+        *      - byte[]: serialized namespace
+        *      - int:    length of serialized key
+        *      - byte[]: serialized key
+        */
+       MemorySegment serializeToSegment(K key, N namespace) {
                outputStream.reset();
                try {
                        // serialize namespace
@@ -78,15 +91,14 @@ class SkipListKeySerializer<K, N> {
                        throw new RuntimeException("Failed to serialize key", 
e);
                }
 
-               byte[] result = outputStream.toByteArray();
+               final byte[] result = outputStream.toByteArray();
+               final MemorySegment segment = MemorySegmentFactory.wrap(result);
+
                // set length of namespace and key
-               int namespaceLen = keyStartPos - Integer.BYTES;
-               int keyLen = result.length - keyStartPos - Integer.BYTES;
-               MemorySegment segment = MemorySegmentFactory.wrap(result);
-               segment.putInt(0, namespaceLen);
-               segment.putInt(keyStartPos, keyLen);
+               segment.putInt(0, keyStartPos - Integer.BYTES);
+               segment.putInt(keyStartPos, result.length - keyStartPos - 
Integer.BYTES);
 
-               return result;
+               return segment;
        }
 
        /**
@@ -162,4 +174,8 @@ class SkipListKeySerializer<K, N> {
                }
                return outputStream.toByteArray();
        }
+
+       MemorySegment serializeNamespaceToSegment(N namespace) {
+               return MemorySegmentFactory.wrap(serializeNamespace(namespace));
+       }
 }

Reply via email to