This is an automated email from the ASF dual-hosted git repository. ppa pushed a commit to branch ignite-3.0.0-beta1 in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/ignite-3.0.0-beta1 by this push: new a7ddaf796f IGNITE-17813 Sql. Introduce sorted reducer for IndexScanNode (#1237) a7ddaf796f is described below commit a7ddaf796f8dda66b9b2425d229f12c033f81224 Author: Pavel Pereslegin <xxt...@gmail.com> AuthorDate: Thu Nov 3 17:17:47 2022 +0400 IGNITE-17813 Sql. Introduce sorted reducer for IndexScanNode (#1237) --- .../sql/engine/exec/LogicalRelImplementor.java | 4 +- .../sql/engine/exec/rel/IndexScanNode.java | 167 +++++---- .../ignite/internal/sql/engine/util/Commons.java | 7 + .../sql/engine/util/CompositePublisher.java | 172 +++++++++ .../sql/engine/util/SortingCompositePublisher.java | 398 +++++++++++++++++++++ .../exec/rel/IndexScanNodeExecutionTest.java | 102 ++++-- .../sql/engine/util/CompositeSubscriptionTest.java | 373 +++++++++++++++++++ 7 files changed, 1118 insertions(+), 105 deletions(-) diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/LogicalRelImplementor.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/LogicalRelImplementor.java index 5d31e63046..eefa27a037 100644 --- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/LogicalRelImplementor.java +++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/LogicalRelImplementor.java @@ -100,6 +100,7 @@ import org.apache.ignite.internal.sql.engine.rel.agg.IgniteSingleHashAggregate; import org.apache.ignite.internal.sql.engine.rel.agg.IgniteSingleSortAggregate; import org.apache.ignite.internal.sql.engine.rel.set.IgniteSetOp; import org.apache.ignite.internal.sql.engine.schema.IgniteIndex; +import org.apache.ignite.internal.sql.engine.schema.IgniteIndex.Type; import org.apache.ignite.internal.sql.engine.schema.InternalIgniteTable; import org.apache.ignite.internal.sql.engine.trait.Destination; import org.apache.ignite.internal.sql.engine.trait.IgniteDistribution; @@ -304,8 +305,9 @@ public class LogicalRelImplementor<RowT> implements IgniteRelVisitor<Node<RowT>> IgniteIndex idx = tbl.getIndex(rel.indexName()); ColocationGroup group = ctx.group(rel.sourceId()); int[] parts = group.partitions(ctx.localNode().id()); + Comparator<RowT> comp = idx.type() == Type.SORTED ? ctx.expressionFactory().comparator(rel.collation()) : null; - return new IndexScanNode<>(ctx, rowType, idx, tbl, parts, ranges, filters, prj, requiredColumns.toBitSet()); + return new IndexScanNode<>(ctx, rowType, idx, tbl, parts, comp, ranges, filters, prj, requiredColumns.toBitSet()); } /** {@inheritDoc} */ diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/IndexScanNode.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/IndexScanNode.java index 90064f3c3a..00b52b7cc3 100644 --- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/IndexScanNode.java +++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/IndexScanNode.java @@ -19,11 +19,15 @@ package org.apache.ignite.internal.sql.engine.exec.rel; import static org.apache.ignite.internal.util.ArrayUtils.nullOrEmpty; +import java.util.ArrayList; import java.util.BitSet; +import java.util.Comparator; import java.util.Iterator; import java.util.List; import java.util.Queue; import java.util.concurrent.Flow; +import java.util.concurrent.Flow.Publisher; +import java.util.concurrent.Flow.Subscriber; import java.util.concurrent.Flow.Subscription; import java.util.concurrent.LinkedBlockingQueue; import java.util.function.Function; @@ -40,6 +44,9 @@ import org.apache.ignite.internal.sql.engine.exec.exp.RangeIterable; import org.apache.ignite.internal.sql.engine.schema.IgniteIndex; import org.apache.ignite.internal.sql.engine.schema.IgniteIndex.Type; import org.apache.ignite.internal.sql.engine.schema.InternalIgniteTable; +import org.apache.ignite.internal.sql.engine.util.Commons; +import org.apache.ignite.internal.sql.engine.util.CompositePublisher; +import org.apache.ignite.internal.sql.engine.util.SortingCompositePublisher; import org.jetbrains.annotations.Contract; import org.jetbrains.annotations.Nullable; @@ -70,9 +77,11 @@ public class IndexScanNode<RowT> extends AbstractNode<RowT> { /** Participating columns. */ private final @Nullable BitSet requiredColumns; - private final RangeIterable<RowT> rangeConditions; + private final @Nullable RangeIterable<RowT> rangeConditions; - private Iterator<RangeCondition<RowT>> rangeConditionIterator; + private final @Nullable Comparator<RowT> comp; + + private @Nullable Iterator<RangeCondition<RowT>> rangeConditionIterator; private int requested; @@ -82,7 +91,7 @@ public class IndexScanNode<RowT> extends AbstractNode<RowT> { private Subscription activeSubscription; - private int curPartIdx; + private boolean rangeConditionsProcessed; /** * Constructor. @@ -91,6 +100,7 @@ public class IndexScanNode<RowT> extends AbstractNode<RowT> { * @param rowType Output type of the current node. * @param schemaTable The table this node should scan. * @param parts Partition numbers to scan. + * @param comp Rows comparator. * @param rangeConditions Range conditions. * @param filters Optional filter to filter out rows. * @param rowTransformer Optional projection function. @@ -102,6 +112,7 @@ public class IndexScanNode<RowT> extends AbstractNode<RowT> { IgniteIndex schemaIndex, InternalIgniteTable schemaTable, int[] parts, + @Nullable Comparator<RowT> comp, @Nullable RangeIterable<RowT> rangeConditions, @Nullable Predicate<RowT> filters, @Nullable Function<RowT, RowT> rowTransformer, @@ -116,6 +127,9 @@ public class IndexScanNode<RowT> extends AbstractNode<RowT> { this.rowTransformer = rowTransformer; this.requiredColumns = requiredColumns; this.rangeConditions = rangeConditions; + this.comp = comp; + + rangeConditionIterator = rangeConditions == null ? null : rangeConditions.iterator(); factory = ctx.rowHandler().factory(ctx.getTypeFactory(), rowType); @@ -153,8 +167,11 @@ public class IndexScanNode<RowT> extends AbstractNode<RowT> { protected void rewindInternal() { requested = 0; waiting = 0; - curPartIdx = 0; - rangeConditionIterator = null; + rangeConditionsProcessed = false; + + if (rangeConditions != null) { + rangeConditionIterator = rangeConditions.iterator(); + } if (activeSubscription != null) { activeSubscription.cancel(); @@ -233,84 +250,96 @@ public class IndexScanNode<RowT> extends AbstractNode<RowT> { Subscription subscription = this.activeSubscription; if (subscription != null) { subscription.request(waiting); - } else if (curPartIdx < parts.length) { - if (schemaIndex.type() == Type.SORTED) { - //TODO: https://issues.apache.org/jira/browse/IGNITE-17813 - // Introduce new publisher using merge-sort algo to merge partition index publishers. - int part = curPartIdx; - - int flags = 0; - BinaryTuple lowerBound = null; - BinaryTuple upperBound = null; - - if (rangeConditions == null) { - flags = SortedIndex.INCLUDE_LEFT | SortedIndex.INCLUDE_RIGHT; - curPartIdx++; - } else { - if (rangeConditionIterator == null) { - rangeConditionIterator = rangeConditions.iterator(); - } + } else if (!rangeConditionsProcessed) { + RangeCondition<RowT> cond = null; - RangeCondition<RowT> cond = rangeConditionIterator.next(); + if (rangeConditionIterator == null || !rangeConditionIterator.hasNext()) { + rangeConditionsProcessed = true; + } else { + cond = rangeConditionIterator.next(); + + rangeConditionsProcessed = !rangeConditionIterator.hasNext(); + } - lowerBound = toBinaryTuplePrefix(cond.lower()); - upperBound = toBinaryTuplePrefix(cond.upper()); + indexPublisher(parts, cond).subscribe(new SubscriberImpl()); + } else { + waiting = NOT_WAITING; + } + } - flags |= (cond.lowerInclude()) ? SortedIndex.INCLUDE_LEFT : 0; - flags |= (cond.upperInclude()) ? SortedIndex.INCLUDE_RIGHT : 0; + private Publisher<RowT> indexPublisher(int[] parts, @Nullable RangeCondition<RowT> cond) { + List<Flow.Publisher<RowT>> partPublishers = new ArrayList<>(parts.length); - if (!rangeConditionIterator.hasNext()) { // Switch to next partition and reset range index. - rangeConditionIterator = null; - curPartIdx++; - } - } + for (int p : parts) { + partPublishers.add(partitionPublisher(p, cond)); + } - ((SortedIndex) schemaIndex.index()).scan( - parts[part], - context().transaction(), - lowerBound, - upperBound, - flags, - requiredColumns - ).subscribe(new SubscriberImpl()); - } else { - assert schemaIndex.type() == Type.HASH; + return comp != null + ? new SortingCompositePublisher<>(partPublishers, comp, Commons.SORTED_IDX_PART_PREFETCH_SIZE) + : new CompositePublisher<>(partPublishers); + } - int part = curPartIdx; - BinaryTuple key = null; + private Flow.Publisher<RowT> partitionPublisher(int part, @Nullable RangeCondition<RowT> cond) { + Publisher<BinaryTuple> pub; - if (rangeConditions == null) { - curPartIdx++; - } else { - if (rangeConditionIterator == null) { - rangeConditionIterator = rangeConditions.iterator(); - } + if (schemaIndex.type() == Type.SORTED) { + int flags = 0; + BinaryTuple lower = null; + BinaryTuple upper = null; - RangeCondition<RowT> cond = rangeConditionIterator.next(); + if (cond == null) { + flags = SortedIndex.INCLUDE_LEFT | SortedIndex.INCLUDE_RIGHT; + } else { + lower = toBinaryTuplePrefix(cond.lower()); + upper = toBinaryTuplePrefix(cond.upper()); - assert cond.lower() == cond.upper(); + flags |= (cond.lowerInclude()) ? SortedIndex.INCLUDE_LEFT : 0; + flags |= (cond.upperInclude()) ? SortedIndex.INCLUDE_RIGHT : 0; + } - key = toBinaryTuple(cond.lower()); + pub = ((SortedIndex) schemaIndex.index()).scan(part, context().transaction(), lower, upper, flags, requiredColumns); + } else { + assert schemaIndex.type() == Type.HASH; + BinaryTuple key = null; - if (!rangeConditionIterator.hasNext()) { // Switch to next partition and reset range index. - rangeConditionIterator = null; - curPartIdx++; - } - } + if (cond != null) { + assert cond.lower() == cond.upper(); - schemaIndex.index().scan( - parts[part], - context().transaction(), - key, - requiredColumns - ).subscribe(new SubscriberImpl()); + key = toBinaryTuple(cond.lower()); } - } else { - waiting = NOT_WAITING; + + pub = schemaIndex.index().scan(part, context().transaction(), key, requiredColumns); } + + return downstream -> { + // BinaryTuple -> RowT converter. + Subscriber<BinaryTuple> subs = new Subscriber<>() { + @Override + public void onSubscribe(Subscription subscription) { + downstream.onSubscribe(subscription); + } + + @Override + public void onNext(BinaryTuple item) { + downstream.onNext(convert(item)); + } + + @Override + public void onError(Throwable throwable) { + downstream.onError(throwable); + } + + @Override + public void onComplete() { + downstream.onComplete(); + } + }; + + pub.subscribe(subs); + }; } - private class SubscriberImpl implements Flow.Subscriber<BinaryTuple> { + private class SubscriberImpl implements Flow.Subscriber<RowT> { private int received = 0; // HB guarded here. @@ -325,9 +354,7 @@ public class IndexScanNode<RowT> extends AbstractNode<RowT> { /** {@inheritDoc} */ @Override - public void onNext(BinaryTuple binRow) { - RowT row = convert(binRow); - + public void onNext(RowT row) { inBuff.add(row); if (++received == inBufSize) { diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/util/Commons.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/util/Commons.java index 869e046e05..b4576d2efd 100644 --- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/util/Commons.java +++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/util/Commons.java @@ -126,6 +126,13 @@ public final class Commons { public static final int IN_BUFFER_SIZE = 512; + /** + * The number of elements to be prefetched from each partition when scanning the sorted index. + * The higher the value, the fewer calls to the upstream will be, but at the same time, the bigger + * internal buffer will be. + */ + public static final int SORTED_IDX_PART_PREFETCH_SIZE = 100; + public static final SqlParser.Config PARSER_CONFIG = SqlParser.config() .withParserFactory(IgniteSqlParserImpl.FACTORY) .withLex(Lex.ORACLE) diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/util/CompositePublisher.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/util/CompositePublisher.java new file mode 100644 index 0000000000..6fb4614cfa --- /dev/null +++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/util/CompositePublisher.java @@ -0,0 +1,172 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.sql.engine.util; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.concurrent.Flow.Publisher; +import java.util.concurrent.Flow.Subscriber; +import java.util.concurrent.Flow.Subscription; +import org.jetbrains.annotations.Nullable; + +/** + * Composite publisher. + */ +public class CompositePublisher<T> implements Publisher<T> { + /** List of upstream publishers. */ + final Collection<? extends Publisher<T>> publishers; + + /** + * Constructor. + * + * @param publishers List of upstream publishers. + */ + public CompositePublisher(Collection<? extends Publisher<T>> publishers) { + this.publishers = publishers; + } + + @Override + public void subscribe(Subscriber<? super T> downstream) { + subscribe(new CompositeSubscription<>(downstream), downstream); + } + + void subscribe(CompositeSubscription<T> subscription, Subscriber<? super T> downstream) { + subscription.subscribe(publishers); + + downstream.onSubscribe(subscription); + } + + /** + * Sequential composite subscription. + * + * <p>Sequentially receives data from each registered subscription + * until the total number of requested items has been received. + */ + public static class CompositeSubscription<T> implements Subscription { + /** List of subscriptions. */ + private final List<Subscription> subscriptions = new ArrayList<>(); + + /** Downstream subscriber. */ + protected final Subscriber<? super T> downstream; + + /** Current subscription index. */ + private int subscriptionIdx = 0; + + /** Total number of remaining items. */ + private long remaining; + + /** Flag indicating that the subscription has been cancelled. */ + private boolean cancelled; + + public CompositeSubscription(Subscriber<? super T> downstream) { + this.downstream = downstream; + } + + /** + * Subscribe multiple publishers. + * + * @param sources Publishers. + */ + public void subscribe(Collection<? extends Publisher<? extends T>> sources) { + for (Publisher<? extends T> publisher : sources) { + publisher.subscribe(new PlainSubscriber()); + } + } + + /** {@inheritDoc} */ + @Override + public void request(long n) { + remaining = n; + + requestInternal(); + } + + /** {@inheritDoc} */ + @Override + public void cancel() { + cancelled = true; + + Subscription subscription = activeSubscription(); + + if (subscription != null) { + subscription.cancel(); + } + } + + /** Request data from a subscription. */ + private void requestInternal() { + Subscription subscription = activeSubscription(); + + if (subscription != null) { + subscription.request(remaining); + } + } + + private @Nullable Subscription activeSubscription() { + if (subscriptionIdx >= subscriptions.size()) { + return null; + } + + return subscriptions.get(subscriptionIdx); + } + + /** + * Plain subscriber. + */ + protected class PlainSubscriber implements Subscriber<T> { + /** {@inheritDoc} */ + @Override + public void onSubscribe(Subscription subscription) { + subscriptions.add(subscription); + } + + /** {@inheritDoc} */ + @Override + public void onNext(T item) { + --remaining; + + downstream.onNext(item); + } + + /** {@inheritDoc} */ + @Override + public void onError(Throwable throwable) { + downstream.onError(throwable); + } + + /** {@inheritDoc} */ + @Override + public void onComplete() { + if (cancelled) { + return; + } + + if (++subscriptionIdx == subscriptions.size()) { + downstream.onComplete(); + + return; + } + + if (remaining > 0) { + requestInternal(); + } + } + } + } +} diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/util/SortingCompositePublisher.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/util/SortingCompositePublisher.java new file mode 100644 index 0000000000..b5b4cac617 --- /dev/null +++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/util/SortingCompositePublisher.java @@ -0,0 +1,398 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.sql.engine.util; + +import java.lang.invoke.MethodHandles; +import java.lang.invoke.MethodHandles.Lookup; +import java.lang.invoke.VarHandle; +import java.util.Arrays; +import java.util.Collection; +import java.util.Comparator; +import java.util.Queue; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.Flow.Publisher; +import java.util.concurrent.Flow.Subscriber; +import java.util.concurrent.Flow.Subscription; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; + +/** + * Sorting composite publisher. + * + * <p>Merges multiple concurrent ordered data streams into one. + */ +public class SortingCompositePublisher<T> extends CompositePublisher<T> { + /** Rows comparator. */ + private final Comparator<T> comp; + + /** Prefetch size. */ + private final int prefetch; + + /** + * Constructor. + * + * @param publishers List of upstream publishers. + * @param comp Rows comparator. + * @param prefetch Prefetch size. + */ + public SortingCompositePublisher(Collection<? extends Publisher<T>> publishers, Comparator<T> comp, int prefetch) { + super(publishers); + + this.comp = comp; + this.prefetch = prefetch; + } + + @Override + public void subscribe(Subscriber<? super T> downstream) { + subscribe(new OrderedMergeCompositeSubscription<>(downstream, comp, prefetch, publishers.size()), downstream); + } + + /** + * Sorting composite subscription. + * + * <p>Merges multiple concurrent ordered data streams into one. + */ + public static class OrderedMergeCompositeSubscription<T> extends CompositePublisher.CompositeSubscription<T> { + /** Marker to indicate completed subscription. */ + private static final Object DONE = new Object(); + + /** Atomic updater for {@link #error} field. */ + private static final VarHandle ERROR; + + /** Atomic updater for {@link #cancelled} field. */ + private static final VarHandle CANCELLED; + + /** Atomic updater for {@link #requested} field. */ + private static final VarHandle REQUESTED; + + /** Counter to prevent concurrent execution of a critical section. */ + private final AtomicInteger guardCntr = new AtomicInteger(); + + /** Subscribers. */ + private final OrderedMergeSubscriber<T>[] subscribers; + + /** Rows comparator. */ + private final Comparator<? super T> comp; + + /** Last received values. */ + private final Object[] values; + + /** Error. */ + @SuppressWarnings({"unused", "FieldMayBeFinal"}) + private Throwable error; + + /** Cancelled flag. */ + @SuppressWarnings({"unused", "FieldMayBeFinal"}) + private boolean cancelled; + + /** Number of requested rows. */ + @SuppressWarnings({"unused", "FieldMayBeFinal"}) + private long requested; + + /** Number of emitted rows (guarded by {@link #guardCntr}). */ + private long emitted; + + static { + Lookup lk = MethodHandles.lookup(); + + try { + ERROR = lk.findVarHandle(OrderedMergeCompositeSubscription.class, "error", Throwable.class); + CANCELLED = lk.findVarHandle(OrderedMergeCompositeSubscription.class, "cancelled", boolean.class); + REQUESTED = lk.findVarHandle(OrderedMergeCompositeSubscription.class, "requested", long.class); + } catch (NoSuchFieldException | IllegalAccessException ex) { + throw new InternalError(ex); + } + } + + /** + * Constructor. + * + * @param downstream Downstream subscriber. + * @param comp Rows comparator. + * @param prefetch Prefetch size. + * @param cnt Count of subscriptions. + */ + public OrderedMergeCompositeSubscription(Subscriber<? super T> downstream, Comparator<? super T> comp, int prefetch, int cnt) { + super(downstream); + + this.comp = comp; + this.subscribers = new OrderedMergeSubscriber[cnt]; + + for (int i = 0; i < cnt; i++) { + this.subscribers[i] = new OrderedMergeSubscriber<>(this, prefetch); + } + + this.values = new Object[cnt]; + } + + @Override + public void subscribe(Collection<? extends Publisher<? extends T>> sources) { + int i = 0; + + for (Publisher<? extends T> publisher : sources) { + publisher.subscribe(subscribers[i++]); + } + } + + @Override + public void request(long n) { + for (;;) { + long current = (long) REQUESTED.getAcquire(this); + long next = current + n; + + if (next < 0L) { + next = Long.MAX_VALUE; + } + + if (REQUESTED.compareAndSet(this, current, next)) { + break; + } + } + + drain(); + } + + @Override + public void cancel() { + if (CANCELLED.compareAndSet(this, false, true)) { + for (OrderedMergeSubscriber<T> inner : subscribers) { + inner.cancel(); + } + + if (guardCntr.getAndIncrement() == 0) { + Arrays.fill(values, null); + + for (OrderedMergeSubscriber<T> inner : subscribers) { + inner.queue.clear(); + } + } + } + } + + private void onInnerError(OrderedMergeSubscriber<T> sender, Throwable ex) { + updateError(ex); + + sender.done = true; + + drain(); + } + + private void updateError(Throwable throwable) { + for (;;) { + Throwable current = (Throwable) ERROR.getAcquire(this); + Throwable next; + + if (current == null) { + next = throwable; + } else { + next = new Throwable(); + next.addSuppressed(current); + next.addSuppressed(throwable); + } + + if (ERROR.compareAndSet(this, current, next)) { + break; + } + } + } + + private void drain() { + // Only one thread can pass below. + if (guardCntr.getAndIncrement() != 0) { + return; + } + + // Frequently accessed fields. + Subscriber<? super T> downstream = this.downstream; + OrderedMergeSubscriber<T>[] subscribers = this.subscribers; + int subsCnt = subscribers.length; + Object[] values = this.values; + long emitted = this.emitted; + + for (;;) { + long requested = (long) REQUESTED.getAcquire(this); + + for (;;) { + if ((boolean) CANCELLED.getAcquire(this)) { + Arrays.fill(values, null); + + for (OrderedMergeSubscriber<T> inner : subscribers) { + inner.queue.clear(); + } + + return; + } + + int completed = 0; + boolean waitResponse = false; + + for (int i = 0; i < subsCnt; i++) { + Object obj = values[i]; + + if (obj == DONE) { + completed++; + } else if (obj == null) { + boolean innerDone = subscribers[i].done; + + obj = subscribers[i].queue.poll(); + + if (obj != null) { + values[i] = obj; + } else if (innerDone) { + values[i] = DONE; + + completed++; + } else { + // Subscriber has not received a response yet. + waitResponse = true; + + break; + } + } + } + + if (completed == subsCnt) { + Throwable ex = (Throwable) ERROR.getAcquire(this); + + if (ex == null) { + downstream.onComplete(); + } else { + downstream.onError(ex); + } + + return; + } + + if (waitResponse || emitted == requested) { + break; + } + + T min = null; + int minIndex = -1; + + for (int i = 0; i < values.length; i++) { + Object obj = values[i]; + + if (obj != DONE && (min == null || comp.compare(min, (T) obj) > 0)) { + min = (T) obj; + minIndex = i; + } + } + + values[minIndex] = null; + + downstream.onNext(min); + + emitted++; + subscribers[minIndex].request(1); + } + + this.emitted = emitted; + + // Retry if any other thread has incremented the counter. + if (guardCntr.decrementAndGet() == 0) { + break; + } + } + } + + /** + * Merge sort subscriber. + */ + private static final class OrderedMergeSubscriber<T> extends AtomicReference<Subscription> implements Subscriber<T>, Subscription { + /** Parent subscription. */ + private final OrderedMergeCompositeSubscription<T> parent; + + /** Prefetch size. */ + private final int prefetch; + + /** Number of requests to buffer. */ + private final int limit; + + /** Inner data buffer. */ + private final Queue<T> queue; + + /** Count of consumed requests. */ + private int consumed; + + /** Flag indicating that the subscription has completed. */ + private volatile boolean done; + + OrderedMergeSubscriber(OrderedMergeCompositeSubscription<T> parent, int prefetch) { + this.parent = parent; + this.prefetch = Math.max(1, prefetch); + this.limit = this.prefetch - (this.prefetch >> 2); + this.queue = new ConcurrentLinkedQueue<>(); + } + + @Override + public void onSubscribe(Subscription s) { + if (compareAndSet(null, s)) { + s.request(prefetch); + } else { + s.cancel(); + } + } + + @Override + public void onNext(T item) { + queue.offer(item); + + parent.drain(); + } + + @Override + public void onError(Throwable throwable) { + parent.onInnerError(this, throwable); + } + + @Override + public void onComplete() { + done = true; + + parent.drain(); + } + + @Override + public void request(long n) { + int c = consumed + 1; + + if (c == limit) { + consumed = 0; + Subscription subscription = get(); + + // If the subscription has not yet been cancelled - request upstream. + if (subscription != this) { + subscription.request(c); + } + } else { + consumed = c; + } + } + + @Override + public void cancel() { + Subscription subscription = getAndSet(this); + + if (subscription != null && subscription != this) { + subscription.cancel(); + } + } + } + } +} diff --git a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/IndexScanNodeExecutionTest.java b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/IndexScanNodeExecutionTest.java index 7fae73256b..1a24ad7051 100644 --- a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/IndexScanNodeExecutionTest.java +++ b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/IndexScanNodeExecutionTest.java @@ -22,9 +22,13 @@ import static org.hamcrest.core.IsEqual.equalTo; import java.util.ArrayList; import java.util.Arrays; +import java.util.Comparator; +import java.util.HashSet; import java.util.List; +import java.util.Set; import java.util.concurrent.Flow.Publisher; import java.util.concurrent.Flow.Subscription; +import java.util.concurrent.ThreadLocalRandom; import java.util.stream.IntStream; import org.apache.calcite.rel.type.RelDataType; import org.apache.calcite.rel.type.RelDataTypeFactory.Builder; @@ -48,6 +52,7 @@ import org.apache.ignite.internal.sql.engine.schema.IgniteIndex.Type; import org.apache.ignite.internal.sql.engine.trait.IgniteDistribution; import org.apache.ignite.internal.sql.engine.trait.IgniteDistributions; import org.apache.ignite.internal.sql.engine.type.IgniteTypeFactory; +import org.apache.ignite.internal.sql.engine.util.Commons; import org.apache.ignite.internal.sql.engine.util.TypeUtils; import org.apache.ignite.internal.testframework.IgniteTestUtils; import org.hamcrest.Matchers; @@ -70,24 +75,17 @@ public class IndexScanNodeExecutionTest extends AbstractExecutionTest { EMPTY ); - Object[][] tableData = { - {1L, null, 1, "Roman"}, - {2L, 4, 2, "Igor"}, - {3L, 3, 1, "Taras"}, - {4L, 2, null, "Alexey"}, - {5L, 4, 1, "Ivan"}, - {6L, 2, 1, "Andrey"} - }; + Comparator<Object[]> comp = Comparator.comparingLong(v -> (long) ((Object[]) v)[0]); - // TODO: sort data, once IndexScanNode will support merging. - Object[][] result = tableData; + Object[][] tableData = generateIndexData(2, Commons.IN_BUFFER_SIZE * 2, true); + Object[][] expected = Arrays.stream(tableData).map(Object[]::clone).sorted(comp).toArray(Object[][]::new); // Validate sort order. validateSortedIndexScan( tableData, null, null, - result + expected ); // Validate bounds. @@ -95,14 +93,14 @@ public class IndexScanNodeExecutionTest extends AbstractExecutionTest { tableData, new Object[]{2, 1}, new Object[]{3, 0}, - result + expected ); validateSortedIndexScan( tableData, new Object[]{2, 1}, new Object[]{4}, - result + expected ); @@ -110,7 +108,7 @@ public class IndexScanNodeExecutionTest extends AbstractExecutionTest { tableData, new Object[]{null}, null, - result + expected ); // Validate failure due to incorrect bounds. @@ -119,10 +117,8 @@ public class IndexScanNodeExecutionTest extends AbstractExecutionTest { tableData, new Object[]{2, "Brutus"}, new Object[]{3.9, 0}, - // TODO: sort data, once IndexScanNode will support merging. EMPTY ), ClassCastException.class); - } @Test @@ -134,14 +130,7 @@ public class IndexScanNodeExecutionTest extends AbstractExecutionTest { EMPTY ); - Object[][] tableData = { - {1L, null, 1, "Roman"}, - {2L, 4, 2, "Igor"}, - {3L, 3, 1, "Taras"}, - {4L, 2, null, "Alexey"}, - {5L, 4, 2, "Ivan"}, - {6L, 2, 1, "Andrey"} - }; + Object[][] tableData = generateIndexData(2, Commons.IN_BUFFER_SIZE * 2, false); // Validate data. validateHashIndexScan( @@ -177,6 +166,40 @@ public class IndexScanNodeExecutionTest extends AbstractExecutionTest { ), ClassCastException.class); } + private Object[][] generateIndexData(int partCnt, int partSize, boolean sorted) { + Set<Long> uniqueNumbers = new HashSet<>(); + + while (uniqueNumbers.size() < partCnt * partSize) { + uniqueNumbers.add(ThreadLocalRandom.current().nextLong()); + } + + List<Long> uniqueNumList = new ArrayList<>(uniqueNumbers); + + Object[][] data = new Object[partCnt * partSize][4]; + + for (int p = 0; p < partCnt; p++) { + if (sorted) { + uniqueNumList.subList(p * partSize, (p + 1) * partSize).sort(Comparator.comparingLong(v -> v)); + } + + for (int j = 0; j < partSize; j++) { + int rowNum = p * partSize + j; + + data[rowNum] = new Object[4]; + + int bound1 = ThreadLocalRandom.current().nextInt(3); + int bound2 = ThreadLocalRandom.current().nextInt(3); + + data[rowNum][0] = uniqueNumList.get(rowNum); + data[rowNum][1] = bound1 == 0 ? null : bound1; + data[rowNum][2] = bound2 == 0 ? null : bound2;; + data[rowNum][3] = "row-" + rowNum; + } + } + + return data; + } + private void validateHashIndexScan(Object[][] tableData, @Nullable Object[] key, Object[][] expRes) { SchemaDescriptor schemaDescriptor = new SchemaDescriptor( 1, @@ -295,6 +318,7 @@ public class IndexScanNodeExecutionTest extends AbstractExecutionTest { index, new TestTable(rowType), new int[]{0, 2}, + index.type() == Type.SORTED ? Comparator.comparingLong(v -> (long) ((Object[]) v)[0]) : null, rangeIterable, null, null, @@ -304,13 +328,13 @@ public class IndexScanNodeExecutionTest extends AbstractExecutionTest { RootNode<Object[]> node = new RootNode<>(ectx, scanNode.rowType()); node.register(scanNode); - ArrayList<Object[]> res = new ArrayList<>(); + int n = 0; while (node.hasNext()) { - res.add(node.next()); + assertThat(node.next(), equalTo(expectedData[n++])); } - assertThat(res.toArray(EMPTY), equalTo(expectedData)); + assertThat(n, equalTo(expectedData.length)); } private static RelDataType createRowTypeFromSchema(IgniteTypeFactory typeFactory, SchemaDescriptor schemaDescriptor) { @@ -355,9 +379,25 @@ public class IndexScanNodeExecutionTest extends AbstractExecutionTest { private static Publisher<BinaryTuple> dummyPublisher(BinaryTuple[] rows) { return s -> { s.onSubscribe(new Subscription() { + int off = 0; + boolean completed = false; + @Override public void request(long n) { - // No-op. + int start = off; + int end = Math.min(start + (int) n, rows.length); + + off = end; + + for (int i = start; i < end; i++) { + s.onNext(rows[i]); + } + + if (off >= rows.length && !completed) { + completed = true; + + s.onComplete(); + } } @Override @@ -365,12 +405,6 @@ public class IndexScanNodeExecutionTest extends AbstractExecutionTest { // No-op. } }); - - for (int i = 0; i < rows.length; ++i) { - s.onNext(rows[i]); - } - - s.onComplete(); }; } diff --git a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/util/CompositeSubscriptionTest.java b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/util/CompositeSubscriptionTest.java new file mode 100644 index 0000000000..5d910f8b73 --- /dev/null +++ b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/util/CompositeSubscriptionTest.java @@ -0,0 +1,373 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.sql.engine.util; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Comparator; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Flow.Publisher; +import java.util.concurrent.Flow.Subscriber; +import java.util.concurrent.Flow.Subscription; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; +import org.apache.ignite.internal.testframework.IgniteTestUtils; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +/** + * Composite subscription test. + */ +public class CompositeSubscriptionTest { + /** Test case probe count. */ + private static final int PROBE_CNT = 10; + + @Test + public void testEnoughData() throws Throwable { + for (int req = 1; req <= PROBE_CNT; req++) { + doPublishSubscribe(req, req + 1, req + 1, true, false, true); + doPublishSubscribe(req, req + 1, req + 1, true, false, false); + + for (int pubCnt = 1; pubCnt <= PROBE_CNT * 10; pubCnt += 10) { + doPublishSubscribe(req, req * pubCnt, pubCnt, true, false, true); + doPublishSubscribe(req, req * pubCnt, pubCnt, true, false, false); + doPublishSubscribe(req, req * pubCnt, pubCnt, false, false, true); + } + } + } + + @Test + public void testNotEnoughData() throws Throwable { + for (int req = 1; req <= PROBE_CNT; req++) { + doPublishSubscribe(req, 0, req, true, false, true); + doPublishSubscribe(req, 0, req, true, false, false); + + for (int pubCnt = 1; pubCnt <= PROBE_CNT * 10; pubCnt += 10) { + int total = req * pubCnt; + int requested = total * 2; + + doPublishSubscribe(requested, total, pubCnt, true, false, true); + doPublishSubscribe(requested, total, pubCnt, true, false, false); + doPublishSubscribe(requested, total, pubCnt, false, false, true); + } + } + } + + @Test + public void testMultipleRequest() throws Throwable { + for (int req = 1; req <= PROBE_CNT; req++) { + for (int pubCnt = 1; pubCnt <= PROBE_CNT * 10; pubCnt += 10) { + int total = req * pubCnt; + int requested = total * 2; + + doPublishSubscribe(requested, total, pubCnt, true, true, true); + doPublishSubscribe(requested, total, pubCnt, true, true, false); + doPublishSubscribe(requested, total, pubCnt, false, true, true); + } + } + } + + @Test + public void testExactEnoughData() throws Throwable { + for (int req = 1; req <= PROBE_CNT; req++) { + for (int pubCnt = 1; pubCnt <= PROBE_CNT * 10; pubCnt += 10) { + int total = req * pubCnt; + + doPublishSubscribe(total, total, pubCnt, true, false, true); + doPublishSubscribe(total, total, pubCnt, true, false, false); + doPublishSubscribe(total, total, pubCnt, false, false, true); + } + } + } + + /** + * Test composite publishing-subscribing. + * + * @param requested Number of requested items. + * @param total Total number of available items. + * @param pubCnt Number of publishers. + * @param rnd {@code True} to generate random data, otherwise sequential range will be generated. + * @param split Indicates that we will divide the requested amount into small sub-requests. + * @param sort {@code True} to test merge sort strategy, otherwise all requests will be sent sequentially in single threaded mode. + * @throws InterruptedException If failed. + */ + private void doPublishSubscribe( + int requested, + int total, + int pubCnt, + boolean rnd, + boolean split, + boolean sort + ) throws Throwable { + int dataCnt = total / pubCnt; + Integer[][] data = new Integer[pubCnt][dataCnt]; + int[] expData = new int[total]; + int k = 0; + + for (int i = 0; i < pubCnt; i++) { + for (int j = 0; j < dataCnt; j++) { + data[i][j] = rnd ? ThreadLocalRandom.current().nextInt(total) : k; + + expData[k++] = data[i][j]; + } + + if (sort) { + Arrays.sort(data[i]); + } + } + + if (sort) { + Arrays.sort(expData); + } + + List<TestPublisher<Integer>> publishers = new ArrayList<>(); + + for (int i = 0; i < pubCnt; i++) { + TestPublisher<Integer> pub = new TestPublisher<>(data[i], sort); + + publishers.add(pub); + } + + AtomicReference<Subscription> subscriptionRef = new AtomicReference<>(); + SubscriberListener<Integer> lsnr = new SubscriberListener<>(); + + CompositePublisher<Integer> compPublisher = sort + ? new SortingCompositePublisher<>(publishers, Comparator.comparingInt(v -> v), requested / pubCnt) + : new CompositePublisher<>(publishers); + + lsnr.reset(requested); + + compPublisher.subscribe(new Subscriber<>() { + @Override + public void onSubscribe(Subscription subscription) { + subscriptionRef.set(subscription); + } + + @Override + public void onNext(Integer item) { + lsnr.onNext(item); + } + + @Override + public void onError(Throwable t) { + assert false; + } + + @Override + public void onComplete() { + lsnr.onComplete(); + } + }); + + if (!split) { + checkSubscriptionRequest(subscriptionRef.get(), new InputParameters(expData, requested), lsnr); + } else { + for (int off = 1; off < Math.min(requested, total); off += off) { + InputParameters params = new InputParameters(expData, off); + + checkSubscriptionRequest(subscriptionRef.get(), params.offset(off - 1), lsnr.reset(params.requested)); + } + } + + // Check errors. + for (TestPublisher<Integer> pub : publishers) { + Throwable err = pub.errRef.get(); + + if (err != null) { + throw err; + } + } + } + + private void checkSubscriptionRequest( + Subscription subscription, + InputParameters params, + SubscriberListener<Integer> lsnr + ) throws InterruptedException { + subscription.request(params.requested); + + Assertions.assertTrue(lsnr.awaitComplete(10), + "Execution timeout [params=" + params + ", results=" + lsnr + ']'); + + int remaining = params.total - params.offset; + int expReceived = Math.min(params.requested, remaining); + int[] expResult = Arrays.copyOfRange(params.data, params.offset, params.offset + expReceived); + + Assertions.assertEquals(expReceived, lsnr.res.size(), "params=" + params); + Assertions.assertEquals(expReceived, lsnr.receivedCnt.get(), "params=" + params); + + // Ensures that onComplete has (not) been called. + int expCnt = params.offset + params.requested >= params.total ? 1 : 0; + IgniteTestUtils.waitForCondition(() -> lsnr.onCompleteCntr.get() == expCnt, 10_000); + Assertions.assertEquals(expCnt, lsnr.onCompleteCntr.get(), "params=" + params); + + int[] resArr = new int[expReceived]; + int k = 0; + + for (Integer n : lsnr.res) { + resArr[k++] = n; + } + + Assertions.assertArrayEquals(expResult, resArr, params + "\n" + Arrays.toString(expResult) + "\n" + Arrays.toString(resArr) + "\n"); + } + + private static class TestPublisher<T> implements Publisher<T> { + private final T[] data; + private final AtomicReference<Throwable> errRef = new AtomicReference<>(); + private final boolean async; + private boolean publicationComplete; + + TestPublisher(T[] data, boolean async) { + this.data = data; + this.async = async; + } + + @Override + public void subscribe(Subscriber<? super T> subscriber) { + subscriber.onSubscribe(new Subscription() { + private int idx; + + @Override + public void request(long requested) { + Runnable dataSupplier = () -> { + int max = Math.min(data.length, idx + (int) requested); + + while (idx < max) { + subscriber.onNext(data[idx++]); + } + + if (idx == data.length && !publicationComplete) { + publicationComplete = true; + + subscriber.onComplete(); + } + }; + + if (!async) { + try { + dataSupplier.run(); + } catch (Throwable t) { + handleError(t); + } + + return; + } + + CompletableFuture.runAsync(() -> { + // Multiple requests to the same subscription must not be executed concurrently. + synchronized (TestPublisher.this) { + dataSupplier.run(); + } + }).whenComplete((res, err) -> { + if (err != null) { + handleError(err); + } + }); + } + + @Override + public void cancel() { + throw new UnsupportedOperationException(); + } + }); + } + + @SuppressWarnings("CallToPrintStackTrace") + private void handleError(Throwable err) { + err.printStackTrace(); + + errRef.compareAndSet(null, err); + } + } + + private static class InputParameters { + int[] data; + int offset; + int total; + int requested; + + InputParameters(int[] data, int requested) { + this.data = data; + this.requested = requested; + this.total = data.length; + } + + InputParameters offset(int offset) { + this.offset = offset; + + return this; + } + + @Override + public String toString() { + return "InputParameters{" + + "requested=" + requested + + ", total=" + total + + ", offset=" + offset + + '}'; + } + } + + private static class SubscriberListener<T> { + final AtomicInteger receivedCnt = new AtomicInteger(); + final AtomicInteger onCompleteCntr = new AtomicInteger(); + final Collection<T> res = new LinkedBlockingQueue<>(); + final AtomicReference<Integer> requestedCnt = new AtomicReference<>(); + volatile CountDownLatch waitLatch; + + SubscriberListener<T> reset(int requested) { + receivedCnt.set(0); + waitLatch = new CountDownLatch(1); + requestedCnt.set(requested); + res.clear(); + + return this; + } + + void onNext(T item) { + res.add(item); + + if (receivedCnt.incrementAndGet() == requestedCnt.get()) { + waitLatch.countDown(); + } + } + + boolean awaitComplete(int timeout) throws InterruptedException { + return waitLatch.await(timeout, TimeUnit.SECONDS); + } + + void onComplete() { + waitLatch.countDown(); + onCompleteCntr.incrementAndGet(); + } + + @Override + public String toString() { + return "SubscriberListener{" + + "receivedCnt=" + receivedCnt.get() + + ", onCompleteCntr=" + onCompleteCntr.get() + + '}'; + } + } +}