vcrfxia commented on code in PR #13126:
URL: https://github.com/apache/kafka/pull/13126#discussion_r1084484753


##########
streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStoreSegmentValueFormatter.java:
##########
@@ -0,0 +1,523 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Helper utility for managing the bytes layout of the value stored in segments of the {@link RocksDBVersionedStore}.
+ * The value format is:
+ * <pre>
+ *     <next_timestamp> + <min_timestamp> + <list of <timestamp, value_size>, reverse-sorted by timestamp> + <list of values, forward-sorted by timestamp>
+ * </pre>
+ * Negative {@code value_size} is used to indicate that the value stored is a tombstone, in order to
+ * distinguish from empty array which has {@code value_size} of zero. In practice, {@code value_size}
+ * is always set to -1 for the tombstone case, though this need not be true in general.
+ */
+final class RocksDBVersionedStoreSegmentValueFormatter {
+    private static final int TIMESTAMP_SIZE = 8;
+    private static final int VALUE_SIZE = 4;
+
+    /**
+     * @return the validTo timestamp of the latest record in the provided segment
+     */
+    static long getNextTimestamp(final byte[] segmentValue) {
+        return ByteBuffer.wrap(segmentValue).getLong(0);
+    }
+
+    /**
+     * Returns whether the provided segment is "empty." An empty segment is one that
+     * contains only a single tombstone with no validTo timestamp specified. In this case,
+     * the serialized segment contains only the timestamp of the tombstone (stored as the segment's
+     * {@code nextTimestamp}) and nothing else.
+     * <p>
+     * This can happen if, e.g., the only record inserted for a particular key is
+     * a tombstone. In this case, the tombstone must be stored in a segment
+     * (as the latest value store does not store tombstones), but also has no validTo
+     * timestamp associated with it.

Review Comment:
   That's fair. I added an extra paragraph into the top-level javadoc just now. 
LMK what you think.
   
   > In general, I prefer to have some "invariant" as it makes it simpler to reason about the code, but introducing this edge case voids the invariant that `nextTimestamp` is the "largest validTo" of the segment.
   
   I completely agree, but this "empty segment" case really is an edge case 
which cannot be combined with the general case. Here's some more context 
(possibly too much detail 🙂) on why we can't use `nextTimestamp = 
<undefined/infinity>` in the empty segment case, which is what we'd need to 
maintain that `nextTimestamp` is always the largest validTo of the segment:
   
   In the implementation of the versioned store itself (PR coming soon), the 
latest record version for a particular key will be stored in a "latest value 
store," and older record versions will be stored in "segment stores" based on 
their validTo timestamps, except when the latest record version (for a 
particular key) is a tombstone. In this case, we don't want to store the 
tombstone in the latest value store because there's no expiry mechanism for the 
latest value store (and the tombstones might accumulate indefinitely). So, the 
tombstone is stored in a segment. But if these special tombstones have `validTo 
= infinity`, then this violates the invariant that "record versions are stored 
into segments based on their validTo timestamp." (We don't want to repeatedly 
move the tombstone into earlier and earlier segments as newer segments are 
created, because the whole point of putting them into a segment is so that they 
eventually expire.) Violating this invariant is a big problem for store
efficiency. Suppose a new record version is put to the store later, long 
after a tombstone has been put (but before the tombstone has expired). In order 
to find the tombstone and update its validTo timestamp, we'd have to check 
every single segment (until we find an existing record version). We'd have to 
do this for every single put, since we wouldn't know whether the latest record 
version is a tombstone or not.
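   
   To make the cost concern above concrete, here's a rough sketch (illustrative only, not code from this PR; the class name and fixed segment interval are made up) of what "record versions are stored into segments based on their validTo timestamp" looks like, and why an infinite validTo has no home:
   
```java
// Illustrative sketch only, not the PR code: a time-based segmenting scheme
// where each segment covers a fixed range of validTo timestamps.
final class SegmentAssignmentSketch {

    private static final long SEGMENT_INTERVAL_MS = 60_000L;

    // A record version with a finite validTo belongs to exactly one segment,
    // so a put only needs to touch the segment covering the timestamp of the
    // record version it displaces.
    static long segmentIdFor(final long validTo) {
        return validTo / SEGMENT_INTERVAL_MS;
    }

    // There is no such mapping for validTo = infinity: a "latest tombstone"
    // stored that way would have to be hunted down by scanning segments
    // newest-to-oldest on every put. The approach described next avoids that
    // scan entirely.
}
```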
   
   In contrast, if we allow the validTo timestamp for the tombstone of the 
empty segment to be the tombstone's timestamp itself (instead of 
undefined/infinity), then we set `nextTimestamp = minTimestamp = <tombstone's 
timestamp>` for the empty segment, and we don't have the issue above anymore. 
When a new record version is put, we can safely ignore all segments earlier 
than the one corresponding to the new record's timestamp because nothing in the 
earlier segments needs to be updated.
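   
   And to make the byte layout itself concrete, here's a minimal, simplified sketch (again illustrative only, not the PR code; a real segment holds arbitrarily many record versions) of a regular single-value segment next to the "empty segment" edge case:
   
```java
import java.nio.ByteBuffer;

// Rough illustration of the layout described in the class-level javadoc:
// next_timestamp and min_timestamp headers, then (timestamp, value_size)
// pairs reverse-sorted by timestamp, then the values forward-sorted by
// timestamp.
final class SegmentLayoutSketch {

    // A regular segment holding a single non-tombstone record version.
    static byte[] singleValueSegment(final long validFrom, final long validTo, final byte[] value) {
        return ByteBuffer.allocate(8 + 8 + 8 + 4 + value.length)
            .putLong(validTo)      // next_timestamp: validTo of the latest record version
            .putLong(validFrom)    // min_timestamp: validFrom of the earliest record version
            .putLong(validFrom)    // timestamp of this record version
            .putInt(value.length)  // value_size (a negative size, e.g. -1, would mark a tombstone)
            .put(value)            // the value bytes themselves
            .array();
    }

    // The "empty segment" edge case: a lone tombstone with no validTo yet.
    // Only the tombstone's timestamp is serialized; it serves as both
    // nextTimestamp and minTimestamp, so getNextTimestamp still just reads
    // the first 8 bytes, and the segment lives in the segment store that its
    // own timestamp maps to.
    static byte[] emptySegment(final long tombstoneTimestamp) {
        return ByteBuffer.allocate(Long.BYTES).putLong(tombstoneTimestamp).array();
    }
}
```
   
   With nextTimestamp == minTimestamp == the tombstone's timestamp, nothing in this segment (or any earlier one) needs revisiting when later record versions are put.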


