vcrfxia commented on code in PR #13142:
URL: https://github.com/apache/kafka/pull/13142#discussion_r1085935187


##########
streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimestampedStore.java:
##########
@@ -383,7 +383,9 @@ public KeyValue<Bytes, byte[]> makeNext() {
 
         @Override
         public synchronized void close() {
-            openIterators.remove(this);
+            if (closeCallback != null) {

Review Comment:
   It's unclear whether we want to require that a closeCallback is always 
registered in general, but for these two specific classes (RocksDbIterator 
and RocksDBDualCFIterator) we do want to require that a closeCallback is 
always set. I've updated the code to reflect this.
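
   For readers following along, a minimal sketch of what "require that a closeCallback is always set" could look like. This is not the code in the PR: the class `SketchIterator`, the `onClose` method, and the exception message are hypothetical names chosen for illustration, and the real iterators do considerably more work in `close()`.

```java
import java.util.Objects;

// Hypothetical stand-in for an iterator such as RocksDbIterator; NOT the Kafka class.
class SketchIterator implements AutoCloseable {
    // Assumed to be registered by the owning store right after construction.
    private Runnable closeCallback;

    // Hypothetical registration hook; rejects a null callback up front.
    void onClose(final Runnable closeCallback) {
        this.closeCallback = Objects.requireNonNull(closeCallback, "closeCallback");
    }

    @Override
    public synchronized void close() {
        if (closeCallback == null) {
            // Fail loudly instead of silently skipping the bookkeeping.
            throw new IllegalStateException("close callback must be set before close()");
        }
        closeCallback.run();
    }
}
```

   With this shape, the store can pass `() -> openIterators.remove(iterator)` as the callback, so the iterator no longer needs a direct reference to the set it is tracked in.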



##########
streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java:
##########
@@ -351,13 +360,23 @@ public <R> QueryResult<R> query(
     @Override
     public <PS extends Serializer<P>, P> KeyValueIterator<Bytes, byte[]> prefixScan(final P prefix,
                                                                                     final PS prefixKeySerializer) {
+        if (userManagedIterators) {
+            throw new IllegalStateException("Must specify openIterators in call to prefixScan()");
+        }
+        return prefixScan(prefix, prefixKeySerializer, openIterators);
+    }
+
+    <PS extends Serializer<P>, P> KeyValueIterator<Bytes, byte[]> prefixScan(final P prefix,
+                                                                             final PS prefixKeySerializer,
+                                                                             final Set<KeyValueIterator<Bytes, byte[]>> openIterators) {

Review Comment:
   No. I've just added this additional validation, which required refactoring 
these calls so that the two versions of the method do not call each other. 
Instead they each call a third (helper) method.
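
   A rough sketch of the refactoring described above, under stated assumptions: `SketchStore`, `ScanHandle`, `doPrefixScan`, and `trackedCount` are hypothetical names, the scan itself is stubbed out, and the check in the second overload (and its message) is my guess at the symmetric validation rather than something shown in the diff. The point is only that each overload validates its own precondition and then delegates to a shared helper, so neither overload calls the other.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical placeholder for a scan result; a real store would return a KeyValueIterator.
final class ScanHandle {
    final String prefix;

    ScanHandle(final String prefix) {
        this.prefix = prefix;
    }
}

// Hypothetical store illustrating the two-overloads-plus-helper shape; NOT RocksDBStore.
class SketchStore {
    private final boolean userManagedIterators;
    private final Set<Object> openIterators = new HashSet<>();

    SketchStore(final boolean userManagedIterators) {
        this.userManagedIterators = userManagedIterators;
    }

    // Overload for the case where the store tracks open iterators itself.
    ScanHandle prefixScan(final String prefix) {
        if (userManagedIterators) {
            throw new IllegalStateException("Must specify openIterators in call to prefixScan()");
        }
        return doPrefixScan(prefix, openIterators);
    }

    // Overload for callers that supply their own tracking set (validation assumed).
    ScanHandle prefixScan(final String prefix, final Set<Object> callerIterators) {
        if (!userManagedIterators) {
            throw new IllegalStateException("Cannot specify openIterators when the store manages them");
        }
        return doPrefixScan(prefix, callerIterators);
    }

    // Shared helper: performs the (stubbed) scan and registers the result in the given set.
    private ScanHandle doPrefixScan(final String prefix, final Set<Object> tracking) {
        final ScanHandle handle = new ScanHandle(prefix);
        tracking.add(handle);
        return handle;
    }

    // Number of iterators tracked by the store itself.
    int trackedCount() {
        return openIterators.size();
    }
}
```

   If one overload simply called the other, the callee's validation would reject the call, which is why the shared helper is needed.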



##########
streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java:
##########
@@ -114,6 +114,7 @@ public class RocksDBStore implements KeyValueStore<Bytes, byte[]>, BatchWritingS
     private boolean userSpecifiedStatistics = false;
 
     private final RocksDBMetricsRecorder metricsRecorder;
+    private final boolean userManagedIterators;

Review Comment:
   Good point. I think `selfManagedIterators` is also confusing though because 
it's unclear whether "self" means the store itself or the caller themselves.
   
   I've updated this to `autoManagedIterators` which means the opposite of what 
I initially had (for `userManagedIterators`) and added a comment.



