vcrfxia commented on code in PR #13142: URL: https://github.com/apache/kafka/pull/13142#discussion_r1085935187
########## streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimestampedStore.java: ########## @@ -383,7 +383,9 @@ public KeyValue<Bytes, byte[]> makeNext() { @Override public synchronized void close() { - openIterators.remove(this); + if (closeCallback != null) { Review Comment: Unclear whether we want to require that a closeCallback is always registered in general, but it is true that for these two specific classes (RocksDbIterator and RocksDBDualCFIterator), we do want to require that a closeCallback is always set. I've updated the code to reflect this. ########## streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java: ########## @@ -351,13 +360,23 @@ public <R> QueryResult<R> query( @Override public <PS extends Serializer<P>, P> KeyValueIterator<Bytes, byte[]> prefixScan(final P prefix, final PS prefixKeySerializer) { + if (userManagedIterators) { + throw new IllegalStateException("Must specify openIterators in call to prefixScan()"); + } + return prefixScan(prefix, prefixKeySerializer, openIterators); + } + + <PS extends Serializer<P>, P> KeyValueIterator<Bytes, byte[]> prefixScan(final P prefix, + final PS prefixKeySerializer, + final Set<KeyValueIterator<Bytes, byte[]>> openIterators) { Review Comment: No. I've just added this additional validation, which required refactoring these calls so that the two versions of the method do not call each other. Instead they each call a third (helper) method. ########## streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java: ########## @@ -114,6 +114,7 @@ public class RocksDBStore implements KeyValueStore<Bytes, byte[]>, BatchWritingS private boolean userSpecifiedStatistics = false; private final RocksDBMetricsRecorder metricsRecorder; + private final boolean userManagedIterators; Review Comment: Good point. I think `selfManagedIterators` is also confusing though because it's unclear whether "self" means the store itself or the caller themselves. I've updated this to `autoManagedIterators` which means the opposite of what I initially had (for `userManagedIterators`) and added a comment. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org