smengcl commented on code in PR #8912:
URL: https://github.com/apache/ozone/pull/8912#discussion_r2666652611
##########
hadoop-hdds/rocksdb-checkpoint-differ/src/main/java/org/apache/ozone/rocksdb/util/SstFileSetReader.java:
##########
@@ -233,69 +236,136 @@ public String next() {
}
}
- private abstract static class MultipleSstFileIterator<T> implements ClosableIterator<T> {
+ /**
+ * A wrapper class that holds an iterator and its current value for heap operations.
+ */
+ private static class HeapEntry<T extends Comparable<T>>
+ implements Comparable<HeapEntry<T>>, Closeable {
+ private final ClosableIterator<T> iterator;
+ private T currentKey;
+
+ HeapEntry(ClosableIterator<T> iterator) {
+ this.iterator = iterator;
+ advance();
+ }
+
+ @Override
+ public void close() {
+ iterator.close();
+ }
- private final Iterator<String> fileNameIterator;
+ boolean advance() {
+ if (iterator.hasNext()) {
+ currentKey = iterator.next();
+ return true;
+ } else {
+ currentKey = null;
+ return false;
+ }
+ }
+
+ T getCurrentKey() {
+ return currentKey;
+ }
- private String currentFile;
- private ClosableIterator<T> currentFileIterator;
+ @Override
+ public int compareTo(@Nonnull HeapEntry<T> other) {
+ return Comparator.comparing(HeapEntry<T>::getCurrentKey).compare(this, other);
+ }
+
+ @Override
+ @SuppressWarnings("unchecked")
+ public boolean equals(Object obj) {
+ if (this == obj) {
+ return true;
+ }
+ if (obj == null || getClass() != obj.getClass()) {
+ return false;
+ }
+
+ HeapEntry<T> other = (HeapEntry<T>) obj;
+ return this.compareTo(other) == 0;
+ }
- private MultipleSstFileIterator(Collection<String> files) {
- this.fileNameIterator = files.iterator();
+ @Override
+ public int hashCode() {
+ return currentKey.hashCode();
+ }
+ }
+
+ /**
+ * The MultipleSstFileIterator class is an abstract base for iterating over multiple SST files.
+ * It uses a PriorityQueue to merge keys from all files in sorted order.
+ * Each file's iterator is wrapped in a HeapEntryWithFileIdx object,
+ * which ensures stable ordering for identical keys by considering the file index.
+ * @param <T>
+ */
+ private abstract static class MultipleSstFileIterator<T extends Comparable<T>> implements ClosableIterator<T> {
+ private final PriorityQueue<HeapEntry<T>> minHeap;
+
+ private MultipleSstFileIterator(Collection<String> sstFiles) {
+ this.minHeap = new PriorityQueue<>();
init();
+ initMinHeap(sstFiles);
}
protected abstract void init();
protected abstract ClosableIterator<T> getKeyIteratorForFile(String file)
throws RocksDBException, IOException;
- @Override
- public boolean hasNext() {
+ private void initMinHeap(Collection<String> files) {
try {
- do {
- if (Objects.nonNull(currentFileIterator) && currentFileIterator.hasNext()) {
- return true;
+ for (String file : files) {
+ ClosableIterator<T> iterator = getKeyIteratorForFile(file);
+ HeapEntry<T> entry = new HeapEntry<>(iterator);
+
+ if (entry.getCurrentKey() != null) {
+ minHeap.offer(entry);
+ } else {
+ // No valid entries, close the iterator
+ entry.close();
}
- } while (moveToNextFile());
+ }
} catch (IOException | RocksDBException e) {
- // TODO: [Snapshot] This exception has to be handled by the caller.
- // We have to do better exception handling.
- throw new RuntimeException(e);
+ // Clean up any opened iterators
+ close();
+ throw new RuntimeException("Failed to initialize SST file iterators", e);
}
- return false;
}
@Override
- public T next() {
- if (hasNext()) {
- return currentFileIterator.next();
- }
- throw new NoSuchElementException("No more elements found.");
+ public boolean hasNext() {
+ return !minHeap.isEmpty();
}
@Override
- public void close() throws UncheckedIOException {
- try {
- closeCurrentFile();
- } catch (IOException e) {
- throw new UncheckedIOException(e);
+ public T next() {
+ if (!hasNext()) {
+ throw new NoSuchElementException("No more elements found.");
}
- }
- private boolean moveToNextFile() throws IOException, RocksDBException {
- if (fileNameIterator.hasNext()) {
- closeCurrentFile();
- currentFile = fileNameIterator.next();
- this.currentFileIterator = getKeyIteratorForFile(currentFile);
- return true;
+ assert minHeap.peek() != null;
+ // Get current key from heap
+ T currentKey = minHeap.peek().getCurrentKey();
+
+ // Advance all entries with the same key (from different files)
+ while (!minHeap.isEmpty() && Objects.equals(minHeap.peek().getCurrentKey(), currentKey)) {
Review Comment:
Thanks @swamirishi for fixing that in
https://github.com/apache/ozone/pull/9576
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]