[GitHub] [kafka] vcrfxia commented on a diff in pull request #13126: KAFKA-14491: [1/N] Add segment value format for RocksDB versioned store

2023-02-01 Thread via GitHub


vcrfxia commented on code in PR #13126:
URL: https://github.com/apache/kafka/pull/13126#discussion_r1093919450


##
streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStoreSegmentValueFormatter.java:
##
@@ -0,0 +1,554 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Helper utility for managing the bytes layout of the value stored in 
segments of the {@link RocksDBVersionedStore}.
+ * The value format is:
+ * 
+ * &lt;next_timestamp&gt; + &lt;min_timestamp&gt; + &lt;list of &lt;timestamp, value_size&gt;, reverse-sorted by timestamp&gt; + &lt;list of values, reverse-sorted by timestamp&gt;
+ * 
+ * where:
+ * 
+ * {@code next_timestamp} is the validTo timestamp of the latest record 
version stored in this
+ * segment,
+ * {@code min_timestamp} is the validFrom timestamp of the earliest record 
version stored
+ * in this segment, and
+ * Negative {@code value_size} is used to indicate that the value stored 
is a tombstone,
+ * in order to distinguish from empty array which has {@code value_size} of 
zero. In practice,
+ * {@code value_size} is always set to -1 for the tombstone case, though this 
need not be true
+ * in general.
+ * 
+ * 
+ * Note that the value format above does not store the number of record 
versions contained in the
+ * segment. It is not necessary to store this information separately because 
this information is
+ * never required on its own. Record versions are always deserialized in 
order, and we can
+ * determine when we have reached the end of the list based on whether the 
(validFrom) timestamp of

Review Comment:
   See 
[below](https://github.com/apache/kafka/pull/13126#discussion_r1093909835).
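For readers following the thread, the byte layout the quoted Javadoc describes can be sketched for the simplest case of a segment holding a single record version. This is an illustration only, not the PR's actual implementation; the two accessor methods mirror helpers quoted later in this thread.

```java
import java.nio.ByteBuffer;

// Sketch of the described layout: next_timestamp, min_timestamp,
// one (timestamp, value_size) pair, then the value bytes.
public class SegmentLayoutSketch {
    static final int TIMESTAMP_SIZE = 8; // long
    static final int VALUE_SIZE = 4;     // int

    static byte[] serializeSingle(final long nextTimestamp, final long validFrom, final byte[] value) {
        final int valueLen = value == null ? 0 : value.length;
        final ByteBuffer buf = ByteBuffer.allocate(2 * TIMESTAMP_SIZE + TIMESTAMP_SIZE + VALUE_SIZE + valueLen);
        buf.putLong(nextTimestamp);                // validTo of the latest record version
        buf.putLong(validFrom);                    // min_timestamp (earliest validFrom)
        buf.putLong(validFrom);                    // the single record's (validFrom) timestamp
        buf.putInt(value == null ? -1 : valueLen); // -1 marks a tombstone; 0 is an empty array
        if (value != null) {
            buf.put(value);
        }
        return buf.array();
    }

    // next_timestamp is the first long of the segment value.
    static long getNextTimestamp(final byte[] segmentValue) {
        return ByteBuffer.wrap(segmentValue).getLong(0);
    }

    // min_timestamp immediately follows next_timestamp.
    static long getMinTimestamp(final byte[] segmentValue) {
        return ByteBuffer.wrap(segmentValue).getLong(TIMESTAMP_SIZE);
    }
}
```

Note how the tombstone case is distinguished by a negative `value_size` rather than a zero length, exactly as the quoted Javadoc explains.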



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] vcrfxia commented on a diff in pull request #13126: KAFKA-14491: [1/N] Add segment value format for RocksDB versioned store

2023-02-01 Thread via GitHub


vcrfxia commented on code in PR #13126:
URL: https://github.com/apache/kafka/pull/13126#discussion_r1093914257


##
streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStoreSegmentValueFormatterTest.java:
##
@@ -0,0 +1,316 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.Assert.assertThrows;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import 
org.apache.kafka.streams.state.internals.RocksDBVersionedStoreSegmentValueFormatter.SegmentValue;
+import 
org.apache.kafka.streams.state.internals.RocksDBVersionedStoreSegmentValueFormatter.SegmentValue.SegmentSearchResult;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+@RunWith(Parameterized.class)
+public class RocksDBVersionedStoreSegmentValueFormatterTest {
+
+private static final List&lt;TestCase&gt; TEST_CASES = new ArrayList&lt;&gt;();
+static {
+// test cases are expected to have timestamps in strictly decreasing 
order (except for the degenerate case)
+TEST_CASES.add(new TestCase("degenerate", 10, new TestRecord(null, 
10)));
+TEST_CASES.add(new TestCase("single record", 10, new 
TestRecord("foo".getBytes(), 1)));
+TEST_CASES.add(new TestCase("multiple records", 10, new 
TestRecord("foo".getBytes(), 8), new TestRecord("bar".getBytes(), 3), new 
TestRecord("baz".getBytes(), 0)));
+TEST_CASES.add(new TestCase("single tombstone", 10, new 
TestRecord(null, 1)));
+TEST_CASES.add(new TestCase("multiple tombstone", 10, new 
TestRecord(null, 4), new TestRecord(null, 1)));
+TEST_CASES.add(new TestCase("tombstones and records (r, t, r)", 10, 
new TestRecord("foo".getBytes(), 5), new TestRecord(null, 2), new 
TestRecord("bar".getBytes(), 1)));
+TEST_CASES.add(new TestCase("tombstones and records (t, r, t)", 10, 
new TestRecord(null, 5), new TestRecord("foo".getBytes(), 2), new 
TestRecord(null, 1)));
+TEST_CASES.add(new TestCase("tombstones and records (r, r, t, t)", 10, 
new TestRecord("foo".getBytes(), 6), new TestRecord("bar".getBytes(), 5), new 
TestRecord(null, 2), new TestRecord(null, 1)));
+TEST_CASES.add(new TestCase("tombstones and records (t, t, r, r)", 10, 
new TestRecord(null, 7), new TestRecord(null, 6), new 
TestRecord("foo".getBytes(), 2), new TestRecord("bar".getBytes(), 1)));
+TEST_CASES.add(new TestCase("record with empty bytes", 10, new 
TestRecord(new byte[0], 1)));
+TEST_CASES.add(new TestCase("records with empty bytes (r, e)", 10, new 
TestRecord("foo".getBytes(), 4), new TestRecord(new byte[0], 1)));
+TEST_CASES.add(new TestCase("records with empty bytes (e, e, r)", 10, 
new TestRecord(new byte[0], 8), new TestRecord(new byte[0], 2), new 
TestRecord("foo".getBytes(), 1)));
+}
+
+private final TestCase testCase;
+
+public RocksDBVersionedStoreSegmentValueFormatterTest(final TestCase 
testCase) {
+this.testCase = testCase;
+}
+
+@Parameterized.Parameters(name = "{0}")
+public static Collection&lt;TestCase&gt; data() {
+return TEST_CASES;
+}
+
+@Test
+public void shouldSerializeAndDeserialize() {
+final SegmentValue segmentValue = 
buildSegmentWithInsertLatest(testCase);
+
+final byte[] serialized = segmentValue.serialize();
+final SegmentValue deserialized = 
RocksDBVersionedStoreSegmentValueFormatter.deserialize(serialized);
+
+verifySegmentContents(deserialized, testCase);
+}
+
+@Test
+public void shouldBuildWithInsertLatest() {
+final SegmentValue segmentValue = 
buildSegmentWithInsertLatest(testCase);
+
+verifySegmentContents(segmentValue, testCase);
+}
+
+@Test
+public void shouldBuildWithInsertEarliest() {
+final SegmentValue segmentValue = 
buildSegmentWithInsertEarliest(testCase);
+
+verifySegmentContents(segmentValue, testCase);
+}
+
+@Test
+public void shouldInsertAtInde
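Since the archive cuts the quoted test file off here, a minimal sketch of the data holders it constructs may help readers follow the cases above. The field names are inferred from the constructor calls in the quote and may not match the PR's actual definitions.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical reconstruction of the quoted test's helper classes.
public class TestCaseSketch {
    static class TestRecord {
        final byte[] value;   // null represents a tombstone
        final long timestamp; // the record's validFrom timestamp
        TestRecord(final byte[] value, final long timestamp) {
            this.value = value;
            this.timestamp = timestamp;
        }
    }

    static class TestCase {
        final String name;
        final long nextTimestamp;       // validTo of the newest record version
        final List<TestRecord> records; // expected in decreasing-timestamp order
        TestCase(final String name, final long nextTimestamp, final TestRecord... records) {
            this.name = name;
            this.nextTimestamp = nextTimestamp;
            this.records = Arrays.asList(records);
        }
        @Override
        public String toString() {
            return name; // lets @Parameterized.Parameters(name = "{0}") label each case
        }
    }
}
```

The `toString()` override is what allows the `name = "{0}"` pattern in the quoted `@Parameterized.Parameters` annotation to render a readable test name per case.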

[GitHub] [kafka] vcrfxia commented on a diff in pull request #13126: KAFKA-14491: [1/N] Add segment value format for RocksDB versioned store

2023-02-01 Thread via GitHub


vcrfxia commented on code in PR #13126:
URL: https://github.com/apache/kafka/pull/13126#discussion_r1093913489


##
streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStoreSegmentValueFormatterTest.java:
##

[GitHub] [kafka] vcrfxia commented on a diff in pull request #13126: KAFKA-14491: [1/N] Add segment value format for RocksDB versioned store

2023-02-01 Thread via GitHub


vcrfxia commented on code in PR #13126:
URL: https://github.com/apache/kafka/pull/13126#discussion_r1093912650


##
streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStoreSegmentValueFormatterTest.java:
##

[GitHub] [kafka] vcrfxia commented on a diff in pull request #13126: KAFKA-14491: [1/N] Add segment value format for RocksDB versioned store

2023-02-01 Thread via GitHub


vcrfxia commented on code in PR #13126:
URL: https://github.com/apache/kafka/pull/13126#discussion_r1093910815


##
streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStoreSegmentValueFormatter.java:
##
@@ -0,0 +1,518 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Helper utility for managing the bytes layout of the value stored in 
segments of the {@link RocksDBVersionedStore}.
+ * The value format is:
+ * 
+ * &lt;next_timestamp&gt; + &lt;min_timestamp&gt; + &lt;list of &lt;timestamp, value_size&gt;, reverse-sorted by timestamp&gt; + &lt;list of values, reverse-sorted by timestamp&gt;
+ * 
+ * where:
+ * 
+ * {@code next_timestamp} is the validTo timestamp of the latest record 
version stored in this
+ * segment,
+ * {@code min_timestamp} is the validFrom timestamp of the earliest record 
version stored
+ * in this segment, and
+ * Negative {@code value_size} is used to indicate that the value stored 
is a tombstone,
+ * in order to distinguish from empty array which has {@code value_size} of 
zero. In practice,
+ * {@code value_size} is always set to -1 for the tombstone case, though this 
need not be true
+ * in general.
+ * 
+ * 
+ * Note that the value format above does not store the number of record 
versions contained in the
+ * segment. It is not necessary to store this information separately because 
this information is
+ * never required on its own. Record versions are always deserialized in 
order, and we can
+ * determine when we have reached the end of the list based on whether the 
(validFrom) timestamp of
+ * the record version equals the {@code min_timestamp}.
+ * 
+ * There is one edge case with regards to the segment value format described 
above, which is useful
+ * to know for understanding the code in this file, but not relevant for 
callers of the class.
+ * In the typical case, all record (validFrom) timestamps and the {@code 
next_timestamp} of the
+ * segment will form a strictly increasing sequence, i.e., it is not valid to 
have a record version
+ * with validTo timestamp equal to (or less than) its validFrom timestamp. The 
one edge case /
+ * exception is when the latest record version (for a particular key) is a 
tombstone, and the
+ * segment in which this tombstone is to be stored contains currently no 
record versions.
+ * This case will result in a "degenerate" segment containing the single 
tombstone, with both
+ * {@code min_timestamp} and {@code next_timestamp} equal to the (validFrom) 
timestamp of the
+ * tombstone. (It is valid to interpret this tombstone's validTo timestamp as 
being equal to its
+ * validFrom timestamp, as querying for the latest record version as of a 
later timestamp will
+ * correctly return that no record version is present.) Note also that after a 
"degenerate" segment
+ * has formed, it's possible that the segment will remain degenerate even as 
newer record versions
+ * are added. (For example, if additional puts happen with later timestamps 
such that those puts
+ * only affect later segments, then the earlier degenerate segment will remain 
degenerate.)
+ * 
+ * Callers of this class need not concern themselves with this detail because 
all the exposed
+ * methods function as expected, even in the degenerate segment case. All 
methods may still be
+ * called, with the exception of {@link SegmentValue#find(long, boolean)} and 
those that depend

Review Comment:
   Clarified in https://github.com/apache/kafka/pull/13186. Hopefully it's 
clear now that users of the class really don't need to care; it's only to help 
readers understand the code in this class itself.
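As a small illustration of the "degenerate" case the quoted Javadoc describes (helper names are assumed for this sketch, not the PR's API): a segment holding only a tombstone has `min_timestamp` equal to `next_timestamp`, both set to the tombstone's validFrom timestamp.

```java
import java.nio.ByteBuffer;

public class DegenerateSegmentSketch {
    // Build a degenerate segment for a lone tombstone: next_timestamp and
    // min_timestamp both equal the tombstone's validFrom, and the single
    // (timestamp, value_size) entry carries -1 to mark the tombstone.
    static byte[] degenerateSegment(final long tombstoneTimestamp) {
        return ByteBuffer.allocate(8 + 8 + 8 + 4)
            .putLong(tombstoneTimestamp) // next_timestamp
            .putLong(tombstoneTimestamp) // min_timestamp
            .putLong(tombstoneTimestamp) // the tombstone's (validFrom) timestamp
            .putInt(-1)                  // tombstone marker
            .array();
    }

    // Per the quoted Javadoc, a segment is degenerate exactly when the
    // first two timestamps coincide.
    static boolean isDegenerate(final byte[] segmentValue) {
        final ByteBuffer buf = ByteBuffer.wrap(segmentValue);
        return buf.getLong(0) == buf.getLong(8); // next_timestamp == min_timestamp
    }
}
```

This also makes concrete why querying such a segment as of a later timestamp correctly finds no record version: the tombstone's validTo can be read as equal to its validFrom.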




[GitHub] [kafka] vcrfxia commented on a diff in pull request #13126: KAFKA-14491: [1/N] Add segment value format for RocksDB versioned store

2023-02-01 Thread via GitHub


vcrfxia commented on code in PR #13126:
URL: https://github.com/apache/kafka/pull/13126#discussion_r1093909835


##
streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStoreSegmentValueFormatter.java:
##
@@ -0,0 +1,518 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Helper utility for managing the bytes layout of the value stored in 
segments of the {@link RocksDBVersionedStore}.
+ * The value format is:
+ * 
+ * &lt;next_timestamp&gt; + &lt;min_timestamp&gt; + &lt;list of &lt;timestamp, value_size&gt;, reverse-sorted by timestamp&gt; + &lt;list of values, reverse-sorted by timestamp&gt;
+ * 
+ * where:
+ * 
+ * {@code next_timestamp} is the validTo timestamp of the latest record 
version stored in this
+ * segment,
+ * {@code min_timestamp} is the validFrom timestamp of the earliest record 
version stored
+ * in this segment, and
+ * Negative {@code value_size} is used to indicate that the value stored 
is a tombstone,
+ * in order to distinguish from empty array which has {@code value_size} of 
zero. In practice,
+ * {@code value_size} is always set to -1 for the tombstone case, though this 
need not be true
+ * in general.
+ * 
+ * 
+ * Note that the value format above does not store the number of record 
versions contained in the
+ * segment. It is not necessary to store this information separately because 
this information is
+ * never required on its own. Record versions are always deserialized in 
order, and we can
+ * determine when we have reached the end of the list based on whether the 
(validFrom) timestamp of
+ * the record version equals the {@code min_timestamp}.
+ * 
+ * There is one edge case with regards to the segment value format described 
above, which is useful
+ * to know for understanding the code in this file, but not relevant for 
callers of the class.
+ * In the typical case, all record (validFrom) timestamps and the {@code 
next_timestamp} of the

Review Comment:
   The pattern I was going for was:
   * validTo and validFrom timestamps are not styled in code markup, except 
when literally referring to code, e.g., in the javadoc for a method which 
literally takes parameters called "validFrom" and "validTo". There are very few 
of these latter cases.
   * `nextTimestamp` and `minTimestamp` are styled as code markups (they are 
code concepts only).
   
   I think all the docs are consistent with this stylization, but if that's not 
the case I can fix it. I can also change "validFrom" and "validTo" to be 
"valid-from" and "valid-to" when used as regular English, if you think that'd 
help.




[GitHub] [kafka] vcrfxia commented on a diff in pull request #13126: KAFKA-14491: [1/N] Add segment value format for RocksDB versioned store

2023-02-01 Thread via GitHub


vcrfxia commented on code in PR #13126:
URL: https://github.com/apache/kafka/pull/13126#discussion_r1093907117


##
streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStoreSegmentValueFormatter.java:
##
@@ -0,0 +1,554 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Helper utility for managing the bytes layout of the value stored in 
segments of the {@link RocksDBVersionedStore}.
+ * The value format is:
+ * 
+ * &lt;next_timestamp&gt; + &lt;min_timestamp&gt; + &lt;list of &lt;timestamp, value_size&gt;, reverse-sorted by timestamp&gt; + &lt;list of values, reverse-sorted by timestamp&gt;
+ * 
+ * where:
+ * 
+ * {@code next_timestamp} is the validTo timestamp of the latest record 
version stored in this
+ * segment,
+ * {@code min_timestamp} is the validFrom timestamp of the earliest record 
version stored
+ * in this segment, and
+ * Negative {@code value_size} is used to indicate that the value stored 
is a tombstone,
+ * in order to distinguish from empty array which has {@code value_size} of 
zero. In practice,
+ * {@code value_size} is always set to -1 for the tombstone case, though this 
need not be true
+ * in general.
+ * 
+ * 
+ * Note that the value format above does not store the number of record 
versions contained in the
+ * segment. It is not necessary to store this information separately because 
this information is
+ * never required on its own. Record versions are always deserialized in 
order, and we can
+ * determine when we have reached the end of the list based on whether the 
(validFrom) timestamp of
+ * the record version equals the {@code min_timestamp}.
+ * 
+ * Additionally, there is one edge case / exception to the segment value 
format described above.
+ * If the latest record version (for a particular key) is a tombstone, and the 
segment in which
+ * this tombstone is to be stored contains currently no record versions, then 
this will result in
+ * an "empty" segment -- i.e., the segment will contain only a single 
tombstone with no validTo
+ * timestamp associated with it. When this happens, the serialized segment 
will contain the
+ * tombstone's (validFrom) timestamp and nothing else. Upon deserializing an 
empty segment, the
+ * tombstone's timestamp can be fetched as the {@code next_timestamp} of the 
segment. (An empty
+ * segment can be thought of as having {@code min_timestamp} and {@code 
next_timestamp} both equal
+ * to the timestamp of the single tombstone record version. To avoid the 
redundancy of serializing
+ * the same timestamp twice, it is only serialized once and stored as the 
first timestamp of the
+ * segment, which happens to be {@code next_timestamp}.)
+ */
+final class RocksDBVersionedStoreSegmentValueFormatter {
+private static final int TIMESTAMP_SIZE = 8;
+private static final int VALUE_SIZE = 4;
+
+/**
+ * @return the validTo timestamp of the latest record in the provided 
segment
+ */
+static long getNextTimestamp(final byte[] segmentValue) {
+return ByteBuffer.wrap(segmentValue).getLong(0);
+}
+
+/**
+ * Returns whether the provided segment is "empty." An empty segment is one that
+ * contains only a single tombstone with no validTo timestamp specified. In this case,
+ * the serialized segment contains only the timestamp of the tombstone (stored as the segment's
+ * {@code nextTimestamp}) and nothing else.
+ * <p>
+ * This can happen if, e.g., the only record inserted for a particular key is
+ * a tombstone. In this case, the tombstone must be stored in a segment
+ * (as the latest value store does not store tombstones), but also has no validTo
+ * timestamp associated with it.
+ *
+ * @return whether the segment is "empty"
+ */
+static boolean isEmpty(final byte[] segmentValue) {
+return segmentValue.length <= TIMESTAMP_SIZE;
+}
+
+/**
+ * Requires that the segment is not empty. Caller is responsible for verifying that this
+ * is the case before calling this method.
+ *
+ * @return the (validFrom) timestamp of the earliest record in the provided segment.
+ */
+static long getMinTimestamp(final byte[] segmentValue) {
+return ByteBuffer.wrap(segmentValue).getLong(TIMESTAMP_SIZE);
+}

[GitHub] [kafka] vcrfxia commented on a diff in pull request #13126: KAFKA-14491: [1/N] Add segment value format for RocksDB versioned store

2023-02-01 Thread via GitHub


vcrfxia commented on code in PR #13126:
URL: https://github.com/apache/kafka/pull/13126#discussion_r1093904949


##
streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStoreSegmentValueFormatter.java:
##
@@ -0,0 +1,554 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Helper utility for managing the bytes layout of the value stored in segments of the {@link RocksDBVersionedStore}.

Review Comment:
   Hm, interesting. I see merits and demerits here. I think the name "segment 
value" makes more sense from the perspective of the store itself, because this 
data is literally what's being put into RocksDB segment store as the (RocksDB) 
value. "Row" could give the false impression of "key and value" though I think 
that's unlikely. I'm inclined to leave this as is for now, though I did update
the javadocs to clarify "segment row" rather than "segment" as you suggested
above.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] vcrfxia commented on a diff in pull request #13126: KAFKA-14491: [1/N] Add segment value format for RocksDB versioned store

2023-02-01 Thread via GitHub


vcrfxia commented on code in PR #13126:
URL: https://github.com/apache/kafka/pull/13126#discussion_r1093903403


##
streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStoreSegmentValueFormatter.java:
##
@@ -0,0 +1,554 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Helper utility for managing the bytes layout of the value stored in segments of the {@link RocksDBVersionedStore}.
+ * The value format is:
+ * <pre>
+ *     <next_timestamp> + <min_timestamp> + <list of <timestamp, value_size>, reverse-sorted by timestamp> + <values>
+ * </pre>
+ * where:
+ * <ul>
+ * <li>{@code next_timestamp} is the validTo timestamp of the latest record version stored in this
+ * segment,</li>
+ * <li>{@code min_timestamp} is the validFrom timestamp of the earliest record version stored
+ * in this segment, and</li>
+ * <li>Negative {@code value_size} is used to indicate that the value stored is a tombstone,
+ * in order to distinguish from empty array which has {@code value_size} of zero. In practice,
+ * {@code value_size} is always set to -1 for the tombstone case, though this need not be true
+ * in general.</li>
+ * </ul>
+ * <p>
+ * Note that the value format above does not store the number of record versions contained in the
+ * segment. It is not necessary to store this information separately because this information is
+ * never required on its own. Record versions are always deserialized in order, and we can
+ * determine when we have reached the end of the list based on whether the (validFrom) timestamp of
+ * the record version equals the {@code min_timestamp}.
+ * <p>
+ * Additionally, there is one edge case / exception to the segment value format described above.
+ * If the latest record version (for a particular key) is a tombstone, and the segment in which

Review Comment:
   I tried to clarify what's going on in 
https://github.com/apache/kafka/pull/13186.
   
   It's not only if there's nothing to be deleted from the latest value store. 
Even if there's nothing in the latest value store because the latest value is a 
tombstone, as long as that tombstone is in the same segment as the new one 
being inserted then we also don't hit the edge case. We really only hit the 
edge case if the store is empty (or empty from the perspective of the put 
operation, i.e., whatever data is contained in the store is only present in 
older segments which are not relevant for the specific put operation). I 
spliced together your explanation and mine. Thanks for the contribution :)
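To make the boundary of this edge case concrete, here is a minimal sketch (not code from the PR; the encoding and the `serializeEmpty` helper are assumptions based on the javadoc above): an "empty" segment serializes only the tombstone's timestamp, so detecting the case reduces to a length check, as in the quoted `isEmpty()`.

```java
import java.nio.ByteBuffer;

public class EmptySegmentSketch {
    static final int TIMESTAMP_SIZE = 8;

    // Assumed encoding of an "empty" segment: only the tombstone's (validFrom)
    // timestamp is written, in the position normally holding next_timestamp.
    static byte[] serializeEmpty(final long tombstoneTimestamp) {
        return ByteBuffer.allocate(TIMESTAMP_SIZE).putLong(tombstoneTimestamp).array();
    }

    // Mirrors the isEmpty() check from the quoted diff: an empty segment
    // contains nothing beyond that single timestamp.
    static boolean isEmpty(final byte[] segmentValue) {
        return segmentValue.length <= TIMESTAMP_SIZE;
    }

    public static void main(final String[] args) {
        final byte[] empty = serializeEmpty(42L);
        System.out.println(isEmpty(empty));                    // prints "true"
        System.out.println(ByteBuffer.wrap(empty).getLong(0)); // prints "42"
    }
}
```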






[GitHub] [kafka] vcrfxia commented on a diff in pull request #13126: KAFKA-14491: [1/N] Add segment value format for RocksDB versioned store

2023-01-27 Thread via GitHub


vcrfxia commented on code in PR #13126:
URL: https://github.com/apache/kafka/pull/13126#discussion_r1089556608


##
streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStoreSegmentValueFormatter.java:
##
@@ -0,0 +1,523 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Helper utility for managing the bytes layout of the value stored in segments of the {@link RocksDBVersionedStore}.
+ * The value format is:
+ * <pre>
+ *     <next_timestamp> + <min_timestamp> + <list of <timestamp, value_size>, reverse-sorted by timestamp> + <values>
+ * </pre>
+ * Negative {@code value_size} is used to indicate that the value stored is a tombstone, in order to
+ * distinguish from empty array which has {@code value_size} of zero. In practice, {@code value_size}
+ * is always set to -1 for the tombstone case, though this need not be true in general.
+ */
+final class RocksDBVersionedStoreSegmentValueFormatter {
+private static final int TIMESTAMP_SIZE = 8;
+private static final int VALUE_SIZE = 4;
+
+/**
+ * @return the validTo timestamp of the latest record in the provided segment
+ */
+static long getNextTimestamp(final byte[] segmentValue) {
+return ByteBuffer.wrap(segmentValue).getLong(0);
+}
+
+/**
+ * Returns whether the provided segment is "empty." An empty segment is one that
+ * contains only a single tombstone with no validTo timestamp specified. In this case,
+ * the serialized segment contains only the timestamp of the tombstone (stored as the segment's
+ * {@code nextTimestamp}) and nothing else.
+ * <p>
+ * This can happen if, e.g., the only record inserted for a particular key is
+ * a tombstone. In this case, the tombstone must be stored in a segment
+ * (as the latest value store does not store tombstones), but also has no validTo
+ * timestamp associated with it.

Review Comment:
   Hey @mjsax I just pushed an update to the serialization of "empty" segments 
(per your suggestion). The new serialization is `minTimestamp = nextTimestamp` 
with a single record version stored in the record versions list (tombstone 
value and timestamp equal to both `minTimestamp` and `nextTimestamp`). So, even 
though it's still a special case which cannot be combined with the regular case 
(as evidenced by the fact that the `insertAsLatest()` and `insertAsEarliest()` 
methods still have if-conditions to handle this case separately), now this 
special-casing is fully encapsulated within this value formatter utility class, 
i.e., the versioned store implementation itself (for which I'll open a PR as 
soon as this one is merged) does not need to concern itself with this special 
case (and therefore the public-facing `isEmpty()` methods have been removed 
accordingly). The updated javadocs in the latest commit contain more details.
   
   I've also renamed "empty" segment to "degenerate" segment, to clarify that 
the segment is not truly empty, it's just degenerate because the single 
tombstone in it has `validFrom = validTo`, which is not typically allowed.
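A sketch of what the updated "degenerate" serialization might look like (the layout is inferred from the description above, not taken from the PR code): `next_timestamp` and `min_timestamp` are equal, and the record versions list holds a single tombstone entry with `value_size` of -1.

```java
import java.nio.ByteBuffer;

public class DegenerateSegmentSketch {
    // Assumed layout per the description above:
    // next_timestamp (8) + min_timestamp (8) + one (timestamp, value_size) entry (8 + 4).
    static byte[] serializeDegenerate(final long timestamp) {
        return ByteBuffer.allocate(28)
            .putLong(timestamp) // next_timestamp
            .putLong(timestamp) // min_timestamp, equal to next_timestamp
            .putLong(timestamp) // the single record version's (validFrom) timestamp
            .putInt(-1)         // value_size = -1 marks the tombstone
            .array();
    }

    public static void main(final String[] args) {
        final ByteBuffer buf = ByteBuffer.wrap(serializeDegenerate(100L));
        // minTimestamp == nextTimestamp is what makes the segment "degenerate"
        System.out.println(buf.getLong(0) == buf.getLong(8)); // prints "true"
    }
}
```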






[GitHub] [kafka] vcrfxia commented on a diff in pull request #13126: KAFKA-14491: [1/N] Add segment value format for RocksDB versioned store

2023-01-23 Thread via GitHub


vcrfxia commented on code in PR #13126:
URL: https://github.com/apache/kafka/pull/13126#discussion_r1084488305


##
streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStoreSegmentValueFormatter.java:
##
@@ -0,0 +1,523 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Helper utility for managing the bytes layout of the value stored in segments of the {@link RocksDBVersionedStore}.
+ * The value format is:
+ * <pre>
+ *     <next_timestamp> + <min_timestamp> + <list of <timestamp, value_size>, reverse-sorted by timestamp> + <values>
+ * </pre>
+ * Negative {@code value_size} is used to indicate that the value stored is a tombstone, in order to
+ * distinguish from empty array which has {@code value_size} of zero. In practice, {@code value_size}
+ * is always set to -1 for the tombstone case, though this need not be true in general.
+ */
+final class RocksDBVersionedStoreSegmentValueFormatter {
+private static final int TIMESTAMP_SIZE = 8;
+private static final int VALUE_SIZE = 4;
+
+/**
+ * @return the validTo timestamp of the latest record in the provided segment
+ */
+static long getNextTimestamp(final byte[] segmentValue) {
+return ByteBuffer.wrap(segmentValue).getLong(0);
+}
+
+/**
+ * Returns whether the provided segment is "empty." An empty segment is one that
+ * contains only a single tombstone with no validTo timestamp specified. In this case,
+ * the serialized segment contains only the timestamp of the tombstone (stored as the segment's
+ * {@code nextTimestamp}) and nothing else.
+ * <p>
+ * This can happen if, e.g., the only record inserted for a particular key is
+ * a tombstone. In this case, the tombstone must be stored in a segment
+ * (as the latest value store does not store tombstones), but also has no validTo
+ * timestamp associated with it.
+ *
+ * @return whether the segment is "empty"
+ */
+static boolean isEmpty(final byte[] segmentValue) {
+return segmentValue.length <= TIMESTAMP_SIZE;
+}
+
+/**
+ * Requires that the segment is not empty. Caller is responsible for verifying that this
+ * is the case before calling this method.
+ *
+ * @return the timestamp of the earliest record in the provided segment.
+ */
+static long getMinTimestamp(final byte[] segmentValue) {
+return ByteBuffer.wrap(segmentValue).getLong(TIMESTAMP_SIZE);
+}
+
+/**
+ * @return the deserialized segment value
+ */
+static SegmentValue deserialize(final byte[] segmentValue) {
+return new PartiallyDeserializedSegmentValue(segmentValue);
+}
+
+/**
+ * Creates a new segment value that contains the provided record.
+ *
+ * @param value the record value
+ * @param validFrom the record's timestamp
+ * @param validTo the record's validTo timestamp
+ * @return the newly created segment value
+ */
+static SegmentValue newSegmentValueWithRecord(
+final byte[] value, final long validFrom, final long validTo) {
+return new PartiallyDeserializedSegmentValue(value, validFrom, validTo);
+}
+
+/**
+ * Creates a new empty segment value.
+ *
+ * @param timestamp the timestamp of the tombstone for this empty segment value
+ * @return the newly created segment value
+ */
+static SegmentValue newSegmentValueWithTombstone(final long timestamp) {
+return new PartiallyDeserializedSegmentValue(timestamp);
+}
+
+interface SegmentValue {
+
+/**
+ * @return whether the segment is empty. See
+ * {@link RocksDBVersionedStoreSegmentValueFormatter#isEmpty(byte[])} for details.
+ */
+boolean isEmpty();
+
+/**
+ * Finds the latest record in this segment with timestamp not exceeding the provided
+ * timestamp bound. This method requires that the provided timestamp bound exists in
+ * this segment, i.e., the segment is not empty, and the provided timestamp bound is
+ * at least minTimestamp and is smaller than nextTimestamp.
+ 

[GitHub] [kafka] vcrfxia commented on a diff in pull request #13126: KAFKA-14491: [1/N] Add segment value format for RocksDB versioned store

2023-01-23 Thread via GitHub


vcrfxia commented on code in PR #13126:
URL: https://github.com/apache/kafka/pull/13126#discussion_r1084484753


##
streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStoreSegmentValueFormatter.java:
##
@@ -0,0 +1,523 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Helper utility for managing the bytes layout of the value stored in segments of the {@link RocksDBVersionedStore}.
+ * The value format is:
+ * <pre>
+ *     <next_timestamp> + <min_timestamp> + <list of <timestamp, value_size>, reverse-sorted by timestamp> + <values>
+ * </pre>
+ * Negative {@code value_size} is used to indicate that the value stored is a tombstone, in order to
+ * distinguish from empty array which has {@code value_size} of zero. In practice, {@code value_size}
+ * is always set to -1 for the tombstone case, though this need not be true in general.
+ */
+final class RocksDBVersionedStoreSegmentValueFormatter {
+private static final int TIMESTAMP_SIZE = 8;
+private static final int VALUE_SIZE = 4;
+
+/**
+ * @return the validTo timestamp of the latest record in the provided segment
+ */
+static long getNextTimestamp(final byte[] segmentValue) {
+return ByteBuffer.wrap(segmentValue).getLong(0);
+}
+
+/**
+ * Returns whether the provided segment is "empty." An empty segment is one that
+ * contains only a single tombstone with no validTo timestamp specified. In this case,
+ * the serialized segment contains only the timestamp of the tombstone (stored as the segment's
+ * {@code nextTimestamp}) and nothing else.
+ * <p>
+ * This can happen if, e.g., the only record inserted for a particular key is
+ * a tombstone. In this case, the tombstone must be stored in a segment
+ * (as the latest value store does not store tombstones), but also has no validTo
+ * timestamp associated with it.

Review Comment:
   That's fair. I added an extra paragraph into the top-level javadoc just now. 
LMK what you think.
   
   > In general, I prefer to have some "invariant" as it makes it simpler to 
reason about the code, but introducing this edge case void the invariant that 
`nextTimestamp` is the "largest validTo" of the segment.
   
   I completely agree, but this "empty segment" case really is an edge case 
which cannot be combined with the general case. Here's some more context 
(possibly too much detail 🙂) on why we can't use `nextTimestamp = 
` in the empty segment case, which is what we'd need to 
maintain that `nextTimestamp` is always the largest validTo of the segment:
   
   In the implementation of the versioned store itself (PR coming soon), the 
latest record version for a particular key will be stored in a "latest value 
store," and older record versions will be stored in "segment stores" based on 
their validTo timestamps, except when the latest record version (for a 
particular key) is a tombstone. In this case, we don't want to store the 
tombstone in the latest value store because there's no expiry mechanism for the 
latest value store (and the tombstones might accumulate indefinitely). So, the 
tombstone is stored in a segment. But if these special tombstones have `validTo 
= infinity`, then this violates the invariant that "record versions are stored 
into segments based on their validTo timestamp." (We don't want to repeatedly 
move the tombstone into earlier and earlier segments as newer segments are 
created, because the whole point of putting them into a segment is so that they 
eventually expire.) Violating this invariant is a big problem for store
efficiency. Suppose a new record version is put to the store later, long
after a tombstone has been put (but before the tombstone has expired). In order 
to find the tombstone and update its validTo timestamp, we'd have to check 
every single segment (until we find an existing record version). We'd have to 
do this for every single put, since we wouldn't know whether the latest record 
version is a tombstone or not.
   
   In contrast, if we allow the validTo timestamp for the tombstone of the 
empty segment to be the tombstone's 

[GitHub] [kafka] vcrfxia commented on a diff in pull request #13126: KAFKA-14491: [1/N] Add segment value format for RocksDB versioned store

2023-01-23 Thread via GitHub


vcrfxia commented on code in PR #13126:
URL: https://github.com/apache/kafka/pull/13126#discussion_r1082994727


##
streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStoreSegmentValueFormatter.java:
##
@@ -0,0 +1,523 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Helper utility for managing the bytes layout of the value stored in segments of the {@link RocksDBVersionedStore}.
+ * The value format is:
+ * <pre>
+ *     <next_timestamp> + <min_timestamp> + <list of <timestamp, value_size>, reverse-sorted by timestamp> + <values>
+ * </pre>
+ * Negative {@code value_size} is used to indicate that the value stored is a tombstone, in order to
+ * distinguish from empty array which has {@code value_size} of zero. In practice, {@code value_size}
+ * is always set to -1 for the tombstone case, though this need not be true in general.
+ */
+final class RocksDBVersionedStoreSegmentValueFormatter {
+private static final int TIMESTAMP_SIZE = 8;
+private static final int VALUE_SIZE = 4;
+
+/**
+ * @return the validTo timestamp of the latest record in the provided segment
+ */
+static long getNextTimestamp(final byte[] segmentValue) {
+return ByteBuffer.wrap(segmentValue).getLong(0);
+}
+
+/**
+ * Returns whether the provided segment is "empty." An empty segment is one that
+ * contains only a single tombstone with no validTo timestamp specified. In this case,
+ * the serialized segment contains only the timestamp of the tombstone (stored as the segment's
+ * {@code nextTimestamp}) and nothing else.
+ * <p>
+ * This can happen if, e.g., the only record inserted for a particular key is
+ * a tombstone. In this case, the tombstone must be stored in a segment
+ * (as the latest value store does not store tombstones), but also has no validTo
+ * timestamp associated with it.
+ *
+ * @return whether the segment is "empty"
+ */
+static boolean isEmpty(final byte[] segmentValue) {
+return segmentValue.length <= TIMESTAMP_SIZE;
+}
+
+/**
+ * Requires that the segment is not empty. Caller is responsible for verifying that this
+ * is the case before calling this method.

Review Comment:
   No, `nextTimestamp` is always present, even if the segment is empty. In the 
case where the segment is empty, we still need to store the tombstone's 
timestamp somewhere, and we choose to store it in `nextTimestamp` since 
`nextTimestamp` is serialized first.
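Since the first 8 bytes always hold a timestamp whether or not the segment is empty, the accessor works uniformly. A small sketch (the accessor matches the quoted diff; the example byte layouts are assumptions for illustration):

```java
import java.nio.ByteBuffer;

public class NextTimestampSketch {
    // Matches the quoted diff: reads the first 8 bytes as next_timestamp.
    static long getNextTimestamp(final byte[] segmentValue) {
        return ByteBuffer.wrap(segmentValue).getLong(0);
    }

    public static void main(final String[] args) {
        // Empty segment (assumed encoding): just the tombstone's timestamp.
        final byte[] emptySegment = ByteBuffer.allocate(8).putLong(7L).array();
        // Non-empty segment (assumed prefix): next_timestamp then min_timestamp.
        final byte[] segment = ByteBuffer.allocate(16).putLong(9L).putLong(3L).array();

        System.out.println(getNextTimestamp(emptySegment)); // prints "7"
        System.out.println(getNextTimestamp(segment));      // prints "9"
    }
}
```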


