This is an automated email from the ASF dual-hosted git repository.

amashenkov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new f35f6da3198 IGNITE-24966 Sql. Improve OrderedMergePublisher 
performance (#5525)
f35f6da3198 is described below

commit f35f6da31982029af2e4f38be00e58b58572c47b
Author: Andrew V. Mashenkov <[email protected]>
AuthorDate: Thu Apr 3 19:44:46 2025 +0300

    IGNITE-24966 Sql. Improve OrderedMergePublisher performance (#5525)
---
 .../util/subscription/OrderedMergePublisher.java   | 199 +++++++++++++++------
 1 file changed, 140 insertions(+), 59 deletions(-)

diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/util/subscription/OrderedMergePublisher.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/util/subscription/OrderedMergePublisher.java
index ebb5aee94cd..20f45c3241b 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/util/subscription/OrderedMergePublisher.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/util/subscription/OrderedMergePublisher.java
@@ -17,6 +17,10 @@
 
 package org.apache.ignite.internal.util.subscription;
 
+import static it.unimi.dsi.fastutil.objects.ObjectArrays.swap;
+
+import it.unimi.dsi.fastutil.IndirectPriorityQueue;
+import it.unimi.dsi.fastutil.objects.ObjectHeapIndirectPriorityQueue;
 import java.lang.invoke.MethodHandles;
 import java.lang.invoke.MethodHandles.Lookup;
 import java.lang.invoke.VarHandle;
@@ -86,17 +90,22 @@ public class OrderedMergePublisher<T> implements 
Publisher<T> {
         final Subscriber<? super T> downstream;
 
         /** Counter to prevent concurrent execution of a critical section. */
-        private final AtomicInteger guardCntr = new AtomicInteger();
+        // Set initial value to 1 to prevent data processing until all 
subscribers is initialized.
+        private final AtomicInteger guardCntr = new AtomicInteger(1);
 
         /** Subscribers. */
         private final OrderedMergeSubscriber<T>[] subscribers;
 
-        /** Rows comparator. */
-        private final Comparator<? super T> comp;
-
         /** Last received values. */
         private final Object[] values;
 
+        /** Values view sorted in comparator order. */
+        private final IndirectPriorityQueue<Object> valuesQueue;
+
+        /** Processing state. */
+        @SuppressWarnings({"unused", "FieldMayBeFinal"})
+        private State state = State.INITIAL;
+
         /** Error. */
         @SuppressWarnings({"unused", "FieldMayBeFinal"})
         private ErrorChain errorChain;
@@ -109,6 +118,9 @@ public class OrderedMergePublisher<T> implements 
Publisher<T> {
         @SuppressWarnings({"unused", "FieldMayBeFinal"})
         private long requested;
 
+        // Number of non-initialized values.
+        private int waiting;
+
         /** Number of emitted rows (guarded by {@link #guardCntr}). */
         private long emitted;
 
@@ -116,6 +128,8 @@ public class OrderedMergePublisher<T> implements 
Publisher<T> {
 
         static final VarHandle CANCELLED;
 
+        static final VarHandle STATE;
+
         static final VarHandle REQUESTED;
 
         static {
@@ -125,6 +139,7 @@ public class OrderedMergePublisher<T> implements 
Publisher<T> {
                 ERROR_CHAIN = lk.findVarHandle(OrderedMergeSubscription.class, 
"errorChain", ErrorChain.class);
                 CANCELLED = lk.findVarHandle(OrderedMergeSubscription.class, 
"cancelled", boolean.class);
                 REQUESTED = lk.findVarHandle(OrderedMergeSubscription.class, 
"requested", long.class);
+                STATE = lk.findVarHandle(OrderedMergeSubscription.class, 
"state", State.class);
             } catch (Throwable ex) {
                 throw new InternalError(ex);
             }
@@ -140,7 +155,6 @@ public class OrderedMergePublisher<T> implements 
Publisher<T> {
          */
         OrderedMergeSubscription(Subscriber<? super T> downstream, 
Comparator<? super T> comp, int prefetch, int cnt) {
             this.downstream = downstream;
-            this.comp = comp;
             this.subscribers = new OrderedMergeSubscriber[cnt];
 
             for (int i = 0; i < cnt; i++) {
@@ -148,12 +162,16 @@ public class OrderedMergePublisher<T> implements 
Publisher<T> {
             }
 
             this.values = new Object[cnt];
+            this.valuesQueue = new ObjectHeapIndirectPriorityQueue(values, 
comp);
+            this.waiting = cnt;
         }
 
         void subscribe(Publisher<? extends T>[] sources) {
             for (int i = 0; i < sources.length; i++) {
                 sources[i].subscribe(subscribers[i]);
             }
+
+            guardCntr.set(0);
         }
 
         /** {@inheritDoc} */
@@ -179,11 +197,13 @@ public class OrderedMergePublisher<T> implements 
Publisher<T> {
         @Override
         public void cancel() {
             if (CANCELLED.compareAndSet(this, false, true)) {
+                STATE.setRelease(this, State.STOP);
+
                 for (OrderedMergeSubscriber<T> inner : subscribers) {
                     inner.cancel();
                 }
 
-                if (guardCntr.getAndIncrement() == 0) {
+                if (guardCntr.get() == 0) {
                     Arrays.fill(values, null);
 
                     for (OrderedMergeSubscriber<T> inner : subscribers) {
@@ -221,86 +241,124 @@ public class OrderedMergePublisher<T> implements 
Publisher<T> {
             // Frequently accessed fields.
             Subscriber<? super T> downstream = this.downstream;
             OrderedMergeSubscriber<T>[] subscribers = this.subscribers;
-            int subsCnt = subscribers.length;
             Object[] values = this.values;
+
             long emitted = this.emitted;
 
+            // Retry loop.
             for (; ; ) {
-                long requested = (long) REQUESTED.getAcquire(this);
+                switch ((State) STATE.getAcquire(this)) {
+                    case INITIAL: {
+                        int waiting = this.waiting;
 
-                for (; ; ) {
-                    if ((boolean) CANCELLED.getAcquire(this)) {
-                        Arrays.fill(values, null);
+                        // Moves non-initialized sources to the beginning of 
the array for faster array scans
+                        // in the case of long initialization.
+                        for (int i = 0; i < waiting; ) {
+                            boolean innerDone = subscribers[0].done; // Read 
before polling to preserve correct program order.
+                            Object obj = subscribers[0].queue.poll();
 
-                        for (OrderedMergeSubscriber<T> inner : subscribers) {
-                            inner.queue.clear();
-                        }
+                            int done = (obj == null && innerDone) ? 1 : 0; // 
Flag has no effect if poll was successful.
+                            int initialized = obj != null || innerDone ? 1 : 0;
 
-                        return;
-                    }
+                            values[0] = done > 0 ? DONE : obj;
 
-                    int completed = 0;
-                    boolean waitResponse = false;
+                            waiting -= initialized;
 
-                    for (int i = 0; i < subsCnt; i++) {
-                        Object obj = values[i];
+                            int move = initialized * waiting; // No effect if 
value wasn't initialized.
+                            swap(values, 0, move);
+                            swap(subscribers, 0, move);
 
-                        if (obj == DONE) {
-                            completed++;
-                        } else if (obj == null) {
-                            boolean innerDone = subscribers[i].done;
+                            i = (initialized == 0) ? waiting : i; // Exit if 
any value was not found.
+                        }
 
-                            obj = subscribers[i].queue.poll();
+                        this.waiting = waiting;
 
-                            if (obj != null) {
-                                values[i] = obj;
-                            } else if (innerDone) {
-                                values[i] = DONE;
+                        if (waiting == 0) {
+                            // Got first rows from all subscribers.
+                            // Add all non-completed sources to the priority 
queue.
+                            for (int i = 0; i < values.length; i++) {
+                                if (values[i] != DONE) {
+                                    valuesQueue.enqueue(i);
+                                }
+                            }
 
-                                completed++;
-                            } else {
-                                // Subscriber has not received a response yet.
-                                waitResponse = true;
+                            // Then either start merge process or proceed with 
finishing if there is nothing to do.
+                            State state = valuesQueue.isEmpty() ? 
State.COMPLETING : State.RUNNING;
+                            STATE.compareAndSet(this, State.INITIAL, state);
 
-                                break;
-                            }
+                            continue;
                         }
+
+                        break;
                     }
+                    case RUNNING: {
+                        long requested = (long) REQUESTED.getAcquire(this);
+
+                        // Emit loop.
+                        while (!valuesQueue.isEmpty()) {
+                            int minIndex = valuesQueue.first();
+
+                            if (values[minIndex] == null) {
+                                boolean done = subscribers[minIndex].done;
+                                T val = subscribers[minIndex].queue.poll();
+
+                                if (val != null) {
+                                    values[minIndex] = val;
+                                    valuesQueue.changed(); // Force queue move 
the new value to it's place.
+                                    minIndex = valuesQueue.first();
+                                } else if (done) {
+                                    // No more values to emit for the current 
source, remove it from queue.
+                                    valuesQueue.dequeue();
+                                    continue;
+                                } else {
+                                    // Nothing to do, value wasn't received 
yet.
+                                    break;
+                                }
+                            }
 
-                    if (completed == subsCnt) {
-                        ErrorChain chain = (ErrorChain) 
ERROR_CHAIN.getAcquire(this);
+                            if (emitted == requested) {
+                                break;
+                            }
+
+                            downstream.onNext((T) values[minIndex]);
+                            emitted++;
 
-                        if (chain == null) {
-                            downstream.onComplete();
-                        } else {
-                            downstream.onError(chain.buildThrowable());
+                            values[minIndex] = null;
+                            subscribers[minIndex].request(1);
                         }
 
-                        return;
-                    }
+                        if (valuesQueue.isEmpty()) {
+                            STATE.compareAndSet(this, State.RUNNING, 
State.COMPLETING);
+
+                            continue;
+                        }
 
-                    if (waitResponse || emitted == requested) {
                         break;
                     }
+                    case COMPLETING: {
+                        STATE.set(this, State.STOP);
 
-                    T min = null;
-                    int minIndex = -1;
+                        // If subscription was not cancelled, there is no need 
to notify downstream.
+                        if (!(boolean) CANCELLED.getAcquire(this)) {
+                            assert valuesQueue.isEmpty();
 
-                    for (int i = 0; i < values.length; i++) {
-                        Object obj = values[i];
+                            finish(downstream);
+                        }
 
-                        if (obj != DONE && (min == null || comp.compare(min, 
(T) obj) > 0)) {
-                            min = (T) obj;
-                            minIndex = i;
+                        // Cleanup.
+                        Arrays.fill(values, null);
+                        for (OrderedMergeSubscriber<T> inner : subscribers) {
+                            inner.queue.clear();
                         }
+                        // No need to release guard.
+                        return;
                     }
-
-                    values[minIndex] = null;
-
-                    downstream.onNext(min);
-
-                    emitted++;
-                    subscribers[minIndex].request(1);
+                    case STOP: {
+                        // Terminal state. No need to release guard.
+                        return;
+                    }
+                    default:
+                        throw new IllegalStateException("Should never get 
here.");
                 }
 
                 this.emitted = emitted;
@@ -309,6 +367,17 @@ public class OrderedMergePublisher<T> implements 
Publisher<T> {
                 if (guardCntr.decrementAndGet() == 0) {
                     break;
                 }
+                guardCntr.set(1);
+            }
+        }
+
+        private void finish(Subscriber<? super T> downstream) {
+            ErrorChain chain = (ErrorChain) ERROR_CHAIN.getAcquire(this);
+
+            if (chain == null) {
+                downstream.onComplete();
+            } else {
+                downstream.onError(chain.buildThrowable());
             }
         }
 
@@ -377,7 +446,7 @@ public class OrderedMergePublisher<T> implements 
Publisher<T> {
 
             /** {@inheritDoc} */
             @Override
-            public void request(long n) {
+            public synchronized void request(long n) {
                 int c = consumed + 1;
 
                 if (c == limit) {
@@ -435,4 +504,16 @@ public class OrderedMergePublisher<T> implements 
Publisher<T> {
             return error;
         }
     }
+
+    /** Merge process states. */
+    private enum State {
+        /** Wait for a first rows received from each of the source 
subscribers. */
+        INITIAL,
+        /** Process incoming data. */
+        RUNNING,
+        /** Finish data processing and notify downstream. */
+        COMPLETING,
+        /** Terminal state. Just do nothing. */
+        STOP
+    }
 }

Reply via email to