Re: [PR] KAFKA-15347: implement lazy deserialization for segment [kafka]
github-actions[bot] commented on PR #14957: URL: https://github.com/apache/kafka/pull/14957#issuecomment-1998877709 This PR is being marked as stale since it has not had any activity in 90 days. If you would like to keep this PR alive, please ask a committer for review. If the PR has merge conflicts, please update it with the latest from trunk (or the appropriate release branch). If this PR is no longer valid or desired, please feel free to close it. If no activity occurs in the next 30 days, it will be automatically closed. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15347: implement lazy deserialization for segment [kafka]
aliehsaeedii commented on code in PR #14957: URL: https://github.com/apache/kafka/pull/14957#discussion_r1424277821 ## streams/src/test/java/org/apache/kafka/streams/state/internals/ReadonlyPartiallyDeserializedSegmentValueTest.java: ## @@ -0,0 +1,220 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.streams.state.internals; + +import org.apache.kafka.streams.query.ResultOrder; +import org.apache.kafka.streams.state.internals.RocksDBVersionedStoreSegmentValueFormatter.SegmentValue; +import org.apache.kafka.streams.state.internals.RocksDBVersionedStoreSegmentValueFormatter.SegmentValue.SegmentSearchResult; +import org.junit.Test; +import org.junit.experimental.runners.Enclosed; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.List; + +import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.junit.jupiter.api.Assertions.assertNull; + +@RunWith(Enclosed.class) +public class ReadonlyPartiallyDeserializedSegmentValueTest { + +/** + * Non-exceptional scenarios which are expected to occur during regular store operation. + */ +@RunWith(Parameterized.class) +public static class ExpectedCasesTest { + +private static final List TEST_CASES = new ArrayList<>(); + +static { +// test cases are expected to have timestamps in strictly decreasing order (except for the degenerate case) +TEST_CASES.add(new TestCase("single record", 10, new TestRecord("foo".getBytes(), 1))); +TEST_CASES.add(new TestCase("multiple records", 10, new TestRecord("foo".getBytes(), 8), new TestRecord("bar".getBytes(), 3), new TestRecord("baz".getBytes(), 0))); +TEST_CASES.add(new TestCase("single tombstone", 10, new TestRecord(null, 1))); +TEST_CASES.add(new TestCase("multiple tombstone", 10, new TestRecord(null, 4), new TestRecord(null, 1))); +TEST_CASES.add(new TestCase("tombstones and records (r, t, r)", 10, new TestRecord("foo".getBytes(), 5), new TestRecord(null, 2), new TestRecord("bar".getBytes(), 1))); +TEST_CASES.add(new TestCase("tombstones and records (t, r, t)", 10, new TestRecord(null, 5), new TestRecord("foo".getBytes(), 2), new 
TestRecord(null, 1))); +TEST_CASES.add(new TestCase("tombstones and records (r, r, t, t)", 10, new TestRecord("foo".getBytes(), 6), new TestRecord("bar".getBytes(), 5), new TestRecord(null, 2), new TestRecord(null, 1))); +TEST_CASES.add(new TestCase("tombstones and records (t, t, r, r)", 10, new TestRecord(null, 7), new TestRecord(null, 6), new TestRecord("foo".getBytes(), 2), new TestRecord("bar".getBytes(), 1))); +TEST_CASES.add(new TestCase("record with empty bytes", 10, new TestRecord(new byte[0], 1))); +TEST_CASES.add(new TestCase("records with empty bytes (r, e)", 10, new TestRecord("foo".getBytes(), 4), new TestRecord(new byte[0], 1))); +TEST_CASES.add(new TestCase("records with empty bytes (e, e, r)", 10, new TestRecord(new byte[0], 8), new TestRecord(new byte[0], 2), new TestRecord("foo".getBytes(), 1))); +} + +private final TestCase testCase; + +public ExpectedCasesTest(final TestCase testCase) { +this.testCase = testCase; +} + +@Parameterized.Parameters(name = "{0}") +public static Collection data() { +return TEST_CASES; +} + +@Test +public void shouldFindInTimeRangesWithDifferentOrders() { + +// create a list of timestamps in ascending order to use them in combination for starting and ending point of the time range. +final List timestamps = createTimestampsFromTestRecords(testCase); + +// verify results +final List orders = Arrays.asList(ResultOrder.ASCENDING, ResultOrder.ANY, ResultOrder.DESCENDING); +for (final ResultOrder order: orders) { +for (final Long from : timestamps) { +for
Re: [PR] KAFKA-15347: implement lazy deserialization for segment [kafka]
aliehsaeedii commented on code in PR #14957: URL: https://github.com/apache/kafka/pull/14957#discussion_r1424265799 ## streams/src/main/java/org/apache/kafka/streams/state/internals/LogicalSegmentIterator.java: ## @@ -69,23 +73,49 @@ public boolean hasNext() { if (!open) { throw new IllegalStateException("The iterator is out of scope."); } -// since data is stored in descending order in the segments, check whether there is any previous record, if the order is Ascending. -final boolean hasStillLoad = order.equals(ResultOrder.ASCENDING) ? iterator.hasPrevious() : iterator.hasNext(); -return hasStillLoad || maybeFillIterator(); +if (this.next != null) { +return true; +} + +while ((currentDeserializedSegmentValue != null || currentRawSegmentValue != null || segmentIterator.hasNext()) && this.next == null) { +boolean hasSegmentValue = currentDeserializedSegmentValue != null || currentRawSegmentValue != null; +if (!hasSegmentValue) { +hasSegmentValue = maybeFillCurrentSegmentValue(); +} +if (hasSegmentValue) { +this.next = getNextRecord(); +if (this.next == null) { +prepareToFetchNextSegment(); +} +} +} +return this.next != null; } @Override -public Object next() { -if (hasNext()) { -// since data is stored in descending order in the segments, retrieve previous record, if the order is Ascending. -return order.equals(ResultOrder.ASCENDING) ? iterator.previous() : iterator.next(); +public VersionedRecord next() { +if (this.next == null) { +if (!hasNext()) { +throw new NoSuchElementException(); +} } -throw new NoSuchElementException(); +assert this.next != null; Review Comment: > We usually don't use `assert`. Can be removed. Otherwise, there will be a warning, and we have to suppress it. But I will remove it. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15347: implement lazy deserialization for segment [kafka]
aliehsaeedii commented on code in PR #14957: URL: https://github.com/apache/kafka/pull/14957#discussion_r1424265799 ## streams/src/main/java/org/apache/kafka/streams/state/internals/LogicalSegmentIterator.java: ## @@ -69,23 +73,49 @@ public boolean hasNext() { if (!open) { throw new IllegalStateException("The iterator is out of scope."); } -// since data is stored in descending order in the segments, check whether there is any previous record, if the order is Ascending. -final boolean hasStillLoad = order.equals(ResultOrder.ASCENDING) ? iterator.hasPrevious() : iterator.hasNext(); -return hasStillLoad || maybeFillIterator(); +if (this.next != null) { +return true; +} + +while ((currentDeserializedSegmentValue != null || currentRawSegmentValue != null || segmentIterator.hasNext()) && this.next == null) { +boolean hasSegmentValue = currentDeserializedSegmentValue != null || currentRawSegmentValue != null; +if (!hasSegmentValue) { +hasSegmentValue = maybeFillCurrentSegmentValue(); +} +if (hasSegmentValue) { +this.next = getNextRecord(); +if (this.next == null) { +prepareToFetchNextSegment(); +} +} +} +return this.next != null; } @Override -public Object next() { -if (hasNext()) { -// since data is stored in descending order in the segments, retrieve previous record, if the order is Ascending. -return order.equals(ResultOrder.ASCENDING) ? iterator.previous() : iterator.next(); +public VersionedRecord next() { +if (this.next == null) { +if (!hasNext()) { +throw new NoSuchElementException(); +} } -throw new NoSuchElementException(); +assert this.next != null; Review Comment: > We usually don't use `assert`. Can be removed. Otherwise, there will be a warning, and we have to suppress it. But I will remove it. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15347: implement lazy deserialization for segment [kafka]
aliehsaeedii commented on code in PR #14957: URL: https://github.com/apache/kafka/pull/14957#discussion_r1424245975 ## streams/src/main/java/org/apache/kafka/streams/state/internals/ReadonlyPartiallyDeserializedSegmentValue.java: ## @@ -0,0 +1,210 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.streams.state.internals; + +import org.apache.kafka.streams.query.ResultOrder; + +import java.nio.ByteBuffer; +import java.util.HashMap; +import java.util.Map; + + +final class ReadonlyPartiallyDeserializedSegmentValue { + +private static final int TIMESTAMP_SIZE = 8; +private static final int VALUE_SIZE = 4; +private byte[] segmentValue; +private long nextTimestamp; +private long minTimestamp; + +private int deserIndex = -1; // index up through which this segment has been deserialized (inclusive) + +private Map cumulativeValueSizes; + +private int valuesStartingIndex = -1; // the index of the first value in the segment (but the last one in the list) +private Map unpackedTimestampAndValueSizes = new HashMap<>(); +private int recordNumber = -1; // number of segment records + + +ReadonlyPartiallyDeserializedSegmentValue(final byte[] segmentValue) { +this.segmentValue = segmentValue; +this.nextTimestamp = + RocksDBVersionedStoreSegmentValueFormatter.getNextTimestamp(segmentValue); +this.minTimestamp = + RocksDBVersionedStoreSegmentValueFormatter.getMinTimestamp(segmentValue); +resetDeserHelpers(); +} + + +public long minTimestamp() { +return minTimestamp; +} + +public long nextTimestamp() { +return nextTimestamp; +} + +public byte[] serialize() { +return segmentValue; +} + + +public RocksDBVersionedStoreSegmentValueFormatter.SegmentValue.SegmentSearchResult find( +final long fromTime, final long toTime, final ResultOrder order, final int index) { + +// this segment does not have any record in query specified time range +if (toTime < minTimestamp || fromTime > nextTimestamp) { +return null; +} + +final boolean isAscending = order.equals(ResultOrder.ASCENDING); + +if (isAscending && valuesStartingIndex == -1) { +findValuesStartingIndex(); +deserIndex = recordNumber; +} + +long currTimestamp = -1; +long currNextTimestamp = -1; +int currIndex = initializeCurrentIndex(index, isAscending); +int cumValueSize = initializeCumValueSize(index, 
currIndex, isAscending); +int currValueSize; + + +while (hasStillRecord(currTimestamp, currNextTimestamp, order)) { +if (hasBeenDeserialized(isAscending, currIndex)) { +final TimestampAndValueSize curr; +curr = unpackedTimestampAndValueSizes.get(currIndex); +currTimestamp = curr.timestamp; +cumValueSize = cumulativeValueSizes.get(currIndex); +currValueSize = curr.valueSize; + +// update currValueSize +if (currValueSize == Integer.MIN_VALUE) { +final int timestampSegmentIndex = getTimestampIndex(order, currIndex); +currValueSize = ByteBuffer.wrap(segmentValue).getInt(timestampSegmentIndex + TIMESTAMP_SIZE); +unpackedTimestampAndValueSizes.put(currIndex, new TimestampAndValueSize(currTimestamp, cumValueSize)); Review Comment: > `cumValueSize` is not updated with `currValueSize`. Why? Good catch. In fact, inside this `if` (i.e., when deserIndex is already ahead of currIndex), EVERYTHING must already be available. Therefore, the `if (currValueSize == Integer.MIN_VALUE)` check does not belong here, and I removed it. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15347: implement lazy deserialization for segment [kafka]
aliehsaeedii commented on code in PR #14957: URL: https://github.com/apache/kafka/pull/14957#discussion_r1424136033 ## streams/src/main/java/org/apache/kafka/streams/state/internals/ReadonlyPartiallyDeserializedSegmentValue.java: ## @@ -0,0 +1,210 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.streams.state.internals; + +import org.apache.kafka.streams.query.ResultOrder; + +import java.nio.ByteBuffer; +import java.util.HashMap; +import java.util.Map; + + +final class ReadonlyPartiallyDeserializedSegmentValue { + +private static final int TIMESTAMP_SIZE = 8; +private static final int VALUE_SIZE = 4; +private byte[] segmentValue; +private long nextTimestamp; +private long minTimestamp; + +private int deserIndex = -1; // index up through which this segment has been deserialized (inclusive) + +private Map cumulativeValueSizes; + +private int valuesStartingIndex = -1; // the index of the first value in the segment (but the last one in the list) +private Map unpackedTimestampAndValueSizes = new HashMap<>(); +private int recordNumber = -1; // number of segment records + + +ReadonlyPartiallyDeserializedSegmentValue(final byte[] segmentValue) { +this.segmentValue = segmentValue; +this.nextTimestamp = + RocksDBVersionedStoreSegmentValueFormatter.getNextTimestamp(segmentValue); +this.minTimestamp = + RocksDBVersionedStoreSegmentValueFormatter.getMinTimestamp(segmentValue); +resetDeserHelpers(); +} + + +public long minTimestamp() { +return minTimestamp; +} + +public long nextTimestamp() { +return nextTimestamp; +} + +public byte[] serialize() { +return segmentValue; +} + + +public RocksDBVersionedStoreSegmentValueFormatter.SegmentValue.SegmentSearchResult find( +final long fromTime, final long toTime, final ResultOrder order, final int index) { + +// this segment does not have any record in query specified time range +if (toTime < minTimestamp || fromTime > nextTimestamp) { +return null; +} + +final boolean isAscending = order.equals(ResultOrder.ASCENDING); + +if (isAscending && valuesStartingIndex == -1) { +findValuesStartingIndex(); +deserIndex = recordNumber; +} + +long currTimestamp = -1; +long currNextTimestamp = -1; +int currIndex = initializeCurrentIndex(index, isAscending); +int cumValueSize = initializeCumValueSize(index, 
currIndex, isAscending); Review Comment: > Not sure why we need to init `cumValueSize`? It seems below we never read the value set here? We must do that. Look at line#106 (`cumValueSize += Math.max(currValueSize, 0);`). When everything is already available in the cache (`unpackedTimestampAndValueSizes`), we do not need it. Otherwise, we need the previously computed cumValueSize to be able to move to the correct position in the segment and deserialize the next one. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15347: implement lazy deserialization for segment [kafka]
mjsax commented on code in PR #14957: URL: https://github.com/apache/kafka/pull/14957#discussion_r1424208309 ## streams/src/main/java/org/apache/kafka/streams/state/internals/ReadonlyPartiallyDeserializedSegmentValue.java: ## @@ -0,0 +1,210 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.streams.state.internals; + +import org.apache.kafka.streams.query.ResultOrder; + +import java.nio.ByteBuffer; +import java.util.HashMap; +import java.util.Map; + + +final class ReadonlyPartiallyDeserializedSegmentValue { + +private static final int TIMESTAMP_SIZE = 8; +private static final int VALUE_SIZE = 4; +private byte[] segmentValue; +private long nextTimestamp; +private long minTimestamp; + +private int deserIndex = -1; // index up through which this segment has been deserialized (inclusive) + +private Map cumulativeValueSizes; + +private int valuesStartingIndex = -1; // the index of the first value in the segment (but the last one in the list) +private Map unpackedTimestampAndValueSizes = new HashMap<>(); +private int recordNumber = -1; // number of segment records + + +ReadonlyPartiallyDeserializedSegmentValue(final byte[] segmentValue) { +this.segmentValue = segmentValue; +this.nextTimestamp = + RocksDBVersionedStoreSegmentValueFormatter.getNextTimestamp(segmentValue); +this.minTimestamp = + RocksDBVersionedStoreSegmentValueFormatter.getMinTimestamp(segmentValue); +resetDeserHelpers(); +} + + +public long minTimestamp() { +return minTimestamp; +} + +public long nextTimestamp() { +return nextTimestamp; +} + +public byte[] serialize() { +return segmentValue; +} + + +public RocksDBVersionedStoreSegmentValueFormatter.SegmentValue.SegmentSearchResult find( +final long fromTime, final long toTime, final ResultOrder order, final int index) { + +// this segment does not have any record in query specified time range +if (toTime < minTimestamp || fromTime > nextTimestamp) { +return null; +} + +final boolean isAscending = order.equals(ResultOrder.ASCENDING); + +if (isAscending && valuesStartingIndex == -1) { +findValuesStartingIndex(); +deserIndex = recordNumber; +} + +long currTimestamp = -1; +long currNextTimestamp = -1; +int currIndex = initializeCurrentIndex(index, isAscending); +int cumValueSize = initializeCumValueSize(index, 
currIndex, isAscending); Review Comment: line#100 is `unpackedTimestampAndValueSizes.put(currIndex, new TimestampAndValueSize(currTimestamp, cumValueSize));` in this PR. I only see `cumValueSize += Math.max(currValueSize, 0);` in line#112 (but in L112, `currValueSize` was already overwritten in L108) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15347: implement lazy deserialization for segment [kafka]
mjsax commented on code in PR #14957: URL: https://github.com/apache/kafka/pull/14957#discussion_r1424208309 ## streams/src/main/java/org/apache/kafka/streams/state/internals/ReadonlyPartiallyDeserializedSegmentValue.java: ## @@ -0,0 +1,210 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.streams.state.internals; + +import org.apache.kafka.streams.query.ResultOrder; + +import java.nio.ByteBuffer; +import java.util.HashMap; +import java.util.Map; + + +final class ReadonlyPartiallyDeserializedSegmentValue { + +private static final int TIMESTAMP_SIZE = 8; +private static final int VALUE_SIZE = 4; +private byte[] segmentValue; +private long nextTimestamp; +private long minTimestamp; + +private int deserIndex = -1; // index up through which this segment has been deserialized (inclusive) + +private Map cumulativeValueSizes; + +private int valuesStartingIndex = -1; // the index of the first value in the segment (but the last one in the list) +private Map unpackedTimestampAndValueSizes = new HashMap<>(); +private int recordNumber = -1; // number of segment records + + +ReadonlyPartiallyDeserializedSegmentValue(final byte[] segmentValue) { +this.segmentValue = segmentValue; +this.nextTimestamp = + RocksDBVersionedStoreSegmentValueFormatter.getNextTimestamp(segmentValue); +this.minTimestamp = + RocksDBVersionedStoreSegmentValueFormatter.getMinTimestamp(segmentValue); +resetDeserHelpers(); +} + + +public long minTimestamp() { +return minTimestamp; +} + +public long nextTimestamp() { +return nextTimestamp; +} + +public byte[] serialize() { +return segmentValue; +} + + +public RocksDBVersionedStoreSegmentValueFormatter.SegmentValue.SegmentSearchResult find( +final long fromTime, final long toTime, final ResultOrder order, final int index) { + +// this segment does not have any record in query specified time range +if (toTime < minTimestamp || fromTime > nextTimestamp) { +return null; +} + +final boolean isAscending = order.equals(ResultOrder.ASCENDING); + +if (isAscending && valuesStartingIndex == -1) { +findValuesStartingIndex(); +deserIndex = recordNumber; +} + +long currTimestamp = -1; +long currNextTimestamp = -1; +int currIndex = initializeCurrentIndex(index, isAscending); +int cumValueSize = initializeCumValueSize(index, 
currIndex, isAscending); Review Comment: line#100 is `unpackedTimestampAndValueSizes.put(currIndex, new TimestampAndValueSize(currTimestamp, cumValueSize));` in this PR. I only see `cumValueSize += Math.max(currValueSize, 0);` in line#112 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15347: implement lazy deserialization for segment [kafka]
mjsax commented on code in PR #14957: URL: https://github.com/apache/kafka/pull/14957#discussion_r1424206051 ## streams/src/test/java/org/apache/kafka/streams/state/internals/ReadonlyPartiallyDeserializedSegmentValueTest.java: ## @@ -0,0 +1,220 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.streams.state.internals; + +import org.apache.kafka.streams.query.ResultOrder; +import org.apache.kafka.streams.state.internals.RocksDBVersionedStoreSegmentValueFormatter.SegmentValue; +import org.apache.kafka.streams.state.internals.RocksDBVersionedStoreSegmentValueFormatter.SegmentValue.SegmentSearchResult; +import org.junit.Test; +import org.junit.experimental.runners.Enclosed; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.List; + +import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.junit.jupiter.api.Assertions.assertNull; + +@RunWith(Enclosed.class) +public class ReadonlyPartiallyDeserializedSegmentValueTest { + +/** + * Non-exceptional scenarios which are expected to occur during regular store operation. + */ +@RunWith(Parameterized.class) +public static class ExpectedCasesTest { + +private static final List TEST_CASES = new ArrayList<>(); + +static { +// test cases are expected to have timestamps in strictly decreasing order (except for the degenerate case) +TEST_CASES.add(new TestCase("single record", 10, new TestRecord("foo".getBytes(), 1))); +TEST_CASES.add(new TestCase("multiple records", 10, new TestRecord("foo".getBytes(), 8), new TestRecord("bar".getBytes(), 3), new TestRecord("baz".getBytes(), 0))); +TEST_CASES.add(new TestCase("single tombstone", 10, new TestRecord(null, 1))); +TEST_CASES.add(new TestCase("multiple tombstone", 10, new TestRecord(null, 4), new TestRecord(null, 1))); +TEST_CASES.add(new TestCase("tombstones and records (r, t, r)", 10, new TestRecord("foo".getBytes(), 5), new TestRecord(null, 2), new TestRecord("bar".getBytes(), 1))); +TEST_CASES.add(new TestCase("tombstones and records (t, r, t)", 10, new TestRecord(null, 5), new TestRecord("foo".getBytes(), 2), new 
TestRecord(null, 1))); +TEST_CASES.add(new TestCase("tombstones and records (r, r, t, t)", 10, new TestRecord("foo".getBytes(), 6), new TestRecord("bar".getBytes(), 5), new TestRecord(null, 2), new TestRecord(null, 1))); +TEST_CASES.add(new TestCase("tombstones and records (t, t, r, r)", 10, new TestRecord(null, 7), new TestRecord(null, 6), new TestRecord("foo".getBytes(), 2), new TestRecord("bar".getBytes(), 1))); +TEST_CASES.add(new TestCase("record with empty bytes", 10, new TestRecord(new byte[0], 1))); +TEST_CASES.add(new TestCase("records with empty bytes (r, e)", 10, new TestRecord("foo".getBytes(), 4), new TestRecord(new byte[0], 1))); +TEST_CASES.add(new TestCase("records with empty bytes (e, e, r)", 10, new TestRecord(new byte[0], 8), new TestRecord(new byte[0], 2), new TestRecord("foo".getBytes(), 1))); +} + +private final TestCase testCase; + +public ExpectedCasesTest(final TestCase testCase) { +this.testCase = testCase; +} + +@Parameterized.Parameters(name = "{0}") +public static Collection data() { +return TEST_CASES; +} + +@Test +public void shouldFindInTimeRangesWithDifferentOrders() { + +// create a list of timestamps in ascending order to use them in combination for starting and ending point of the time range. +final List timestamps = createTimestampsFromTestRecords(testCase); + +// verify results +final List orders = Arrays.asList(ResultOrder.ASCENDING, ResultOrder.ANY, ResultOrder.DESCENDING); +for (final ResultOrder order: orders) { +for (final Long from : timestamps) { +for (final Long
Re: [PR] KAFKA-15347: implement lazy deserialization for segment [kafka]
aliehsaeedii commented on code in PR #14957: URL: https://github.com/apache/kafka/pull/14957#discussion_r1424138349 ## streams/src/main/java/org/apache/kafka/streams/state/internals/ReadonlyPartiallyDeserializedSegmentValue.java: ## @@ -0,0 +1,210 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.streams.state.internals; + +import org.apache.kafka.streams.query.ResultOrder; + +import java.nio.ByteBuffer; +import java.util.HashMap; +import java.util.Map; + + +final class ReadonlyPartiallyDeserializedSegmentValue { + +private static final int TIMESTAMP_SIZE = 8; +private static final int VALUE_SIZE = 4; +private byte[] segmentValue; +private long nextTimestamp; +private long minTimestamp; + +private int deserIndex = -1; // index up through which this segment has been deserialized (inclusive) + +private Map cumulativeValueSizes; + +private int valuesStartingIndex = -1; // the index of the first value in the segment (but the last one in the list) +private Map unpackedTimestampAndValueSizes = new HashMap<>(); +private int recordNumber = -1; // number of segment records + + +ReadonlyPartiallyDeserializedSegmentValue(final byte[] segmentValue) { +this.segmentValue = segmentValue; +this.nextTimestamp = + RocksDBVersionedStoreSegmentValueFormatter.getNextTimestamp(segmentValue); +this.minTimestamp = + RocksDBVersionedStoreSegmentValueFormatter.getMinTimestamp(segmentValue); +resetDeserHelpers(); +} + + +public long minTimestamp() { +return minTimestamp; +} + +public long nextTimestamp() { +return nextTimestamp; +} + +public byte[] serialize() { Review Comment: > Do we need this method? Cannot find where it's used (except test code). The test code is also important, and the method may be used elsewhere in the future as well. What's the problem with having it? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15347: implement lazy deserialization for segment [kafka]
aliehsaeedii commented on code in PR #14957: URL: https://github.com/apache/kafka/pull/14957#discussion_r1424136033 ## streams/src/main/java/org/apache/kafka/streams/state/internals/ReadonlyPartiallyDeserializedSegmentValue.java: ## @@ -0,0 +1,210 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.streams.state.internals; + +import org.apache.kafka.streams.query.ResultOrder; + +import java.nio.ByteBuffer; +import java.util.HashMap; +import java.util.Map; + + +final class ReadonlyPartiallyDeserializedSegmentValue { + +private static final int TIMESTAMP_SIZE = 8; +private static final int VALUE_SIZE = 4; +private byte[] segmentValue; +private long nextTimestamp; +private long minTimestamp; + +private int deserIndex = -1; // index up through which this segment has been deserialized (inclusive) + +private Map cumulativeValueSizes; + +private int valuesStartingIndex = -1; // the index of the first value in the segment (but the last one in the list) +private Map unpackedTimestampAndValueSizes = new HashMap<>(); +private int recordNumber = -1; // number of segment records + + +ReadonlyPartiallyDeserializedSegmentValue(final byte[] segmentValue) { +this.segmentValue = segmentValue; +this.nextTimestamp = + RocksDBVersionedStoreSegmentValueFormatter.getNextTimestamp(segmentValue); +this.minTimestamp = + RocksDBVersionedStoreSegmentValueFormatter.getMinTimestamp(segmentValue); +resetDeserHelpers(); +} + + +public long minTimestamp() { +return minTimestamp; +} + +public long nextTimestamp() { +return nextTimestamp; +} + +public byte[] serialize() { +return segmentValue; +} + + +public RocksDBVersionedStoreSegmentValueFormatter.SegmentValue.SegmentSearchResult find( +final long fromTime, final long toTime, final ResultOrder order, final int index) { + +// this segment does not have any record in query specified time range +if (toTime < minTimestamp || fromTime > nextTimestamp) { +return null; +} + +final boolean isAscending = order.equals(ResultOrder.ASCENDING); + +if (isAscending && valuesStartingIndex == -1) { +findValuesStartingIndex(); +deserIndex = recordNumber; +} + +long currTimestamp = -1; +long currNextTimestamp = -1; +int currIndex = initializeCurrentIndex(index, isAscending); +int cumValueSize = initializeCumValueSize(index, 
currIndex, isAscending); Review Comment: > Not sure why we need to init `cumValueSize`? It seem below we never read the value set here? We must initialize it. Look at line #100 (`cumValueSize += Math.max(currValueSize, 0);`). In fact, when everything is already available in the cache (`unpackedTimestampAndValueSizes`), we do not need it. Otherwise, we need the previously computed cumValueSize to move to the correct position in the segment and deserialize the next record. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15347: implement lazy deserialization for segment [kafka]
aliehsaeedii commented on code in PR #14957: URL: https://github.com/apache/kafka/pull/14957#discussion_r1424138349 ## streams/src/main/java/org/apache/kafka/streams/state/internals/ReadonlyPartiallyDeserializedSegmentValue.java: ## @@ -0,0 +1,210 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.streams.state.internals; + +import org.apache.kafka.streams.query.ResultOrder; + +import java.nio.ByteBuffer; +import java.util.HashMap; +import java.util.Map; + + +final class ReadonlyPartiallyDeserializedSegmentValue { + +private static final int TIMESTAMP_SIZE = 8; +private static final int VALUE_SIZE = 4; +private byte[] segmentValue; +private long nextTimestamp; +private long minTimestamp; + +private int deserIndex = -1; // index up through which this segment has been deserialized (inclusive) + +private Map cumulativeValueSizes; + +private int valuesStartingIndex = -1; // the index of the first value in the segment (but the last one in the list) +private Map unpackedTimestampAndValueSizes = new HashMap<>(); +private int recordNumber = -1; // number of segment records + + +ReadonlyPartiallyDeserializedSegmentValue(final byte[] segmentValue) { +this.segmentValue = segmentValue; +this.nextTimestamp = + RocksDBVersionedStoreSegmentValueFormatter.getNextTimestamp(segmentValue); +this.minTimestamp = + RocksDBVersionedStoreSegmentValueFormatter.getMinTimestamp(segmentValue); +resetDeserHelpers(); +} + + +public long minTimestamp() { +return minTimestamp; +} + +public long nextTimestamp() { +return nextTimestamp; +} + +public byte[] serialize() { Review Comment: > Do we need this method? Cannot find where it's used (except test code) The test code is also important. We may have future usage as well. What's the problem with having it? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15347: implement lazy deserialization for segment [kafka]
aliehsaeedii commented on code in PR #14957: URL: https://github.com/apache/kafka/pull/14957#discussion_r1424055579 ## streams/src/main/java/org/apache/kafka/streams/state/internals/LogicalSegmentIterator.java: ## @@ -100,34 +130,48 @@ private boolean maybeFillIterator() { final byte[] rawSegmentValue = segment.get(key, snapshot); if (rawSegmentValue != null) { // this segment contains record(s) with the specified key if (segment.id() == -1) { // this is the latestValueStore -final long recordTimestamp = RocksDBVersionedStore.LatestValueFormatter.getTimestamp(rawSegmentValue); -if (recordTimestamp <= toTime) { -// latest value satisfies timestamp bound -queryResults.add(new VersionedRecord<>(RocksDBVersionedStore.LatestValueFormatter.getValue(rawSegmentValue), recordTimestamp)); -} +this.currentRawSegmentValue = rawSegmentValue; } else { -// this segment contains records with the specified key and time range -final List searchResults = - RocksDBVersionedStoreSegmentValueFormatter.deserialize(rawSegmentValue).findAll(fromTime, toTime); -for (final RocksDBVersionedStoreSegmentValueFormatter.SegmentValue.SegmentSearchResult searchResult : searchResults) { -queryResults.add(new VersionedRecord<>(searchResult.value(), searchResult.validFrom(), searchResult.validTo())); -} +this.currentDeserializedSegmentValue = new ReadonlyPartiallyDeserializedSegmentValue(rawSegmentValue); } -} -if (!queryResults.isEmpty()) { -break; +return true; } } -if (!queryResults.isEmpty()) { -// since data is stored in descending order in the segments, create the list in reverse order, if the order is Ascending. -this.iterator = order.equals(ResultOrder.ASCENDING) ? 
queryResults.listIterator(queryResults.size()) : queryResults.listIterator(); -return true; -} // if all segments have been processed, release the snapshot releaseSnapshot(); return false; } +private VersionedRecord getNextRecord() { +VersionedRecord nextRecord = null; +if (currentRawSegmentValue != null) { // this is the latestValueStore +final long recordTimestamp = RocksDBVersionedStore.LatestValueFormatter.getTimestamp(currentRawSegmentValue); +if (recordTimestamp <= toTime) { +final byte[] value = RocksDBVersionedStore.LatestValueFormatter.getValue(currentRawSegmentValue); +// latest value satisfies timestamp bound +nextRecord = new VersionedRecord<>(value, recordTimestamp); +} +} else { +final RocksDBVersionedStoreSegmentValueFormatter.SegmentValue.SegmentSearchResult currentRecord = +currentDeserializedSegmentValue.find(fromTime, toTime, order, nextIndex); +if (currentRecord != null) { +nextRecord = new VersionedRecord<>(currentRecord.value(), currentRecord.validFrom(), currentRecord.validTo()); +this.nextIndex = order.equals(ResultOrder.ASCENDING) ? currentRecord.index() - 1 : currentRecord.index() + 1; +} +} +// no relevant record can be found in the segment +if (currentRawSegmentValue != null || nextRecord == null || !canSegmentHaveMoreRelevantRecords(nextRecord.timestamp(), nextRecord.validTo().get())) { Review Comment: > To make sure I understand this correctly: > > * If `nextRecord == null` it means we did not find anything and want to go to the next segment (make sense) > * If `currentRawSegmentValue != null` it does not matter if we filled `nextRecord` or not, because the "latest" segment contains at max one record, and thus we need to step into the next segment in any case? > * Similar for `canSegmentHaveMoreRelevantRecords` we check if we exceeded the current segment? Yes. Exactly. When `canSegmentHaveMoreRelevantRecords` returns false, it means that this segment has no more records that fit in query time. 
-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15347: implement lazy deserialization for segment [kafka]
mjsax commented on code in PR #14957: URL: https://github.com/apache/kafka/pull/14957#discussion_r1423917600 ## streams/src/main/java/org/apache/kafka/streams/state/internals/LogicalSegmentIterator.java: ## @@ -100,34 +130,48 @@ private boolean maybeFillIterator() { final byte[] rawSegmentValue = segment.get(key, snapshot); if (rawSegmentValue != null) { // this segment contains record(s) with the specified key if (segment.id() == -1) { // this is the latestValueStore -final long recordTimestamp = RocksDBVersionedStore.LatestValueFormatter.getTimestamp(rawSegmentValue); -if (recordTimestamp <= toTime) { -// latest value satisfies timestamp bound -queryResults.add(new VersionedRecord<>(RocksDBVersionedStore.LatestValueFormatter.getValue(rawSegmentValue), recordTimestamp)); -} +this.currentRawSegmentValue = rawSegmentValue; } else { -// this segment contains records with the specified key and time range -final List searchResults = - RocksDBVersionedStoreSegmentValueFormatter.deserialize(rawSegmentValue).findAll(fromTime, toTime); -for (final RocksDBVersionedStoreSegmentValueFormatter.SegmentValue.SegmentSearchResult searchResult : searchResults) { -queryResults.add(new VersionedRecord<>(searchResult.value(), searchResult.validFrom(), searchResult.validTo())); -} +this.currentDeserializedSegmentValue = new ReadonlyPartiallyDeserializedSegmentValue(rawSegmentValue); } -} -if (!queryResults.isEmpty()) { -break; +return true; } } -if (!queryResults.isEmpty()) { -// since data is stored in descending order in the segments, create the list in reverse order, if the order is Ascending. -this.iterator = order.equals(ResultOrder.ASCENDING) ? 
queryResults.listIterator(queryResults.size()) : queryResults.listIterator(); -return true; -} // if all segments have been processed, release the snapshot releaseSnapshot(); return false; } +private VersionedRecord getNextRecord() { +VersionedRecord nextRecord = null; +if (currentRawSegmentValue != null) { // this is the latestValueStore +final long recordTimestamp = RocksDBVersionedStore.LatestValueFormatter.getTimestamp(currentRawSegmentValue); +if (recordTimestamp <= toTime) { +final byte[] value = RocksDBVersionedStore.LatestValueFormatter.getValue(currentRawSegmentValue); +// latest value satisfies timestamp bound +nextRecord = new VersionedRecord<>(value, recordTimestamp); +} +} else { +final RocksDBVersionedStoreSegmentValueFormatter.SegmentValue.SegmentSearchResult currentRecord = +currentDeserializedSegmentValue.find(fromTime, toTime, order, nextIndex); +if (currentRecord != null) { +nextRecord = new VersionedRecord<>(currentRecord.value(), currentRecord.validFrom(), currentRecord.validTo()); +this.nextIndex = order.equals(ResultOrder.ASCENDING) ? currentRecord.index() - 1 : currentRecord.index() + 1; +} +} +// no relevant record can be found in the segment +if (currentRawSegmentValue != null || nextRecord == null || !canSegmentHaveMoreRelevantRecords(nextRecord.timestamp(), nextRecord.validTo().get())) { +prepareToFetchNextSegment(); +} +return nextRecord; +} + +private boolean canSegmentHaveMoreRelevantRecords(final long currentValidFrom, final long currentValidTo) { +final boolean isCurrentOutsideTimeRange = (order.equals(ResultOrder.ASCENDING) && (currentValidTo > toTime || currentDeserializedSegmentValue.nextTimestamp() == currentValidTo)) Review Comment: This is complex. 
Might be simpler to use: ``` final boolean isCurrentOutsideTimeRange; if (order.equals(ResultOrder.ASCENDING)) { isCurrentOutsideTimeRange = currentValidTo > toTime || currentDeserializedSegmentValue.nextTimestamp() == currentValidTo; } else { isCurrentOutsideTimeRange = currentValidFrom < fromTime || currentDeserializedSegmentValue.minTimestamp() == currentValidFrom; } ``` ## streams/src/main/java/org/apache/kafka/streams/state/internals/ReadonlyPartiallyDeserializedSegmentValue.java: ## @@ -0,0 +1,210 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + *
Re: [PR] KAFKA-15347: implement lazy deserialization for segment [kafka]
aliehsaeedii commented on code in PR #14957: URL: https://github.com/apache/kafka/pull/14957#discussion_r1422488101 ## streams/src/main/java/org/apache/kafka/streams/state/internals/ReadonlyPartiallyDeserializedSegmentValue.java: ## @@ -0,0 +1,206 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.streams.state.internals; + +import org.apache.kafka.streams.query.ResultOrder; + +import java.nio.ByteBuffer; +import java.util.HashMap; +import java.util.Map; + + +final class ReadonlyPartiallyDeserializedSegmentValue { + +private static final int TIMESTAMP_SIZE = 8; +private static final int VALUE_SIZE = 4; +private byte[] segmentValue; +private long nextTimestamp; +private long minTimestamp; + +private int deserIndex = -1; // index up through which this segment has been deserialized (inclusive) + +private Map cumulativeValueSizes; + +private int valuesStartingIndex = -1; // the index of the first value in the segment (but the last one in the list) +private Map unpackedTimestampAndValueSizes = new HashMap<>(); +private int recordNumber = -1; // number of segment records + + +ReadonlyPartiallyDeserializedSegmentValue(final byte[] segmentValue) { +this.segmentValue = segmentValue; +this.nextTimestamp = + RocksDBVersionedStoreSegmentValueFormatter.getNextTimestamp(segmentValue); +this.minTimestamp = + RocksDBVersionedStoreSegmentValueFormatter.getMinTimestamp(segmentValue); +resetDeserHelpers(); +} + + +public long getMinTimestamp() { +return minTimestamp; +} + +public long getNextTimestamp() { +return nextTimestamp; +} + +public byte[] serialize() { +return segmentValue; +} + + +public RocksDBVersionedStoreSegmentValueFormatter.SegmentValue.SegmentSearchResult find( +final long fromTime, final long toTime, final ResultOrder order, final int index) { + +// this segment does not have any record in query specified time range +if (toTime < minTimestamp || fromTime > nextTimestamp) { +return null; +} + +final boolean isAscending = order.equals(ResultOrder.ASCENDING); + +if (isAscending && valuesStartingIndex == -1) { +findValuesStartingIndex(); +deserIndex = recordNumber; +} + +long currTimestamp = -1; +long currNextTimestamp = -1; +int currIndex = initializeCurrentIndex(index, isAscending); +int cumValueSize = 
initializeCumvalueSize(index, currIndex, isAscending); +int currValueSize; + + +while (hasStillRecord(currTimestamp, currNextTimestamp, order)) { +if (hasBeenDeserialized(isAscending, currIndex)) { +final TimestampAndValueSize curr; +curr = unpackedTimestampAndValueSizes.get(currIndex); +currTimestamp = curr.timestamp; +cumValueSize = cumulativeValueSizes.get(currIndex); +currValueSize = curr.valueSize; + +// update currValueSize +if (currValueSize == Integer.MIN_VALUE) { +final int timestampSegmentIndex = getTimestampIndex(order, currIndex); +currValueSize = ByteBuffer.wrap(segmentValue).getInt(timestampSegmentIndex + TIMESTAMP_SIZE); +unpackedTimestampAndValueSizes.put(currIndex, new TimestampAndValueSize(currTimestamp, cumValueSize)); +} + +currNextTimestamp = updateCurrNextTimestamp(currIndex, isAscending); + +} else { +final int timestampSegmentIndex = getTimestampIndex(order, currIndex); +currTimestamp = ByteBuffer.wrap(segmentValue).getLong(timestampSegmentIndex); +currValueSize = ByteBuffer.wrap(segmentValue).getInt(timestampSegmentIndex + TIMESTAMP_SIZE); +currNextTimestamp = timestampSegmentIndex == 2 * TIMESTAMP_SIZE +? nextTimestamp // if this is the first record metadata (timestamp + value size) +:
Re: [PR] KAFKA-15347: implement lazy deserialization for segment [kafka]
mjsax commented on code in PR #14957: URL: https://github.com/apache/kafka/pull/14957#discussion_r1422442823 ## streams/src/main/java/org/apache/kafka/streams/state/internals/ReadonlyPartiallyDeserializedSegmentValue.java: ## @@ -0,0 +1,206 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.streams.state.internals; + +import org.apache.kafka.streams.query.ResultOrder; + +import java.nio.ByteBuffer; +import java.util.HashMap; +import java.util.Map; + + +final class ReadonlyPartiallyDeserializedSegmentValue { + +private static final int TIMESTAMP_SIZE = 8; +private static final int VALUE_SIZE = 4; +private byte[] segmentValue; +private long nextTimestamp; +private long minTimestamp; + +private int deserIndex = -1; // index up through which this segment has been deserialized (inclusive) + +private Map cumulativeValueSizes; + +private int valuesStartingIndex = -1; // the index of the first value in the segment (but the last one in the list) +private Map unpackedTimestampAndValueSizes = new HashMap<>(); +private int recordNumber = -1; // number of segment records + + +ReadonlyPartiallyDeserializedSegmentValue(final byte[] segmentValue) { +this.segmentValue = segmentValue; +this.nextTimestamp = + RocksDBVersionedStoreSegmentValueFormatter.getNextTimestamp(segmentValue); +this.minTimestamp = + RocksDBVersionedStoreSegmentValueFormatter.getMinTimestamp(segmentValue); +resetDeserHelpers(); +} + + +public long getMinTimestamp() { +return minTimestamp; +} + +public long getNextTimestamp() { Review Comment: same, no `get` ## streams/src/main/java/org/apache/kafka/streams/state/internals/ReadonlyPartiallyDeserializedSegmentValue.java: ## @@ -0,0 +1,206 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.state.internals; + +import org.apache.kafka.streams.query.ResultOrder; + +import java.nio.ByteBuffer; +import java.util.HashMap; +import java.util.Map; + + +final class ReadonlyPartiallyDeserializedSegmentValue { + +private static final int TIMESTAMP_SIZE = 8; +private static final int VALUE_SIZE = 4; +private byte[] segmentValue; +private long nextTimestamp; +private long minTimestamp; + +private int deserIndex = -1; // index up through which this segment has been deserialized (inclusive) + +private Map cumulativeValueSizes; + +private int valuesStartingIndex = -1; // the index of the first value in the segment (but the last one in the list) +private Map unpackedTimestampAndValueSizes = new HashMap<>(); +private int recordNumber = -1; // number of segment records + + +ReadonlyPartiallyDeserializedSegmentValue(final byte[] segmentValue) { +this.segmentValue = segmentValue; +this.nextTimestamp = + RocksDBVersionedStoreSegmentValueFormatter.getNextTimestamp(segmentValue); +this.minTimestamp = + RocksDBVersionedStoreSegmentValueFormatter.getMinTimestamp(segmentValue); +resetDeserHelpers(); +} + + +public long getMinTimestamp() { Review Comment: nit: naming convention, don't use `get` -> `minTimestamp()` ## streams/src/main/java/org/apache/kafka/streams/state/internals/ReadonlyPartiallyDeserializedSegmentValue.java: ## @@ -0,0 +1,206 @@ +/* + *
Re: [PR] KAFKA-15347: implement lazy deserialization for segment [kafka]
aliehsaeedii commented on code in PR #14957: URL: https://github.com/apache/kafka/pull/14957#discussion_r1421149432 ## streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStoreSegmentValueFormatter.java: ## @@ -340,32 +355,67 @@ public SegmentSearchResult find(final long timestamp, final boolean includeValue } @Override -public List findAll(final long fromTime, final long toTime) { -long currNextTimestamp = nextTimestamp; -final List segmentSearchResults = new ArrayList<>(); -long currTimestamp = -1L; // choose an invalid timestamp. if this is valid, this needs to be re-worked -int currValueSize; -int currIndex = 0; -int cumValueSize = 0; -while (currTimestamp != minTimestamp) { -final int timestampSegmentIndex = 2 * TIMESTAMP_SIZE + currIndex * (TIMESTAMP_SIZE + VALUE_SIZE); +public SegmentSearchResult find(final long fromTime, final long toTime, final ResultOrder order) { +// this segment does not have any record in query specified time range +if (toTime < minTimestamp || fromTime > nextTimestamp) { +return null; +} +long currTimestamp = -1; +long currNextTimestamp = -1; + + +if (order.equals(ResultOrder.ASCENDING) && valuesStartingIndex == -1) { +findValuesStartingIndex(); +} + +while (hasStillRecord(currTimestamp, currNextTimestamp, order)) { +final int timestampSegmentIndex = getTimestampIndex(order, currentDeserIndex); currTimestamp = ByteBuffer.wrap(segmentValue).getLong(timestampSegmentIndex); -currValueSize = ByteBuffer.wrap(segmentValue).getInt(timestampSegmentIndex + TIMESTAMP_SIZE); -cumValueSize += Math.max(currValueSize, 0); +currNextTimestamp = timestampSegmentIndex == 2 * TIMESTAMP_SIZE ? 
nextTimestamp // if this is the first record metadata (timestamp + value size) + : ByteBuffer.wrap(segmentValue).getLong(timestampSegmentIndex - (TIMESTAMP_SIZE + VALUE_SIZE)); +final int currValueSize = ByteBuffer.wrap(segmentValue).getInt(timestampSegmentIndex + TIMESTAMP_SIZE); if (currValueSize >= 0) { final byte[] value = new byte[currValueSize]; -final int valueSegmentIndex = segmentValue.length - cumValueSize; +final int valueSegmentIndex = getValueSegmentIndex(order, currentCumValueSize, currValueSize); System.arraycopy(segmentValue, valueSegmentIndex, value, 0, currValueSize); if (currTimestamp <= toTime && currNextTimestamp > fromTime) { -segmentSearchResults.add(new SegmentSearchResult(currIndex, currTimestamp, currNextTimestamp, value)); +currentCumValueSize += currValueSize; +currentDeserIndex++; +return new SegmentSearchResult(currentDeserIndex - 1, currTimestamp, currNextTimestamp, value); } } - // prep for next iteration -currNextTimestamp = currTimestamp; +currentCumValueSize += Math.max(currValueSize, 0); +currentDeserIndex++; +} +// search in segment expected to find result but did not +return null; +} + +private boolean hasStillRecord(final long currTimestamp, final long currNextTimestamp, final ResultOrder order) { +return order.equals(ResultOrder.ASCENDING) ? currNextTimestamp != nextTimestamp : currTimestamp != minTimestamp; +} + +private int getValueSegmentIndex(final ResultOrder order, final int currentCumValueSize, final int currValueSize) { +return order.equals(ResultOrder.ASCENDING) ? valuesStartingIndex + currentCumValueSize + : segmentValue.length - (currentCumValueSize + currValueSize); +} + +private int getTimestampIndex(final ResultOrder order, final int currIndex) { +return order.equals(ResultOrder.ASCENDING) ? 
valuesStartingIndex - ((currIndex + 1) * (TIMESTAMP_SIZE + VALUE_SIZE)) + : 2 * TIMESTAMP_SIZE + currIndex * (TIMESTAMP_SIZE + VALUE_SIZE); +} + +private void findValuesStartingIndex() { +long currTimestamp = -1; +int currIndex = 0; +int timestampSegmentIndex = 0; +while (currTimestamp != minTimestamp) { +timestampSegmentIndex = 2 * TIMESTAMP_SIZE + currIndex * (TIMESTAMP_SIZE + VALUE_SIZE); +currTimestamp =
Re: [PR] KAFKA-15347: implement lazy deserialization for segment [kafka]
aliehsaeedii commented on PR #14957: URL: https://github.com/apache/kafka/pull/14957#issuecomment-1848005381 @mjsax I modified the `find()` method, reusing the currently defined `deserIndex` and caches as much as possible. Here is a brief list of changes and issues. 1. I had to define `reversedDeserIndex` as well as two other caches for deserialization in ascending timestamp order. 2. I had to add a new parameter `index` to the `find()` method. Since we are iterating over the segment, relying on `deserIndex` alone is not safe: it may be modified by the other `find()`, or reset by methods such as `initializeWithRecord()`. 3. Please look at [this](https://github.com/aliehsaeedii/kafka/blob/350167e2693db44e5b0357ee8527273c2452eb02/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStoreSegmentValueFormatter.java#L387) comment. I must confess that I see no way to fix this issue. Conclusion: 1. I believe the two `find()` methods (or other class methods) are never both called on a single instance of `PartiallyDeserializedSegmentValue`, since my `find()` is only accessible through the `LogicalSegmentIterator` class. Of course, we can call them interleaved in the `RocksDBVersionedStoreSegmentValueFormatterTest` class, but that is an artificial, manual scenario. Please tell me if I am wrong. If this conclusion is correct, defining the new `find()` method in the `PartiallyDeserializedSegmentValue` class is meaningless, since it cannot reuse the common deserHelpers anyway. We must move it to the `SegmentIterator` class, which makes the implementation cleaner and simpler. 2. If my former conclusion is not correct, then either the new `find()` must have its own isolated `deserHelpers` (like my former implementation), or we must find a safe way to reuse the currently available ones (which I have not found yet). P.S.: This is not a final commit; I committed it only for clarification. -- This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15347: implement lazy deserialization for segment [kafka]
aliehsaeedii commented on code in PR #14957: URL: https://github.com/apache/kafka/pull/14957#discussion_r1420974212 ## streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStoreSegmentValueFormatter.java: ## @@ -340,32 +355,67 @@ public SegmentSearchResult find(final long timestamp, final boolean includeValue } @Override -public List findAll(final long fromTime, final long toTime) { -long currNextTimestamp = nextTimestamp; -final List segmentSearchResults = new ArrayList<>(); -long currTimestamp = -1L; // choose an invalid timestamp. if this is valid, this needs to be re-worked -int currValueSize; -int currIndex = 0; -int cumValueSize = 0; -while (currTimestamp != minTimestamp) { -final int timestampSegmentIndex = 2 * TIMESTAMP_SIZE + currIndex * (TIMESTAMP_SIZE + VALUE_SIZE); +public SegmentSearchResult find(final long fromTime, final long toTime, final ResultOrder order) { +// this segment does not have any record in query specified time range +if (toTime < minTimestamp || fromTime > nextTimestamp) { +return null; +} +long currTimestamp = -1; +long currNextTimestamp = -1; + + +if (order.equals(ResultOrder.ASCENDING) && valuesStartingIndex == -1) { +findValuesStartingIndex(); +} + +while (hasStillRecord(currTimestamp, currNextTimestamp, order)) { +final int timestampSegmentIndex = getTimestampIndex(order, currentDeserIndex); currTimestamp = ByteBuffer.wrap(segmentValue).getLong(timestampSegmentIndex); -currValueSize = ByteBuffer.wrap(segmentValue).getInt(timestampSegmentIndex + TIMESTAMP_SIZE); -cumValueSize += Math.max(currValueSize, 0); +currNextTimestamp = timestampSegmentIndex == 2 * TIMESTAMP_SIZE ? 
nextTimestamp // if this is the first record metadata (timestamp + value size) + : ByteBuffer.wrap(segmentValue).getLong(timestampSegmentIndex - (TIMESTAMP_SIZE + VALUE_SIZE)); +final int currValueSize = ByteBuffer.wrap(segmentValue).getInt(timestampSegmentIndex + TIMESTAMP_SIZE); if (currValueSize >= 0) { final byte[] value = new byte[currValueSize]; -final int valueSegmentIndex = segmentValue.length - cumValueSize; +final int valueSegmentIndex = getValueSegmentIndex(order, currentCumValueSize, currValueSize); System.arraycopy(segmentValue, valueSegmentIndex, value, 0, currValueSize); if (currTimestamp <= toTime && currNextTimestamp > fromTime) { -segmentSearchResults.add(new SegmentSearchResult(currIndex, currTimestamp, currNextTimestamp, value)); +currentCumValueSize += currValueSize; +currentDeserIndex++; +return new SegmentSearchResult(currentDeserIndex - 1, currTimestamp, currNextTimestamp, value); } } - // prep for next iteration -currNextTimestamp = currTimestamp; +currentCumValueSize += Math.max(currValueSize, 0); +currentDeserIndex++; +} +// search in segment expected to find result but did not +return null; +} + +private boolean hasStillRecord(final long currTimestamp, final long currNextTimestamp, final ResultOrder order) { +return order.equals(ResultOrder.ASCENDING) ? currNextTimestamp != nextTimestamp : currTimestamp != minTimestamp; +} + +private int getValueSegmentIndex(final ResultOrder order, final int currentCumValueSize, final int currValueSize) { +return order.equals(ResultOrder.ASCENDING) ? valuesStartingIndex + currentCumValueSize + : segmentValue.length - (currentCumValueSize + currValueSize); +} + +private int getTimestampIndex(final ResultOrder order, final int currIndex) { +return order.equals(ResultOrder.ASCENDING) ? 
valuesStartingIndex - ((currIndex + 1) * (TIMESTAMP_SIZE + VALUE_SIZE)) + : 2 * TIMESTAMP_SIZE + currIndex * (TIMESTAMP_SIZE + VALUE_SIZE); +} + +private void findValuesStartingIndex() { +long currTimestamp = -1; +int currIndex = 0; +int timestampSegmentIndex = 0; +while (currTimestamp != minTimestamp) { +timestampSegmentIndex = 2 * TIMESTAMP_SIZE + currIndex * (TIMESTAMP_SIZE + VALUE_SIZE); +currTimestamp =
Re: [PR] KAFKA-15347: implement lazy deserialization for segment [kafka]
aliehsaeedii commented on code in PR #14957: URL: https://github.com/apache/kafka/pull/14957#discussion_r1420648455 ## streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStoreSegmentValueFormatter.java: ## @@ -340,32 +355,67 @@ public SegmentSearchResult find(final long timestamp, final boolean includeValue } @Override -public List findAll(final long fromTime, final long toTime) { -long currNextTimestamp = nextTimestamp; -final List segmentSearchResults = new ArrayList<>(); -long currTimestamp = -1L; // choose an invalid timestamp. if this is valid, this needs to be re-worked -int currValueSize; -int currIndex = 0; -int cumValueSize = 0; -while (currTimestamp != minTimestamp) { -final int timestampSegmentIndex = 2 * TIMESTAMP_SIZE + currIndex * (TIMESTAMP_SIZE + VALUE_SIZE); +public SegmentSearchResult find(final long fromTime, final long toTime, final ResultOrder order) { +// this segment does not have any record in query specified time range +if (toTime < minTimestamp || fromTime > nextTimestamp) { +return null; +} +long currTimestamp = -1; +long currNextTimestamp = -1; + + +if (order.equals(ResultOrder.ASCENDING) && valuesStartingIndex == -1) { +findValuesStartingIndex(); +} + +while (hasStillRecord(currTimestamp, currNextTimestamp, order)) { +final int timestampSegmentIndex = getTimestampIndex(order, currentDeserIndex); currTimestamp = ByteBuffer.wrap(segmentValue).getLong(timestampSegmentIndex); -currValueSize = ByteBuffer.wrap(segmentValue).getInt(timestampSegmentIndex + TIMESTAMP_SIZE); -cumValueSize += Math.max(currValueSize, 0); +currNextTimestamp = timestampSegmentIndex == 2 * TIMESTAMP_SIZE ? 
nextTimestamp // if this is the first record metadata (timestamp + value size) + : ByteBuffer.wrap(segmentValue).getLong(timestampSegmentIndex - (TIMESTAMP_SIZE + VALUE_SIZE)); +final int currValueSize = ByteBuffer.wrap(segmentValue).getInt(timestampSegmentIndex + TIMESTAMP_SIZE); if (currValueSize >= 0) { final byte[] value = new byte[currValueSize]; -final int valueSegmentIndex = segmentValue.length - cumValueSize; +final int valueSegmentIndex = getValueSegmentIndex(order, currentCumValueSize, currValueSize); System.arraycopy(segmentValue, valueSegmentIndex, value, 0, currValueSize); if (currTimestamp <= toTime && currNextTimestamp > fromTime) { -segmentSearchResults.add(new SegmentSearchResult(currIndex, currTimestamp, currNextTimestamp, value)); +currentCumValueSize += currValueSize; +currentDeserIndex++; +return new SegmentSearchResult(currentDeserIndex - 1, currTimestamp, currNextTimestamp, value); } } - // prep for next iteration -currNextTimestamp = currTimestamp; +currentCumValueSize += Math.max(currValueSize, 0); +currentDeserIndex++; +} +// search in segment expected to find result but did not +return null; +} + +private boolean hasStillRecord(final long currTimestamp, final long currNextTimestamp, final ResultOrder order) { +return order.equals(ResultOrder.ASCENDING) ? currNextTimestamp != nextTimestamp : currTimestamp != minTimestamp; +} + +private int getValueSegmentIndex(final ResultOrder order, final int currentCumValueSize, final int currValueSize) { +return order.equals(ResultOrder.ASCENDING) ? valuesStartingIndex + currentCumValueSize + : segmentValue.length - (currentCumValueSize + currValueSize); +} + +private int getTimestampIndex(final ResultOrder order, final int currIndex) { +return order.equals(ResultOrder.ASCENDING) ? 
valuesStartingIndex - ((currIndex + 1) * (TIMESTAMP_SIZE + VALUE_SIZE)) + : 2 * TIMESTAMP_SIZE + currIndex * (TIMESTAMP_SIZE + VALUE_SIZE); +} + +private void findValuesStartingIndex() { +long currTimestamp = -1; +int currIndex = 0; +int timestampSegmentIndex = 0; +while (currTimestamp != minTimestamp) { +timestampSegmentIndex = 2 * TIMESTAMP_SIZE + currIndex * (TIMESTAMP_SIZE + VALUE_SIZE); +currTimestamp =
Re: [PR] KAFKA-15347: implement lazy deserialization for segment [kafka]
aliehsaeedii commented on code in PR #14957: URL: https://github.com/apache/kafka/pull/14957#discussion_r1420641426 ## streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStoreSegmentValueFormatter.java: ## @@ -340,32 +355,67 @@ public SegmentSearchResult find(final long timestamp, final boolean includeValue } @Override -public List findAll(final long fromTime, final long toTime) { -long currNextTimestamp = nextTimestamp; -final List segmentSearchResults = new ArrayList<>(); -long currTimestamp = -1L; // choose an invalid timestamp. if this is valid, this needs to be re-worked -int currValueSize; -int currIndex = 0; -int cumValueSize = 0; -while (currTimestamp != minTimestamp) { -final int timestampSegmentIndex = 2 * TIMESTAMP_SIZE + currIndex * (TIMESTAMP_SIZE + VALUE_SIZE); +public SegmentSearchResult find(final long fromTime, final long toTime, final ResultOrder order) { +// this segment does not have any record in query specified time range +if (toTime < minTimestamp || fromTime > nextTimestamp) { +return null; +} +long currTimestamp = -1; +long currNextTimestamp = -1; + + +if (order.equals(ResultOrder.ASCENDING) && valuesStartingIndex == -1) { +findValuesStartingIndex(); +} + +while (hasStillRecord(currTimestamp, currNextTimestamp, order)) { +final int timestampSegmentIndex = getTimestampIndex(order, currentDeserIndex); currTimestamp = ByteBuffer.wrap(segmentValue).getLong(timestampSegmentIndex); -currValueSize = ByteBuffer.wrap(segmentValue).getInt(timestampSegmentIndex + TIMESTAMP_SIZE); -cumValueSize += Math.max(currValueSize, 0); +currNextTimestamp = timestampSegmentIndex == 2 * TIMESTAMP_SIZE ? 
nextTimestamp // if this is the first record metadata (timestamp + value size) + : ByteBuffer.wrap(segmentValue).getLong(timestampSegmentIndex - (TIMESTAMP_SIZE + VALUE_SIZE)); +final int currValueSize = ByteBuffer.wrap(segmentValue).getInt(timestampSegmentIndex + TIMESTAMP_SIZE); if (currValueSize >= 0) { final byte[] value = new byte[currValueSize]; -final int valueSegmentIndex = segmentValue.length - cumValueSize; +final int valueSegmentIndex = getValueSegmentIndex(order, currentCumValueSize, currValueSize); System.arraycopy(segmentValue, valueSegmentIndex, value, 0, currValueSize); if (currTimestamp <= toTime && currNextTimestamp > fromTime) { -segmentSearchResults.add(new SegmentSearchResult(currIndex, currTimestamp, currNextTimestamp, value)); +currentCumValueSize += currValueSize; +currentDeserIndex++; +return new SegmentSearchResult(currentDeserIndex - 1, currTimestamp, currNextTimestamp, value); } } - // prep for next iteration -currNextTimestamp = currTimestamp; +currentCumValueSize += Math.max(currValueSize, 0); +currentDeserIndex++; +} +// search in segment expected to find result but did not +return null; +} + +private boolean hasStillRecord(final long currTimestamp, final long currNextTimestamp, final ResultOrder order) { +return order.equals(ResultOrder.ASCENDING) ? currNextTimestamp != nextTimestamp : currTimestamp != minTimestamp; +} + +private int getValueSegmentIndex(final ResultOrder order, final int currentCumValueSize, final int currValueSize) { +return order.equals(ResultOrder.ASCENDING) ? valuesStartingIndex + currentCumValueSize + : segmentValue.length - (currentCumValueSize + currValueSize); +} + +private int getTimestampIndex(final ResultOrder order, final int currIndex) { +return order.equals(ResultOrder.ASCENDING) ? 
valuesStartingIndex - ((currIndex + 1) * (TIMESTAMP_SIZE + VALUE_SIZE)) + : 2 * TIMESTAMP_SIZE + currIndex * (TIMESTAMP_SIZE + VALUE_SIZE); +} + +private void findValuesStartingIndex() { +long currTimestamp = -1; +int currIndex = 0; +int timestampSegmentIndex = 0; +while (currTimestamp != minTimestamp) { +timestampSegmentIndex = 2 * TIMESTAMP_SIZE + currIndex * (TIMESTAMP_SIZE + VALUE_SIZE); +currTimestamp =
Re: [PR] KAFKA-15347: implement lazy deserialization for segment [kafka]
aliehsaeedii commented on code in PR #14957: URL: https://github.com/apache/kafka/pull/14957#discussion_r1420471301 ## streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStoreSegmentValueFormatter.java: ## @@ -150,7 +152,15 @@ interface SegmentValue { */ SegmentSearchResult find(long timestamp, boolean includeValue); -List findAll(long fromTime, long toTime); +/** + * Finds the next/previous record in this segment row that is valid within the query specified time range + * + * @param fromTime the query starting time point + * @param toTime the query ending time point + * @param order specifies the order (based on timestamp) in which records are returned + * @return the record that is found, null if no record is found + */ +SegmentSearchResult find(long fromTime, long toTime, ResultOrder order); Review Comment: > I am wondering if existing `find()` and the new `find()` can be called interleaved without breaking anything? When would that happen? Right now, only instances of `LogicalSegmentIterator` have access to the new `find` method; therefore, I think the same instance of `PartiallyDeserializedSegmentValue` cannot call them interleaved. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15347: implement lazy deserialization for segment [kafka]
aliehsaeedii commented on code in PR #14957: URL: https://github.com/apache/kafka/pull/14957#discussion_r1420466908 ## streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStoreSegmentValueFormatter.java: ## @@ -150,7 +152,15 @@ interface SegmentValue { */ SegmentSearchResult find(long timestamp, boolean includeValue); -List findAll(long fromTime, long toTime); +/** + * Finds the next/previous record in this segment row that is valid within the query specified time range + * + * @param fromTime the query starting time point + * @param toTime the query ending time point + * @param order specifies the order (based on timestamp) in which records are returned + * @return the record that is found, null if no record is found + */ +SegmentSearchResult find(long fromTime, long toTime, ResultOrder order); Review Comment: > I am wondering if existing `find()` and the new `find()` can be called interleaved without breaking anything? > > So far, `PartiallyDeserializedSetmentValue` is already lazy and deserializes from newest to oldest. It would be a difficult API contract if calling existing `find` and new `find` interleaved would break anything? A very good question. I think we need a sync session about that? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15347: implement lazy deserialization for segment [kafka]
aliehsaeedii commented on code in PR #14957: URL: https://github.com/apache/kafka/pull/14957#discussion_r1420462098 ## streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStoreSegmentValueFormatter.java: ## @@ -266,6 +276,11 @@ private static class PartiallyDeserializedSegmentValue implements SegmentValue { private int deserIndex = -1; // index up through which this segment has been deserialized (inclusive) private List unpackedReversedTimestampAndValueSizes; private List cumulativeValueSizes; // ordered same as timestamp and value sizes (reverse time-sorted) +private int valuesStartingIndex = -1; // the index of first value in the segment +private int currentCumValueSize = 0; // this is for deserializing a segment records in a lazy manner +private int currentDeserIndex = 0; // this is for deserializing a segment records in a lazy manner Review Comment: > Why do we need to duplicate both variables? This is what I did in the first place. But it is complicated, since other methods, such as `doInsert()`, manipulate them (I mean `deserIndex` and `cumulativeValueSizes`) as well. That's why I decided to have my own variables. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15347: implement lazy deserialization for segment [kafka]
aliehsaeedii commented on code in PR #14957: URL: https://github.com/apache/kafka/pull/14957#discussion_r1420464501 ## streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStoreSegmentValueFormatter.java: ## @@ -266,6 +276,11 @@ private static class PartiallyDeserializedSegmentValue implements SegmentValue { private int deserIndex = -1; // index up through which this segment has been deserialized (inclusive) private List unpackedReversedTimestampAndValueSizes; private List cumulativeValueSizes; // ordered same as timestamp and value sizes (reverse time-sorted) +private int valuesStartingIndex = -1; // the index of first value in the segment +private int currentCumValueSize = 0; // this is for deserializing a segment records in a lazy manner +private int currentDeserIndex = 0; // this is for deserializing a segment records in a lazy manner Review Comment: > it seems we might want to change their name to indicate when (ascending vs descending) they are used? I use the same variables (the ones I defined) for both directions, since deserialization is either backward or forward, never both at the same time. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15347: implement lazy deserialization for segment [kafka]
aliehsaeedii commented on code in PR #14957: URL: https://github.com/apache/kafka/pull/14957#discussion_r1420462098 ## streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStoreSegmentValueFormatter.java: ## @@ -266,6 +276,11 @@ private static class PartiallyDeserializedSegmentValue implements SegmentValue { private int deserIndex = -1; // index up through which this segment has been deserialized (inclusive) private List unpackedReversedTimestampAndValueSizes; private List cumulativeValueSizes; // ordered same as timestamp and value sizes (reverse time-sorted) +private int valuesStartingIndex = -1; // the index of first value in the segment +private int currentCumValueSize = 0; // this is for deserializing a segment records in a lazy manner +private int currentDeserIndex = 0; // this is for deserializing a segment records in a lazy manner Review Comment: > Why do we need to duplicate both variables? This is what I did in the first place. But it is complicated, since other methods, such as `doInsert()`, manipulate them (`deserIndex` and `cumulativeValueSizes`) as well. That's why I decided to have my own variables. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15347: implement lazy deserialization for segment [kafka]
aliehsaeedii commented on code in PR #14957: URL: https://github.com/apache/kafka/pull/14957#discussion_r1420452896 ## streams/src/main/java/org/apache/kafka/streams/state/internals/LogicalSegmentIterator.java: ## @@ -100,34 +134,49 @@ private boolean maybeFillIterator() { final byte[] rawSegmentValue = segment.get(key, snapshot); if (rawSegmentValue != null) { // this segment contains record(s) with the specified key if (segment.id() == -1) { // this is the latestValueStore -final long recordTimestamp = RocksDBVersionedStore.LatestValueFormatter.getTimestamp(rawSegmentValue); -if (recordTimestamp <= toTime) { -// latest value satisfies timestamp bound -queryResults.add(new VersionedRecord<>(RocksDBVersionedStore.LatestValueFormatter.getValue(rawSegmentValue), recordTimestamp)); -} +this.currentRawSegmentValue = rawSegmentValue; } else { -// this segment contains records with the specified key and time range -final List searchResults = - RocksDBVersionedStoreSegmentValueFormatter.deserialize(rawSegmentValue).findAll(fromTime, toTime); -for (final RocksDBVersionedStoreSegmentValueFormatter.SegmentValue.SegmentSearchResult searchResult : searchResults) { -queryResults.add(new VersionedRecord<>(searchResult.value(), searchResult.validFrom(), searchResult.validTo())); -} +this.currentDeserializedSegmentValue = RocksDBVersionedStoreSegmentValueFormatter.deserialize(rawSegmentValue); +this.minTimestamp = RocksDBVersionedStoreSegmentValueFormatter.getMinTimestamp(rawSegmentValue); +this.nextTimestamp = RocksDBVersionedStoreSegmentValueFormatter.getNextTimestamp(rawSegmentValue); } -} -if (!queryResults.isEmpty()) { -break; +return true; } } -if (!queryResults.isEmpty()) { -// since data is stored in descending order in the segments, create the list in reverse order, if the order is Ascending. -this.iterator = order.equals(ResultOrder.ASCENDING) ? 
queryResults.listIterator(queryResults.size()) : queryResults.listIterator(); -return true; -} // if all segments have been processed, release the snapshot releaseSnapshot(); return false; } +private Object getNextRecord() { +VersionedRecord nextRecord = null; +if (currentRawSegmentValue != null) { // this is the latestValueStore +final long recordTimestamp = RocksDBVersionedStore.LatestValueFormatter.getTimestamp(currentRawSegmentValue); +if (recordTimestamp <= toTime) { +final byte[] value = RocksDBVersionedStore.LatestValueFormatter.getValue(currentRawSegmentValue); +// latest value satisfies timestamp bound +nextRecord = new VersionedRecord<>(value, recordTimestamp); +} +} else { +final RocksDBVersionedStoreSegmentValueFormatter.SegmentValue.SegmentSearchResult currentRecord = +currentDeserializedSegmentValue.find(fromTime, toTime, order); +if (currentRecord != null) { +nextRecord = new VersionedRecord<>(currentRecord.value(), currentRecord.validFrom(), currentRecord.validTo()); +} +} +// no relevant record can be found in the segment +if (currentRawSegmentValue != null || nextRecord == null || !canSegmentHaveMoreRelevantRecords(nextRecord.timestamp(), Long.parseLong(nextRecord.validTo().get().toString( { Review Comment: > `Long.parseLong(nextRecord.validTo().get().toString())` -- why to we get valid-to as `String` and pipe though `parseLong`? Why can't we just pass `nextRecord.validTo().get()` into `canSegmentHaveMoreRelevantRecords(...)`? I had to do that because of the missing generic types. Fixing the generic types has now resolved all of these issues :) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15347: implement lazy deserialization for segment [kafka]
mjsax commented on code in PR #14957: URL: https://github.com/apache/kafka/pull/14957#discussion_r1420058690 ## streams/src/main/java/org/apache/kafka/streams/state/internals/LogicalSegmentIterator.java: ## @@ -69,23 +76,50 @@ public boolean hasNext() { if (!open) { throw new IllegalStateException("The iterator is out of scope."); } -// since data is stored in descending order in the segments, check whether there is any previous record, if the order is Ascending. -final boolean hasStillLoad = order.equals(ResultOrder.ASCENDING) ? iterator.hasPrevious() : iterator.hasNext(); -return hasStillLoad || maybeFillIterator(); +if (this.next != null) { +return true; +} + +while ((currentDeserializedSegmentValue != null || currentRawSegmentValue != null || segmentIterator.hasNext()) && this.next == null) { +boolean hasSegmentValue = currentDeserializedSegmentValue != null || currentRawSegmentValue != null; +if (!hasSegmentValue) { +hasSegmentValue = maybeFillCurrentSegmentValue(); +} +if (hasSegmentValue) { +this.next = (VersionedRecord) getNextRecord(); +if (this.next == null) { +prepareToFetchNextSegment(); +} +} +} +return this.next != null; } +@SuppressWarnings("unchecked") Review Comment: We can avoid this suppression by fixing generic types. 
This class should be defined as ``` public class LogicalSegmentIterator implements VersionedRecordIterator<byte[]> { ``` (i.e., add the type argument `byte[]`). This allows us to change the return type of `next()`: ``` public VersionedRecord<byte[]> next() { ``` which makes the suppression unnecessary; we can also use the fully typed `VersionedRecord<byte[]>` throughout the code base of this class (instead of the untyped `VersionedRecord`), and we can avoid casts. ## streams/src/main/java/org/apache/kafka/streams/state/internals/LogicalSegmentIterator.java: ## @@ -69,23 +76,50 @@ public boolean hasNext() { if (!open) { throw new IllegalStateException("The iterator is out of scope."); } -// since data is stored in descending order in the segments, check whether there is any previous record, if the order is Ascending. -final boolean hasStillLoad = order.equals(ResultOrder.ASCENDING) ? iterator.hasPrevious() : iterator.hasNext(); -return hasStillLoad || maybeFillIterator(); +if (this.next != null) { +return true; +} + +while ((currentDeserializedSegmentValue != null || currentRawSegmentValue != null || segmentIterator.hasNext()) && this.next == null) { +boolean hasSegmentValue = currentDeserializedSegmentValue != null || currentRawSegmentValue != null; +if (!hasSegmentValue) { +hasSegmentValue = maybeFillCurrentSegmentValue(); +} +if (hasSegmentValue) { +this.next = (VersionedRecord) getNextRecord(); Review Comment: Remove cast (should not be necessary...
cf other comments) ## streams/src/main/java/org/apache/kafka/streams/state/internals/LogicalSegmentIterator.java: ## @@ -100,34 +134,49 @@ private boolean maybeFillIterator() { final byte[] rawSegmentValue = segment.get(key, snapshot); if (rawSegmentValue != null) { // this segment contains record(s) with the specified key if (segment.id() == -1) { // this is the latestValueStore -final long recordTimestamp = RocksDBVersionedStore.LatestValueFormatter.getTimestamp(rawSegmentValue); -if (recordTimestamp <= toTime) { -// latest value satisfies timestamp bound -queryResults.add(new VersionedRecord<>(RocksDBVersionedStore.LatestValueFormatter.getValue(rawSegmentValue), recordTimestamp)); -} +this.currentRawSegmentValue = rawSegmentValue; } else { -// this segment contains records with the specified key and time range -final List searchResults = - RocksDBVersionedStoreSegmentValueFormatter.deserialize(rawSegmentValue).findAll(fromTime, toTime); -for (final RocksDBVersionedStoreSegmentValueFormatter.SegmentValue.SegmentSearchResult searchResult : searchResults) { -queryResults.add(new VersionedRecord<>(searchResult.value(), searchResult.validFrom(), searchResult.validTo())); -} +this.currentDeserializedSegmentValue = RocksDBVersionedStoreSegmentValueFormatter.deserialize(rawSegmentValue); +this.minTimestamp =
Re: [PR] KAFKA-15347: implement lazy deserialization for segment [kafka]
mjsax commented on code in PR #14957: URL: https://github.com/apache/kafka/pull/14957#discussion_r1420051909 ## streams/src/main/java/org/apache/kafka/streams/state/internals/LogicalSegmentIterator.java: ## @@ -34,7 +31,16 @@ public class LogicalSegmentIterator implements VersionedRecordIterator { private final Long fromTime; private final Long toTime; private final ResultOrder order; -private ListIterator> iterator; +// stores the raw value of the latestValueStore when latestValueStore is the current segment +private byte[] currentRawSegmentValue; +// stores the deserialized value of the current segment (when current segment is one of the old segments) +private RocksDBVersionedStoreSegmentValueFormatter.SegmentValue currentDeserializedSegmentValue; +// current segment minTimestamp (when current segment is not the latestValueStore) +private long minTimestamp; +// current segment nextTimestamp (when current segment is not the latestValueStore) +private long nextTimestamp; +private VersionedRecord next; +// current deserialization index Review Comment: Wrong line? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org