liming30 commented on code in PR #833: URL: https://github.com/apache/incubator-paimon/pull/833#discussion_r1168265315
########## paimon-core/src/main/java/org/apache/paimon/mergetree/compact/LoserTree.java: ########## @@ -0,0 +1,347 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.mergetree.compact; + +import org.apache.paimon.reader.RecordReader; +import org.apache.paimon.utils.ExceptionUtils; + +import java.io.Closeable; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Comparator; +import java.util.List; + +/** + * A variant of the loser tree. In the LSM-Tree architecture, there will be duplicate Keys in + * multiple {@link RecordReader}, and these Keys need to be merged. In the loser tree, we return in + * the order of the Keys, but because the returned objects may be reused in the {@link RecordReader} + * or the {@link MergeFunction}, for a single {@link RecordReader}, we cannot get the next Key + * immediately after returning a Key, and we need to wait until the same Key in all {@link + * RecordReader} is returned before proceeding to the next Key. + * + * <p>The process of building the loser tree is the same as a regular loser tree. The difference is + * that in the process of adjusting the tree, we need to record the index of the same key and the + * state of the winner/loser for subsequent quick adjustment of the position of the winner. + * + * <p>Detailed design can refer to + * https://docs.google.com/document/d/1OPyb9o9K226Fb4O-SzZpBYgwfsvNY7X0H5T3ivdu1ck/edit?usp=sharing + */ +public class LoserTree<T> implements Closeable { + private final int[] tree; + private final int size; + private final List<LeafIterator<T>> leaves; + private final Comparator<T> firstComparator; + private final Comparator<T> secondComparator; + + private boolean initialized; + + public LoserTree( + List<RecordReader<T>> nextBatchReaders, + Comparator<T> firstComparator, + Comparator<T> secondComparator) { + this.size = nextBatchReaders.size(); + this.leaves = new ArrayList<>(size); + this.tree = new int[size]; + this.firstComparator = + (e1, e2) -> e1 == null ? -1 : (e2 == null ? 1 : firstComparator.compare(e1, e2)); + this.secondComparator = + (e1, e2) -> e1 == null ? -1 : (e2 == null ? 1 : secondComparator.compare(e1, e2)); + this.initialized = false; + + for (RecordReader<T> reader : nextBatchReaders) { + LeafIterator<T> iterator = new LeafIterator<>(reader); + this.leaves.add(iterator); + } + } + + /** Initialize the loser tree in the same way as the regular loser tree. */ + public void initializeIfNeeded() throws IOException { + if (!initialized) { + Arrays.fill(tree, -1); + for (int i = size - 1; i >= 0; i--) { + leaves.get(i).advanceIfAvailable(); + adjust(i); + } + initialized = true; + } + } + + /** Adjust the Key that needs to be returned in the next round. */ + public void adjustForNextLoop() throws IOException { + LeafIterator<T> winner = leaves.get(tree[0]); + while (winner.state == State.WINNER_POPPED) { + winner.advanceIfAvailable(); + adjust(tree[0]); + winner = leaves.get(tree[0]); + } + } + + /** Pop the current winner and update its state to {@link State#WINNER_POPPED}. */ + public T popWinner() { + LeafIterator<T> winner = leaves.get(tree[0]); + if (winner.state == State.WINNER_POPPED) { + // if the winner has already been popped, it means that all the same key has been + // processed. + return null; + } + T result = winner.pop(); + adjust(tree[0]); + return result; + } + + /** Peek the current winner, mainly for key comparisons. */ + public T peekWinner() { + return leaves.get(tree[0]).state != State.WINNER_POPPED ? leaves.get(tree[0]).peek() : null; + } + + /** + * Adjust the winner from bottom to top. Using different {@link State}, we can quickly compare + * whether all the current same keys have been processed. + */ + private void adjust(int winner) { + for (int parent = (winner + this.size) / 2; parent > 0 && winner >= 0; parent /= 2) { + LeafIterator<T> winnerNode = leaves.get(winner); + LeafIterator<T> parentNode; + if (this.tree[parent] == -1) { + // initialize the tree. + winnerNode.state = State.LOSER_WITH_NEW_KEY; + } else { + parentNode = leaves.get(this.tree[parent]); + switch (winnerNode.state) { + case WINNER_WITH_NEW_KEY: + adjustWithNewWinnerKey(parent, parentNode, winnerNode); + break; + case WINNER_WITH_SAME_KEY: + adjustWithSameWinnerKey(parent, parentNode, winnerNode); + break; + case WINNER_POPPED: + if (winnerNode.firstSameKeyIndex < 0) { + // fast path, which means that the same key is not yet processed in the + // current + // tree. + parent = -1; + } else { + // fast path. Directly exchange positions with the same key that has not + // yet been + // processed, no need to compare level by level. + parent = winnerNode.firstSameKeyIndex; + parentNode = leaves.get(this.tree[parent]); + winnerNode.state = State.LOSER_POPPED; + parentNode.state = State.WINNER_WITH_SAME_KEY; + } + break; + default: + throw new UnsupportedOperationException( + "unknown state for " + winnerNode.state.name()); + } + } + + // if the winner loses, exchange nodes. + if (!winnerNode.state.isWinner()) { + int tmp = winner; + winner = this.tree[parent]; + this.tree[parent] = tmp; + } + } + this.tree[0] = winner; + } + + /** The winner node has the same userKey as the global winner. */ + private void adjustWithSameWinnerKey( + int index, LeafIterator<T> parentNode, LeafIterator<T> winnerNode) { + switch (parentNode.state) { + case LOSER_WITH_SAME_KEY: + // the key of the previous loser is the same as the key of the current winner, + // only the sequence needs to be compared. + T parentKey = parentNode.peek(); + T childKey = winnerNode.peek(); + int secondResult = secondComparator.compare(parentKey, childKey); + if (secondResult > 0) { + parentNode.state = State.WINNER_WITH_SAME_KEY; + winnerNode.state = State.LOSER_WITH_SAME_KEY; + parentNode.setFirstSameKeyIndex(index); + } else { + winnerNode.setFirstSameKeyIndex(index); + } + return; + case LOSER_WITH_NEW_KEY: + case LOSER_POPPED: + return; + default: + throw new UnsupportedOperationException( + "unknown state for " + parentNode.state.name()); + } + } + + /** + * The userKey of the new local winner node is different from that of the previous global + * winner. + */ + private void adjustWithNewWinnerKey( + int index, LeafIterator<T> parentNode, LeafIterator<T> winnerNode) { + switch (parentNode.state) { + case LOSER_WITH_NEW_KEY: + // when the new winner is also a new key, it needs to be compared. Review Comment: When the userKey of the two compared nodes is different, the state of the loser node will transition to LOSER_WITH_NEW_KEY. ########## paimon-core/src/main/java/org/apache/paimon/mergetree/compact/LoserTree.java: ########## @@ -0,0 +1,347 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.mergetree.compact; + +import org.apache.paimon.reader.RecordReader; +import org.apache.paimon.utils.ExceptionUtils; + +import java.io.Closeable; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Comparator; +import java.util.List; + +/** + * A variant of the loser tree. In the LSM-Tree architecture, there will be duplicate Keys in + * multiple {@link RecordReader}, and these Keys need to be merged. In the loser tree, we return in + * the order of the Keys, but because the returned objects may be reused in the {@link RecordReader} + * or the {@link MergeFunction}, for a single {@link RecordReader}, we cannot get the next Key + * immediately after returning a Key, and we need to wait until the same Key in all {@link + * RecordReader} is returned before proceeding to the next Key. + * + * <p>The process of building the loser tree is the same as a regular loser tree. The difference is + * that in the process of adjusting the tree, we need to record the index of the same key and the + * state of the winner/loser for subsequent quick adjustment of the position of the winner. + * + * <p>Detailed design can refer to + * https://docs.google.com/document/d/1OPyb9o9K226Fb4O-SzZpBYgwfsvNY7X0H5T3ivdu1ck/edit?usp=sharing + */ +public class LoserTree<T> implements Closeable { + private final int[] tree; + private final int size; + private final List<LeafIterator<T>> leaves; + private final Comparator<T> firstComparator; + private final Comparator<T> secondComparator; + + private boolean initialized; + + public LoserTree( + List<RecordReader<T>> nextBatchReaders, + Comparator<T> firstComparator, + Comparator<T> secondComparator) { + this.size = nextBatchReaders.size(); + this.leaves = new ArrayList<>(size); + this.tree = new int[size]; + this.firstComparator = + (e1, e2) -> e1 == null ? -1 : (e2 == null ? 1 : firstComparator.compare(e1, e2)); + this.secondComparator = + (e1, e2) -> e1 == null ? -1 : (e2 == null ? 1 : secondComparator.compare(e1, e2)); + this.initialized = false; + + for (RecordReader<T> reader : nextBatchReaders) { + LeafIterator<T> iterator = new LeafIterator<>(reader); + this.leaves.add(iterator); + } + } + + /** Initialize the loser tree in the same way as the regular loser tree. */ + public void initializeIfNeeded() throws IOException { + if (!initialized) { + Arrays.fill(tree, -1); + for (int i = size - 1; i >= 0; i--) { + leaves.get(i).advanceIfAvailable(); + adjust(i); + } + initialized = true; + } + } + + /** Adjust the Key that needs to be returned in the next round. */ + public void adjustForNextLoop() throws IOException { + LeafIterator<T> winner = leaves.get(tree[0]); + while (winner.state == State.WINNER_POPPED) { + winner.advanceIfAvailable(); + adjust(tree[0]); + winner = leaves.get(tree[0]); + } + } + + /** Pop the current winner and update its state to {@link State#WINNER_POPPED}. */ + public T popWinner() { + LeafIterator<T> winner = leaves.get(tree[0]); + if (winner.state == State.WINNER_POPPED) { + // if the winner has already been popped, it means that all the same key has been + // processed. + return null; + } + T result = winner.pop(); + adjust(tree[0]); + return result; + } + + /** Peek the current winner, mainly for key comparisons. */ + public T peekWinner() { + return leaves.get(tree[0]).state != State.WINNER_POPPED ? leaves.get(tree[0]).peek() : null; + } + + /** + * Adjust the winner from bottom to top. Using different {@link State}, we can quickly compare + * whether all the current same keys have been processed. + */ + private void adjust(int winner) { + for (int parent = (winner + this.size) / 2; parent > 0 && winner >= 0; parent /= 2) { + LeafIterator<T> winnerNode = leaves.get(winner); + LeafIterator<T> parentNode; + if (this.tree[parent] == -1) { + // initialize the tree. + winnerNode.state = State.LOSER_WITH_NEW_KEY; + } else { + parentNode = leaves.get(this.tree[parent]); + switch (winnerNode.state) { + case WINNER_WITH_NEW_KEY: + adjustWithNewWinnerKey(parent, parentNode, winnerNode); + break; + case WINNER_WITH_SAME_KEY: + adjustWithSameWinnerKey(parent, parentNode, winnerNode); + break; + case WINNER_POPPED: + if (winnerNode.firstSameKeyIndex < 0) { + // fast path, which means that the same key is not yet processed in the + // current + // tree. + parent = -1; + } else { + // fast path. Directly exchange positions with the same key that has not + // yet been + // processed, no need to compare level by level. + parent = winnerNode.firstSameKeyIndex; + parentNode = leaves.get(this.tree[parent]); + winnerNode.state = State.LOSER_POPPED; + parentNode.state = State.WINNER_WITH_SAME_KEY; + } + break; + default: + throw new UnsupportedOperationException( + "unknown state for " + winnerNode.state.name()); + } + } + + // if the winner loses, exchange nodes. + if (!winnerNode.state.isWinner()) { + int tmp = winner; + winner = this.tree[parent]; + this.tree[parent] = tmp; + } + } + this.tree[0] = winner; + } + + /** The winner node has the same userKey as the global winner. */ + private void adjustWithSameWinnerKey( + int index, LeafIterator<T> parentNode, LeafIterator<T> winnerNode) { + switch (parentNode.state) { + case LOSER_WITH_SAME_KEY: + // the key of the previous loser is the same as the key of the current winner, + // only the sequence needs to be compared. + T parentKey = parentNode.peek(); + T childKey = winnerNode.peek(); + int secondResult = secondComparator.compare(parentKey, childKey); + if (secondResult > 0) { + parentNode.state = State.WINNER_WITH_SAME_KEY; + winnerNode.state = State.LOSER_WITH_SAME_KEY; + parentNode.setFirstSameKeyIndex(index); + } else { + winnerNode.setFirstSameKeyIndex(index); + } + return; + case LOSER_WITH_NEW_KEY: + case LOSER_POPPED: + return; + default: + throw new UnsupportedOperationException( + "unknown state for " + parentNode.state.name()); + } + } + + /** + * The userKey of the new local winner node is different from that of the previous global + * winner. + */ + private void adjustWithNewWinnerKey( + int index, LeafIterator<T> parentNode, LeafIterator<T> winnerNode) { + switch (parentNode.state) { + case LOSER_WITH_NEW_KEY: + // when the new winner is also a new key, it needs to be compared. Review Comment: When the userKey of the two compared nodes is different, the state of the loser node will transition to `LOSER_WITH_NEW_KEY`. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@paimon.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org