This is an automated email from the ASF dual-hosted git repository.

gvvinblade pushed a commit to branch ignite-12248
in repository https://gitbox.apache.org/repos/asf/ignite.git


The following commit(s) were added to refs/heads/ignite-12248 by this push:
     new 96281cf  pending
96281cf is described below

commit 96281cf31517959b9ef33afae48e63c3091f93ad
Author: Igor Seliverstov <[email protected]>
AuthorDate: Fri Nov 29 17:04:41 2019 +0300

    pending
---
 .../query/calcite/cluster/RegistryImpl.java        |   6 +-
 .../NoOpFactory.java => exchange/EndMarker.java}   |  18 +--
 .../query/calcite/exchange/ExchangeService.java    |  30 ++++
 .../processors/query/calcite/exchange/Inbox.java   |  23 +++
 .../processors/query/calcite/exchange/Outbox.java  | 166 +++++++++++++++++++++
 .../NoOpFactory.java => exec/AbstractNode.java}    |  29 ++--
 .../NoOpFactory.java => exec/FilterNode.java}      |  32 ++--
 .../processors/query/calcite/exec/JoinNode.java    | 101 +++++++++++++
 .../processors/query/calcite/exec/Node.java        |  27 ++++
 .../NoOpFactory.java => exec/SingleNode.java}      |  24 +--
 .../{trait/NoOpFactory.java => exec/Sink.java}     |  29 ++--
 .../processors/query/calcite/exec/Source.java      |  24 +++
 .../query/calcite/metadata/FragmentInfo.java       |  20 +--
 .../calcite/metadata/IgniteMdFragmentInfo.java     |   3 +-
 .../query/calcite/metadata/NodesMapping.java       |  19 ++-
 .../processors/query/calcite/rel/Receiver.java     |  39 ++---
 .../processors/query/calcite/rel/Sender.java       |  41 ++---
 .../calcite/serialize/relation/ReceiverNode.java   |  16 +-
 .../calcite/serialize/relation/SenderNode.java     |  15 +-
 .../query/calcite/splitter/Fragment.java           |  80 +++++-----
 .../query/calcite/splitter/QueryPlan.java          |   3 +-
 .../Source.java}                                   |  38 ++---
 .../NoOpFactory.java => splitter/SourceImpl.java}  |  24 +--
 .../query/calcite/splitter/Splitter.java           |   8 +-
 .../NoOpFactory.java => splitter/Target.java}      |  24 +--
 .../SenderNode.java => splitter/TargetImpl.java}   |  33 ++--
 .../query/calcite/trait/AllTargetsFactory.java     |   4 +-
 .../query/calcite/trait/HashFunctionFactory.java   |   4 +-
 .../query/calcite/trait/NoOpFactory.java           |   4 +-
 .../query/calcite/trait/RandomTargetFactory.java   |   4 +-
 .../query/calcite/trait/SingleTargetFactory.java   |   7 +-
 .../query/calcite/CalciteQueryProcessorTest.java   |  54 +------
 .../query/calcite/exchange/OutboxTest.java         | 133 +++++++++++++++++
 .../ignite/testsuites/IgniteCalciteTestSuite.java  |   5 +-
 34 files changed, 754 insertions(+), 333 deletions(-)

diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/cluster/RegistryImpl.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/cluster/RegistryImpl.java
index 8e5773b..71f8077 100644
--- 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/cluster/RegistryImpl.java
+++ 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/cluster/RegistryImpl.java
@@ -43,6 +43,8 @@ import 
org.apache.ignite.internal.processors.query.calcite.util.Commons;
 import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.internal.U;
 
+import static 
org.apache.ignite.internal.processors.query.calcite.metadata.NodesMapping.DEDUPLICATED;
+
 /**
  *
  */
