This is an automated email from the ASF dual-hosted git repository.
gvvinblade pushed a commit to branch ignite-12248
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/ignite-12248 by this push:
new 96281cf pending
96281cf is described below
commit 96281cf31517959b9ef33afae48e63c3091f93ad
Author: Igor Seliverstov <[email protected]>
AuthorDate: Fri Nov 29 17:04:41 2019 +0300
pending
---
.../query/calcite/cluster/RegistryImpl.java | 6 +-
.../NoOpFactory.java => exchange/EndMarker.java} | 18 +--
.../query/calcite/exchange/ExchangeService.java | 30 ++++
.../processors/query/calcite/exchange/Inbox.java | 23 +++
.../processors/query/calcite/exchange/Outbox.java | 166 +++++++++++++++++++++
.../NoOpFactory.java => exec/AbstractNode.java} | 29 ++--
.../NoOpFactory.java => exec/FilterNode.java} | 32 ++--
.../processors/query/calcite/exec/JoinNode.java | 101 +++++++++++++
.../processors/query/calcite/exec/Node.java | 27 ++++
.../NoOpFactory.java => exec/SingleNode.java} | 24 +--
.../{trait/NoOpFactory.java => exec/Sink.java} | 29 ++--
.../processors/query/calcite/exec/Source.java | 24 +++
.../query/calcite/metadata/FragmentInfo.java | 20 +--
.../calcite/metadata/IgniteMdFragmentInfo.java | 3 +-
.../query/calcite/metadata/NodesMapping.java | 19 ++-
.../processors/query/calcite/rel/Receiver.java | 39 ++---
.../processors/query/calcite/rel/Sender.java | 41 ++---
.../calcite/serialize/relation/ReceiverNode.java | 16 +-
.../calcite/serialize/relation/SenderNode.java | 15 +-
.../query/calcite/splitter/Fragment.java | 80 +++++-----
.../query/calcite/splitter/QueryPlan.java | 3 +-
.../Source.java} | 38 ++---
.../NoOpFactory.java => splitter/SourceImpl.java} | 24 +--
.../query/calcite/splitter/Splitter.java | 8 +-
.../NoOpFactory.java => splitter/Target.java} | 24 +--
.../SenderNode.java => splitter/TargetImpl.java} | 33 ++--
.../query/calcite/trait/AllTargetsFactory.java | 4 +-
.../query/calcite/trait/HashFunctionFactory.java | 4 +-
.../query/calcite/trait/NoOpFactory.java | 4 +-
.../query/calcite/trait/RandomTargetFactory.java | 4 +-
.../query/calcite/trait/SingleTargetFactory.java | 7 +-
.../query/calcite/CalciteQueryProcessorTest.java | 54 +------
.../query/calcite/exchange/OutboxTest.java | 133 +++++++++++++++++
.../ignite/testsuites/IgniteCalciteTestSuite.java | 5 +-
34 files changed, 754 insertions(+), 333 deletions(-)
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/cluster/RegistryImpl.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/cluster/RegistryImpl.java
index 8e5773b..71f8077 100644
---
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/cluster/RegistryImpl.java
+++
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/cluster/RegistryImpl.java
@@ -43,6 +43,8 @@ import
org.apache.ignite.internal.processors.query.calcite.util.Commons;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.internal.U;
+import static
org.apache.ignite.internal.processors.query.calcite.metadata.NodesMapping.DEDUPLICATED;
+
/**
*
*/
@@ -65,13 +67,13 @@ public class RegistryImpl implements DistributionRegistry,
LocationRegistry {
}
@Override public NodesMapping local() {
- return new
NodesMapping(Collections.singletonList(ctx.discovery().localNode().id()), null,
(byte) 0);
+ return new
NodesMapping(Collections.singletonList(ctx.discovery().localNode().id()), null,
DEDUPLICATED);
}
@Override public NodesMapping random(AffinityTopologyVersion topVer) {
List<ClusterNode> nodes =
ctx.discovery().discoCache(topVer).serverNodes();
- return new NodesMapping(Commons.transform(nodes, ClusterNode::id),
null, (byte) 0);
+ return new NodesMapping(Commons.transform(nodes, ClusterNode::id),
null, DEDUPLICATED);
}
@Override public NodesMapping distributed(int cacheId,
AffinityTopologyVersion topVer) {
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/NoOpFactory.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exchange/EndMarker.java
similarity index 59%
copy from
modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/NoOpFactory.java
copy to
modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exchange/EndMarker.java
index 1988671..ecbfef2 100644
---
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/NoOpFactory.java
+++
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exchange/EndMarker.java
@@ -14,26 +14,18 @@
* limitations under the License.
*/
-package org.apache.ignite.internal.processors.query.calcite.trait;
+package org.apache.ignite.internal.processors.query.calcite.exchange;
import java.io.ObjectStreamException;
-import org.apache.calcite.plan.Context;
-import org.apache.calcite.util.ImmutableIntList;
-import
org.apache.ignite.internal.processors.query.calcite.metadata.NodesMapping;
+import java.io.Serializable;
/**
*
*/
-final class NoOpFactory extends AbstractDestinationFunctionFactory {
- static final DestinationFunctionFactory INSTANCE = new NoOpFactory();
+public final class EndMarker implements Serializable {
+ public static final EndMarker INSTANCE = new EndMarker();
- @Override public DestinationFunction create(Context ctx, NodesMapping m,
ImmutableIntList k) {
- return null;
- }
-
- @Override public Object key() {
- return "NoOpFactory";
- }
+ private EndMarker(){}
private Object readResolve() throws ObjectStreamException {
return INSTANCE;
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exchange/ExchangeService.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exchange/ExchangeService.java
new file mode 100644
index 0000000..788011a
--- /dev/null
+++
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exchange/ExchangeService.java
@@ -0,0 +1,30 @@
+/*
+ * Copyright 2019 GridGain Systems, Inc. and Contributors.
+ *
+ * Licensed under the GridGain Community Edition License (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *
https://www.gridgain.com/products/software/community-edition/gridgain-community-edition-license
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.calcite.exchange;
+
+import java.util.List;
+import java.util.UUID;
+import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
+
+/**
+ * Service through which an {@link Outbox} ships batches of rows to other nodes.
+ * <p>
+ * An outbox registers itself with {@code register} when it is initialised and removes itself with
+ * {@code unregister} once it has sent its final batches.
+ * <p>
+ * {@code send} receives the query id, the exchange id, the id of the destination node, the id of the batch
+ * and the rows of the batch. The outbox numbers batches per destination node, and the last batch it sends to
+ * a node ends with {@link EndMarker#INSTANCE}.
+ * NOTE(review): how acknowledgements travel back to {@code Outbox.acknowledge} is not defined here - confirm.
+ */
+public interface ExchangeService {
+ void register(Outbox outbox);
+ void unregister(Outbox outbox);
+ void send(GridCacheVersion queryId, long exchangeId, UUID nodeId, int
batchId, List<?> rows);
+}
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exchange/Inbox.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exchange/Inbox.java
new file mode 100644
index 0000000..83a8099
--- /dev/null
+++
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exchange/Inbox.java
@@ -0,0 +1,23 @@
+/*
+ * Copyright 2019 GridGain Systems, Inc. and Contributors.
+ *
+ * Licensed under the GridGain Community Edition License (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *
https://www.gridgain.com/products/software/community-edition/gridgain-community-edition-license
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.calcite.exchange;
+
+/**
+ * Placeholder class; it declares no members yet.
+ * NOTE(review): judging by the name this is meant to become the receiving counterpart of {@link Outbox} -
+ * confirm before relying on it.
+ */
+public class Inbox {
+}
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exchange/Outbox.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exchange/Outbox.java
new file mode 100644
index 0000000..47351e2
--- /dev/null
+++
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exchange/Outbox.java
@@ -0,0 +1,166 @@
+/*
+ * Copyright 2019 GridGain Systems, Inc. and Contributors.
+ *
+ * Licensed under the GridGain Community Edition License (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *
https://www.gridgain.com/products/software/community-edition/gridgain-community-edition-license
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.calcite.exchange;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
+import org.apache.ignite.internal.processors.query.calcite.exec.AbstractNode;
+import org.apache.ignite.internal.processors.query.calcite.exec.SingleNode;
+import org.apache.ignite.internal.processors.query.calcite.exec.Sink;
+import
org.apache.ignite.internal.processors.query.calcite.trait.DestinationFunction;
+import org.apache.ignite.internal.util.typedef.F;
+
+/**
+ * Sending side of an exchange. Every pushed row is routed by a {@link DestinationFunction} to one or more
+ * nodes, buffered per node and shipped through an {@link ExchangeService} in batches of up to
+ * {@link #BATCH_SIZE} rows. While pushing, at most {@link #PER_NODE_BATCH_COUNT} batches per node may be
+ * awaiting acknowledgement; once that limit is hit and the current batch is full, {@link #push(Object)}
+ * rejects rows until {@link #acknowledge(UUID, int)} frees a slot.
+ * <p>
+ * The class performs no synchronization of its own.
+ * NOTE(review): callers presumably confine an outbox to a single thread - confirm.
+ *
+ * @param <T> Row type.
+ */
+public class Outbox<T> extends AbstractNode<T> implements SingleNode<T>, Sink<T> {
+    /** Maximum number of rows in one batch (the end marker is not counted). */
+    static final int BATCH_SIZE = 200;
+
+    /** Maximum number of sent but not yet acknowledged batches per destination node. */
+    static final int PER_NODE_BATCH_COUNT = 10;
+
+    /** Per-node sending state, created lazily on first use. */
+    private final Map<UUID, Destination> perNode = new HashMap<>();
+
+    /** Query id passed to every {@link ExchangeService#send} call. */
+    private final GridCacheVersion queryId;
+
+    /** Exchange id passed to every {@link ExchangeService#send} call. */
+    private final long exchangeId;
+
+    /** All nodes that must receive an end marker when the input is exhausted. */
+    private final Collection<UUID> targets;
+
+    /** Maps a row to the nodes it has to be delivered to. */
+    private final DestinationFunction function;
+
+    /** Exchange service; set by {@link #init(ExchangeService)}. */
+    private ExchangeService srvc;
+
+    /**
+     * @param queryId Query id.
+     * @param exchangeId Exchange id.
+     * @param targets Nodes that receive the end marker on {@link #end()}.
+     * @param function Function choosing the destination nodes of a row.
+     */
+    protected Outbox(GridCacheVersion queryId, long exchangeId, Collection<UUID> targets,
+        DestinationFunction function) {
+        super(Sink.noOp());
+        this.queryId = queryId;
+        this.exchangeId = exchangeId;
+
+        this.targets = targets;
+        this.function = function;
+    }
+
+    /**
+     * Records that a node has processed a batch.
+     *
+     * @param nodeId Acknowledging node; a destination for it must already exist.
+     * @param batchId Id of the acknowledged batch.
+     */
+    public void acknowledge(UUID nodeId, int batchId) {
+        perNode.get(nodeId).acknowledge(batchId);
+    }
+
+    /** {@inheritDoc} */
+    @Override public Sink<T> sink(int idx) {
+        if (idx != 0)
+            throw new IndexOutOfBoundsException();
+
+        return this;
+    }
+
+    /**
+     * Buffers the row for every node the destination function selects.
+     *
+     * @param row Row to send.
+     * @return {@code true} if the row was buffered for all of its destinations, or has no destination at all;
+     *      {@code false} if some destination cannot accept it right now. In that case nothing is buffered and
+     *      the sources are signalled once that destination receives a new acknowledgement.
+     */
+    @Override public boolean push(T row) {
+        List<UUID> nodes = function.destination(row);
+
+        if (F.isEmpty(nodes))
+            return true;
+
+        List<Destination> destinations = new ArrayList<>(nodes.size());
+
+        // First pass only checks readiness, so a rejected row is not added to any destination.
+        for (UUID node : nodes) {
+            Destination dest = perNode.computeIfAbsent(node, Destination::new);
+
+            if (!dest.ready()) {
+                dest.needSignal();
+
+                return false;
+            }
+
+            destinations.add(dest);
+        }
+
+        for (Destination dest : destinations)
+            dest.add(row);
+
+        return true;
+    }
+
+    /**
+     * Attaches the exchange service, registers this outbox with it and signals the sources.
+     *
+     * @param srvc Exchange service used for sending.
+     */
+    public void init(ExchangeService srvc) {
+        this.srvc = srvc;
+
+        srvc.register(this);
+
+        signal();
+    }
+
+    /**
+     * Sends the remaining rows followed by an end marker to every target node, including nodes that got no
+     * rows, then unregisters this outbox from the exchange service.
+     */
+    @Override public void end() {
+        for (UUID node : targets)
+            perNode.computeIfAbsent(node, Destination::new).end();
+
+        srvc.unregister(this);
+    }
+
+    /** Buffer and acknowledgement bookkeeping for one destination node. */
+    private final class Destination {
+        /** Destination node id. */
+        private final UUID nodeId;
+
+        /** Id of the last batch sent ("high water mark"); {@code -1} until the first batch is sent. */
+        private int hwm = -1;
+
+        /** Highest acknowledged batch id ("low water mark"); {@code -1} until the first acknowledgement. */
+        private int lwm = -1;
+
+        /** Batch being filled; one slot larger than a batch so the end marker fits without resizing. */
+        private ArrayList<Object> curr = new ArrayList<>(BATCH_SIZE + 1);
+
+        /** Whether the sources must be signalled on the next acknowledgement. */
+        private boolean needSignal;
+
+        /**
+         * @param nodeId Destination node id.
+         */
+        private Destination(UUID nodeId) {
+            this.nodeId = nodeId;
+        }
+
+        /**
+         * Appends a row, first sending the current batch if it is already full.
+         *
+         * @param row Row to buffer.
+         */
+        public void add(T row) {
+            if (curr.size() == BATCH_SIZE) {
+                assert ready() && srvc != null;
+
+                srvc.send(queryId, exchangeId, nodeId, ++hwm, curr);
+
+                // Keep the spare slot for the end marker, same as the initial buffer.
+                curr = new ArrayList<>(BATCH_SIZE + 1);
+            }
+
+            curr.add(row);
+        }
+
+        /** Sends the last, possibly partial, batch terminated by {@link EndMarker#INSTANCE}. */
+        public void end() {
+            curr.add(EndMarker.INSTANCE);
+
+            assert srvc != null;
+
+            // The final batch needs an id of its own: reusing hwm would repeat the id of the previous batch,
+            // or send -1 when no batch has been sent to this node yet.
+            srvc.send(queryId, exchangeId, nodeId, hwm + 1, curr);
+
+            curr = null;
+            hwm = Integer.MAX_VALUE;
+        }
+
+        /**
+         * @return {@code true} if one more row can be accepted: either a further batch may still be sent, or
+         *      the current batch has room left.
+         */
+        boolean ready() {
+            return hwm - lwm < PER_NODE_BATCH_COUNT || curr.size() < BATCH_SIZE;
+        }
+
+        /**
+         * Advances the low water mark and signals the sources if a push was rejected earlier. Ids that are
+         * not greater than the current mark are ignored.
+         *
+         * @param id Acknowledged batch id.
+         */
+        void acknowledge(int id) {
+            if (lwm < id) {
+                lwm = id;
+
+                if (needSignal) {
+                    needSignal = false;
+
+                    signal();
+                }
+            }
+        }
+
+        /** Requests that the sources be signalled on the next acknowledgement. */
+        public void needSignal() {
+            needSignal = true;
+        }
+    }
+}
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/NoOpFactory.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/AbstractNode.java
similarity index 50%
copy from
modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/NoOpFactory.java
copy to
modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/AbstractNode.java
index 1988671..e832da4 100644
---
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/NoOpFactory.java
+++
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/AbstractNode.java
@@ -14,28 +14,31 @@
* limitations under the License.
*/
-package org.apache.ignite.internal.processors.query.calcite.trait;
+package org.apache.ignite.internal.processors.query.calcite.exec;
-import java.io.ObjectStreamException;
-import org.apache.calcite.plan.Context;
-import org.apache.calcite.util.ImmutableIntList;
-import
org.apache.ignite.internal.processors.query.calcite.metadata.NodesMapping;
+import java.util.Collections;
+import java.util.List;
/**
*
*/
-final class NoOpFactory extends AbstractDestinationFunctionFactory {
- static final DestinationFunctionFactory INSTANCE = new NoOpFactory();
+public abstract class AbstractNode<T> implements Node<T>, Source {
+ protected final Sink<T> target;
+ protected List<Source> sources;
- @Override public DestinationFunction create(Context ctx, NodesMapping m,
ImmutableIntList k) {
- return null;
+ protected AbstractNode(Sink<T> target) {
+ this.target = target;
}
- @Override public Object key() {
- return "NoOpFactory";
+ @Override public void sources(List<Source> sources) {
+ this.sources = Collections.unmodifiableList(sources);
}
- private Object readResolve() throws ObjectStreamException {
- return INSTANCE;
+ public void signal(int idx) {
+ sources.get(idx).signal();
+ }
+
+ @Override public void signal() {
+ sources.forEach(Source::signal);
}
}
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/NoOpFactory.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/FilterNode.java
similarity index 50%
copy from
modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/NoOpFactory.java
copy to
modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/FilterNode.java
index 1988671..3075ee4 100644
---
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/NoOpFactory.java
+++
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/FilterNode.java
@@ -14,28 +14,34 @@
* limitations under the License.
*/
-package org.apache.ignite.internal.processors.query.calcite.trait;
+package org.apache.ignite.internal.processors.query.calcite.exec;
-import java.io.ObjectStreamException;
-import org.apache.calcite.plan.Context;
-import org.apache.calcite.util.ImmutableIntList;
-import
org.apache.ignite.internal.processors.query.calcite.metadata.NodesMapping;
+import java.util.function.Predicate;
/**
*
*/
-final class NoOpFactory extends AbstractDestinationFunctionFactory {
- static final DestinationFunctionFactory INSTANCE = new NoOpFactory();
+public class FilterNode extends AbstractNode<Object[]> implements
SingleNode<Object[]>, Sink<Object[]> {
+ private final Predicate<Object[]> predicate;
- @Override public DestinationFunction create(Context ctx, NodesMapping m,
ImmutableIntList k) {
- return null;
+ public FilterNode(Sink<Object[]> target, Predicate<Object[]> predicate) {
+ super(target);
+
+ this.predicate = predicate;
+ }
+
+ @Override public Sink<Object[]> sink(int idx) {
+ if (idx != 0)
+ throw new IndexOutOfBoundsException();
+
+ return this;
}
- @Override public Object key() {
- return "NoOpFactory";
+ @Override public boolean push(Object[] row) {
+ return !predicate.test(row) || target.push(row);
}
- private Object readResolve() throws ObjectStreamException {
- return INSTANCE;
+ @Override public void end() {
+ target.end();
}
}
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/JoinNode.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/JoinNode.java
new file mode 100644
index 0000000..32818e1
--- /dev/null
+++
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/JoinNode.java
@@ -0,0 +1,101 @@
+/*
+ * Copyright 2019 GridGain Systems, Inc. and Contributors.
+ *
+ * Licensed under the GridGain Community Edition License (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *
https://www.gridgain.com/products/software/community-edition/gridgain-community-edition-license
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.calcite.exec;
+
+import java.util.ArrayList;
+import java.util.function.BiFunction;
+
+/**
+ * Nested-loop join over two fully buffered inputs. Rows of input 0 ("left") and input 1 ("right") are
+ * collected until both inputs have ended; then every left/right pair is passed to the join expression and
+ * each non-{@code null} result is pushed to the target.
+ * <p>
+ * If the target rejects a row the flush is suspended and resumed from the same pair on the next
+ * {@link #signal()}.
+ */
+public class JoinNode extends AbstractNode<Object[]> {
+    /** Produces the joined row for a left/right pair, or {@code null} if the pair yields no row. */
+    private final BiFunction<Object[], Object[], Object[]> expression;
+
+    /** Buffer for input 0. */
+    private final ArraySink<Object[]> left;
+
+    /** Buffer for input 1. */
+    private final ArraySink<Object[]> right;
+
+    /** Left position to resume the flush from. */
+    private int leftIdx;
+
+    /** Right position to resume the flush from; applies to the left row at {@link #leftIdx} only. */
+    private int rightIdx;
+
+    /** Set once all pairs have been emitted and the target has been ended. */
+    private boolean end;
+
+    /**
+     * @param target Sink receiving the joined rows.
+     * @param expression Join expression; returns {@code null} for pairs that produce no row.
+     */
+    public JoinNode(Sink<Object[]> target, BiFunction<Object[], Object[], Object[]> expression) {
+        super(target);
+        this.expression = expression;
+
+        left = new ArraySink<>();
+        right = new ArraySink<>();
+    }
+
+    /**
+     * @param idx {@code 0} for the left input, {@code 1} for the right input.
+     * @return Buffering sink of the requested input.
+     * @throws IndexOutOfBoundsException For any other index.
+     */
+    @Override public Sink<Object[]> sink(int idx) {
+        switch (idx) {
+            case 0:
+                return left;
+            case 1:
+                return right;
+            default:
+                throw new IndexOutOfBoundsException();
+        }
+    }
+
+    /**
+     * Resumes a suspended flush if both inputs have ended, then signals every source whose input has not
+     * ended yet. Does nothing once the join has finished.
+     */
+    @Override public void signal() {
+        if (end)
+            return;
+
+        if (left.end && right.end)
+            tryFlush();
+
+        assert sources != null && sources.size() == 2;
+
+        if (!left.end)
+            signal(0);
+        if (!right.end)
+            signal(1);
+    }
+
+    /**
+     * Emits the joined rows once both inputs have ended. Stops at the first row the target rejects and
+     * remembers the position, so that a later call retries that same pair. Ends the target after the last
+     * pair; later calls are no-ops.
+     */
+    public void tryFlush() {
+        // Guard against a second run after completion, which would end the target twice.
+        if (end || !left.end || !right.end)
+            return;
+
+        for (int i = leftIdx; i < left.size(); i++) {
+            for (int j = rightIdx; j < right.size(); j++) {
+                Object[] row = expression.apply(left.get(i), right.get(j));
+
+                if (row != null && !target.push(row)) {
+                    leftIdx = i;
+                    rightIdx = j;
+
+                    return;
+                }
+            }
+
+            // The saved right position belongs to the resumed left row only; each following left row
+            // has to be matched against the right input from its beginning.
+            rightIdx = 0;
+        }
+
+        end = true;
+        target.end();
+    }
+
+    /** Sink that buffers pushed rows and triggers a flush attempt when its input ends. */
+    private final class ArraySink<T> extends ArrayList<T> implements Sink<T> {
+        /** Whether this input has ended. */
+        private boolean end;
+
+        /** {@inheritDoc} */
+        @Override public boolean push(T row) {
+            return add(row);
+        }
+
+        /** {@inheritDoc} */
+        @Override public void end() {
+            end = true;
+
+            tryFlush();
+        }
+    }
+}
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/Node.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/Node.java
new file mode 100644
index 0000000..71266ff
--- /dev/null
+++
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/Node.java
@@ -0,0 +1,27 @@
+/*
+ * Copyright 2019 GridGain Systems, Inc. and Contributors.
+ *
+ * Licensed under the GridGain Community Edition License (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *
https://www.gridgain.com/products/software/community-edition/gridgain-community-edition-license
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.calcite.exec;
+
+import java.util.List;
+
+/**
+ * Element of an execution tree that receives rows through one {@link Sink} per input.
+ * <p>
+ * {@code sink(idx)} returns the sink of the input with the given index; {@code Outbox}, {@code FilterNode}
+ * and {@code JoinNode} throw {@link IndexOutOfBoundsException} for an index they do not serve.
+ * {@code sources(...)} hands the node the list of its upstream {@link Source}s. {@code JoinNode} signals
+ * source 0 for the input behind sink 0 and source 1 for the input behind sink 1.
+ * NOTE(review): presumably every implementation is expected to keep that index correspondence - confirm.
+ *
+ * @param <T> Row type.
+ */
+public interface Node<T> {
+ Sink<T> sink(int idx);
+ void sources(List<Source> sources);
+}
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/NoOpFactory.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/SingleNode.java
similarity index 50%
copy from
modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/NoOpFactory.java
copy to
modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/SingleNode.java
index 1988671..a0476a0 100644
---
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/NoOpFactory.java
+++
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/SingleNode.java
@@ -14,28 +14,20 @@
* limitations under the License.
*/
-package org.apache.ignite.internal.processors.query.calcite.trait;
+package org.apache.ignite.internal.processors.query.calcite.exec;
-import java.io.ObjectStreamException;
-import org.apache.calcite.plan.Context;
-import org.apache.calcite.util.ImmutableIntList;
-import
org.apache.ignite.internal.processors.query.calcite.metadata.NodesMapping;
+import java.util.Collections;
+import java.util.Objects;
/**
*
*/
-final class NoOpFactory extends AbstractDestinationFunctionFactory {
- static final DestinationFunctionFactory INSTANCE = new NoOpFactory();
-
- @Override public DestinationFunction create(Context ctx, NodesMapping m,
ImmutableIntList k) {
- return null;
- }
-
- @Override public Object key() {
- return "NoOpFactory";
+public interface SingleNode<T> extends Node<T> {
+ default Sink<T> sink() {
+ return Objects.requireNonNull(sink(0));
}
- private Object readResolve() throws ObjectStreamException {
- return INSTANCE;
+ default void source(Source source) {
+ sources(Collections.singletonList(source));
}
}
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/NoOpFactory.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/Sink.java
similarity index 50%
copy from
modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/NoOpFactory.java
copy to
modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/Sink.java
index 1988671..842a973 100644
---
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/NoOpFactory.java
+++
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/Sink.java
@@ -14,28 +14,25 @@
* limitations under the License.
*/
-package org.apache.ignite.internal.processors.query.calcite.trait;
-
-import java.io.ObjectStreamException;
-import org.apache.calcite.plan.Context;
-import org.apache.calcite.util.ImmutableIntList;
-import
org.apache.ignite.internal.processors.query.calcite.metadata.NodesMapping;
+package org.apache.ignite.internal.processors.query.calcite.exec;
/**
*
*/
-final class NoOpFactory extends AbstractDestinationFunctionFactory {
- static final DestinationFunctionFactory INSTANCE = new NoOpFactory();
+public interface Sink<T> {
+ Sink NO_OP = new Sink() {
+ @Override public boolean push(Object row) {
+ return true;
+ }
- @Override public DestinationFunction create(Context ctx, NodesMapping m,
ImmutableIntList k) {
- return null;
- }
+ @Override public void end() {}
+ };
- @Override public Object key() {
- return "NoOpFactory";
- }
+ boolean push(T row);
+ void end();
- private Object readResolve() throws ObjectStreamException {
- return INSTANCE;
+ @SuppressWarnings("unchecked")
+ static <T> Sink<T> noOp() {
+ return NO_OP;
}
}
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/Source.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/Source.java
new file mode 100644
index 0000000..95262c7
--- /dev/null
+++
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/Source.java
@@ -0,0 +1,24 @@
+/*
+ * Copyright 2019 GridGain Systems, Inc. and Contributors.
+ *
+ * Licensed under the GridGain Community Edition License (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *
https://www.gridgain.com/products/software/community-edition/gridgain-community-edition-license
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.calcite.exec;
+
+/**
+ * Upstream handle of an execution node. {@code AbstractNode} implements it by forwarding {@code signal()} to
+ * each of its own sources; {@code Outbox} calls it on initialisation and again when an acknowledgement
+ * arrives after a row had been rejected.
+ * NOTE(review): {@code signal()} therefore looks like a request to start or resume pushing rows - confirm.
+ */
+public interface Source {
+ void signal();
+}
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/FragmentInfo.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/FragmentInfo.java
index 9badd91..3ace003 100644
---
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/FragmentInfo.java
+++
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/FragmentInfo.java
@@ -17,25 +17,27 @@
package org.apache.ignite.internal.processors.query.calcite.metadata;
import com.google.common.collect.ImmutableList;
-import org.apache.ignite.internal.processors.query.calcite.splitter.Fragment;
+import org.apache.calcite.util.Pair;
+import org.apache.ignite.internal.processors.query.calcite.rel.Receiver;
+import org.apache.ignite.internal.processors.query.calcite.splitter.Source;
/**
*
*/
public class FragmentInfo {
private final NodesMapping mapping;
- private final ImmutableList<Fragment> remoteInputs;
+ private final ImmutableList<Pair<Receiver, Source>> sources;
- public FragmentInfo(Fragment remoteInput) {
- this(ImmutableList.of(remoteInput), null);
+ public FragmentInfo(Pair<Receiver, Source> source) {
+ this(ImmutableList.of(source), null);
}
public FragmentInfo(NodesMapping mapping) {
this(null, mapping);
}
- public FragmentInfo(ImmutableList<Fragment> remoteInputs, NodesMapping
mapping) {
- this.remoteInputs = remoteInputs;
+ public FragmentInfo(ImmutableList<Pair<Receiver, Source>> sources,
NodesMapping mapping) {
+ this.sources = sources;
this.mapping = mapping;
}
@@ -43,13 +45,13 @@ public class FragmentInfo {
return mapping;
}
- public ImmutableList<Fragment> remoteInputs() {
- return remoteInputs;
+ public ImmutableList<Pair<Receiver, Source>> sources() {
+ return sources;
}
public FragmentInfo merge(FragmentInfo other) throws
LocationMappingException {
return new FragmentInfo(
- merge(remoteInputs(), other.remoteInputs()),
+ merge(sources(), other.sources()),
merge(mapping(), other.mapping()));
}
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/IgniteMdFragmentInfo.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/IgniteMdFragmentInfo.java
index 4a96cf3..9f02f0c 100644
---
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/IgniteMdFragmentInfo.java
+++
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/IgniteMdFragmentInfo.java
@@ -27,6 +27,7 @@ import org.apache.calcite.rel.metadata.MetadataHandler;
import org.apache.calcite.rel.metadata.ReflectiveRelMetadataProvider;
import org.apache.calcite.rel.metadata.RelMetadataProvider;
import org.apache.calcite.rel.metadata.RelMetadataQuery;
+import org.apache.calcite.util.Pair;
import
org.apache.ignite.internal.processors.query.calcite.metadata.IgniteMetadata.FragmentMetadata;
import org.apache.ignite.internal.processors.query.calcite.rel.IgniteTableScan;
import org.apache.ignite.internal.processors.query.calcite.rel.Receiver;
@@ -93,7 +94,7 @@ public class IgniteMdFragmentInfo implements
MetadataHandler<FragmentMetadata> {
}
public FragmentInfo getFragmentInfo(Receiver rel, RelMetadataQuery mq) {
- return new FragmentInfo(rel.sourceFragment());
+ return new FragmentInfo(Pair.of(rel, rel.source()));
}
public FragmentInfo getFragmentInfo(IgniteTableScan rel, RelMetadataQuery
mq) {
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/NodesMapping.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/NodesMapping.java
index 16bafc0..b63e3e2 100644
---
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/NodesMapping.java
+++
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/NodesMapping.java
@@ -23,6 +23,7 @@ import java.util.HashSet;
import java.util.List;
import java.util.Objects;
import java.util.UUID;
+import java.util.concurrent.ThreadLocalRandom;
import org.apache.ignite.internal.processors.query.calcite.util.Commons;
import org.apache.ignite.internal.util.GridIntList;
import org.apache.ignite.internal.util.typedef.F;
@@ -77,10 +78,16 @@ public class NodesMapping implements Serializable {
return new NodesMapping(nodes, mergeAssignments(other, nodes), flags);
}
- public NodesMapping deduplicate() throws LocationMappingException {
- if (assignments == null || !excessive())
+ public NodesMapping deduplicate() {
+ if (!excessive())
return this;
+ if (assignments == null) {
+ UUID node =
nodes.get(ThreadLocalRandom.current().nextInt(nodes.size()));
+
+ return new NodesMapping(Collections.singletonList(node), null,
(byte)(flags | DEDUPLICATED));
+ }
+
HashSet<UUID> nodes0 = new HashSet<>();
List<List<UUID>> assignments0 = new ArrayList<>(assignments.size());
@@ -88,10 +95,12 @@ public class NodesMapping implements Serializable {
UUID node = F.first(partNodes);
if (node == null)
- throw new LocationMappingException("Failed to map fragment to
location.");
+ assignments0.add(Collections.emptyList());
+ else {
+ assignments0.add(Collections.singletonList(node));
- assignments0.add(Collections.singletonList(node));
- nodes0.add(node);
+ nodes0.add(node);
+ }
}
return new NodesMapping(new ArrayList<>(nodes0), assignments0,
(byte)(flags | DEDUPLICATED));
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/Receiver.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/Receiver.java
index cde4290..0287236 100644
---
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/Receiver.java
+++
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/Receiver.java
@@ -20,39 +20,25 @@ import org.apache.calcite.plan.RelOptCluster;
import org.apache.calcite.plan.RelTraitSet;
import org.apache.calcite.rel.AbstractRelNode;
import org.apache.calcite.rel.type.RelDataType;
-import
org.apache.ignite.internal.processors.query.calcite.metadata.NodesMapping;
-import org.apache.ignite.internal.processors.query.calcite.splitter.Fragment;
+import org.apache.ignite.internal.processors.query.calcite.splitter.Source;
+import
org.apache.ignite.internal.processors.query.calcite.trait.DistributionTrait;
+import
org.apache.ignite.internal.processors.query.calcite.trait.DistributionTraitDef;
import org.apache.ignite.internal.processors.query.calcite.util.RelImplementor;
/**
*
*/
public final class Receiver extends AbstractRelNode implements IgniteRel {
- private final Fragment sourceFragment;
- private final NodesMapping sourceMapping;
+ private final Source source;
/**
* @param cluster Cluster this relational expression belongs to
* @param traits Trait set.
*/
- public Receiver(RelOptCluster cluster, RelTraitSet traits, RelDataType
rowType, Fragment sourceFragment) {
+ public Receiver(RelOptCluster cluster, RelTraitSet traits, RelDataType
rowType, Source source) {
super(cluster, traits);
this.rowType = rowType;
- this.sourceFragment = sourceFragment;
-
- sourceMapping = null;
- }
-
- /**
- * @param cluster Cluster this relational expression belongs to
- * @param traits Trait set.
- */
- public Receiver(RelOptCluster cluster, RelTraitSet traits, RelDataType
rowType, NodesMapping sourceMapping) {
- super(cluster, traits);
- this.rowType = rowType;
- this.sourceMapping = sourceMapping;
-
- sourceFragment = null;
+ this.source = source;
}
/** {@inheritDoc} */
@@ -60,16 +46,11 @@ public final class Receiver extends AbstractRelNode
implements IgniteRel {
return implementor.implement(this);
}
- public Fragment sourceFragment() {
- return sourceFragment;
+ public DistributionTrait distribution() {
+ return getTraitSet().getTrait(DistributionTraitDef.INSTANCE);
}
- public NodesMapping sourceMapping() {
- if (sourceFragment != null)
- return sourceFragment.mapping();
-
- assert sourceMapping != null;
-
- return sourceMapping;
+ public Source source() {
+ return source;
}
}
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/Sender.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/Sender.java
index aa279b2..fb1f277 100644
---
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/Sender.java
+++
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/Sender.java
@@ -22,9 +22,7 @@ import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.SingleRel;
import org.apache.calcite.rel.metadata.RelMetadataQuery;
import
org.apache.ignite.internal.processors.query.calcite.metadata.IgniteMdDistribution;
-import
org.apache.ignite.internal.processors.query.calcite.metadata.NodesMapping;
-import
org.apache.ignite.internal.processors.query.calcite.trait.DestinationFunction;
-import
org.apache.ignite.internal.processors.query.calcite.trait.DistributionTrait;
+import org.apache.ignite.internal.processors.query.calcite.splitter.Target;
import
org.apache.ignite.internal.processors.query.calcite.trait.DistributionTraitDef;
import org.apache.ignite.internal.processors.query.calcite.util.RelImplementor;
import org.jetbrains.annotations.NotNull;
@@ -33,29 +31,22 @@ import org.jetbrains.annotations.NotNull;
*
*/
public final class Sender extends SingleRel implements IgniteRel {
- private final DistributionTrait targetDistr;
-
- private NodesMapping targetMapping;
+ private Target target;
/**
* Creates a <code>SingleRel</code>.
- * @param cluster Cluster this relational expression belongs to
+ * @param cluster Cluster this relational expression belongs to
* @param traits Trait set.
* @param input Input relational expression
- * @param targetDistr Target distribution
*/
- public Sender(RelOptCluster cluster, RelTraitSet traits, RelNode input,
@NotNull DistributionTrait targetDistr) {
+ public Sender(RelOptCluster cluster, RelTraitSet traits, RelNode input) {
super(cluster, traits, input);
-
- this.targetDistr = targetDistr;
}
- private Sender(RelOptCluster cluster, RelTraitSet traits, RelNode input,
- @NotNull DistributionTrait targetDistr, @NotNull NodesMapping
targetMapping) {
+ private Sender(RelOptCluster cluster, RelTraitSet traits, RelNode input,
@NotNull Target target) {
super(cluster, traits, input);
- this.targetDistr = targetDistr;
- this.targetMapping = targetMapping;
+ this.target = target;
}
/** {@inheritDoc} */
@@ -63,23 +54,15 @@ public final class Sender extends SingleRel implements
IgniteRel {
return implementor.implement(this);
}
- public void init(NodesMapping mapping) {
- targetMapping = mapping;
- }
-
- public DistributionTrait targetDistribution() {
- return targetDistr;
- }
-
- public NodesMapping targetMapping() {
- return targetMapping;
+ public void init(Target target) {
+ this.target = target;
}
- public DestinationFunction targetFunction(org.apache.calcite.plan.Context
ctx) {
- return targetDistr.destinationFunctionFactory().create(ctx,
targetMapping, targetDistr.keys());
+ public Target target() {
+ return target;
}
- public static Sender create(RelNode input, DistributionTrait targetDistr,
NodesMapping targetMapping) {
+ public static Sender create(RelNode input, Target target) {
RelOptCluster cluster = input.getCluster();
RelMetadataQuery mq = cluster.getMetadataQuery();
@@ -87,6 +70,6 @@ public final class Sender extends SingleRel implements
IgniteRel {
.replace(IgniteRel.IGNITE_CONVENTION)
.replaceIf(DistributionTraitDef.INSTANCE, () ->
IgniteMdDistribution.distribution(input, mq));
- return new Sender(cluster, traits, input, targetDistr, targetMapping);
+ return new Sender(cluster, traits, input, target);
}
}
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/relation/ReceiverNode.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/relation/ReceiverNode.java
index 8b8a2c2..d1a00cd 100644
---
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/relation/ReceiverNode.java
+++
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/relation/ReceiverNode.java
@@ -19,9 +19,10 @@ package
org.apache.ignite.internal.processors.query.calcite.serialize.relation;
import java.util.List;
import org.apache.calcite.plan.RelTraitSet;
import org.apache.calcite.rel.RelNode;
-import
org.apache.ignite.internal.processors.query.calcite.metadata.NodesMapping;
import org.apache.ignite.internal.processors.query.calcite.rel.Receiver;
import
org.apache.ignite.internal.processors.query.calcite.serialize.type.DataType;
+import org.apache.ignite.internal.processors.query.calcite.splitter.Source;
+import org.apache.ignite.internal.processors.query.calcite.splitter.SourceImpl;
/**
@@ -29,21 +30,24 @@ import
org.apache.ignite.internal.processors.query.calcite.serialize.type.DataTy
*/
public class ReceiverNode extends RelGraphNode {
private final DataType dataType;
- private final NodesMapping sourceMapping;
+ private final Source source;
- private ReceiverNode(RelTraitSet traits, DataType dataType, NodesMapping
sourceMapping) {
+ private ReceiverNode(RelTraitSet traits, DataType dataType, Source source)
{
super(traits);
this.dataType = dataType;
- this.sourceMapping = sourceMapping;
+ this.source = source;
}
public static ReceiverNode create(Receiver rel) {
- return new ReceiverNode(rel.getTraitSet(),
DataType.fromType(rel.getRowType()), rel.sourceMapping());
+ Source source = new SourceImpl(rel.source().exchangeId(), rel.source().mapping());
+
+ return new ReceiverNode(rel.getTraitSet(), DataType.fromType(rel.getRowType()), source);
}
@Override public RelNode toRel(ConversionContext ctx, List<RelNode>
children) {
return new Receiver(ctx.getCluster(),
traitSet.toTraitSet(ctx.getCluster()),
- dataType.toRelDataType(ctx.getTypeFactory()), sourceMapping);
+ dataType.toRelDataType(ctx.getTypeFactory()),
+ source);
}
}
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/relation/SenderNode.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/relation/SenderNode.java
index 44e92bd..94671dd 100644
---
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/relation/SenderNode.java
+++
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/relation/SenderNode.java
@@ -18,28 +18,25 @@ package
org.apache.ignite.internal.processors.query.calcite.serialize.relation;
import java.util.List;
import org.apache.calcite.rel.RelNode;
-import
org.apache.ignite.internal.processors.query.calcite.metadata.NodesMapping;
import org.apache.ignite.internal.processors.query.calcite.rel.Sender;
-import
org.apache.ignite.internal.processors.query.calcite.trait.DistributionTrait;
+import org.apache.ignite.internal.processors.query.calcite.splitter.Target;
import org.apache.ignite.internal.util.typedef.F;
/**
*
*/
public class SenderNode extends RelGraphNode {
- private final DistributionTrait targetDistr;
- private final NodesMapping targetMapping;
+ private final Target target;
- private SenderNode(DistributionTrait targetDistr, NodesMapping
targetMapping) {
- this.targetDistr = targetDistr;
- this.targetMapping = targetMapping;
+ private SenderNode(Target target) {
+ this.target = target;
}
public static SenderNode create(Sender rel) {
- return new SenderNode(rel.targetDistribution(), rel.targetMapping());
+ return new SenderNode(rel.target());
}
@Override public RelNode toRel(ConversionContext ctx, List<RelNode>
children) {
- return Sender.create(F.first(children), targetDistr, targetMapping);
+ return Sender.create(F.first(children), target);
}
}
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/splitter/Fragment.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/splitter/Fragment.java
index 86f58e9..d12c73a 100644
---
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/splitter/Fragment.java
+++
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/splitter/Fragment.java
@@ -17,85 +17,87 @@
package org.apache.ignite.internal.processors.query.calcite.splitter;
import com.google.common.collect.ImmutableList;
+import java.util.Objects;
+import java.util.concurrent.atomic.AtomicLong;
import org.apache.calcite.plan.Context;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.metadata.RelMetadataQuery;
+import org.apache.calcite.util.Pair;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
-import org.apache.ignite.internal.processors.query.IgniteSQLException;
import
org.apache.ignite.internal.processors.query.calcite.metadata.FragmentInfo;
import
org.apache.ignite.internal.processors.query.calcite.metadata.IgniteMdFragmentInfo;
-import
org.apache.ignite.internal.processors.query.calcite.metadata.LocationMappingException;
import
org.apache.ignite.internal.processors.query.calcite.metadata.LocationRegistry;
import
org.apache.ignite.internal.processors.query.calcite.metadata.NodesMapping;
+import org.apache.ignite.internal.processors.query.calcite.rel.Receiver;
import org.apache.ignite.internal.processors.query.calcite.rel.Sender;
+import
org.apache.ignite.internal.processors.query.calcite.trait.DistributionTrait;
import org.apache.ignite.internal.util.typedef.F;
/**
*
*/
-public class Fragment {
+public class Fragment implements Source {
+ private static final AtomicLong ID_GEN = new AtomicLong();
+
+ private final long exchangeId = ID_GEN.getAndIncrement();
+
private final RelNode root;
private NodesMapping mapping;
- private ImmutableList<Fragment> remoteInputs;
public Fragment(RelNode root) {
this.root = root;
}
public void init(Context ctx, RelMetadataQuery mq) {
- init(null, ctx, mq);
- }
+ FragmentInfo info = IgniteMdFragmentInfo.fragmentInfo(root, mq);
- public RelNode root() {
- return root;
- }
+ if (info.mapping() == null)
+ mapping = remote() ? registry(ctx).random(topologyVersion(ctx)) : registry(ctx).local();
+ else
+ mapping = info.mapping().deduplicate();
- public NodesMapping mapping() {
- return mapping;
- }
+ ImmutableList<Pair<Receiver, Source>> sources = info.sources();
+
+ if (!F.isEmpty(sources)) {
+ for (Pair<Receiver, Source> input : sources) {
+ Receiver receiver = input.left;
+ Source source = input.right;
- public ImmutableList<Fragment> remoteInputs() {
- return remoteInputs;
+ source.init(mapping, receiver.distribution(), ctx, mq);
+ }
+ }
}
- public boolean isRemote() {
- return root instanceof Sender;
+ @Override public long exchangeId() {
+ return exchangeId;
}
- private void init(Fragment parent, Context ctx, RelMetadataQuery mq) {
- FragmentInfo info = IgniteMdFragmentInfo.fragmentInfo(root, mq);
+ @Override public void init(NodesMapping mapping, DistributionTrait distribution, Context ctx, RelMetadataQuery mq) {
+ assert remote();
- remoteInputs = info.remoteInputs();
+ ((Sender) root).init(new TargetImpl(exchangeId, mapping, distribution));
- if (info.mapping() == null)
- mapping = isRemote() ? registry(ctx).random(topologyVersion(ctx)) : registry(ctx).local();
- else {
- try {
- mapping = info.mapping().deduplicate();
- }
- catch (LocationMappingException e) {
- throw new IgniteSQLException("Failed to map fragment to location, partition lost.", e);
- }
- }
+ init(ctx, mq);
+ }
- if (parent != null) {
- assert isRemote();
+ public RelNode root() {
+ return root;
+ }
- ((Sender) root).init(parent.mapping);
- }
+ @Override public NodesMapping mapping() {
+ return mapping;
+ }
- if (!F.isEmpty(remoteInputs)) {
- for (Fragment input : remoteInputs)
- input.init(this, ctx, mq);
- }
+ private boolean remote() {
+ return root instanceof Sender;
}
private LocationRegistry registry(Context ctx) {
- return ctx.unwrap(LocationRegistry.class);
+ return Objects.requireNonNull(ctx.unwrap(LocationRegistry.class));
}
private AffinityTopologyVersion topologyVersion(Context ctx) {
- return ctx.unwrap(AffinityTopologyVersion.class);
+ return Objects.requireNonNull(ctx.unwrap(AffinityTopologyVersion.class));
}
}
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/splitter/QueryPlan.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/splitter/QueryPlan.java
index ae37dfd..c03879b 100644
---
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/splitter/QueryPlan.java
+++
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/splitter/QueryPlan.java
@@ -26,7 +26,6 @@ import
org.apache.ignite.internal.processors.query.calcite.metadata.OptimisticPl
import
org.apache.ignite.internal.processors.query.calcite.metadata.RelMetadataQueryEx;
import org.apache.ignite.internal.processors.query.calcite.rel.Receiver;
import org.apache.ignite.internal.processors.query.calcite.rel.Sender;
-import
org.apache.ignite.internal.processors.query.calcite.trait.DistributionTraitDef;
import org.apache.ignite.internal.processors.query.calcite.util.Edge;
import org.apache.ignite.internal.util.typedef.F;
@@ -63,7 +62,7 @@ public class QueryPlan {
RelOptCluster cluster = child.getCluster();
RelTraitSet traitSet = child.getTraitSet();
- Sender sender = new Sender(cluster, traitSet, child,
traitSet.getTrait(DistributionTraitDef.INSTANCE));
+ Sender sender = new Sender(cluster, traitSet, child);
Fragment fragment = new Fragment(sender);
fragments.add(fragment);
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/AllTargetsFactory.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/splitter/Source.java
similarity index 53%
copy from
modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/AllTargetsFactory.java
copy to
modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/splitter/Source.java
index 7e69adf..773dee1 100644
---
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/AllTargetsFactory.java
+++
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/splitter/Source.java
@@ -14,32 +14,34 @@
* limitations under the License.
*/
-package org.apache.ignite.internal.processors.query.calcite.trait;
+package org.apache.ignite.internal.processors.query.calcite.splitter;
-import java.io.ObjectStreamException;
-import java.util.List;
-import java.util.UUID;
import org.apache.calcite.plan.Context;
-import org.apache.calcite.util.ImmutableIntList;
+import org.apache.calcite.rel.metadata.RelMetadataQuery;
import
org.apache.ignite.internal.processors.query.calcite.metadata.NodesMapping;
+import
org.apache.ignite.internal.processors.query.calcite.trait.DistributionTrait;
/**
*
*/
-final class AllTargetsFactory extends AbstractDestinationFunctionFactory {
- static final DestinationFunctionFactory INSTANCE = new AllTargetsFactory();
+public interface Source {
+ /**
+ * @return Exchange id, has to be unique in scope of query.
+ */
+ long exchangeId();
- @Override public DestinationFunction create(Context ctx, NodesMapping m,
ImmutableIntList k) {
- List<UUID> nodes = m.nodes();
+ /**
+ * @return Source mapping.
+ */
+ NodesMapping mapping();
- return r -> nodes;
- }
-
- @Override public Object key() {
- return "AllTargetsFactory";
- }
-
- private Object readResolve() throws ObjectStreamException {
- return INSTANCE;
+ /**
+ * @param mapping Target mapping.
+ * @param distribution Target distribution.
+ * @param ctx Context.
+ * @param mq Metadata query instance.
+ */
+ default void init(NodesMapping mapping, DistributionTrait distribution, Context ctx, RelMetadataQuery mq) {
+ // No-op.
}
}
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/NoOpFactory.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/splitter/SourceImpl.java
similarity index 56%
copy from
modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/NoOpFactory.java
copy to
modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/splitter/SourceImpl.java
index 1988671..44b271d 100644
---
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/NoOpFactory.java
+++
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/splitter/SourceImpl.java
@@ -14,28 +14,28 @@
* limitations under the License.
*/
-package org.apache.ignite.internal.processors.query.calcite.trait;
+package org.apache.ignite.internal.processors.query.calcite.splitter;
-import java.io.ObjectStreamException;
-import org.apache.calcite.plan.Context;
-import org.apache.calcite.util.ImmutableIntList;
+import java.io.Serializable;
import
org.apache.ignite.internal.processors.query.calcite.metadata.NodesMapping;
/**
*
*/
-final class NoOpFactory extends AbstractDestinationFunctionFactory {
- static final DestinationFunctionFactory INSTANCE = new NoOpFactory();
+public class SourceImpl implements Source, Serializable {
+ private final long exchangeId;
+ private final NodesMapping mapping;
- @Override public DestinationFunction create(Context ctx, NodesMapping m,
ImmutableIntList k) {
- return null;
+ public SourceImpl(long exchangeId, NodesMapping mapping) {
+ this.exchangeId = exchangeId;
+ this.mapping = mapping;
}
- @Override public Object key() {
- return "NoOpFactory";
+ @Override public long exchangeId() {
+ return exchangeId;
}
- private Object readResolve() throws ObjectStreamException {
- return INSTANCE;
+ @Override public NodesMapping mapping() {
+ return mapping;
}
}
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/splitter/Splitter.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/splitter/Splitter.java
index d15adc7..c15c46a 100644
---
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/splitter/Splitter.java
+++
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/splitter/Splitter.java
@@ -26,7 +26,6 @@ import
org.apache.ignite.internal.processors.query.calcite.rel.IgniteExchange;
import org.apache.ignite.internal.processors.query.calcite.rel.IgniteRel;
import org.apache.ignite.internal.processors.query.calcite.rel.Receiver;
import org.apache.ignite.internal.processors.query.calcite.rel.Sender;
-import
org.apache.ignite.internal.processors.query.calcite.trait.DistributionTraitDef;
import
org.apache.ignite.internal.processors.query.calcite.util.IgniteRelShuttle;
/**
@@ -46,11 +45,12 @@ public class Splitter extends IgniteRelShuttle {
}
@Override public RelNode visit(IgniteExchange rel) {
- RelOptCluster cluster = rel.getCluster();
- RelTraitSet inputTraits = rel.getInput().getTraitSet();
+ RelNode input = rel.getInput();
+ RelOptCluster cluster = input.getCluster();
+ RelTraitSet inputTraits = input.getTraitSet();
RelTraitSet outputTraits = rel.getTraitSet();
- Sender sender = new Sender(cluster, inputTraits,
visit(rel.getInput()), outputTraits.getTrait(DistributionTraitDef.INSTANCE));
+ Sender sender = new Sender(cluster, inputTraits, visit(input));
Fragment fragment = new Fragment(sender);
fragments.add(fragment);
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/NoOpFactory.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/splitter/Target.java
similarity index 55%
copy from
modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/NoOpFactory.java
copy to
modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/splitter/Target.java
index 1988671..fecea20 100644
---
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/NoOpFactory.java
+++
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/splitter/Target.java
@@ -14,28 +14,16 @@
* limitations under the License.
*/
-package org.apache.ignite.internal.processors.query.calcite.trait;
+package org.apache.ignite.internal.processors.query.calcite.splitter;
-import java.io.ObjectStreamException;
-import org.apache.calcite.plan.Context;
-import org.apache.calcite.util.ImmutableIntList;
import
org.apache.ignite.internal.processors.query.calcite.metadata.NodesMapping;
+import
org.apache.ignite.internal.processors.query.calcite.trait.DistributionTrait;
/**
*
*/
-final class NoOpFactory extends AbstractDestinationFunctionFactory {
- static final DestinationFunctionFactory INSTANCE = new NoOpFactory();
-
- @Override public DestinationFunction create(Context ctx, NodesMapping m,
ImmutableIntList k) {
- return null;
- }
-
- @Override public Object key() {
- return "NoOpFactory";
- }
-
- private Object readResolve() throws ObjectStreamException {
- return INSTANCE;
- }
+public interface Target {
+ long exchangeId();
+ NodesMapping mapping();
+ DistributionTrait distribution();
}
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/relation/SenderNode.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/splitter/TargetImpl.java
similarity index 50%
copy from
modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/relation/SenderNode.java
copy to
modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/splitter/TargetImpl.java
index 44e92bd..a69a8f1 100644
---
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/relation/SenderNode.java
+++
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/splitter/TargetImpl.java
@@ -14,32 +14,35 @@
* limitations under the License.
*/
-package org.apache.ignite.internal.processors.query.calcite.serialize.relation;
+package org.apache.ignite.internal.processors.query.calcite.splitter;
-import java.util.List;
-import org.apache.calcite.rel.RelNode;
+import java.io.Serializable;
import
org.apache.ignite.internal.processors.query.calcite.metadata.NodesMapping;
-import org.apache.ignite.internal.processors.query.calcite.rel.Sender;
import
org.apache.ignite.internal.processors.query.calcite.trait.DistributionTrait;
-import org.apache.ignite.internal.util.typedef.F;
/**
*
*/
-public class SenderNode extends RelGraphNode {
- private final DistributionTrait targetDistr;
- private final NodesMapping targetMapping;
+public class TargetImpl implements Target, Serializable {
+ private final long exchangeId;
+ private final NodesMapping mapping;
+ private final DistributionTrait distribution;
- private SenderNode(DistributionTrait targetDistr, NodesMapping
targetMapping) {
- this.targetDistr = targetDistr;
- this.targetMapping = targetMapping;
+ public TargetImpl(long exchangeId, NodesMapping mapping, DistributionTrait
distribution) {
+ this.exchangeId = exchangeId;
+ this.mapping = mapping;
+ this.distribution = distribution;
}
- public static SenderNode create(Sender rel) {
- return new SenderNode(rel.targetDistribution(), rel.targetMapping());
+ @Override public long exchangeId() {
+ return exchangeId;
}
- @Override public RelNode toRel(ConversionContext ctx, List<RelNode>
children) {
- return Sender.create(F.first(children), targetDistr, targetMapping);
+ @Override public NodesMapping mapping() {
+ return mapping;
+ }
+
+ @Override public DistributionTrait distribution() {
+ return distribution;
}
}
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/AllTargetsFactory.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/AllTargetsFactory.java
index 7e69adf..2dcfe33 100644
---
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/AllTargetsFactory.java
+++
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/AllTargetsFactory.java
@@ -26,8 +26,8 @@ import
org.apache.ignite.internal.processors.query.calcite.metadata.NodesMapping
/**
*
*/
-final class AllTargetsFactory extends AbstractDestinationFunctionFactory {
- static final DestinationFunctionFactory INSTANCE = new AllTargetsFactory();
+public final class AllTargetsFactory extends
AbstractDestinationFunctionFactory {
+ public static final DestinationFunctionFactory INSTANCE = new
AllTargetsFactory();
@Override public DestinationFunction create(Context ctx, NodesMapping m,
ImmutableIntList k) {
List<UUID> nodes = m.nodes();
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/HashFunctionFactory.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/HashFunctionFactory.java
index 379a707..3c2420b 100644
---
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/HashFunctionFactory.java
+++
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/HashFunctionFactory.java
@@ -29,8 +29,8 @@ import org.apache.ignite.internal.util.typedef.internal.U;
/**
*
*/
-final class HashFunctionFactory extends AbstractDestinationFunctionFactory {
- static final DestinationFunctionFactory INSTANCE = new
HashFunctionFactory();
+public final class HashFunctionFactory extends
AbstractDestinationFunctionFactory {
+ public static final DestinationFunctionFactory INSTANCE = new
HashFunctionFactory();
@Override public DestinationFunction create(Context ctx, NodesMapping m,
ImmutableIntList k) {
assert m != null && !F.isEmpty(m.assignments());
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/NoOpFactory.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/NoOpFactory.java
index 1988671..d8495b6 100644
---
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/NoOpFactory.java
+++
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/NoOpFactory.java
@@ -24,8 +24,8 @@ import
org.apache.ignite.internal.processors.query.calcite.metadata.NodesMapping
/**
*
*/
-final class NoOpFactory extends AbstractDestinationFunctionFactory {
- static final DestinationFunctionFactory INSTANCE = new NoOpFactory();
+public final class NoOpFactory extends AbstractDestinationFunctionFactory {
+ public static final DestinationFunctionFactory INSTANCE = new
NoOpFactory();
@Override public DestinationFunction create(Context ctx, NodesMapping m,
ImmutableIntList k) {
return null;
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/RandomTargetFactory.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/RandomTargetFactory.java
index 8de55e5..78013d3 100644
---
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/RandomTargetFactory.java
+++
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/RandomTargetFactory.java
@@ -28,8 +28,8 @@ import
org.apache.ignite.internal.processors.query.calcite.metadata.NodesMapping
/**
*
*/
-final class RandomTargetFactory extends AbstractDestinationFunctionFactory {
- static final DestinationFunctionFactory INSTANCE = new
RandomTargetFactory();
+public final class RandomTargetFactory extends
AbstractDestinationFunctionFactory {
+ public static final DestinationFunctionFactory INSTANCE = new
RandomTargetFactory();
@Override public DestinationFunction create(Context ctx, NodesMapping m,
ImmutableIntList k) {
List<UUID> nodes = m.nodes();
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/SingleTargetFactory.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/SingleTargetFactory.java
index 1de22fd..4d631fc 100644
---
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/SingleTargetFactory.java
+++
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/SingleTargetFactory.java
@@ -19,6 +19,7 @@ package
org.apache.ignite.internal.processors.query.calcite.trait;
import java.io.ObjectStreamException;
import java.util.Collections;
import java.util.List;
+import java.util.Objects;
import java.util.UUID;
import org.apache.calcite.plan.Context;
import org.apache.calcite.util.ImmutableIntList;
@@ -28,11 +29,11 @@ import org.apache.ignite.internal.util.typedef.F;
/**
*
*/
-final class SingleTargetFactory extends AbstractDestinationFunctionFactory {
- static final DestinationFunctionFactory INSTANCE = new
SingleTargetFactory();
+public final class SingleTargetFactory extends
AbstractDestinationFunctionFactory {
+ public static final DestinationFunctionFactory INSTANCE = new
SingleTargetFactory();
@Override public DestinationFunction create(Context ctx, NodesMapping m,
ImmutableIntList k) {
- List<UUID> nodes = Collections.singletonList(F.first(m.nodes()));
+ List<UUID> nodes = Collections.singletonList(Objects.requireNonNull(F.first(m.nodes())));
return r -> nodes;
}
diff --git
a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/CalciteQueryProcessorTest.java
b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/CalciteQueryProcessorTest.java
index 0180357..6c04c1e 100644
---
a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/CalciteQueryProcessorTest.java
+++
b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/CalciteQueryProcessorTest.java
@@ -22,6 +22,7 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.UUID;
+import java.util.concurrent.atomic.AtomicLong;
import org.apache.calcite.plan.Context;
import org.apache.calcite.plan.Contexts;
import org.apache.calcite.plan.ConventionTraitDef;
@@ -33,15 +34,6 @@ import org.apache.calcite.rel.core.Project;
import org.apache.calcite.schema.SchemaPlus;
import org.apache.calcite.sql.SqlNode;
import org.apache.calcite.tools.Frameworks;
-import org.apache.ignite.IgniteCheckedException;
-import org.apache.ignite.IgniteException;
-import org.apache.ignite.configuration.BinaryConfiguration;
-import org.apache.ignite.configuration.IgniteConfiguration;
-import org.apache.ignite.internal.binary.BinaryCachingMetadataHandler;
-import org.apache.ignite.internal.binary.BinaryContext;
-import org.apache.ignite.internal.binary.BinaryMarshaller;
-import org.apache.ignite.internal.managers.discovery.GridDiscoveryManager;
-import org.apache.ignite.internal.managers.systemview.GridSystemViewManager;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import
org.apache.ignite.internal.processors.query.calcite.metadata.DistributionRegistry;
import
org.apache.ignite.internal.processors.query.calcite.metadata.LocationRegistry;
@@ -63,14 +55,8 @@ import
org.apache.ignite.internal.processors.query.calcite.trait.DistributionTra
import
org.apache.ignite.internal.processors.query.calcite.trait.DistributionTraitDef;
import
org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistributions;
import org.apache.ignite.internal.processors.query.calcite.type.RowType;
-import org.apache.ignite.internal.util.IgniteUtils;
import org.apache.ignite.internal.util.typedef.internal.CU;
-import org.apache.ignite.logger.NullLogger;
-import org.apache.ignite.marshaller.MarshallerContextTestImpl;
import org.apache.ignite.marshaller.jdk.JdkMarshaller;
-import org.apache.ignite.spi.discovery.DiscoverySpiCustomMessage;
-import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
-import org.apache.ignite.spi.systemview.jmx.JmxSystemViewExporterSpi;
import org.apache.ignite.testframework.junits.GridTestKernalContext;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
import org.junit.BeforeClass;
@@ -1026,6 +1012,8 @@ public class CalciteQueryProcessorTest extends
GridCommonAbstractTest {
}
private static class TestRegistry implements LocationRegistry,
DistributionRegistry {
+ private AtomicLong idGen = new AtomicLong();
+
@Override public NodesMapping random(AffinityTopologyVersion topVer) {
return new NodesMapping(select(nodes, 0,1,2,3), null, (byte) 0);
}
@@ -1059,40 +1047,4 @@ public class CalciteQueryProcessorTest extends
GridCommonAbstractTest {
throw new AssertionError("Unexpected cache id:" + cacheId);
}
}
-
- /**
- * @return Binary marshaller.
- */
- private BinaryMarshaller binaryMarshaller() throws IgniteCheckedException {
- IgniteConfiguration iCfg = new IgniteConfiguration();
-
- BinaryConfiguration bCfg = new BinaryConfiguration();
- iCfg.setBinaryConfiguration(bCfg);
- iCfg.setClientMode(false);
- iCfg.setDiscoverySpi(new TcpDiscoverySpi() {
- @Override public void sendCustomEvent(DiscoverySpiCustomMessage
msg) throws IgniteException {
- //No-op.
- }
- });
- iCfg.setSystemViewExporterSpi(new JmxSystemViewExporterSpi());
-
- BinaryContext ctx = new
BinaryContext(BinaryCachingMetadataHandler.create(), iCfg, new NullLogger());
-
- BinaryMarshaller marsh = new BinaryMarshaller();
-
- MarshallerContextTestImpl marshCtx = new
MarshallerContextTestImpl(null, null);
-
- GridTestKernalContext kernCtx = new GridTestKernalContext(log, iCfg);
-
- kernCtx.add(new GridSystemViewManager(kernCtx));
- kernCtx.add(new GridDiscoveryManager(kernCtx));
-
- marshCtx.onMarshallerProcessorStarted(kernCtx, null);
-
- marsh.setContext(marshCtx);
-
- IgniteUtils.invoke(BinaryMarshaller.class, marsh, "setBinaryContext",
ctx, iCfg);
-
- return marsh;
- }
}
\ No newline at end of file
diff --git
a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/exchange/OutboxTest.java
b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/exchange/OutboxTest.java
new file mode 100644
index 0000000..89efa7c
--- /dev/null
+++
b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/exchange/OutboxTest.java
@@ -0,0 +1,133 @@
+package org.apache.ignite.internal.processors.query.calcite.exchange;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.UUID;
+import org.apache.calcite.plan.Contexts;
+import org.apache.calcite.util.ImmutableIntList;
+import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
+import org.apache.ignite.internal.processors.query.calcite.exec.Sink;
+import org.apache.ignite.internal.processors.query.calcite.exec.Source;
+import
org.apache.ignite.internal.processors.query.calcite.metadata.NodesMapping;
+import
org.apache.ignite.internal.processors.query.calcite.trait.DestinationFunction;
+import
org.apache.ignite.internal.processors.query.calcite.trait.SingleTargetFactory;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+/**
+ * Checks {@link Outbox} row batching, acknowledgement-driven signalling and end-of-stream handling against stub ExchangeService and Source implementations.
+ */
+public class OutboxTest extends GridCommonAbstractTest {
+ private static UUID nodeId; // Random id of the only target node, created once per class.
+ private static GridCacheVersion queryId; // Query id handed to every Outbox under test.
+ private static DestinationFunction func; // Built by SingleTargetFactory from the one-node mapping.
+ private static Collection<UUID> targets; // Nodes of that mapping (mapping.nodes()).
+
+ private Outbox<Object[]> outbox; // Re-created before each test in setUp().
+
+ @BeforeClass
+ public static void setupClass() {
+ nodeId = UUID.randomUUID();
+ queryId = new GridCacheVersion(0, 0, 0, 0);
+
+ NodesMapping mapping = new
NodesMapping(Collections.singletonList(nodeId), null,
NodesMapping.DEDUPLICATED);
+
+ targets = mapping.nodes();
+ func = SingleTargetFactory.INSTANCE.create(Contexts.empty(), mapping,
ImmutableIntList.of());
+ }
+
+
+ @Before
+ public void setUp() {
+ outbox = new Outbox<>(queryId, 0, targets, func);
+ }
+
+ @Test
+ public void testBasicOps() {
+ TestSource source = new TestSource();
+ TestExchangeService exch = new TestExchangeService();
+
+ Sink<Object[]> sink = outbox.sink();
+
+ outbox.source(source);
+ outbox.init(exch);
+
+ assertTrue(exch.registered); // init() is expected to register the outbox with the exchange service...
+ assertTrue(source.signal); // ...and to signal the source.
+
+ source.signal = false; // Reset the flag so later signals can be detected.
+
+ int maxRows = Outbox.BATCH_SIZE * (Outbox.PER_NODE_BATCH_COUNT + 1); // Rows the sink is expected to accept before push() returns false.
+ int rows = 0;
+
+ while (sink.push(new Object[]{new Object()})) { // Push until the sink refuses more rows.
+ rows++;
+
+ assertFalse(rows > maxRows); // Fails the test instead of looping forever if push() never returns false.
+ }
+
+ assertEquals(maxRows, rows);
+
+ assertFalse(exch.ids.isEmpty());
+
+ assertEquals(Outbox.PER_NODE_BATCH_COUNT, exch.ids.size()); // One recorded send() per batch; presumably the remaining BATCH_SIZE rows are still buffered - confirm against Outbox.
+
+ assertFalse(sink.push(new Object[]{new Object()}));
+
+ assertFalse(source.signal); // No signal is expected while nothing has been acknowledged.
+
+ outbox.acknowledge(nodeId, exch.ids.remove(0)); // Acknowledge the oldest recorded batch id.
+
+ assertTrue(source.signal); // The first acknowledgement is expected to signal the source.
+
+ source.signal = false;
+
+ outbox.acknowledge(nodeId, exch.ids.remove(0));
+
+ assertFalse(source.signal); // The second acknowledgement is expected not to signal again.
+
+ assertTrue(sink.push(new Object[]{new Object()}));
+
+ sink.end();
+
+ assertTrue(exch.unregistered); // end() is expected to unregister the outbox...
+
+ assertEquals(EndMarker.INSTANCE, F.last(exch.lastBatch)); // ...and the last batch sent should finish with the end marker.
+ }
+
+ private static class TestExchangeService implements ExchangeService { // Stub that records the calls it receives.
+ private boolean registered; // Set by register().
+ private boolean unregistered; // Set by unregister().
+ private List<Integer> ids = new ArrayList<>(); // Batch ids in the order send() received them.
+
+ private List<?> lastBatch; // Rows of the most recent send() call only.
+
+
+ @Override public void register(Outbox outbox) {
+ registered = true;
+ }
+
+ @Override public void unregister(Outbox outbox) {
+ unregistered = true;
+ }
+
+ @Override public void send(GridCacheVersion queryId, long exchangeId,
UUID nodeId, int batchId, List<?> rows) {
+ ids.add(batchId);
+
+ lastBatch = rows; // Earlier batches are not retained.
+ }
+ }
+
+ private static class TestSource implements Source { // Stub that only remembers whether signal() was called.
+ boolean signal; // Reset manually by the test between checks.
+
+ @Override public void signal() {
+ signal = true;
+ }
+ }
+}
\ No newline at end of file
diff --git
a/modules/calcite/src/test/java/org/apache/ignite/testsuites/IgniteCalciteTestSuite.java
b/modules/calcite/src/test/java/org/apache/ignite/testsuites/IgniteCalciteTestSuite.java
index 55a5925..51a7bab 100644
---
a/modules/calcite/src/test/java/org/apache/ignite/testsuites/IgniteCalciteTestSuite.java
+++
b/modules/calcite/src/test/java/org/apache/ignite/testsuites/IgniteCalciteTestSuite.java
@@ -18,6 +18,7 @@
package org.apache.ignite.testsuites;
import
org.apache.ignite.internal.processors.query.calcite.CalciteQueryProcessorTest;
+import org.apache.ignite.internal.processors.query.calcite.exchange.OutboxTest;
import org.junit.runner.RunWith;
import org.junit.runners.Suite;
@@ -26,7 +27,7 @@ import org.junit.runners.Suite;
*/
@RunWith(Suite.class)
@Suite.SuiteClasses({
- CalciteQueryProcessorTest.class
- ,
+ CalciteQueryProcessorTest.class,
+ OutboxTest.class, // Outbox test from the calcite.exchange package.
})
public class IgniteCalciteTestSuite { }