[ 
https://issues.apache.org/jira/browse/FLINK-9981?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16565785#comment-16565785
 ] 

ASF GitHub Bot commented on FLINK-9981:
---------------------------------------

azagrebin commented on a change in pull request #6438: [FLINK-9981] Tune 
performance of RocksDB implementation
URL: https://github.com/apache/flink/pull/6438#discussion_r206962666
 
 

 ##########
 File path: 
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBCachingPriorityQueueSet.java
 ##########
 @@ -0,0 +1,561 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.streaming.state;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.memory.ByteArrayInputStreamWithPos;
+import org.apache.flink.core.memory.ByteArrayOutputStreamWithPos;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.apache.flink.runtime.state.InternalPriorityQueue;
+import org.apache.flink.runtime.state.heap.HeapPriorityQueueElement;
+import org.apache.flink.util.CloseableIterator;
+import org.apache.flink.util.FlinkRuntimeException;
+
+import 
org.apache.flink.shaded.guava18.com.google.common.primitives.UnsignedBytes;
+
+import org.rocksdb.ColumnFamilyHandle;
+import org.rocksdb.RocksDB;
+import org.rocksdb.RocksDBException;
+
+import javax.annotation.Nonnegative;
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+import java.util.TreeSet;
+
+import static 
org.apache.flink.contrib.streaming.state.RocksDBCachingPriorityQueueSet.OrderedByteArraySetCache.LEXICOGRAPHIC_BYTE_COMPARATOR;
+
+/**
+ * A priority queue with set semantics, implemented on top of RocksDB. This 
uses a {@link TreeSet} to cache the bytes
+ * of up to the first n elements from RocksDB in memory to reduce interaction 
with RocksDB, in particular seek
+ * operations. Cache uses a simple write-through policy.
+ *
+ * @param <E> the type of the contained elements in the queue.
+ */
+public class RocksDBCachingPriorityQueueSet<E extends HeapPriorityQueueElement>
+       implements InternalPriorityQueue<E>, HeapPriorityQueueElement {
+
+       /** Serialized empty value to insert into RocksDB. */
+       private static final byte[] DUMMY_BYTES = new byte[] {};
+
+       /** The RocksDB instance that serves as store. */
+       @Nonnull
+       private final RocksDB db;
+
+       /** Handle to the column family of the RocksDB instance in which the 
elements are stored. */
+       @Nonnull
+       private final ColumnFamilyHandle columnFamilyHandle;
+
+       /**
+        * Serializer for the contained elements. The lexicographical order of 
the bytes of serialized objects must be
+        * aligned with their logical order.
+        */
+       @Nonnull
+       private final TypeSerializer<E> byteOrderProducingSerializer;
+
+       /** Wrapper to batch all writes to RocksDB. */
+       @Nonnull
+       private final RocksDBWriteBatchWrapper batchWrapper;
+
+       /** The key-group id in serialized form. */
+       @Nonnull
+       private final byte[] groupPrefixBytes;
+
+       /** Output stream that helps to serialize elements. */
+       @Nonnull
+       private final ByteArrayOutputStreamWithPos outputStream;
+
+       /** Output view that helps to serialize elements, must wrap the output 
stream. */
+       @Nonnull
+       private final DataOutputViewStreamWrapper outputView;
+
+       @Nonnull
+       private final ByteArrayInputStreamWithPos inputStream;
+
+       @Nonnull
+       private final DataInputViewStreamWrapper inputView;
+
+       /** In memory cache that holds a partial view on the head of the 
RocksDB content. */
+       @Nonnull
+       private final OrderedByteArraySetCache orderedCache;
+
+       /** This holds the key that we use to seek to the first element in 
RocksDB, to improve seek/iterator performance. */
+       @Nonnull
+       private byte[] seekHint;
+
+       /** Cache for the head element in de-serialized form. */
+       @Nullable
+       private E peekCache;
+
+       /** This flag is true if there could be elements in the backend that 
are not in the cache (false positives ok). */
+       private boolean storeOnlyElements;
 
 Review comment:
   The name of the `storeOnlyElements` field is a bit unclear without its 
comment; maybe `someElementsNotCached` would be more readable.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Tune performance of RocksDB implementation
> ------------------------------------------
>
>                 Key: FLINK-9981
>                 URL: https://issues.apache.org/jira/browse/FLINK-9981
>             Project: Flink
>          Issue Type: Sub-task
>          Components: State Backends, Checkpointing
>    Affects Versions: 1.6.0
>            Reporter: Stefan Richter
>            Assignee: Stefan Richter
>            Priority: Major
>              Labels: pull-request-available
>
> General performance tuning/polishing for the RocksDB implementation. We can 
> figure out how caching/seeking can be improved.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to