liming30 commented on code in PR #833:
URL: https://github.com/apache/incubator-paimon/pull/833#discussion_r1161733793
##########
paimon-core/src/main/java/org/apache/paimon/mergetree/compact/SortMergeReader.java:
##########
@@ -40,166 +38,90 @@
*/
public class SortMergeReader<T> implements RecordReader<T> {
- private final List<RecordReader<KeyValue>> nextBatchReaders;
private final Comparator<InternalRow> userKeyComparator;
private final MergeFunctionWrapper<T> mergeFunctionWrapper;
-
- private final PriorityQueue<Element> minHeap;
- private final List<Element> polled;
+ private final LoserTree<KeyValue> loserTree;
public SortMergeReader(
List<RecordReader<KeyValue>> readers,
Comparator<InternalRow> userKeyComparator,
MergeFunctionWrapper<T> mergeFunctionWrapper) {
- this.nextBatchReaders = new ArrayList<>(readers);
this.userKeyComparator = userKeyComparator;
this.mergeFunctionWrapper = mergeFunctionWrapper;
- this.minHeap =
- new PriorityQueue<>(
+ this.loserTree =
+ new LoserTree<>(
+ readers,
+ (e1, e2) -> {
+ if (e1 == null) {
+ return -1;
+ } else {
+ return e2 == null
+ ? 1
+ : userKeyComparator.compare(e2.key(),
e1.key());
+ }
+ },
(e1, e2) -> {
- int result =
userKeyComparator.compare(e1.kv.key(), e2.kv.key());
- if (result != 0) {
- return result;
+ if (e1 == null) {
+ return -1;
+ } else {
+ return e2 == null
+ ? 1
+ : Long.compare(e2.sequenceNumber(),
e1.sequenceNumber());
}
- return Long.compare(e1.kv.sequenceNumber(),
e2.kv.sequenceNumber());
});
- this.polled = new ArrayList<>();
}
@Nullable
@Override
public RecordIterator<T> readBatch() throws IOException {
- for (RecordReader<KeyValue> reader : nextBatchReaders) {
- while (true) {
- RecordIterator<KeyValue> iterator = reader.readBatch();
- if (iterator == null) {
- // no more batches, permanently remove this reader
- reader.close();
- break;
- }
- KeyValue kv = iterator.next();
- if (kv == null) {
- // empty iterator, clean up and try next batch
- iterator.releaseBatch();
- } else {
- // found next kv
- minHeap.offer(new Element(kv, iterator, reader));
- break;
- }
- }
- }
- nextBatchReaders.clear();
-
- return minHeap.isEmpty() ? null : new SortMergeIterator();
+ loserTree.initializeIfNeeded();
+ return loserTree.peekWinner() == null ? null : new SortMergeIterator();
}
@Override
public void close() throws IOException {
- for (RecordReader<KeyValue> reader : nextBatchReaders) {
- reader.close();
- }
- for (Element element : minHeap) {
- element.iterator.releaseBatch();
- element.reader.close();
- }
- for (Element element : polled) {
- element.iterator.releaseBatch();
- element.reader.close();
- }
+ loserTree.close();
}
/** The iterator iterates on {@link SortMergeReader}. */
private class SortMergeIterator implements RecordIterator<T> {
private boolean released = false;
+ @Nullable
@Override
public T next() throws IOException {
while (true) {
- boolean hasMore = nextImpl();
- if (!hasMore) {
+ loserTree.adjustForNextLoop();
+ KeyValue winner = loserTree.popWinner();
+ if (winner == null) {
return null;
}
- T result = mergeFunctionWrapper.getResult();
+ mergeFunctionWrapper.reset();
+ mergeFunctionWrapper.add(winner);
+
+ T result = merge(winner);
if (result != null) {
return result;
}
}
}
- private boolean nextImpl() throws IOException {
+ private T merge(KeyValue winner) {
Preconditions.checkState(
- !released, "SortMergeIterator#advanceNext is called after
release");
- Preconditions.checkState(
- nextBatchReaders.isEmpty(),
- "SortMergeIterator#advanceNext is called even if the last
call returns null. "
- + "This is a bug.");
-
- // add previously polled elements back to priority queue
- for (Element element : polled) {
- if (element.update()) {
- // still kvs left, add back to priority queue
- minHeap.offer(element);
- } else {
- // reach end of batch, clean up
- element.iterator.releaseBatch();
- nextBatchReaders.add(element.reader);
- }
- }
- polled.clear();
+ !released, "SortMergeIterator#nextImpl is called after
release");
- // there are readers reaching end of batch, so we end current batch
- if (!nextBatchReaders.isEmpty()) {
- return false;
+ while (loserTree.peekWinner() != null
+ && userKeyComparator.compare(winner.key(),
loserTree.peekWinner().key()) == 0) {
Review Comment:
Yes, you're right: this loop performs one redundant key comparison. I'll fix it in the next
commit.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]