aliehsaeedii commented on code in PR #14626:
URL: https://github.com/apache/kafka/pull/14626#discussion_r1412048452


##########
streams/src/main/java/org/apache/kafka/streams/state/internals/LogicalSegmentIterator.java:
##########
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.query.ResultOrder;
+import org.apache.kafka.streams.state.VersionedRecord;
+import org.apache.kafka.streams.state.VersionedRecordIterator;
+import org.rocksdb.Snapshot;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.ListIterator;
+import java.util.Optional;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+public class LogicalSegmentIterator implements VersionedRecordIterator {
+    protected final ListIterator<LogicalKeyValueSegment> segmentIterator;
+    private final Bytes key;
+    private final Long fromTime;
+    private final Long toTime;
+    private final ResultOrder order;
+    protected ListIterator<VersionedRecord<byte[]>> iterator;
+
+    // defined for creating/releasing the snapshot. 
+    private LogicalKeyValueSegment snapshotOwner;
+    private Snapshot snapshot;
+
+
+
+    public LogicalSegmentIterator(final ListIterator<LogicalKeyValueSegment> segmentIterator,
+                                  final Bytes key,
+                                  final Long fromTime,
+                                  final Long toTime,
+                                  final ResultOrder order) {
+
+        this.segmentIterator = segmentIterator;
+        this.key = key;
+        this.fromTime = fromTime;
+        this.toTime = toTime;
+        this.iterator = Collections.emptyListIterator();
+        this.order = order;
+        this.snapshot = null;
+        this.snapshotOwner = null;
+    }
+
+    @Override
+    public void close() {
+        // user may refuse consuming all returned records, so release the snapshot when closing the iterator if it is not released yet!
+        releaseSnapshot();
+    }
+
+    @Override
+    public boolean hasNext() {
+        return iterator.hasNext() || maybeFillIterator();
+    }
+
+    @Override
+    public Object next() {
+        if (hasNext()) {
+            return iterator.next();
+        }
+        return null;
+    }
+    private boolean maybeFillIterator() {
+
+        final List<VersionedRecord<byte[]>> queryResults = new ArrayList<>();
+        while (segmentIterator.hasNext()) {
+            final LogicalKeyValueSegment segment = segmentIterator.next();
+
+            if (snapshot == null) { // create the snapshot (this will happen only one time).
+                this.snapshotOwner = segment;
+                // take a RocksDB snapshot to return the segments content at the query time (in order to guarantee consistency)
+                final Lock lock = new ReentrantLock();

Review Comment:
   > because we can get a single snapshot when the iterator is created
   
   Do you mean I should take the snapshot in `RocksDBVersionedStore` and pass it to the `LogicalSegmentIterator`? In that case, releasing the snapshot becomes problematic: what would release it? The snapshot has to be released by a `LogicalKeyValueSegment` object, which is why I introduced the `snapshotOwner` field.
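   A minimal sketch of the ownership idea follows. It assumes a hypothetical `Segment` interface with `takeSnapshot()`/`releaseSnapshot()` standing in for `LogicalKeyValueSegment`, and a `long` standing in for the RocksDB `Snapshot`. Neither is the real Kafka Streams API. The first segment visited takes the snapshot and is remembered as `snapshotOwner`, so `close()` can release it through the same object.

```java
import java.util.Iterator;
import java.util.List;

public class SnapshotOwnerSketch {

    // Hypothetical stand-in for LogicalKeyValueSegment; only the two snapshot
    // operations matter here. A long plays the role of the snapshot handle.
    public interface Segment {
        long takeSnapshot();
        void releaseSnapshot(long snapshot);
    }

    // Hypothetical iterator skeleton: the first segment it visits creates the
    // snapshot and is remembered as snapshotOwner, so close() can release the
    // snapshot through that same segment. The store never has to hand a
    // snapshot in from outside.
    public static final class SegmentWalker implements AutoCloseable {
        private final Iterator<Segment> segments;
        private Segment snapshotOwner; // null until the first segment is visited
        private Long snapshot;         // null before it is taken and after release

        public SegmentWalker(final List<Segment> segments) {
            this.segments = segments.iterator();
        }

        // Moves to the next segment; returns false once all segments are visited.
        public boolean advance() {
            if (!segments.hasNext()) {
                return false;
            }
            final Segment segment = segments.next();
            if (snapshotOwner == null) { // happens only once
                snapshotOwner = segment;
                snapshot = segment.takeSnapshot();
            }
            // ... a real iterator would read this segment's records under `snapshot` ...
            return true;
        }

        public Segment owner() {
            return snapshotOwner;
        }

        @Override
        public void close() {
            if (snapshot != null) {
                snapshotOwner.releaseSnapshot(snapshot);
                snapshot = null; // makes a second close() a no-op
            }
        }
    }
}
```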
   
   > , but use the same snapshot throughout the lifetime of the iterator?
   
   At the moment, I am using the same snapshot throughout the whole lifetime of the iterator. As I mentioned in the code comments, the snapshot is taken only once. It is released only when the iterator is explicitly closed or when it has been exhausted.
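   A simplified sketch of that lifecycle follows. It assumes a hypothetical counting `Snapshot` class and a flat list of records instead of the real per-segment loading. The snapshot is taken once, on first use, and released exactly once: either on exhaustion or on `close()`, whichever comes first. `next()` here follows the `java.util.Iterator` contract and throws `NoSuchElementException` when nothing is left.

```java
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.function.Supplier;

public class SnapshotLifecycleSketch {

    // Hypothetical snapshot handle that just counts how often it is released.
    public static final class Snapshot {
        private int releaseCount = 0;

        void release() {
            releaseCount++;
        }

        public int releaseCount() {
            return releaseCount;
        }
    }

    // Hypothetical record iterator: the snapshot is taken once, on first use, and
    // released either when the last record has been consumed or on close(),
    // whichever comes first. The `done` flag keeps the release to at most once.
    public static final class RecordIterator implements Iterator<String>, AutoCloseable {
        private final Iterator<String> records;
        private final Supplier<Snapshot> takeSnapshot;
        private Snapshot snapshot;    // null until first use
        private boolean done = false; // set after exhaustion or close()

        public RecordIterator(final List<String> records, final Supplier<Snapshot> takeSnapshot) {
            this.records = records.iterator();
            this.takeSnapshot = takeSnapshot;
        }

        @Override
        public boolean hasNext() {
            if (done) {
                return false;
            }
            if (snapshot == null) {
                snapshot = takeSnapshot.get(); // taken exactly once
            }
            if (records.hasNext()) {
                return true;
            }
            finish(); // exhausted: release eagerly instead of waiting for close()
            return false;
        }

        @Override
        public String next() {
            if (!hasNext()) {
                throw new NoSuchElementException();
            }
            return records.next();
        }

        @Override
        public void close() {
            finish(); // caller stopped early; a no-op if already exhausted
        }

        private void finish() {
            if (!done && snapshot != null) {
                snapshot.release();
            }
            done = true;
        }
    }
}
```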
   
   > all of them have the same physical RocksDB instance under the hood?
   
   That's true. That's why a single segment creates and releases the one snapshot used throughout the whole lifetime of the iterator. That segment is whichever one the iterator visits first: the `latestValueStore`, or the oldest segment when the order is ascending.
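   A toy model of why one snapshot is enough follows. It assumes a hypothetical in-memory `PhysicalStore` standing in for the shared RocksDB instance, with snapshots modelled as sequence numbers (a simplification), and hypothetical `LogicalSegment` views over it. A snapshot taken through one segment also gives a consistent view when reading through another.

```java
import java.util.ArrayList;
import java.util.List;

public class SharedSnapshotSketch {

    // Hypothetical stand-in for the single physical RocksDB instance. Every write
    // gets a sequence number, and a snapshot is the sequence number at the moment
    // it was taken: a simplified model of how RocksDB snapshots behave.
    public static final class PhysicalStore {
        private static final class Entry {
            final long seq;
            final String segment;
            final String value;

            Entry(final long seq, final String segment, final String value) {
                this.seq = seq;
                this.segment = segment;
                this.value = value;
            }
        }

        private final List<Entry> entries = new ArrayList<>();
        private long nextSeq = 0;

        public void put(final String segment, final String value) {
            entries.add(new Entry(nextSeq++, segment, value));
        }

        // A snapshot sees every write whose sequence number is below this value.
        public long snapshot() {
            return nextSeq;
        }

        List<String> read(final String segment, final long snapshot) {
            final List<String> out = new ArrayList<>();
            for (final Entry e : entries) {
                if (e.segment.equals(segment) && e.seq < snapshot) {
                    out.add(e.value);
                }
            }
            return out;
        }
    }

    // Hypothetical logical segment: a named view over the shared physical store,
    // so a snapshot taken through one segment is valid for reads through any other.
    public static final class LogicalSegment {
        private final String name;
        private final PhysicalStore store;

        public LogicalSegment(final String name, final PhysicalStore store) {
            this.name = name;
            this.store = store;
        }

        public long takeSnapshot() {
            return store.snapshot();
        }

        public List<String> read(final long snapshot) {
            return store.read(name, snapshot);
        }
    }
}
```

   In the model, taking the snapshot through `latest` and then reading `older` with it hides any write made to `older` after the snapshot. This property lets one segment own the snapshot on behalf of all of them.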
   
   



-- 
This is an automated message from the Apache Git Service.