[ https://issues.apache.org/jira/browse/FLINK-9981?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16565786#comment-16565786 ]

ASF GitHub Bot commented on FLINK-9981:
---------------------------------------

azagrebin commented on a change in pull request #6438: [FLINK-9981] Tune performance of RocksDB implementation
URL: https://github.com/apache/flink/pull/6438#discussion_r206966986
 
 

 ##########
 File path: flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBCachingPriorityQueueSet.java
 ##########
 @@ -0,0 +1,561 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.streaming.state;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.memory.ByteArrayInputStreamWithPos;
+import org.apache.flink.core.memory.ByteArrayOutputStreamWithPos;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.apache.flink.runtime.state.InternalPriorityQueue;
+import org.apache.flink.runtime.state.heap.HeapPriorityQueueElement;
+import org.apache.flink.util.CloseableIterator;
+import org.apache.flink.util.FlinkRuntimeException;
+
+import org.apache.flink.shaded.guava18.com.google.common.primitives.UnsignedBytes;
+
+import org.rocksdb.ColumnFamilyHandle;
+import org.rocksdb.RocksDB;
+import org.rocksdb.RocksDBException;
+
+import javax.annotation.Nonnegative;
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+import java.util.TreeSet;
+
+import static org.apache.flink.contrib.streaming.state.RocksDBCachingPriorityQueueSet.OrderedByteArraySetCache.LEXICOGRAPHIC_BYTE_COMPARATOR;
+
+/**
+ * A priority queue with set semantics, implemented on top of RocksDB. This uses a {@link TreeSet} to cache the bytes
+ * of up to the first n elements from RocksDB in memory to reduce interaction with RocksDB, in particular seek
+ * operations. Cache uses a simple write-through policy.
+ *
+ * @param <E> the type of the contained elements in the queue.
+ */
+public class RocksDBCachingPriorityQueueSet<E extends HeapPriorityQueueElement>
+       implements InternalPriorityQueue<E>, HeapPriorityQueueElement {
+
+       /** Serialized empty value to insert into RocksDB. */
+       private static final byte[] DUMMY_BYTES = new byte[] {};
+
+       /** The RocksDB instance that serves as store. */
+       @Nonnull
+       private final RocksDB db;
+
+       /** Handle to the column family of the RocksDB instance in which the elements are stored. */
+       @Nonnull
+       private final ColumnFamilyHandle columnFamilyHandle;
+
+       /**
+        * Serializer for the contained elements. The lexicographical order of the bytes of serialized objects must be
+        * aligned with their logical order.
+        */
+       @Nonnull
+       private final TypeSerializer<E> byteOrderProducingSerializer;
+
+       /** Wrapper to batch all writes to RocksDB. */
+       @Nonnull
+       private final RocksDBWriteBatchWrapper batchWrapper;
+
+       /** The key-group id in serialized form. */
+       @Nonnull
+       private final byte[] groupPrefixBytes;
+
+       /** Output stream that helps to serialize elements. */
+       @Nonnull
+       private final ByteArrayOutputStreamWithPos outputStream;
+
+       /** Output view that helps to serialize elements, must wrap the output stream. */
+       @Nonnull
+       private final DataOutputViewStreamWrapper outputView;
+
+       @Nonnull
+       private final ByteArrayInputStreamWithPos inputStream;
+
+       @Nonnull
+       private final DataInputViewStreamWrapper inputView;
+
+       /** In memory cache that holds a partial view on the head of the RocksDB content. */
+       @Nonnull
+       private final OrderedByteArraySetCache orderedCache;
+
+       /** This holds the key that we use to seek to the first element in RocksDB, to improve seek/iterator performance. */
+       @Nonnull
+       private byte[] seekHint;
+
+       /** Cache for the head element in de-serialized form. */
+       @Nullable
+       private E peekCache;
+
+       /** This flag is true if there could be elements in the backend that are not in the cache (false positives ok). */
+       private boolean storeOnlyElements;
+
+       /** Index for management as a {@link HeapPriorityQueueElement}. */
+       private int internalIndex;
+
+       RocksDBCachingPriorityQueueSet(
+               @Nonnegative int keyGroupId,
+               @Nonnegative int keyGroupPrefixBytes,
+               @Nonnull RocksDB db,
+               @Nonnull ColumnFamilyHandle columnFamilyHandle,
+               @Nonnull TypeSerializer<E> byteOrderProducingSerializer,
+               @Nonnull ByteArrayOutputStreamWithPos outputStream,
+               @Nonnull ByteArrayInputStreamWithPos inputStream,
+               @Nonnull RocksDBWriteBatchWrapper batchWrapper,
+               @Nonnull OrderedByteArraySetCache orderedByteArraySetCache) {
+               this.db = db;
+               this.columnFamilyHandle = columnFamilyHandle;
+               this.byteOrderProducingSerializer = byteOrderProducingSerializer;
+               this.outputStream = outputStream;
+               this.inputStream = inputStream;
+               this.batchWrapper = batchWrapper;
+               this.storeOnlyElements = true;
+               this.outputView = new DataOutputViewStreamWrapper(outputStream);
+               this.inputView = new DataInputViewStreamWrapper(inputStream);
+               this.orderedCache = orderedByteArraySetCache;
+               this.groupPrefixBytes = createKeyGroupBytes(keyGroupId, keyGroupPrefixBytes);
+               this.seekHint = groupPrefixBytes;
+               this.internalIndex = HeapPriorityQueueElement.NOT_CONTAINED;
+       }
+
+       @Nullable
+       @Override
+       public E peek() {
+
+               checkRefillCacheFromStore();
+
+               if (peekCache != null) {
+                       return peekCache;
+               }
+
+               byte[] firstBytes = orderedCache.peekFirst();
+               if (firstBytes != null) {
+                       peekCache = deserializeElement(firstBytes);
+                       return peekCache;
+               } else {
+                       return null;
+               }
+       }
+
+       @Nullable
+       @Override
+       public E poll() {
+
+               checkRefillCacheFromStore();
+
+               final byte[] firstBytes = orderedCache.pollFirst();
+
+               if (firstBytes == null) {
+                       return null;
+               }
+
+               // write-through sync
+               removeFromRocksDB(firstBytes);
+
+               if (orderedCache.isEmpty()) {
+                       seekHint = firstBytes;
+               }
+
+               if (peekCache != null) {
+                       E fromCache = peekCache;
+                       peekCache = null;
+                       return fromCache;
+               } else {
+                       return deserializeElement(firstBytes);
+               }
+       }
+
+       @Override
+       public boolean add(@Nonnull E toAdd) {
+
+               checkRefillCacheFromStore();
+
+               final byte[] toAddBytes = serializeElement(toAdd);
+
+               final boolean cacheFull = orderedCache.isFull();
+
+               if ((!cacheFull && !storeOnlyElements) ||
+                       LEXICOGRAPHIC_BYTE_COMPARATOR.compare(toAddBytes, orderedCache.peekLast()) < 0) {
+
+                       if (cacheFull) {
+                               // we drop the element with lowest priority from the cache
+                               orderedCache.pollLast();
+                               // the dropped element is now only in the store
+                               storeOnlyElements = true;
+                       }
+
+                       if (orderedCache.add(toAddBytes)) {
+                               // write-through sync
+                               addToRocksDB(toAddBytes);
+                               if (toAddBytes == orderedCache.peekFirst()) {
+                                       peekCache = null;
+                                       return true;
+                               }
+                       }
+               } else {
+                       // we only added to the store
+                       addToRocksDB(toAddBytes);
+                       storeOnlyElements = true;
+               }
+               return false;
+       }
+
+       @Override
+       public boolean remove(@Nonnull E toRemove) {
+
+               checkRefillCacheFromStore();
+
+               final byte[] oldHead = orderedCache.peekFirst();
+
+               if (oldHead == null) {
+                       return false;
+               }
+
+               final byte[] toRemoveBytes = serializeElement(toRemove);
+
+               // write-through sync
+               removeFromRocksDB(toRemoveBytes);
+               orderedCache.remove(toRemoveBytes);
+
+               if (orderedCache.isEmpty()) {
+                       seekHint = toRemoveBytes;
+                       peekCache = null;
+                       return true;
+               }
+
+               if (oldHead != orderedCache.peekFirst()) {
+                       peekCache = null;
+                       return true;
+               }
+
+               return false;
+       }
+
+       @Override
+       public void addAll(@Nullable Collection<? extends E> toAdd) {
+
+               if (toAdd == null) {
+                       return;
+               }
+
+               for (E element : toAdd) {
+                       add(element);
+               }
+       }
+
+       @Override
+       public boolean isEmpty() {
+               checkRefillCacheFromStore();
+               return orderedCache.isEmpty();
+       }
+
+       @Nonnull
+       @Override
+       public CloseableIterator<E> iterator() {
+               return new DeserializingIteratorWrapper(orderedBytesIterator());
+       }
+
+       /**
+        * This implementation comes at a relatively high cost per invocation. It should not be called repeatedly when it is
+        * clear that the value did not change. Currently this is only truly used to realize certain higher-level tests.
+        */
+       @Override
+       public int size() {
+
+               if (storeOnlyElements) {
+                       int count = 0;
+                       try (final RocksBytesIterator iterator = orderedBytesIterator()) {
+                               while (iterator.hasNext()) {
+                                       iterator.next();
+                                       ++count;
+                               }
+                       }
+                       return count;
+               } else {
+                       return orderedCache.size();
+               }
+       }
+
+       @Override
+       public int getInternalIndex() {
+               return internalIndex;
+       }
+
+       @Override
+       public void setInternalIndex(int newIndex) {
+               this.internalIndex = newIndex;
+       }
+
+       @Nonnull
+       private RocksBytesIterator orderedBytesIterator() {
+               flushWriteBatch();
+               return new RocksBytesIterator(
+                       new RocksIteratorWrapper(
+                               db.newIterator(columnFamilyHandle)));
+       }
+
+       /**
+        * Ensures that recent writes are flushed and reflected in the RocksDB instance.
+        */
+       private void flushWriteBatch() {
+               try {
+                       batchWrapper.flush();
+               } catch (RocksDBException e) {
+                       throw new FlinkRuntimeException(e);
+               }
+       }
+
+       private void addToRocksDB(@Nonnull byte[] toAddBytes) {
+               try {
+                       batchWrapper.put(columnFamilyHandle, toAddBytes, DUMMY_BYTES);
+               } catch (RocksDBException e) {
+                       throw new FlinkRuntimeException(e);
+               }
+       }
+
+       private void removeFromRocksDB(@Nonnull byte[] toRemoveBytes) {
+               try {
+                       batchWrapper.remove(columnFamilyHandle, toRemoveBytes);
+               } catch (RocksDBException e) {
+                       throw new FlinkRuntimeException(e);
+               }
+       }
+
+       private void checkRefillCacheFromStore() {
+               if (storeOnlyElements && orderedCache.isEmpty()) {
+                       try (final RocksBytesIterator iterator = orderedBytesIterator()) {
+                               orderedCache.bulkLoadFromOrderedIterator(iterator);
+                               storeOnlyElements = iterator.hasNext();
+                       } catch (Exception e) {
+                               throw new FlinkRuntimeException("Exception while refilling store from iterator.", e);
+                       }
+               }
+       }
+
+       private static boolean isPrefixWith(byte[] bytes, byte[] prefixBytes) {
+               for (int i = 0; i < prefixBytes.length; ++i) {
+                       if (bytes[i] != prefixBytes[i]) {
+                               return false;
+                       }
+               }
+               return true;
+       }
+
+       @Nonnull
+       private byte[] createKeyGroupBytes(int keyGroupId, int numPrefixBytes) {
+
+               outputStream.reset();
+
+               try {
+                       RocksDBKeySerializationUtils.writeKeyGroup(keyGroupId, numPrefixBytes, outputView);
+               } catch (IOException e) {
+                       throw new FlinkRuntimeException("Could not write key-group bytes.", e);
+               }
+
+               return outputStream.toByteArray();
+       }
+
+       @Nonnull
+       private byte[] serializeElement(@Nonnull E element) {
+               try {
+                       outputStream.reset();
+                       outputView.write(groupPrefixBytes);
+                       byteOrderProducingSerializer.serialize(element, outputView);
+                       return outputStream.toByteArray();
+               } catch (IOException e) {
+                       throw new FlinkRuntimeException("Error while serializing the element.", e);
+               }
+       }
+
+       @Nonnull
+       private E deserializeElement(@Nonnull byte[] bytes) {
+               try {
+                       inputStream.setBuffer(bytes, 0, bytes.length);
+                       inputView.skipBytes(groupPrefixBytes.length);
 
 Review comment:
   Can the first 2 lines be reduced to:
   inputStream.setBuffer(bytes, **groupPrefixBytes.length**, bytes.length);?
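
   For illustration only, a minimal sketch of how deserializeElement might look with that suggestion applied. It assumes that ByteArrayInputStreamWithPos#setBuffer(byte[], int, int) treats its second argument as the initial read position and clamps the readable region to the buffer length, as java.io.ByteArrayInputStream does. The tail of the method is reconstructed by analogy with serializeElement, since the excerpt above cuts off mid-method:

   	@Nonnull
   	private E deserializeElement(@Nonnull byte[] bytes) {
   		try {
   			// Start reading directly behind the key-group prefix, collapsing
   			// setBuffer(bytes, 0, bytes.length) followed by
   			// skipBytes(groupPrefixBytes.length) into a single call.
   			// (Assumes setBuffer caps the readable region at bytes.length.)
   			inputStream.setBuffer(bytes, groupPrefixBytes.length, bytes.length);
   			return byteOrderProducingSerializer.deserialize(inputView);
   		} catch (IOException e) {
   			throw new FlinkRuntimeException("Error while deserializing the element.", e);
   		}
   	}

   Under those assumptions the observable behavior should be unchanged; the suggestion only removes the intermediate repositioning step.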

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Tune performance of RocksDB implementation
> ------------------------------------------
>
>                 Key: FLINK-9981
>                 URL: https://issues.apache.org/jira/browse/FLINK-9981
>             Project: Flink
>          Issue Type: Sub-task
>          Components: State Backends, Checkpointing
>    Affects Versions: 1.6.0
>            Reporter: Stefan Richter
>            Assignee: Stefan Richter
>            Priority: Major
>              Labels: pull-request-available
>
> General performance tuning/polishing for the RocksDB implementation. We can 
> figure out how caching/seeking can be improved.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
