vcrfxia commented on code in PR #13126: URL: https://github.com/apache/kafka/pull/13126#discussion_r1084484753
########## streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStoreSegmentValueFormatter.java:
##########

@@ -0,0 +1,523 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Helper utility for managing the bytes layout of the value stored in segments of the {@link RocksDBVersionedStore}.
+ * The value format is:
+ * <pre>
+ *     <next_timestamp> + <min_timestamp> + <list of <timestamp, value_size>, reverse-sorted by timestamp> + <list of values, forward-sorted by timestamp>
+ * </pre>
+ * A negative {@code value_size} is used to indicate that the value stored is a tombstone, in order to
+ * distinguish it from an empty array, which has a {@code value_size} of zero. In practice, {@code value_size}
+ * is always set to -1 for the tombstone case, though this need not be true in general.
+ */
+final class RocksDBVersionedStoreSegmentValueFormatter {
+    private static final int TIMESTAMP_SIZE = 8;
+    private static final int VALUE_SIZE = 4;
+
+    /**
+     * @return the validTo timestamp of the latest record in the provided segment
+     */
+    static long getNextTimestamp(final byte[] segmentValue) {
+        return ByteBuffer.wrap(segmentValue).getLong(0);
+    }
+
+    /**
+     * Returns whether the provided segment is "empty." An empty segment is one that
+     * contains only a single tombstone with no validTo timestamp specified. In this case,
+     * the serialized segment contains only the timestamp of the tombstone (stored as the segment's
+     * {@code nextTimestamp}) and nothing else.
+     * <p>
+     * This can happen if, e.g., the only record inserted for a particular key is
+     * a tombstone. In this case, the tombstone must be stored in a segment
+     * (as the latest value store does not store tombstones), but also has no validTo
+     * timestamp associated with it.

Review Comment:
   That's fair. I added an extra paragraph to the top-level javadoc just now. LMK what you think.

   > In general, I prefer to have some "invariant" as it makes it simpler to reason about the code, but introducing this edge case voids the invariant that `nextTimestamp` is the "largest validTo" of the segment.

   I completely agree, but this "empty segment" case really is an edge case that cannot be combined with the general case. Here's some more context (possibly too much detail 🙂) on why we can't use `nextTimestamp = <undefined/infinity>` in the empty segment case, which is what we'd need in order to maintain that `nextTimestamp` is always the largest validTo of the segment:

   In the implementation of the versioned store itself (PR coming soon), the latest record version for a particular key will be stored in a "latest value store," and older record versions will be stored in "segment stores" based on their validTo timestamps, except when the latest record version (for a particular key) is a tombstone.
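   To illustrate the "empty segment" case described in the javadoc above, here is a hedged sketch (not the actual Kafka implementation; the class name and `serializeEmpty`/`isEmpty` helpers are hypothetical): an empty segment serializes only the tombstone's timestamp, stored in the `nextTimestamp` position, so `getNextTimestamp` still works and emptiness can be detected from the length alone.

   ```java
   import java.nio.ByteBuffer;

   // Illustrative sketch of the "empty segment" encoding: the serialized
   // bytes contain only the tombstone's timestamp (as nextTimestamp) and
   // nothing else. Names here are hypothetical, not from the PR.
   public class EmptySegmentSketch {
       static final int TIMESTAMP_SIZE = 8;

       // Serialize an "empty" segment: just the tombstone's timestamp.
       static byte[] serializeEmpty(final long tombstoneTimestamp) {
           return ByteBuffer.allocate(TIMESTAMP_SIZE)
                   .putLong(tombstoneTimestamp)
                   .array();
       }

       // Mirrors getNextTimestamp() from the code under review:
       // the first 8 bytes are always the nextTimestamp.
       static long getNextTimestamp(final byte[] segmentValue) {
           return ByteBuffer.wrap(segmentValue).getLong(0);
       }

       // A segment is "empty" when nothing beyond the timestamp was written.
       static boolean isEmpty(final byte[] segmentValue) {
           return segmentValue.length == TIMESTAMP_SIZE;
       }

       public static void main(final String[] args) {
           final byte[] empty = serializeEmpty(42L);
           System.out.println(isEmpty(empty));          // true
           System.out.println(getNextTimestamp(empty)); // 42
       }
   }
   ```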
   In this case, we don't want to store the tombstone in the latest value store because there's no expiry mechanism for the latest value store (and the tombstones might accumulate indefinitely). So, the tombstone is stored in a segment. But if these special tombstones have `validTo = infinity`, then this violates the invariant that "record versions are stored into segments based on their validTo timestamp." (We don't want to repeatedly move the tombstone into earlier and earlier segments as newer segments are created, because the whole point of putting it into a segment is so that it eventually expires.)

   Violating this invariant is a big problem for store efficiency. Suppose a new record version is put to the store later, long after a tombstone has been put (but before the tombstone has expired). In order to find the tombstone and update its validTo timestamp, we'd have to check every single segment (until we find an existing record version). We'd have to do this for every single put, since we wouldn't know whether the latest record version is a tombstone or not.

   In contrast, if we allow the validTo timestamp for the tombstone of the empty segment to be the tombstone's timestamp itself (instead of undefined/infinity), then we set `nextTimestamp = minTimestamp = <tombstone's timestamp>` for the empty segment, and we don't have the issue above anymore. When a new record version is put, we can safely ignore all segments earlier than the one corresponding to the new record's timestamp, because nothing in the earlier segments needs to be updated.
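   The efficiency argument above can be sketched in code. This is an illustrative assumption, not the store's actual implementation: the `segmentId()` mapping and `SEGMENT_INTERVAL` value are invented for the example. It shows why a finite validTo (the tombstone's own timestamp) gives the tombstone one fixed, expirable home segment, so a later put only needs to consider segments at or after its own timestamp's segment.

   ```java
   // Hypothetical sketch of segment assignment by validTo timestamp.
   // SEGMENT_INTERVAL and segmentId() are assumptions for illustration only.
   public class SegmentSelectionSketch {
       static final long SEGMENT_INTERVAL = 1000L; // assumed segment width (ms)

       // Record versions are assigned to segments by their validTo timestamp.
       // With validTo = infinity, a tombstone would have no finite home segment
       // and every put would have to scan all segments looking for it.
       static long segmentId(final long validTo) {
           return validTo / SEGMENT_INTERVAL;
       }

       public static void main(final String[] args) {
           // Empty-segment rule: nextTimestamp = minTimestamp = tombstone's
           // timestamp, so the tombstone lands in one fixed, expirable segment:
           final long tombstoneTs = 5500L;
           System.out.println(segmentId(tombstoneTs)); // 5

           // A later put at ts = 9000 only needs to consider segments at or
           // after segmentId(9000); segment 5 and earlier can be skipped,
           // since nothing in them needs its validTo updated.
           System.out.println(segmentId(9000L)); // 9
       }
   }
   ```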