agavra commented on code in PR #18610:
URL: https://github.com/apache/kafka/pull/18610#discussion_r1960767233
##########
streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimestampedStore.java:
##########
@@ -277,197 +275,361 @@ public void close() {
}
}
- private class RocksDBDualCFIterator extends AbstractIterator<KeyValue<Bytes, byte[]>>
- implements ManagedKeyValueIterator<Bytes, byte[]> {
-
- // RocksDB's JNI interface does not expose getters/setters that allow the
- // comparator to be pluggable, and the default is lexicographic, so it's
- // safe to just force lexicographic comparator here for now.
+ /**
+ * A range-based iterator for RocksDB that merges results from two column families.
+ *
+ * <p>This iterator supports traversal over two RocksDB column families: one containing timestamped values and
+ * another containing non-timestamped values. It ensures that the keys from both column families are merged and
+ * sorted lexicographically, respecting the iteration order (forward or reverse) and the specified range
+ * boundaries.</p>
+ *
+ * <h2>Key Features</h2>
+ *
+ * <ul>
+ * <li>Merges results from the "with-timestamp" and "no-timestamp" column families.</li>
+ * <li>Supports range-based queries with open or closed boundaries.</li>
+ * <li>Handles both forward and reverse iteration seamlessly.</li>
+ * <li>Ensures correct handling of inclusive and exclusive upper boundaries.</li>
+ * <li>Integrates efficiently with Kafka Streams state store mechanisms.</li>
+ * </ul>
+ *
+ * <h2>Usage</h2>
+ *
+ * <p>The iterator can be used for different types of range-based operations, such as:
+ * <ul>
+ * <li>Iterating over all keys within a range.</li>
+ * <li>Prefix-based scans (when combined with dynamically calculated range endpoints).</li>
+ * <li>Open-ended range queries (e.g., from a given key to the end of the dataset).</li>
+ * </ul>
+ * </p>
+ *
+ * <h2>Implementation Details</h2>
+ *
+ * <p>The class extends {@link AbstractIterator} and implements {@link ManagedKeyValueIterator}. It uses RocksDB's
+ * native iterators for efficient traversal of keys within the specified range. Keys from the two column families
+ * are merged during iteration, ensuring proper order and de-duplication where applicable.</p>
+ *
+ * <h3>Key Methods:</h3>
+ *
+ * <ul>
+ * <li><b>{@code makeNext()}:</b> Retrieves the next key-value pair in the merged range, ensuring
+ * the result is within the specified range and boundary conditions.</li>
+ * <li><b>{@code initializeIterators()}:</b> Initializes the RocksDB iterators based on the specified range and direction.</li>
+ * <li><b>{@code isInRange()}:</b> Verifies if the current key-value pair is within the range defined by {@code from} and {@code to}.</li>
+ * <li><b>{@code fetchNextKeyValue()}:</b> Determines the next key-value pair to return based on the state of both iterators.</li>
+ * </ul>
+ *
+ * <h3>Thread Safety:</h3>
+ *
+ * <p>The iterator is thread-safe for sequential operations but should not be accessed concurrently from multiple
+ * threads without external synchronization.</p>
Review Comment:
what does this mean? doesn't that just mean it's not thread safe?
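To make the question concrete: "external synchronization" would mean every caller holds one lock across each `hasNext()`/`next()` pair, because the check and the advance are separate calls. A minimal sketch, assuming a plain `java.util.Iterator` stands in for `RocksDBDualCFRangeIterator` (`SharedIteratorDrain` is a hypothetical helper, not Kafka API):

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Hypothetical helper (not Kafka API): drains one non-thread-safe iterator from several threads.
final class SharedIteratorDrain {
    private SharedIteratorDrain() { }

    static <T> List<T> drain(final Iterator<T> iterator, final int threads) throws InterruptedException {
        final List<T> drained = new ArrayList<>();
        final Object lock = new Object();
        final List<Thread> workers = new ArrayList<>();
        for (int i = 0; i < threads; i++) {
            final Thread worker = new Thread(() -> {
                while (true) {
                    // The check and the advance must happen under the same lock; otherwise two
                    // threads can both see hasNext() == true when only one element is left.
                    synchronized (lock) {
                        if (!iterator.hasNext()) {
                            return;
                        }
                        drained.add(iterator.next());
                    }
                }
            });
            workers.add(worker);
            worker.start();
        }
        for (final Thread worker : workers) {
            worker.join(); // join() also makes the workers' writes to `drained` visible here
        }
        return drained;
    }
}
```

If callers aren't expected to share an iterator like this, the javadoc could simply say the iterator is not thread-safe.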
##########
streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimestampedStore.java:
##########
@@ -277,197 +275,361 @@ public void close() {
}
}
- private class RocksDBDualCFIterator extends AbstractIterator<KeyValue<Bytes, byte[]>>
- implements ManagedKeyValueIterator<Bytes, byte[]> {
-
- // RocksDB's JNI interface does not expose getters/setters that allow the
- // comparator to be pluggable, and the default is lexicographic, so it's
- // safe to just force lexicographic comparator here for now.
+ /**
+ * A range-based iterator for RocksDB that merges results from two column families.
+ *
+ * <p>This iterator supports traversal over two RocksDB column families: one containing timestamped values and
+ * another containing non-timestamped values. It ensures that the keys from both column families are merged and
+ * sorted lexicographically, respecting the iteration order (forward or reverse) and the specified range
+ * boundaries.</p>
+ *
+ * <h2>Key Features</h2>
+ *
+ * <ul>
+ * <li>Merges results from the "with-timestamp" and "no-timestamp" column families.</li>
+ * <li>Supports range-based queries with open or closed boundaries.</li>
+ * <li>Handles both forward and reverse iteration seamlessly.</li>
+ * <li>Ensures correct handling of inclusive and exclusive upper boundaries.</li>
+ * <li>Integrates efficiently with Kafka Streams state store mechanisms.</li>
+ * </ul>
+ *
+ * <h2>Usage</h2>
+ *
+ * <p>The iterator can be used for different types of range-based operations, such as:
+ * <ul>
+ * <li>Iterating over all keys within a range.</li>
+ * <li>Prefix-based scans (when combined with dynamically calculated range endpoints).</li>
+ * <li>Open-ended range queries (e.g., from a given key to the end of the dataset).</li>
+ * </ul>
+ * </p>
+ *
+ * <h2>Implementation Details</h2>
+ *
+ * <p>The class extends {@link AbstractIterator} and implements {@link ManagedKeyValueIterator}. It uses RocksDB's
+ * native iterators for efficient traversal of keys within the specified range. Keys from the two column families
+ * are merged during iteration, ensuring proper order and de-duplication where applicable.</p>
+ *
+ * <h3>Key Methods:</h3>
+ *
+ * <ul>
+ * <li><b>{@code makeNext()}:</b> Retrieves the next key-value pair in the merged range, ensuring
+ * the result is within the specified range and boundary conditions.</li>
+ * <li><b>{@code initializeIterators()}:</b> Initializes the RocksDB iterators based on the specified range and direction.</li>
+ * <li><b>{@code isInRange()}:</b> Verifies if the current key-value pair is within the range defined by {@code from} and {@code to}.</li>
+ * <li><b>{@code fetchNextKeyValue()}:</b> Determines the next key-value pair to return based on the state of both iterators.</li>
+ * </ul>
+ *
+ * <h3>Thread Safety:</h3>
+ *
+ * <p>The iterator is thread-safe for sequential operations but should not be accessed concurrently from multiple
+ * threads without external synchronization.</p>
+ *
+ * <h2>Examples</h2>
+ *
+ * <h3>Iterate over a range:</h3>
+ *
+ * <pre>{@code
+ * RocksIterator noTimestampIterator = accessor.newIterator(noTimestampColumnFamily);
+ * RocksIterator withTimestampIterator = accessor.newIterator(withTimestampColumnFamily);
+ *
+ * try (RocksDBDualCFRangeIterator iterator = new RocksDBDualCFRangeIterator(
+ * new Bytes("keyStart".getBytes()),
+ * new Bytes("keyEnd".getBytes()),
+ * noTimestampIterator,
+ * withTimestampIterator,
+ * "storeName",
+ * true, // Forward iteration
+ * true // Inclusive upper boundary
+ * )) {
+ * while (iterator.hasNext()) {
+ * KeyValue<Bytes, byte[]> entry = iterator.next();
+ * System.out.println("Key: " + entry.key + ", Value: " + Arrays.toString(entry.value));
+ * }
+ * }
+ * }</pre>
+ *
+ * <h2>Exceptions</h2>
+ *
+ * <ul>
+ * <li><b>{@link InvalidStateStoreException}:</b> Thrown if the iterator is accessed after being closed.</li>
+ * <li><b>{@link IllegalStateException}:</b> Thrown if the close callback is not properly set before usage.</li>
+ * </ul>
+ *
+ * @see AbstractIterator
+ * @see ManagedKeyValueIterator
+ * @see RocksDBStore
+ */
+ private static class RocksDBDualCFRangeIterator extends AbstractIterator<KeyValue<Bytes, byte[]>> implements ManagedKeyValueIterator<Bytes, byte[]> {
+ private Runnable closeCallback;
+ private byte[] noTimestampNext;
+ private byte[] withTimestampNext;
private final Comparator<byte[]> comparator = Bytes.BYTES_LEXICO_COMPARATOR;
-
+ private final RocksIterator noTimestampIterator;
+ private final RocksIterator withTimestampIterator;
private final String storeName;
- private final RocksIterator iterWithTimestamp;
- private final RocksIterator iterNoTimestamp;
private final boolean forward;
-
+ private final boolean toInclusive;
+ private final byte[] rawLastKey;
private volatile boolean open = true;
- private byte[] nextWithTimestamp;
- private byte[] nextNoTimestamp;
- private KeyValue<Bytes, byte[]> next;
- private Runnable closeCallback = null;
-
- RocksDBDualCFIterator(final String storeName,
- final RocksIterator iterWithTimestamp,
- final RocksIterator iterNoTimestamp,
- final boolean forward) {
- this.iterWithTimestamp = iterWithTimestamp;
- this.iterNoTimestamp = iterNoTimestamp;
- this.storeName = storeName;
+ /**
+ * Constructs a new {@code RocksDBDualCFRangeIterator}.
+ *
+ * <p>Initializes the RocksDB iterators for two column families (timestamped and non-timestamped) and sets up
+ * the range and direction for iteration.</p>
+ *
+ * @param from The starting key of the range. Can be {@code null} for an open range.
+ * @param to The ending key of the range. Can be {@code null} for an open range.
+ * @param noTimestampIterator The iterator for the non-timestamped column family.
+ * @param withTimestampIterator The iterator for the timestamped column family.
+ * @param storeName The name of the store associated with this iterator.
+ * @param forward {@code true} for forward iteration; {@code false} for reverse iteration.
+ * @param toInclusive Whether the upper boundary of the range is inclusive.
+ */
+ RocksDBDualCFRangeIterator(final Bytes from,
+ final Bytes to,
+ final RocksIterator noTimestampIterator,
+ final RocksIterator withTimestampIterator,
+ final String storeName,
+ final boolean forward,
+ final boolean toInclusive) {
this.forward = forward;
+ this.noTimestampIterator = noTimestampIterator;
+ this.storeName = storeName;
+ this.toInclusive = toInclusive;
+ this.withTimestampIterator = withTimestampIterator;
+
+ this.rawLastKey = initializeIterators(from, to);
Review Comment:
generally it's better for constructors not to have side effects; otherwise, if something throws an exception during instantiation, the `close` method may never be called.
this doesn't seem to be a regression (the old code did that as well for the RangeIterator), so consider this comment an optional suggestion
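One side-effect-free shape, as a sketch: the constructor only assigns fields, and a static factory performs the seeks, closing the half-built iterator if a seek throws. This assumes the iterator owns and closes both underlying iterators; `SeekableCursor` (a reduced stand-in for `RocksIterator`), `RangeIter`, and `open` are hypothetical names:

```java
// Hypothetical stand-in for org.rocksdb.RocksIterator, reduced to what this sketch needs.
interface SeekableCursor extends AutoCloseable {
    void seek(byte[] target);

    @Override
    void close();
}

// Hypothetical range iterator whose constructor only assigns fields.
final class RangeIter implements AutoCloseable {
    private final SeekableCursor noTimestamp;
    private final SeekableCursor withTimestamp;

    private RangeIter(final SeekableCursor noTimestamp, final SeekableCursor withTimestamp) {
        this.noTimestamp = noTimestamp;
        this.withTimestamp = withTimestamp;
    }

    // The side-effecting seeks run after construction; if one throws, close() still
    // runs, so neither underlying cursor is leaked.
    static RangeIter open(final SeekableCursor noTimestamp,
                          final SeekableCursor withTimestamp,
                          final byte[] from) {
        final RangeIter iter = new RangeIter(noTimestamp, withTimestamp);
        try {
            noTimestamp.seek(from);
            withTimestamp.seek(from);
            return iter;
        } catch (final RuntimeException e) {
            iter.close();
            throw e;
        }
    }

    @Override
    public void close() {
        noTimestamp.close();
        withTimestamp.close();
    }
}
```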
##########
streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimestampedStore.java:
##########
@@ -277,197 +275,361 @@ public void close() {
}
}
- private class RocksDBDualCFIterator extends AbstractIterator<KeyValue<Bytes, byte[]>>
- implements ManagedKeyValueIterator<Bytes, byte[]> {
-
- // RocksDB's JNI interface does not expose getters/setters that allow the
- // comparator to be pluggable, and the default is lexicographic, so it's
- // safe to just force lexicographic comparator here for now.
+ /**
+ * A range-based iterator for RocksDB that merges results from two column families.
+ *
+ * <p>This iterator supports traversal over two RocksDB column families: one containing timestamped values and
+ * another containing non-timestamped values. It ensures that the keys from both column families are merged and
+ * sorted lexicographically, respecting the iteration order (forward or reverse) and the specified range
+ * boundaries.</p>
+ *
+ * <h2>Key Features</h2>
+ *
+ * <ul>
+ * <li>Merges results from the "with-timestamp" and "no-timestamp" column families.</li>
+ * <li>Supports range-based queries with open or closed boundaries.</li>
+ * <li>Handles both forward and reverse iteration seamlessly.</li>
+ * <li>Ensures correct handling of inclusive and exclusive upper boundaries.</li>
+ * <li>Integrates efficiently with Kafka Streams state store mechanisms.</li>
+ * </ul>
+ *
+ * <h2>Usage</h2>
+ *
+ * <p>The iterator can be used for different types of range-based operations, such as:
+ * <ul>
+ * <li>Iterating over all keys within a range.</li>
+ * <li>Prefix-based scans (when combined with dynamically calculated range endpoints).</li>
+ * <li>Open-ended range queries (e.g., from a given key to the end of the dataset).</li>
+ * </ul>
+ * </p>
+ *
+ * <h2>Implementation Details</h2>
+ *
+ * <p>The class extends {@link AbstractIterator} and implements {@link ManagedKeyValueIterator}. It uses RocksDB's
+ * native iterators for efficient traversal of keys within the specified range. Keys from the two column families
+ * are merged during iteration, ensuring proper order and de-duplication where applicable.</p>
+ *
+ * <h3>Key Methods:</h3>
+ *
+ * <ul>
+ * <li><b>{@code makeNext()}:</b> Retrieves the next key-value pair in the merged range, ensuring
+ * the result is within the specified range and boundary conditions.</li>
+ * <li><b>{@code initializeIterators()}:</b> Initializes the RocksDB iterators based on the specified range and direction.</li>
+ * <li><b>{@code isInRange()}:</b> Verifies if the current key-value pair is within the range defined by {@code from} and {@code to}.</li>
+ * <li><b>{@code fetchNextKeyValue()}:</b> Determines the next key-value pair to return based on the state of both iterators.</li>
+ * </ul>
+ *
+ * <h3>Thread Safety:</h3>
+ *
+ * <p>The iterator is thread-safe for sequential operations but should not be accessed concurrently from multiple
+ * threads without external synchronization.</p>
+ *
+ * <h2>Examples</h2>
+ *
+ * <h3>Iterate over a range:</h3>
+ *
+ * <pre>{@code
+ * RocksIterator noTimestampIterator = accessor.newIterator(noTimestampColumnFamily);
+ * RocksIterator withTimestampIterator = accessor.newIterator(withTimestampColumnFamily);
+ *
+ * try (RocksDBDualCFRangeIterator iterator = new RocksDBDualCFRangeIterator(
+ * new Bytes("keyStart".getBytes()),
+ * new Bytes("keyEnd".getBytes()),
+ * noTimestampIterator,
+ * withTimestampIterator,
+ * "storeName",
+ * true, // Forward iteration
+ * true // Inclusive upper boundary
+ * )) {
+ * while (iterator.hasNext()) {
+ * KeyValue<Bytes, byte[]> entry = iterator.next();
+ * System.out.println("Key: " + entry.key + ", Value: " + Arrays.toString(entry.value));
+ * }
+ * }
+ * }</pre>
+ *
+ * <h2>Exceptions</h2>
+ *
+ * <ul>
+ * <li><b>{@link InvalidStateStoreException}:</b> Thrown if the iterator is accessed after being closed.</li>
+ * <li><b>{@link IllegalStateException}:</b> Thrown if the close callback is not properly set before usage.</li>
+ * </ul>
+ *
+ * @see AbstractIterator
+ * @see ManagedKeyValueIterator
+ * @see RocksDBStore
+ */
+ private static class RocksDBDualCFRangeIterator extends AbstractIterator<KeyValue<Bytes, byte[]>> implements ManagedKeyValueIterator<Bytes, byte[]> {
+ private Runnable closeCallback;
+ private byte[] noTimestampNext;
+ private byte[] withTimestampNext;
private final Comparator<byte[]> comparator = Bytes.BYTES_LEXICO_COMPARATOR;
-
+ private final RocksIterator noTimestampIterator;
+ private final RocksIterator withTimestampIterator;
private final String storeName;
- private final RocksIterator iterWithTimestamp;
- private final RocksIterator iterNoTimestamp;
private final boolean forward;
-
+ private final boolean toInclusive;
+ private final byte[] rawLastKey;
private volatile boolean open = true;
- private byte[] nextWithTimestamp;
- private byte[] nextNoTimestamp;
- private KeyValue<Bytes, byte[]> next;
- private Runnable closeCallback = null;
-
- RocksDBDualCFIterator(final String storeName,
- final RocksIterator iterWithTimestamp,
- final RocksIterator iterNoTimestamp,
- final boolean forward) {
- this.iterWithTimestamp = iterWithTimestamp;
- this.iterNoTimestamp = iterNoTimestamp;
- this.storeName = storeName;
+ /**
+ * Constructs a new {@code RocksDBDualCFRangeIterator}.
+ *
+ * <p>Initializes the RocksDB iterators for two column families (timestamped and non-timestamped) and sets up
+ * the range and direction for iteration.</p>
+ *
+ * @param from The starting key of the range. Can be {@code null} for an open range.
+ * @param to The ending key of the range. Can be {@code null} for an open range.
+ * @param noTimestampIterator The iterator for the non-timestamped column family.
+ * @param withTimestampIterator The iterator for the timestamped column family.
+ * @param storeName The name of the store associated with this iterator.
+ * @param forward {@code true} for forward iteration; {@code false} for reverse iteration.
+ * @param toInclusive Whether the upper boundary of the range is inclusive.
+ */
+ RocksDBDualCFRangeIterator(final Bytes from,
+ final Bytes to,
+ final RocksIterator noTimestampIterator,
+ final RocksIterator withTimestampIterator,
+ final String storeName,
+ final boolean forward,
+ final boolean toInclusive) {
this.forward = forward;
+ this.noTimestampIterator = noTimestampIterator;
+ this.storeName = storeName;
+ this.toInclusive = toInclusive;
+ this.withTimestampIterator = withTimestampIterator;
+
+ this.rawLastKey = initializeIterators(from, to);
}
+ /**
+ * Retrieves the next key-value pair in the range.
+ *
+ * <p>This method determines the next key-value pair to return by merging the results from the two column
+ * families. If both column families have keys, it selects the one that matches the iteration order and range
+ * conditions. Keys outside the specified range are skipped.</p>
+ *
+ * @return The next {@link KeyValue} pair in the range, or {@code null} if no more elements are available.
+ */
@Override
- public synchronized boolean hasNext() {
- if (!open) {
- throw new InvalidStateStoreException(String.format("RocksDB iterator for store %s has closed", storeName));
- }
- return super.hasNext();
+ protected KeyValue<Bytes, byte[]> makeNext() {
+ loadNextKeys();
+ if (noTimestampNext == null && withTimestampNext == null) return allDone();
+ final KeyValue<Bytes, byte[]> next = fetchNextKeyValue();
+ return isInRange(next) ? next : allDone();
+ }
+
+ /**
+ * Returns the next key in the range without advancing the iterator.
+ *
+ * <p>This method retrieves the key of the next {@link KeyValue} pair that would be returned by {@link #next()},
+ * without moving the iterator forward. This is useful for inspecting the next key without affecting the
+ * iterator's state.</p>
+ *
+ * @return The next key as a {@link Bytes} object.
+ *
+ * @throws NoSuchElementException If there are no more elements in the iterator.
+ */
+ @Override
+ public Bytes peekNextKey() {
Review Comment:
existing test coverage for this class is below average (e.g. there's no test for `peekNextKey`); could you add that?
cc @ableegoldman it may be good to refactor the existing tests alongside this one so that instead of one really big test we'd have individual tests for each piece of functionality, which makes it easier to confirm the test coverage is good
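For reference, a sketch of the contract a dedicated `peekNextKey` test could check: peeking twice returns the same key, the peeked key equals the key returned by the following `next()`, and peeking after exhaustion throws `NoSuchElementException` (per the `@throws` clause in the new javadoc). It runs against a hypothetical `TreeMap`-backed stand-in with `String` keys; a real test would populate both column families of the store and use `Bytes` keys. `PeekingIterator`, `TreeMapPeekingIterator`, and `PeekNextKeyContract` are all hypothetical:

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.TreeMap;

// Hypothetical interface mirroring the peekNextKey() part of KeyValueIterator.
interface PeekingIterator<K, V> extends Iterator<Map.Entry<K, V>> {
    K peekNextKey();
}

// Hypothetical in-memory stand-in for the store iterator, backed by a sorted TreeMap.
final class TreeMapPeekingIterator<K, V> implements PeekingIterator<K, V> {
    private final Iterator<Map.Entry<K, V>> delegate;
    private Map.Entry<K, V> peeked; // entry fetched by peekNextKey() but not yet returned

    TreeMapPeekingIterator(final TreeMap<K, V> map) {
        this.delegate = map.entrySet().iterator();
    }

    @Override
    public boolean hasNext() {
        return peeked != null || delegate.hasNext();
    }

    @Override
    public Map.Entry<K, V> next() {
        if (peeked != null) {
            final Map.Entry<K, V> entry = peeked;
            peeked = null;
            return entry;
        }
        return delegate.next(); // throws NoSuchElementException when exhausted
    }

    @Override
    public K peekNextKey() {
        if (peeked == null) {
            peeked = delegate.next(); // throws NoSuchElementException when exhausted
        }
        return peeked.getKey();
    }
}

// The checks a focused peekNextKey test could make; returns the keys seen via next().
final class PeekNextKeyContract {
    private PeekNextKeyContract() { }

    static <K, V> List<K> check(final PeekingIterator<K, V> iter) {
        final List<K> seen = new ArrayList<>();
        while (iter.hasNext()) {
            final K peeked = iter.peekNextKey();
            if (!peeked.equals(iter.peekNextKey())) {
                throw new AssertionError("a second peek must not advance the iterator");
            }
            final K actual = iter.next().getKey();
            if (!peeked.equals(actual)) {
                throw new AssertionError("peeked " + peeked + " but next() returned " + actual);
            }
            seen.add(actual);
        }
        try {
            iter.peekNextKey();
        } catch (final NoSuchElementException expected) {
            return seen; // exhausted iterator rejects peeks, per the @throws clause
        }
        throw new AssertionError("peekNextKey() on an exhausted iterator should throw");
    }
}
```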
##########
streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimestampedStore.java:
##########
@@ -277,197 +275,361 @@ public void close() {
}
}
- private class RocksDBDualCFIterator extends AbstractIterator<KeyValue<Bytes, byte[]>>
- implements ManagedKeyValueIterator<Bytes, byte[]> {
-
- // RocksDB's JNI interface does not expose getters/setters that allow the
- // comparator to be pluggable, and the default is lexicographic, so it's
- // safe to just force lexicographic comparator here for now.
+ /**
+ * A range-based iterator for RocksDB that merges results from two column families.
+ *
+ * <p>This iterator supports traversal over two RocksDB column families: one containing timestamped values and
+ * another containing non-timestamped values. It ensures that the keys from both column families are merged and
+ * sorted lexicographically, respecting the iteration order (forward or reverse) and the specified range
+ * boundaries.</p>
+ *
+ * <h2>Key Features</h2>
+ *
+ * <ul>
+ * <li>Merges results from the "with-timestamp" and "no-timestamp" column families.</li>
+ * <li>Supports range-based queries with open or closed boundaries.</li>
+ * <li>Handles both forward and reverse iteration seamlessly.</li>
+ * <li>Ensures correct handling of inclusive and exclusive upper boundaries.</li>
+ * <li>Integrates efficiently with Kafka Streams state store mechanisms.</li>
+ * </ul>
+ *
+ * <h2>Usage</h2>
+ *
+ * <p>The iterator can be used for different types of range-based operations, such as:
+ * <ul>
+ * <li>Iterating over all keys within a range.</li>
+ * <li>Prefix-based scans (when combined with dynamically calculated range endpoints).</li>
+ * <li>Open-ended range queries (e.g., from a given key to the end of the dataset).</li>
+ * </ul>
+ * </p>
+ *
+ * <h2>Implementation Details</h2>
+ *
+ * <p>The class extends {@link AbstractIterator} and implements {@link ManagedKeyValueIterator}. It uses RocksDB's
+ * native iterators for efficient traversal of keys within the specified range. Keys from the two column families
+ * are merged during iteration, ensuring proper order and de-duplication where applicable.</p>
+ *
+ * <h3>Key Methods:</h3>
+ *
+ * <ul>
+ * <li><b>{@code makeNext()}:</b> Retrieves the next key-value pair in the merged range, ensuring
+ * the result is within the specified range and boundary conditions.</li>
+ * <li><b>{@code initializeIterators()}:</b> Initializes the RocksDB iterators based on the specified range and direction.</li>
+ * <li><b>{@code isInRange()}:</b> Verifies if the current key-value pair is within the range defined by {@code from} and {@code to}.</li>
+ * <li><b>{@code fetchNextKeyValue()}:</b> Determines the next key-value pair to return based on the state of both iterators.</li>
+ * </ul>
+ *
+ * <h3>Thread Safety:</h3>
+ *
+ * <p>The iterator is thread-safe for sequential operations but should not be accessed concurrently from multiple
+ * threads without external synchronization.</p>
+ *
+ * <h2>Examples</h2>
+ *
+ * <h3>Iterate over a range:</h3>
+ *
+ * <pre>{@code
+ * RocksIterator noTimestampIterator = accessor.newIterator(noTimestampColumnFamily);
+ * RocksIterator withTimestampIterator = accessor.newIterator(withTimestampColumnFamily);
+ *
+ * try (RocksDBDualCFRangeIterator iterator = new RocksDBDualCFRangeIterator(
+ * new Bytes("keyStart".getBytes()),
+ * new Bytes("keyEnd".getBytes()),
+ * noTimestampIterator,
+ * withTimestampIterator,
+ * "storeName",
+ * true, // Forward iteration
+ * true // Inclusive upper boundary
+ * )) {
+ * while (iterator.hasNext()) {
+ * KeyValue<Bytes, byte[]> entry = iterator.next();
+ * System.out.println("Key: " + entry.key + ", Value: " + Arrays.toString(entry.value));
+ * }
+ * }
+ * }</pre>
+ *
+ * <h2>Exceptions</h2>
+ *
+ * <ul>
+ * <li><b>{@link InvalidStateStoreException}:</b> Thrown if the iterator is accessed after being closed.</li>
+ * <li><b>{@link IllegalStateException}:</b> Thrown if the close callback is not properly set before usage.</li>
+ * </ul>
+ *
+ * @see AbstractIterator
+ * @see ManagedKeyValueIterator
+ * @see RocksDBStore
+ */
+ private static class RocksDBDualCFRangeIterator extends AbstractIterator<KeyValue<Bytes, byte[]>> implements ManagedKeyValueIterator<Bytes, byte[]> {
+ private Runnable closeCallback;
+ private byte[] noTimestampNext;
+ private byte[] withTimestampNext;
private final Comparator<byte[]> comparator = Bytes.BYTES_LEXICO_COMPARATOR;
-
+ private final RocksIterator noTimestampIterator;
+ private final RocksIterator withTimestampIterator;
private final String storeName;
- private final RocksIterator iterWithTimestamp;
- private final RocksIterator iterNoTimestamp;
private final boolean forward;
-
+ private final boolean toInclusive;
+ private final byte[] rawLastKey;
private volatile boolean open = true;
- private byte[] nextWithTimestamp;
- private byte[] nextNoTimestamp;
- private KeyValue<Bytes, byte[]> next;
- private Runnable closeCallback = null;
-
- RocksDBDualCFIterator(final String storeName,
- final RocksIterator iterWithTimestamp,
- final RocksIterator iterNoTimestamp,
- final boolean forward) {
- this.iterWithTimestamp = iterWithTimestamp;
- this.iterNoTimestamp = iterNoTimestamp;
- this.storeName = storeName;
+ /**
+ * Constructs a new {@code RocksDBDualCFRangeIterator}.
+ *
+ * <p>Initializes the RocksDB iterators for two column families (timestamped and non-timestamped) and sets up
+ * the range and direction for iteration.</p>
+ *
+ * @param from The starting key of the range. Can be {@code null} for an open range.
+ * @param to The ending key of the range. Can be {@code null} for an open range.
+ * @param noTimestampIterator The iterator for the non-timestamped column family.
+ * @param withTimestampIterator The iterator for the timestamped column family.
+ * @param storeName The name of the store associated with this iterator.
+ * @param forward {@code true} for forward iteration; {@code false} for reverse iteration.
+ * @param toInclusive Whether the upper boundary of the range is inclusive.
+ */
+ RocksDBDualCFRangeIterator(final Bytes from,
+ final Bytes to,
+ final RocksIterator noTimestampIterator,
+ final RocksIterator withTimestampIterator,
+ final String storeName,
+ final boolean forward,
+ final boolean toInclusive) {
this.forward = forward;
+ this.noTimestampIterator = noTimestampIterator;
+ this.storeName = storeName;
+ this.toInclusive = toInclusive;
+ this.withTimestampIterator = withTimestampIterator;
+
+ this.rawLastKey = initializeIterators(from, to);
}
+ /**
+ * Retrieves the next key-value pair in the range.
+ *
+ * <p>This method determines the next key-value pair to return by merging the results from the two column
+ * families. If both column families have keys, it selects the one that matches the iteration order and range
+ * conditions. Keys outside the specified range are skipped.</p>
+ *
+ * @return The next {@link KeyValue} pair in the range, or {@code null} if no more elements are available.
+ */
@Override
- public synchronized boolean hasNext() {
- if (!open) {
- throw new InvalidStateStoreException(String.format("RocksDB iterator for store %s has closed", storeName));
- }
- return super.hasNext();
+ protected KeyValue<Bytes, byte[]> makeNext() {
+ loadNextKeys();
+ if (noTimestampNext == null && withTimestampNext == null) return allDone();
Review Comment:
There seems to be a functionality change. Not sure if this is intentional or not, but it looks like we dropped the check for the case where the iterators are not valid. (In other words, there's a branch where, after `loadNextKeys()`, `noTimestampNext` is still `null` but `iterator.isValid()` is `true`.)
Not sure what might cause this condition.
The old code:
```java
if (nextNoTimestamp == null && iterNoTimestamp.isValid()) {
    nextNoTimestamp = iterNoTimestamp.key();
}
if (nextWithTimestamp == null && iterWithTimestamp.isValid()) {
    nextWithTimestamp = iterWithTimestamp.key();
}
if (nextNoTimestamp == null && !iterNoTimestamp.isValid()) {
    if (nextWithTimestamp == null && !iterWithTimestamp.isValid()) {
        return allDone();
```
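A sketch of when the shorter check would be equivalent: if a buffered key is refilled whenever the buffer is `null` and its cursor is valid (the rule in the old code above), then "both buffers `null`" already implies "both cursors invalid", and the extra `isValid()` checks are redundant. So the simplification is only safe if `loadNextKeys()` keeps that refill rule, which I haven't verified. Minimal forward/reverse merge below; `ListCursor` (a stand-in for `RocksIterator`) and `DualMerge` are hypothetical, ties going to the no-timestamp side is an assumption of the sketch, and `Arrays.compareUnsigned` (Java 9+) is used for unsigned byte-wise ordering, intended to mirror `Bytes.BYTES_LEXICO_COMPARATOR`:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-in for RocksIterator: walks keys that are already sorted in
// unsigned byte-wise order, forward or in reverse.
final class ListCursor {
    private final List<byte[]> keys;
    private final boolean forward;
    private int pos;

    ListCursor(final List<byte[]> sortedKeys, final boolean forward) {
        this.keys = sortedKeys;
        this.forward = forward;
        this.pos = forward ? 0 : sortedKeys.size() - 1;
    }

    boolean isValid() {
        return pos >= 0 && pos < keys.size();
    }

    byte[] key() {
        return keys.get(pos);
    }

    void advance() {
        pos += forward ? 1 : -1;
    }
}

// Hypothetical two-way merge that keeps the old code's refill rule.
final class DualMerge {
    private DualMerge() { }

    static List<byte[]> merge(final ListCursor noTs, final ListCursor withTs, final boolean forward) {
        final List<byte[]> out = new ArrayList<>();
        byte[] nextNoTs = null;
        byte[] nextWithTs = null;
        while (true) {
            // Refill a buffer only while its cursor is still valid (same rule as the old code).
            if (nextNoTs == null && noTs.isValid()) {
                nextNoTs = noTs.key();
            }
            if (nextWithTs == null && withTs.isValid()) {
                nextWithTs = withTs.key();
            }
            // After the refills, a null buffer implies an invalid cursor, so this single
            // check covers the old "null && !isValid()" conditions.
            if (nextNoTs == null && nextWithTs == null) {
                return out;
            }
            final boolean takeNoTs;
            if (nextWithTs == null) {
                takeNoTs = true;
            } else if (nextNoTs == null) {
                takeNoTs = false;
            } else {
                final int cmp = Arrays.compareUnsigned(nextNoTs, nextWithTs);
                // Ties go to the no-timestamp side: an assumption of this sketch.
                takeNoTs = forward ? cmp <= 0 : cmp >= 0;
            }
            if (takeNoTs) {
                out.add(nextNoTs);
                nextNoTs = null;
                noTs.advance();
            } else {
                out.add(nextWithTs);
                nextWithTs = null;
                withTs.advance();
            }
        }
    }
}
```

The `(byte) 0x80` key in a test like this is useful: with unsigned ordering it sorts after small positive bytes, whereas a signed comparison would put it first.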
##########
streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimestampedStore.java:
##########
@@ -277,197 +275,361 @@ public void close() {
}
}
- private class RocksDBDualCFIterator extends AbstractIterator<KeyValue<Bytes, byte[]>>
- implements ManagedKeyValueIterator<Bytes, byte[]> {
-
- // RocksDB's JNI interface does not expose getters/setters that allow the
- // comparator to be pluggable, and the default is lexicographic, so it's
- // safe to just force lexicographic comparator here for now.
+ /**
+ * A range-based iterator for RocksDB that merges results from two column families.
+ *
+ * <p>This iterator supports traversal over two RocksDB column families: one containing timestamped values and
+ * another containing non-timestamped values. It ensures that the keys from both column families are merged and
+ * sorted lexicographically, respecting the iteration order (forward or reverse) and the specified range
+ * boundaries.</p>
+ *
+ * <h2>Key Features</h2>
+ *
+ * <ul>
+ * <li>Merges results from the "with-timestamp" and "no-timestamp" column families.</li>
+ * <li>Supports range-based queries with open or closed boundaries.</li>
+ * <li>Handles both forward and reverse iteration seamlessly.</li>
+ * <li>Ensures correct handling of inclusive and exclusive upper boundaries.</li>
+ * <li>Integrates efficiently with Kafka Streams state store mechanisms.</li>
+ * </ul>
+ *
+ * <h2>Usage</h2>
+ *
+ * <p>The iterator can be used for different types of range-based operations, such as:
+ * <ul>
+ * <li>Iterating over all keys within a range.</li>
+ * <li>Prefix-based scans (when combined with dynamically calculated range endpoints).</li>
+ * <li>Open-ended range queries (e.g., from a given key to the end of the dataset).</li>
+ * </ul>
+ * </p>
+ *
+ * <h2>Implementation Details</h2>
+ *
+ * <p>The class extends {@link AbstractIterator} and implements {@link ManagedKeyValueIterator}. It uses RocksDB's
+ * native iterators for efficient traversal of keys within the specified range. Keys from the two column families
+ * are merged during iteration, ensuring proper order and de-duplication where applicable.</p>
+ *
+ * <h3>Key Methods:</h3>
+ *
+ * <ul>
+ * <li><b>{@code makeNext()}:</b> Retrieves the next key-value pair in the merged range, ensuring
+ * the result is within the specified range and boundary conditions.</li>
+ * <li><b>{@code initializeIterators()}:</b> Initializes the RocksDB iterators based on the specified range and direction.</li>
+ * <li><b>{@code isInRange()}:</b> Verifies if the current key-value pair is within the range defined by {@code from} and {@code to}.</li>
+ * <li><b>{@code fetchNextKeyValue()}:</b> Determines the next key-value pair to return based on the state of both iterators.</li>
+ * </ul>
+ *
+ * <h3>Thread Safety:</h3>
+ *
+ * <p>The iterator is thread-safe for sequential operations but should not be accessed concurrently from multiple
+ * threads without external synchronization.</p>
+ *
+ * <h2>Examples</h2>
+ *
+ * <h3>Iterate over a range:</h3>
+ *
+ * <pre>{@code
+ * RocksIterator noTimestampIterator = accessor.newIterator(noTimestampColumnFamily);
+ * RocksIterator withTimestampIterator = accessor.newIterator(withTimestampColumnFamily);
+ *
+ * try (RocksDBDualCFRangeIterator iterator = new RocksDBDualCFRangeIterator(
+ * new Bytes("keyStart".getBytes()),
+ * new Bytes("keyEnd".getBytes()),
+ * noTimestampIterator,
+ * withTimestampIterator,
+ * "storeName",
+ * true, // Forward iteration
+ * true // Inclusive upper boundary
+ * )) {
+ * while (iterator.hasNext()) {
+ * KeyValue<Bytes, byte[]> entry = iterator.next();
+ * System.out.println("Key: " + entry.key + ", Value: " + Arrays.toString(entry.value));
+ * }
+ * }
+ * }</pre>
+ *
+ * <h2>Exceptions</h2>
+ *
+ * <ul>
+ * <li><b>{@link InvalidStateStoreException}:</b> Thrown if the iterator is accessed after being closed.</li>
+ * <li><b>{@link IllegalStateException}:</b> Thrown if the close callback is not properly set before usage.</li>
+ * </ul>
+ *
+ * @see AbstractIterator
+ * @see ManagedKeyValueIterator
+ * @see RocksDBStore
+ */
+ private static class RocksDBDualCFRangeIterator extends AbstractIterator<KeyValue<Bytes, byte[]>> implements ManagedKeyValueIterator<Bytes, byte[]> {
+ private Runnable closeCallback;
+ private byte[] noTimestampNext;
+ private byte[] withTimestampNext;
private final Comparator<byte[]> comparator = Bytes.BYTES_LEXICO_COMPARATOR;
-
+ private final RocksIterator noTimestampIterator;
+ private final RocksIterator withTimestampIterator;
private final String storeName;
- private final RocksIterator iterWithTimestamp;
- private final RocksIterator iterNoTimestamp;
private final boolean forward;
-
+ private final boolean toInclusive;
+ private final byte[] rawLastKey;
private volatile boolean open = true;
- private byte[] nextWithTimestamp;
- private byte[] nextNoTimestamp;
- private KeyValue<Bytes, byte[]> next;
- private Runnable closeCallback = null;
-
- RocksDBDualCFIterator(final String storeName,
- final RocksIterator iterWithTimestamp,
- final RocksIterator iterNoTimestamp,
- final boolean forward) {
- this.iterWithTimestamp = iterWithTimestamp;
- this.iterNoTimestamp = iterNoTimestamp;
- this.storeName = storeName;
+ /**
+ * Constructs a new {@code RocksDBDualCFRangeIterator}.
+ *
+ * <p>Initializes the RocksDB iterators for two column families (timestamped and non-timestamped) and sets up
+ * the range and direction for iteration.</p>
+ *
+ * @param from The starting key of the range. Can be {@code null} for an open range.
+ * @param to The ending key of the range. Can be {@code null} for an open range.
+ * @param noTimestampIterator The iterator for the non-timestamped column family.
+ * @param withTimestampIterator The iterator for the timestamped column family.
+ * @param storeName The name of the store associated with this iterator.
+ * @param forward {@code true} for forward iteration; {@code false} for reverse iteration.
+ * @param toInclusive Whether the upper boundary of the range is inclusive.
+ */
+ RocksDBDualCFRangeIterator(final Bytes from,
+ final Bytes to,
+ final RocksIterator noTimestampIterator,
+ final RocksIterator withTimestampIterator,
+ final String storeName,
+ final boolean forward,
+ final boolean toInclusive) {
this.forward = forward;
+ this.noTimestampIterator = noTimestampIterator;
+ this.storeName = storeName;
+ this.toInclusive = toInclusive;
+ this.withTimestampIterator = withTimestampIterator;
+
+ this.rawLastKey = initializeIterators(from, to);
}
+ /**
+ * Retrieves the next key-value pair in the range.
+ *
+ * <p>This method determines the next key-value pair to return by merging the results from the two column
+ * families. If both column families have keys, it picks the smaller key for forward iteration or the larger key
+ * for reverse iteration. Iteration ends at the first key that falls outside the specified range.</p>
+ *
+ * @return The next {@link KeyValue} pair in the range, or {@code null} if no more elements are available.
+ */
@Override
- public synchronized boolean hasNext() {
- if (!open) {
- throw new InvalidStateStoreException(String.format("RocksDB iterator for store %s has closed", storeName));
- }
- return super.hasNext();
+ protected KeyValue<Bytes, byte[]> makeNext() {
+ loadNextKeys();
+ if (noTimestampNext == null && withTimestampNext == null) return allDone();
+ final KeyValue<Bytes, byte[]> next = fetchNextKeyValue();
+ return isInRange(next) ? next : allDone();
+ }
+
+ /**
+ * Returns the next key in the range without advancing the iterator.
+ *
+ * <p>This method retrieves the key of the next {@link KeyValue} pair that would be returned by {@link #next()},
+ * without moving the iterator forward. This is useful for inspecting the next key without affecting the
+ * iterator's state.</p>
+ *
+ * @return The next key as a {@link Bytes} object.
+ *
+ * @throws NoSuchElementException If there are no more elements in the iterator.
+ */
+ @Override
+ public Bytes peekNextKey() {
+ if (!hasNext()) throw new NoSuchElementException();
+ return super.peek().key;
}
+ /**
+ * Advances the iterator and returns the next key-value pair.
+ *
+ * @return The next {@link KeyValue} pair in the range.
+ *
+ * @throws InvalidStateStoreException If the iterator has been closed.
+ */
@Override
public synchronized KeyValue<Bytes, byte[]> next() {
+ ensureOpen();
return super.next();
}
+ /**
+ * Checks if there are more elements available in the range.
+ *
+ * @return {@code true} if the iterator has more elements; {@code false} otherwise.
+ *
+ * @throws InvalidStateStoreException If the iterator has been closed.
+ */
@Override
- protected KeyValue<Bytes, byte[]> makeNext() {
- if (nextNoTimestamp == null && iterNoTimestamp.isValid()) {
- nextNoTimestamp = iterNoTimestamp.key();
- }
-
- if (nextWithTimestamp == null && iterWithTimestamp.isValid()) {
- nextWithTimestamp = iterWithTimestamp.key();
- }
-
- if (nextNoTimestamp == null && !iterNoTimestamp.isValid()) {
- if (nextWithTimestamp == null && !iterWithTimestamp.isValid()) {
- return allDone();
- } else {
- next = KeyValue.pair(new Bytes(nextWithTimestamp), iterWithTimestamp.value());
- nextWithTimestamp = null;
- if (forward) {
- iterWithTimestamp.next();
- } else {
- iterWithTimestamp.prev();
- }
- }
- } else {
- if (nextWithTimestamp == null) {
- next = KeyValue.pair(new Bytes(nextNoTimestamp), convertToTimestampedFormat(iterNoTimestamp.value()));
- nextNoTimestamp = null;
- if (forward) {
- iterNoTimestamp.next();
- } else {
- iterNoTimestamp.prev();
- }
- } else {
- if (forward) {
- if (comparator.compare(nextNoTimestamp, nextWithTimestamp) <= 0) {
- next = KeyValue.pair(new Bytes(nextNoTimestamp), convertToTimestampedFormat(iterNoTimestamp.value()));
- nextNoTimestamp = null;
- iterNoTimestamp.next();
- } else {
- next = KeyValue.pair(new Bytes(nextWithTimestamp), iterWithTimestamp.value());
- nextWithTimestamp = null;
- iterWithTimestamp.next();
- }
- } else {
- if (comparator.compare(nextNoTimestamp, nextWithTimestamp) >= 0) {
- next = KeyValue.pair(new Bytes(nextNoTimestamp), convertToTimestampedFormat(iterNoTimestamp.value()));
- nextNoTimestamp = null;
- iterNoTimestamp.prev();
- } else {
- next = KeyValue.pair(new Bytes(nextWithTimestamp), iterWithTimestamp.value());
- nextWithTimestamp = null;
- iterWithTimestamp.prev();
- }
- }
- }
- }
- return next;
+ public synchronized boolean hasNext() {
+ ensureOpen();
+ return super.hasNext();
}
+ /**
+ * Closes the iterator and releases associated resources.
+ *
+ * <p>This method ensures that the RocksDB iterators for both column families are properly closed. After this
+ * method is called, any subsequent calls to {@link #hasNext()}, {@link #next()}, or {@link #peekNextKey()} will
+ * result in an {@link InvalidStateStoreException}.</p>
+ *
+ * @throws IllegalStateException If the close callback has not been set before calling this method.
+ */
@Override
public synchronized void close() {
if (closeCallback == null) {
- throw new IllegalStateException("RocksDBDualCFIterator expects close callback to be set immediately upon creation");
+ final String message = "RocksDBDualCFRangeIterator expects close callback to be set immediately upon creation";
+ throw new IllegalStateException(message);
}
closeCallback.run();
- iterNoTimestamp.close();
- iterWithTimestamp.close();
+ noTimestampIterator.close();
+ withTimestampIterator.close();
open = false;
}
+ /**
+ * Registers a callback to be executed when the iterator is closed.
+ *
+ * @param closeCallback A {@link Runnable} to execute during the {@link #close()} operation.
+ */
@Override
- public Bytes peekNextKey() {
- if (!hasNext()) {
- throw new NoSuchElementException();
+ public void onClose(final Runnable closeCallback) {
+ this.closeCallback = closeCallback;
+ }
+
+ private KeyValue<Bytes, byte[]> compareAndHandleKeys() {
+ final int comparison = comparator.compare(noTimestampNext, withTimestampNext);
+ if (forward ? comparison <= 0 : comparison >= 0) {
+ return handleNoTimestampOnly();
+ } else {
+ return handleWithTimestampOnly();
}
- return next.key;
}
- @Override
- public void onClose(final Runnable closeCallback) {
- this.closeCallback = closeCallback;
+ /**
+ * Determines the next key-value pair to return.
+ *
+ * <p>If one of the column family iterators is exhausted, the method returns the result from the other iterator.
+ * If both iterators have keys, the method compares the keys and returns the appropriate result based on the
+ * iteration direction.</p>
+ *
+ * @return The next {@link KeyValue} pair to return.
+ */
+ private KeyValue<Bytes, byte[]> fetchNextKeyValue() {
+ if (noTimestampNext == null) {
+ return handleWithTimestampOnly();
+ } else if (withTimestampNext == null) {
+ return handleNoTimestampOnly();
+ } else {
+ return compareAndHandleKeys();
+ }
}
- }
- private class RocksDBDualCFRangeIterator extends RocksDBDualCFIterator {
- // RocksDB's JNI interface does not expose getters/setters that allow the
- // comparator to be pluggable, and the default is lexicographic, so it's
- // safe to just force lexicographic comparator here for now.
- private final Comparator<byte[]> comparator = Bytes.BYTES_LEXICO_COMPARATOR;
- private final byte[] rawLastKey;
- private final boolean forward;
- private final boolean toInclusive;
+ private KeyValue<Bytes, byte[]> handleNoTimestampOnly() {
+ final KeyValue<Bytes, byte[]> result = KeyValue.pair(new Bytes(noTimestampNext), convertToTimestampedFormat(noTimestampIterator.value()));
+ moveIterator(noTimestampIterator);
+ noTimestampNext = null;
+ return result;
+ }
- RocksDBDualCFRangeIterator(final String storeName,
- final RocksIterator iterWithTimestamp,
- final RocksIterator iterNoTimestamp,
- final Bytes from,
- final Bytes to,
- final boolean forward,
- final boolean toInclusive) {
- super(storeName, iterWithTimestamp, iterNoTimestamp, forward);
- this.forward = forward;
- this.toInclusive = toInclusive;
+ private KeyValue<Bytes, byte[]> handleWithTimestampOnly() {
+ final KeyValue<Bytes, byte[]> result = KeyValue.pair(new Bytes(withTimestampNext), withTimestampIterator.value());
+ moveIterator(withTimestampIterator);
+ withTimestampNext = null;
+ return result;
+ }
+
+ /**
+ * Checks if the given key-value pair is within the specified range.
+ *
+ * <p>The method compares the key against {@code rawLastKey}, the boundary at which iteration ends (the upper bound for forward iteration, the lower bound for reverse iteration), and determines if it
+ * falls within the range.</p>
+ *
+ * @param keyValue The key-value pair to check.
+ *
+ * @return {@code true} if the key is within the range; {@code false} otherwise.
+ */
+ private boolean isInRange(final KeyValue<Bytes, byte[]> keyValue) {
+ if (rawLastKey == null) return true; // Open-ended range
+ final int comparison = comparator.compare(keyValue.key.get(), rawLastKey);
+ return forward
+ ? comparison < 0 || (toInclusive && comparison == 0)
+ : comparison > 0 || (toInclusive && comparison == 0);
Review Comment:
```suggestion
return (toInclusive && comparison == 0) || (forward ? comparison < 0 : comparison > 0);
```
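The suggestion is meant to preserve behavior. A quick way to confirm that is to evaluate both expressions over every input combination. Only the sign of a comparator result matters, so `-1`, `0` and `1` cover all cases. The class and method names below are illustrative and are not part of the PR.

```java
public class InRangeEquivalenceCheck {

    // The expression as written in the diff: a separate branch for each direction.
    static boolean original(final boolean forward, final boolean toInclusive, final int comparison) {
        return forward
            ? comparison < 0 || (toInclusive && comparison == 0)
            : comparison > 0 || (toInclusive && comparison == 0);
    }

    // The expression from the review suggestion: the inclusive-equality check is hoisted out of the ternary.
    static boolean suggested(final boolean forward, final boolean toInclusive, final int comparison) {
        return (toInclusive && comparison == 0) || (forward ? comparison < 0 : comparison > 0);
    }

    public static void main(final String[] args) {
        final boolean[] flags = {true, false};
        // A comparator's result only matters by sign, so these three values are exhaustive.
        final int[] comparisons = {-1, 0, 1};
        int checked = 0;
        for (final boolean forward : flags) {
            for (final boolean toInclusive : flags) {
                for (final int comparison : comparisons) {
                    if (original(forward, toInclusive, comparison) != suggested(forward, toInclusive, comparison)) {
                        throw new AssertionError("Mismatch: forward=" + forward
                            + ", toInclusive=" + toInclusive + ", comparison=" + comparison);
                    }
                    checked++;
                }
            }
        }
        System.out.println("All " + checked + " cases agree"); // prints "All 12 cases agree"
    }
}
```

The two forms agree on all 12 combinations. The suggested form states the shared `toInclusive && comparison == 0` condition once, so only the strict comparison depends on direction.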
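The sketch below is a self-contained version of the two-way merge performed by `fetchNextKeyValue()`, `compareAndHandleKeys()` and `isInRange()`. It is a simplification under the following assumptions:

- Two ascending `List<String>`s stand in for the two `RocksIterator`s.
- `String.compareTo` stands in for `Bytes.BYTES_LEXICO_COMPARATOR`. The two agree for ASCII keys.
- Positioning at the `from` key is not modeled, because `initializeIterators` is not shown in this hunk.
- `DualSourceMergeSketch` and its methods are hypothetical names.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class DualSourceMergeSketch {

    // Hypothetical stand-in for the two-column-family merge. Both lists must be sorted ascending;
    // reverse iteration walks them from the end.
    static List<String> merge(final List<String> noTimestamp,
                              final List<String> withTimestamp,
                              final String lastKey, // plays the role of rawLastKey; null means open-ended
                              final boolean forward,
                              final boolean toInclusive) {
        final List<String> result = new ArrayList<>();
        final int step = forward ? 1 : -1;
        int noTsPos = forward ? 0 : noTimestamp.size() - 1;
        int withTsPos = forward ? 0 : withTimestamp.size() - 1;
        while (true) {
            // Peek at the current key of each source; null means that source is exhausted.
            final String noTsNext = valueAt(noTimestamp, noTsPos);
            final String withTsNext = valueAt(withTimestamp, withTsPos);
            if (noTsNext == null && withTsNext == null) {
                break;
            }
            final String next;
            if (noTsNext == null) {
                next = withTsNext;
                withTsPos += step;
            } else if (withTsNext == null) {
                next = noTsNext;
                noTsPos += step;
            } else {
                final int comparison = noTsNext.compareTo(withTsNext);
                // Ties go to the no-timestamp side, as in compareAndHandleKeys().
                if (forward ? comparison <= 0 : comparison >= 0) {
                    next = noTsNext;
                    noTsPos += step;
                } else {
                    next = withTsNext;
                    withTsPos += step;
                }
            }
            // The first key past the end boundary terminates iteration, like allDone() in makeNext().
            if (!isInRange(next, lastKey, forward, toInclusive)) {
                break;
            }
            result.add(next);
        }
        return result;
    }

    static String valueAt(final List<String> keys, final int pos) {
        return pos >= 0 && pos < keys.size() ? keys.get(pos) : null;
    }

    // Same logic as the suggested isInRange() body.
    static boolean isInRange(final String key, final String lastKey, final boolean forward, final boolean toInclusive) {
        if (lastKey == null) {
            return true;
        }
        final int comparison = key.compareTo(lastKey);
        return (toInclusive && comparison == 0) || (forward ? comparison < 0 : comparison > 0);
    }

    public static void main(final String[] args) {
        final List<String> noTimestamp = Arrays.asList("a", "c", "e");
        final List<String> withTimestamp = Arrays.asList("b", "d", "f");
        System.out.println(merge(noTimestamp, withTimestamp, "e", true, false));  // [a, b, c, d]
        System.out.println(merge(noTimestamp, withTimestamp, "e", true, true));   // [a, b, c, d, e]
        System.out.println(merge(noTimestamp, withTimestamp, "b", false, true));  // [f, e, d, c, b]
        System.out.println(merge(noTimestamp, withTimestamp, null, true, false)); // [a, b, c, d, e, f]
    }
}
```

Out-of-range keys end the iteration rather than being skipped. Because both sources are consumed in sorted order, the range check only needs to look at the end boundary.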