AMashenkov commented on code in PR #1511:
URL: https://github.com/apache/ignite-3/pull/1511#discussion_r1072138375


##########
modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/util/OrderedMergePublisher.java:
##########
@@ -36,49 +35,52 @@
  *
  * <p>Merges multiple concurrent ordered data streams into one.
  */
-public class SortingCompositePublisher<T> extends CompositePublisher<T> {
+public class OrderedMergePublisher<T> implements Publisher<T> {
     /** Rows comparator. */
-    private final Comparator<T> comp;
+    private final Comparator<? super T> comp;
+
+    /** Array of upstream publishers. */
+    private final Publisher<? extends T>[] sources;
 
     /** Prefetch size. */
     private final int prefetch;
 
     /**
      * Constructor.
      *
-     * @param publishers List of upstream publishers.
      * @param comp Rows comparator.
      * @param prefetch Prefetch size.
+     * @param sources List of upstream publishers.
      */
-    public SortingCompositePublisher(Collection<? extends Publisher<T>> 
publishers, Comparator<T> comp, int prefetch) {
-        super(publishers);
-
-        this.comp = comp;
+    public OrderedMergePublisher(
+            Comparator<? super T> comp,
+            int prefetch,
+            Publisher<? extends T>... sources) {
+        this.sources = sources;
         this.prefetch = prefetch;
+        this.comp = comp;
     }
 
+    /** {@inheritDoc} */
     @Override
     public void subscribe(Subscriber<? super T> downstream) {
-        subscribe(new OrderedMergeCompositeSubscription<>(downstream, comp, 
prefetch, publishers.size()), downstream);
+        OrderedMergeSubscription<? super T> subscription = new 
OrderedMergeSubscription<>(downstream, comp, prefetch, sources.length);
+
+        subscription.subscribe(sources);
+        downstream.onSubscribe(subscription);
+        subscription.drain();
     }
 
     /**
      * Sorting composite subscription.
      *
      * <p>Merges multiple concurrent ordered data streams into one.
      */
-    public static class OrderedMergeCompositeSubscription<T> extends 
CompositePublisher.CompositeSubscription<T> {
-        /** Marker to indicate completed subscription. */
+    static final class OrderedMergeSubscription<T> implements Subscription {

Review Comment:
   A concatenating publisher is also ordered, but not sorted, because it simply 
iterates over the array of upstreams.
   Another example: LinkedHashMap is ordered, but not sorted.
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscr...@ignite.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to