mjsax commented on code in PR #14957:
URL: https://github.com/apache/kafka/pull/14957#discussion_r1424206051


##########
streams/src/test/java/org/apache/kafka/streams/state/internals/ReadonlyPartiallyDeserializedSegmentValueTest.java:
##########
@@ -0,0 +1,220 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.streams.query.ResultOrder;
+import org.apache.kafka.streams.state.internals.RocksDBVersionedStoreSegmentValueFormatter.SegmentValue;
+import org.apache.kafka.streams.state.internals.RocksDBVersionedStoreSegmentValueFormatter.SegmentValue.SegmentSearchResult;
+import org.junit.Test;
+import org.junit.experimental.runners.Enclosed;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.jupiter.api.Assertions.assertNull;
+
+@RunWith(Enclosed.class)
+public class ReadonlyPartiallyDeserializedSegmentValueTest {
+
+    /**
+     * Non-exceptional scenarios which are expected to occur during regular store operation.
+     */
+    @RunWith(Parameterized.class)
+    public static class ExpectedCasesTest {
+
+        private static final List<TestCase> TEST_CASES = new ArrayList<>();
+
+        static {
+            // test cases are expected to have timestamps in strictly decreasing order (except for the degenerate case)
+            TEST_CASES.add(new TestCase("single record", 10, new 
TestRecord("foo".getBytes(), 1)));
+            TEST_CASES.add(new TestCase("multiple records", 10, new 
TestRecord("foo".getBytes(), 8), new TestRecord("bar".getBytes(), 3), new 
TestRecord("baz".getBytes(), 0)));
+            TEST_CASES.add(new TestCase("single tombstone", 10, new 
TestRecord(null, 1)));
+            TEST_CASES.add(new TestCase("multiple tombstone", 10, new 
TestRecord(null, 4), new TestRecord(null, 1)));
+            TEST_CASES.add(new TestCase("tombstones and records (r, t, r)", 
10, new TestRecord("foo".getBytes(), 5), new TestRecord(null, 2), new 
TestRecord("bar".getBytes(), 1)));
+            TEST_CASES.add(new TestCase("tombstones and records (t, r, t)", 
10, new TestRecord(null, 5), new TestRecord("foo".getBytes(), 2), new 
TestRecord(null, 1)));
+            TEST_CASES.add(new TestCase("tombstones and records (r, r, t, t)", 
10, new TestRecord("foo".getBytes(), 6), new TestRecord("bar".getBytes(), 5), 
new TestRecord(null, 2), new TestRecord(null, 1)));
+            TEST_CASES.add(new TestCase("tombstones and records (t, t, r, r)", 
10, new TestRecord(null, 7), new TestRecord(null, 6), new 
TestRecord("foo".getBytes(), 2), new TestRecord("bar".getBytes(), 1)));
+            TEST_CASES.add(new TestCase("record with empty bytes", 10, new 
TestRecord(new byte[0], 1)));
+            TEST_CASES.add(new TestCase("records with empty bytes (r, e)", 10, 
new TestRecord("foo".getBytes(), 4), new TestRecord(new byte[0], 1)));
+            TEST_CASES.add(new TestCase("records with empty bytes (e, e, r)", 
10, new TestRecord(new byte[0], 8), new TestRecord(new byte[0], 2), new 
TestRecord("foo".getBytes(), 1)));
+        }
+
+        private final TestCase testCase;
+
+        public ExpectedCasesTest(final TestCase testCase) {
+            this.testCase = testCase;
+        }
+
+        @Parameterized.Parameters(name = "{0}")
+        public static Collection<TestCase> data() {
+            return TEST_CASES;
+        }
+
+        @Test
+        public void shouldFindInTimeRangesWithDifferentOrders() {
+
+            // create a list of timestamps in ascending order, used in combination as the start and end points of the time range
+            final List<Long> timestamps = createTimestampsFromTestRecords(testCase);
+
+            // verify results
+            final List<ResultOrder> orders = Arrays.asList(ResultOrder.ASCENDING, ResultOrder.ANY, ResultOrder.DESCENDING);
+            for (final ResultOrder order: orders) {
+                for (final Long from : timestamps) {
+                    for (final Long to : timestamps) {
+                        // build expected results indices based on time range
+                        final List<Integer> expectedRecordsIndices = new ArrayList<>();
+                        for (int i = 0; i < testCase.records.size(); i++) {
+                            final long recordValidTo = i == 0 ? testCase.nextTimestamp : testCase.records.get(i - 1).timestamp;
+                            if (testCase.records.get(i).timestamp <= to && recordValidTo > from) {
+                                if (testCase.records.get(i).value != null) { // the results do not include tombstones
+                                    expectedRecordsIndices.add(i);
+                                }
+                            }
+                        }
+                        // The only object that has access to the find method is the instance of LogicalSegmentIterator.
+                        // Therefore, closing the iterator means that the segment calling the find method is destroyed and needs
+                        // to be re-created for the next time range.
+                        final ReadonlyPartiallyDeserializedSegmentValue segmentValue = buildSegmentWithInsertLatest(testCase);
+                        if (order.equals(ResultOrder.ASCENDING)) {
+                            Collections.reverse(expectedRecordsIndices);
+                        }
+                        int segmentRecordIndex = -1;
+                        for (final int index : expectedRecordsIndices) {
+                            final long expectedValidTo = index == 0 ? testCase.nextTimestamp : testCase.records.get(index - 1).timestamp;
+
+                            final SegmentSearchResult result = segmentValue.find(from, to, order, segmentRecordIndex);
+                            assert result != null;
+
+                            final TestRecord expectedRecord = testCase.records.get(index);
+
+                            assertThat(result.index(), equalTo(index));
+                            assertThat(result.value(), equalTo(expectedRecord.value));
+                            assertThat(result.validFrom(), equalTo(expectedRecord.timestamp));
+                            assertThat(result.validTo(), equalTo(expectedValidTo));
+
+                            segmentRecordIndex = order.equals(ResultOrder.ASCENDING) ? index - 1 : index + 1;
+                        }
+                        // verify no results within the time range
+                        if (expectedRecordsIndices.size() == 0) {
+                            assertNull(segmentValue.find(from, to, order, -1));
+                        }
+                    }
+                }
+            }
+        }
+
+        @Test
+        public void shouldGetTimestamps() {

Review Comment:
   Seems we don't need this test? It just replicates an existing test, as it tests `RocksDBVersionedStoreSegmentValueFormatter`.
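
   For context on the expected-results loop in the test quoted above: records in a segment are stored newest-first, so record `i` is valid from its own timestamp (inclusive) up to the timestamp of record `i - 1`, or up to `nextTimestamp` for the newest record. A minimal standalone sketch of that validity-window check (class, method, and variable names here are hypothetical, not part of the PR):

   ```java
   import java.util.ArrayList;
   import java.util.List;

   final class ValidityWindowSketch {
       // timestamps must be strictly decreasing (newest record first),
       // matching the ordering the test cases above require
       static List<Integer> expectedIndices(final long[] timestamps,
                                            final long nextTimestamp,
                                            final long from,
                                            final long to) {
           final List<Integer> indices = new ArrayList<>();
           for (int i = 0; i < timestamps.length; i++) {
               // record i is valid until the next-newer record starts,
               // or until nextTimestamp for the newest record (i == 0)
               final long validTo = i == 0 ? nextTimestamp : timestamps[i - 1];
               if (timestamps[i] <= to && validTo > from) {
                   indices.add(i); // tombstone filtering omitted for brevity
               }
           }
           return indices; // newest-first, i.e. DESCENDING result order
       }
   }
   ```

   The test performs the same check per `(from, to)` pair, additionally skipping tombstones (`value == null`) and reversing the index list for `ResultOrder.ASCENDING`.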



##########
streams/src/main/java/org/apache/kafka/streams/state/internals/ReadonlyPartiallyDeserializedSegmentValue.java:
##########
@@ -0,0 +1,210 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.streams.query.ResultOrder;
+
+import java.nio.ByteBuffer;
+import java.util.HashMap;
+import java.util.Map;
+
+
+final class ReadonlyPartiallyDeserializedSegmentValue {
+
+    private static final int TIMESTAMP_SIZE = 8;
+    private static final int VALUE_SIZE = 4;
+    private byte[] segmentValue;
+    private long nextTimestamp;
+    private long minTimestamp;
+
+    private int deserIndex = -1; // index up through which this segment has been deserialized (inclusive)
+
+    private Map<Integer, Integer> cumulativeValueSizes;
+
+    private int valuesStartingIndex = -1; // the index of the first value in the segment (but the last one in the list)
+    private Map<Integer, TimestampAndValueSize> unpackedTimestampAndValueSizes = new HashMap<>();
+    private int recordNumber = -1; // number of segment records
+
+
+    ReadonlyPartiallyDeserializedSegmentValue(final byte[] segmentValue) {
+        this.segmentValue = segmentValue;
+        this.nextTimestamp =
+                RocksDBVersionedStoreSegmentValueFormatter.getNextTimestamp(segmentValue);
+        this.minTimestamp =
+                RocksDBVersionedStoreSegmentValueFormatter.getMinTimestamp(segmentValue);
+        resetDeserHelpers();
+    }
+
+
+    public long minTimestamp() {
+        return minTimestamp;
+    }
+
+    public long nextTimestamp() {
+        return nextTimestamp;
+    }
+
+    public byte[] serialize() {

Review Comment:
   Yes, tests are important, but the test you added is not testing this method; it only adds a redundant test for `getTimestamp()`. Seems we don't need this test a second time?
   
   > We may have future usage as well.
   
   We would add it when we need it :)
   
   > What's the problem with having it?
   
   It's about "cleanliness" -- we also don't add a method that computes the Fibonacci sequence just because we might need it in the future :)
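
   For context on the class under review: only the fixed-size header fields (`nextTimestamp`, `minTimestamp`) are decoded eagerly, while per-record data is unpacked on first access and memoized, which is what "partially deserialized" refers to. A simplified, hypothetical sketch of that pattern follows; the fixed-width record layout and offsets below are illustrative assumptions, not the actual segment format:

   ```java
   import java.nio.ByteBuffer;
   import java.util.HashMap;
   import java.util.Map;

   final class PartialDeserSketch {
       private static final int TIMESTAMP_SIZE = 8;

       private final ByteBuffer buf;
       private final long nextTimestamp; // decoded eagerly from the header
       private final long minTimestamp;  // decoded eagerly from the header
       private final Map<Integer, Long> timestampCache = new HashMap<>();

       PartialDeserSketch(final byte[] segmentValue) {
           this.buf = ByteBuffer.wrap(segmentValue);
           this.nextTimestamp = buf.getLong(0);
           this.minTimestamp = buf.getLong(TIMESTAMP_SIZE);
       }

       // decode the timestamp of record `index` on first access, then memoize;
       // a fixed-width per-record layout is assumed purely for illustration
       long timestampOf(final int index) {
           return timestampCache.computeIfAbsent(index,
               i -> buf.getLong(2 * TIMESTAMP_SIZE + i * TIMESTAMP_SIZE));
       }
   }
   ```

   The memoized map plays the role of `unpackedTimestampAndValueSizes` above: repeated `find` calls on the same segment avoid re-parsing bytes they have already visited.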



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
