vcrfxia commented on code in PR #13126:
URL: https://github.com/apache/kafka/pull/13126#discussion_r1089556608


##########
streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStoreSegmentValueFormatter.java:
##########
@@ -0,0 +1,523 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Helper utility for managing the bytes layout of the value stored in 
segments of the {@link RocksDBVersionedStore}.
+ * The value format is:
+ * <pre>
+ *     <next_timestamp> + <min_timestamp> + <list of <timestamp, value_size>, 
reverse-sorted by timestamp> + <list of values, forward-sorted by timestamp>
+ * </pre>
+ * Negative {@code value_size} is used to indicate that the value stored is a 
tombstone, in order to
+ * distinguish from empty array which has {@code value_size} of zero. In 
practice, {@code value_size}
+ * is always set to -1 for the tombstone case, though this need not be true in 
general.
+ */
+final class RocksDBVersionedStoreSegmentValueFormatter {
+    private static final int TIMESTAMP_SIZE = 8;
+    private static final int VALUE_SIZE = 4;
+
+    /**
+     * @return the validTo timestamp of the latest record in the provided 
segment
+     */
+    static long getNextTimestamp(final byte[] segmentValue) {
+        return ByteBuffer.wrap(segmentValue).getLong(0);
+    }
+
+    /**
+     * Returns whether the provided segment is "empty." An empty segment is 
one that
+     * contains only a single tombstone with no validTo timestamp specified. 
In this case,
+     * the serialized segment contains only the timestamp of the tombstone 
(stored as the segment's
+     * {@code nextTimestamp}) and nothing else.
+     * <p>
+     * This can happen if, e.g., the only record inserted for a particular key 
is
+     * a tombstone. In this case, the tombstone must be stored in a segment
+     * (as the latest value store does not store tombstones), but also has no 
validTo
+     * timestamp associated with it.

Review Comment:
   Hey @mjsax I just pushed an update to the serialization of "empty" segments 
(per your suggestion). The new serialization is `minTimestamp = nextTimestamp` 
with a single record version stored in the record versions list (tombstone 
value and timestamp equal to both `minTimestamp` and `nextTimestamp`). So, even 
though it's still a special case which cannot be combined with the regular case 
(as evidenced by the fact that the `insertAsLatest()` and `insertAsEarliest()` 
methods still have if-conditions to handle this case separately), now this 
special-casing is fully encapsulated within this value formatter utility class, 
i.e., the versioned store implementation itself (for which I'll open a PR as 
soon as this one is merged) does not need to concern itself with this special 
case (and therefore the public-facing `isEmpty()` methods have been removed 
accordingly). The updated javadocs in the latest commit contain more details.
   
   I've also renamed "empty" segment to "degenerate" segment, to clarify that 
the segment is not truly empty, it's just degenerate because the single 
tombstone in it has `validFrom = validTo`, which is not typically allowed.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to