This is an automated email from the ASF dual-hosted git repository.

ppa pushed a commit to branch ignite-3.0.0-beta1
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/ignite-3.0.0-beta1 by this push:
     new a7ddaf796f IGNITE-17813 Sql. Introduce sorted reducer for IndexScanNode (#1237)
a7ddaf796f is described below

commit a7ddaf796f8dda66b9b2425d229f12c033f81224
Author: Pavel Pereslegin <xxt...@gmail.com>
AuthorDate: Thu Nov 3 17:17:47 2022 +0400

    IGNITE-17813 Sql. Introduce sorted reducer for IndexScanNode (#1237)
---
 .../sql/engine/exec/LogicalRelImplementor.java     |   4 +-
 .../sql/engine/exec/rel/IndexScanNode.java         | 167 +++++----
 .../ignite/internal/sql/engine/util/Commons.java   |   7 +
 .../sql/engine/util/CompositePublisher.java        | 172 +++++++++
 .../sql/engine/util/SortingCompositePublisher.java | 398 +++++++++++++++++++++
 .../exec/rel/IndexScanNodeExecutionTest.java       | 102 ++++--
 .../sql/engine/util/CompositeSubscriptionTest.java | 373 +++++++++++++++++++
 7 files changed, 1118 insertions(+), 105 deletions(-)

diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/LogicalRelImplementor.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/LogicalRelImplementor.java
index 5d31e63046..eefa27a037 100644
--- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/LogicalRelImplementor.java
+++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/LogicalRelImplementor.java
@@ -100,6 +100,7 @@ import 
org.apache.ignite.internal.sql.engine.rel.agg.IgniteSingleHashAggregate;
 import org.apache.ignite.internal.sql.engine.rel.agg.IgniteSingleSortAggregate;
 import org.apache.ignite.internal.sql.engine.rel.set.IgniteSetOp;
 import org.apache.ignite.internal.sql.engine.schema.IgniteIndex;
+import org.apache.ignite.internal.sql.engine.schema.IgniteIndex.Type;
 import org.apache.ignite.internal.sql.engine.schema.InternalIgniteTable;
 import org.apache.ignite.internal.sql.engine.trait.Destination;
 import org.apache.ignite.internal.sql.engine.trait.IgniteDistribution;
@@ -304,8 +305,9 @@ public class LogicalRelImplementor<RowT> implements 
IgniteRelVisitor<Node<RowT>>
         IgniteIndex idx = tbl.getIndex(rel.indexName());
         ColocationGroup group = ctx.group(rel.sourceId());
         int[] parts = group.partitions(ctx.localNode().id());
+        Comparator<RowT> comp = idx.type() == Type.SORTED ? 
ctx.expressionFactory().comparator(rel.collation()) : null;
 
-        return new IndexScanNode<>(ctx, rowType, idx, tbl, parts, ranges, 
filters, prj, requiredColumns.toBitSet());
+        return new IndexScanNode<>(ctx, rowType, idx, tbl, parts, comp, 
ranges, filters, prj, requiredColumns.toBitSet());
     }
 
     /** {@inheritDoc} */
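
The hunk above supplies a row comparator only when the index is sorted; a hash index scan has no order to preserve, so null is passed and no merge order is enforced downstream. A minimal standalone sketch of that selection rule (the enum and method below are illustrative stand-ins, not Ignite API):

    import java.util.Comparator;

    public class ComparatorSelectionSketch {
        // Stand-in for IgniteIndex.Type.
        enum IndexKind { SORTED, HASH }

        // Mirrors the change above: a comparator is built only for sorted indexes;
        // for hash indexes null is passed and the scan output is unordered.
        static Comparator<Object[]> rowComparator(IndexKind kind) {
            return kind == IndexKind.SORTED
                    ? Comparator.<Object[]>comparingLong(row -> (long) row[0])
                    : null;
        }

        public static void main(String[] args) {
            System.out.println(rowComparator(IndexKind.SORTED) != null); // true
            System.out.println(rowComparator(IndexKind.HASH) != null);   // false
        }
    }
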
diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/IndexScanNode.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/IndexScanNode.java
index 90064f3c3a..00b52b7cc3 100644
--- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/IndexScanNode.java
+++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/IndexScanNode.java
@@ -19,11 +19,15 @@ package org.apache.ignite.internal.sql.engine.exec.rel;
 
 import static org.apache.ignite.internal.util.ArrayUtils.nullOrEmpty;
 
+import java.util.ArrayList;
 import java.util.BitSet;
+import java.util.Comparator;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Queue;
 import java.util.concurrent.Flow;
+import java.util.concurrent.Flow.Publisher;
+import java.util.concurrent.Flow.Subscriber;
 import java.util.concurrent.Flow.Subscription;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.function.Function;
@@ -40,6 +44,9 @@ import 
org.apache.ignite.internal.sql.engine.exec.exp.RangeIterable;
 import org.apache.ignite.internal.sql.engine.schema.IgniteIndex;
 import org.apache.ignite.internal.sql.engine.schema.IgniteIndex.Type;
 import org.apache.ignite.internal.sql.engine.schema.InternalIgniteTable;
+import org.apache.ignite.internal.sql.engine.util.Commons;
+import org.apache.ignite.internal.sql.engine.util.CompositePublisher;
+import org.apache.ignite.internal.sql.engine.util.SortingCompositePublisher;
 import org.jetbrains.annotations.Contract;
 import org.jetbrains.annotations.Nullable;
 
@@ -70,9 +77,11 @@ public class IndexScanNode<RowT> extends AbstractNode<RowT> {
     /** Participating columns. */
     private final @Nullable BitSet requiredColumns;
 
-    private final RangeIterable<RowT> rangeConditions;
+    private final @Nullable RangeIterable<RowT> rangeConditions;
 
-    private Iterator<RangeCondition<RowT>> rangeConditionIterator;
+    private final @Nullable Comparator<RowT> comp;
+
+    private @Nullable Iterator<RangeCondition<RowT>> rangeConditionIterator;
 
     private int requested;
 
@@ -82,7 +91,7 @@ public class IndexScanNode<RowT> extends AbstractNode<RowT> {
 
     private Subscription activeSubscription;
 
-    private int curPartIdx;
+    private boolean rangeConditionsProcessed;
 
     /**
      * Constructor.
@@ -91,6 +100,7 @@ public class IndexScanNode<RowT> extends AbstractNode<RowT> {
      * @param rowType Output type of the current node.
      * @param schemaTable The table this node should scan.
      * @param parts Partition numbers to scan.
+     * @param comp Rows comparator.
      * @param rangeConditions Range conditions.
      * @param filters Optional filter to filter out rows.
      * @param rowTransformer Optional projection function.
@@ -102,6 +112,7 @@ public class IndexScanNode<RowT> extends AbstractNode<RowT> 
{
             IgniteIndex schemaIndex,
             InternalIgniteTable schemaTable,
             int[] parts,
+            @Nullable Comparator<RowT> comp,
             @Nullable RangeIterable<RowT> rangeConditions,
             @Nullable Predicate<RowT> filters,
             @Nullable Function<RowT, RowT> rowTransformer,
@@ -116,6 +127,9 @@ public class IndexScanNode<RowT> extends AbstractNode<RowT> 
{
         this.rowTransformer = rowTransformer;
         this.requiredColumns = requiredColumns;
         this.rangeConditions = rangeConditions;
+        this.comp = comp;
+
+        rangeConditionIterator = rangeConditions == null ? null : 
rangeConditions.iterator();
 
         factory = ctx.rowHandler().factory(ctx.getTypeFactory(), rowType);
 
@@ -153,8 +167,11 @@ public class IndexScanNode<RowT> extends 
AbstractNode<RowT> {
     protected void rewindInternal() {
         requested = 0;
         waiting = 0;
-        curPartIdx = 0;
-        rangeConditionIterator = null;
+        rangeConditionsProcessed = false;
+
+        if (rangeConditions != null) {
+            rangeConditionIterator = rangeConditions.iterator();
+        }
 
         if (activeSubscription != null) {
             activeSubscription.cancel();
@@ -233,84 +250,96 @@ public class IndexScanNode<RowT> extends 
AbstractNode<RowT> {
         Subscription subscription = this.activeSubscription;
         if (subscription != null) {
             subscription.request(waiting);
-        } else if (curPartIdx < parts.length) {
-            if (schemaIndex.type() == Type.SORTED) {
-                //TODO: https://issues.apache.org/jira/browse/IGNITE-17813
-                // Introduce new publisher using merge-sort algo to merge 
partition index publishers.
-                int part = curPartIdx;
-
-                int flags = 0;
-                BinaryTuple lowerBound = null;
-                BinaryTuple upperBound = null;
-
-                if (rangeConditions == null) {
-                    flags = SortedIndex.INCLUDE_LEFT | 
SortedIndex.INCLUDE_RIGHT;
-                    curPartIdx++;
-                } else {
-                    if (rangeConditionIterator == null) {
-                        rangeConditionIterator = rangeConditions.iterator();
-                    }
+        } else if (!rangeConditionsProcessed) {
+            RangeCondition<RowT> cond = null;
 
-                    RangeCondition<RowT> cond = rangeConditionIterator.next();
+            if (rangeConditionIterator == null || 
!rangeConditionIterator.hasNext()) {
+                rangeConditionsProcessed = true;
+            } else {
+                cond = rangeConditionIterator.next();
+
+                rangeConditionsProcessed = !rangeConditionIterator.hasNext();
+            }
 
-                    lowerBound = toBinaryTuplePrefix(cond.lower());
-                    upperBound = toBinaryTuplePrefix(cond.upper());
+            indexPublisher(parts, cond).subscribe(new SubscriberImpl());
+        } else {
+            waiting = NOT_WAITING;
+        }
+    }
 
-                    flags |= (cond.lowerInclude()) ? SortedIndex.INCLUDE_LEFT 
: 0;
-                    flags |= (cond.upperInclude()) ? SortedIndex.INCLUDE_RIGHT 
: 0;
+    private Publisher<RowT> indexPublisher(int[] parts, @Nullable 
RangeCondition<RowT> cond) {
+        List<Flow.Publisher<RowT>> partPublishers = new 
ArrayList<>(parts.length);
 
-                    if (!rangeConditionIterator.hasNext()) { // Switch to next 
partition and reset range index.
-                        rangeConditionIterator = null;
-                        curPartIdx++;
-                    }
-                }
+        for (int p : parts) {
+            partPublishers.add(partitionPublisher(p, cond));
+        }
 
-                ((SortedIndex) schemaIndex.index()).scan(
-                        parts[part],
-                        context().transaction(),
-                        lowerBound,
-                        upperBound,
-                        flags,
-                        requiredColumns
-                ).subscribe(new SubscriberImpl());
-            } else {
-                assert schemaIndex.type() == Type.HASH;
+        return comp != null
+                ? new SortingCompositePublisher<>(partPublishers, comp, 
Commons.SORTED_IDX_PART_PREFETCH_SIZE)
+                : new CompositePublisher<>(partPublishers);
+    }
 
-                int part = curPartIdx;
-                BinaryTuple key = null;
+    private Flow.Publisher<RowT> partitionPublisher(int part, @Nullable 
RangeCondition<RowT> cond) {
+        Publisher<BinaryTuple> pub;
 
-                if (rangeConditions == null) {
-                    curPartIdx++;
-                } else {
-                    if (rangeConditionIterator == null) {
-                        rangeConditionIterator = rangeConditions.iterator();
-                    }
+        if (schemaIndex.type() == Type.SORTED) {
+            int flags = 0;
+            BinaryTuple lower = null;
+            BinaryTuple upper = null;
 
-                    RangeCondition<RowT> cond = rangeConditionIterator.next();
+            if (cond == null) {
+                flags = SortedIndex.INCLUDE_LEFT | SortedIndex.INCLUDE_RIGHT;
+            } else {
+                lower = toBinaryTuplePrefix(cond.lower());
+                upper = toBinaryTuplePrefix(cond.upper());
 
-                    assert cond.lower() == cond.upper();
+                flags |= (cond.lowerInclude()) ? SortedIndex.INCLUDE_LEFT : 0;
+                flags |= (cond.upperInclude()) ? SortedIndex.INCLUDE_RIGHT : 0;
+            }
 
-                    key = toBinaryTuple(cond.lower());
+            pub = ((SortedIndex) schemaIndex.index()).scan(part, 
context().transaction(), lower, upper, flags, requiredColumns);
+        } else {
+            assert schemaIndex.type() == Type.HASH;
+            BinaryTuple key = null;
 
-                    if (!rangeConditionIterator.hasNext()) { // Switch to next 
partition and reset range index.
-                        rangeConditionIterator = null;
-                        curPartIdx++;
-                    }
-                }
+            if (cond != null) {
+                assert cond.lower() == cond.upper();
 
-                schemaIndex.index().scan(
-                        parts[part],
-                        context().transaction(),
-                        key,
-                        requiredColumns
-                ).subscribe(new SubscriberImpl());
+                key = toBinaryTuple(cond.lower());
             }
-        } else {
-            waiting = NOT_WAITING;
+
+            pub = schemaIndex.index().scan(part, context().transaction(), key, 
requiredColumns);
         }
+
+        return downstream -> {
+            // BinaryTuple -> RowT converter.
+            Subscriber<BinaryTuple> subs = new Subscriber<>() {
+                @Override
+                public void onSubscribe(Subscription subscription) {
+                    downstream.onSubscribe(subscription);
+                }
+
+                @Override
+                public void onNext(BinaryTuple item) {
+                    downstream.onNext(convert(item));
+                }
+
+                @Override
+                public void onError(Throwable throwable) {
+                    downstream.onError(throwable);
+                }
+
+                @Override
+                public void onComplete() {
+                    downstream.onComplete();
+                }
+            };
+
+            pub.subscribe(subs);
+        };
     }
 
-    private class SubscriberImpl implements Flow.Subscriber<BinaryTuple> {
+    private class SubscriberImpl implements Flow.Subscriber<RowT> {
 
         private int received = 0; // HB guarded here.
 
@@ -325,9 +354,7 @@ public class IndexScanNode<RowT> extends AbstractNode<RowT> 
{
 
         /** {@inheritDoc} */
         @Override
-        public void onNext(BinaryTuple binRow) {
-            RowT row = convert(binRow);
-
+        public void onNext(RowT row) {
             inBuff.add(row);
 
             if (++received == inBufSize) {
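
In the reworked IndexScanNode above, each per-partition publisher of BinaryTuple is adapted into a publisher of RowT by a subscriber that converts every element and forwards the upstream subscription unchanged, so downstream demand still reaches the storage layer. A self-contained sketch of that adapter pattern using plain JDK Flow types (the class and helper names are illustrative, not Ignite code; SubmissionPublisher serves only as a demo source):

    import java.util.List;
    import java.util.concurrent.Flow.Publisher;
    import java.util.concurrent.Flow.Subscriber;
    import java.util.concurrent.Flow.Subscription;
    import java.util.concurrent.SubmissionPublisher;
    import java.util.function.Function;

    public class MappingPublisherSketch {
        // Wraps an upstream publisher and converts each element in flight,
        // forwarding the upstream subscription as-is so that request(n) calls
        // from the downstream still reach the source.
        static <A, B> Publisher<B> map(Publisher<A> upstream, Function<? super A, ? extends B> fn) {
            return downstream -> upstream.subscribe(new Subscriber<A>() {
                @Override public void onSubscribe(Subscription s) { downstream.onSubscribe(s); }
                @Override public void onNext(A item) { downstream.onNext(fn.apply(item)); }
                @Override public void onError(Throwable t) { downstream.onError(t); }
                @Override public void onComplete() { downstream.onComplete(); }
            });
        }

        public static void main(String[] args) throws Exception {
            SubmissionPublisher<Integer> source = new SubmissionPublisher<>();
            Publisher<String> rows = map(source, i -> "row-" + i);

            rows.subscribe(new Subscriber<String>() {
                @Override public void onSubscribe(Subscription s) { s.request(Long.MAX_VALUE); }
                @Override public void onNext(String item) { System.out.println(item); }
                @Override public void onError(Throwable t) { t.printStackTrace(); }
                @Override public void onComplete() { System.out.println("done"); }
            });

            List.of(1, 2, 3).forEach(source::submit);
            source.close();

            Thread.sleep(200); // Give the asynchronous delivery a moment to finish (demo only).
        }
    }
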
diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/util/Commons.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/util/Commons.java
index 869e046e05..b4576d2efd 100644
--- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/util/Commons.java
+++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/util/Commons.java
@@ -126,6 +126,13 @@ public final class Commons {
 
     public static final int IN_BUFFER_SIZE = 512;
 
+    /**
+     * The number of elements to prefetch from each partition when scanning a sorted index.
+     * A higher value means fewer calls to the upstream, at the cost of a larger internal buffer.
+     */
+    public static final int SORTED_IDX_PART_PREFETCH_SIZE = 100;
+
     public static final SqlParser.Config PARSER_CONFIG = SqlParser.config()
             .withParserFactory(IgniteSqlParserImpl.FACTORY)
             .withLex(Lex.ORACLE)
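
The prefetch constant added above bounds how far each partition runs ahead of the merge: in the worst case roughly partitions × prefetch rows sit in the sorting publisher's queues at once. A back-of-the-envelope estimate under assumed figures (the partition count and average row size below are invented for illustration):

    public class PrefetchBufferEstimate {
        public static void main(String[] args) {
            int partitions = 25;    // assumed number of partitions scanned on one node
            int prefetch = 100;     // SORTED_IDX_PART_PREFETCH_SIZE above
            int avgRowBytes = 256;  // assumed average row size

            long bufferedRows = (long) partitions * prefetch;

            System.out.println("worst-case buffered rows ~ " + bufferedRows);
            System.out.println("worst-case buffer size  ~ " + bufferedRows * avgRowBytes / 1024 + " KiB");
        }
    }
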
diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/util/CompositePublisher.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/util/CompositePublisher.java
new file mode 100644
index 0000000000..6fb4614cfa
--- /dev/null
+++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/util/CompositePublisher.java
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.sql.engine.util;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.Flow.Publisher;
+import java.util.concurrent.Flow.Subscriber;
+import java.util.concurrent.Flow.Subscription;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Composite publisher.
+ */
+public class CompositePublisher<T> implements Publisher<T> {
+    /** List of upstream publishers. */
+    final Collection<? extends Publisher<T>> publishers;
+
+    /**
+     * Constructor.
+     *
+     * @param publishers List of upstream publishers.
+     */
+    public CompositePublisher(Collection<? extends Publisher<T>> publishers) {
+        this.publishers = publishers;
+    }
+
+    @Override
+    public void subscribe(Subscriber<? super T> downstream) {
+        subscribe(new CompositeSubscription<>(downstream), downstream);
+    }
+
+    void subscribe(CompositeSubscription<T> subscription, Subscriber<? super 
T> downstream) {
+        subscription.subscribe(publishers);
+
+        downstream.onSubscribe(subscription);
+    }
+
+    /**
+     * Sequential composite subscription.
+     *
+     * <p>Sequentially receives data from each registered subscription
+     * until the total number of requested items has been received.
+     */
+    public static class CompositeSubscription<T> implements Subscription {
+        /** List of subscriptions. */
+        private final List<Subscription> subscriptions = new ArrayList<>();
+
+        /** Downstream subscriber. */
+        protected final Subscriber<? super T> downstream;
+
+        /** Current subscription index. */
+        private int subscriptionIdx = 0;
+
+        /** Total number of remaining items. */
+        private long remaining;
+
+        /** Flag indicating that the subscription has been cancelled. */
+        private boolean cancelled;
+
+        public CompositeSubscription(Subscriber<? super T> downstream) {
+            this.downstream = downstream;
+        }
+
+        /**
+         * Subscribes to multiple publishers.
+         *
+         * @param sources Publishers.
+         */
+        public void subscribe(Collection<? extends Publisher<? extends T>> 
sources) {
+            for (Publisher<? extends T> publisher : sources) {
+                publisher.subscribe(new PlainSubscriber());
+            }
+        }
+
+        /** {@inheritDoc} */
+        @Override
+        public void request(long n) {
+            remaining = n;
+
+            requestInternal();
+        }
+
+        /** {@inheritDoc} */
+        @Override
+        public void cancel() {
+            cancelled = true;
+
+            Subscription subscription = activeSubscription();
+
+            if (subscription != null) {
+                subscription.cancel();
+            }
+        }
+
+        /** Request data from a subscription. */
+        private void requestInternal() {
+            Subscription subscription = activeSubscription();
+
+            if (subscription != null) {
+                subscription.request(remaining);
+            }
+        }
+
+        private @Nullable Subscription activeSubscription() {
+            if (subscriptionIdx >= subscriptions.size()) {
+                return null;
+            }
+
+            return subscriptions.get(subscriptionIdx);
+        }
+
+        /**
+         * Plain subscriber.
+         */
+        protected class PlainSubscriber implements Subscriber<T> {
+            /** {@inheritDoc} */
+            @Override
+            public void onSubscribe(Subscription subscription) {
+                subscriptions.add(subscription);
+            }
+
+            /** {@inheritDoc} */
+            @Override
+            public void onNext(T item) {
+                --remaining;
+
+                downstream.onNext(item);
+            }
+
+            /** {@inheritDoc} */
+            @Override
+            public void onError(Throwable throwable) {
+                downstream.onError(throwable);
+            }
+
+            /** {@inheritDoc} */
+            @Override
+            public void onComplete() {
+                if (cancelled) {
+                    return;
+                }
+
+                if (++subscriptionIdx == subscriptions.size()) {
+                    downstream.onComplete();
+
+                    return;
+                }
+
+                if (remaining > 0) {
+                    requestInternal();
+                }
+            }
+        }
+    }
+}
diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/util/SortingCompositePublisher.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/util/SortingCompositePublisher.java
new file mode 100644
index 0000000000..b5b4cac617
--- /dev/null
+++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/util/SortingCompositePublisher.java
@@ -0,0 +1,398 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.sql.engine.util;
+
+import java.lang.invoke.MethodHandles;
+import java.lang.invoke.MethodHandles.Lookup;
+import java.lang.invoke.VarHandle;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Comparator;
+import java.util.Queue;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.Flow.Publisher;
+import java.util.concurrent.Flow.Subscriber;
+import java.util.concurrent.Flow.Subscription;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * Sorting composite publisher.
+ *
+ * <p>Merges multiple concurrent ordered data streams into one.
+ */
+public class SortingCompositePublisher<T> extends CompositePublisher<T> {
+    /** Rows comparator. */
+    private final Comparator<T> comp;
+
+    /** Prefetch size. */
+    private final int prefetch;
+
+    /**
+     * Constructor.
+     *
+     * @param publishers List of upstream publishers.
+     * @param comp Rows comparator.
+     * @param prefetch Prefetch size.
+     */
+    public SortingCompositePublisher(Collection<? extends Publisher<T>> 
publishers, Comparator<T> comp, int prefetch) {
+        super(publishers);
+
+        this.comp = comp;
+        this.prefetch = prefetch;
+    }
+
+    @Override
+    public void subscribe(Subscriber<? super T> downstream) {
+        subscribe(new OrderedMergeCompositeSubscription<>(downstream, comp, 
prefetch, publishers.size()), downstream);
+    }
+
+    /**
+     * Sorting composite subscription.
+     *
+     * <p>Merges multiple concurrent ordered data streams into one.
+     */
+    public static class OrderedMergeCompositeSubscription<T> extends 
CompositePublisher.CompositeSubscription<T> {
+        /** Marker to indicate completed subscription. */
+        private static final Object DONE = new Object();
+
+        /** Atomic updater for {@link #error} field. */
+        private static final VarHandle ERROR;
+
+        /** Atomic updater for {@link #cancelled} field. */
+        private static final VarHandle CANCELLED;
+
+        /** Atomic updater for {@link #requested} field. */
+        private static final VarHandle REQUESTED;
+
+        /** Counter to prevent concurrent execution of a critical section. */
+        private final AtomicInteger guardCntr = new AtomicInteger();
+
+        /** Subscribers. */
+        private final OrderedMergeSubscriber<T>[] subscribers;
+
+        /** Rows comparator. */
+        private final Comparator<? super T> comp;
+
+        /** Last received values. */
+        private final Object[] values;
+
+        /** Error. */
+        @SuppressWarnings({"unused", "FieldMayBeFinal"})
+        private Throwable error;
+
+        /** Cancelled flag. */
+        @SuppressWarnings({"unused", "FieldMayBeFinal"})
+        private boolean cancelled;
+
+        /** Number of requested rows. */
+        @SuppressWarnings({"unused", "FieldMayBeFinal"})
+        private long requested;
+
+        /** Number of emitted rows (guarded by {@link #guardCntr}). */
+        private long emitted;
+
+        static {
+            Lookup lk = MethodHandles.lookup();
+
+            try {
+                ERROR = 
lk.findVarHandle(OrderedMergeCompositeSubscription.class, "error", 
Throwable.class);
+                CANCELLED = 
lk.findVarHandle(OrderedMergeCompositeSubscription.class, "cancelled", 
boolean.class);
+                REQUESTED = 
lk.findVarHandle(OrderedMergeCompositeSubscription.class, "requested", 
long.class);
+            } catch (NoSuchFieldException | IllegalAccessException ex) {
+                throw new InternalError(ex);
+            }
+        }
+
+        /**
+         * Constructor.
+         *
+         * @param downstream Downstream subscriber.
+         * @param comp Rows comparator.
+         * @param prefetch Prefetch size.
+         * @param cnt Count of subscriptions.
+         */
+        public OrderedMergeCompositeSubscription(Subscriber<? super T> 
downstream, Comparator<? super T> comp, int prefetch, int cnt) {
+            super(downstream);
+
+            this.comp = comp;
+            this.subscribers = new OrderedMergeSubscriber[cnt];
+
+            for (int i = 0; i < cnt; i++) {
+                this.subscribers[i] = new OrderedMergeSubscriber<>(this, 
prefetch);
+            }
+
+            this.values = new Object[cnt];
+        }
+
+        @Override
+        public void subscribe(Collection<? extends Publisher<? extends T>> 
sources) {
+            int i = 0;
+
+            for (Publisher<? extends T> publisher : sources) {
+                publisher.subscribe(subscribers[i++]);
+            }
+        }
+
+        @Override
+        public void request(long n) {
+            for (;;) {
+                long current = (long) REQUESTED.getAcquire(this);
+                long next = current + n;
+
+                if (next < 0L) {
+                    next = Long.MAX_VALUE;
+                }
+
+                if (REQUESTED.compareAndSet(this, current, next)) {
+                    break;
+                }
+            }
+
+            drain();
+        }
+
+        @Override
+        public void cancel() {
+            if (CANCELLED.compareAndSet(this, false, true)) {
+                for (OrderedMergeSubscriber<T> inner : subscribers) {
+                    inner.cancel();
+                }
+
+                if (guardCntr.getAndIncrement() == 0) {
+                    Arrays.fill(values, null);
+
+                    for (OrderedMergeSubscriber<T> inner : subscribers) {
+                        inner.queue.clear();
+                    }
+                }
+            }
+        }
+
+        private void onInnerError(OrderedMergeSubscriber<T> sender, Throwable 
ex) {
+            updateError(ex);
+
+            sender.done = true;
+
+            drain();
+        }
+
+        private void updateError(Throwable throwable) {
+            for (;;) {
+                Throwable current = (Throwable) ERROR.getAcquire(this);
+                Throwable next;
+
+                if (current == null) {
+                    next = throwable;
+                } else {
+                    next = new Throwable();
+                    next.addSuppressed(current);
+                    next.addSuppressed(throwable);
+                }
+
+                if (ERROR.compareAndSet(this, current, next)) {
+                    break;
+                }
+            }
+        }
+
+        private void drain() {
+            // Only one thread can pass below.
+            if (guardCntr.getAndIncrement() != 0) {
+                return;
+            }
+
+            // Frequently accessed fields.
+            Subscriber<? super T> downstream = this.downstream;
+            OrderedMergeSubscriber<T>[] subscribers = this.subscribers;
+            int subsCnt = subscribers.length;
+            Object[] values = this.values;
+            long emitted = this.emitted;
+
+            for (;;) {
+                long requested = (long) REQUESTED.getAcquire(this);
+
+                for (;;) {
+                    if ((boolean) CANCELLED.getAcquire(this)) {
+                        Arrays.fill(values, null);
+
+                        for (OrderedMergeSubscriber<T> inner : subscribers) {
+                            inner.queue.clear();
+                        }
+
+                        return;
+                    }
+
+                    int completed = 0;
+                    boolean waitResponse = false;
+
+                    for (int i = 0; i < subsCnt; i++) {
+                        Object obj = values[i];
+
+                        if (obj == DONE) {
+                            completed++;
+                        } else if (obj == null) {
+                            boolean innerDone = subscribers[i].done;
+
+                            obj = subscribers[i].queue.poll();
+
+                            if (obj != null) {
+                                values[i] = obj;
+                            } else if (innerDone) {
+                                values[i] = DONE;
+
+                                completed++;
+                            } else {
+                                // Subscriber has not received a response yet.
+                                waitResponse = true;
+
+                                break;
+                            }
+                        }
+                    }
+
+                    if (completed == subsCnt) {
+                        Throwable ex = (Throwable) ERROR.getAcquire(this);
+
+                        if (ex == null) {
+                            downstream.onComplete();
+                        } else {
+                            downstream.onError(ex);
+                        }
+
+                        return;
+                    }
+
+                    if (waitResponse || emitted == requested) {
+                        break;
+                    }
+
+                    T min = null;
+                    int minIndex = -1;
+
+                    for (int i = 0; i < values.length; i++) {
+                        Object obj = values[i];
+
+                        if (obj != DONE && (min == null || comp.compare(min, 
(T) obj) > 0)) {
+                            min = (T) obj;
+                            minIndex = i;
+                        }
+                    }
+
+                    values[minIndex] = null;
+
+                    downstream.onNext(min);
+
+                    emitted++;
+                    subscribers[minIndex].request(1);
+                }
+
+                this.emitted = emitted;
+
+                // Retry if any other thread has incremented the counter.
+                if (guardCntr.decrementAndGet() == 0) {
+                    break;
+                }
+            }
+        }
+
+        /**
+         * Merge sort subscriber.
+         */
+        private static final class OrderedMergeSubscriber<T> extends 
AtomicReference<Subscription> implements Subscriber<T>, Subscription {
+            /** Parent subscription. */
+            private final OrderedMergeCompositeSubscription<T> parent;
+
+            /** Prefetch size. */
+            private final int prefetch;
+
+            /** Number of items to consume before the next batch is requested from the upstream. */
+            private final int limit;
+
+            /** Inner data buffer. */
+            private final Queue<T> queue;
+
+            /** Number of items consumed since the last upstream request. */
+            private int consumed;
+
+            /** Flag indicating that the subscription has completed. */
+            private volatile boolean done;
+
+            OrderedMergeSubscriber(OrderedMergeCompositeSubscription<T> 
parent, int prefetch) {
+                this.parent = parent;
+                this.prefetch = Math.max(1, prefetch);
+                this.limit = this.prefetch - (this.prefetch >> 2);
+                this.queue = new ConcurrentLinkedQueue<>();
+            }
+
+            @Override
+            public void onSubscribe(Subscription s) {
+                if (compareAndSet(null, s)) {
+                    s.request(prefetch);
+                } else {
+                    s.cancel();
+                }
+            }
+
+            @Override
+            public void onNext(T item) {
+                queue.offer(item);
+
+                parent.drain();
+            }
+
+            @Override
+            public void onError(Throwable throwable) {
+                parent.onInnerError(this, throwable);
+            }
+
+            @Override
+            public void onComplete() {
+                done = true;
+
+                parent.drain();
+            }
+
+            @Override
+            public void request(long n) {
+                int c = consumed + 1;
+
+                if (c == limit) {
+                    consumed = 0;
+                    Subscription subscription = get();
+
+                    // If the subscription has not yet been cancelled - 
request upstream.
+                    if (subscription != this) {
+                        subscription.request(c);
+                    }
+                } else {
+                    consumed = c;
+                }
+            }
+
+            @Override
+            public void cancel() {
+                Subscription subscription = getAndSet(this);
+
+                if (subscription != null && subscription != this) {
+                    subscription.cancel();
+                }
+            }
+        }
+    }
+}
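
Taken together, the two new classes cover both scan modes: CompositePublisher drains its inputs one after another, while SortingCompositePublisher performs a k-way merge of inputs that are each already ordered. A usage sketch, assuming the sql-engine module is on the classpath; the synchronous array-backed publisher below mimics the tests' dummyPublisher and is not part of the production code:

    import java.util.Comparator;
    import java.util.List;
    import java.util.concurrent.Flow.Publisher;
    import java.util.concurrent.Flow.Subscriber;
    import java.util.concurrent.Flow.Subscription;
    import org.apache.ignite.internal.sql.engine.util.CompositePublisher;
    import org.apache.ignite.internal.sql.engine.util.SortingCompositePublisher;

    public class CompositePublisherUsageSketch {
        // Synchronous publisher over an array: emits only what was requested and
        // completes exactly once, mimicking a single partition's ordered output.
        static Publisher<Integer> partition(int... values) {
            return subscriber -> subscriber.onSubscribe(new Subscription() {
                private int off;
                private boolean done;

                @Override public void request(long n) {
                    long end = Math.min(values.length, off + Math.min(n, values.length));
                    while (off < end) {
                        subscriber.onNext(values[off++]);
                    }
                    if (off == values.length && !done) {
                        done = true;
                        subscriber.onComplete();
                    }
                }

                @Override public void cancel() {
                    // No-op for this sketch.
                }
            });
        }

        static Subscriber<Integer> printer(String label) {
            return new Subscriber<>() {
                @Override public void onSubscribe(Subscription s) { s.request(Long.MAX_VALUE); }
                @Override public void onNext(Integer item) { System.out.print(item + " "); }
                @Override public void onError(Throwable t) { t.printStackTrace(); }
                @Override public void onComplete() { System.out.println("<- " + label); }
            };
        }

        public static void main(String[] args) {
            // Plain composite: inputs are concatenated -> prints "1 3 5 2 4 6 <- concatenated".
            new CompositePublisher<>(List.of(partition(1, 3, 5), partition(2, 4, 6)))
                    .subscribe(printer("concatenated"));

            // Sorting composite: per-input order is merged globally -> prints "1 2 3 4 5 6 <- merged".
            new SortingCompositePublisher<>(
                    List.of(partition(1, 3, 5), partition(2, 4, 6)),
                    Comparator.<Integer>naturalOrder(),
                    2 /* prefetch per input */)
                    .subscribe(printer("merged"));
        }
    }
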
diff --git a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/IndexScanNodeExecutionTest.java b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/IndexScanNodeExecutionTest.java
index 7fae73256b..1a24ad7051 100644
--- a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/IndexScanNodeExecutionTest.java
+++ b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/IndexScanNodeExecutionTest.java
@@ -22,9 +22,13 @@ import static org.hamcrest.core.IsEqual.equalTo;
 
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Comparator;
+import java.util.HashSet;
 import java.util.List;
+import java.util.Set;
 import java.util.concurrent.Flow.Publisher;
 import java.util.concurrent.Flow.Subscription;
+import java.util.concurrent.ThreadLocalRandom;
 import java.util.stream.IntStream;
 import org.apache.calcite.rel.type.RelDataType;
 import org.apache.calcite.rel.type.RelDataTypeFactory.Builder;
@@ -48,6 +52,7 @@ import 
org.apache.ignite.internal.sql.engine.schema.IgniteIndex.Type;
 import org.apache.ignite.internal.sql.engine.trait.IgniteDistribution;
 import org.apache.ignite.internal.sql.engine.trait.IgniteDistributions;
 import org.apache.ignite.internal.sql.engine.type.IgniteTypeFactory;
+import org.apache.ignite.internal.sql.engine.util.Commons;
 import org.apache.ignite.internal.sql.engine.util.TypeUtils;
 import org.apache.ignite.internal.testframework.IgniteTestUtils;
 import org.hamcrest.Matchers;
@@ -70,24 +75,17 @@ public class IndexScanNodeExecutionTest extends 
AbstractExecutionTest {
                 EMPTY
         );
 
-        Object[][] tableData = {
-                {1L, null, 1, "Roman"},
-                {2L, 4, 2, "Igor"},
-                {3L, 3, 1, "Taras"},
-                {4L, 2, null, "Alexey"},
-                {5L, 4, 1, "Ivan"},
-                {6L, 2, 1, "Andrey"}
-        };
+        Comparator<Object[]> comp = Comparator.comparingLong(v -> (long) 
((Object[]) v)[0]);
 
-        // TODO: sort data, once IndexScanNode will support merging.
-        Object[][] result = tableData;
+        Object[][] tableData = generateIndexData(2, Commons.IN_BUFFER_SIZE * 
2, true);
+        Object[][] expected = 
Arrays.stream(tableData).map(Object[]::clone).sorted(comp).toArray(Object[][]::new);
 
         // Validate sort order.
         validateSortedIndexScan(
                 tableData,
                 null,
                 null,
-                result
+                expected
         );
 
         // Validate bounds.
@@ -95,14 +93,14 @@ public class IndexScanNodeExecutionTest extends 
AbstractExecutionTest {
                 tableData,
                 new Object[]{2, 1},
                 new Object[]{3, 0},
-                result
+                expected
         );
 
         validateSortedIndexScan(
                 tableData,
                 new Object[]{2, 1},
                 new Object[]{4},
-                result
+                expected
 
         );
 
@@ -110,7 +108,7 @@ public class IndexScanNodeExecutionTest extends 
AbstractExecutionTest {
                 tableData,
                 new Object[]{null},
                 null,
-                result
+                expected
         );
 
         // Validate failure due to incorrect bounds.
@@ -119,10 +117,8 @@ public class IndexScanNodeExecutionTest extends 
AbstractExecutionTest {
                         tableData,
                         new Object[]{2, "Brutus"},
                         new Object[]{3.9, 0},
-                        // TODO: sort data, once IndexScanNode will support 
merging.
                         EMPTY
                 ), ClassCastException.class);
-
     }
 
     @Test
@@ -134,14 +130,7 @@ public class IndexScanNodeExecutionTest extends 
AbstractExecutionTest {
                 EMPTY
         );
 
-        Object[][] tableData = {
-                {1L, null, 1, "Roman"},
-                {2L, 4, 2, "Igor"},
-                {3L, 3, 1, "Taras"},
-                {4L, 2, null, "Alexey"},
-                {5L, 4, 2, "Ivan"},
-                {6L, 2, 1, "Andrey"}
-        };
+        Object[][] tableData = generateIndexData(2, Commons.IN_BUFFER_SIZE * 
2, false);
 
         // Validate data.
         validateHashIndexScan(
@@ -177,6 +166,40 @@ public class IndexScanNodeExecutionTest extends 
AbstractExecutionTest {
                 ), ClassCastException.class);
     }
 
+    private Object[][] generateIndexData(int partCnt, int partSize, boolean 
sorted) {
+        Set<Long> uniqueNumbers = new HashSet<>();
+
+        while (uniqueNumbers.size() < partCnt * partSize) {
+            uniqueNumbers.add(ThreadLocalRandom.current().nextLong());
+        }
+
+        List<Long> uniqueNumList = new ArrayList<>(uniqueNumbers);
+
+        Object[][] data = new Object[partCnt * partSize][4];
+
+        for (int p = 0; p < partCnt; p++) {
+            if (sorted) {
+                uniqueNumList.subList(p * partSize, (p + 1) * 
partSize).sort(Comparator.comparingLong(v -> v));
+            }
+
+            for (int j = 0; j < partSize; j++) {
+                int rowNum = p * partSize + j;
+
+                data[rowNum] = new Object[4];
+
+                int bound1 = ThreadLocalRandom.current().nextInt(3);
+                int bound2 = ThreadLocalRandom.current().nextInt(3);
+
+                data[rowNum][0] = uniqueNumList.get(rowNum);
+                data[rowNum][1] = bound1 == 0 ? null : bound1;
+                data[rowNum][2] = bound2 == 0 ? null : bound2;
+                data[rowNum][3] = "row-" + rowNum;
+            }
+        }
+
+        return data;
+    }
+
     private void validateHashIndexScan(Object[][] tableData, @Nullable 
Object[] key, Object[][] expRes) {
         SchemaDescriptor schemaDescriptor = new SchemaDescriptor(
                 1,
@@ -295,6 +318,7 @@ public class IndexScanNodeExecutionTest extends 
AbstractExecutionTest {
                 index,
                 new TestTable(rowType),
                 new int[]{0, 2},
+                index.type() == Type.SORTED ? Comparator.comparingLong(v -> 
(long) ((Object[]) v)[0]) : null,
                 rangeIterable,
                 null,
                 null,
@@ -304,13 +328,13 @@ public class IndexScanNodeExecutionTest extends 
AbstractExecutionTest {
         RootNode<Object[]> node = new RootNode<>(ectx, scanNode.rowType());
         node.register(scanNode);
 
-        ArrayList<Object[]> res = new ArrayList<>();
+        int n = 0;
 
         while (node.hasNext()) {
-            res.add(node.next());
+            assertThat(node.next(), equalTo(expectedData[n++]));
         }
 
-        assertThat(res.toArray(EMPTY), equalTo(expectedData));
+        assertThat(n, equalTo(expectedData.length));
     }
 
     private static RelDataType createRowTypeFromSchema(IgniteTypeFactory 
typeFactory, SchemaDescriptor schemaDescriptor) {
@@ -355,9 +379,25 @@ public class IndexScanNodeExecutionTest extends 
AbstractExecutionTest {
     private static Publisher<BinaryTuple> dummyPublisher(BinaryTuple[] rows) {
         return s -> {
             s.onSubscribe(new Subscription() {
+                int off = 0;
+                boolean completed = false;
+
                 @Override
                 public void request(long n) {
-                    // No-op.
+                    int start = off;
+                    int end = Math.min(start + (int) n, rows.length);
+
+                    off = end;
+
+                    for (int i = start; i < end; i++) {
+                        s.onNext(rows[i]);
+                    }
+
+                    if (off >= rows.length && !completed) {
+                        completed = true;
+
+                        s.onComplete();
+                    }
                 }
 
                 @Override
@@ -365,12 +405,6 @@ public class IndexScanNodeExecutionTest extends 
AbstractExecutionTest {
                     // No-op.
                 }
             });
-
-            for (int i = 0; i < rows.length; ++i) {
-                s.onNext(rows[i]);
-            }
-
-            s.onComplete();
         };
     }
 
diff --git a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/util/CompositeSubscriptionTest.java b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/util/CompositeSubscriptionTest.java
new file mode 100644
index 0000000000..5d910f8b73
--- /dev/null
+++ b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/util/CompositeSubscriptionTest.java
@@ -0,0 +1,373 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.sql.engine.util;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Comparator;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Flow.Publisher;
+import java.util.concurrent.Flow.Subscriber;
+import java.util.concurrent.Flow.Subscription;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+import org.apache.ignite.internal.testframework.IgniteTestUtils;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+/**
+ * Composite subscription test.
+ */
+public class CompositeSubscriptionTest {
+    /** Test case probe count. */
+    private static final int PROBE_CNT = 10;
+
+    @Test
+    public void testEnoughData() throws Throwable {
+        for (int req = 1; req <= PROBE_CNT; req++) {
+            doPublishSubscribe(req, req + 1, req + 1, true, false, true);
+            doPublishSubscribe(req, req + 1, req + 1, true, false, false);
+
+            for (int pubCnt = 1; pubCnt <= PROBE_CNT * 10; pubCnt += 10) {
+                doPublishSubscribe(req, req * pubCnt, pubCnt, true, false, 
true);
+                doPublishSubscribe(req, req * pubCnt, pubCnt, true, false, 
false);
+                doPublishSubscribe(req, req * pubCnt, pubCnt, false, false, 
true);
+            }
+        }
+    }
+
+    @Test
+    public void testNotEnoughData() throws Throwable {
+        for (int req = 1; req <= PROBE_CNT; req++) {
+            doPublishSubscribe(req, 0, req, true, false, true);
+            doPublishSubscribe(req, 0, req, true, false, false);
+
+            for (int pubCnt = 1; pubCnt <= PROBE_CNT * 10; pubCnt += 10) {
+                int total = req * pubCnt;
+                int requested = total * 2;
+
+                doPublishSubscribe(requested, total, pubCnt, true, false, 
true);
+                doPublishSubscribe(requested, total, pubCnt, true, false, 
false);
+                doPublishSubscribe(requested, total, pubCnt, false, false, 
true);
+            }
+        }
+    }
+
+    @Test
+    public void testMultipleRequest() throws Throwable {
+        for (int req = 1; req <= PROBE_CNT; req++) {
+            for (int pubCnt = 1; pubCnt <= PROBE_CNT * 10; pubCnt += 10) {
+                int total = req * pubCnt;
+                int requested = total * 2;
+
+                doPublishSubscribe(requested, total, pubCnt, true, true, true);
+                doPublishSubscribe(requested, total, pubCnt, true, true, 
false);
+                doPublishSubscribe(requested, total, pubCnt, false, true, 
true);
+            }
+        }
+    }
+
+    @Test
+    public void testExactEnoughData() throws Throwable {
+        for (int req = 1; req <= PROBE_CNT; req++) {
+            for (int pubCnt = 1; pubCnt <= PROBE_CNT * 10; pubCnt += 10) {
+                int total = req * pubCnt;
+
+                doPublishSubscribe(total, total, pubCnt, true, false, true);
+                doPublishSubscribe(total, total, pubCnt, true, false, false);
+                doPublishSubscribe(total, total, pubCnt, false, false, true);
+            }
+        }
+    }
+
+    /**
+     * Test composite publishing-subscribing.
+     *
+     * @param requested Number of requested items.
+     * @param total Total number of available items.
+     * @param pubCnt Number of publishers.
+     * @param rnd {@code True} to generate random data, otherwise sequential 
range will be generated.
+     * @param split Indicates that we will divide the requested amount into 
small sub-requests.
+     * @param sort {@code True} to test merge sort strategy, otherwise all 
requests will be sent sequentially in single threaded mode.
+     * @throws Throwable If the wait was interrupted or one of the publishers failed.
+     */
+    private void doPublishSubscribe(
+            int requested,
+            int total,
+            int pubCnt,
+            boolean rnd,
+            boolean split,
+            boolean sort
+    ) throws Throwable {
+        int dataCnt = total / pubCnt;
+        Integer[][] data = new Integer[pubCnt][dataCnt];
+        int[] expData = new int[total];
+        int k = 0;
+
+        for (int i = 0; i < pubCnt; i++) {
+            for (int j = 0; j < dataCnt; j++) {
+                data[i][j] = rnd ? ThreadLocalRandom.current().nextInt(total) 
: k;
+
+                expData[k++] = data[i][j];
+            }
+
+            if (sort) {
+                Arrays.sort(data[i]);
+            }
+        }
+
+        if (sort) {
+            Arrays.sort(expData);
+        }
+
+        List<TestPublisher<Integer>> publishers = new ArrayList<>();
+
+        for (int i = 0; i < pubCnt; i++) {
+            TestPublisher<Integer> pub = new TestPublisher<>(data[i], sort);
+
+            publishers.add(pub);
+        }
+
+        AtomicReference<Subscription> subscriptionRef = new 
AtomicReference<>();
+        SubscriberListener<Integer> lsnr = new SubscriberListener<>();
+
+        CompositePublisher<Integer> compPublisher = sort
+                ? new SortingCompositePublisher<>(publishers, 
Comparator.comparingInt(v -> v), requested / pubCnt)
+                : new CompositePublisher<>(publishers);
+
+        lsnr.reset(requested);
+
+        compPublisher.subscribe(new Subscriber<>() {
+            @Override
+            public void onSubscribe(Subscription subscription) {
+                subscriptionRef.set(subscription);
+            }
+
+            @Override
+            public void onNext(Integer item) {
+                lsnr.onNext(item);
+            }
+
+            @Override
+            public void onError(Throwable t) {
+                assert false;
+            }
+
+            @Override
+            public void onComplete() {
+                lsnr.onComplete();
+            }
+        });
+
+        if (!split) {
+            checkSubscriptionRequest(subscriptionRef.get(), new 
InputParameters(expData, requested), lsnr);
+        } else {
+            for (int off = 1; off < Math.min(requested, total); off += off) {
+                InputParameters params = new InputParameters(expData, off);
+
+                checkSubscriptionRequest(subscriptionRef.get(), 
params.offset(off - 1), lsnr.reset(params.requested));
+            }
+        }
+
+        // Check errors.
+        for (TestPublisher<Integer> pub : publishers) {
+            Throwable err = pub.errRef.get();
+
+            if (err != null) {
+                throw err;
+            }
+        }
+    }
+
+    private void checkSubscriptionRequest(
+            Subscription subscription,
+            InputParameters params,
+            SubscriberListener<Integer> lsnr
+    ) throws InterruptedException {
+        subscription.request(params.requested);
+
+        Assertions.assertTrue(lsnr.awaitComplete(10),
+                "Execution timeout [params=" + params + ", results=" + lsnr + 
']');
+
+        int remaining = params.total - params.offset;
+        int expReceived = Math.min(params.requested, remaining);
+        int[] expResult = Arrays.copyOfRange(params.data, params.offset, 
params.offset + expReceived);
+
+        Assertions.assertEquals(expReceived, lsnr.res.size(), "params=" + 
params);
+        Assertions.assertEquals(expReceived, lsnr.receivedCnt.get(), "params=" 
+ params);
+
+        // Ensures that onComplete has (not) been called.
+        int expCnt = params.offset + params.requested >= params.total ? 1 : 0;
+        IgniteTestUtils.waitForCondition(() -> lsnr.onCompleteCntr.get() == 
expCnt, 10_000);
+        Assertions.assertEquals(expCnt, lsnr.onCompleteCntr.get(), "params=" + 
params);
+
+        int[] resArr = new int[expReceived];
+        int k = 0;
+
+        for (Integer n : lsnr.res) {
+            resArr[k++] = n;
+        }
+
+        Assertions.assertArrayEquals(expResult, resArr, params + "\n" + 
Arrays.toString(expResult) + "\n" + Arrays.toString(resArr) + "\n");
+    }
+
+    private static class TestPublisher<T> implements Publisher<T> {
+        private final T[] data;
+        private final AtomicReference<Throwable> errRef = new 
AtomicReference<>();
+        private final boolean async;
+        private boolean publicationComplete;
+
+        TestPublisher(T[] data, boolean async) {
+            this.data = data;
+            this.async = async;
+        }
+
+        @Override
+        public void subscribe(Subscriber<? super T> subscriber) {
+            subscriber.onSubscribe(new Subscription() {
+                private int idx;
+
+                @Override
+                public void request(long requested) {
+                    Runnable dataSupplier = () -> {
+                        int max = Math.min(data.length, idx + (int) requested);
+
+                        while (idx < max) {
+                            subscriber.onNext(data[idx++]);
+                        }
+
+                        if (idx == data.length && !publicationComplete) {
+                            publicationComplete = true;
+
+                            subscriber.onComplete();
+                        }
+                    };
+
+                    if (!async) {
+                        try {
+                            dataSupplier.run();
+                        } catch (Throwable t) {
+                            handleError(t);
+                        }
+
+                        return;
+                    }
+
+                    CompletableFuture.runAsync(() -> {
+                        // Multiple requests to the same subscription must not 
be executed concurrently.
+                        synchronized (TestPublisher.this) {
+                            dataSupplier.run();
+                        }
+                    }).whenComplete((res, err) -> {
+                        if (err != null) {
+                            handleError(err);
+                        }
+                    });
+                }
+
+                @Override
+                public void cancel() {
+                    throw new UnsupportedOperationException();
+                }
+            });
+        }
+
+        @SuppressWarnings("CallToPrintStackTrace")
+        private void handleError(Throwable err) {
+            err.printStackTrace();
+
+            errRef.compareAndSet(null, err);
+        }
+    }
+
+    private static class InputParameters {
+        int[] data;
+        int offset;
+        int total;
+        int requested;
+
+        InputParameters(int[] data, int requested) {
+            this.data = data;
+            this.requested = requested;
+            this.total = data.length;
+        }
+
+        InputParameters offset(int offset) {
+            this.offset = offset;
+
+            return this;
+        }
+
+        @Override
+        public String toString() {
+            return "InputParameters{"
+                    + "requested=" + requested
+                    + ", total=" + total
+                    + ", offset=" + offset
+                    + '}';
+        }
+    }
+
+    private static class SubscriberListener<T> {
+        final AtomicInteger receivedCnt = new AtomicInteger();
+        final AtomicInteger onCompleteCntr = new AtomicInteger();
+        final Collection<T> res = new LinkedBlockingQueue<>();
+        final AtomicReference<Integer> requestedCnt = new AtomicReference<>();
+        volatile CountDownLatch waitLatch;
+
+        SubscriberListener<T> reset(int requested) {
+            receivedCnt.set(0);
+            waitLatch = new CountDownLatch(1);
+            requestedCnt.set(requested);
+            res.clear();
+
+            return this;
+        }
+
+        void onNext(T item) {
+            res.add(item);
+
+            if (receivedCnt.incrementAndGet() == requestedCnt.get()) {
+                waitLatch.countDown();
+            }
+        }
+
+        boolean awaitComplete(int timeout) throws InterruptedException {
+            return waitLatch.await(timeout, TimeUnit.SECONDS);
+        }
+
+        void onComplete() {
+            waitLatch.countDown();
+            onCompleteCntr.incrementAndGet();
+        }
+
+        @Override
+        public String toString() {
+            return "SubscriberListener{"
+                    + "receivedCnt=" + receivedCnt.get()
+                    + ", onCompleteCntr=" + onCompleteCntr.get()
+                    + '}';
+        }
+    }
+}
