Vladsz83 commented on code in PR #12180: URL: https://github.com/apache/ignite/pull/12180#discussion_r2200232276
########## modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/PlanSplitterTest.java: ########## @@ -0,0 +1,280 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.query.calcite.planner; + +import java.util.Arrays; +import java.util.function.Predicate; +import org.apache.calcite.rel.core.Exchange; +import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; +import org.apache.ignite.internal.processors.query.calcite.metadata.ColocationGroup; +import org.apache.ignite.internal.processors.query.calcite.prepare.ExecutionPlan; +import org.apache.ignite.internal.processors.query.calcite.prepare.MultiStepPlan; +import org.apache.ignite.internal.processors.query.calcite.prepare.MultiStepQueryPlan; +import org.apache.ignite.internal.processors.query.calcite.prepare.QueryTemplate; +import org.apache.ignite.internal.processors.query.calcite.prepare.Splitter; +import org.apache.ignite.internal.processors.query.calcite.rel.IgniteRel; +import org.apache.ignite.internal.processors.query.calcite.rel.IgniteTrimExchange; +import org.apache.ignite.internal.processors.query.calcite.schema.IgniteSchema; +import org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistribution; +import org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistributions; +import org.apache.ignite.internal.processors.query.calcite.util.Commons; +import org.apache.ignite.internal.util.typedef.F; +import org.junit.Test; + +import static org.apache.calcite.sql.type.SqlTypeName.INTEGER; +import static org.apache.calcite.sql.type.SqlTypeName.VARCHAR; + +/** */ +public class PlanSplitterTest extends AbstractPlannerTest { + /** */ + private IgniteSchema createSchema( + IgniteDistribution distribution1, + ColocationGroup colocationGrp1, + IgniteDistribution distribution2, + ColocationGroup colocationGrp2 + ) { + return createSchema( + createTable("DEVELOPER", distribution1, "ID", INTEGER, "NAME", VARCHAR, "PROJECTID", INTEGER) + .setColocationGroup(colocationGrp1), + createTable("PROJECT", distribution2, "ID", INTEGER, "NAME", VARCHAR, "VER", INTEGER) + .setColocationGroup(colocationGrp2) + ); + } + + /** */ + @Test + public void testSplitterColocatedPartitionedPartitioned() throws Exception { + IgniteSchema schema = createSchema( + IgniteDistributions.affinity(0, "Developer", "hash"), + ColocationGroup.forAssignments(Arrays.asList( + select(nodes, 0, 1), + select(nodes, 1, 2), + select(nodes, 2, 0), + select(nodes, 0, 1), + select(nodes, 1, 2) + )), + IgniteDistributions.affinity(0, "Project", "hash"), + ColocationGroup.forAssignments(Arrays.asList( + select(nodes, 0, 1), + select(nodes, 1, 2), + select(nodes, 2, 0), + select(nodes, 0, 1), + select(nodes, 1, 2) + )) + ); + + String sql = "SELECT d.id, d.name, d.projectId, p.id0, p.ver0 " + + "FROM PUBLIC.Developer d JOIN (" + + "SELECT pp.id as id0, pp.ver as ver0 FROM PUBLIC.Project pp" + + ") p " + + "ON d.id = p.id0"; + + // Data is partitioned and colocated, can be joined on remote nodes, one exchange is required to transfer to + // initiator node. + assertPlan(sql, schema, hasFragmentsCount(2).and(nodeOrAnyChild(isInstanceOf(Exchange.class)))); Review Comment: With `nodeOrAnyChild(isInstanceOf(Exchange.class))` we can't be sure that there is exactly one exchange. ########## modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/MergeJoinPlannerTest.java: ########## @@ -2794,6 +2797,26 @@ public void testInnerDeriveMixed2() throws Exception { assertNull(sortOnTopOfScan(rel, "RIGHT_T")); } + /** */ + @Test + public void testMergeJoinIsNotAppliedForNonEquiJoin() throws Exception { + IgniteSchema schema = createSchema( + createTable("EMP", 1000, IgniteDistributions.broadcast(), + "ID", INTEGER, "NAME", VARCHAR, "DEPTNO", INTEGER) + .addIndex("emp_idx", 1, 2), + createTable("DEPT", 100, IgniteDistributions.broadcast(), + "DEPTNO", INTEGER, "NAME", VARCHAR) + .addIndex("dep_idx", 1, 0) + ); + + String sql = "select d.deptno, d.name, e.id, e.name from dept d join emp e " + + "on d.deptno = e.deptno and e.name >= d.name order by e.name, d.deptno"; + + assertPlan(sql, schema, nodeOrAnyChild(isInstanceOf(IgniteSort.class) + .and(hasChildThat(isInstanceOf(IgniteNestedLoopJoin.class)))), Review Comment: `hasChildThat(isInstanceOf(IgniteMergeJoin.class)).negate()` ? ########## modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/PlanSplitterTest.java: ########## @@ -0,0 +1,280 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.query.calcite.planner; + +import java.util.Arrays; +import java.util.function.Predicate; +import org.apache.calcite.rel.core.Exchange; +import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; +import org.apache.ignite.internal.processors.query.calcite.metadata.ColocationGroup; +import org.apache.ignite.internal.processors.query.calcite.prepare.ExecutionPlan; +import org.apache.ignite.internal.processors.query.calcite.prepare.MultiStepPlan; +import org.apache.ignite.internal.processors.query.calcite.prepare.MultiStepQueryPlan; +import org.apache.ignite.internal.processors.query.calcite.prepare.QueryTemplate; +import org.apache.ignite.internal.processors.query.calcite.prepare.Splitter; +import org.apache.ignite.internal.processors.query.calcite.rel.IgniteRel; +import org.apache.ignite.internal.processors.query.calcite.rel.IgniteTrimExchange; +import org.apache.ignite.internal.processors.query.calcite.schema.IgniteSchema; +import org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistribution; +import org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistributions; +import org.apache.ignite.internal.processors.query.calcite.util.Commons; +import org.apache.ignite.internal.util.typedef.F; +import org.junit.Test; + +import static org.apache.calcite.sql.type.SqlTypeName.INTEGER; +import static org.apache.calcite.sql.type.SqlTypeName.VARCHAR; + +/** */ +public class PlanSplitterTest extends AbstractPlannerTest { + /** */ + private IgniteSchema createSchema( + IgniteDistribution distribution1, + ColocationGroup colocationGrp1, + IgniteDistribution distribution2, + ColocationGroup colocationGrp2 + ) { + return createSchema( + createTable("DEVELOPER", distribution1, "ID", INTEGER, "NAME", VARCHAR, "PROJECTID", INTEGER) + .setColocationGroup(colocationGrp1), + createTable("PROJECT", distribution2, "ID", INTEGER, "NAME", VARCHAR, "VER", INTEGER) + .setColocationGroup(colocationGrp2) + ); + } + + /** */ + @Test + public void testSplitterColocatedPartitionedPartitioned() throws Exception { + IgniteSchema schema = createSchema( + IgniteDistributions.affinity(0, "Developer", "hash"), + ColocationGroup.forAssignments(Arrays.asList( + select(nodes, 0, 1), + select(nodes, 1, 2), + select(nodes, 2, 0), + select(nodes, 0, 1), + select(nodes, 1, 2) + )), + IgniteDistributions.affinity(0, "Project", "hash"), + ColocationGroup.forAssignments(Arrays.asList( + select(nodes, 0, 1), + select(nodes, 1, 2), + select(nodes, 2, 0), + select(nodes, 0, 1), + select(nodes, 1, 2) + )) + ); + + String sql = "SELECT d.id, d.name, d.projectId, p.id0, p.ver0 " + + "FROM PUBLIC.Developer d JOIN (" + + "SELECT pp.id as id0, pp.ver as ver0 FROM PUBLIC.Project pp" + + ") p " + + "ON d.id = p.id0"; + + // Data is partitioned and colocated, can be joined on remote nodes, one exchange is required to transfer to + // initiator node. + assertPlan(sql, schema, hasFragmentsCount(2).and(nodeOrAnyChild(isInstanceOf(Exchange.class)))); + } + + /** */ + @Test + public void testSplitterColocatedReplicatedReplicated() throws Exception { + IgniteSchema schema = createSchema( + IgniteDistributions.broadcast(), + ColocationGroup.forNodes(select(nodes, 0, 1, 2, 3)), + IgniteDistributions.broadcast(), + ColocationGroup.forNodes(select(nodes, 0, 1, 2, 3)) + ); + + String sql = "SELECT d.id, (d.id + 1) as id2, d.name, d.projectId, p.id0, p.ver0 " + + "FROM PUBLIC.Developer d JOIN (" + + "SELECT pp.id as id0, pp.ver as ver0 FROM PUBLIC.Project pp" + + ") p " + + "ON d.id = p.id0 " + + "WHERE (d.projectId + 1) = ?"; + + // Data is fully replicated, no exchanges requred, can be executed direcly on initiatpr node. + assertPlan(sql, schema, hasFragmentsCount(1).and(nodeOrAnyChild(isInstanceOf(Exchange.class)).negate())); + } + + /** */ + @Test + public void testSplitterPartiallyColocatedReplicatedAndPartitioned() throws Exception { + IgniteSchema schema = createSchema( + IgniteDistributions.broadcast(), + ColocationGroup.forNodes(select(nodes, 0)), + IgniteDistributions.affinity(0, "Project", "hash"), + ColocationGroup.forAssignments(Arrays.asList( + select(nodes, 1, 2), + select(nodes, 2, 3), + select(nodes, 3, 0), + select(nodes, 0, 1) + )) + ); + + String sql = "SELECT d.id, d.name, d.projectId, p.id0, p.ver0 " + + "FROM PUBLIC.Developer d JOIN (" + + "SELECT pp.id as id0, pp.ver as ver0 FROM PUBLIC.Project pp" + + ") p " + + "ON d.id = p.id0 " + + "WHERE (d.projectId + 1) = ?"; + + // First table is replicated and planned with TrimExchange to colocate data, but set of nodes for partitioned + // table is differ, so exchange is required for colocation. Another exchange is required to send data to + // initiator node. + assertPlan(sql, schema, hasFragmentsCount(3).and(hasChildThat(isInstanceOf(IgniteTrimExchange.class)))); Review Comment: Suggestion: let's check the exchange (for 'Another exchange is required') and an expected table scan with (under) `IgniteTrimExchange`. ########## modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/PlanSplitterTest.java: ########## @@ -0,0 +1,280 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.query.calcite.planner; + +import java.util.Arrays; +import java.util.function.Predicate; +import org.apache.calcite.rel.core.Exchange; +import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; +import org.apache.ignite.internal.processors.query.calcite.metadata.ColocationGroup; +import org.apache.ignite.internal.processors.query.calcite.prepare.ExecutionPlan; +import org.apache.ignite.internal.processors.query.calcite.prepare.MultiStepPlan; +import org.apache.ignite.internal.processors.query.calcite.prepare.MultiStepQueryPlan; +import org.apache.ignite.internal.processors.query.calcite.prepare.QueryTemplate; +import org.apache.ignite.internal.processors.query.calcite.prepare.Splitter; +import org.apache.ignite.internal.processors.query.calcite.rel.IgniteRel; +import org.apache.ignite.internal.processors.query.calcite.rel.IgniteTrimExchange; +import org.apache.ignite.internal.processors.query.calcite.schema.IgniteSchema; +import org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistribution; +import org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistributions; +import org.apache.ignite.internal.processors.query.calcite.util.Commons; +import org.apache.ignite.internal.util.typedef.F; +import org.junit.Test; + +import static org.apache.calcite.sql.type.SqlTypeName.INTEGER; +import static org.apache.calcite.sql.type.SqlTypeName.VARCHAR; + +/** */ +public class PlanSplitterTest extends AbstractPlannerTest { + /** */ + private IgniteSchema createSchema( + IgniteDistribution distribution1, + ColocationGroup colocationGrp1, + IgniteDistribution distribution2, + ColocationGroup colocationGrp2 + ) { + return createSchema( + createTable("DEVELOPER", distribution1, "ID", INTEGER, "NAME", VARCHAR, "PROJECTID", INTEGER) + .setColocationGroup(colocationGrp1), + createTable("PROJECT", distribution2, "ID", INTEGER, "NAME", VARCHAR, "VER", INTEGER) + .setColocationGroup(colocationGrp2) + ); + } + + /** */ + @Test + public void testSplitterColocatedPartitionedPartitioned() throws Exception { + IgniteSchema schema = createSchema( + IgniteDistributions.affinity(0, "Developer", "hash"), + ColocationGroup.forAssignments(Arrays.asList( + select(nodes, 0, 1), + select(nodes, 1, 2), + select(nodes, 2, 0), + select(nodes, 0, 1), + select(nodes, 1, 2) + )), + IgniteDistributions.affinity(0, "Project", "hash"), + ColocationGroup.forAssignments(Arrays.asList( + select(nodes, 0, 1), + select(nodes, 1, 2), + select(nodes, 2, 0), + select(nodes, 0, 1), + select(nodes, 1, 2) + )) + ); + + String sql = "SELECT d.id, d.name, d.projectId, p.id0, p.ver0 " + + "FROM PUBLIC.Developer d JOIN (" + + "SELECT pp.id as id0, pp.ver as ver0 FROM PUBLIC.Project pp" + + ") p " + + "ON d.id = p.id0"; + + // Data is partitioned and colocated, can be joined on remote nodes, one exchange is required to transfer to + // initiator node. + assertPlan(sql, schema, hasFragmentsCount(2).and(nodeOrAnyChild(isInstanceOf(Exchange.class)))); + } + + /** */ + @Test + public void testSplitterColocatedReplicatedReplicated() throws Exception { + IgniteSchema schema = createSchema( + IgniteDistributions.broadcast(), + ColocationGroup.forNodes(select(nodes, 0, 1, 2, 3)), + IgniteDistributions.broadcast(), + ColocationGroup.forNodes(select(nodes, 0, 1, 2, 3)) + ); + + String sql = "SELECT d.id, (d.id + 1) as id2, d.name, d.projectId, p.id0, p.ver0 " + + "FROM PUBLIC.Developer d JOIN (" + + "SELECT pp.id as id0, pp.ver as ver0 FROM PUBLIC.Project pp" + + ") p " + + "ON d.id = p.id0 " + + "WHERE (d.projectId + 1) = ?"; + + // Data is fully replicated, no exchanges requred, can be executed direcly on initiatpr node. + assertPlan(sql, schema, hasFragmentsCount(1).and(nodeOrAnyChild(isInstanceOf(Exchange.class)).negate())); + } + + /** */ + @Test + public void testSplitterPartiallyColocatedReplicatedAndPartitioned() throws Exception { + IgniteSchema schema = createSchema( + IgniteDistributions.broadcast(), + ColocationGroup.forNodes(select(nodes, 0)), + IgniteDistributions.affinity(0, "Project", "hash"), + ColocationGroup.forAssignments(Arrays.asList( + select(nodes, 1, 2), + select(nodes, 2, 3), + select(nodes, 3, 0), + select(nodes, 0, 1) + )) + ); + + String sql = "SELECT d.id, d.name, d.projectId, p.id0, p.ver0 " + + "FROM PUBLIC.Developer d JOIN (" + + "SELECT pp.id as id0, pp.ver as ver0 FROM PUBLIC.Project pp" + + ") p " + + "ON d.id = p.id0 " + + "WHERE (d.projectId + 1) = ?"; + + // First table is replicated and planned with TrimExchange to colocate data, but set of nodes for partitioned + // table is differ, so exchange is required for colocation. Another exchange is required to send data to + // initiator node. + assertPlan(sql, schema, hasFragmentsCount(3).and(hasChildThat(isInstanceOf(IgniteTrimExchange.class)))); + } + + /** */ + @Test + public void testSplitterPartiallyColocated1() throws Exception { + IgniteSchema schema = createSchema( + IgniteDistributions.broadcast(), + ColocationGroup.forNodes(select(nodes, 1, 2, 3)), + IgniteDistributions.affinity(0, "Project", "hash"), + ColocationGroup.forAssignments(Arrays.asList( + select(nodes, 0), + select(nodes, 1), + select(nodes, 2) + )) + ); + + String sql = "SELECT d.id, d.name, d.projectId, p.id0, p.ver0 " + + "FROM PUBLIC.Developer d JOIN (" + + "SELECT pp.id as id0, pp.ver as ver0 FROM PUBLIC.Project pp" + + ") p " + + "ON d.projectId = p.id0 " + + "WHERE (d.projectId + 1) = ?"; + + // First table is replicated and planned with TrimExchange to colocate data, but set of nodes for partitioned + // table is differ, so exchange is required for colocation. Another exchange is required to send data to + // initiator node. + assertPlan(sql, schema, hasFragmentsCount(3).and(hasChildThat(isInstanceOf(IgniteTrimExchange.class)))); + } + + /** */ + @Test + public void testSplitterPartiallyColocated2() throws Exception { + IgniteSchema schema = createSchema( + IgniteDistributions.broadcast(), + ColocationGroup.forNodes(select(nodes, 0)), + IgniteDistributions.affinity(0, "Project", "hash"), + ColocationGroup.forAssignments(Arrays.asList( + select(nodes, 1), + select(nodes, 2), + select(nodes, 3) + )) + ); + + String sql = "SELECT d.id, d.name, d.projectId, p.id0, p.ver0 " + + "FROM PUBLIC.Developer d JOIN (" + + "SELECT pp.id as id0, pp.ver as ver0 FROM PUBLIC.Project pp" + + ") p " + + "ON d.projectId = p.id0 " + + "WHERE (d.projectId + 1) = ?"; + + // First table is replicated and planned with TrimExchange to colocate data, but set of nodes for partitioned + // table is differ, so exchange is required for colocation. Another exchange is required to send data to + // initiator node. + assertPlan(sql, schema, hasFragmentsCount(3).and(hasChildThat(isInstanceOf(IgniteTrimExchange.class)))); + } + + /** */ + @Test + public void testSplitterNonColocatedReplicatedReplicated1() throws Exception { + IgniteSchema schema = createSchema( + IgniteDistributions.broadcast(), + ColocationGroup.forNodes(select(nodes, 2)), + IgniteDistributions.broadcast(), + ColocationGroup.forNodes(select(nodes, 0, 1)) + ); + + String sql = "SELECT p.id0, d.id " + + "FROM PUBLIC.Developer d JOIN (" + + "SELECT pp.id as id0, pp.ver as ver0 FROM PUBLIC.Project pp" + + ") p " + + "ON d.projectId = p.ver0 " + + "WHERE (d.projectId + 1) = ?"; + + // Originally planned without exchanges, there is no data for table 1 on initiator node, so one exchange + // is required. + assertPlan(sql, schema, hasFragmentsCount(2).and(nodeOrAnyChild(isInstanceOf(Exchange.class)).negate())); + } + + /** */ + @Test + public void testSplitterNonColocatedReplicatedReplicated2() throws Exception { + IgniteSchema schema = createSchema( + IgniteDistributions.broadcast(), + ColocationGroup.forNodes(select(nodes, 2)), + IgniteDistributions.broadcast(), + ColocationGroup.forNodes(select(nodes, 1)) + ); + + String sql = "SELECT p.id0, d.id " + + "FROM PUBLIC.Developer d JOIN (" + + "SELECT pp.id as id0, pp.ver as ver0 FROM PUBLIC.Project pp" + + ") p " + + "ON d.projectId = p.ver0 " + + "WHERE (d.projectId + 1) = ?"; + + // Originally planned without exchanges, data can't be joined on any single node, so two exchanges is required. + assertPlan(sql, schema, hasFragmentsCount(3).and(nodeOrAnyChild(isInstanceOf(Exchange.class)).negate())); + } + + /** */ + @Test + public void testSplitterPartiallyColocatedReplicatedReplicated() throws Exception { + IgniteSchema schema = createSchema( + IgniteDistributions.broadcast(), + ColocationGroup.forNodes(select(nodes, 2, 3)), + IgniteDistributions.broadcast(), + ColocationGroup.forNodes(select(nodes, 1, 3)) + ); + + String sql = "SELECT p.id0, d.id " + + "FROM PUBLIC.Developer d JOIN (" + + "SELECT pp.id as id0, pp.ver as ver0 FROM PUBLIC.Project pp" + + ") p " + + "ON d.projectId = p.ver0 " + + "WHERE (d.projectId + 1) = ?"; + + // Originally planned without exchanges, there is no data on initiator node, but data can be joined on one + // remote node, one exchange is required. + assertPlan(sql, schema, hasFragmentsCount(2).and(nodeOrAnyChild(isInstanceOf(Exchange.class)).negate())); Review Comment: '_one exchange is required_' but `nodeOrAnyChild(isInstanceOf(Exchange.class)).negate()` ########## modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/PlanSplitterTest.java: ########## @@ -0,0 +1,280 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.query.calcite.planner; + +import java.util.Arrays; +import java.util.function.Predicate; +import org.apache.calcite.rel.core.Exchange; +import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; +import org.apache.ignite.internal.processors.query.calcite.metadata.ColocationGroup; +import org.apache.ignite.internal.processors.query.calcite.prepare.ExecutionPlan; +import org.apache.ignite.internal.processors.query.calcite.prepare.MultiStepPlan; +import org.apache.ignite.internal.processors.query.calcite.prepare.MultiStepQueryPlan; +import org.apache.ignite.internal.processors.query.calcite.prepare.QueryTemplate; +import org.apache.ignite.internal.processors.query.calcite.prepare.Splitter; +import org.apache.ignite.internal.processors.query.calcite.rel.IgniteRel; +import org.apache.ignite.internal.processors.query.calcite.rel.IgniteTrimExchange; +import org.apache.ignite.internal.processors.query.calcite.schema.IgniteSchema; +import org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistribution; +import org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistributions; +import org.apache.ignite.internal.processors.query.calcite.util.Commons; +import org.apache.ignite.internal.util.typedef.F; +import org.junit.Test; + +import static org.apache.calcite.sql.type.SqlTypeName.INTEGER; +import static org.apache.calcite.sql.type.SqlTypeName.VARCHAR; + +/** */ +public class PlanSplitterTest extends AbstractPlannerTest { + /** */ + private IgniteSchema createSchema( + IgniteDistribution distribution1, + ColocationGroup colocationGrp1, + IgniteDistribution distribution2, + ColocationGroup colocationGrp2 + ) { + return createSchema( + createTable("DEVELOPER", distribution1, "ID", INTEGER, "NAME", VARCHAR, "PROJECTID", INTEGER) + .setColocationGroup(colocationGrp1), + createTable("PROJECT", distribution2, "ID", INTEGER, "NAME", VARCHAR, "VER", INTEGER) + .setColocationGroup(colocationGrp2) + ); + } + + /** */ + @Test + public void testSplitterColocatedPartitionedPartitioned() throws Exception { + IgniteSchema schema = createSchema( + IgniteDistributions.affinity(0, "Developer", "hash"), + ColocationGroup.forAssignments(Arrays.asList( + select(nodes, 0, 1), + select(nodes, 1, 2), + select(nodes, 2, 0), + select(nodes, 0, 1), + select(nodes, 1, 2) + )), + IgniteDistributions.affinity(0, "Project", "hash"), + ColocationGroup.forAssignments(Arrays.asList( + select(nodes, 0, 1), + select(nodes, 1, 2), + select(nodes, 2, 0), + select(nodes, 0, 1), + select(nodes, 1, 2) + )) + ); + + String sql = "SELECT d.id, d.name, d.projectId, p.id0, p.ver0 " + + "FROM PUBLIC.Developer d JOIN (" + + "SELECT pp.id as id0, pp.ver as ver0 FROM PUBLIC.Project pp" + + ") p " + + "ON d.id = p.id0"; + + // Data is partitioned and colocated, can be joined on remote nodes, one exchange is required to transfer to + // initiator node. + assertPlan(sql, schema, hasFragmentsCount(2).and(nodeOrAnyChild(isInstanceOf(Exchange.class)))); + } + + /** */ + @Test + public void testSplitterColocatedReplicatedReplicated() throws Exception { + IgniteSchema schema = createSchema( + IgniteDistributions.broadcast(), + ColocationGroup.forNodes(select(nodes, 0, 1, 2, 3)), + IgniteDistributions.broadcast(), + ColocationGroup.forNodes(select(nodes, 0, 1, 2, 3)) + ); + + String sql = "SELECT d.id, (d.id + 1) as id2, d.name, d.projectId, p.id0, p.ver0 " + + "FROM PUBLIC.Developer d JOIN (" + + "SELECT pp.id as id0, pp.ver as ver0 FROM PUBLIC.Project pp" + + ") p " + + "ON d.id = p.id0 " + + "WHERE (d.projectId + 1) = ?"; + + // Data is fully replicated, no exchanges requred, can be executed direcly on initiatpr node. + assertPlan(sql, schema, hasFragmentsCount(1).and(nodeOrAnyChild(isInstanceOf(Exchange.class)).negate())); + } + + /** */ + @Test + public void testSplitterPartiallyColocatedReplicatedAndPartitioned() throws Exception { + IgniteSchema schema = createSchema( + IgniteDistributions.broadcast(), + ColocationGroup.forNodes(select(nodes, 0)), + IgniteDistributions.affinity(0, "Project", "hash"), + ColocationGroup.forAssignments(Arrays.asList( + select(nodes, 1, 2), + select(nodes, 2, 3), + select(nodes, 3, 0), + select(nodes, 0, 1) + )) + ); + + String sql = "SELECT d.id, d.name, d.projectId, p.id0, p.ver0 " + + "FROM PUBLIC.Developer d JOIN (" + + "SELECT pp.id as id0, pp.ver as ver0 FROM PUBLIC.Project pp" + + ") p " + + "ON d.id = p.id0 " + + "WHERE (d.projectId + 1) = ?"; + + // First table is replicated and planned with TrimExchange to colocate data, but set of nodes for partitioned + // table is differ, so exchange is required for colocation. Another exchange is required to send data to + // initiator node. + assertPlan(sql, schema, hasFragmentsCount(3).and(hasChildThat(isInstanceOf(IgniteTrimExchange.class)))); + } + + /** */ + @Test + public void testSplitterPartiallyColocated1() throws Exception { + IgniteSchema schema = createSchema( + IgniteDistributions.broadcast(), + ColocationGroup.forNodes(select(nodes, 1, 2, 3)), + IgniteDistributions.affinity(0, "Project", "hash"), + ColocationGroup.forAssignments(Arrays.asList( + select(nodes, 0), + select(nodes, 1), + select(nodes, 2) + )) + ); + + String sql = "SELECT d.id, d.name, d.projectId, p.id0, p.ver0 " + + "FROM PUBLIC.Developer d JOIN (" + + "SELECT pp.id as id0, pp.ver as ver0 FROM PUBLIC.Project pp" + + ") p " + + "ON d.projectId = p.id0 " + + "WHERE (d.projectId + 1) = ?"; + + // First table is replicated and planned with TrimExchange to colocate data, but set of nodes for partitioned + // table is differ, so exchange is required for colocation. Another exchange is required to send data to + // initiator node. + assertPlan(sql, schema, hasFragmentsCount(3).and(hasChildThat(isInstanceOf(IgniteTrimExchange.class)))); + } + + /** */ + @Test + public void testSplitterPartiallyColocated2() throws Exception { + IgniteSchema schema = createSchema( + IgniteDistributions.broadcast(), + ColocationGroup.forNodes(select(nodes, 0)), + IgniteDistributions.affinity(0, "Project", "hash"), + ColocationGroup.forAssignments(Arrays.asList( + select(nodes, 1), + select(nodes, 2), + select(nodes, 3) + )) + ); + + String sql = "SELECT d.id, d.name, d.projectId, p.id0, p.ver0 " + + "FROM PUBLIC.Developer d JOIN (" + + "SELECT pp.id as id0, pp.ver as ver0 FROM PUBLIC.Project pp" + + ") p " + + "ON d.projectId = p.id0 " + + "WHERE (d.projectId + 1) = ?"; + + // First table is replicated and planned with TrimExchange to colocate data, but set of nodes for partitioned + // table is differ, so exchange is required for colocation. Another exchange is required to send data to + // initiator node. + assertPlan(sql, schema, hasFragmentsCount(3).and(hasChildThat(isInstanceOf(IgniteTrimExchange.class)))); + } + + /** */ + @Test + public void testSplitterNonColocatedReplicatedReplicated1() throws Exception { Review Comment: Minor. Might be one parameterized test. Up to you. ########## modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/PlanSplitterTest.java: ########## @@ -0,0 +1,280 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.query.calcite.planner; + +import java.util.Arrays; +import java.util.function.Predicate; +import org.apache.calcite.rel.core.Exchange; +import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; +import org.apache.ignite.internal.processors.query.calcite.metadata.ColocationGroup; +import org.apache.ignite.internal.processors.query.calcite.prepare.ExecutionPlan; +import org.apache.ignite.internal.processors.query.calcite.prepare.MultiStepPlan; +import org.apache.ignite.internal.processors.query.calcite.prepare.MultiStepQueryPlan; +import org.apache.ignite.internal.processors.query.calcite.prepare.QueryTemplate; +import org.apache.ignite.internal.processors.query.calcite.prepare.Splitter; +import org.apache.ignite.internal.processors.query.calcite.rel.IgniteRel; +import org.apache.ignite.internal.processors.query.calcite.rel.IgniteTrimExchange; +import org.apache.ignite.internal.processors.query.calcite.schema.IgniteSchema; +import org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistribution; +import org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistributions; +import org.apache.ignite.internal.processors.query.calcite.util.Commons; +import org.apache.ignite.internal.util.typedef.F; +import org.junit.Test; + +import static org.apache.calcite.sql.type.SqlTypeName.INTEGER; +import static org.apache.calcite.sql.type.SqlTypeName.VARCHAR; + +/** */ +public class PlanSplitterTest extends AbstractPlannerTest { + /** */ + private IgniteSchema createSchema( + IgniteDistribution distribution1, + ColocationGroup colocationGrp1, + IgniteDistribution distribution2, + ColocationGroup colocationGrp2 + ) { + return createSchema( + createTable("DEVELOPER", distribution1, "ID", INTEGER, "NAME", VARCHAR, "PROJECTID", INTEGER) + .setColocationGroup(colocationGrp1), + createTable("PROJECT", distribution2, "ID", INTEGER, "NAME", VARCHAR, "VER", INTEGER) + .setColocationGroup(colocationGrp2) + ); + } + + /** */ + @Test + public void testSplitterColocatedPartitionedPartitioned() throws Exception { + IgniteSchema schema = createSchema( + IgniteDistributions.affinity(0, "Developer", "hash"), + ColocationGroup.forAssignments(Arrays.asList( + select(nodes, 0, 1), + select(nodes, 1, 2), + select(nodes, 2, 0), + select(nodes, 0, 1), + select(nodes, 1, 2) + )), + IgniteDistributions.affinity(0, "Project", "hash"), + ColocationGroup.forAssignments(Arrays.asList( + select(nodes, 0, 1), + select(nodes, 1, 2), + select(nodes, 2, 0), + select(nodes, 0, 1), + select(nodes, 1, 2) + )) + ); + + String sql = "SELECT d.id, d.name, d.projectId, p.id0, p.ver0 " + + "FROM PUBLIC.Developer d JOIN (" + + "SELECT pp.id as id0, pp.ver as ver0 FROM PUBLIC.Project pp" + + ") p " + + "ON d.id = p.id0"; + + // Data is partitioned and colocated, can be joined on remote nodes, one exchange is required to transfer to + // initiator node. + assertPlan(sql, schema, hasFragmentsCount(2).and(nodeOrAnyChild(isInstanceOf(Exchange.class)))); + } + + /** */ + @Test + public void testSplitterColocatedReplicatedReplicated() throws Exception { + IgniteSchema schema = createSchema( + IgniteDistributions.broadcast(), + ColocationGroup.forNodes(select(nodes, 0, 1, 2, 3)), + IgniteDistributions.broadcast(), + ColocationGroup.forNodes(select(nodes, 0, 1, 2, 3)) + ); + + String sql = "SELECT d.id, (d.id + 1) as id2, d.name, d.projectId, p.id0, p.ver0 " + + "FROM PUBLIC.Developer d JOIN (" + + "SELECT pp.id as id0, pp.ver as ver0 FROM PUBLIC.Project pp" + + ") p " + + "ON d.id = p.id0 " + + "WHERE (d.projectId + 1) = ?"; + + // Data is fully replicated, no exchanges requred, can be executed direcly on initiatpr node. + assertPlan(sql, schema, hasFragmentsCount(1).and(nodeOrAnyChild(isInstanceOf(Exchange.class)).negate())); + } + + /** */ + @Test + public void testSplitterPartiallyColocatedReplicatedAndPartitioned() throws Exception { + IgniteSchema schema = createSchema( + IgniteDistributions.broadcast(), + ColocationGroup.forNodes(select(nodes, 0)), + IgniteDistributions.affinity(0, "Project", "hash"), + ColocationGroup.forAssignments(Arrays.asList( + select(nodes, 1, 2), + select(nodes, 2, 3), + select(nodes, 3, 0), + select(nodes, 0, 1) + )) + ); + + String sql = "SELECT d.id, d.name, d.projectId, p.id0, p.ver0 " + + "FROM PUBLIC.Developer d JOIN (" + + "SELECT pp.id as id0, pp.ver as ver0 FROM PUBLIC.Project pp" + + ") p " + + "ON d.id = p.id0 " + + "WHERE (d.projectId + 1) = ?"; + + // First table is replicated and planned with TrimExchange to colocate data, but set of nodes for partitioned + // table is differ, so exchange is required for colocation. Another exchange is required to send data to + // initiator node. + assertPlan(sql, schema, hasFragmentsCount(3).and(hasChildThat(isInstanceOf(IgniteTrimExchange.class)))); + } + + /** */ + @Test + public void testSplitterPartiallyColocated1() throws Exception { + IgniteSchema schema = createSchema( + IgniteDistributions.broadcast(), + ColocationGroup.forNodes(select(nodes, 1, 2, 3)), + IgniteDistributions.affinity(0, "Project", "hash"), + ColocationGroup.forAssignments(Arrays.asList( + select(nodes, 0), + select(nodes, 1), + select(nodes, 2) + )) + ); + + String sql = "SELECT d.id, d.name, d.projectId, p.id0, p.ver0 " + + "FROM PUBLIC.Developer d JOIN (" + + "SELECT pp.id as id0, pp.ver as ver0 FROM PUBLIC.Project pp" + + ") p " + + "ON d.projectId = p.id0 " + + "WHERE (d.projectId + 1) = ?"; + + // First table is replicated and planned with TrimExchange to colocate data, but set of nodes for partitioned + // table is differ, so exchange is required for colocation. Another exchange is required to send data to + // initiator node. + assertPlan(sql, schema, hasFragmentsCount(3).and(hasChildThat(isInstanceOf(IgniteTrimExchange.class)))); Review Comment: Suggestion: the same, let's check that exctly one `Exchange` exists and to which table `IgniteTrimExchange` relates. And for the second test. ########## modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/PlanSplitterTest.java: ########## @@ -0,0 +1,280 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.query.calcite.planner; + +import java.util.Arrays; +import java.util.function.Predicate; +import org.apache.calcite.rel.core.Exchange; +import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; +import org.apache.ignite.internal.processors.query.calcite.metadata.ColocationGroup; +import org.apache.ignite.internal.processors.query.calcite.prepare.ExecutionPlan; +import org.apache.ignite.internal.processors.query.calcite.prepare.MultiStepPlan; +import org.apache.ignite.internal.processors.query.calcite.prepare.MultiStepQueryPlan; +import org.apache.ignite.internal.processors.query.calcite.prepare.QueryTemplate; +import org.apache.ignite.internal.processors.query.calcite.prepare.Splitter; +import org.apache.ignite.internal.processors.query.calcite.rel.IgniteRel; +import org.apache.ignite.internal.processors.query.calcite.rel.IgniteTrimExchange; +import org.apache.ignite.internal.processors.query.calcite.schema.IgniteSchema; +import org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistribution; +import org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistributions; +import org.apache.ignite.internal.processors.query.calcite.util.Commons; +import org.apache.ignite.internal.util.typedef.F; +import org.junit.Test; + +import static org.apache.calcite.sql.type.SqlTypeName.INTEGER; +import static org.apache.calcite.sql.type.SqlTypeName.VARCHAR; + +/** */ +public class PlanSplitterTest extends AbstractPlannerTest { + /** */ + private IgniteSchema createSchema( + IgniteDistribution distribution1, + ColocationGroup colocationGrp1, + IgniteDistribution distribution2, + ColocationGroup colocationGrp2 + ) { + return createSchema( + createTable("DEVELOPER", distribution1, "ID", INTEGER, "NAME", VARCHAR, "PROJECTID", INTEGER) + .setColocationGroup(colocationGrp1), + createTable("PROJECT", distribution2, "ID", INTEGER, "NAME", VARCHAR, "VER", INTEGER) + .setColocationGroup(colocationGrp2) + ); + } + + /** */ + @Test + public void testSplitterColocatedPartitionedPartitioned() throws Exception { + IgniteSchema schema = createSchema( + IgniteDistributions.affinity(0, "Developer", "hash"), + ColocationGroup.forAssignments(Arrays.asList( + select(nodes, 0, 1), + select(nodes, 1, 2), + select(nodes, 2, 0), + select(nodes, 0, 1), + select(nodes, 1, 2) + )), + IgniteDistributions.affinity(0, "Project", "hash"), + ColocationGroup.forAssignments(Arrays.asList( + select(nodes, 0, 1), + select(nodes, 1, 2), + select(nodes, 2, 0), + select(nodes, 0, 1), + select(nodes, 1, 2) + )) + ); + + String sql = "SELECT d.id, d.name, d.projectId, p.id0, p.ver0 " + + "FROM PUBLIC.Developer d JOIN (" + + "SELECT pp.id as id0, pp.ver as ver0 FROM PUBLIC.Project pp" + + ") p " + + "ON d.id = p.id0"; + + // Data is partitioned and colocated, can be joined on remote nodes, one exchange is required to transfer to + // initiator node. + assertPlan(sql, schema, hasFragmentsCount(2).and(nodeOrAnyChild(isInstanceOf(Exchange.class)))); + } + + /** */ + @Test + public void testSplitterColocatedReplicatedReplicated() throws Exception { + IgniteSchema schema = createSchema( + IgniteDistributions.broadcast(), + ColocationGroup.forNodes(select(nodes, 0, 1, 2, 3)), + IgniteDistributions.broadcast(), + ColocationGroup.forNodes(select(nodes, 0, 1, 2, 3)) + ); + + String sql = "SELECT d.id, (d.id + 1) as id2, d.name, d.projectId, p.id0, p.ver0 " + + "FROM PUBLIC.Developer d JOIN (" + + "SELECT pp.id as id0, pp.ver as ver0 FROM PUBLIC.Project pp" + + ") p " + + "ON d.id = p.id0 " + + "WHERE (d.projectId + 1) = ?"; + + // Data is fully replicated, no exchanges requred, can be executed direcly on initiatpr node. + assertPlan(sql, schema, hasFragmentsCount(1).and(nodeOrAnyChild(isInstanceOf(Exchange.class)).negate())); + } + + /** */ + @Test + public void testSplitterPartiallyColocatedReplicatedAndPartitioned() throws Exception { + IgniteSchema schema = createSchema( + IgniteDistributions.broadcast(), + ColocationGroup.forNodes(select(nodes, 0)), + IgniteDistributions.affinity(0, "Project", "hash"), + ColocationGroup.forAssignments(Arrays.asList( + select(nodes, 1, 2), + select(nodes, 2, 3), + select(nodes, 3, 0), + select(nodes, 0, 1) + )) + ); + + String sql = "SELECT d.id, d.name, d.projectId, p.id0, p.ver0 " + + "FROM PUBLIC.Developer d JOIN (" + + "SELECT pp.id as id0, pp.ver as ver0 FROM PUBLIC.Project pp" + + ") p " + + "ON d.id = p.id0 " + + "WHERE (d.projectId + 1) = ?"; + + // First table is replicated and planned with TrimExchange to colocate data, but set of nodes for partitioned + // table is differ, so exchange is required for colocation. Another exchange is required to send data to + // initiator node. + assertPlan(sql, schema, hasFragmentsCount(3).and(hasChildThat(isInstanceOf(IgniteTrimExchange.class)))); + } + + /** */ + @Test + public void testSplitterPartiallyColocated1() throws Exception { + IgniteSchema schema = createSchema( + IgniteDistributions.broadcast(), + ColocationGroup.forNodes(select(nodes, 1, 2, 3)), + IgniteDistributions.affinity(0, "Project", "hash"), + ColocationGroup.forAssignments(Arrays.asList( + select(nodes, 0), + select(nodes, 1), + select(nodes, 2) + )) + ); + + String sql = "SELECT d.id, d.name, d.projectId, p.id0, p.ver0 " + + "FROM PUBLIC.Developer d JOIN (" + + "SELECT pp.id as id0, pp.ver as ver0 FROM PUBLIC.Project pp" + + ") p " + + "ON d.projectId = p.id0 " + + "WHERE (d.projectId + 1) = ?"; + + // First table is replicated and planned with TrimExchange to colocate data, but set of nodes for partitioned + // table is differ, so exchange is required for colocation. Another exchange is required to send data to + // initiator node. + assertPlan(sql, schema, hasFragmentsCount(3).and(hasChildThat(isInstanceOf(IgniteTrimExchange.class)))); + } + + /** */ + @Test + public void testSplitterPartiallyColocated2() throws Exception { + IgniteSchema schema = createSchema( + IgniteDistributions.broadcast(), + ColocationGroup.forNodes(select(nodes, 0)), + IgniteDistributions.affinity(0, "Project", "hash"), + ColocationGroup.forAssignments(Arrays.asList( + select(nodes, 1), + select(nodes, 2), + select(nodes, 3) + )) + ); + + String sql = "SELECT d.id, d.name, d.projectId, p.id0, p.ver0 " + + "FROM PUBLIC.Developer d JOIN (" + + "SELECT pp.id as id0, pp.ver as ver0 FROM PUBLIC.Project pp" + + ") p " + + "ON d.projectId = p.id0 " + + "WHERE (d.projectId + 1) = ?"; + + // First table is replicated and planned with TrimExchange to colocate data, but set of nodes for partitioned + // table is differ, so exchange is required for colocation. Another exchange is required to send data to + // initiator node. + assertPlan(sql, schema, hasFragmentsCount(3).and(hasChildThat(isInstanceOf(IgniteTrimExchange.class)))); + } + + /** */ + @Test + public void testSplitterNonColocatedReplicatedReplicated1() throws Exception { + IgniteSchema schema = createSchema( + IgniteDistributions.broadcast(), + ColocationGroup.forNodes(select(nodes, 2)), + IgniteDistributions.broadcast(), + ColocationGroup.forNodes(select(nodes, 0, 1)) + ); + + String sql = "SELECT p.id0, d.id " + + "FROM PUBLIC.Developer d JOIN (" + + "SELECT pp.id as id0, pp.ver as ver0 FROM PUBLIC.Project pp" + + ") p " + + "ON d.projectId = p.ver0 " + + "WHERE (d.projectId + 1) = ?"; + + // Originally planned without exchanges, there is no data for table 1 on initiator node, so one exchange + // is required. + assertPlan(sql, schema, hasFragmentsCount(2).and(nodeOrAnyChild(isInstanceOf(Exchange.class)).negate())); + } + + /** */ + @Test + public void testSplitterNonColocatedReplicatedReplicated2() throws Exception { + IgniteSchema schema = createSchema( + IgniteDistributions.broadcast(), + ColocationGroup.forNodes(select(nodes, 2)), + IgniteDistributions.broadcast(), + ColocationGroup.forNodes(select(nodes, 1)) + ); + + String sql = "SELECT p.id0, d.id " + + "FROM PUBLIC.Developer d JOIN (" + + "SELECT pp.id as id0, pp.ver as ver0 FROM PUBLIC.Project pp" + + ") p " + + "ON d.projectId = p.ver0 " + + "WHERE (d.projectId + 1) = ?"; + + // Originally planned without exchanges, data can't be joined on any single node, so two exchanges is required. Review Comment: '_two exchanges is required_' but `nodeOrAnyChild(isInstanceOf(Exchange.class)).negate()` ########## modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/PlanSplitterTest.java: ########## @@ -0,0 +1,280 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.query.calcite.planner; + +import java.util.Arrays; +import java.util.function.Predicate; +import org.apache.calcite.rel.core.Exchange; +import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; +import org.apache.ignite.internal.processors.query.calcite.metadata.ColocationGroup; +import org.apache.ignite.internal.processors.query.calcite.prepare.ExecutionPlan; +import org.apache.ignite.internal.processors.query.calcite.prepare.MultiStepPlan; +import org.apache.ignite.internal.processors.query.calcite.prepare.MultiStepQueryPlan; +import org.apache.ignite.internal.processors.query.calcite.prepare.QueryTemplate; +import org.apache.ignite.internal.processors.query.calcite.prepare.Splitter; +import org.apache.ignite.internal.processors.query.calcite.rel.IgniteRel; +import org.apache.ignite.internal.processors.query.calcite.rel.IgniteTrimExchange; +import org.apache.ignite.internal.processors.query.calcite.schema.IgniteSchema; +import org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistribution; +import org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistributions; +import org.apache.ignite.internal.processors.query.calcite.util.Commons; +import org.apache.ignite.internal.util.typedef.F; +import org.junit.Test; + +import static org.apache.calcite.sql.type.SqlTypeName.INTEGER; +import static org.apache.calcite.sql.type.SqlTypeName.VARCHAR; + +/** */ +public class PlanSplitterTest extends AbstractPlannerTest { + /** */ + private IgniteSchema createSchema( + IgniteDistribution distribution1, + ColocationGroup colocationGrp1, + IgniteDistribution distribution2, + ColocationGroup colocationGrp2 + ) { + return createSchema( + createTable("DEVELOPER", distribution1, "ID", INTEGER, "NAME", VARCHAR, "PROJECTID", INTEGER) + .setColocationGroup(colocationGrp1), + createTable("PROJECT", distribution2, "ID", INTEGER, "NAME", VARCHAR, "VER", INTEGER) + .setColocationGroup(colocationGrp2) + ); + } + + /** */ + @Test + public void testSplitterColocatedPartitionedPartitioned() throws Exception { + IgniteSchema schema = createSchema( + IgniteDistributions.affinity(0, "Developer", "hash"), + ColocationGroup.forAssignments(Arrays.asList( + select(nodes, 0, 1), Review Comment: `0, 1` and `1, 2` are duplicated. Probably something like ``` select(nodes, 0, 1), select(nodes, 1, 2), select(nodes, 2, 3), select(nodes, 1, 3), select(nodes, 0, 3) ``` is more convenient. ########## modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/PlanSplitterTest.java: ########## @@ -0,0 +1,280 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.query.calcite.planner; + +import java.util.Arrays; +import java.util.function.Predicate; +import org.apache.calcite.rel.core.Exchange; +import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; +import org.apache.ignite.internal.processors.query.calcite.metadata.ColocationGroup; +import org.apache.ignite.internal.processors.query.calcite.prepare.ExecutionPlan; +import org.apache.ignite.internal.processors.query.calcite.prepare.MultiStepPlan; +import org.apache.ignite.internal.processors.query.calcite.prepare.MultiStepQueryPlan; +import org.apache.ignite.internal.processors.query.calcite.prepare.QueryTemplate; +import org.apache.ignite.internal.processors.query.calcite.prepare.Splitter; +import org.apache.ignite.internal.processors.query.calcite.rel.IgniteRel; +import org.apache.ignite.internal.processors.query.calcite.rel.IgniteTrimExchange; +import org.apache.ignite.internal.processors.query.calcite.schema.IgniteSchema; +import org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistribution; +import org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistributions; +import org.apache.ignite.internal.processors.query.calcite.util.Commons; +import org.apache.ignite.internal.util.typedef.F; +import org.junit.Test; + +import static org.apache.calcite.sql.type.SqlTypeName.INTEGER; +import static org.apache.calcite.sql.type.SqlTypeName.VARCHAR; + +/** */ +public class PlanSplitterTest extends AbstractPlannerTest { + /** */ + private IgniteSchema createSchema( + IgniteDistribution distribution1, + ColocationGroup colocationGrp1, + IgniteDistribution distribution2, + ColocationGroup colocationGrp2 + ) { + return createSchema( + createTable("DEVELOPER", distribution1, "ID", INTEGER, "NAME", VARCHAR, "PROJECTID", INTEGER) + .setColocationGroup(colocationGrp1), + createTable("PROJECT", distribution2, "ID", INTEGER, "NAME", VARCHAR, "VER", INTEGER) + .setColocationGroup(colocationGrp2) + ); + } + + /** */ + @Test + public void testSplitterColocatedPartitionedPartitioned() throws Exception { + IgniteSchema schema = createSchema( + IgniteDistributions.affinity(0, "Developer", "hash"), + ColocationGroup.forAssignments(Arrays.asList( + select(nodes, 0, 1), + select(nodes, 1, 2), + select(nodes, 2, 0), + select(nodes, 0, 1), + select(nodes, 1, 2) + )), + IgniteDistributions.affinity(0, "Project", "hash"), + ColocationGroup.forAssignments(Arrays.asList( + select(nodes, 0, 1), + select(nodes, 1, 2), + select(nodes, 2, 0), + select(nodes, 0, 1), + select(nodes, 1, 2) + )) + ); + + String sql = "SELECT d.id, d.name, d.projectId, p.id0, p.ver0 " + + "FROM PUBLIC.Developer d JOIN (" + + "SELECT pp.id as id0, pp.ver as ver0 FROM PUBLIC.Project pp" + + ") p " + + "ON d.id = p.id0"; + + // Data is partitioned and colocated, can be joined on remote nodes, one exchange is required to transfer to + // initiator node. + assertPlan(sql, schema, hasFragmentsCount(2).and(nodeOrAnyChild(isInstanceOf(Exchange.class)))); + } + + /** */ + @Test + public void testSplitterColocatedReplicatedReplicated() throws Exception { + IgniteSchema schema = createSchema( + IgniteDistributions.broadcast(), + ColocationGroup.forNodes(select(nodes, 0, 1, 2, 3)), + IgniteDistributions.broadcast(), + ColocationGroup.forNodes(select(nodes, 0, 1, 2, 3)) + ); + + String sql = "SELECT d.id, (d.id + 1) as id2, d.name, d.projectId, p.id0, p.ver0 " + + "FROM PUBLIC.Developer d JOIN (" + + "SELECT pp.id as id0, pp.ver as ver0 FROM PUBLIC.Project pp" + + ") p " + + "ON d.id = p.id0 " + + "WHERE (d.projectId + 1) = ?"; + + // Data is fully replicated, no exchanges requred, can be executed direcly on initiatpr node. + assertPlan(sql, schema, hasFragmentsCount(1).and(nodeOrAnyChild(isInstanceOf(Exchange.class)).negate())); + } + + /** */ + @Test + public void testSplitterPartiallyColocatedReplicatedAndPartitioned() throws Exception { + IgniteSchema schema = createSchema( + IgniteDistributions.broadcast(), + ColocationGroup.forNodes(select(nodes, 0)), + IgniteDistributions.affinity(0, "Project", "hash"), + ColocationGroup.forAssignments(Arrays.asList( + select(nodes, 1, 2), + select(nodes, 2, 3), + select(nodes, 3, 0), + select(nodes, 0, 1) + )) + ); + + String sql = "SELECT d.id, d.name, d.projectId, p.id0, p.ver0 " + + "FROM PUBLIC.Developer d JOIN (" + + "SELECT pp.id as id0, pp.ver as ver0 FROM PUBLIC.Project pp" + + ") p " + + "ON d.id = p.id0 " + + "WHERE (d.projectId + 1) = ?"; + + // First table is replicated and planned with TrimExchange to colocate data, but set of nodes for partitioned + // table is differ, so exchange is required for colocation. Another exchange is required to send data to + // initiator node. + assertPlan(sql, schema, hasFragmentsCount(3).and(hasChildThat(isInstanceOf(IgniteTrimExchange.class)))); + } + + /** */ + @Test + public void testSplitterPartiallyColocated1() throws Exception { Review Comment: Minor. Might be one parameterized test. Up to you. Also a test with `ColocationGroup.forNodes(select(nodes, 0, 1, 2))` and 2 fragments might be added. ########## modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/PlanSplitterTest.java: ########## @@ -0,0 +1,280 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.query.calcite.planner; + +import java.util.Arrays; +import java.util.function.Predicate; +import org.apache.calcite.rel.core.Exchange; +import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; +import org.apache.ignite.internal.processors.query.calcite.metadata.ColocationGroup; +import org.apache.ignite.internal.processors.query.calcite.prepare.ExecutionPlan; +import org.apache.ignite.internal.processors.query.calcite.prepare.MultiStepPlan; +import org.apache.ignite.internal.processors.query.calcite.prepare.MultiStepQueryPlan; +import org.apache.ignite.internal.processors.query.calcite.prepare.QueryTemplate; +import org.apache.ignite.internal.processors.query.calcite.prepare.Splitter; +import org.apache.ignite.internal.processors.query.calcite.rel.IgniteRel; +import org.apache.ignite.internal.processors.query.calcite.rel.IgniteTrimExchange; +import org.apache.ignite.internal.processors.query.calcite.schema.IgniteSchema; +import org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistribution; +import org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistributions; +import org.apache.ignite.internal.processors.query.calcite.util.Commons; +import org.apache.ignite.internal.util.typedef.F; +import org.junit.Test; + +import static org.apache.calcite.sql.type.SqlTypeName.INTEGER; +import static org.apache.calcite.sql.type.SqlTypeName.VARCHAR; + +/** */ +public class PlanSplitterTest extends AbstractPlannerTest { + /** */ + private IgniteSchema createSchema( + IgniteDistribution distribution1, + ColocationGroup colocationGrp1, + IgniteDistribution distribution2, + ColocationGroup colocationGrp2 + ) { + return createSchema( + createTable("DEVELOPER", distribution1, "ID", INTEGER, "NAME", VARCHAR, "PROJECTID", INTEGER) + .setColocationGroup(colocationGrp1), + createTable("PROJECT", distribution2, "ID", INTEGER, "NAME", VARCHAR, "VER", INTEGER) + .setColocationGroup(colocationGrp2) + ); + } + + /** */ + @Test + public void testSplitterColocatedPartitionedPartitioned() throws Exception { + IgniteSchema schema = createSchema( + IgniteDistributions.affinity(0, "Developer", "hash"), + ColocationGroup.forAssignments(Arrays.asList( + select(nodes, 0, 1), + select(nodes, 1, 2), + select(nodes, 2, 0), + select(nodes, 0, 1), + select(nodes, 1, 2) + )), + IgniteDistributions.affinity(0, "Project", "hash"), + ColocationGroup.forAssignments(Arrays.asList( + select(nodes, 0, 1), + select(nodes, 1, 2), + select(nodes, 2, 0), + select(nodes, 0, 1), + select(nodes, 1, 2) + )) + ); + + String sql = "SELECT d.id, d.name, d.projectId, p.id0, p.ver0 " + + "FROM PUBLIC.Developer d JOIN (" + + "SELECT pp.id as id0, pp.ver as ver0 FROM PUBLIC.Project pp" + + ") p " + + "ON d.id = p.id0"; + + // Data is partitioned and colocated, can be joined on remote nodes, one exchange is required to transfer to + // initiator node. + assertPlan(sql, schema, hasFragmentsCount(2).and(nodeOrAnyChild(isInstanceOf(Exchange.class)))); + } + + /** */ + @Test + public void testSplitterColocatedReplicatedReplicated() throws Exception { + IgniteSchema schema = createSchema( + IgniteDistributions.broadcast(), + ColocationGroup.forNodes(select(nodes, 0, 1, 2, 3)), + IgniteDistributions.broadcast(), + ColocationGroup.forNodes(select(nodes, 0, 1, 2, 3)) + ); + + String sql = "SELECT d.id, (d.id + 1) as id2, d.name, d.projectId, p.id0, p.ver0 " + + "FROM PUBLIC.Developer d JOIN (" + + "SELECT pp.id as id0, pp.ver as ver0 FROM PUBLIC.Project pp" + + ") p " + + "ON d.id = p.id0 " + + "WHERE (d.projectId + 1) = ?"; + + // Data is fully replicated, no exchanges requred, can be executed direcly on initiatpr node. + assertPlan(sql, schema, hasFragmentsCount(1).and(nodeOrAnyChild(isInstanceOf(Exchange.class)).negate())); + } + + /** */ + @Test + public void testSplitterPartiallyColocatedReplicatedAndPartitioned() throws Exception { + IgniteSchema schema = createSchema( + IgniteDistributions.broadcast(), + ColocationGroup.forNodes(select(nodes, 0)), + IgniteDistributions.affinity(0, "Project", "hash"), + ColocationGroup.forAssignments(Arrays.asList( + select(nodes, 1, 2), + select(nodes, 2, 3), + select(nodes, 3, 0), + select(nodes, 0, 1) + )) + ); + + String sql = "SELECT d.id, d.name, d.projectId, p.id0, p.ver0 " + + "FROM PUBLIC.Developer d JOIN (" + + "SELECT pp.id as id0, pp.ver as ver0 FROM PUBLIC.Project pp" + + ") p " + + "ON d.id = p.id0 " + + "WHERE (d.projectId + 1) = ?"; + + // First table is replicated and planned with TrimExchange to colocate data, but set of nodes for partitioned + // table is differ, so exchange is required for colocation. Another exchange is required to send data to + // initiator node. + assertPlan(sql, schema, hasFragmentsCount(3).and(hasChildThat(isInstanceOf(IgniteTrimExchange.class)))); + } + + /** */ + @Test + public void testSplitterPartiallyColocated1() throws Exception { + IgniteSchema schema = createSchema( + IgniteDistributions.broadcast(), + ColocationGroup.forNodes(select(nodes, 1, 2, 3)), + IgniteDistributions.affinity(0, "Project", "hash"), + ColocationGroup.forAssignments(Arrays.asList( + select(nodes, 0), + select(nodes, 1), + select(nodes, 2) + )) + ); + + String sql = "SELECT d.id, d.name, d.projectId, p.id0, p.ver0 " + + "FROM PUBLIC.Developer d JOIN (" + + "SELECT pp.id as id0, pp.ver as ver0 FROM PUBLIC.Project pp" + + ") p " + + "ON d.projectId = p.id0 " + + "WHERE (d.projectId + 1) = ?"; + + // First table is replicated and planned with TrimExchange to colocate data, but set of nodes for partitioned + // table is differ, so exchange is required for colocation. Another exchange is required to send data to + // initiator node. + assertPlan(sql, schema, hasFragmentsCount(3).and(hasChildThat(isInstanceOf(IgniteTrimExchange.class)))); + } + + /** */ + @Test + public void testSplitterPartiallyColocated2() throws Exception { + IgniteSchema schema = createSchema( + IgniteDistributions.broadcast(), + ColocationGroup.forNodes(select(nodes, 0)), + IgniteDistributions.affinity(0, "Project", "hash"), + ColocationGroup.forAssignments(Arrays.asList( + select(nodes, 1), + select(nodes, 2), + select(nodes, 3) + )) + ); + + String sql = "SELECT d.id, d.name, d.projectId, p.id0, p.ver0 " + + "FROM PUBLIC.Developer d JOIN (" + + "SELECT pp.id as id0, pp.ver as ver0 FROM PUBLIC.Project pp" + + ") p " + + "ON d.projectId = p.id0 " + + "WHERE (d.projectId + 1) = ?"; + + // First table is replicated and planned with TrimExchange to colocate data, but set of nodes for partitioned + // table is differ, so exchange is required for colocation. Another exchange is required to send data to + // initiator node. + assertPlan(sql, schema, hasFragmentsCount(3).and(hasChildThat(isInstanceOf(IgniteTrimExchange.class)))); + } + + /** */ + @Test + public void testSplitterNonColocatedReplicatedReplicated1() throws Exception { + IgniteSchema schema = createSchema( + IgniteDistributions.broadcast(), + ColocationGroup.forNodes(select(nodes, 2)), + IgniteDistributions.broadcast(), + ColocationGroup.forNodes(select(nodes, 0, 1)) + ); + + String sql = "SELECT p.id0, d.id " + + "FROM PUBLIC.Developer d JOIN (" + + "SELECT pp.id as id0, pp.ver as ver0 FROM PUBLIC.Project pp" + + ") p " + + "ON d.projectId = p.ver0 " + + "WHERE (d.projectId + 1) = ?"; + + // Originally planned without exchanges, there is no data for table 1 on initiator node, so one exchange + // is required. + assertPlan(sql, schema, hasFragmentsCount(2).and(nodeOrAnyChild(isInstanceOf(Exchange.class)).negate())); Review Comment: '_one exchange is required_' but `nodeOrAnyChild(isInstanceOf(Exchange.class)).negate()` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
