agavra commented on code in PR #18610:
URL: https://github.com/apache/kafka/pull/18610#discussion_r1960767233


##########
streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimestampedStore.java:
##########
@@ -277,197 +275,361 @@ public void close() {
         }
     }
 
-    private class RocksDBDualCFIterator extends AbstractIterator<KeyValue<Bytes, byte[]>>
-        implements ManagedKeyValueIterator<Bytes, byte[]> {
-
-        // RocksDB's JNI interface does not expose getters/setters that allow the
-        // comparator to be pluggable, and the default is lexicographic, so it's
-        // safe to just force lexicographic comparator here for now.
+    /**
+     * A range-based iterator for RocksDB that merges results from two column families.
+     *
+     * <p>This iterator supports traversal over two RocksDB column families: one containing timestamped values and
+     * another containing non-timestamped values. It ensures that the keys from both column families are merged and
+     * sorted lexicographically, respecting the iteration order (forward or reverse) and the specified range
+     * boundaries.</p>
+     *
+     * <h2>Key Features</h2>
+     *
+     * <ul>
+     *     <li>Merges results from the "with-timestamp" and "no-timestamp" column families.</li>
+     *     <li>Supports range-based queries with open or closed boundaries.</li>
+     *     <li>Handles both forward and reverse iteration seamlessly.</li>
+     *     <li>Ensures correct handling of inclusive and exclusive upper boundaries.</li>
+     *     <li>Integrates efficiently with Kafka Streams state store mechanisms.</li>
+     * </ul>
+     *
+     * <h2>Usage</h2>
+     *
+     * <p>The iterator can be used for different types of range-based operations, such as:
+     * <ul>
+     *     <li>Iterating over all keys within a range.</li>
+     *     <li>Prefix-based scans (when combined with dynamically calculated range endpoints).</li>
+     *     <li>Open-ended range queries (e.g., from a given key to the end of the dataset).</li>
+     * </ul>
+     * </p>
+     *
+     * <h2>Implementation Details</h2>
+     *
+     * <p>The class extends {@link AbstractIterator} and implements {@link ManagedKeyValueIterator}. It uses RocksDB's
+     * native iterators for efficient traversal of keys within the specified range. Keys from the two column families
+     * are merged during iteration, ensuring proper order and de-duplication where applicable.</p>
+     *
+     * <h3>Key Methods:</h3>
+     *
+     * <ul>
+     *     <li><b>{@code makeNext()}:</b> Retrieves the next key-value pair in the merged range, ensuring
+     *     the result is within the specified range and boundary conditions.</li>
+     *     <li><b>{@code initializeIterators()}:</b> Initializes the RocksDB iterators based on the specified range and direction.</li>
+     *     <li><b>{@code isInRange()}:</b> Verifies if the current key-value pair is within the range defined by {@code from} and {@code to}.</li>
+     *     <li><b>{@code fetchNextKeyValue()}:</b> Determines the next key-value pair to return based on the state of both iterators.</li>
+     * </ul>
+     *
+     * <h3>Thread Safety:</h3>
+     *
+     * <p>The iterator is thread-safe for sequential operations but should not be accessed concurrently from multiple
+     * threads without external synchronization.</p>

Review Comment:
   What does this mean? Doesn't that just mean it's not thread-safe?
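
   One reading of the question: an object that is only safe under sequential use is simply not thread-safe, and callers that share it must lock around compound actions such as `hasNext()` followed by `next()`. A minimal sketch of that external synchronization, using a plain `java.util.Iterator` as a stand-in for the store iterator; `ExternalSyncExample`, `pollNext`, and `drainConcurrently` are illustrative names, not Kafka APIs:

   ```java
   import java.util.ArrayList;
   import java.util.Collections;
   import java.util.Iterator;
   import java.util.List;

   final class ExternalSyncExample {
       // Performs hasNext() + next() as one atomic step by holding the iterator's
       // monitor. Even if each method were individually synchronized, the pair
       // would still race without this outer lock. Returns null once exhausted.
       static <T> T pollNext(final Iterator<T> it) {
           synchronized (it) {
               return it.hasNext() ? it.next() : null;
           }
       }

       // Drains one shared (non-thread-safe) iterator from several threads and
       // returns every element observed across all threads.
       static List<Integer> drainConcurrently(final Iterator<Integer> shared, final int threads)
               throws InterruptedException {
           final List<Integer> seen = Collections.synchronizedList(new ArrayList<>());
           final List<Thread> workers = new ArrayList<>();
           for (int t = 0; t < threads; t++) {
               final Thread worker = new Thread(() -> {
                   Integer value;
                   while ((value = pollNext(shared)) != null) {
                       seen.add(value);
                   }
               });
               workers.add(worker);
               worker.start();
           }
           for (final Thread worker : workers) {
               worker.join();
           }
           return seen;
       }

       public static void main(final String[] args) throws InterruptedException {
           final List<Integer> data = new ArrayList<>();
           for (int i = 0; i < 10_000; i++) {
               data.add(i);
           }
           // ArrayList's iterator is not thread-safe; the external lock makes sharing it safe.
           final List<Integer> seen = drainConcurrently(data.iterator(), 4);
           System.out.println(seen.size()); // 10000
       }
   }
   ```

   Under this reading, the Javadoc could say plainly that the iterator is not thread-safe and leave the locking to callers.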



##########
streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimestampedStore.java:
##########
@@ -277,197 +275,361 @@ public void close() {
+     *
+     * <h2>Examples</h2>
+     *
+     * <h3>Iterate over a range:</h3>
+     *
+     * <pre>{@code
+     * RocksIterator noTimestampIterator = accessor.newIterator(noTimestampColumnFamily);
+     * RocksIterator withTimestampIterator = accessor.newIterator(withTimestampColumnFamily);
+     *
+     * try (RocksDBDualCFRangeIterator iterator = new RocksDBDualCFRangeIterator(
+     *         new Bytes("keyStart".getBytes()),
+     *         new Bytes("keyEnd".getBytes()),
+     *         noTimestampIterator,
+     *         withTimestampIterator,
+     *         "storeName",
+     *         true,  // Forward iteration
+     *         true   // Inclusive upper boundary
+     * )) {
+     *     while (iterator.hasNext()) {
+     *         KeyValue<Bytes, byte[]> entry = iterator.next();
+     *         System.out.println("Key: " + entry.key + ", Value: " + Arrays.toString(entry.value));
+     *     }
+     * }
+     * }</pre>
+     *
+     * <h2>Exceptions</h2>
+     *
+     * <ul>
+     *     <li><b>{@link InvalidStateStoreException}:</b> Thrown if the iterator is accessed after being closed.</li>
+     *     <li><b>{@link IllegalStateException}:</b> Thrown if the close callback is not properly set before usage.</li>
+     * </ul>
+     *
+     * @see AbstractIterator
+     * @see ManagedKeyValueIterator
+     * @see RocksDBStore
+     */
+    private static class RocksDBDualCFRangeIterator extends AbstractIterator<KeyValue<Bytes, byte[]>> implements ManagedKeyValueIterator<Bytes, byte[]> {
+        private Runnable closeCallback;
+        private byte[] noTimestampNext;
+        private byte[] withTimestampNext;
         private final Comparator<byte[]> comparator = Bytes.BYTES_LEXICO_COMPARATOR;
-
+        private final RocksIterator noTimestampIterator;
+        private final RocksIterator withTimestampIterator;
         private final String storeName;
-        private final RocksIterator iterWithTimestamp;
-        private final RocksIterator iterNoTimestamp;
         private final boolean forward;
-
+        private final boolean toInclusive;
+        private final byte[] rawLastKey;
         private volatile boolean open = true;
 
-        private byte[] nextWithTimestamp;
-        private byte[] nextNoTimestamp;
-        private KeyValue<Bytes, byte[]> next;
-        private Runnable closeCallback = null;
-
-        RocksDBDualCFIterator(final String storeName,
-                              final RocksIterator iterWithTimestamp,
-                              final RocksIterator iterNoTimestamp,
-                              final boolean forward) {
-            this.iterWithTimestamp = iterWithTimestamp;
-            this.iterNoTimestamp = iterNoTimestamp;
-            this.storeName = storeName;
+        /**
+         * Constructs a new {@code RocksDBDualCFRangeIterator}.
+         *
+         * <p>Initializes the RocksDB iterators for two column families (timestamped and non-timestamped) and sets up
+         * the range and direction for iteration.</p>
+         *
+         * @param from                  The starting key of the range. Can be {@code null} for an open range.
+         * @param to                    The ending key of the range. Can be {@code null} for an open range.
+         * @param noTimestampIterator   The iterator for the non-timestamped column family.
+         * @param withTimestampIterator The iterator for the timestamped column family.
+         * @param storeName             The name of the store associated with this iterator.
+         * @param forward               {@code true} for forward iteration; {@code false} for reverse iteration.
+         * @param toInclusive           Whether the upper boundary of the range is inclusive.
+         */
+        RocksDBDualCFRangeIterator(final Bytes from,
+                                   final Bytes to,
+                                   final RocksIterator noTimestampIterator,
+                                   final RocksIterator withTimestampIterator,
+                                   final String storeName,
+                                   final boolean forward,
+                                   final boolean toInclusive) {
             this.forward = forward;
+            this.noTimestampIterator = noTimestampIterator;
+            this.storeName = storeName;
+            this.toInclusive = toInclusive;
+            this.withTimestampIterator = withTimestampIterator;
+
+            this.rawLastKey = initializeIterators(from, to);

Review Comment:
   Generally it's better for constructors not to have side effects; otherwise, if something throws an exception during instantiation, it's possible that the `close` method won't be called.

   This doesn't seem to be a regression (the old code did that as well for the `RangeIterator`), so consider this comment an optional suggestion.
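
   One way to follow this suggestion is to keep the constructor to plain field assignments and move the seek into a static factory that closes the iterator if initialization fails. The sketch below is illustrative only: `Handle`, `RangeIter`, and `create` are hypothetical names, `Handle` stands in for `RocksIterator`, and this is not the PR's code:

   ```java
   import java.util.ArrayList;
   import java.util.List;

   final class SafeInitExample {
       // Hypothetical stand-in for a native handle such as RocksIterator.
       interface Handle extends AutoCloseable {
           void seek(byte[] key);

           @Override
           void close(); // narrowed: no checked exception
       }

       static final class RangeIter implements AutoCloseable {
           private final Handle handle;

           // Constructor only assigns fields, so it cannot fail halfway through setup.
           private RangeIter(final Handle handle) {
               this.handle = handle;
           }

           // Static factory: does the side-effecting seek after construction and
           // releases the handle if the seek throws, so nothing is leaked.
           static RangeIter create(final Handle handle, final byte[] from) {
               final RangeIter iter = new RangeIter(handle);
               try {
                   handle.seek(from);
                   return iter;
               } catch (final RuntimeException e) {
                   iter.close();
                   throw e;
               }
           }

           @Override
           public void close() {
               handle.close();
           }
       }

       // Simulates a seek failure and records the order of events.
       static List<String> simulateFailedSeek() {
           final List<String> events = new ArrayList<>();
           final Handle failing = new Handle() {
               @Override
               public void seek(final byte[] key) {
                   events.add("seek");
                   throw new IllegalStateException("seek failed");
               }

               @Override
               public void close() {
                   events.add("close");
               }
           };
           try {
               RangeIter.create(failing, new byte[] {1});
           } catch (final IllegalStateException expected) {
               events.add("caught");
           }
           return events;
       }

       public static void main(final String[] args) {
           System.out.println(simulateFailedSeek()); // [seek, close, caught]
       }
   }
   ```

   The caller never sees a half-initialized iterator, and the native handle is released before the exception propagates.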



##########
streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimestampedStore.java:
##########
@@ -277,197 +275,361 @@ public void close() {
         }
 
+        /**
+         * Retrieves the next key-value pair in the range.
+         *
+         * <p>This method determines the next key-value pair to return by merging the results from the two column
+         * families. If both column families have keys, it selects the one that matches the iteration order and range
+         * conditions. Keys outside the specified range are skipped.</p>
+         *
+         * @return The next {@link KeyValue} pair in the range, or {@code null} if no more elements are available.
+         */
         @Override
-        public synchronized boolean hasNext() {
-            if (!open) {
-                throw new InvalidStateStoreException(String.format("RocksDB iterator for store %s has closed", storeName));
-            }
-            return super.hasNext();
+        protected KeyValue<Bytes, byte[]> makeNext() {
+            loadNextKeys();
+            if (noTimestampNext == null && withTimestampNext == null) return allDone();
+            final KeyValue<Bytes, byte[]> next = fetchNextKeyValue();
+            return isInRange(next) ? next : allDone();
+        }
+
+        /**
+         * Returns the next key in the range without advancing the iterator.
+         *
+         * <p>This method retrieves the key of the next {@link KeyValue} pair that would be returned by {@link #next()},
+         * without moving the iterator forward. This is useful for inspecting the next key without affecting the
+         * iterator's state.</p>
+         *
+         * @return The next key as a {@link Bytes} object.
+         *
+         * @throws NoSuchElementException If there are no more elements in the iterator.
+         */
+        @Override
+        public Bytes peekNextKey() {

Review Comment:
   Existing test coverage for this class is below average (e.g. there's no `peekNextKey` check in the test). Could you add that?

   cc @ableegoldman: it may be good to refactor the existing tests alongside this one so that, instead of one really big test, we have individual tests for each piece of functionality. That would make it easier to confirm the test coverage is good.
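
   A dedicated test could pin down the contract described in the `peekNextKey` Javadoc: it returns the next key without advancing, repeated calls return the same key, and it throws `NoSuchElementException` once the iterator is exhausted. Since `RocksDBDualCFRangeIterator` is private, a real test would presumably go through the store's `range`/`all` methods, whose `KeyValueIterator` exposes `peekNextKey()`. The self-contained sketch below runs the same assertions, without JUnit, against a hypothetical `PeekingKeyIterator` over a sorted list:

   ```java
   import java.util.Iterator;
   import java.util.List;
   import java.util.NoSuchElementException;

   final class PeekNextKeyContract {
       // Hypothetical iterator with a peek operation; a stand-in for the private
       // RocksDBDualCFRangeIterator, not a Kafka class.
       static final class PeekingKeyIterator implements Iterator<String> {
           private final Iterator<String> delegate;
           private String peeked; // buffered next key, or null if nothing is buffered

           PeekingKeyIterator(final List<String> sortedKeys) {
               this.delegate = sortedKeys.iterator();
           }

           @Override
           public boolean hasNext() {
               return peeked != null || delegate.hasNext();
           }

           @Override
           public String next() {
               final String key = peekNextKey(); // throws NoSuchElementException when exhausted
               peeked = null;
               return key;
           }

           // Returns the next key without consuming it.
           String peekNextKey() {
               if (peeked == null) {
                   if (!delegate.hasNext()) {
                       throw new NoSuchElementException();
                   }
                   peeked = delegate.next();
               }
               return peeked;
           }
       }

       private static void check(final boolean condition, final String message) {
           if (!condition) {
               throw new AssertionError(message);
           }
       }

       // The checks a dedicated peekNextKey test could make; throws AssertionError on failure.
       static void checkContract() {
           final PeekingKeyIterator it = new PeekingKeyIterator(List.of("a", "b"));
           check(it.peekNextKey().equals("a"), "peek returns the first key");
           check(it.peekNextKey().equals("a"), "repeated peeks do not advance");
           check(it.next().equals("a"), "next returns the peeked key");
           check(it.peekNextKey().equals("b"), "peek moves on only after next");
           check(it.next().equals("b"), "last key");
           check(!it.hasNext(), "iterator is exhausted");
           try {
               it.peekNextKey();
               throw new AssertionError("peek on an exhausted iterator must throw");
           } catch (final NoSuchElementException expected) {
               // expected: matches the @throws clause in the peekNextKey Javadoc
           }
       }

       public static void main(final String[] args) {
           checkContract();
           System.out.println("peekNextKey contract holds");
       }
   }
   ```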



##########
streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimestampedStore.java:
##########
@@ -277,197 +275,361 @@ public void close() {
+        /**
+         * Retrieves the next key-value pair in the range.
+         *
+         * <p>This method determines the next key-value pair to return by merging the results from the two column
+         * families. If both column families have keys, it selects the one that matches the iteration order and range
+         * conditions. Keys outside the specified range are skipped.</p>
+         *
+         * @return The next {@link KeyValue} pair in the range, or {@code null} if no more elements are available.
+         */
         @Override
-        public synchronized boolean hasNext() {
-            if (!open) {
-                throw new InvalidStateStoreException(String.format("RocksDB iterator for store %s has closed", storeName));
-            }
-            return super.hasNext();
+        protected KeyValue<Bytes, byte[]> makeNext() {
+            loadNextKeys();
+            if (noTimestampNext == null && withTimestampNext == null) return allDone();

Review Comment:
   There seems to be a functionality change. I'm not sure whether this is intentional, but it looks like we dropped the check for the condition where the iterators are not valid. (In other words, there's a branch where, after `loadNextKeys()`, `noTimestampNext` is still `null` but `iterator.isValid()` is still `true`.)

   I'm not sure what might cause this condition.
   
   The old code:
   ```java
               if (nextNoTimestamp == null && iterNoTimestamp.isValid()) {
                   nextNoTimestamp = iterNoTimestamp.key();
               }
   
               if (nextWithTimestamp == null && iterWithTimestamp.isValid()) {
                   nextWithTimestamp = iterWithTimestamp.key();
               }
               if (nextNoTimestamp == null && !iterNoTimestamp.isValid()) {
                   if (nextWithTimestamp == null && !iterWithTimestamp.isValid()) {
                       return allDone();
   ```
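
   For reference, here is a simplified sketch of a two-way merge in which termination is decided by each underlying iterator's `isValid()` rather than by a buffered key alone, which is the condition the old code checks. It is forward-only, ignores range bounds and values, and uses a hypothetical `Cursor` class in place of `RocksIterator`. It uses unsigned lexicographic byte order, the ordering of RocksDB's default bytewise comparator. It is meant to make the condition concrete, not to reproduce either version of the Kafka code:

   ```java
   import java.nio.charset.StandardCharsets;
   import java.util.ArrayList;
   import java.util.Arrays;
   import java.util.List;

   final class DualCursorMerge {
       // Hypothetical stand-in for RocksIterator, exposing only isValid()/key()/next().
       static final class Cursor {
           private final List<byte[]> keys;
           private int pos;

           Cursor(final List<byte[]> sortedKeys) {
               this.keys = sortedKeys;
           }

           boolean isValid() {
               return pos < keys.size();
           }

           byte[] key() {
               return keys.get(pos);
           }

           void next() {
               pos++;
           }
       }

       static Cursor of(final String... sortedKeys) {
           final List<byte[]> keys = new ArrayList<>();
           for (final String k : sortedKeys) {
               keys.add(k.getBytes(StandardCharsets.UTF_8));
           }
           return new Cursor(keys);
       }

       // Forward-only two-way merge in unsigned lexicographic byte order. The loop
       // ends only when BOTH cursors report !isValid(), i.e. exhaustion is decided
       // by the underlying iterators rather than by a buffered key being null.
       static List<String> merge(final Cursor a, final Cursor b) {
           final List<String> out = new ArrayList<>();
           while (a.isValid() || b.isValid()) {
               final Cursor src;
               if (!b.isValid()) {
                   src = a;
               } else if (!a.isValid()) {
                   src = b;
               } else {
                   // Ties go to cursor a; this sketch does not de-duplicate keys.
                   src = Arrays.compareUnsigned(a.key(), b.key()) <= 0 ? a : b;
               }
               out.add(new String(src.key(), StandardCharsets.UTF_8));
               src.next();
           }
           return out;
       }

       public static void main(final String[] args) {
           System.out.println(merge(of("a", "c", "e"), of("b", "d"))); // [a, b, c, d, e]
       }
   }
   ```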



##########
streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimestampedStore.java:
##########
@@ -277,197 +275,361 @@ public void close() {
         }
     }
 
-    private class RocksDBDualCFIterator extends 
AbstractIterator<KeyValue<Bytes, byte[]>>
-        implements ManagedKeyValueIterator<Bytes, byte[]> {
-
-        // RocksDB's JNI interface does not expose getters/setters that allow 
the
-        // comparator to be pluggable, and the default is lexicographic, so 
it's
-        // safe to just force lexicographic comparator here for now.
+    /**
+     * A range-based iterator for RocksDB that merges results from two column families.
+     *
+     * <p>This iterator supports traversal over two RocksDB column families: one containing timestamped values and
+     * another containing non-timestamped values. It ensures that the keys from both column families are merged and
+     * sorted lexicographically, respecting the iteration order (forward or reverse) and the specified range
+     * boundaries.</p>
+     *
+     * <h2>Key Features</h2>
+     *
+     * <ul>
+     *     <li>Merges results from the "with-timestamp" and "no-timestamp" column families.</li>
+     *     <li>Supports range-based queries with open or closed boundaries.</li>
+     *     <li>Handles both forward and reverse iteration seamlessly.</li>
+     *     <li>Ensures correct handling of inclusive and exclusive upper boundaries.</li>
+     *     <li>Integrates efficiently with Kafka Streams state store mechanisms.</li>
+     * </ul>
+     *
+     * <h2>Usage</h2>
+     *
+     * <p>The iterator can be used for different types of range-based operations, such as:
+     * <ul>
+     *     <li>Iterating over all keys within a range.</li>
+     *     <li>Prefix-based scans (when combined with dynamically calculated range endpoints).</li>
+     *     <li>Open-ended range queries (e.g., from a given key to the end of the dataset).</li>
+     * </ul>
+     * </p>
+     *
+     * <h2>Implementation Details</h2>
+     *
+     * <p>The class extends {@link AbstractIterator} and implements {@link ManagedKeyValueIterator}. It uses RocksDB's
+     * native iterators for efficient traversal of keys within the specified range. Keys from the two column families
+     * are merged during iteration, ensuring proper order and de-duplication where applicable.</p>
+     *
+     * <h3>Key Methods:</h3>
+     *
+     * <ul>
+     *     <li><b>{@code makeNext()}:</b> Retrieves the next key-value pair in the merged range, ensuring
+     *     the result is within the specified range and boundary conditions.</li>
+     *     <li><b>{@code initializeIterators()}:</b> Initializes the RocksDB iterators based on the specified range and direction.</li>
+     *     <li><b>{@code isInRange()}:</b> Verifies if the current key-value pair is within the range defined by {@code from} and {@code to}.</li>
+     *     <li><b>{@code fetchNextKeyValue()}:</b> Determines the next key-value pair to return based on the state of both iterators.</li>
+     * </ul>
+     *
+     * <h3>Thread Safety:</h3>
+     *
+     * <p>The iterator is thread-safe for sequential operations but should not be accessed concurrently from multiple
+     * threads without external synchronization.</p>
+     *
+     * <h2>Examples</h2>
+     *
+     * <h3>Iterate over a range:</h3>
+     *
+     * <pre>{@code
+     * RocksIterator noTimestampIterator = accessor.newIterator(noTimestampColumnFamily);
+     * RocksIterator withTimestampIterator = accessor.newIterator(withTimestampColumnFamily);
+     *
+     * try (RocksDBDualCFRangeIterator iterator = new RocksDBDualCFRangeIterator(
+     *         new Bytes("keyStart".getBytes()),
+     *         new Bytes("keyEnd".getBytes()),
+     *         noTimestampIterator,
+     *         withTimestampIterator,
+     *         "storeName",
+     *         true,  // Forward iteration
+     *         true   // Inclusive upper boundary
+     * )) {
+     *     while (iterator.hasNext()) {
+     *         KeyValue<Bytes, byte[]> entry = iterator.next();
+     *         System.out.println("Key: " + entry.key + ", Value: " + Arrays.toString(entry.value));
+     *     }
+     * }
+     * }</pre>
+     *
+     * <h2>Exceptions</h2>
+     *
+     * <ul>
+     *     <li><b>{@link InvalidStateStoreException}:</b> Thrown if the iterator is accessed after being closed.</li>
+     *     <li><b>{@link IllegalStateException}:</b> Thrown if the close callback is not properly set before usage.</li>
+     * </ul>
+     *
+     * @see AbstractIterator
+     * @see ManagedKeyValueIterator
+     * @see RocksDBStore
+     */
+    private static class RocksDBDualCFRangeIterator extends AbstractIterator<KeyValue<Bytes, byte[]>> implements ManagedKeyValueIterator<Bytes, byte[]> {
+        private Runnable closeCallback;
+        private byte[] noTimestampNext;
+        private byte[] withTimestampNext;
         private final Comparator<byte[]> comparator = Bytes.BYTES_LEXICO_COMPARATOR;
-
+        private final RocksIterator noTimestampIterator;
+        private final RocksIterator withTimestampIterator;
         private final String storeName;
-        private final RocksIterator iterWithTimestamp;
-        private final RocksIterator iterNoTimestamp;
         private final boolean forward;
-
+        private final boolean toInclusive;
+        private final byte[] rawLastKey;
         private volatile boolean open = true;
 
-        private byte[] nextWithTimestamp;
-        private byte[] nextNoTimestamp;
-        private KeyValue<Bytes, byte[]> next;
-        private Runnable closeCallback = null;
-
-        RocksDBDualCFIterator(final String storeName,
-                              final RocksIterator iterWithTimestamp,
-                              final RocksIterator iterNoTimestamp,
-                              final boolean forward) {
-            this.iterWithTimestamp = iterWithTimestamp;
-            this.iterNoTimestamp = iterNoTimestamp;
-            this.storeName = storeName;
+        /**
+         * Constructs a new {@code RocksDBDualCFRangeIterator}.
+         *
+         * <p>Initializes the RocksDB iterators for two column families (timestamped and non-timestamped) and sets up
+         * the range and direction for iteration.</p>
+         *
+         * @param from                  The starting key of the range. Can be {@code null} for an open range.
+         * @param to                    The ending key of the range. Can be {@code null} for an open range.
+         * @param noTimestampIterator   The iterator for the non-timestamped column family.
+         * @param withTimestampIterator The iterator for the timestamped column family.
+         * @param storeName             The name of the store associated with this iterator.
+         * @param forward               {@code true} for forward iteration; {@code false} for reverse iteration.
+         * @param toInclusive           Whether the upper boundary of the range is inclusive.
+         */
+        RocksDBDualCFRangeIterator(final Bytes from,
+                                   final Bytes to,
+                                   final RocksIterator noTimestampIterator,
+                                   final RocksIterator withTimestampIterator,
+                                   final String storeName,
+                                   final boolean forward,
+                                   final boolean toInclusive) {
             this.forward = forward;
+            this.noTimestampIterator = noTimestampIterator;
+            this.storeName = storeName;
+            this.toInclusive = toInclusive;
+            this.withTimestampIterator = withTimestampIterator;
+
+            this.rawLastKey = initializeIterators(from, to);
         }
 
+        /**
+         * Retrieves the next key-value pair in the range.
+         *
+         * <p>This method determines the next key-value pair to return by merging the results from the two column
+         * families. If both column families have keys, it selects the one that matches the iteration order and range
+         * conditions. Keys outside the specified range are skipped.</p>
+         *
+         * @return The next {@link KeyValue} pair in the range, or {@code null} if no more elements are available.
+         */
         @Override
-        public synchronized boolean hasNext() {
-            if (!open) {
-                throw new InvalidStateStoreException(String.format("RocksDB iterator for store %s has closed", storeName));
-            }
-            return super.hasNext();
+        protected KeyValue<Bytes, byte[]> makeNext() {
+            loadNextKeys();
+            if (noTimestampNext == null && withTimestampNext == null) return allDone();
+            final KeyValue<Bytes, byte[]> next = fetchNextKeyValue();
+            return isInRange(next) ? next : allDone();
+        }
+
+        /**
+         * Returns the next key in the range without advancing the iterator.
+         *
+         * <p>This method retrieves the key of the next {@link KeyValue} pair that would be returned by {@link #next()},
+         * without moving the iterator forward. This is useful for inspecting the next key without affecting the
+         * iterator's state.</p>
+         *
+         * @return The next key as a {@link Bytes} object.
+         *
+         * @throws NoSuchElementException If there are no more elements in the iterator.
+         */
+        @Override
+        public Bytes peekNextKey() {
+            if (!hasNext()) throw new NoSuchElementException();
+            return super.peek().key;
         }
 
+        /**
+         * Advances the iterator and returns the next key-value pair.
+         *
+         * @return The next {@link KeyValue} pair in the range.
+         *
+         * @throws InvalidStateStoreException If the iterator has been closed.
+         */
         @Override
         public synchronized KeyValue<Bytes, byte[]> next() {
+            ensureOpen();
             return super.next();
         }
 
+        /**
+         * Checks if there are more elements available in the range.
+         *
+         * @return {@code true} if the iterator has more elements; {@code false} otherwise.
+         *
+         * @throws InvalidStateStoreException If the iterator has been closed.
+         */
         @Override
-        protected KeyValue<Bytes, byte[]> makeNext() {
-            if (nextNoTimestamp == null && iterNoTimestamp.isValid()) {
-                nextNoTimestamp = iterNoTimestamp.key();
-            }
-
-            if (nextWithTimestamp == null && iterWithTimestamp.isValid()) {
-                nextWithTimestamp = iterWithTimestamp.key();
-            }
-
-            if (nextNoTimestamp == null && !iterNoTimestamp.isValid()) {
-                if (nextWithTimestamp == null && !iterWithTimestamp.isValid()) {
-                    return allDone();
-                } else {
-                    next = KeyValue.pair(new Bytes(nextWithTimestamp), iterWithTimestamp.value());
-                    nextWithTimestamp = null;
-                    if (forward) {
-                        iterWithTimestamp.next();
-                    } else {
-                        iterWithTimestamp.prev();
-                    }
-                }
-            } else {
-                if (nextWithTimestamp == null) {
-                    next = KeyValue.pair(new Bytes(nextNoTimestamp), convertToTimestampedFormat(iterNoTimestamp.value()));
-                    nextNoTimestamp = null;
-                    if (forward) {
-                        iterNoTimestamp.next();
-                    } else {
-                        iterNoTimestamp.prev();
-                    }
-                } else {
-                    if (forward) {
-                        if (comparator.compare(nextNoTimestamp, nextWithTimestamp) <= 0) {
-                            next = KeyValue.pair(new Bytes(nextNoTimestamp), convertToTimestampedFormat(iterNoTimestamp.value()));
-                            nextNoTimestamp = null;
-                            iterNoTimestamp.next();
-                        } else {
-                            next = KeyValue.pair(new Bytes(nextWithTimestamp), iterWithTimestamp.value());
-                            nextWithTimestamp = null;
-                            iterWithTimestamp.next();
-                        }
-                    } else {
-                        if (comparator.compare(nextNoTimestamp, nextWithTimestamp) >= 0) {
-                            next = KeyValue.pair(new Bytes(nextNoTimestamp), convertToTimestampedFormat(iterNoTimestamp.value()));
-                            nextNoTimestamp = null;
-                            iterNoTimestamp.prev();
-                        } else {
-                            next = KeyValue.pair(new Bytes(nextWithTimestamp), iterWithTimestamp.value());
-                            nextWithTimestamp = null;
-                            iterWithTimestamp.prev();
-                        }
-                    }
-                }
-            }
-            return next;
+        public synchronized boolean hasNext() {
+            ensureOpen();
+            return super.hasNext();
         }
 
+        /**
+         * Closes the iterator and releases associated resources.
+         *
+         * <p>This method ensures that the RocksDB iterators for both column families are properly closed. After this
+         * method is called, any subsequent calls to {@link #hasNext()}, {@link #next()}, or {@link #peekNextKey()} will
+         * result in an {@link InvalidStateStoreException}.</p>
+         *
+         * @throws IllegalStateException If the close callback has not been set before calling this method.
+         */
         @Override
         public synchronized void close() {
             if (closeCallback == null) {
-                throw new IllegalStateException("RocksDBDualCFIterator expects close callback to be set immediately upon creation");
+                final String message = "RocksDBDualCFIterator expects close callback to be set immediately upon creation";
+                throw new IllegalStateException(message);
             }
             closeCallback.run();
 
-            iterNoTimestamp.close();
-            iterWithTimestamp.close();
+            noTimestampIterator.close();
+            withTimestampIterator.close();
             open = false;
         }
 
+        /**
+         * Registers a callback to be executed when the iterator is closed.
+         *
+         * @param closeCallback A {@link Runnable} to execute during the {@link #close()} operation.
+         */
         @Override
-        public Bytes peekNextKey() {
-            if (!hasNext()) {
-                throw new NoSuchElementException();
+        public void onClose(final Runnable closeCallback) {
+            this.closeCallback = closeCallback;
+        }
+
+        private KeyValue<Bytes, byte[]> compareAndHandleKeys() {
+            final int comparison = comparator.compare(noTimestampNext, withTimestampNext);
+            if (forward ? comparison <= 0 : comparison >= 0) {
+                return handleNoTimestampOnly();
+            } else {
+                return handleWithTimestampOnly();
             }
-            return next.key;
         }
 
-        @Override
-        public void onClose(final Runnable closeCallback) {
-            this.closeCallback = closeCallback;
+        /**
+         * Determines the next key-value pair to return.
+         *
+         * <p>If one of the column family iterators is exhausted, the method returns the result from the other iterator.
+         * If both iterators have keys, the method compares the keys and returns the appropriate result based on the
+         * iteration direction.</p>
+         *
+         * @return The next {@link KeyValue} pair to return.
+         */
+        private KeyValue<Bytes, byte[]> fetchNextKeyValue() {
+            if (noTimestampNext == null) {
+                return handleWithTimestampOnly();
+            } else if (withTimestampNext == null) {
+                return handleNoTimestampOnly();
+            } else {
+                return compareAndHandleKeys();
+            }
         }
-    }
 
-    private class RocksDBDualCFRangeIterator extends RocksDBDualCFIterator {
-        // RocksDB's JNI interface does not expose getters/setters that allow the
-        // comparator to be pluggable, and the default is lexicographic, so it's
-        // safe to just force lexicographic comparator here for now.
-        private final Comparator<byte[]> comparator = Bytes.BYTES_LEXICO_COMPARATOR;
-        private final byte[] rawLastKey;
-        private final boolean forward;
-        private final boolean toInclusive;
+        private KeyValue<Bytes, byte[]> handleNoTimestampOnly() {
+            final KeyValue<Bytes, byte[]> result = KeyValue.pair(new Bytes(noTimestampNext), convertToTimestampedFormat(noTimestampIterator.value()));
+            moveIterator(noTimestampIterator);
+            noTimestampNext = null;
+            return result;
+        }
 
-        RocksDBDualCFRangeIterator(final String storeName,
-                                   final RocksIterator iterWithTimestamp,
-                                   final RocksIterator iterNoTimestamp,
-                                   final Bytes from,
-                                   final Bytes to,
-                                   final boolean forward,
-                                   final boolean toInclusive) {
-            super(storeName, iterWithTimestamp, iterNoTimestamp, forward);
-            this.forward = forward;
-            this.toInclusive = toInclusive;
+        private KeyValue<Bytes, byte[]> handleWithTimestampOnly() {
+            final KeyValue<Bytes, byte[]> result = KeyValue.pair(new Bytes(withTimestampNext), withTimestampIterator.value());
+            moveIterator(withTimestampIterator);
+            withTimestampNext = null;
+            return result;
+        }
+
+        /**
+         * Checks if the given key-value pair is within the specified range.
+         *
+         * <p>The method compares the key against the range's upper boundary ({@code rawLastKey}) and determines if it
+         * falls within the range.</p>
+         *
+         * @param keyValue The key-value pair to check.
+         *
+         * @return {@code true} if the key is within the range; {@code false} otherwise.
+         */
+        private boolean isInRange(final KeyValue<Bytes, byte[]> keyValue) {
+            if (rawLastKey == null) return true; // Open-ended range
+            final int comparison = comparator.compare(keyValue.key.get(), rawLastKey);
+            return forward
+                    ? comparison < 0 || (toInclusive && comparison == 0)
+                    : comparison > 0 || (toInclusive && comparison == 0);

Review Comment:
   ```suggestion
               return (toInclusive && comparison == 0) || (forward ? comparison < 0 : comparison > 0);
   ```
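Whether the suggestion preserves behaviour can be checked exhaustively, because the expression depends only on `forward`, `toInclusive`, and the sign of `comparison` (a comparator may return any `int`, but the predicate only tests `< 0`, `== 0`, and `> 0`). The following is a standalone sketch; the class and method names are made up for illustration, and it covers only the expression after the `rawLastKey == null` early return, which the suggestion leaves unchanged.

```java
// Hypothetical standalone check: compares the isInRange() expression from the diff
// with the suggested rewrite for every relevant combination of inputs.
public class IsInRangeEquivalence {

    // Form from the diff: branch on direction first, then accept an equal key if inclusive.
    public static boolean original(final boolean forward, final boolean toInclusive, final int comparison) {
        return forward
                ? comparison < 0 || (toInclusive && comparison == 0)
                : comparison > 0 || (toInclusive && comparison == 0);
    }

    // Suggested form: hoist the inclusive-equality check shared by both branches.
    public static boolean suggested(final boolean forward, final boolean toInclusive, final int comparison) {
        return (toInclusive && comparison == 0) || (forward ? comparison < 0 : comparison > 0);
    }

    // Only the sign of the comparator result matters, so -1, 0 and 1 cover all cases.
    public static boolean equivalentForAllInputs() {
        for (final boolean forward : new boolean[] {true, false}) {
            for (final boolean toInclusive : new boolean[] {true, false}) {
                for (final int comparison : new int[] {-1, 0, 1}) {
                    if (original(forward, toInclusive, comparison) != suggested(forward, toInclusive, comparison)) {
                        return false;
                    }
                }
            }
        }
        return true;
    }

    public static void main(final String[] args) {
        System.out.println(equivalentForAllInputs()); // prints true
    }
}
```

Neither operand of `||` has side effects, so reordering them is safe. The suggestion simply evaluates the `toInclusive && comparison == 0` check once instead of repeating it in both branches of the ternary.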



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
