Re: [PR] KAFKA-15347: implement lazy deserialization for segment [kafka]

2024-03-14 Thread via GitHub


github-actions[bot] commented on PR #14957:
URL: https://github.com/apache/kafka/pull/14957#issuecomment-1998877709

   This PR is being marked as stale since it has not had any activity in 90 
days. If you would like to keep this PR alive, please ask a committer for 
review. If the PR has merge conflicts, please update it with the latest from 
trunk (or the appropriate release branch). If this PR is no longer valid or 
desired, please feel free to close it. If no activity occurs in the next 30 
days, it will be automatically closed.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15347: implement lazy deserialization for segment [kafka]

2023-12-12 Thread via GitHub


aliehsaeedii commented on code in PR #14957:
URL: https://github.com/apache/kafka/pull/14957#discussion_r1424277821


##
streams/src/test/java/org/apache/kafka/streams/state/internals/ReadonlyPartiallyDeserializedSegmentValueTest.java:
##
@@ -0,0 +1,220 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.streams.query.ResultOrder;
+import 
org.apache.kafka.streams.state.internals.RocksDBVersionedStoreSegmentValueFormatter.SegmentValue;
+import 
org.apache.kafka.streams.state.internals.RocksDBVersionedStoreSegmentValueFormatter.SegmentValue.SegmentSearchResult;
+import org.junit.Test;
+import org.junit.experimental.runners.Enclosed;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.jupiter.api.Assertions.assertNull;
+
+@RunWith(Enclosed.class)
+public class ReadonlyPartiallyDeserializedSegmentValueTest {
+
+/**
+ * Non-exceptional scenarios which are expected to occur during regular 
store operation.
+ */
+@RunWith(Parameterized.class)
+public static class ExpectedCasesTest {
+
+private static final List TEST_CASES = new ArrayList<>();
+
+static {
+// test cases are expected to have timestamps in strictly 
decreasing order (except for the degenerate case)
+TEST_CASES.add(new TestCase("single record", 10, new 
TestRecord("foo".getBytes(), 1)));
+TEST_CASES.add(new TestCase("multiple records", 10, new 
TestRecord("foo".getBytes(), 8), new TestRecord("bar".getBytes(), 3), new 
TestRecord("baz".getBytes(), 0)));
+TEST_CASES.add(new TestCase("single tombstone", 10, new 
TestRecord(null, 1)));
+TEST_CASES.add(new TestCase("multiple tombstone", 10, new 
TestRecord(null, 4), new TestRecord(null, 1)));
+TEST_CASES.add(new TestCase("tombstones and records (r, t, r)", 
10, new TestRecord("foo".getBytes(), 5), new TestRecord(null, 2), new 
TestRecord("bar".getBytes(), 1)));
+TEST_CASES.add(new TestCase("tombstones and records (t, r, t)", 
10, new TestRecord(null, 5), new TestRecord("foo".getBytes(), 2), new 
TestRecord(null, 1)));
+TEST_CASES.add(new TestCase("tombstones and records (r, r, t, t)", 
10, new TestRecord("foo".getBytes(), 6), new TestRecord("bar".getBytes(), 5), 
new TestRecord(null, 2), new TestRecord(null, 1)));
+TEST_CASES.add(new TestCase("tombstones and records (t, t, r, r)", 
10, new TestRecord(null, 7), new TestRecord(null, 6), new 
TestRecord("foo".getBytes(), 2), new TestRecord("bar".getBytes(), 1)));
+TEST_CASES.add(new TestCase("record with empty bytes", 10, new 
TestRecord(new byte[0], 1)));
+TEST_CASES.add(new TestCase("records with empty bytes (r, e)", 10, 
new TestRecord("foo".getBytes(), 4), new TestRecord(new byte[0], 1)));
+TEST_CASES.add(new TestCase("records with empty bytes (e, e, r)", 
10, new TestRecord(new byte[0], 8), new TestRecord(new byte[0], 2), new 
TestRecord("foo".getBytes(), 1)));
+}
+
+private final TestCase testCase;
+
+public ExpectedCasesTest(final TestCase testCase) {
+this.testCase = testCase;
+}
+
+@Parameterized.Parameters(name = "{0}")
+public static Collection data() {
+return TEST_CASES;
+}
+
+@Test
+public void shouldFindInTimeRangesWithDifferentOrders() {
+
+// create a list of timestamps in ascending order to use them in 
combination for starting and ending point of the time range.
+final List timestamps = 
createTimestampsFromTestRecords(testCase);
+
+// verify results
+final List orders = 
Arrays.asList(ResultOrder.ASCENDING, ResultOrder.ANY, ResultOrder.DESCENDING);
+for (final ResultOrder order: orders) {
+for (final Long from : timestamps) {
+for 

Re: [PR] KAFKA-15347: implement lazy deserialization for segment [kafka]

2023-12-12 Thread via GitHub


aliehsaeedii commented on code in PR #14957:
URL: https://github.com/apache/kafka/pull/14957#discussion_r1424265799


##
streams/src/main/java/org/apache/kafka/streams/state/internals/LogicalSegmentIterator.java:
##
@@ -69,23 +73,49 @@ public boolean hasNext() {
 if (!open) {
 throw new IllegalStateException("The iterator is out of scope.");
 }
-// since data is stored in descending order in the segments, check 
whether there is any previous record, if the order is Ascending.
-final boolean hasStillLoad = order.equals(ResultOrder.ASCENDING) ? 
iterator.hasPrevious() : iterator.hasNext();
-return hasStillLoad || maybeFillIterator();
+if (this.next != null) {
+return true;
+}
+
+while ((currentDeserializedSegmentValue != null || 
currentRawSegmentValue != null || segmentIterator.hasNext()) && this.next == 
null) {
+boolean hasSegmentValue = currentDeserializedSegmentValue != null 
|| currentRawSegmentValue != null;
+if (!hasSegmentValue) {
+hasSegmentValue = maybeFillCurrentSegmentValue();
+}
+if (hasSegmentValue) {
+this.next  = getNextRecord();
+if (this.next == null) {
+prepareToFetchNextSegment();
+}
+}
+}
+return this.next != null;
 }
 
 @Override
-public Object next() {
-if (hasNext()) {
-// since data is stored in descending order in the segments, 
retrieve previous record, if the order is Ascending.
-return order.equals(ResultOrder.ASCENDING) ? iterator.previous() : 
iterator.next();
+public VersionedRecord next() {
+if (this.next == null) {
+if (!hasNext()) {
+throw new NoSuchElementException();
+}
 }
-throw new NoSuchElementException();
+assert this.next != null;

Review Comment:
   > We usually don't use `assert`. Can be removed.
   
   Otherwise, there will be a warning, and we have to suppress it. But I will 
remove it.
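   For reference, a common shape for such a cached-record `next()` that does not need the standalone `assert` is sketched below (names follow the diff above; the generic parameter and the clearing of the cached record are assumptions on my side, not necessarily what the PR ends up doing):
   
   ```
   @Override
   public VersionedRecord<byte[]> next() {
       // hasNext() caches the next record in this.next as a side effect;
       // if it still cannot produce one, the iterator is exhausted.
       if (next == null && !hasNext()) {
           throw new NoSuchElementException();
       }
       final VersionedRecord<byte[]> result = next;
       next = null; // consume the cached record so hasNext() advances on the following call
       return result;
   }
   ```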



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15347: implement lazy deserialization for segment [kafka]

2023-12-12 Thread via GitHub


aliehsaeedii commented on code in PR #14957:
URL: https://github.com/apache/kafka/pull/14957#discussion_r1424245975


##
streams/src/main/java/org/apache/kafka/streams/state/internals/ReadonlyPartiallyDeserializedSegmentValue.java:
##
@@ -0,0 +1,210 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.streams.query.ResultOrder;
+
+import java.nio.ByteBuffer;
+import java.util.HashMap;
+import java.util.Map;
+
+
+final class ReadonlyPartiallyDeserializedSegmentValue {
+
+private static final int TIMESTAMP_SIZE = 8;
+private static final int VALUE_SIZE = 4;
+private byte[] segmentValue;
+private long nextTimestamp;
+private long minTimestamp;
+
+private int deserIndex = -1; // index up through which this segment has 
been deserialized (inclusive)
+
+private Map cumulativeValueSizes;
+
+private int valuesStartingIndex = -1; // the index of the first value in 
the segment (but the last one in the list)
+private Map unpackedTimestampAndValueSizes 
= new HashMap<>();
+private int recordNumber = -1; // number of segment records
+
+
+ReadonlyPartiallyDeserializedSegmentValue(final byte[] segmentValue) {
+this.segmentValue = segmentValue;
+this.nextTimestamp =
+
RocksDBVersionedStoreSegmentValueFormatter.getNextTimestamp(segmentValue);
+this.minTimestamp =
+
RocksDBVersionedStoreSegmentValueFormatter.getMinTimestamp(segmentValue);
+resetDeserHelpers();
+}
+
+
+public long minTimestamp() {
+return minTimestamp;
+}
+
+public long nextTimestamp() {
+return nextTimestamp;
+}
+
+public byte[] serialize() {
+return segmentValue;
+}
+
+
+public 
RocksDBVersionedStoreSegmentValueFormatter.SegmentValue.SegmentSearchResult 
find(
+final long fromTime, final long toTime, final ResultOrder order, 
final int index) {
+
+// this segment does not have any record in query specified time range
+if (toTime < minTimestamp || fromTime > nextTimestamp) {
+return null;
+}
+
+final boolean isAscending = order.equals(ResultOrder.ASCENDING);
+
+if (isAscending && valuesStartingIndex == -1) {
+findValuesStartingIndex();
+deserIndex = recordNumber;
+}
+
+long currTimestamp = -1;
+long currNextTimestamp = -1;
+int currIndex = initializeCurrentIndex(index, isAscending);
+int cumValueSize = initializeCumValueSize(index, currIndex, 
isAscending);
+int currValueSize;
+
+
+while (hasStillRecord(currTimestamp, currNextTimestamp, order)) {
+if (hasBeenDeserialized(isAscending, currIndex)) {
+final TimestampAndValueSize curr;
+curr = unpackedTimestampAndValueSizes.get(currIndex);
+currTimestamp = curr.timestamp;
+cumValueSize = cumulativeValueSizes.get(currIndex);
+currValueSize = curr.valueSize;
+
+// update currValueSize
+if (currValueSize == Integer.MIN_VALUE) {
+final int timestampSegmentIndex = getTimestampIndex(order, 
currIndex);
+currValueSize = 
ByteBuffer.wrap(segmentValue).getInt(timestampSegmentIndex + TIMESTAMP_SIZE);
+unpackedTimestampAndValueSizes.put(currIndex, new 
TimestampAndValueSize(currTimestamp, cumValueSize));

Review Comment:
   > `cumValueSize` is not updated with `currValueSize`. Why?
   
   Yes. In fact, in this branch (when `deserIndex` is ahead of `currIndex`) everything must already be cached, so the `if (currValueSize == Integer.MIN_VALUE)` check does not belong here. I removed it.






Re: [PR] KAFKA-15347: implement lazy deserialization for segment [kafka]

2023-12-12 Thread via GitHub


aliehsaeedii commented on code in PR #14957:
URL: https://github.com/apache/kafka/pull/14957#discussion_r1424136033


##
streams/src/main/java/org/apache/kafka/streams/state/internals/ReadonlyPartiallyDeserializedSegmentValue.java:
##
@@ -0,0 +1,210 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.streams.query.ResultOrder;
+
+import java.nio.ByteBuffer;
+import java.util.HashMap;
+import java.util.Map;
+
+
+final class ReadonlyPartiallyDeserializedSegmentValue {
+
+private static final int TIMESTAMP_SIZE = 8;
+private static final int VALUE_SIZE = 4;
+private byte[] segmentValue;
+private long nextTimestamp;
+private long minTimestamp;
+
+private int deserIndex = -1; // index up through which this segment has 
been deserialized (inclusive)
+
+private Map cumulativeValueSizes;
+
+private int valuesStartingIndex = -1; // the index of the first value in 
the segment (but the last one in the list)
+private Map unpackedTimestampAndValueSizes 
= new HashMap<>();
+private int recordNumber = -1; // number of segment records
+
+
+ReadonlyPartiallyDeserializedSegmentValue(final byte[] segmentValue) {
+this.segmentValue = segmentValue;
+this.nextTimestamp =
+
RocksDBVersionedStoreSegmentValueFormatter.getNextTimestamp(segmentValue);
+this.minTimestamp =
+
RocksDBVersionedStoreSegmentValueFormatter.getMinTimestamp(segmentValue);
+resetDeserHelpers();
+}
+
+
+public long minTimestamp() {
+return minTimestamp;
+}
+
+public long nextTimestamp() {
+return nextTimestamp;
+}
+
+public byte[] serialize() {
+return segmentValue;
+}
+
+
+public 
RocksDBVersionedStoreSegmentValueFormatter.SegmentValue.SegmentSearchResult 
find(
+final long fromTime, final long toTime, final ResultOrder order, 
final int index) {
+
+// this segment does not have any record in query specified time range
+if (toTime < minTimestamp || fromTime > nextTimestamp) {
+return null;
+}
+
+final boolean isAscending = order.equals(ResultOrder.ASCENDING);
+
+if (isAscending && valuesStartingIndex == -1) {
+findValuesStartingIndex();
+deserIndex = recordNumber;
+}
+
+long currTimestamp = -1;
+long currNextTimestamp = -1;
+int currIndex = initializeCurrentIndex(index, isAscending);
+int cumValueSize = initializeCumValueSize(index, currIndex, 
isAscending);

Review Comment:
   > Not sure why we need to init `cumValueSize`? It seems below we never read the value set here?
   
   We must do that. Look at line#106 (`cumValueSize += Math.max(currValueSize, 0);`).
   In fact, when everything is already available in the cache (`unpackedTimestampAndValueSizes`), we do not need it. Otherwise, we need the previously computed `cumValueSize` to move to the correct position in the segment and deserialize the next record.
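   For context, the running total is what turns a value size into a position: payloads are packed at the tail of the segment array, so locating a value looks roughly like the fragment below (adapted from the quoted formatter code, only to illustrate why the previously accumulated `cumValueSize` is needed):
   
   ```
   cumValueSize += Math.max(currValueSize, 0); // negative sizes mark tombstones and occupy no payload bytes
   if (currValueSize >= 0) {
       // values are stored back-to-front at the end of the array, so the cumulative
       // size gives this value's offset measured from the end of the segment
       final int valueSegmentIndex = segmentValue.length - cumValueSize;
       final byte[] value = new byte[currValueSize];
       System.arraycopy(segmentValue, valueSegmentIndex, value, 0, currValueSize);
   }
   ```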






Re: [PR] KAFKA-15347: implement lazy deserialization for segment [kafka]

2023-12-12 Thread via GitHub


mjsax commented on code in PR #14957:
URL: https://github.com/apache/kafka/pull/14957#discussion_r1424208309


##
streams/src/main/java/org/apache/kafka/streams/state/internals/ReadonlyPartiallyDeserializedSegmentValue.java:
##
@@ -0,0 +1,210 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.streams.query.ResultOrder;
+
+import java.nio.ByteBuffer;
+import java.util.HashMap;
+import java.util.Map;
+
+
+final class ReadonlyPartiallyDeserializedSegmentValue {
+
+private static final int TIMESTAMP_SIZE = 8;
+private static final int VALUE_SIZE = 4;
+private byte[] segmentValue;
+private long nextTimestamp;
+private long minTimestamp;
+
+private int deserIndex = -1; // index up through which this segment has 
been deserialized (inclusive)
+
+private Map cumulativeValueSizes;
+
+private int valuesStartingIndex = -1; // the index of the first value in 
the segment (but the last one in the list)
+private Map unpackedTimestampAndValueSizes 
= new HashMap<>();
+private int recordNumber = -1; // number of segment records
+
+
+ReadonlyPartiallyDeserializedSegmentValue(final byte[] segmentValue) {
+this.segmentValue = segmentValue;
+this.nextTimestamp =
+
RocksDBVersionedStoreSegmentValueFormatter.getNextTimestamp(segmentValue);
+this.minTimestamp =
+
RocksDBVersionedStoreSegmentValueFormatter.getMinTimestamp(segmentValue);
+resetDeserHelpers();
+}
+
+
+public long minTimestamp() {
+return minTimestamp;
+}
+
+public long nextTimestamp() {
+return nextTimestamp;
+}
+
+public byte[] serialize() {
+return segmentValue;
+}
+
+
+public 
RocksDBVersionedStoreSegmentValueFormatter.SegmentValue.SegmentSearchResult 
find(
+final long fromTime, final long toTime, final ResultOrder order, 
final int index) {
+
+// this segment does not have any record in query specified time range
+if (toTime < minTimestamp || fromTime > nextTimestamp) {
+return null;
+}
+
+final boolean isAscending = order.equals(ResultOrder.ASCENDING);
+
+if (isAscending && valuesStartingIndex == -1) {
+findValuesStartingIndex();
+deserIndex = recordNumber;
+}
+
+long currTimestamp = -1;
+long currNextTimestamp = -1;
+int currIndex = initializeCurrentIndex(index, isAscending);
+int cumValueSize = initializeCumValueSize(index, currIndex, 
isAscending);

Review Comment:
   Line#100 in this PR is `unpackedTimestampAndValueSizes.put(currIndex, new TimestampAndValueSize(currTimestamp, cumValueSize));`. I only see `cumValueSize += Math.max(currValueSize, 0);` on line#112 (but by line#112, `currValueSize` has already been overwritten on line#108).
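   To make the ordering concrete, one consistent way to populate both caches on a first pass over an index would be the sketch below (names from the diff; the PR's actual ordering may differ, which is exactly the point being raised):
   
   ```
   currTimestamp = ByteBuffer.wrap(segmentValue).getLong(timestampSegmentIndex);
   currValueSize = ByteBuffer.wrap(segmentValue).getInt(timestampSegmentIndex + TIMESTAMP_SIZE);
   cumValueSize += Math.max(currValueSize, 0); // accumulate before caching so the stored total includes this record
   unpackedTimestampAndValueSizes.put(currIndex, new TimestampAndValueSize(currTimestamp, currValueSize));
   cumulativeValueSizes.put(currIndex, cumValueSize);
   ```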






Re: [PR] KAFKA-15347: implement lazy deserialization for segment [kafka]

2023-12-12 Thread via GitHub


mjsax commented on code in PR #14957:
URL: https://github.com/apache/kafka/pull/14957#discussion_r1424206051


##
streams/src/test/java/org/apache/kafka/streams/state/internals/ReadonlyPartiallyDeserializedSegmentValueTest.java:
##
@@ -0,0 +1,220 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.streams.query.ResultOrder;
+import 
org.apache.kafka.streams.state.internals.RocksDBVersionedStoreSegmentValueFormatter.SegmentValue;
+import 
org.apache.kafka.streams.state.internals.RocksDBVersionedStoreSegmentValueFormatter.SegmentValue.SegmentSearchResult;
+import org.junit.Test;
+import org.junit.experimental.runners.Enclosed;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.jupiter.api.Assertions.assertNull;
+
+@RunWith(Enclosed.class)
+public class ReadonlyPartiallyDeserializedSegmentValueTest {
+
+/**
+ * Non-exceptional scenarios which are expected to occur during regular 
store operation.
+ */
+@RunWith(Parameterized.class)
+public static class ExpectedCasesTest {
+
+private static final List TEST_CASES = new ArrayList<>();
+
+static {
+// test cases are expected to have timestamps in strictly 
decreasing order (except for the degenerate case)
+TEST_CASES.add(new TestCase("single record", 10, new 
TestRecord("foo".getBytes(), 1)));
+TEST_CASES.add(new TestCase("multiple records", 10, new 
TestRecord("foo".getBytes(), 8), new TestRecord("bar".getBytes(), 3), new 
TestRecord("baz".getBytes(), 0)));
+TEST_CASES.add(new TestCase("single tombstone", 10, new 
TestRecord(null, 1)));
+TEST_CASES.add(new TestCase("multiple tombstone", 10, new 
TestRecord(null, 4), new TestRecord(null, 1)));
+TEST_CASES.add(new TestCase("tombstones and records (r, t, r)", 
10, new TestRecord("foo".getBytes(), 5), new TestRecord(null, 2), new 
TestRecord("bar".getBytes(), 1)));
+TEST_CASES.add(new TestCase("tombstones and records (t, r, t)", 
10, new TestRecord(null, 5), new TestRecord("foo".getBytes(), 2), new 
TestRecord(null, 1)));
+TEST_CASES.add(new TestCase("tombstones and records (r, r, t, t)", 
10, new TestRecord("foo".getBytes(), 6), new TestRecord("bar".getBytes(), 5), 
new TestRecord(null, 2), new TestRecord(null, 1)));
+TEST_CASES.add(new TestCase("tombstones and records (t, t, r, r)", 
10, new TestRecord(null, 7), new TestRecord(null, 6), new 
TestRecord("foo".getBytes(), 2), new TestRecord("bar".getBytes(), 1)));
+TEST_CASES.add(new TestCase("record with empty bytes", 10, new 
TestRecord(new byte[0], 1)));
+TEST_CASES.add(new TestCase("records with empty bytes (r, e)", 10, 
new TestRecord("foo".getBytes(), 4), new TestRecord(new byte[0], 1)));
+TEST_CASES.add(new TestCase("records with empty bytes (e, e, r)", 
10, new TestRecord(new byte[0], 8), new TestRecord(new byte[0], 2), new 
TestRecord("foo".getBytes(), 1)));
+}
+
+private final TestCase testCase;
+
+public ExpectedCasesTest(final TestCase testCase) {
+this.testCase = testCase;
+}
+
+@Parameterized.Parameters(name = "{0}")
+public static Collection data() {
+return TEST_CASES;
+}
+
+@Test
+public void shouldFindInTimeRangesWithDifferentOrders() {
+
+// create a list of timestamps in ascending order to use them in 
combination for starting and ending point of the time range.
+final List timestamps = 
createTimestampsFromTestRecords(testCase);
+
+// verify results
+final List orders = 
Arrays.asList(ResultOrder.ASCENDING, ResultOrder.ANY, ResultOrder.DESCENDING);
+for (final ResultOrder order: orders) {
+for (final Long from : timestamps) {
+for (final Long 

Re: [PR] KAFKA-15347: implement lazy deserialization for segment [kafka]

2023-12-12 Thread via GitHub


aliehsaeedii commented on code in PR #14957:
URL: https://github.com/apache/kafka/pull/14957#discussion_r1424138349


##
streams/src/main/java/org/apache/kafka/streams/state/internals/ReadonlyPartiallyDeserializedSegmentValue.java:
##
@@ -0,0 +1,210 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.streams.query.ResultOrder;
+
+import java.nio.ByteBuffer;
+import java.util.HashMap;
+import java.util.Map;
+
+
+final class ReadonlyPartiallyDeserializedSegmentValue {
+
+private static final int TIMESTAMP_SIZE = 8;
+private static final int VALUE_SIZE = 4;
+private byte[] segmentValue;
+private long nextTimestamp;
+private long minTimestamp;
+
+private int deserIndex = -1; // index up through which this segment has 
been deserialized (inclusive)
+
+private Map cumulativeValueSizes;
+
+private int valuesStartingIndex = -1; // the index of the first value in 
the segment (but the last one in the list)
+private Map unpackedTimestampAndValueSizes 
= new HashMap<>();
+private int recordNumber = -1; // number of segment records
+
+
+ReadonlyPartiallyDeserializedSegmentValue(final byte[] segmentValue) {
+this.segmentValue = segmentValue;
+this.nextTimestamp =
+
RocksDBVersionedStoreSegmentValueFormatter.getNextTimestamp(segmentValue);
+this.minTimestamp =
+
RocksDBVersionedStoreSegmentValueFormatter.getMinTimestamp(segmentValue);
+resetDeserHelpers();
+}
+
+
+public long minTimestamp() {
+return minTimestamp;
+}
+
+public long nextTimestamp() {
+return nextTimestamp;
+}
+
+public byte[] serialize() {

Review Comment:
   > Do we need this method? Cannot find where it's used (except test code)
   
   The test code is also important. We may have future usage as well. What's 
the problem with having it?
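   For illustration, the kind of test-side check being referred to might look like this (a hypothetical fragment; `expectedBytes` stands in for a segment the test built beforehand):
   
   ```
   final ReadonlyPartiallyDeserializedSegmentValue segmentValue =
       new ReadonlyPartiallyDeserializedSegmentValue(expectedBytes);
   // the read-only wrapper keeps a reference to the original array,
   // so serialize() hands the same bytes back for verification
   assertThat(segmentValue.serialize(), equalTo(expectedBytes));
   ```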






Re: [PR] KAFKA-15347: implement lazy deserialization for segment [kafka]

2023-12-12 Thread via GitHub


aliehsaeedii commented on code in PR #14957:
URL: https://github.com/apache/kafka/pull/14957#discussion_r1424055579


##
streams/src/main/java/org/apache/kafka/streams/state/internals/LogicalSegmentIterator.java:
##
@@ -100,34 +130,48 @@ private boolean maybeFillIterator() {
 final byte[] rawSegmentValue = segment.get(key, snapshot);
 if (rawSegmentValue != null) { // this segment contains record(s) 
with the specified key
 if (segment.id() == -1) { // this is the latestValueStore
-final long recordTimestamp = 
RocksDBVersionedStore.LatestValueFormatter.getTimestamp(rawSegmentValue);
-if (recordTimestamp <= toTime) {
-// latest value satisfies timestamp bound
-queryResults.add(new 
VersionedRecord<>(RocksDBVersionedStore.LatestValueFormatter.getValue(rawSegmentValue),
 recordTimestamp));
-}
+this.currentRawSegmentValue = rawSegmentValue;
 } else {
-// this segment contains records with the specified key 
and time range
-final 
List
 searchResults =
-
RocksDBVersionedStoreSegmentValueFormatter.deserialize(rawSegmentValue).findAll(fromTime,
 toTime);
-for (final 
RocksDBVersionedStoreSegmentValueFormatter.SegmentValue.SegmentSearchResult 
searchResult : searchResults) {
-queryResults.add(new 
VersionedRecord<>(searchResult.value(), searchResult.validFrom(), 
searchResult.validTo()));
-}
+this.currentDeserializedSegmentValue = new 
ReadonlyPartiallyDeserializedSegmentValue(rawSegmentValue);
 }
-}
-if (!queryResults.isEmpty()) {
-break;
+return true;
 }
 }
-if (!queryResults.isEmpty()) {
-// since data is stored in descending order in the segments, 
create the list in reverse order, if the order is Ascending.
-this.iterator = order.equals(ResultOrder.ASCENDING) ? 
queryResults.listIterator(queryResults.size()) : queryResults.listIterator();
-return true;
-}
 // if all segments have been processed, release the snapshot
 releaseSnapshot();
 return false;
 }
 
+private VersionedRecord getNextRecord() {
+VersionedRecord nextRecord = null;
+if (currentRawSegmentValue != null) { // this is the latestValueStore
+final long recordTimestamp = 
RocksDBVersionedStore.LatestValueFormatter.getTimestamp(currentRawSegmentValue);
+if (recordTimestamp <= toTime) {
+final byte[] value = 
RocksDBVersionedStore.LatestValueFormatter.getValue(currentRawSegmentValue);
+// latest value satisfies timestamp bound
+nextRecord = new VersionedRecord<>(value, recordTimestamp);
+}
+} else {
+final 
RocksDBVersionedStoreSegmentValueFormatter.SegmentValue.SegmentSearchResult 
currentRecord =
+currentDeserializedSegmentValue.find(fromTime, toTime, 
order, nextIndex);
+if (currentRecord != null) {
+nextRecord = new VersionedRecord<>(currentRecord.value(), 
currentRecord.validFrom(), currentRecord.validTo());
+this.nextIndex = order.equals(ResultOrder.ASCENDING) ? 
currentRecord.index() - 1 : currentRecord.index() + 1;
+}
+}
+// no relevant record can be found in the segment
+if (currentRawSegmentValue != null || nextRecord == null || 
!canSegmentHaveMoreRelevantRecords(nextRecord.timestamp(), 
nextRecord.validTo().get())) {

Review Comment:
   > To make sure I understand this correctly:
   > 
   > * If `nextRecord == null` it means we did not find anything and want to go to the next segment (makes sense)
   > * If `currentRawSegmentValue != null` it does not matter whether we filled `nextRecord` or not, because the "latest" segment contains at most one record, and thus we need to step into the next segment in any case?
   > * Similarly for `canSegmentHaveMoreRelevantRecords` we check whether we exceeded the current segment?
   
   Yes, exactly.
   When `canSegmentHaveMoreRelevantRecords` returns false, it means that this segment has no more records that fall within the query's time range.
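   Put together, the decision in `getNextRecord()` (quoted above) boils down to the check below, with the three cases spelled out as comments (restated for readability, not new logic):
   
   ```
   // Advance to the next segment when:
   //  1. this was the latest-value store, which holds at most one record, or
   //  2. no record in the query range was found in this segment, or
   //  3. the record found already sits at the edge of the query's time range,
   //     so this segment cannot contain further relevant records.
   if (currentRawSegmentValue != null
           || nextRecord == null
           || !canSegmentHaveMoreRelevantRecords(nextRecord.timestamp(), nextRecord.validTo().get())) {
       prepareToFetchNextSegment();
   }
   ```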






Re: [PR] KAFKA-15347: implement lazy deserialization for segment [kafka]

2023-12-12 Thread via GitHub


mjsax commented on code in PR #14957:
URL: https://github.com/apache/kafka/pull/14957#discussion_r1423917600


##
streams/src/main/java/org/apache/kafka/streams/state/internals/LogicalSegmentIterator.java:
##
@@ -100,34 +130,48 @@ private boolean maybeFillIterator() {
 final byte[] rawSegmentValue = segment.get(key, snapshot);
 if (rawSegmentValue != null) { // this segment contains record(s) 
with the specified key
 if (segment.id() == -1) { // this is the latestValueStore
-final long recordTimestamp = 
RocksDBVersionedStore.LatestValueFormatter.getTimestamp(rawSegmentValue);
-if (recordTimestamp <= toTime) {
-// latest value satisfies timestamp bound
-queryResults.add(new 
VersionedRecord<>(RocksDBVersionedStore.LatestValueFormatter.getValue(rawSegmentValue),
 recordTimestamp));
-}
+this.currentRawSegmentValue = rawSegmentValue;
 } else {
-// this segment contains records with the specified key 
and time range
-final 
List
 searchResults =
-
RocksDBVersionedStoreSegmentValueFormatter.deserialize(rawSegmentValue).findAll(fromTime,
 toTime);
-for (final 
RocksDBVersionedStoreSegmentValueFormatter.SegmentValue.SegmentSearchResult 
searchResult : searchResults) {
-queryResults.add(new 
VersionedRecord<>(searchResult.value(), searchResult.validFrom(), 
searchResult.validTo()));
-}
+this.currentDeserializedSegmentValue = new 
ReadonlyPartiallyDeserializedSegmentValue(rawSegmentValue);
 }
-}
-if (!queryResults.isEmpty()) {
-break;
+return true;
 }
 }
-if (!queryResults.isEmpty()) {
-// since data is stored in descending order in the segments, 
create the list in reverse order, if the order is Ascending.
-this.iterator = order.equals(ResultOrder.ASCENDING) ? 
queryResults.listIterator(queryResults.size()) : queryResults.listIterator();
-return true;
-}
 // if all segments have been processed, release the snapshot
 releaseSnapshot();
 return false;
 }
 
+private VersionedRecord getNextRecord() {
+VersionedRecord nextRecord = null;
+if (currentRawSegmentValue != null) { // this is the latestValueStore
+final long recordTimestamp = 
RocksDBVersionedStore.LatestValueFormatter.getTimestamp(currentRawSegmentValue);
+if (recordTimestamp <= toTime) {
+final byte[] value = 
RocksDBVersionedStore.LatestValueFormatter.getValue(currentRawSegmentValue);
+// latest value satisfies timestamp bound
+nextRecord = new VersionedRecord<>(value, recordTimestamp);
+}
+} else {
+final 
RocksDBVersionedStoreSegmentValueFormatter.SegmentValue.SegmentSearchResult 
currentRecord =
+currentDeserializedSegmentValue.find(fromTime, toTime, 
order, nextIndex);
+if (currentRecord != null) {
+nextRecord = new VersionedRecord<>(currentRecord.value(), 
currentRecord.validFrom(), currentRecord.validTo());
+this.nextIndex = order.equals(ResultOrder.ASCENDING) ? 
currentRecord.index() - 1 : currentRecord.index() + 1;
+}
+}
+// no relevant record can be found in the segment
+if (currentRawSegmentValue != null || nextRecord == null || 
!canSegmentHaveMoreRelevantRecords(nextRecord.timestamp(), 
nextRecord.validTo().get())) {
+prepareToFetchNextSegment();
+}
+return nextRecord;
+}
+
+private boolean canSegmentHaveMoreRelevantRecords(final long 
currentValidFrom, final long currentValidTo) {
+final boolean isCurrentOutsideTimeRange = 
(order.equals(ResultOrder.ASCENDING) && (currentValidTo > toTime || 
currentDeserializedSegmentValue.nextTimestamp() == currentValidTo))

Review Comment:
   This is complex. Might be simpler to use:
   ```
   final boolean isCurrentOutsideTimeRange;
   if (order.equals(ResultOrder.ASCENDING)) {
       isCurrentOutsideTimeRange = currentValidTo > toTime || currentDeserializedSegmentValue.nextTimestamp() == currentValidTo;
   } else {
       isCurrentOutsideTimeRange = currentValidFrom < fromTime || currentDeserializedSegmentValue.minTimestamp() == currentValidFrom;
   }
   ```



##
streams/src/main/java/org/apache/kafka/streams/state/internals/ReadonlyPartiallyDeserializedSegmentValue.java:
##
@@ -0,0 +1,210 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * 

Re: [PR] KAFKA-15347: implement lazy deserialization for segment [kafka]

2023-12-11 Thread via GitHub


aliehsaeedii commented on code in PR #14957:
URL: https://github.com/apache/kafka/pull/14957#discussion_r1422488101


##
streams/src/main/java/org/apache/kafka/streams/state/internals/ReadonlyPartiallyDeserializedSegmentValue.java:
##
@@ -0,0 +1,206 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.streams.query.ResultOrder;
+
+import java.nio.ByteBuffer;
+import java.util.HashMap;
+import java.util.Map;
+
+
+final class ReadonlyPartiallyDeserializedSegmentValue {
+
+private static final int TIMESTAMP_SIZE = 8;
+private static final int VALUE_SIZE = 4;
+private byte[] segmentValue;
+private long nextTimestamp;
+private long minTimestamp;
+
+private int deserIndex = -1; // index up through which this segment has 
been deserialized (inclusive)
+
+private Map cumulativeValueSizes;
+
+private int valuesStartingIndex = -1; // the index of the first value in 
the segment (but the last one in the list)
+private Map unpackedTimestampAndValueSizes 
= new HashMap<>();
+private int recordNumber = -1; // number of segment records
+
+
+ReadonlyPartiallyDeserializedSegmentValue(final byte[] segmentValue) {
+this.segmentValue = segmentValue;
+this.nextTimestamp =
+
RocksDBVersionedStoreSegmentValueFormatter.getNextTimestamp(segmentValue);
+this.minTimestamp =
+
RocksDBVersionedStoreSegmentValueFormatter.getMinTimestamp(segmentValue);
+resetDeserHelpers();
+}
+
+
+public long getMinTimestamp() {
+return minTimestamp;
+}
+
+public long getNextTimestamp() {
+return nextTimestamp;
+}
+
+public byte[] serialize() {
+return segmentValue;
+}
+
+
+public 
RocksDBVersionedStoreSegmentValueFormatter.SegmentValue.SegmentSearchResult 
find(
+final long fromTime, final long toTime, final ResultOrder order, 
final int index) {
+
+// this segment does not have any record in query specified time range
+if (toTime < minTimestamp || fromTime > nextTimestamp) {
+return null;
+}
+
+final boolean isAscending = order.equals(ResultOrder.ASCENDING);
+
+if (isAscending && valuesStartingIndex == -1) {
+findValuesStartingIndex();
+deserIndex = recordNumber;
+}
+
+long currTimestamp = -1;
+long currNextTimestamp = -1;
+int currIndex = initializeCurrentIndex(index, isAscending);
+int cumValueSize = initializeCumvalueSize(index, currIndex, 
isAscending);
+int currValueSize;
+
+
+while (hasStillRecord(currTimestamp, currNextTimestamp, order)) {
+if (hasBeenDeserialized(isAscending, currIndex)) {
+final TimestampAndValueSize curr;
+curr = unpackedTimestampAndValueSizes.get(currIndex);
+currTimestamp = curr.timestamp;
+cumValueSize = cumulativeValueSizes.get(currIndex);
+currValueSize = curr.valueSize;
+
+// update currValueSize
+if (currValueSize == Integer.MIN_VALUE) {
+final int timestampSegmentIndex = getTimestampIndex(order, 
currIndex);
+currValueSize = 
ByteBuffer.wrap(segmentValue).getInt(timestampSegmentIndex + TIMESTAMP_SIZE);
+unpackedTimestampAndValueSizes.put(currIndex, new 
TimestampAndValueSize(currTimestamp, cumValueSize));
+}
+
+currNextTimestamp = updateCurrNextTimestamp(currIndex, 
isAscending);
+
+} else {
+final int timestampSegmentIndex = getTimestampIndex(order, 
currIndex);
+currTimestamp = 
ByteBuffer.wrap(segmentValue).getLong(timestampSegmentIndex);
+currValueSize = 
ByteBuffer.wrap(segmentValue).getInt(timestampSegmentIndex + TIMESTAMP_SIZE);
+currNextTimestamp = timestampSegmentIndex == 2 * TIMESTAMP_SIZE
+? nextTimestamp // if this is the first record 
metadata (timestamp + value size)
+: 

Re: [PR] KAFKA-15347: implement lazy deserialization for segment [kafka]

2023-12-11 Thread via GitHub


mjsax commented on code in PR #14957:
URL: https://github.com/apache/kafka/pull/14957#discussion_r1422442823


##
streams/src/main/java/org/apache/kafka/streams/state/internals/ReadonlyPartiallyDeserializedSegmentValue.java:
##
@@ -0,0 +1,206 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.streams.query.ResultOrder;
+
+import java.nio.ByteBuffer;
+import java.util.HashMap;
+import java.util.Map;
+
+
+final class ReadonlyPartiallyDeserializedSegmentValue {
+
+private static final int TIMESTAMP_SIZE = 8;
+private static final int VALUE_SIZE = 4;
+private byte[] segmentValue;
+private long nextTimestamp;
+private long minTimestamp;
+
+private int deserIndex = -1; // index up through which this segment has 
been deserialized (inclusive)
+
+private Map cumulativeValueSizes;
+
+private int valuesStartingIndex = -1; // the index of the first value in 
the segment (but the last one in the list)
+private Map unpackedTimestampAndValueSizes 
= new HashMap<>();
+private int recordNumber = -1; // number of segment records
+
+
+ReadonlyPartiallyDeserializedSegmentValue(final byte[] segmentValue) {
+this.segmentValue = segmentValue;
+this.nextTimestamp =
+
RocksDBVersionedStoreSegmentValueFormatter.getNextTimestamp(segmentValue);
+this.minTimestamp =
+
RocksDBVersionedStoreSegmentValueFormatter.getMinTimestamp(segmentValue);
+resetDeserHelpers();
+}
+
+
+public long getMinTimestamp() {
+return minTimestamp;
+}
+
+public long getNextTimestamp() {

Review Comment:
   same, no `get`



##
streams/src/main/java/org/apache/kafka/streams/state/internals/ReadonlyPartiallyDeserializedSegmentValue.java:
##
@@ -0,0 +1,206 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.streams.query.ResultOrder;
+
+import java.nio.ByteBuffer;
+import java.util.HashMap;
+import java.util.Map;
+
+
+final class ReadonlyPartiallyDeserializedSegmentValue {
+
+private static final int TIMESTAMP_SIZE = 8;
+private static final int VALUE_SIZE = 4;
+private byte[] segmentValue;
+private long nextTimestamp;
+private long minTimestamp;
+
+private int deserIndex = -1; // index up through which this segment has 
been deserialized (inclusive)
+
+private Map cumulativeValueSizes;
+
+private int valuesStartingIndex = -1; // the index of the first value in 
the segment (but the last one in the list)
+private Map unpackedTimestampAndValueSizes 
= new HashMap<>();
+private int recordNumber = -1; // number of segment records
+
+
+ReadonlyPartiallyDeserializedSegmentValue(final byte[] segmentValue) {
+this.segmentValue = segmentValue;
+this.nextTimestamp =
+
RocksDBVersionedStoreSegmentValueFormatter.getNextTimestamp(segmentValue);
+this.minTimestamp =
+
RocksDBVersionedStoreSegmentValueFormatter.getMinTimestamp(segmentValue);
+resetDeserHelpers();
+}
+
+
+public long getMinTimestamp() {

Review Comment:
   nit: naming convention, don't use `get` -> `minTimestamp()` 



##
streams/src/main/java/org/apache/kafka/streams/state/internals/ReadonlyPartiallyDeserializedSegmentValue.java:
##
@@ -0,0 +1,206 @@
+/*
+ * 

Re: [PR] KAFKA-15347: implement lazy deserialization for segment [kafka]

2023-12-08 Thread via GitHub


aliehsaeedii commented on code in PR #14957:
URL: https://github.com/apache/kafka/pull/14957#discussion_r1421149432


##
streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStoreSegmentValueFormatter.java:
##
@@ -340,32 +355,67 @@ public SegmentSearchResult find(final long timestamp, 
final boolean includeValue
 }
 
 @Override
-public List findAll(final long fromTime, final 
long toTime) {
-long currNextTimestamp = nextTimestamp;
-final List segmentSearchResults = new 
ArrayList<>();
-long currTimestamp = -1L; // choose an invalid timestamp. if this 
is valid, this needs to be re-worked
-int currValueSize;
-int currIndex = 0;
-int cumValueSize = 0;
-while (currTimestamp != minTimestamp) {
-final int timestampSegmentIndex = 2 * TIMESTAMP_SIZE + 
currIndex * (TIMESTAMP_SIZE + VALUE_SIZE);
+public SegmentSearchResult find(final long fromTime, final long 
toTime, final ResultOrder order) {
+// this segment does not have any record in query specified time 
range
+if (toTime < minTimestamp || fromTime > nextTimestamp) {
+return null;
+}
+long currTimestamp = -1;
+long currNextTimestamp = -1;
+
+
+if (order.equals(ResultOrder.ASCENDING) && valuesStartingIndex == 
-1) {
+findValuesStartingIndex();
+}
+
+while (hasStillRecord(currTimestamp, currNextTimestamp, order)) {
+final int timestampSegmentIndex = getTimestampIndex(order, 
currentDeserIndex);
 currTimestamp = 
ByteBuffer.wrap(segmentValue).getLong(timestampSegmentIndex);
-currValueSize = 
ByteBuffer.wrap(segmentValue).getInt(timestampSegmentIndex + TIMESTAMP_SIZE);
-cumValueSize += Math.max(currValueSize, 0);
+currNextTimestamp = timestampSegmentIndex == 2 * 
TIMESTAMP_SIZE ? nextTimestamp // if this is the first record metadata 
(timestamp + value size)
+   
 : ByteBuffer.wrap(segmentValue).getLong(timestampSegmentIndex - 
(TIMESTAMP_SIZE + VALUE_SIZE));
+final int currValueSize = 
ByteBuffer.wrap(segmentValue).getInt(timestampSegmentIndex + TIMESTAMP_SIZE);
 if (currValueSize >= 0) {
 final byte[] value = new byte[currValueSize];
-final int valueSegmentIndex = segmentValue.length - 
cumValueSize;
+final int valueSegmentIndex = getValueSegmentIndex(order, 
currentCumValueSize, currValueSize);
 System.arraycopy(segmentValue, valueSegmentIndex, value, 
0, currValueSize);
 if (currTimestamp <= toTime && currNextTimestamp > 
fromTime) {
-segmentSearchResults.add(new 
SegmentSearchResult(currIndex, currTimestamp, currNextTimestamp, value));
+currentCumValueSize += currValueSize;
+currentDeserIndex++;
+return new SegmentSearchResult(currentDeserIndex - 1, 
currTimestamp, currNextTimestamp, value);
 }
 }
-
 // prep for next iteration
-currNextTimestamp = currTimestamp;
+currentCumValueSize += Math.max(currValueSize, 0);
+currentDeserIndex++;
+}
+// search in segment expected to find result but did not
+return null;
+}
+
+private boolean hasStillRecord(final long currTimestamp, final long 
currNextTimestamp, final ResultOrder order) {
+return order.equals(ResultOrder.ASCENDING) ? currNextTimestamp != 
nextTimestamp : currTimestamp != minTimestamp;
+}
+
+private int getValueSegmentIndex(final ResultOrder order, final int 
currentCumValueSize, final int currValueSize) {
+return order.equals(ResultOrder.ASCENDING) ? valuesStartingIndex + 
currentCumValueSize
+   : segmentValue.length - 
(currentCumValueSize + currValueSize);
+}
+
+private int getTimestampIndex(final ResultOrder order, final int 
currIndex) {
+return order.equals(ResultOrder.ASCENDING) ? valuesStartingIndex - 
((currIndex + 1) * (TIMESTAMP_SIZE + VALUE_SIZE))
+   : 2 * TIMESTAMP_SIZE + 
currIndex * (TIMESTAMP_SIZE + VALUE_SIZE);
+}
+
+private void findValuesStartingIndex() {
+long currTimestamp = -1;
+int currIndex = 0;
+int timestampSegmentIndex = 0;
+while (currTimestamp != minTimestamp) {
+timestampSegmentIndex = 2 * TIMESTAMP_SIZE + currIndex * 
(TIMESTAMP_SIZE + VALUE_SIZE);
+currTimestamp = 

Re: [PR] KAFKA-15347: implement lazy deserialization for segment [kafka]

2023-12-08 Thread via GitHub


aliehsaeedii commented on PR #14957:
URL: https://github.com/apache/kafka/pull/14957#issuecomment-1848005381

   @mjsax 
   I modified the `find()` method in the best possible way, reusing the currently defined `deserIndex` and caches. Here is a brief list of the changes and issues.
   1. I had to define the `reversedDeserIndex` as well as two other caches for deserialization in ascending order of timestamps.
   2. I had to add a new parameter `index` to the `find()` method: since we are iterating over the segment, relying on `deserIndex` alone is not safe, because it may be modified by the other `find()` or be reset by methods such as `initializeWithRecord()`.
   3. Please look at [this](https://github.com/aliehsaeedii/kafka/blob/350167e2693db44e5b0357ee8527273c2452eb02/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStoreSegmentValueFormatter.java#L387) comment :| I must confess that there is no way to fix this issue.
   
   Conclusion:
   1. I believe that a single instance of `PartiallyDeserializedSegmentValue` cannot call the two `find()` methods (or other class methods) interleaved, since my `find()` is only accessible through the `LogicalSegmentIterator` class. Of course, we can call them interleaved in the `RocksDBVersionedStoreSegmentValueFormatterTest` class, but that is an artificial, manual scenario. Please tell me if I am wrong. If this conclusion is correct, defining the new `find()` method in the `PartiallyDeserializedSegmentValue` class is pointless, since it cannot reuse the common deser helpers anyway; we should move it to the `SegmentIterator` class, which makes the implementation cleaner and easier.
   
   2. If my former conclusion is not correct, then either the new `find()` must have its own isolated `deserHelpers` (like my former implementation), or we must find a safe way to reuse the currently available ones (which I have not found yet).
   
   P.S.: This is not a final commit; I committed it just for clarification.
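   
   To make conclusion 2 concrete, here is a rough standalone sketch (purely illustrative, not the PR code) of what an isolated deserialization cursor for the range-scan `find()` could look like, so that the existing point-lookup `find()` or `initializeWithRecord()` cannot reset or advance it:
   ```
   // Hypothetical illustration only: a cursor owned by the range scan, kept
   // separate from deserIndex/cumulativeValueSizes. Names are made up here.
   final class RangeScanCursor {
       private int nextRecordIndex = 0;      // next record (from newest) to deserialize
       private int cumulativeValueSize = 0;  // value bytes consumed so far

       int nextRecordIndex() { return nextRecordIndex; }

       int cumulativeValueSize() { return cumulativeValueSize; }

       void advance(final int valueSize) {
           // negative sizes mark tombstones, which occupy no value bytes
           cumulativeValueSize += Math.max(valueSize, 0);
           nextRecordIndex++;
       }
   }
   ```
   Whether such a cursor lives in the segment value class or in the iterator is exactly the open question above.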


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15347: implement lazy deserialization for segment [kafka]

2023-12-08 Thread via GitHub


aliehsaeedii commented on code in PR #14957:
URL: https://github.com/apache/kafka/pull/14957#discussion_r1420974212


##
streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStoreSegmentValueFormatter.java:
##
@@ -340,32 +355,67 @@ public SegmentSearchResult find(final long timestamp, 
final boolean includeValue
 }
 
 @Override
-public List findAll(final long fromTime, final 
long toTime) {
-long currNextTimestamp = nextTimestamp;
-final List segmentSearchResults = new 
ArrayList<>();
-long currTimestamp = -1L; // choose an invalid timestamp. if this 
is valid, this needs to be re-worked
-int currValueSize;
-int currIndex = 0;
-int cumValueSize = 0;
-while (currTimestamp != minTimestamp) {
-final int timestampSegmentIndex = 2 * TIMESTAMP_SIZE + 
currIndex * (TIMESTAMP_SIZE + VALUE_SIZE);
+public SegmentSearchResult find(final long fromTime, final long 
toTime, final ResultOrder order) {
+// this segment does not have any record in query specified time 
range
+if (toTime < minTimestamp || fromTime > nextTimestamp) {
+return null;
+}
+long currTimestamp = -1;
+long currNextTimestamp = -1;
+
+
+if (order.equals(ResultOrder.ASCENDING) && valuesStartingIndex == 
-1) {
+findValuesStartingIndex();
+}
+
+while (hasStillRecord(currTimestamp, currNextTimestamp, order)) {
+final int timestampSegmentIndex = getTimestampIndex(order, 
currentDeserIndex);
 currTimestamp = 
ByteBuffer.wrap(segmentValue).getLong(timestampSegmentIndex);
-currValueSize = 
ByteBuffer.wrap(segmentValue).getInt(timestampSegmentIndex + TIMESTAMP_SIZE);
-cumValueSize += Math.max(currValueSize, 0);
+currNextTimestamp = timestampSegmentIndex == 2 * 
TIMESTAMP_SIZE ? nextTimestamp // if this is the first record metadata 
(timestamp + value size)
+   
 : ByteBuffer.wrap(segmentValue).getLong(timestampSegmentIndex - 
(TIMESTAMP_SIZE + VALUE_SIZE));
+final int currValueSize = 
ByteBuffer.wrap(segmentValue).getInt(timestampSegmentIndex + TIMESTAMP_SIZE);
 if (currValueSize >= 0) {
 final byte[] value = new byte[currValueSize];
-final int valueSegmentIndex = segmentValue.length - 
cumValueSize;
+final int valueSegmentIndex = getValueSegmentIndex(order, 
currentCumValueSize, currValueSize);
 System.arraycopy(segmentValue, valueSegmentIndex, value, 
0, currValueSize);
 if (currTimestamp <= toTime && currNextTimestamp > 
fromTime) {
-segmentSearchResults.add(new 
SegmentSearchResult(currIndex, currTimestamp, currNextTimestamp, value));
+currentCumValueSize += currValueSize;
+currentDeserIndex++;
+return new SegmentSearchResult(currentDeserIndex - 1, 
currTimestamp, currNextTimestamp, value);
 }
 }
-
 // prep for next iteration
-currNextTimestamp = currTimestamp;
+currentCumValueSize += Math.max(currValueSize, 0);
+currentDeserIndex++;
+}
+// search in segment expected to find result but did not
+return null;
+}
+
+private boolean hasStillRecord(final long currTimestamp, final long 
currNextTimestamp, final ResultOrder order) {
+return order.equals(ResultOrder.ASCENDING) ? currNextTimestamp != 
nextTimestamp : currTimestamp != minTimestamp;
+}
+
+private int getValueSegmentIndex(final ResultOrder order, final int 
currentCumValueSize, final int currValueSize) {
+return order.equals(ResultOrder.ASCENDING) ? valuesStartingIndex + 
currentCumValueSize
+   : segmentValue.length - 
(currentCumValueSize + currValueSize);
+}
+
+private int getTimestampIndex(final ResultOrder order, final int 
currIndex) {
+return order.equals(ResultOrder.ASCENDING) ? valuesStartingIndex - 
((currIndex + 1) * (TIMESTAMP_SIZE + VALUE_SIZE))
+   : 2 * TIMESTAMP_SIZE + 
currIndex * (TIMESTAMP_SIZE + VALUE_SIZE);
+}
+
+private void findValuesStartingIndex() {
+long currTimestamp = -1;
+int currIndex = 0;
+int timestampSegmentIndex = 0;
+while (currTimestamp != minTimestamp) {
+timestampSegmentIndex = 2 * TIMESTAMP_SIZE + currIndex * 
(TIMESTAMP_SIZE + VALUE_SIZE);
+currTimestamp = 

Re: [PR] KAFKA-15347: implement lazy deserialization for segment [kafka]

2023-12-08 Thread via GitHub


aliehsaeedii commented on code in PR #14957:
URL: https://github.com/apache/kafka/pull/14957#discussion_r1420648455


##
streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStoreSegmentValueFormatter.java:
##
@@ -340,32 +355,67 @@ public SegmentSearchResult find(final long timestamp, 
final boolean includeValue
 }
 
 @Override
-public List findAll(final long fromTime, final 
long toTime) {
-long currNextTimestamp = nextTimestamp;
-final List segmentSearchResults = new 
ArrayList<>();
-long currTimestamp = -1L; // choose an invalid timestamp. if this 
is valid, this needs to be re-worked
-int currValueSize;
-int currIndex = 0;
-int cumValueSize = 0;
-while (currTimestamp != minTimestamp) {
-final int timestampSegmentIndex = 2 * TIMESTAMP_SIZE + 
currIndex * (TIMESTAMP_SIZE + VALUE_SIZE);
+public SegmentSearchResult find(final long fromTime, final long 
toTime, final ResultOrder order) {
+// this segment does not have any record in query specified time 
range
+if (toTime < minTimestamp || fromTime > nextTimestamp) {
+return null;
+}
+long currTimestamp = -1;
+long currNextTimestamp = -1;
+
+
+if (order.equals(ResultOrder.ASCENDING) && valuesStartingIndex == 
-1) {
+findValuesStartingIndex();
+}
+
+while (hasStillRecord(currTimestamp, currNextTimestamp, order)) {
+final int timestampSegmentIndex = getTimestampIndex(order, 
currentDeserIndex);
 currTimestamp = 
ByteBuffer.wrap(segmentValue).getLong(timestampSegmentIndex);
-currValueSize = 
ByteBuffer.wrap(segmentValue).getInt(timestampSegmentIndex + TIMESTAMP_SIZE);
-cumValueSize += Math.max(currValueSize, 0);
+currNextTimestamp = timestampSegmentIndex == 2 * 
TIMESTAMP_SIZE ? nextTimestamp // if this is the first record metadata 
(timestamp + value size)
+   
 : ByteBuffer.wrap(segmentValue).getLong(timestampSegmentIndex - 
(TIMESTAMP_SIZE + VALUE_SIZE));
+final int currValueSize = 
ByteBuffer.wrap(segmentValue).getInt(timestampSegmentIndex + TIMESTAMP_SIZE);
 if (currValueSize >= 0) {
 final byte[] value = new byte[currValueSize];
-final int valueSegmentIndex = segmentValue.length - 
cumValueSize;
+final int valueSegmentIndex = getValueSegmentIndex(order, 
currentCumValueSize, currValueSize);
 System.arraycopy(segmentValue, valueSegmentIndex, value, 
0, currValueSize);
 if (currTimestamp <= toTime && currNextTimestamp > 
fromTime) {
-segmentSearchResults.add(new 
SegmentSearchResult(currIndex, currTimestamp, currNextTimestamp, value));
+currentCumValueSize += currValueSize;
+currentDeserIndex++;
+return new SegmentSearchResult(currentDeserIndex - 1, 
currTimestamp, currNextTimestamp, value);
 }
 }
-
 // prep for next iteration
-currNextTimestamp = currTimestamp;
+currentCumValueSize += Math.max(currValueSize, 0);
+currentDeserIndex++;
+}
+// search in segment expected to find result but did not
+return null;
+}
+
+private boolean hasStillRecord(final long currTimestamp, final long 
currNextTimestamp, final ResultOrder order) {
+return order.equals(ResultOrder.ASCENDING) ? currNextTimestamp != 
nextTimestamp : currTimestamp != minTimestamp;
+}
+
+private int getValueSegmentIndex(final ResultOrder order, final int 
currentCumValueSize, final int currValueSize) {
+return order.equals(ResultOrder.ASCENDING) ? valuesStartingIndex + 
currentCumValueSize
+   : segmentValue.length - 
(currentCumValueSize + currValueSize);
+}
+
+private int getTimestampIndex(final ResultOrder order, final int 
currIndex) {
+return order.equals(ResultOrder.ASCENDING) ? valuesStartingIndex - 
((currIndex + 1) * (TIMESTAMP_SIZE + VALUE_SIZE))
+   : 2 * TIMESTAMP_SIZE + 
currIndex * (TIMESTAMP_SIZE + VALUE_SIZE);
+}
+
+private void findValuesStartingIndex() {
+long currTimestamp = -1;
+int currIndex = 0;
+int timestampSegmentIndex = 0;
+while (currTimestamp != minTimestamp) {
+timestampSegmentIndex = 2 * TIMESTAMP_SIZE + currIndex * 
(TIMESTAMP_SIZE + VALUE_SIZE);
+currTimestamp = 

Re: [PR] KAFKA-15347: implement lazy deserialization for segment [kafka]

2023-12-08 Thread via GitHub


aliehsaeedii commented on code in PR #14957:
URL: https://github.com/apache/kafka/pull/14957#discussion_r1420641426


##
streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStoreSegmentValueFormatter.java:
##
@@ -340,32 +355,67 @@ public SegmentSearchResult find(final long timestamp, 
final boolean includeValue
 }
 
 @Override
-public List findAll(final long fromTime, final 
long toTime) {
-long currNextTimestamp = nextTimestamp;
-final List segmentSearchResults = new 
ArrayList<>();
-long currTimestamp = -1L; // choose an invalid timestamp. if this 
is valid, this needs to be re-worked
-int currValueSize;
-int currIndex = 0;
-int cumValueSize = 0;
-while (currTimestamp != minTimestamp) {
-final int timestampSegmentIndex = 2 * TIMESTAMP_SIZE + 
currIndex * (TIMESTAMP_SIZE + VALUE_SIZE);
+public SegmentSearchResult find(final long fromTime, final long 
toTime, final ResultOrder order) {
+// this segment does not have any record in query specified time 
range
+if (toTime < minTimestamp || fromTime > nextTimestamp) {
+return null;
+}
+long currTimestamp = -1;
+long currNextTimestamp = -1;
+
+
+if (order.equals(ResultOrder.ASCENDING) && valuesStartingIndex == 
-1) {
+findValuesStartingIndex();
+}
+
+while (hasStillRecord(currTimestamp, currNextTimestamp, order)) {
+final int timestampSegmentIndex = getTimestampIndex(order, 
currentDeserIndex);
 currTimestamp = 
ByteBuffer.wrap(segmentValue).getLong(timestampSegmentIndex);
-currValueSize = 
ByteBuffer.wrap(segmentValue).getInt(timestampSegmentIndex + TIMESTAMP_SIZE);
-cumValueSize += Math.max(currValueSize, 0);
+currNextTimestamp = timestampSegmentIndex == 2 * 
TIMESTAMP_SIZE ? nextTimestamp // if this is the first record metadata 
(timestamp + value size)
+   
 : ByteBuffer.wrap(segmentValue).getLong(timestampSegmentIndex - 
(TIMESTAMP_SIZE + VALUE_SIZE));
+final int currValueSize = 
ByteBuffer.wrap(segmentValue).getInt(timestampSegmentIndex + TIMESTAMP_SIZE);
 if (currValueSize >= 0) {
 final byte[] value = new byte[currValueSize];
-final int valueSegmentIndex = segmentValue.length - 
cumValueSize;
+final int valueSegmentIndex = getValueSegmentIndex(order, 
currentCumValueSize, currValueSize);
 System.arraycopy(segmentValue, valueSegmentIndex, value, 
0, currValueSize);
 if (currTimestamp <= toTime && currNextTimestamp > 
fromTime) {
-segmentSearchResults.add(new 
SegmentSearchResult(currIndex, currTimestamp, currNextTimestamp, value));
+currentCumValueSize += currValueSize;
+currentDeserIndex++;
+return new SegmentSearchResult(currentDeserIndex - 1, 
currTimestamp, currNextTimestamp, value);
 }
 }
-
 // prep for next iteration
-currNextTimestamp = currTimestamp;
+currentCumValueSize += Math.max(currValueSize, 0);
+currentDeserIndex++;
+}
+// search in segment expected to find result but did not
+return null;
+}
+
+private boolean hasStillRecord(final long currTimestamp, final long 
currNextTimestamp, final ResultOrder order) {
+return order.equals(ResultOrder.ASCENDING) ? currNextTimestamp != 
nextTimestamp : currTimestamp != minTimestamp;
+}
+
+private int getValueSegmentIndex(final ResultOrder order, final int 
currentCumValueSize, final int currValueSize) {
+return order.equals(ResultOrder.ASCENDING) ? valuesStartingIndex + 
currentCumValueSize
+   : segmentValue.length - 
(currentCumValueSize + currValueSize);
+}
+
+private int getTimestampIndex(final ResultOrder order, final int 
currIndex) {
+return order.equals(ResultOrder.ASCENDING) ? valuesStartingIndex - 
((currIndex + 1) * (TIMESTAMP_SIZE + VALUE_SIZE))
+   : 2 * TIMESTAMP_SIZE + 
currIndex * (TIMESTAMP_SIZE + VALUE_SIZE);
+}
+
+private void findValuesStartingIndex() {
+long currTimestamp = -1;
+int currIndex = 0;
+int timestampSegmentIndex = 0;
+while (currTimestamp != minTimestamp) {
+timestampSegmentIndex = 2 * TIMESTAMP_SIZE + currIndex * 
(TIMESTAMP_SIZE + VALUE_SIZE);
+currTimestamp = 

Re: [PR] KAFKA-15347: implement lazy deserialization for segment [kafka]

2023-12-08 Thread via GitHub


aliehsaeedii commented on code in PR #14957:
URL: https://github.com/apache/kafka/pull/14957#discussion_r1420471301


##
streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStoreSegmentValueFormatter.java:
##
@@ -150,7 +152,15 @@ interface SegmentValue {
  */
 SegmentSearchResult find(long timestamp, boolean includeValue);
 
-List findAll(long fromTime, long toTime);
+/**
+ * Finds the next/previous record in this segment row that is valid 
within the query specified time range
+ *
+ * @param fromTime the query starting time point
+ * @param toTime the query ending time point
+ * @param order specifies the order (based on timestamp) in which 
records are returned
+ * @return the record that is found, null if no record is found
+ */
+SegmentSearchResult find(long fromTime, long toTime, ResultOrder 
order);

Review Comment:
   > I am wondering if existing `find()` and the new `find()` can be called 
interleaved without breaking anything?
   
   When does that happen? Right now, only instances of `LogicalSegmentIterator` 
have access to the new `find` method; therefore, I think the same instance of 
`PartiallyDeserializedSegmentValue` cannot call them interleaved.
   
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15347: implement lazy deserialization for segment [kafka]

2023-12-08 Thread via GitHub


aliehsaeedii commented on code in PR #14957:
URL: https://github.com/apache/kafka/pull/14957#discussion_r1420466908


##
streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStoreSegmentValueFormatter.java:
##
@@ -150,7 +152,15 @@ interface SegmentValue {
  */
 SegmentSearchResult find(long timestamp, boolean includeValue);
 
-List findAll(long fromTime, long toTime);
+/**
+ * Finds the next/previous record in this segment row that is valid 
within the query specified time range
+ *
+ * @param fromTime the query starting time point
+ * @param toTime the query ending time point
+ * @param order specifies the order (based on timestamp) in which 
records are returned
+ * @return the record that is found, null if no record is found
+ */
+SegmentSearchResult find(long fromTime, long toTime, ResultOrder 
order);

Review Comment:
   > I am wondering if existing `find()` and the new `find()` can be called 
interleaved without breaking anything?
   > 
   > So far, `PartiallyDeserializedSegmentValue` is already lazy and 
deserializes from newest to oldest. It would be a difficult API contract if 
calling existing `find` and new `find` interleaved would break anything?
   
   A very good question. Maybe we need a sync session about that?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15347: implement lazy deserialization for segment [kafka]

2023-12-08 Thread via GitHub


aliehsaeedii commented on code in PR #14957:
URL: https://github.com/apache/kafka/pull/14957#discussion_r1420462098


##
streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStoreSegmentValueFormatter.java:
##
@@ -266,6 +276,11 @@ private static class PartiallyDeserializedSegmentValue 
implements SegmentValue {
 private int deserIndex = -1; // index up through which this segment 
has been deserialized (inclusive)
 private List 
unpackedReversedTimestampAndValueSizes;
 private List cumulativeValueSizes; // ordered same as 
timestamp and value sizes (reverse time-sorted)
+private int valuesStartingIndex = -1; // the index of first value in 
the segment
+private int currentCumValueSize = 0; // this is for deserializing a 
segment records in a lazy manner
+private int currentDeserIndex = 0; // this is for deserializing a 
segment records in a lazy manner

Review Comment:
   > Why do we need to duplicate both variables?
   
   This is what I did in the first place, but it is complicated, since other methods, such as `doInsert()`, manipulate them (i.e. `deserIndex` and `cumulativeValueSizes`) as well. That's why I decided to have my own variables.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15347: implement lazy deserialization for segment [kafka]

2023-12-08 Thread via GitHub


aliehsaeedii commented on code in PR #14957:
URL: https://github.com/apache/kafka/pull/14957#discussion_r1420464501


##
streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStoreSegmentValueFormatter.java:
##
@@ -266,6 +276,11 @@ private static class PartiallyDeserializedSegmentValue 
implements SegmentValue {
 private int deserIndex = -1; // index up through which this segment 
has been deserialized (inclusive)
 private List 
unpackedReversedTimestampAndValueSizes;
 private List cumulativeValueSizes; // ordered same as 
timestamp and value sizes (reverse time-sorted)
+private int valuesStartingIndex = -1; // the index of first value in 
the segment
+private int currentCumValueSize = 0; // this is for deserializing a 
segment records in a lazy manner
+private int currentDeserIndex = 0; // this is for deserializing a 
segment records in a lazy manner

Review Comment:
   > it seems we might want to change their name to indicate when (ascending vs 
descending) they are used?
   
   I use the same variables (the ones I defined myself) for both directions, since deserialization proceeds either backwards or forwards, never both at the same time.
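   
   As a rough illustration of why a single cursor per direction suffices (a standalone sketch using the same 8-byte timestamp and 4-byte value-size constants as the PR; everything else here is made up for illustration): the same cursor index maps to a different byte offset depending on the scan direction.
   ```
   // Standalone sketch of the direction-dependent offset math used by the lazy scan.
   final class SegmentOffsets {
       private static final int TIMESTAMP_SIZE = 8;
       private static final int VALUE_SIZE = 4;

       // Descending scan: record i's (timestamp, valueSize) header starts right after
       // the two leading long timestamps (nextTimestamp, minTimestamp).
       static int descendingTimestampOffset(final int i) {
           return 2 * TIMESTAMP_SIZE + i * (TIMESTAMP_SIZE + VALUE_SIZE);
       }

       // Ascending scan: walk the header area backwards from where the packed values begin.
       static int ascendingTimestampOffset(final int valuesStartingIndex, final int i) {
           return valuesStartingIndex - (i + 1) * (TIMESTAMP_SIZE + VALUE_SIZE);
       }

       public static void main(final String[] args) {
           // e.g. the third record header (i = 2) in a descending scan starts at byte 40
           System.out.println(descendingTimestampOffset(2)); // 16 + 2 * 12 = 40
       }
   }
   ```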



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15347: implement lazy deserialization for segment [kafka]

2023-12-08 Thread via GitHub


aliehsaeedii commented on code in PR #14957:
URL: https://github.com/apache/kafka/pull/14957#discussion_r1420462098


##
streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStoreSegmentValueFormatter.java:
##
@@ -266,6 +276,11 @@ private static class PartiallyDeserializedSegmentValue 
implements SegmentValue {
 private int deserIndex = -1; // index up through which this segment 
has been deserialized (inclusive)
 private List 
unpackedReversedTimestampAndValueSizes;
 private List cumulativeValueSizes; // ordered same as 
timestamp and value sizes (reverse time-sorted)
+private int valuesStartingIndex = -1; // the index of first value in 
the segment
+private int currentCumValueSize = 0; // this is for deserializing a 
segment records in a lazy manner
+private int currentDeserIndex = 0; // this is for deserializing a 
segment records in a lazy manner

Review Comment:
   > Why do we need to duplicate both variables?
   
   This is what I did in the first place, but it is complicated, since other methods, such as `doInsert()`, manipulate them (`deserIndex` and `cumulativeValueSizes`) as well. That's why I decided to have my own variables.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15347: implement lazy deserialization for segment [kafka]

2023-12-08 Thread via GitHub


aliehsaeedii commented on code in PR #14957:
URL: https://github.com/apache/kafka/pull/14957#discussion_r1420452896


##
streams/src/main/java/org/apache/kafka/streams/state/internals/LogicalSegmentIterator.java:
##
@@ -100,34 +134,49 @@ private boolean maybeFillIterator() {
 final byte[] rawSegmentValue = segment.get(key, snapshot);
 if (rawSegmentValue != null) { // this segment contains record(s) 
with the specified key
 if (segment.id() == -1) { // this is the latestValueStore
-final long recordTimestamp = 
RocksDBVersionedStore.LatestValueFormatter.getTimestamp(rawSegmentValue);
-if (recordTimestamp <= toTime) {
-// latest value satisfies timestamp bound
-queryResults.add(new 
VersionedRecord<>(RocksDBVersionedStore.LatestValueFormatter.getValue(rawSegmentValue),
 recordTimestamp));
-}
+this.currentRawSegmentValue = rawSegmentValue;
 } else {
-// this segment contains records with the specified key 
and time range
-final 
List
 searchResults =
-
RocksDBVersionedStoreSegmentValueFormatter.deserialize(rawSegmentValue).findAll(fromTime,
 toTime);
-for (final 
RocksDBVersionedStoreSegmentValueFormatter.SegmentValue.SegmentSearchResult 
searchResult : searchResults) {
-queryResults.add(new 
VersionedRecord<>(searchResult.value(), searchResult.validFrom(), 
searchResult.validTo()));
-}
+this.currentDeserializedSegmentValue = 
RocksDBVersionedStoreSegmentValueFormatter.deserialize(rawSegmentValue);
+this.minTimestamp = 
RocksDBVersionedStoreSegmentValueFormatter.getMinTimestamp(rawSegmentValue);
+this.nextTimestamp = 
RocksDBVersionedStoreSegmentValueFormatter.getNextTimestamp(rawSegmentValue);
 }
-}
-if (!queryResults.isEmpty()) {
-break;
+return true;
 }
 }
-if (!queryResults.isEmpty()) {
-// since data is stored in descending order in the segments, 
create the list in reverse order, if the order is Ascending.
-this.iterator = order.equals(ResultOrder.ASCENDING) ? 
queryResults.listIterator(queryResults.size()) : queryResults.listIterator();
-return true;
-}
 // if all segments have been processed, release the snapshot
 releaseSnapshot();
 return false;
 }
 
+private Object getNextRecord() {
+VersionedRecord nextRecord = null;
+if (currentRawSegmentValue != null) { // this is the latestValueStore
+final long recordTimestamp = 
RocksDBVersionedStore.LatestValueFormatter.getTimestamp(currentRawSegmentValue);
+if (recordTimestamp <= toTime) {
+final byte[] value = 
RocksDBVersionedStore.LatestValueFormatter.getValue(currentRawSegmentValue);
+// latest value satisfies timestamp bound
+nextRecord = new VersionedRecord<>(value, recordTimestamp);
+}
+} else {
+final 
RocksDBVersionedStoreSegmentValueFormatter.SegmentValue.SegmentSearchResult 
currentRecord =
+currentDeserializedSegmentValue.find(fromTime, toTime, 
order);
+if (currentRecord != null) {
+nextRecord = new VersionedRecord<>(currentRecord.value(), 
currentRecord.validFrom(), currentRecord.validTo());
+}
+}
+// no relevant record can be found in the segment
+if (currentRawSegmentValue != null || nextRecord == null || 
!canSegmentHaveMoreRelevantRecords(nextRecord.timestamp(), 
Long.parseLong(nextRecord.validTo().get().toString( {

Review Comment:
   > `Long.parseLong(nextRecord.validTo().get().toString())` -- why do we get valid-to as `String` and pipe it through `parseLong`? Why can't we just pass `nextRecord.validTo().get()` into `canSegmentHaveMoreRelevantRecords(...)`?
   
   I had to do that before. Fixing the generic types has fixed all these issues now :)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15347: implement lazy deserialization for segment [kafka]

2023-12-08 Thread via GitHub


mjsax commented on code in PR #14957:
URL: https://github.com/apache/kafka/pull/14957#discussion_r1420058690


##
streams/src/main/java/org/apache/kafka/streams/state/internals/LogicalSegmentIterator.java:
##
@@ -69,23 +76,50 @@ public boolean hasNext() {
 if (!open) {
 throw new IllegalStateException("The iterator is out of scope.");
 }
-// since data is stored in descending order in the segments, check 
whether there is any previous record, if the order is Ascending.
-final boolean hasStillLoad = order.equals(ResultOrder.ASCENDING) ? 
iterator.hasPrevious() : iterator.hasNext();
-return hasStillLoad || maybeFillIterator();
+if (this.next != null) {
+return true;
+}
+
+while ((currentDeserializedSegmentValue != null || 
currentRawSegmentValue != null || segmentIterator.hasNext()) && this.next == 
null) {
+boolean hasSegmentValue = currentDeserializedSegmentValue != null 
|| currentRawSegmentValue != null;
+if (!hasSegmentValue) {
+hasSegmentValue = maybeFillCurrentSegmentValue();
+}
+if (hasSegmentValue) {
+this.next  = (VersionedRecord) getNextRecord();
+if (this.next == null) {
+prepareToFetchNextSegment();
+}
+}
+}
+return this.next != null;
 }
 
+@SuppressWarnings("unchecked")

Review Comment:
   We can avoid this suppression by fixing the generic types. This class should be defined as
   ```
   public class LogicalSegmentIterator implements VersionedRecordIterator<byte[]> {
   ```
   (i.e., add the value type `byte[]`)
   
   This allows us to change the return type of `next()`:
   ```
   public VersionedRecord<byte[]> next() {
   ```
   which makes the suppression unnecessary; we can also use the fully typed `VersionedRecord<byte[]>` throughout the code base of this class (instead of the untyped `VersionedRecord`), and we can avoid casts.
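   
   As a self-contained sketch of the idea (with simplified stand-ins for the real Kafka Streams types, so this compiles on its own; the actual classes live in the `org.apache.kafka.streams` packages):
   ```
   import java.util.Iterator;
   import java.util.NoSuchElementException;

   // Simplified stand-ins, only to show how fixing the value type removes the cast.
   interface VersionedRecordIterator<V> extends Iterator<VersionedRecord<V>> { }

   final class VersionedRecord<V> {
       private final V value;
       private final long timestamp;

       VersionedRecord(final V value, final long timestamp) {
           this.value = value;
           this.timestamp = timestamp;
       }

       V value() { return value; }
       long timestamp() { return timestamp; }
   }

   class LogicalSegmentIterator implements VersionedRecordIterator<byte[]> {
       @Override
       public boolean hasNext() { return false; }

       @Override
       public VersionedRecord<byte[]> next() {  // fully typed return, no cast needed
           throw new NoSuchElementException();
       }
   }
   ```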



##
streams/src/main/java/org/apache/kafka/streams/state/internals/LogicalSegmentIterator.java:
##
@@ -69,23 +76,50 @@ public boolean hasNext() {
 if (!open) {
 throw new IllegalStateException("The iterator is out of scope.");
 }
-// since data is stored in descending order in the segments, check 
whether there is any previous record, if the order is Ascending.
-final boolean hasStillLoad = order.equals(ResultOrder.ASCENDING) ? 
iterator.hasPrevious() : iterator.hasNext();
-return hasStillLoad || maybeFillIterator();
+if (this.next != null) {
+return true;
+}
+
+while ((currentDeserializedSegmentValue != null || 
currentRawSegmentValue != null || segmentIterator.hasNext()) && this.next == 
null) {
+boolean hasSegmentValue = currentDeserializedSegmentValue != null 
|| currentRawSegmentValue != null;
+if (!hasSegmentValue) {
+hasSegmentValue = maybeFillCurrentSegmentValue();
+}
+if (hasSegmentValue) {
+this.next  = (VersionedRecord) getNextRecord();

Review Comment:
   Remove cast (should not be necessary... cf other comments)



##
streams/src/main/java/org/apache/kafka/streams/state/internals/LogicalSegmentIterator.java:
##
@@ -100,34 +134,49 @@ private boolean maybeFillIterator() {
 final byte[] rawSegmentValue = segment.get(key, snapshot);
 if (rawSegmentValue != null) { // this segment contains record(s) 
with the specified key
 if (segment.id() == -1) { // this is the latestValueStore
-final long recordTimestamp = 
RocksDBVersionedStore.LatestValueFormatter.getTimestamp(rawSegmentValue);
-if (recordTimestamp <= toTime) {
-// latest value satisfies timestamp bound
-queryResults.add(new 
VersionedRecord<>(RocksDBVersionedStore.LatestValueFormatter.getValue(rawSegmentValue),
 recordTimestamp));
-}
+this.currentRawSegmentValue = rawSegmentValue;
 } else {
-// this segment contains records with the specified key 
and time range
-final 
List
 searchResults =
-
RocksDBVersionedStoreSegmentValueFormatter.deserialize(rawSegmentValue).findAll(fromTime,
 toTime);
-for (final 
RocksDBVersionedStoreSegmentValueFormatter.SegmentValue.SegmentSearchResult 
searchResult : searchResults) {
-queryResults.add(new 
VersionedRecord<>(searchResult.value(), searchResult.validFrom(), 
searchResult.validTo()));
-}
+this.currentDeserializedSegmentValue = 
RocksDBVersionedStoreSegmentValueFormatter.deserialize(rawSegmentValue);
+this.minTimestamp = 

Re: [PR] KAFKA-15347: implement lazy deserialization for segment [kafka]

2023-12-07 Thread via GitHub


mjsax commented on code in PR #14957:
URL: https://github.com/apache/kafka/pull/14957#discussion_r1420051909


##
streams/src/main/java/org/apache/kafka/streams/state/internals/LogicalSegmentIterator.java:
##
@@ -34,7 +31,16 @@ public class LogicalSegmentIterator implements 
VersionedRecordIterator {
 private final Long fromTime;
 private final Long toTime;
 private final ResultOrder order;
-private ListIterator> iterator;
+// stores the raw value of the latestValueStore when latestValueStore is 
the current segment
+private byte[] currentRawSegmentValue;
+// stores the deserialized value of the current segment (when current 
segment is one of the old segments)
+private RocksDBVersionedStoreSegmentValueFormatter.SegmentValue 
currentDeserializedSegmentValue;
+// current segment minTimestamp (when current segment is not the 
latestValueStore)
+private long minTimestamp;
+// current segment nextTimestamp (when current segment is not the 
latestValueStore)
+private long nextTimestamp;
+private VersionedRecord next;
+// current deserialization index

Review Comment:
   Wrong line?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org