AMashenkov commented on code in PR #1511: URL: https://github.com/apache/ignite-3/pull/1511#discussion_r1066718943
########## modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/util/OrderedMergePublisher.java: ########## @@ -36,49 +35,52 @@ * * <p>Merges multiple concurrent ordered data streams into one. */ -public class SortingCompositePublisher<T> extends CompositePublisher<T> { +public class OrderedMergePublisher<T> implements Publisher<T> { /** Rows comparator. */ - private final Comparator<T> comp; + private final Comparator<? super T> comp; + + /** Array of upstream publishers. */ + private final Publisher<? extends T>[] sources; /** Prefetch size. */ private final int prefetch; /** * Constructor. * - * @param publishers List of upstream publishers. * @param comp Rows comparator. * @param prefetch Prefetch size. + * @param sources List of upstream publishers. */ - public SortingCompositePublisher(Collection<? extends Publisher<T>> publishers, Comparator<T> comp, int prefetch) { - super(publishers); - - this.comp = comp; + public OrderedMergePublisher( + Comparator<? super T> comp, + int prefetch, + Publisher<? extends T>... sources) { + this.sources = sources; this.prefetch = prefetch; + this.comp = comp; } + /** {@inheritDoc} */ @Override public void subscribe(Subscriber<? super T> downstream) { - subscribe(new OrderedMergeCompositeSubscription<>(downstream, comp, prefetch, publishers.size()), downstream); + OrderedMergeSubscription<? super T> subscription = new OrderedMergeSubscription<>(downstream, comp, prefetch, sources.length); + + subscription.subscribe(sources); + downstream.onSubscribe(subscription); + subscription.drain(); } /** * Sorting composite subscription. * * <p>Merges multiple concurrent ordered data streams into one. */ - public static class OrderedMergeCompositeSubscription<T> extends CompositePublisher.CompositeSubscription<T> { - /** Marker to indicate completed subscription. 
*/
+ static final class OrderedMergeSubscription<T> implements Subscription {

Review Comment:

```suggestion
static final class MergedSubscription<T> implements Subscription {
```

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscr...@ignite.apache.org
For queries about this service, please contact Infrastructure at: us...@infra.apache.org