@@ -65,13 +67,13 @@ public class RegistryImpl implements DistributionRegistry, 
LocationRegistry {
     }
 
     @Override public NodesMapping local() {
-        return new 
NodesMapping(Collections.singletonList(ctx.discovery().localNode().id()), null, 
(byte) 0);
+        return new 
NodesMapping(Collections.singletonList(ctx.discovery().localNode().id()), null, 
DEDUPLICATED);
     }
 
     @Override public NodesMapping random(AffinityTopologyVersion topVer) {
         List<ClusterNode> nodes = 
ctx.discovery().discoCache(topVer).serverNodes();
 
-        return new NodesMapping(Commons.transform(nodes, ClusterNode::id), 
null, (byte) 0);
+        return new NodesMapping(Commons.transform(nodes, ClusterNode::id), 
null, DEDUPLICATED);
     }
 
     @Override public NodesMapping distributed(int cacheId, 
AffinityTopologyVersion topVer) {
diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/NoOpFactory.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exchange/EndMarker.java
similarity index 59%
copy from 
modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/NoOpFactory.java
copy to 
modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exchange/EndMarker.java
index 1988671..ecbfef2 100644
--- 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/NoOpFactory.java
+++ 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exchange/EndMarker.java
@@ -14,26 +14,18 @@
  * limitations under the License.
  */
 
-package org.apache.ignite.internal.processors.query.calcite.trait;
+package org.apache.ignite.internal.processors.query.calcite.exchange;
 
 import java.io.ObjectStreamException;
-import org.apache.calcite.plan.Context;
-import org.apache.calcite.util.ImmutableIntList;
-import 
org.apache.ignite.internal.processors.query.calcite.metadata.NodesMapping;
+import java.io.Serializable;
 
 /**
  *
  */
-final class NoOpFactory extends AbstractDestinationFunctionFactory {
-    static final DestinationFunctionFactory INSTANCE = new NoOpFactory();
+public final class EndMarker implements Serializable {
+    public static final EndMarker INSTANCE = new EndMarker();
 
-    @Override public DestinationFunction create(Context ctx, NodesMapping m, 
ImmutableIntList k) {
-        return null;
-    }
-
-    @Override public Object key() {
-        return "NoOpFactory";
-    }
+    private EndMarker(){}
 
     private Object readResolve() throws ObjectStreamException {
         return INSTANCE;
diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exchange/ExchangeService.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exchange/ExchangeService.java
new file mode 100644
index 0000000..788011a
--- /dev/null
+++ 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exchange/ExchangeService.java
@@ -0,0 +1,30 @@
+/*
+ * Copyright 2019 GridGain Systems, Inc. and Contributors.
+ *
+ * Licensed under the GridGain Community Edition License (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     
https://www.gridgain.com/products/software/community-edition/gridgain-community-edition-license
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.calcite.exchange;
+
+import java.util.List;
+import java.util.UUID;
+import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
+
+/**
+ *
+ */
+public interface ExchangeService {
+    void register(Outbox outbox);
+    void unregister(Outbox outbox);
+    void send(GridCacheVersion queryId, long exchangeId, UUID nodeId, int 
batchId, List<?> rows);
+}
diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exchange/Inbox.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exchange/Inbox.java
new file mode 100644
index 0000000..83a8099
--- /dev/null
+++ 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exchange/Inbox.java
@@ -0,0 +1,23 @@
+/*
+ * Copyright 2019 GridGain Systems, Inc. and Contributors.
+ *
+ * Licensed under the GridGain Community Edition License (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     
https://www.gridgain.com/products/software/community-edition/gridgain-community-edition-license
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.calcite.exchange;
+
+/**
+ *
+ */
+public class Inbox {
+}
diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exchange/Outbox.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exchange/Outbox.java
new file mode 100644
index 0000000..47351e2
--- /dev/null
+++ 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exchange/Outbox.java
@@ -0,0 +1,166 @@
+/*
+ * Copyright 2019 GridGain Systems, Inc. and Contributors.
+ *
+ * Licensed under the GridGain Community Edition License (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     
https://www.gridgain.com/products/software/community-edition/gridgain-community-edition-license
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.calcite.exchange;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
+import org.apache.ignite.internal.processors.query.calcite.exec.AbstractNode;
+import org.apache.ignite.internal.processors.query.calcite.exec.SingleNode;
+import org.apache.ignite.internal.processors.query.calcite.exec.Sink;
+import 
org.apache.ignite.internal.processors.query.calcite.trait.DestinationFunction;
+import org.apache.ignite.internal.util.typedef.F;
+
+/**
+ *
+ */
+public class Outbox<T> extends AbstractNode<T> implements SingleNode<T>, 
Sink<T> {
+    static final int BATCH_SIZE = 200;
+    static final int PER_NODE_BATCH_COUNT = 10;
+
+    private final Map<UUID, Destination> perNode = new HashMap<>();
+
+    private final GridCacheVersion queryId;
+    private final long exchangeId;
+    private final Collection<UUID> targets;
+    private final DestinationFunction function;
+
+    private ExchangeService srvc;
+
+    protected Outbox(GridCacheVersion queryId, long exchangeId, 
Collection<UUID> targets, DestinationFunction function) {
+        super(Sink.noOp());
+        this.queryId = queryId;
+        this.exchangeId = exchangeId;
+
+        this.targets = targets;
+        this.function = function;
+    }
+
+    public void acknowledge(UUID nodeId, int batchId) {
+        perNode.get(nodeId).acknowledge(batchId);
+    }
+
+    @Override public Sink<T> sink(int idx) {
+        if (idx != 0)
+            throw new IndexOutOfBoundsException();
+
+        return this;
+    }
+
+    @Override public boolean push(T row) {
+        List<UUID> nodes = function.destination(row);
+
+        if (F.isEmpty(nodes))
+            return true;
+
+        List<Destination> destinations = new ArrayList<>(nodes.size());
+
+        for (UUID node : nodes) {
+            Destination dest = perNode.computeIfAbsent(node, Destination::new);
+
+            if (!dest.ready()) {
+                dest.needSignal();
+
+                return false;
+            }
+
+            destinations.add(dest);
+        }
+
+        for (Destination dest : destinations)
+            dest.add(row);
+
+        return true;
+    }
+
+    public void init(ExchangeService srvc) {
+        this.srvc = srvc;
+
+        srvc.register(this);
+
+        signal();
+    }
+
+    @Override public void end() {
+        for (UUID node : targets)
+            perNode.computeIfAbsent(node, Destination::new).end();
+
+        srvc.unregister(this);
+    }
+
+    private final class Destination {
+        private final UUID nodeId;
+
+        private int hwm = -1;
+        private int lwm = -1;
+
+        private ArrayList<Object> curr = new ArrayList<>(BATCH_SIZE + 1); // 
extra space for end marker;
+
+        private boolean needSignal;
+
+        private Destination(UUID nodeId) {
+            this.nodeId = nodeId;
+        }
+
+        public void add(T row) {
+            if (curr.size() == BATCH_SIZE) {
+                assert ready() && srvc != null;
+
+                srvc.send(queryId, exchangeId, nodeId, ++hwm, curr);
+
+                curr = new ArrayList<>(BATCH_SIZE);
+            }
+
+            curr.add(row);
+        }
+
+        public void end() {
+            curr.add(EndMarker.INSTANCE);
+
+            assert srvc != null;
+
+            srvc.send(queryId, exchangeId, nodeId, hwm, curr);
+
+            curr = null;
+            hwm = Integer.MAX_VALUE;
+        }
+
+        boolean ready() {
+            return hwm - lwm < PER_NODE_BATCH_COUNT || curr.size() < 
BATCH_SIZE;
+        }
+
+        void acknowledge(int id) {
+            if (lwm < id) {
+                lwm = id;
+
+                if (needSignal) {
+                    needSignal = false;
+
+                    signal();
+                }
+            }
+        }
+
+        public void needSignal() {
+            needSignal = true;
+        }
+    }
+}
diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/NoOpFactory.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/AbstractNode.java
similarity index 50%
copy from 
modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/NoOpFactory.java
copy to 
modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/AbstractNode.java
index 1988671..e832da4 100644
--- 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/NoOpFactory.java
+++ 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/AbstractNode.java
@@ -14,28 +14,31 @@
  * limitations under the License.
  */
 
-package org.apache.ignite.internal.processors.query.calcite.trait;
+package org.apache.ignite.internal.processors.query.calcite.exec;
 
-import java.io.ObjectStreamException;
-import org.apache.calcite.plan.Context;
-import org.apache.calcite.util.ImmutableIntList;
-import 
org.apache.ignite.internal.processors.query.calcite.metadata.NodesMapping;
+import java.util.Collections;
+import java.util.List;
 
 /**
  *
  */
-final class NoOpFactory extends AbstractDestinationFunctionFactory {
-    static final DestinationFunctionFactory INSTANCE = new NoOpFactory();
+public abstract class AbstractNode<T> implements Node<T>, Source {
+    protected final Sink<T> target;
+    protected List<Source> sources;
 
-    @Override public DestinationFunction create(Context ctx, NodesMapping m, 
ImmutableIntList k) {
-        return null;
+    protected AbstractNode(Sink<T> target) {
+        this.target = target;
     }
 
-    @Override public Object key() {
-        return "NoOpFactory";
+    @Override public void sources(List<Source> sources) {
+        this.sources = Collections.unmodifiableList(sources);
     }
 
-    private Object readResolve() throws ObjectStreamException {
-        return INSTANCE;
+    public void signal(int idx) {
+        sources.get(idx).signal();
+    }
+
+    @Override public void signal() {
+        sources.forEach(Source::signal);
     }
 }
diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/NoOpFactory.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/FilterNode.java
similarity index 50%
copy from 
modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/NoOpFactory.java
copy to 
modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/FilterNode.java
index 1988671..3075ee4 100644
--- 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/NoOpFactory.java
+++ 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/FilterNode.java
@@ -14,28 +14,34 @@
  * limitations under the License.
  */
 
-package org.apache.ignite.internal.processors.query.calcite.trait;
+package org.apache.ignite.internal.processors.query.calcite.exec;
 
-import java.io.ObjectStreamException;
-import org.apache.calcite.plan.Context;
-import org.apache.calcite.util.ImmutableIntList;
-import 
org.apache.ignite.internal.processors.query.calcite.metadata.NodesMapping;
+import java.util.function.Predicate;
 
 /**
  *
  */
-final class NoOpFactory extends AbstractDestinationFunctionFactory {
-    static final DestinationFunctionFactory INSTANCE = new NoOpFactory();
+public class FilterNode extends AbstractNode<Object[]> implements 
SingleNode<Object[]>, Sink<Object[]> {
+    private final Predicate<Object[]> predicate;
 
-    @Override public DestinationFunction create(Context ctx, NodesMapping m, 
ImmutableIntList k) {
-        return null;
+    public FilterNode(Sink<Object[]> target, Predicate<Object[]> predicate) {
+        super(target);
+
+        this.predicate = predicate;
+    }
+
+    @Override public Sink<Object[]> sink(int idx) {
+        if (idx != 0)
+            throw new IndexOutOfBoundsException();
+
+        return this;
     }
 
-    @Override public Object key() {
-        return "NoOpFactory";
+    @Override public boolean push(Object[] row) {
+        return !predicate.test(row) || target.push(row);
     }
 
-    private Object readResolve() throws ObjectStreamException {
-        return INSTANCE;
+    @Override public void end() {
+        target.end();
     }
 }
diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/JoinNode.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/JoinNode.java
new file mode 100644
index 0000000..32818e1
--- /dev/null
+++ 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/JoinNode.java
@@ -0,0 +1,101 @@
+/*
+ * Copyright 2019 GridGain Systems, Inc. and Contributors.
+ *
+ * Licensed under the GridGain Community Edition License (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     
https://www.gridgain.com/products/software/community-edition/gridgain-community-edition-license
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.calcite.exec;
+
+import java.util.ArrayList;
+import java.util.function.BiFunction;
+
+/**
+ *
+ */
+public class JoinNode extends AbstractNode {
+    private final BiFunction<Object[], Object[], Object[]> expression;
+    private final ArraySink<Object[]> left;
+    private final ArraySink<Object[]> right;
+
+    private int leftIdx;
+    private int rightIdx;
+    private boolean end;
+
+    public JoinNode(Sink<Object[]> target, BiFunction<Object[], Object[], 
Object[]> expression) {
+        super(target);
+        this.expression = expression;
+
+        left = new ArraySink<>();
+        right = new ArraySink<>();
+    }
+
+    @Override public Sink<Object[]> sink(int idx) {
+        switch (idx) {
+            case 0:
+                return left;
+            case 1:
+                return right;
+            default:
+                throw new IndexOutOfBoundsException();
+        }
+    }
+
+    @Override public void signal() {
+        if (end)
+            return;
+
+        if (left.end && right.end)
+            tryFlush();
+
+        assert sources != null && sources.size() == 2;
+
+        if (!left.end)
+            signal(0);
+        if (!right.end)
+            signal(1);
+    }
+
+    public void tryFlush() {
+        if (left.end && right.end) {
+            for (int i = leftIdx; i < left.size(); i++) {
+                for (int j = rightIdx; j < right.size(); j++) {
+                    Object[] row = expression.apply(left.get(i), right.get(j));
+
+                    if (row != null && !target.push(row)) {
+                        leftIdx = i;
+                        rightIdx = j;
+
+                        return;
+                    }
+                }
+            }
+
+            end = true;
+            target.end();
+        }
+    }
+
+    private final class ArraySink<T> extends ArrayList<T> implements Sink<T> {
+        private boolean end;
+
+        @Override public boolean push(T row) {
+            return add(row);
+        }
+
+        @Override public void end() {
+            end = true;
+
+            tryFlush();
+        }
+    }
+}
diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/Node.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/Node.java
new file mode 100644
index 0000000..71266ff
--- /dev/null
+++ 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/Node.java
@@ -0,0 +1,27 @@
+/*
+ * Copyright 2019 GridGain Systems, Inc. and Contributors.
+ *
+ * Licensed under the GridGain Community Edition License (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     
https://www.gridgain.com/products/software/community-edition/gridgain-community-edition-license
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.calcite.exec;
+
+import java.util.List;
+
+/**
+ *
+ */
+public interface Node<T> {
+    Sink<T> sink(int idx);
+    void sources(List<Source> sources);
+}
diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/NoOpFactory.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/SingleNode.java
similarity index 50%
copy from 
modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/NoOpFactory.java
copy to 
modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/SingleNode.java
index 1988671..a0476a0 100644
--- 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/NoOpFactory.java
+++ 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/SingleNode.java
@@ -14,28 +14,20 @@
  * limitations under the License.
  */
 
-package org.apache.ignite.internal.processors.query.calcite.trait;
+package org.apache.ignite.internal.processors.query.calcite.exec;
 
-import java.io.ObjectStreamException;
-import org.apache.calcite.plan.Context;
-import org.apache.calcite.util.ImmutableIntList;
-import 
org.apache.ignite.internal.processors.query.calcite.metadata.NodesMapping;
+import java.util.Collections;
+import java.util.Objects;
 
 /**
  *
  */
-final class NoOpFactory extends AbstractDestinationFunctionFactory {
-    static final DestinationFunctionFactory INSTANCE = new NoOpFactory();
-
-    @Override public DestinationFunction create(Context ctx, NodesMapping m, 
ImmutableIntList k) {
-        return null;
-    }
-
-    @Override public Object key() {
-        return "NoOpFactory";
+public interface SingleNode<T> extends Node<T> {
+    default Sink<T> sink() {
+        return Objects.requireNonNull(sink(0));
     }
 
-    private Object readResolve() throws ObjectStreamException {
-        return INSTANCE;
+    default void source(Source source) {
+        sources(Collections.singletonList(source));
     }
 }
diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/NoOpFactory.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/Sink.java
similarity index 50%
copy from 
modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/NoOpFactory.java
copy to 
modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/Sink.java
index 1988671..842a973 100644
--- 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/NoOpFactory.java
+++ 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/Sink.java
@@ -14,28 +14,25 @@
  * limitations under the License.
  */
 
-package org.apache.ignite.internal.processors.query.calcite.trait;
-
-import java.io.ObjectStreamException;
-import org.apache.calcite.plan.Context;
-import org.apache.calcite.util.ImmutableIntList;
-import 
org.apache.ignite.internal.processors.query.calcite.metadata.NodesMapping;
+package org.apache.ignite.internal.processors.query.calcite.exec;
 
 /**
  *
  */
-final class NoOpFactory extends AbstractDestinationFunctionFactory {
-    static final DestinationFunctionFactory INSTANCE = new NoOpFactory();
+public interface Sink<T> {
+    Sink NO_OP = new Sink() {
+        @Override public boolean push(Object row) {
+            return true;
+        }
 
-    @Override public DestinationFunction create(Context ctx, NodesMapping m, 
ImmutableIntList k) {
-        return null;
-    }
+        @Override public void end() {}
+    };
 
-    @Override public Object key() {
-        return "NoOpFactory";
-    }
+    boolean push(T row);
+    void end();
 
-    private Object readResolve() throws ObjectStreamException {
-        return INSTANCE;
+    @SuppressWarnings("unchecked")
+    static <T> Sink<T> noOp() {
+        return NO_OP;
     }
 }
diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/Source.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/Source.java
new file mode 100644
index 0000000..95262c7
--- /dev/null
+++ 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/Source.java
@@ -0,0 +1,24 @@
+/*
+ * Copyright 2019 GridGain Systems, Inc. and Contributors.
+ *
+ * Licensed under the GridGain Community Edition License (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     
https://www.gridgain.com/products/software/community-edition/gridgain-community-edition-license
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.calcite.exec;
+
+/**
+ *
+ */
+public interface Source {
+    void signal();
+}
diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/FragmentInfo.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/FragmentInfo.java
index 9badd91..3ace003 100644
--- 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/FragmentInfo.java
+++ 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/FragmentInfo.java
@@ -17,25 +17,27 @@
 package org.apache.ignite.internal.processors.query.calcite.metadata;
 
 import com.google.common.collect.ImmutableList;
-import org.apache.ignite.internal.processors.query.calcite.splitter.Fragment;
+import org.apache.calcite.util.Pair;
+import org.apache.ignite.internal.processors.query.calcite.rel.Receiver;
+import org.apache.ignite.internal.processors.query.calcite.splitter.Source;
 
 /**
  *
  */
 public class FragmentInfo {
     private final NodesMapping mapping;
-    private final ImmutableList<Fragment> remoteInputs;
+    private final ImmutableList<Pair<Receiver, Source>> sources;
 
-    public FragmentInfo(Fragment remoteInput) {
-        this(ImmutableList.of(remoteInput), null);
+    public FragmentInfo(Pair<Receiver, Source> source) {
+        this(ImmutableList.of(source), null);
     }
 
     public FragmentInfo(NodesMapping mapping) {
         this(null, mapping);
     }
 
-    public FragmentInfo(ImmutableList<Fragment> remoteInputs, NodesMapping 
mapping) {
-        this.remoteInputs = remoteInputs;
+    public FragmentInfo(ImmutableList<Pair<Receiver, Source>> sources, 
NodesMapping mapping) {
+        this.sources = sources;
         this.mapping = mapping;
     }
 
@@ -43,13 +45,13 @@ public class FragmentInfo {
         return mapping;
     }
 
-    public ImmutableList<Fragment> remoteInputs() {
-        return remoteInputs;
+    public ImmutableList<Pair<Receiver, Source>> sources() {
+        return sources;
     }
 
     public FragmentInfo merge(FragmentInfo other) throws 
LocationMappingException {
         return new FragmentInfo(
-            merge(remoteInputs(), other.remoteInputs()),
+            merge(sources(), other.sources()),
             merge(mapping(), other.mapping()));
     }
 
diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/IgniteMdFragmentInfo.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/IgniteMdFragmentInfo.java
index 4a96cf3..9f02f0c 100644
--- 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/IgniteMdFragmentInfo.java
+++ 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/IgniteMdFragmentInfo.java
@@ -27,6 +27,7 @@ import org.apache.calcite.rel.metadata.MetadataHandler;
 import org.apache.calcite.rel.metadata.ReflectiveRelMetadataProvider;
 import org.apache.calcite.rel.metadata.RelMetadataProvider;
 import org.apache.calcite.rel.metadata.RelMetadataQuery;
+import org.apache.calcite.util.Pair;
 import 
org.apache.ignite.internal.processors.query.calcite.metadata.IgniteMetadata.FragmentMetadata;
 import org.apache.ignite.internal.processors.query.calcite.rel.IgniteTableScan;
 import org.apache.ignite.internal.processors.query.calcite.rel.Receiver;
@@ -93,7 +94,7 @@ public class IgniteMdFragmentInfo implements 
MetadataHandler<FragmentMetadata> {
     }
 
     public FragmentInfo getFragmentInfo(Receiver rel, RelMetadataQuery mq) {
-        return new FragmentInfo(rel.sourceFragment());
+        return new FragmentInfo(Pair.of(rel, rel.source()));
     }
 
     public FragmentInfo getFragmentInfo(IgniteTableScan rel, RelMetadataQuery 
mq) {
diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/NodesMapping.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/NodesMapping.java
index 16bafc0..b63e3e2 100644
--- 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/NodesMapping.java
+++ 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/NodesMapping.java
@@ -23,6 +23,7 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Objects;
 import java.util.UUID;
+import java.util.concurrent.ThreadLocalRandom;
 import org.apache.ignite.internal.processors.query.calcite.util.Commons;
 import org.apache.ignite.internal.util.GridIntList;
 import org.apache.ignite.internal.util.typedef.F;
@@ -77,10 +78,16 @@ public class NodesMapping implements Serializable {
         return new NodesMapping(nodes, mergeAssignments(other, nodes), flags);
     }
 
-    public NodesMapping deduplicate() throws LocationMappingException {
-        if (assignments == null || !excessive())
+    public NodesMapping deduplicate() {
+        if (!excessive())
             return this;
 
+        if (assignments == null) {
+            UUID node = 
nodes.get(ThreadLocalRandom.current().nextInt(nodes.size()));
+
+            return new NodesMapping(Collections.singletonList(node), null, 
(byte)(flags | DEDUPLICATED));
+        }
+
         HashSet<UUID> nodes0 = new HashSet<>();
         List<List<UUID>> assignments0 = new ArrayList<>(assignments.size());
 
@@ -88,10 +95,12 @@ public class NodesMapping implements Serializable {
             UUID node = F.first(partNodes);
 
             if (node == null)
-                throw new LocationMappingException("Failed to map fragment to 
location.");
+                assignments0.add(Collections.emptyList());
+            else {
+                assignments0.add(Collections.singletonList(node));
 
-            assignments0.add(Collections.singletonList(node));
-            nodes0.add(node);
+                nodes0.add(node);
+            }
         }
 
         return new NodesMapping(new ArrayList<>(nodes0), assignments0, 
(byte)(flags | DEDUPLICATED));
diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/Receiver.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/Receiver.java
index cde4290..0287236 100644
--- 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/Receiver.java
+++ 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/Receiver.java
@@ -20,39 +20,25 @@ import org.apache.calcite.plan.RelOptCluster;
 import org.apache.calcite.plan.RelTraitSet;
 import org.apache.calcite.rel.AbstractRelNode;
 import org.apache.calcite.rel.type.RelDataType;
-import 
org.apache.ignite.internal.processors.query.calcite.metadata.NodesMapping;
-import org.apache.ignite.internal.processors.query.calcite.splitter.Fragment;
+import org.apache.ignite.internal.processors.query.calcite.splitter.Source;
+import 
org.apache.ignite.internal.processors.query.calcite.trait.DistributionTrait;
+import 
org.apache.ignite.internal.processors.query.calcite.trait.DistributionTraitDef;
 import org.apache.ignite.internal.processors.query.calcite.util.RelImplementor;
 
 /**
  *
  */
 public final class Receiver extends AbstractRelNode implements IgniteRel {
-    private final Fragment sourceFragment;
-    private final NodesMapping sourceMapping;
+    private final Source source;
 
     /**
      * @param cluster Cluster this relational expression belongs to
      * @param traits Trait set.
      */
-    public Receiver(RelOptCluster cluster, RelTraitSet traits, RelDataType 
rowType, Fragment sourceFragment) {
+    public Receiver(RelOptCluster cluster, RelTraitSet traits, RelDataType 
rowType, Source source) {
         super(cluster, traits);
         this.rowType = rowType;
-        this.sourceFragment = sourceFragment;
-
-        sourceMapping = null;
-    }
-
-    /**
-     * @param cluster Cluster this relational expression belongs to
-     * @param traits Trait set.
-     */
-    public Receiver(RelOptCluster cluster, RelTraitSet traits, RelDataType 
rowType, NodesMapping sourceMapping) {
-        super(cluster, traits);
-        this.rowType = rowType;
-        this.sourceMapping = sourceMapping;
-
-        sourceFragment = null;
+        this.source = source;
     }
 
     /** {@inheritDoc} */
@@ -60,16 +46,11 @@ public final class Receiver extends AbstractRelNode 
implements IgniteRel {
         return implementor.implement(this);
     }
 
-    public Fragment sourceFragment() {
-        return sourceFragment;
+    public DistributionTrait distribution() {
+        return getTraitSet().getTrait(DistributionTraitDef.INSTANCE);
     }
 
-    public NodesMapping sourceMapping() {
-        if (sourceFragment != null)
-            return sourceFragment.mapping();
-
-        assert sourceMapping != null;
-
-        return sourceMapping;
+    public Source source() {
+        return source;
     }
 }
diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/Sender.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/Sender.java
index aa279b2..fb1f277 100644
--- 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/Sender.java
+++ 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/Sender.java
@@ -22,9 +22,7 @@ import org.apache.calcite.rel.RelNode;
 import org.apache.calcite.rel.SingleRel;
 import org.apache.calcite.rel.metadata.RelMetadataQuery;
 import 
org.apache.ignite.internal.processors.query.calcite.metadata.IgniteMdDistribution;
-import 
org.apache.ignite.internal.processors.query.calcite.metadata.NodesMapping;
-import 
org.apache.ignite.internal.processors.query.calcite.trait.DestinationFunction;
-import 
org.apache.ignite.internal.processors.query.calcite.trait.DistributionTrait;
+import org.apache.ignite.internal.processors.query.calcite.splitter.Target;
 import 
org.apache.ignite.internal.processors.query.calcite.trait.DistributionTraitDef;
 import org.apache.ignite.internal.processors.query.calcite.util.RelImplementor;
 import org.jetbrains.annotations.NotNull;
@@ -33,29 +31,22 @@ import org.jetbrains.annotations.NotNull;
  *
  */
 public final class Sender extends SingleRel implements IgniteRel {
-    private final DistributionTrait targetDistr;
-
-    private NodesMapping targetMapping;
+    private Target target;
 
     /**
      * Creates a <code>SingleRel</code>.
-     *  @param cluster Cluster this relational expression belongs to
+     * @param cluster Cluster this relational expression belongs to
      * @param traits Trait set.
      * @param input Input relational expression
-     * @param targetDistr Target distribution
      */
-    public Sender(RelOptCluster cluster, RelTraitSet traits, RelNode input, 
@NotNull DistributionTrait targetDistr) {
+    public Sender(RelOptCluster cluster, RelTraitSet traits, RelNode input) {
         super(cluster, traits, input);
-
-        this.targetDistr = targetDistr;
     }
 
-    private Sender(RelOptCluster cluster, RelTraitSet traits, RelNode input,
-        @NotNull DistributionTrait targetDistr, @NotNull NodesMapping 
targetMapping) {
+    private Sender(RelOptCluster cluster, RelTraitSet traits, RelNode input, 
@NotNull Target target) {
         super(cluster, traits, input);
 
-        this.targetDistr = targetDistr;
-        this.targetMapping = targetMapping;
+        this.target = target;
     }
 
     /** {@inheritDoc} */
@@ -63,23 +54,15 @@ public final class Sender extends SingleRel implements 
IgniteRel {
         return implementor.implement(this);
     }
 
-    public void init(NodesMapping mapping) {
-        targetMapping = mapping;
-    }
-
-    public DistributionTrait targetDistribution() {
-        return targetDistr;
-    }
-
-    public NodesMapping targetMapping() {
-        return targetMapping;
+    public void init(Target target) {
+        this.target = target;
     }
 
-    public DestinationFunction targetFunction(org.apache.calcite.plan.Context 
ctx) {
-        return targetDistr.destinationFunctionFactory().create(ctx, 
targetMapping, targetDistr.keys());
+    public Target target() {
+        return target;
     }
 
-    public static Sender create(RelNode input, DistributionTrait targetDistr, 
NodesMapping targetMapping) {
+    public static Sender create(RelNode input, Target target) {
         RelOptCluster cluster = input.getCluster();
         RelMetadataQuery mq = cluster.getMetadataQuery();
 
@@ -87,6 +70,6 @@ public final class Sender extends SingleRel implements 
IgniteRel {
             .replace(IgniteRel.IGNITE_CONVENTION)
             .replaceIf(DistributionTraitDef.INSTANCE, () -> 
IgniteMdDistribution.distribution(input, mq));
 
-        return new Sender(cluster, traits, input, targetDistr, targetMapping);
+        return new Sender(cluster, traits, input, target);
     }
 }
diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/relation/ReceiverNode.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/relation/ReceiverNode.java
index 8b8a2c2..d1a00cd 100644
--- 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/relation/ReceiverNode.java
+++ 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/relation/ReceiverNode.java
@@ -19,9 +19,10 @@ package 
org.apache.ignite.internal.processors.query.calcite.serialize.relation;
 import java.util.List;
 import org.apache.calcite.plan.RelTraitSet;
 import org.apache.calcite.rel.RelNode;
-import 
org.apache.ignite.internal.processors.query.calcite.metadata.NodesMapping;
 import org.apache.ignite.internal.processors.query.calcite.rel.Receiver;
 import 
org.apache.ignite.internal.processors.query.calcite.serialize.type.DataType;
+import org.apache.ignite.internal.processors.query.calcite.splitter.Source;
+import org.apache.ignite.internal.processors.query.calcite.splitter.SourceImpl;
 
 
 /**
@@ -29,21 +30,24 @@ import 
org.apache.ignite.internal.processors.query.calcite.serialize.type.DataTy
  */
 public class ReceiverNode extends RelGraphNode {
     private final DataType dataType;
-    private final NodesMapping sourceMapping;
+    private final Source source;
 
-    private ReceiverNode(RelTraitSet traits, DataType dataType, NodesMapping 
sourceMapping) {
+    private ReceiverNode(RelTraitSet traits, DataType dataType, Source source) 
{
         super(traits);
         this.dataType = dataType;
-        this.sourceMapping = sourceMapping;
+        this.source = source;
     }
 
     public static ReceiverNode create(Receiver rel) {
-        return new ReceiverNode(rel.getTraitSet(), 
DataType.fromType(rel.getRowType()), rel.sourceMapping());
+        Source source = new SourceImpl(rel.source().exchangeId(), 
rel.source().mapping());
+
+        return new ReceiverNode(rel.getTraitSet(), 
DataType.fromType(rel.getRowType()), source);
     }
 
     @Override public RelNode toRel(ConversionContext ctx, List<RelNode> 
children) {
         return new Receiver(ctx.getCluster(),
             traitSet.toTraitSet(ctx.getCluster()),
-            dataType.toRelDataType(ctx.getTypeFactory()), sourceMapping);
+            dataType.toRelDataType(ctx.getTypeFactory()),
+            source);
     }
 }
diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/relation/SenderNode.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/relation/SenderNode.java
index 44e92bd..94671dd 100644
--- 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/relation/SenderNode.java
+++ 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/relation/SenderNode.java
@@ -18,28 +18,25 @@ package 
org.apache.ignite.internal.processors.query.calcite.serialize.relation;
 
 import java.util.List;
 import org.apache.calcite.rel.RelNode;
-import 
org.apache.ignite.internal.processors.query.calcite.metadata.NodesMapping;
 import org.apache.ignite.internal.processors.query.calcite.rel.Sender;
-import 
org.apache.ignite.internal.processors.query.calcite.trait.DistributionTrait;
+import org.apache.ignite.internal.processors.query.calcite.splitter.Target;
 import org.apache.ignite.internal.util.typedef.F;
 
 /**
  *
  */
 public class SenderNode extends RelGraphNode {
-    private final DistributionTrait targetDistr;
-    private final NodesMapping targetMapping;
+    private final Target target;
 
-    private SenderNode(DistributionTrait targetDistr, NodesMapping 
targetMapping) {
-        this.targetDistr = targetDistr;
-        this.targetMapping = targetMapping;
+    private SenderNode(Target target) {
+        this.target = target;
     }
 
     public static SenderNode create(Sender rel) {
-        return new SenderNode(rel.targetDistribution(), rel.targetMapping());
+        return new SenderNode(rel.target());
     }
 
     @Override public RelNode toRel(ConversionContext ctx, List<RelNode> 
children) {
-        return Sender.create(F.first(children), targetDistr, targetMapping);
+        return Sender.create(F.first(children), target);
     }
 }
diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/splitter/Fragment.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/splitter/Fragment.java
index 86f58e9..d12c73a 100644
--- 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/splitter/Fragment.java
+++ 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/splitter/Fragment.java
@@ -17,85 +17,87 @@
 package org.apache.ignite.internal.processors.query.calcite.splitter;
 
 import com.google.common.collect.ImmutableList;
+import java.util.Objects;
+import java.util.concurrent.atomic.AtomicLong;
 import org.apache.calcite.plan.Context;
 import org.apache.calcite.rel.RelNode;
 import org.apache.calcite.rel.metadata.RelMetadataQuery;
+import org.apache.calcite.util.Pair;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
-import org.apache.ignite.internal.processors.query.IgniteSQLException;
 import 
org.apache.ignite.internal.processors.query.calcite.metadata.FragmentInfo;
 import 
org.apache.ignite.internal.processors.query.calcite.metadata.IgniteMdFragmentInfo;
-import 
org.apache.ignite.internal.processors.query.calcite.metadata.LocationMappingException;
 import 
org.apache.ignite.internal.processors.query.calcite.metadata.LocationRegistry;
 import 
org.apache.ignite.internal.processors.query.calcite.metadata.NodesMapping;
+import org.apache.ignite.internal.processors.query.calcite.rel.Receiver;
 import org.apache.ignite.internal.processors.query.calcite.rel.Sender;
+import 
org.apache.ignite.internal.processors.query.calcite.trait.DistributionTrait;
 import org.apache.ignite.internal.util.typedef.F;
 
 /**
  *
  */
-public class Fragment {
+public class Fragment implements Source {
+    private static final AtomicLong ID_GEN = new AtomicLong();
+
+    private final long exchangeId = ID_GEN.getAndIncrement();
+
     private final RelNode root;
 
     private NodesMapping mapping;
-    private ImmutableList<Fragment> remoteInputs;
 
     public Fragment(RelNode root) {
         this.root = root;
     }
 
     public void init(Context ctx, RelMetadataQuery mq) {
-        init(null, ctx, mq);
-    }
+        FragmentInfo info = IgniteMdFragmentInfo.fragmentInfo(root, mq);
 
-    public RelNode root() {
-        return root;
-    }
+        if (info.mapping() == null)
+            mapping = remote() ? registry(ctx).random(topologyVersion(ctx)) : 
registry(ctx).local();
+        else
+            mapping = info.mapping().deduplicate();
 
-    public NodesMapping mapping() {
-        return mapping;
-    }
+        ImmutableList<Pair<Receiver, Source>> sources = info.sources();
+
+        if (!F.isEmpty(sources)) {
+            for (Pair<Receiver, Source> input : sources) {
+                Receiver receiver = input.left;
+                Source source = input.right;
 
-    public ImmutableList<Fragment> remoteInputs() {
-        return remoteInputs;
+                source.init(mapping, receiver.distribution(), ctx, mq);
+            }
+        }
     }
 
-    public boolean isRemote() {
-        return root instanceof Sender;
+    @Override public long exchangeId() {
+        return exchangeId;
     }
 
-    private void init(Fragment parent, Context ctx, RelMetadataQuery mq) {
-        FragmentInfo info = IgniteMdFragmentInfo.fragmentInfo(root, mq);
+    @Override public void init(NodesMapping mapping, DistributionTrait 
distribution, Context ctx, RelMetadataQuery mq) {
+        assert remote();
 
-        remoteInputs = info.remoteInputs();
+        ((Sender) root).init(new TargetImpl(exchangeId, mapping, 
distribution));
 
-        if (info.mapping() == null)
-            mapping = isRemote() ? registry(ctx).random(topologyVersion(ctx)) 
: registry(ctx).local();
-        else {
-            try {
-                mapping = info.mapping().deduplicate();
-            }
-            catch (LocationMappingException e) {
-                throw new IgniteSQLException("Failed to map fragment to 
location, partition lost.", e);
-            }
-        }
+        init(ctx, mq);
+    }
 
-        if (parent != null) {
-            assert isRemote();
+    public RelNode root() {
+        return root;
+    }
 
-            ((Sender) root).init(parent.mapping);
-        }
+    @Override public NodesMapping mapping() {
+        return mapping;
+    }
 
-        if (!F.isEmpty(remoteInputs)) {
-            for (Fragment input : remoteInputs)
-                input.init(this, ctx, mq);
-        }
+    private boolean remote() {
+        return root instanceof Sender;
     }
 
     private LocationRegistry registry(Context ctx) {
-        return ctx.unwrap(LocationRegistry.class);
+        return Objects.requireNonNull(ctx.unwrap(LocationRegistry.class));
     }
 
     private AffinityTopologyVersion topologyVersion(Context ctx) {
-        return ctx.unwrap(AffinityTopologyVersion.class);
+        return 
Objects.requireNonNull(ctx.unwrap(AffinityTopologyVersion.class));
     }
 }
diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/splitter/QueryPlan.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/splitter/QueryPlan.java
index ae37dfd..c03879b 100644
--- 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/splitter/QueryPlan.java
+++ 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/splitter/QueryPlan.java
@@ -26,7 +26,6 @@ import 
org.apache.ignite.internal.processors.query.calcite.metadata.OptimisticPl
 import 
org.apache.ignite.internal.processors.query.calcite.metadata.RelMetadataQueryEx;
 import org.apache.ignite.internal.processors.query.calcite.rel.Receiver;
 import org.apache.ignite.internal.processors.query.calcite.rel.Sender;
-import 
org.apache.ignite.internal.processors.query.calcite.trait.DistributionTraitDef;
 import org.apache.ignite.internal.processors.query.calcite.util.Edge;
 import org.apache.ignite.internal.util.typedef.F;
 
@@ -63,7 +62,7 @@ public class QueryPlan {
                 RelOptCluster cluster = child.getCluster();
                 RelTraitSet traitSet = child.getTraitSet();
 
-                Sender sender = new Sender(cluster, traitSet, child, 
traitSet.getTrait(DistributionTraitDef.INSTANCE));
+                Sender sender = new Sender(cluster, traitSet, child);
                 Fragment fragment = new Fragment(sender);
                 fragments.add(fragment);
 
diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/AllTargetsFactory.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/splitter/Source.java
similarity index 53%
copy from 
modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/AllTargetsFactory.java
copy to 
modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/splitter/Source.java
index 7e69adf..773dee1 100644
--- 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/AllTargetsFactory.java
+++ 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/splitter/Source.java
@@ -14,32 +14,34 @@
  * limitations under the License.
  */
 
-package org.apache.ignite.internal.processors.query.calcite.trait;
+package org.apache.ignite.internal.processors.query.calcite.splitter;
 
-import java.io.ObjectStreamException;
-import java.util.List;
-import java.util.UUID;
 import org.apache.calcite.plan.Context;
-import org.apache.calcite.util.ImmutableIntList;
+import org.apache.calcite.rel.metadata.RelMetadataQuery;
 import 
org.apache.ignite.internal.processors.query.calcite.metadata.NodesMapping;
+import 
org.apache.ignite.internal.processors.query.calcite.trait.DistributionTrait;
 
 /**
  *
  */
-final class AllTargetsFactory extends AbstractDestinationFunctionFactory {
-    static final DestinationFunctionFactory INSTANCE = new AllTargetsFactory();
+public interface Source {
+    /**
+     * @return Exchange id, has to be unique in scope of query.
+     */
+    long exchangeId();
 
-    @Override public DestinationFunction create(Context ctx, NodesMapping m, 
ImmutableIntList k) {
-        List<UUID> nodes = m.nodes();
+    /**
+     * @return Source mapping.
+     */
+    NodesMapping mapping();
 
-        return r -> nodes;
-    }
-
-    @Override public Object key() {
-        return "AllTargetsFactory";
-    }
-
-    private Object readResolve() throws ObjectStreamException {
-        return INSTANCE;
+    /**
+     * @param mapping Target mapping.
+     * @param distribution Target distribution.
+     * @param ctx Context.
+     * @param mq Metadata query instance.
+     */
+    default void init(NodesMapping mapping, DistributionTrait distribution, 
Context ctx, RelMetadataQuery mq) {
+        // No-op.
     }
 }
diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/NoOpFactory.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/splitter/SourceImpl.java
similarity index 56%
copy from 
modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/NoOpFactory.java
copy to 
modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/splitter/SourceImpl.java
index 1988671..44b271d 100644
--- 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/NoOpFactory.java
+++ 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/splitter/SourceImpl.java
@@ -14,28 +14,28 @@
  * limitations under the License.
  */
 
-package org.apache.ignite.internal.processors.query.calcite.trait;
+package org.apache.ignite.internal.processors.query.calcite.splitter;
 
-import java.io.ObjectStreamException;
-import org.apache.calcite.plan.Context;
-import org.apache.calcite.util.ImmutableIntList;
+import java.io.Serializable;
 import 
org.apache.ignite.internal.processors.query.calcite.metadata.NodesMapping;
 
 /**
  *
  */
-final class NoOpFactory extends AbstractDestinationFunctionFactory {
-    static final DestinationFunctionFactory INSTANCE = new NoOpFactory();
+public class SourceImpl implements Source, Serializable {
+    private final long exchangeId;
+    private final NodesMapping mapping;
 
-    @Override public DestinationFunction create(Context ctx, NodesMapping m, 
ImmutableIntList k) {
-        return null;
+    public SourceImpl(long exchangeId, NodesMapping mapping) {
+        this.exchangeId = exchangeId;
+        this.mapping = mapping;
     }
 
-    @Override public Object key() {
-        return "NoOpFactory";
+    @Override public long exchangeId() {
+        return exchangeId;
     }
 
-    private Object readResolve() throws ObjectStreamException {
-        return INSTANCE;
+    @Override public NodesMapping mapping() {
+        return mapping;
     }
 }
diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/splitter/Splitter.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/splitter/Splitter.java
index d15adc7..c15c46a 100644
--- 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/splitter/Splitter.java
+++ 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/splitter/Splitter.java
@@ -26,7 +26,6 @@ import 
org.apache.ignite.internal.processors.query.calcite.rel.IgniteExchange;
 import org.apache.ignite.internal.processors.query.calcite.rel.IgniteRel;
 import org.apache.ignite.internal.processors.query.calcite.rel.Receiver;
 import org.apache.ignite.internal.processors.query.calcite.rel.Sender;
-import 
org.apache.ignite.internal.processors.query.calcite.trait.DistributionTraitDef;
 import 
org.apache.ignite.internal.processors.query.calcite.util.IgniteRelShuttle;
 
 /**
@@ -46,11 +45,12 @@ public class Splitter extends IgniteRelShuttle {
     }
 
     @Override public RelNode visit(IgniteExchange rel) {
-        RelOptCluster cluster = rel.getCluster();
-        RelTraitSet inputTraits = rel.getInput().getTraitSet();
+        RelNode input = rel.getInput();
+        RelOptCluster cluster = input.getCluster();
+        RelTraitSet inputTraits = input.getTraitSet();
         RelTraitSet outputTraits = rel.getTraitSet();
 
-        Sender sender = new Sender(cluster, inputTraits, 
visit(rel.getInput()), outputTraits.getTrait(DistributionTraitDef.INSTANCE));
+        Sender sender = new Sender(cluster, inputTraits, visit(input));
         Fragment fragment = new Fragment(sender);
         fragments.add(fragment);
 
diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/NoOpFactory.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/splitter/Target.java
similarity index 55%
copy from 
modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/NoOpFactory.java
copy to 
modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/splitter/Target.java
index 1988671..fecea20 100644
--- 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/NoOpFactory.java
+++ 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/splitter/Target.java
@@ -14,28 +14,16 @@
  * limitations under the License.
  */
 
-package org.apache.ignite.internal.processors.query.calcite.trait;
+package org.apache.ignite.internal.processors.query.calcite.splitter;
 
-import java.io.ObjectStreamException;
-import org.apache.calcite.plan.Context;
-import org.apache.calcite.util.ImmutableIntList;
 import 
org.apache.ignite.internal.processors.query.calcite.metadata.NodesMapping;
+import 
org.apache.ignite.internal.processors.query.calcite.trait.DistributionTrait;
 
 /**
  *
  */
-final class NoOpFactory extends AbstractDestinationFunctionFactory {
-    static final DestinationFunctionFactory INSTANCE = new NoOpFactory();
-
-    @Override public DestinationFunction create(Context ctx, NodesMapping m, 
ImmutableIntList k) {
-        return null;
-    }
-
-    @Override public Object key() {
-        return "NoOpFactory";
-    }
-
-    private Object readResolve() throws ObjectStreamException {
-        return INSTANCE;
-    }
+public interface Target {
+    long exchangeId();
+    NodesMapping mapping();
+    DistributionTrait distribution();
 }
diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/relation/SenderNode.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/splitter/TargetImpl.java
similarity index 50%
copy from 
modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/relation/SenderNode.java
copy to 
modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/splitter/TargetImpl.java
index 44e92bd..a69a8f1 100644
--- 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/relation/SenderNode.java
+++ 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/splitter/TargetImpl.java
@@ -14,32 +14,35 @@
  * limitations under the License.
  */
 
-package org.apache.ignite.internal.processors.query.calcite.serialize.relation;
+package org.apache.ignite.internal.processors.query.calcite.splitter;
 
-import java.util.List;
-import org.apache.calcite.rel.RelNode;
+import java.io.Serializable;
 import 
org.apache.ignite.internal.processors.query.calcite.metadata.NodesMapping;
-import org.apache.ignite.internal.processors.query.calcite.rel.Sender;
 import 
org.apache.ignite.internal.processors.query.calcite.trait.DistributionTrait;
-import org.apache.ignite.internal.util.typedef.F;
 
 /**
  *
  */
-public class SenderNode extends RelGraphNode {
-    private final DistributionTrait targetDistr;
-    private final NodesMapping targetMapping;
+public class TargetImpl implements Target, Serializable {
+    private final long exchangeId;
+    private final NodesMapping mapping;
+    private final DistributionTrait distribution;
 
-    private SenderNode(DistributionTrait targetDistr, NodesMapping 
targetMapping) {
-        this.targetDistr = targetDistr;
-        this.targetMapping = targetMapping;
+    public TargetImpl(long exchangeId, NodesMapping mapping, DistributionTrait 
distribution) {
+        this.exchangeId = exchangeId;
+        this.mapping = mapping;
+        this.distribution = distribution;
     }
 
-    public static SenderNode create(Sender rel) {
-        return new SenderNode(rel.targetDistribution(), rel.targetMapping());
+    @Override public long exchangeId() {
+        return exchangeId;
     }
 
-    @Override public RelNode toRel(ConversionContext ctx, List<RelNode> 
children) {
-        return Sender.create(F.first(children), targetDistr, targetMapping);
+    @Override public NodesMapping mapping() {
+        return mapping;
+    }
+
+    @Override public DistributionTrait distribution() {
+        return distribution;
     }
 }
diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/AllTargetsFactory.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/AllTargetsFactory.java
index 7e69adf..2dcfe33 100644
--- 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/AllTargetsFactory.java
+++ 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/AllTargetsFactory.java
@@ -26,8 +26,8 @@ import 
org.apache.ignite.internal.processors.query.calcite.metadata.NodesMapping
 /**
  *
  */
-final class AllTargetsFactory extends AbstractDestinationFunctionFactory {
-    static final DestinationFunctionFactory INSTANCE = new AllTargetsFactory();
+public final class AllTargetsFactory extends 
AbstractDestinationFunctionFactory {
+    public static final DestinationFunctionFactory INSTANCE = new 
AllTargetsFactory();
 
     @Override public DestinationFunction create(Context ctx, NodesMapping m, 
ImmutableIntList k) {
         List<UUID> nodes = m.nodes();
diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/HashFunctionFactory.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/HashFunctionFactory.java
index 379a707..3c2420b 100644
--- 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/HashFunctionFactory.java
+++ 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/HashFunctionFactory.java
@@ -29,8 +29,8 @@ import org.apache.ignite.internal.util.typedef.internal.U;
 /**
  *
  */
-final class HashFunctionFactory extends AbstractDestinationFunctionFactory {
-    static final DestinationFunctionFactory INSTANCE = new 
HashFunctionFactory();
+public final class HashFunctionFactory extends 
AbstractDestinationFunctionFactory {
+    public static final DestinationFunctionFactory INSTANCE = new 
HashFunctionFactory();
 
     @Override public DestinationFunction create(Context ctx, NodesMapping m, 
ImmutableIntList k) {
         assert m != null && !F.isEmpty(m.assignments());
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/NoOpFactory.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/NoOpFactory.java
index 1988671..d8495b6 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/NoOpFactory.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/NoOpFactory.java
@@ -24,8 +24,8 @@ import org.apache.ignite.internal.processors.query.calcite.metadata.NodesMapping
 /**
  *
  */
-final class NoOpFactory extends AbstractDestinationFunctionFactory {
-    static final DestinationFunctionFactory INSTANCE = new NoOpFactory();
+public final class NoOpFactory extends AbstractDestinationFunctionFactory {
+    public static final DestinationFunctionFactory INSTANCE = new 
NoOpFactory();
 
     @Override public DestinationFunction create(Context ctx, NodesMapping m, 
ImmutableIntList k) {
         return null;
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/RandomTargetFactory.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/RandomTargetFactory.java
index 8de55e5..78013d3 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/RandomTargetFactory.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/RandomTargetFactory.java
@@ -28,8 +28,8 @@ import org.apache.ignite.internal.processors.query.calcite.metadata.NodesMapping
 /**
  *
  */
-final class RandomTargetFactory extends AbstractDestinationFunctionFactory {
-    static final DestinationFunctionFactory INSTANCE = new 
RandomTargetFactory();
+public final class RandomTargetFactory extends 
AbstractDestinationFunctionFactory {
+    public static final DestinationFunctionFactory INSTANCE = new 
RandomTargetFactory();
 
     @Override public DestinationFunction create(Context ctx, NodesMapping m, 
ImmutableIntList k) {
         List<UUID> nodes = m.nodes();
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/SingleTargetFactory.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/SingleTargetFactory.java
index 1de22fd..4d631fc 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/SingleTargetFactory.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/SingleTargetFactory.java
@@ -19,6 +19,7 @@ package org.apache.ignite.internal.processors.query.calcite.trait;
 import java.io.ObjectStreamException;
 import java.util.Collections;
 import java.util.List;
+import java.util.Objects;
 import java.util.UUID;
 import org.apache.calcite.plan.Context;
 import org.apache.calcite.util.ImmutableIntList;
@@ -28,11 +29,11 @@ import org.apache.ignite.internal.util.typedef.F;
 /**
  *
  */
-final class SingleTargetFactory extends AbstractDestinationFunctionFactory {
-    static final DestinationFunctionFactory INSTANCE = new 
SingleTargetFactory();
+public final class SingleTargetFactory extends 
AbstractDestinationFunctionFactory {
+    public static final DestinationFunctionFactory INSTANCE = new 
SingleTargetFactory();
 
     @Override public DestinationFunction create(Context ctx, NodesMapping m, 
ImmutableIntList k) {
-        List<UUID> nodes = Collections.singletonList(F.first(m.nodes()));
+        List<UUID> nodes = 
Collections.singletonList(Objects.requireNonNull(F.first(m.nodes())));
 
         return r -> nodes;
     }
diff --git a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/CalciteQueryProcessorTest.java b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/CalciteQueryProcessorTest.java
index 0180357..6c04c1e 100644
--- a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/CalciteQueryProcessorTest.java
+++ b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/CalciteQueryProcessorTest.java
@@ -22,6 +22,7 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
 import java.util.UUID;
+import java.util.concurrent.atomic.AtomicLong;
 import org.apache.calcite.plan.Context;
 import org.apache.calcite.plan.Contexts;
 import org.apache.calcite.plan.ConventionTraitDef;
@@ -33,15 +34,6 @@ import org.apache.calcite.rel.core.Project;
 import org.apache.calcite.schema.SchemaPlus;
 import org.apache.calcite.sql.SqlNode;
 import org.apache.calcite.tools.Frameworks;
-import org.apache.ignite.IgniteCheckedException;
-import org.apache.ignite.IgniteException;
-import org.apache.ignite.configuration.BinaryConfiguration;
-import org.apache.ignite.configuration.IgniteConfiguration;
-import org.apache.ignite.internal.binary.BinaryCachingMetadataHandler;
-import org.apache.ignite.internal.binary.BinaryContext;
-import org.apache.ignite.internal.binary.BinaryMarshaller;
-import org.apache.ignite.internal.managers.discovery.GridDiscoveryManager;
-import org.apache.ignite.internal.managers.systemview.GridSystemViewManager;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import 
org.apache.ignite.internal.processors.query.calcite.metadata.DistributionRegistry;
 import 
org.apache.ignite.internal.processors.query.calcite.metadata.LocationRegistry;
@@ -63,14 +55,8 @@ import org.apache.ignite.internal.processors.query.calcite.trait.DistributionTra
 import 
org.apache.ignite.internal.processors.query.calcite.trait.DistributionTraitDef;
 import 
org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistributions;
 import org.apache.ignite.internal.processors.query.calcite.type.RowType;
-import org.apache.ignite.internal.util.IgniteUtils;
 import org.apache.ignite.internal.util.typedef.internal.CU;
-import org.apache.ignite.logger.NullLogger;
-import org.apache.ignite.marshaller.MarshallerContextTestImpl;
 import org.apache.ignite.marshaller.jdk.JdkMarshaller;
-import org.apache.ignite.spi.discovery.DiscoverySpiCustomMessage;
-import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
-import org.apache.ignite.spi.systemview.jmx.JmxSystemViewExporterSpi;
 import org.apache.ignite.testframework.junits.GridTestKernalContext;
 import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
 import org.junit.BeforeClass;
@@ -1026,6 +1012,8 @@ public class CalciteQueryProcessorTest extends GridCommonAbstractTest {
     }
 
     private static class TestRegistry implements LocationRegistry, 
DistributionRegistry {
+        private AtomicLong idGen = new AtomicLong();
+
         @Override public NodesMapping random(AffinityTopologyVersion topVer) {
             return new NodesMapping(select(nodes, 0,1,2,3), null, (byte) 0);
         }
@@ -1059,40 +1047,4 @@ public class CalciteQueryProcessorTest extends GridCommonAbstractTest {
             throw new AssertionError("Unexpected cache id:" + cacheId);
         }
     }
-
-    /**
-     * @return Binary marshaller.
-     */
-    private BinaryMarshaller binaryMarshaller() throws IgniteCheckedException {
-        IgniteConfiguration iCfg = new IgniteConfiguration();
-
-        BinaryConfiguration bCfg = new BinaryConfiguration();
-        iCfg.setBinaryConfiguration(bCfg);
-        iCfg.setClientMode(false);
-        iCfg.setDiscoverySpi(new TcpDiscoverySpi() {
-            @Override public void sendCustomEvent(DiscoverySpiCustomMessage 
msg) throws IgniteException {
-                //No-op.
-            }
-        });
-        iCfg.setSystemViewExporterSpi(new JmxSystemViewExporterSpi());
-
-        BinaryContext ctx = new 
BinaryContext(BinaryCachingMetadataHandler.create(), iCfg, new NullLogger());
-
-        BinaryMarshaller marsh = new BinaryMarshaller();
-
-        MarshallerContextTestImpl marshCtx = new 
MarshallerContextTestImpl(null, null);
-
-        GridTestKernalContext kernCtx = new GridTestKernalContext(log, iCfg);
-
-        kernCtx.add(new GridSystemViewManager(kernCtx));
-        kernCtx.add(new GridDiscoveryManager(kernCtx));
-
-        marshCtx.onMarshallerProcessorStarted(kernCtx, null);
-
-        marsh.setContext(marshCtx);
-
-        IgniteUtils.invoke(BinaryMarshaller.class, marsh, "setBinaryContext", 
ctx, iCfg);
-
-        return marsh;
-    }
 }
\ No newline at end of file
diff --git a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/exchange/OutboxTest.java b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/exchange/OutboxTest.java
new file mode 100644
index 0000000..89efa7c
--- /dev/null
+++ b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/exchange/OutboxTest.java
@@ -0,0 +1,133 @@
+package org.apache.ignite.internal.processors.query.calcite.exchange;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.UUID;
+import org.apache.calcite.plan.Contexts;
+import org.apache.calcite.util.ImmutableIntList;
+import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
+import org.apache.ignite.internal.processors.query.calcite.exec.Sink;
+import org.apache.ignite.internal.processors.query.calcite.exec.Source;
+import 
org.apache.ignite.internal.processors.query.calcite.metadata.NodesMapping;
+import 
org.apache.ignite.internal.processors.query.calcite.trait.DestinationFunction;
+import 
org.apache.ignite.internal.processors.query.calcite.trait.SingleTargetFactory;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+/**
+ * Exercises {@link Outbox} against a stub {@link ExchangeService} and a stub {@link Source}.
+ */
+public class OutboxTest extends GridCommonAbstractTest {
+    private static UUID nodeId; // the only node in the mapping; also passed to acknowledge()
+    private static GridCacheVersion queryId; // fixed query id handed to every Outbox under test
+    private static DestinationFunction func; // created by SingleTargetFactory from the one-node mapping
+    private static Collection<UUID> targets; // node ids returned by mapping.nodes()
+
+    private Outbox<Object[]> outbox; // recreated in setUp() before each test
+
+    @BeforeClass // builds the shared static fixtures once for the class
+    public static void setupClass() {
+        nodeId = UUID.randomUUID();
+        queryId = new GridCacheVersion(0, 0, 0, 0);
+
+        NodesMapping mapping = new 
NodesMapping(Collections.singletonList(nodeId), null, 
NodesMapping.DEDUPLICATED);
+
+        targets = mapping.nodes();
+        func = SingleTargetFactory.INSTANCE.create(Contexts.empty(), mapping, 
ImmutableIntList.of());
+    }
+
+
+    @Before // gives every test its own Outbox instance
+    public void setUp() {
+        outbox = new Outbox<>(queryId, 0, targets, func);
+    }
+
+    @Test // walks one Outbox through init, filling up, acknowledgement and end of stream
+    public void testBasicOps() {
+        TestSource source = new TestSource();
+        TestExchangeService exch = new TestExchangeService();
+
+        Sink<Object[]> sink = outbox.sink();
+
+        outbox.source(source);
+        outbox.init(exch);
+
+        assertTrue(exch.registered); // init() is expected to register the outbox with the service
+        assertTrue(source.signal); // ... and to signal the source
+
+        source.signal = false; // reset the flag so later signals can be detected
+
+        int maxRows = Outbox.BATCH_SIZE * (Outbox.PER_NODE_BATCH_COUNT + 1); // rows expected to be accepted
+        int rows = 0;
+
+        while (sink.push(new Object[]{new Object()})) { // push until the sink returns false
+            rows++;
+
+            assertFalse(rows > maxRows); // fails fast instead of looping forever if push() never refuses
+        }
+
+        assertEquals(maxRows, rows);
+
+        assertFalse(exch.ids.isEmpty());
+
+        assertEquals(Outbox.PER_NODE_BATCH_COUNT, exch.ids.size()); // one recorded send() per batch
+
+        assertFalse(sink.push(new Object[]{new Object()})); // still refusing while nothing is acknowledged
+
+        assertFalse(source.signal);
+
+        outbox.acknowledge(nodeId, exch.ids.remove(0)); // acknowledge the oldest recorded batch
+
+        assertTrue(source.signal); // first acknowledgement is expected to signal the source
+
+        source.signal = false;
+
+        outbox.acknowledge(nodeId, exch.ids.remove(0));
+
+        assertFalse(source.signal); // second acknowledgement is expected not to signal again
+
+        assertTrue(sink.push(new Object[]{new Object()})); // rows are accepted again after acknowledgements
+
+        sink.end();
+
+        assertTrue(exch.unregistered); // end() is expected to unregister the outbox
+
+        assertEquals(EndMarker.INSTANCE, F.last(exch.lastBatch)); // last sent batch must finish with the end marker
+    }
+
+    private static class TestExchangeService implements ExchangeService { // recording stub, sends nothing
+        private boolean registered; // set by register()
+        private boolean unregistered; // set by unregister()
+        private List<Integer> ids = new ArrayList<>(); // batch ids in the order send() received them
+
+        private List<?> lastBatch; // rows passed to the most recent send()
+
+
+        @Override public void register(Outbox outbox) {
+            registered = true;
+        }
+
+        @Override public void unregister(Outbox outbox) {
+            unregistered = true;
+        }
+
+        @Override public void send(GridCacheVersion queryId, long exchangeId, 
UUID nodeId, int batchId, List<?> rows) {
+            ids.add(batchId);
+
+            lastBatch = rows;
+        }
+    }
+
+    private static class TestSource implements Source { // stub that only remembers it was signalled
+        boolean signal; // set by signal(); the test clears it by hand
+
+        @Override public void signal() {
+            signal = true;
+        }
+    }
+}
\ No newline at end of file
diff --git a/modules/calcite/src/test/java/org/apache/ignite/testsuites/IgniteCalciteTestSuite.java b/modules/calcite/src/test/java/org/apache/ignite/testsuites/IgniteCalciteTestSuite.java
index 55a5925..51a7bab 100644
--- a/modules/calcite/src/test/java/org/apache/ignite/testsuites/IgniteCalciteTestSuite.java
+++ b/modules/calcite/src/test/java/org/apache/ignite/testsuites/IgniteCalciteTestSuite.java
@@ -18,6 +18,7 @@
 package org.apache.ignite.testsuites;
 
 import 
org.apache.ignite.internal.processors.query.calcite.CalciteQueryProcessorTest;
+import org.apache.ignite.internal.processors.query.calcite.exchange.OutboxTest;
 import org.junit.runner.RunWith;
 import org.junit.runners.Suite;
 
@@ -26,7 +27,7 @@ import org.junit.runners.Suite;
  */
 @RunWith(Suite.class)
 @Suite.SuiteClasses({
-    CalciteQueryProcessorTest.class
-    ,
+    CalciteQueryProcessorTest.class,
+    OutboxTest.class,
 })
 public class IgniteCalciteTestSuite { }

Reply via email to