This is an automated email from the ASF dual-hosted git repository.
alexpl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push:
new 8dd63f817e2 IGNITE-12692 SQL Calcite: Distributed table modify - Fixes
#12593.
8dd63f817e2 is described below
commit 8dd63f817e2076273176836b029037c3d0d60a3c
Author: Aleksey Plekhanov <[email protected]>
AuthorDate: Tue Dec 30 17:24:03 2025 +0300
IGNITE-12692 SQL Calcite: Distributed table modify - Fixes #12593.
Signed-off-by: Aleksey Plekhanov <[email protected]>
---
.../query/calcite/metadata/IgniteMdRowCount.java | 8 +
.../query/calcite/prepare/PlannerPhase.java | 6 +-
.../query/calcite/rel/IgniteTableModify.java | 56 +++++-
.../calcite/rule/AbstractIgniteConverterRule.java | 5 +-
.../calcite/rule/HashAggregateConverterRule.java | 21 +-
.../query/calcite/rule/ProjectConverterRule.java | 4 +-
.../rule/TableModifyDistributedConverterRule.java | 150 ++++++++++++++
...ava => TableModifySingleNodeConverterRule.java} | 14 +-
.../processors/query/calcite/type/OtherType.java | 11 ++
.../query/calcite/planner/AbstractPlannerTest.java | 186 +-----------------
.../query/calcite/planner/TableDmlPlannerTest.java | 215 +++++++++++++++++++++
.../query/calcite/planner/TestTable.java | 187 +++++++++++++++++-
12 files changed, 651 insertions(+), 212 deletions(-)
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/IgniteMdRowCount.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/IgniteMdRowCount.java
index 59c81cb3694..756e4b4e0c6 100644
---
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/IgniteMdRowCount.java
+++
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/IgniteMdRowCount.java
@@ -36,6 +36,7 @@ import org.apache.calcite.util.Util;
import org.apache.ignite.internal.processors.query.calcite.rel.IgniteAggregate;
import org.apache.ignite.internal.processors.query.calcite.rel.IgniteLimit;
import
org.apache.ignite.internal.processors.query.calcite.rel.IgniteSortedIndexSpool;
+import
org.apache.ignite.internal.processors.query.calcite.rel.IgniteTableModify;
import org.apache.ignite.internal.util.typedef.F;
import org.jetbrains.annotations.Nullable;
@@ -149,4 +150,11 @@ public class IgniteMdRowCount extends RelMdRowCount {
public double getRowCount(IgniteLimit rel, RelMetadataQuery mq) {
return rel.estimateRowCount(mq);
}
+
+ /**
+ * Estimation of row count for Table modify operator.
+ */
+ public double getRowCount(IgniteTableModify rel, RelMetadataQuery mq) {
+ return rel.estimateRowCount(mq);
+ }
}
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/PlannerPhase.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/PlannerPhase.java
index bbc5217773d..5fc738cbc91 100644
---
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/PlannerPhase.java
+++
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/PlannerPhase.java
@@ -64,7 +64,8 @@ import
org.apache.ignite.internal.processors.query.calcite.rule.SetOpConverterRu
import
org.apache.ignite.internal.processors.query.calcite.rule.SortAggregateConverterRule;
import
org.apache.ignite.internal.processors.query.calcite.rule.SortConverterRule;
import
org.apache.ignite.internal.processors.query.calcite.rule.TableFunctionScanConverterRule;
-import
org.apache.ignite.internal.processors.query.calcite.rule.TableModifyConverterRule;
+import
org.apache.ignite.internal.processors.query.calcite.rule.TableModifyDistributedConverterRule;
+import
org.apache.ignite.internal.processors.query.calcite.rule.TableModifySingleNodeConverterRule;
import
org.apache.ignite.internal.processors.query.calcite.rule.UncollectConverterRule;
import
org.apache.ignite.internal.processors.query.calcite.rule.UnionConverterRule;
import
org.apache.ignite.internal.processors.query.calcite.rule.ValuesConverterRule;
@@ -292,7 +293,8 @@ public enum PlannerPhase {
SetOpConverterRule.MAP_REDUCE_INTERSECT,
ProjectConverterRule.INSTANCE,
FilterConverterRule.INSTANCE,
- TableModifyConverterRule.INSTANCE,
+ TableModifySingleNodeConverterRule.INSTANCE,
+ TableModifyDistributedConverterRule.INSTANCE,
UnionConverterRule.INSTANCE,
SortConverterRule.INSTANCE,
TableFunctionScanConverterRule.INSTANCE
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteTableModify.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteTableModify.java
index 2ca7dfac1d3..7168caadc90 100644
---
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteTableModify.java
+++
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteTableModify.java
@@ -18,17 +18,25 @@
package org.apache.ignite.internal.processors.query.calcite.rel;
import java.util.List;
+import com.google.common.collect.ImmutableList;
import org.apache.calcite.plan.RelOptCluster;
import org.apache.calcite.plan.RelOptTable;
import org.apache.calcite.plan.RelTraitSet;
import org.apache.calcite.rel.RelInput;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.core.TableModify;
+import org.apache.calcite.rel.metadata.RelMetadataQuery;
import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.util.Pair;
+import
org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistributions;
+import org.apache.ignite.internal.processors.query.calcite.trait.TraitUtils;
import org.apache.ignite.internal.processors.query.calcite.util.Commons;
/** */
public class IgniteTableModify extends TableModify implements IgniteRel {
+ /** If table modify can affect data source. */
+ private final boolean affectsSrc;
+
/**
* Creates a {@code TableModify}.
*
@@ -43,8 +51,9 @@ public class IgniteTableModify extends TableModify implements
IgniteRel {
* @param input Sub-query or filter condition.
* @param operation Modify operation (INSERT, UPDATE, DELETE, MERGE).
* @param updateColumnList List of column identifiers to be updated (e.g.
ident1, ident2); null if not UPDATE.
- * @param sourceExpressionList List of value expressions to be set (e.g.
exp1, exp2); null if not UPDATE.
+ * @param srcExpressionList List of value expressions to be set (e.g.
exp1, exp2); null if not UPDATE.
* @param flattened Whether set flattens the input row type.
+ * @param affectsSrc If table modify can affect data source.
*/
public IgniteTableModify(
RelOptCluster cluster,
@@ -53,12 +62,15 @@ public class IgniteTableModify extends TableModify
implements IgniteRel {
RelNode input,
Operation operation,
List<String> updateColumnList,
- List<RexNode> sourceExpressionList,
- boolean flattened
+ List<RexNode> srcExpressionList,
+ boolean flattened,
+ boolean affectsSrc
) {
super(cluster, traitSet, table,
Commons.context(cluster).catalogReader(),
input, operation, updateColumnList,
- sourceExpressionList, flattened);
+ srcExpressionList, flattened);
+
+ this.affectsSrc = affectsSrc;
}
/**
@@ -75,7 +87,8 @@ public class IgniteTableModify extends TableModify implements
IgniteRel {
input.getEnum("operation", Operation.class),
input.getStringList("updateColumnList"),
input.getExpressionList("sourceExpressionList"),
- input.getBoolean("flattened", true)
+ input.getBoolean("flattened", true),
+ false // Field only for planning.
);
}
@@ -89,7 +102,8 @@ public class IgniteTableModify extends TableModify
implements IgniteRel {
getOperation(),
getUpdateColumnList(),
getSourceExpressionList(),
- isFlattened());
+ isFlattened(),
+ affectsSrc);
}
/** {@inheritDoc} */
@@ -100,6 +114,34 @@ public class IgniteTableModify extends TableModify
implements IgniteRel {
/** {@inheritDoc} */
@Override public IgniteRel clone(RelOptCluster cluster, List<IgniteRel>
inputs) {
return new IgniteTableModify(cluster, getTraitSet(), getTable(),
sole(inputs),
- getOperation(), getUpdateColumnList(), getSourceExpressionList(),
isFlattened());
+ getOperation(), getUpdateColumnList(), getSourceExpressionList(),
isFlattened(), affectsSrc);
+ }
+
+ /** {@inheritDoc} */
+ @Override public double estimateRowCount(RelMetadataQuery mq) {
+ return 1.0D;
+ }
+
+ /** {@inheritDoc} */
+ @Override public Pair<RelTraitSet, List<RelTraitSet>>
deriveTraits(RelTraitSet childTraits, int childId) {
+ // Don't derive traits for single-node table modify.
+ if (TraitUtils.distribution(traitSet) == IgniteDistributions.single())
+ return null;
+
+ assert childId == 0;
+
+ if (childTraits.getConvention() != IgniteConvention.INSTANCE)
+ return null;
+
+ // If modify can affect data source (for example, INSERT uses the modified
table as its source), only
+ // the modified table's affinity distribution is possible, otherwise
inconsistency is possible on remote nodes.
+ if (affectsSrc)
+ return null;
+
+ // Any distributed (random/hash) trait is accepted if data source is
not affected by modify.
+ if
(!TraitUtils.distribution(childTraits).satisfies(IgniteDistributions.random()))
+ return null;
+
+ return Pair.of(traitSet, ImmutableList.of(childTraits));
}
}
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rule/AbstractIgniteConverterRule.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rule/AbstractIgniteConverterRule.java
index edd656ab16e..935d4af371b 100644
---
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rule/AbstractIgniteConverterRule.java
+++
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rule/AbstractIgniteConverterRule.java
@@ -24,6 +24,7 @@ import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.convert.ConverterRule;
import org.apache.calcite.rel.metadata.RelMetadataQuery;
import
org.apache.ignite.internal.processors.query.calcite.rel.IgniteConvention;
+import org.jetbrains.annotations.Nullable;
/** */
public abstract class AbstractIgniteConverterRule<T extends RelNode> extends
ConverterRule {
@@ -39,7 +40,7 @@ public abstract class AbstractIgniteConverterRule<T extends
RelNode> extends Con
}
/** {@inheritDoc} */
- @Override public final RelNode convert(RelNode rel) {
+ @Override public final @Nullable RelNode convert(RelNode rel) {
return convert(rel.getCluster().getPlanner(),
rel.getCluster().getMetadataQuery(), (T)rel);
}
@@ -51,5 +52,5 @@ public abstract class AbstractIgniteConverterRule<T extends
RelNode> extends Con
* @param rel Rel node.
* @return Physical rel.
*/
- protected abstract PhysicalNode convert(RelOptPlanner planner,
RelMetadataQuery mq, T rel);
+ protected abstract @Nullable PhysicalNode convert(RelOptPlanner planner,
RelMetadataQuery mq, T rel);
}
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rule/HashAggregateConverterRule.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rule/HashAggregateConverterRule.java
index 38f5b41fc14..65cca0e7759 100644
---
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rule/HashAggregateConverterRule.java
+++
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rule/HashAggregateConverterRule.java
@@ -19,10 +19,10 @@ package
org.apache.ignite.internal.processors.query.calcite.rule;
import org.apache.calcite.plan.RelOptCluster;
import org.apache.calcite.plan.RelOptPlanner;
-import org.apache.calcite.plan.RelOptRule;
import org.apache.calcite.plan.RelTraitSet;
import org.apache.calcite.rel.PhysicalNode;
import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.convert.ConverterRule;
import org.apache.calcite.rel.logical.LogicalAggregate;
import org.apache.calcite.rel.metadata.RelMetadataQuery;
import org.apache.ignite.internal.processors.query.calcite.hint.HintUtils;
@@ -31,16 +31,17 @@ import
org.apache.ignite.internal.processors.query.calcite.rel.agg.IgniteColocat
import
org.apache.ignite.internal.processors.query.calcite.rel.agg.IgniteMapHashAggregate;
import
org.apache.ignite.internal.processors.query.calcite.rel.agg.IgniteReduceHashAggregate;
import
org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistributions;
+import org.jetbrains.annotations.Nullable;
/**
*
*/
public class HashAggregateConverterRule {
/** */
- public static final RelOptRule COLOCATED = new
ColocatedHashAggregateConverterRule();
+ public static final ConverterRule COLOCATED = new
ColocatedHashAggregateConverterRule();
/** */
- public static final RelOptRule MAP_REDUCE = new
MapReduceHashAggregateConverterRule();
+ public static final ConverterRule MAP_REDUCE = new
MapReduceHashAggregateConverterRule();
/** */
private HashAggregateConverterRule() {
@@ -55,8 +56,11 @@ public class HashAggregateConverterRule {
}
/** {@inheritDoc} */
- @Override protected PhysicalNode convert(RelOptPlanner planner,
RelMetadataQuery mq,
- LogicalAggregate agg) {
+ @Override protected @Nullable PhysicalNode convert(
+ RelOptPlanner planner,
+ RelMetadataQuery mq,
+ LogicalAggregate agg
+ ) {
if (HintUtils.isExpandDistinctAggregate(agg))
return null;
@@ -84,8 +88,11 @@ public class HashAggregateConverterRule {
}
/** {@inheritDoc} */
- @Override protected PhysicalNode convert(RelOptPlanner planner,
RelMetadataQuery mq,
- LogicalAggregate agg) {
+ @Override protected @Nullable PhysicalNode convert(
+ RelOptPlanner planner,
+ RelMetadataQuery mq,
+ LogicalAggregate agg
+ ) {
if (HintUtils.isExpandDistinctAggregate(agg))
return null;
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rule/ProjectConverterRule.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rule/ProjectConverterRule.java
index 427398ef264..4be79e6ddbf 100644
---
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rule/ProjectConverterRule.java
+++
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rule/ProjectConverterRule.java
@@ -20,10 +20,10 @@ package
org.apache.ignite.internal.processors.query.calcite.rule;
import java.util.Set;
import org.apache.calcite.plan.RelOptCluster;
import org.apache.calcite.plan.RelOptPlanner;
-import org.apache.calcite.plan.RelOptRule;
import org.apache.calcite.plan.RelTraitSet;
import org.apache.calcite.rel.PhysicalNode;
import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.convert.ConverterRule;
import org.apache.calcite.rel.core.CorrelationId;
import org.apache.calcite.rel.logical.LogicalProject;
import org.apache.calcite.rel.metadata.RelMetadataQuery;
@@ -39,7 +39,7 @@ import
org.apache.ignite.internal.processors.query.calcite.util.RexUtils;
*/
public class ProjectConverterRule extends
AbstractIgniteConverterRule<LogicalProject> {
/** */
- public static final RelOptRule INSTANCE = new ProjectConverterRule();
+ public static final ConverterRule INSTANCE = new ProjectConverterRule();
/** */
public ProjectConverterRule() {
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rule/TableModifyDistributedConverterRule.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rule/TableModifyDistributedConverterRule.java
new file mode 100644
index 00000000000..eaa3fa3db90
--- /dev/null
+++
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rule/TableModifyDistributedConverterRule.java
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.calcite.rule;
+
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelOptPlanner;
+import org.apache.calcite.plan.RelOptRule;
+import org.apache.calcite.plan.RelOptUtil;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.PhysicalNode;
+import org.apache.calcite.rel.RelCollations;
+import org.apache.calcite.rel.RelDistribution;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.logical.LogicalTableModify;
+import org.apache.calcite.rel.metadata.RelMetadataQuery;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rel.type.RelDataTypeField;
+import org.apache.calcite.sql.fun.SqlStdOperatorTable;
+import org.apache.calcite.tools.RelBuilder;
+import org.apache.calcite.util.ImmutableIntList;
+import
org.apache.ignite.internal.processors.query.calcite.rel.IgniteConvention;
+import
org.apache.ignite.internal.processors.query.calcite.rel.IgniteTableModify;
+import org.apache.ignite.internal.processors.query.calcite.schema.IgniteTable;
+import
org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistribution;
+import
org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistributions;
+import
org.apache.ignite.internal.processors.query.calcite.trait.RewindabilityTrait;
+import org.apache.ignite.internal.processors.query.calcite.util.Commons;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Converts LogicalTableModify to distributed IgniteTableModify (perform table
modify on remote nodes,
+ * aggregate affected rows count and send result to the initiator node).
+ */
+public class TableModifyDistributedConverterRule extends
AbstractIgniteConverterRule<LogicalTableModify> {
+ /** */
+ public static final RelOptRule INSTANCE = new
TableModifyDistributedConverterRule();
+
+ /**
+ * Creates a ConverterRule.
+ */
+ public TableModifyDistributedConverterRule() {
+ super(LogicalTableModify.class,
TableModifyDistributedConverterRule.class.getSimpleName());
+ }
+
+ /** {@inheritDoc} */
+ @Override protected @Nullable PhysicalNode convert(
+ RelOptPlanner planner,
+ RelMetadataQuery mq,
+ LogicalTableModify rel
+ ) {
+ // If transaction is explicitly started it's only allowed to perform
table modify on initiator node.
+ if (Commons.queryTransactionVersion(planner.getContext()) != null)
+ return null;
+
+ RelDataType rowType = rel.getRowType();
+ IgniteTable table = rel.getTable().unwrap(IgniteTable.class);
+ IgniteDistribution inputDistribution = table.distribution();
+ boolean affectsSrc = false;
+
+ // Single distribution table modify is preferred in this case.
+ if (inputDistribution == IgniteDistributions.single())
+ return null;
+
+ RelOptCluster cluster = rel.getCluster();
+
+ switch (rel.getOperation()) {
+ case MERGE:
+ // Merge contains insert fields as well as update fields. It's
impossible to check input distribution
+ // over these two field sets in the general case. Only corner
cases can be implemented. Skip it for now.
+ return null;
+
+ case INSERT:
+ affectsSrc =
RelOptUtil.findTables(rel).contains(rel.getTable());
+
+ if (inputDistribution.getType() !=
RelDistribution.Type.HASH_DISTRIBUTED) {
+ if (affectsSrc)
+ return null;
+ else
+ inputDistribution =
IgniteDistributions.hash(ImmutableIntList.range(0,
+
table.getRowType(cluster.getTypeFactory()).getFieldCount()));
+ }
+
+ break;
+
+ case UPDATE:
+ if (inputDistribution.getType() !=
RelDistribution.Type.HASH_DISTRIBUTED)
+ inputDistribution =
IgniteDistributions.hash(ImmutableIntList.of(0));
+
+ break;
+
+ case DELETE:
+ inputDistribution =
IgniteDistributions.hash(ImmutableIntList.of(0));
+
+ break;
+
+ default:
+ throw new IllegalStateException("Unknown operation type: " +
rel.getOperation());
+ }
+
+ // Create distributed table modify.
+ RelTraitSet outputTraits =
cluster.traitSetOf(IgniteConvention.INSTANCE)
+ .replace(IgniteDistributions.random())
+ .replace(RewindabilityTrait.ONE_WAY)
+ .replace(RelCollations.EMPTY);
+
+ RelTraitSet inputTraits = outputTraits.replace(inputDistribution);
+
+ RelNode input = convert(rel.getInput(), inputTraits);
+
+ RelNode tableModify = new IgniteTableModify(cluster, outputTraits,
rel.getTable(), input, rel.getOperation(),
+ rel.getUpdateColumnList(), rel.getSourceExpressionList(),
rel.isFlattened(), affectsSrc);
+
+ assert rowType.getFieldCount() == 1 : "Unexpected field count: " +
rowType.getFieldCount();
+
+ // Create aggregate to pass affected rows count to initiator node.
+ RelDataTypeField outFld = rowType.getFieldList().get(0);
+
+ RelBuilder relBuilder = relBuilderFactory.create(cluster, null);
+
+ relBuilder.push(tableModify);
+ relBuilder.aggregate(relBuilder.groupKey(),
+ relBuilder.aggregateCall(SqlStdOperatorTable.SUM0,
relBuilder.field(0)).as(outFld.getName()));
+
+ PhysicalNode agg =
(PhysicalNode)HashAggregateConverterRule.MAP_REDUCE.convert(relBuilder.build());
+
+ if (agg == null)
+ return null;
+
+ // Create cast to original data type, since SUM aggregate extends type
(i.e. sum(INT) -> BIGINT).
+ relBuilder.push(agg);
+ relBuilder.project(relBuilder.cast(relBuilder.fields().get(0),
outFld.getType().getSqlTypeName()));
+
+ return
(PhysicalNode)ProjectConverterRule.INSTANCE.convert(relBuilder.build());
+ }
+}
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rule/TableModifyConverterRule.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rule/TableModifySingleNodeConverterRule.java
similarity index 79%
rename from
modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rule/TableModifyConverterRule.java
rename to
modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rule/TableModifySingleNodeConverterRule.java
index 49fda24c0a5..e6ae0c3c88e 100644
---
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rule/TableModifyConverterRule.java
+++
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rule/TableModifySingleNodeConverterRule.java
@@ -32,17 +32,17 @@ import
org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistribut
import
org.apache.ignite.internal.processors.query.calcite.trait.RewindabilityTrait;
/**
- *
+ * Converts LogicalTableModify to single distribution IgniteTableModify
(perform table modify on initiator node).
*/
-public class TableModifyConverterRule extends
AbstractIgniteConverterRule<LogicalTableModify> {
+public class TableModifySingleNodeConverterRule extends
AbstractIgniteConverterRule<LogicalTableModify> {
/** */
- public static final RelOptRule INSTANCE = new TableModifyConverterRule();
+ public static final RelOptRule INSTANCE = new
TableModifySingleNodeConverterRule();
/**
* Creates a ConverterRule.
*/
- public TableModifyConverterRule() {
- super(LogicalTableModify.class, "TableModifyConverterRule");
+ public TableModifySingleNodeConverterRule() {
+ super(LogicalTableModify.class,
TableModifySingleNodeConverterRule.class.getSimpleName());
}
/** {@inheritDoc} */
@@ -54,7 +54,7 @@ public class TableModifyConverterRule extends
AbstractIgniteConverterRule<Logica
.replace(RelCollations.EMPTY);
RelNode input = convert(rel.getInput(), traits);
- return new IgniteTableModify(cluster, traits, rel.getTable(), input,
- rel.getOperation(), rel.getUpdateColumnList(),
rel.getSourceExpressionList(), rel.isFlattened());
+ return new IgniteTableModify(cluster, traits, rel.getTable(), input,
rel.getOperation(),
+ rel.getUpdateColumnList(), rel.getSourceExpressionList(),
rel.isFlattened(), false);
}
}
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/type/OtherType.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/type/OtherType.java
index fbb0ce8600a..1a87a117ccd 100644
---
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/type/OtherType.java
+++
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/type/OtherType.java
@@ -18,6 +18,7 @@
package org.apache.ignite.internal.processors.query.calcite.type;
import java.lang.reflect.Type;
+import org.jetbrains.annotations.Nullable;
/** OTHER SQL type for any value. */
public class OtherType extends IgniteCustomType {
@@ -35,4 +36,14 @@ public class OtherType extends IgniteCustomType {
@Override public Type storageType() {
return Object.class;
}
+
+ /** {@inheritDoc} */
+ @Override public boolean equals(@Nullable Object obj) {
+ // Digest is the same for built-in Calcite's OTHER type, make sure we
get instance of correct class during
+ // canonization.
+ if (obj == null || obj.getClass() != getClass())
+ return false;
+
+ return super.equals(obj);
+ }
}
diff --git
a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/AbstractPlannerTest.java
b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/AbstractPlannerTest.java
index 4dd43a9c807..b0310c799ce 100644
---
a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/AbstractPlannerTest.java
+++
b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/AbstractPlannerTest.java
@@ -24,14 +24,11 @@ import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.UUID;
-import java.util.function.BiFunction;
import java.util.function.Predicate;
-import java.util.function.Supplier;
import java.util.stream.Collectors;
import com.google.common.collect.ImmutableSet;
import org.apache.calcite.plan.Contexts;
import org.apache.calcite.plan.RelOptListener;
-import org.apache.calcite.plan.RelOptTable;
import org.apache.calcite.plan.RelOptUtil;
import org.apache.calcite.plan.RelTraitSet;
import org.apache.calcite.rel.AbstractRelNode;
@@ -39,34 +36,21 @@ import org.apache.calcite.rel.RelDistribution;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.RelRoot;
import org.apache.calcite.rel.RelVisitor;
-import org.apache.calcite.rel.core.TableModify;
import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.rel.type.RelDataTypeFactory;
-import org.apache.calcite.rel.type.RelDataTypeField;
import org.apache.calcite.rel.type.RelDataTypeSystem;
-import org.apache.calcite.rex.RexNode;
-import org.apache.calcite.schema.ColumnStrategy;
import org.apache.calcite.schema.SchemaPlus;
import org.apache.calcite.sql.SqlExplainLevel;
-import org.apache.calcite.sql.SqlFunction;
import org.apache.calcite.sql.SqlNode;
import org.apache.calcite.sql.type.SqlTypeName;
-import org.apache.calcite.sql2rel.InitializerContext;
import org.apache.calcite.util.ImmutableBitSet;
import org.apache.calcite.util.Util;
-import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.cluster.ClusterNode;
-import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.failure.FailureContext;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
-import org.apache.ignite.internal.processors.cache.GridCacheContext;
-import org.apache.ignite.internal.processors.cache.GridCacheContextInfo;
-import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow;
import org.apache.ignite.internal.processors.failure.FailureProcessor;
-import org.apache.ignite.internal.processors.query.GridQueryTypeDescriptor;
import
org.apache.ignite.internal.processors.query.calcite.CalciteQueryProcessor;
import
org.apache.ignite.internal.processors.query.calcite.exec.ExecutionContext;
-import org.apache.ignite.internal.processors.query.calcite.exec.RowHandler;
import
org.apache.ignite.internal.processors.query.calcite.exec.task.StripedQueryTaskExecutor;
import
org.apache.ignite.internal.processors.query.calcite.externalize.RelJsonReader;
import
org.apache.ignite.internal.processors.query.calcite.message.CalciteMessage;
@@ -84,15 +68,10 @@ import
org.apache.ignite.internal.processors.query.calcite.prepare.Splitter;
import org.apache.ignite.internal.processors.query.calcite.rel.IgniteIndexScan;
import org.apache.ignite.internal.processors.query.calcite.rel.IgniteRel;
import org.apache.ignite.internal.processors.query.calcite.rel.IgniteTableScan;
-import
org.apache.ignite.internal.processors.query.calcite.schema.CacheTableDescriptor;
-import
org.apache.ignite.internal.processors.query.calcite.schema.ColumnDescriptor;
import org.apache.ignite.internal.processors.query.calcite.schema.IgniteSchema;
-import org.apache.ignite.internal.processors.query.calcite.schema.ModifyTuple;
import
org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistribution;
import
org.apache.ignite.internal.processors.query.calcite.type.IgniteTypeFactory;
-import org.apache.ignite.internal.processors.query.calcite.util.Commons;
import org.apache.ignite.internal.util.typedef.F;
-import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.plugin.extensions.communication.Message;
import org.apache.ignite.testframework.GridTestUtils;
import org.apache.ignite.testframework.ListeningTestLogger;
@@ -102,7 +81,6 @@ import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import org.junit.After;
import org.junit.Before;
-import org.mockito.Mockito;
import static org.apache.calcite.tools.Frameworks.createRootSchema;
import static
org.apache.ignite.internal.processors.query.calcite.externalize.RelJsonWriter.toJson;
@@ -703,8 +681,9 @@ public abstract class AbstractPlannerTest extends
GridCommonAbstractTest {
if (!(fields[i + 1] instanceof Class) && !(fields[i + 1]
instanceof SqlTypeName))
throw new IllegalArgumentException("'fields[" + i + "]' should
be a class or a SqlTypeName");
- RelDataType type = fields[i + 1] instanceof Class ?
TYPE_FACTORY.createJavaType((Class<?>)fields[i + 1]) :
- TYPE_FACTORY.createSqlType((SqlTypeName)fields[i + 1]);
+ RelDataType type = fields[i + 1] instanceof Class
+ ? TYPE_FACTORY.createJavaType((Class<?>)fields[i + 1])
+ :
TYPE_FACTORY.createTypeWithNullability(TYPE_FACTORY.createSqlType((SqlTypeName)fields[i
+ 1]), true);
b.add((String)fields[i], type);
}
@@ -749,165 +728,6 @@ public abstract class AbstractPlannerTest extends
GridCommonAbstractTest {
.build();
}
- /** */
- static class TestTableDescriptor implements CacheTableDescriptor {
- /** */
- private final Supplier<IgniteDistribution> distributionSupp;
-
- /** */
- private final RelDataType rowType;
-
- /** */
- private final GridCacheContextInfo<?, ?> cacheInfo;
-
- /** */
- public TestTableDescriptor(Supplier<IgniteDistribution> distribution,
RelDataType rowType) {
- this.distributionSupp = distribution;
- this.rowType = rowType;
- cacheInfo = Mockito.mock(GridCacheContextInfo.class);
-
- CacheConfiguration cfg = Mockito.mock(CacheConfiguration.class);
- Mockito.when(cfg.isEagerTtl()).thenReturn(true);
-
- Mockito.when(cacheInfo.cacheId()).thenReturn(CU.cacheId("TEST"));
- Mockito.when(cacheInfo.config()).thenReturn(cfg);
- }
-
- /** {@inheritDoc} */
- @Override public GridCacheContextInfo cacheInfo() {
- return cacheInfo;
- }
-
- /** {@inheritDoc} */
- @Override public GridCacheContext cacheContext() {
- throw new AssertionError();
- }
-
- /** {@inheritDoc} */
- @Override public IgniteDistribution distribution() {
- return distributionSupp.get();
- }
-
- /** {@inheritDoc} */
- @Override public ColocationGroup colocationGroup(MappingQueryContext
ctx) {
- throw new AssertionError();
- }
-
- /** {@inheritDoc} */
- @Override public RelDataType rowType(IgniteTypeFactory factory,
ImmutableBitSet usedColumns) {
- return rowType;
- }
-
- /** {@inheritDoc} */
- @Override public boolean isUpdateAllowed(RelOptTable tbl, int colIdx) {
- return true;
- }
-
- /** {@inheritDoc} */
- @Override public boolean match(CacheDataRow row) {
- throw new AssertionError();
- }
-
- /** {@inheritDoc} */
- @Override public <Row> Row toRow(ExecutionContext<Row> ectx,
CacheDataRow row, RowHandler.RowFactory<Row> factory,
- @Nullable ImmutableBitSet requiredColumns) throws
IgniteCheckedException {
- throw new AssertionError();
- }
-
- /** {@inheritDoc} */
- @Override public <Row> ModifyTuple toTuple(ExecutionContext<Row> ectx,
Row row, TableModify.Operation op,
- @Nullable Object arg) throws IgniteCheckedException {
- throw new AssertionError();
- }
-
- /** {@inheritDoc} */
- @Override public ColumnDescriptor columnDescriptor(String fieldName) {
- RelDataTypeField field = rowType.getField(fieldName, false, false);
- return new TestColumnDescriptor(field.getIndex(), fieldName);
- }
-
- /** {@inheritDoc} */
- @Override public Collection<ColumnDescriptor> columnDescriptors() {
- return Commons.transform(rowType.getFieldList(), f -> new
TestColumnDescriptor(f.getIndex(), f.getName()));
- }
-
- /** {@inheritDoc} */
- @Override public GridQueryTypeDescriptor typeDescription() {
- throw new AssertionError();
- }
-
- /** {@inheritDoc} */
- @Override public boolean isGeneratedAlways(RelOptTable table, int
iColumn) {
- throw new AssertionError();
- }
-
- /** {@inheritDoc} */
- @Override public ColumnStrategy generationStrategy(RelOptTable table,
int iColumn) {
- throw new AssertionError();
- }
-
- /** {@inheritDoc} */
- @Override public RexNode newColumnDefaultValue(RelOptTable table, int
iColumn, InitializerContext context) {
- throw new AssertionError();
- }
-
- /** {@inheritDoc} */
- @Override public BiFunction<InitializerContext, RelNode, RelNode>
postExpressionConversionHook() {
- throw new AssertionError();
- }
-
- /** {@inheritDoc} */
- @Override public RexNode newAttributeInitializer(RelDataType type,
SqlFunction constructor, int iAttribute,
- List<RexNode> constructorArgs, InitializerContext context) {
- throw new AssertionError();
- }
- }
-
- /** */
- static class TestColumnDescriptor implements ColumnDescriptor {
- /** */
- private final int idx;
-
- /** */
- private final String name;
-
- /** */
- public TestColumnDescriptor(int idx, String name) {
- this.idx = idx;
- this.name = name;
- }
-
- /** {@inheritDoc} */
- @Override public boolean hasDefaultValue() {
- return false;
- }
-
- /** {@inheritDoc} */
- @Override public String name() {
- return name;
- }
-
- /** {@inheritDoc} */
- @Override public int fieldIndex() {
- return idx;
- }
-
- /** {@inheritDoc} */
- @Override public RelDataType logicalType(IgniteTypeFactory f) {
- throw new AssertionError();
- }
-
- /** {@inheritDoc} */
- @Override public Class<?> storageType() {
- throw new AssertionError();
- }
-
- /** {@inheritDoc} */
- @Override public Object defaultValue() {
- throw new AssertionError();
- }
- }
-
/** */
static class TestMessageServiceImpl extends MessageServiceImpl {
/** */
diff --git
a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/TableDmlPlannerTest.java
b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/TableDmlPlannerTest.java
index c71d56a7357..1694b8d7773 100644
---
a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/TableDmlPlannerTest.java
+++
b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/TableDmlPlannerTest.java
@@ -20,14 +20,22 @@ package
org.apache.ignite.internal.processors.query.calcite.planner;
import org.apache.calcite.plan.RelOptUtil;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.core.Spool;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.internal.processors.query.QueryUtils;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteExchange;
import org.apache.ignite.internal.processors.query.calcite.rel.IgniteIndexScan;
import
org.apache.ignite.internal.processors.query.calcite.rel.IgniteTableModify;
import org.apache.ignite.internal.processors.query.calcite.rel.IgniteTableScan;
+import
org.apache.ignite.internal.processors.query.calcite.rel.IgniteTableSpool;
+import
org.apache.ignite.internal.processors.query.calcite.rel.IgniteTrimExchange;
+import
org.apache.ignite.internal.processors.query.calcite.rule.TableModifySingleNodeConverterRule;
import org.apache.ignite.internal.processors.query.calcite.schema.IgniteSchema;
import
org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistributions;
+import org.apache.ignite.testframework.GridTestUtils;
import org.junit.Test;
import static org.apache.calcite.sql.type.SqlTypeName.INTEGER;
+import static org.apache.calcite.sql.type.SqlTypeName.OTHER;
import static org.apache.calcite.sql.type.SqlTypeName.VARCHAR;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.CoreMatchers.instanceOf;
@@ -190,4 +198,211 @@ public class TableDmlPlannerTest extends
AbstractPlannerTest {
assertPlan("SELECT CITY.NAME, STREET.NAME FROM STREET JOIN CITY ON
STREET.CITY_ID = CITY.ID ORDER BY STREET.ID",
schema, hasColumns("NAME", "NAME1"));
}
+
+ /** Checks, per DML statement kind, whether the planned table modify is distributed (random distribution) or single-node. */
+ @Test
+ public void testDistributedTableModify() throws Exception {
+ IgniteSchema schema = createSchema(
+ createTable("TEST_PART", IgniteDistributions.affinity(3, "test",
"hash"),
+ QueryUtils.KEY_FIELD_NAME, OTHER,
+ QueryUtils.VAL_FIELD_NAME, OTHER,
+ "ID", INTEGER,
+ "AFF_ID", INTEGER,
+ "VAL", INTEGER
+ ).addIndex("VAL_IDX", 4),
+ createTable("TEST_PART2", IgniteDistributions.affinity(2, "test2",
"hash"),
+ QueryUtils.KEY_FIELD_NAME, OTHER,
+ QueryUtils.VAL_FIELD_NAME, OTHER,
+ "ID", INTEGER,
+ "AFF_ID", INTEGER,
+ "VAL", INTEGER
+ ),
+ createTable("TEST_RND", IgniteDistributions.random(),
+ QueryUtils.KEY_FIELD_NAME, OTHER,
+ QueryUtils.VAL_FIELD_NAME, OTHER,
+ "ID", INTEGER,
+ "AFF_ID", INTEGER,
+ "VAL", INTEGER
+ ),
+ createTable("TEST_REPL", IgniteDistributions.broadcast(),
+ QueryUtils.KEY_FIELD_NAME, OTHER,
+ QueryUtils.VAL_FIELD_NAME, OTHER,
+ "ID", INTEGER,
+ "AFF_ID", INTEGER,
+ "VAL", INTEGER
+ ),
+ createTable("TEST_REPL2", IgniteDistributions.broadcast(),
+ QueryUtils.KEY_FIELD_NAME, OTHER,
+ QueryUtils.VAL_FIELD_NAME, OTHER,
+ "ID", INTEGER,
+ "AFF_ID", INTEGER,
+ "VAL", INTEGER
+ )
+ );
+
+ // Check MERGE statement.
+
+ // Distributed TableModify doesn't support MERGE, so a single-node TableModify is expected.
+ assertPlan("MERGE INTO test_part dst USING test_part src ON dst.id =
src.id " +
+ "WHEN MATCHED THEN UPDATE SET val = dst.val + 1 " +
+ "WHEN NOT MATCHED THEN INSERT (id, aff_id, val) VALUES (src.id,
src.aff_id, src.val)", schema,
+
isInstanceOf(IgniteTableModify.class).and(hasDistribution(IgniteDistributions.single())));
+
+ // Check INSERT statements.
+
+ // partitioned <- values (broadcast).
+ assertPlan("INSERT INTO test_part VALUES (?, ?, ?)", schema,
+
isInstanceOf(IgniteTableModify.class).and(hasDistribution(IgniteDistributions.single())));
+
+ // partitioned <- partitioned (same table: distributed modify over a spooled table scan).
+ assertPlan("INSERT INTO test_part SELECT * FROM test_part", schema,
+
hasChildThat(isInstanceOf(IgniteTableModify.class).and(hasDistribution(IgniteDistributions.random()))
+ .and(input(isInstanceOf(IgniteTableSpool.class)
+ .and(input(isTableScan("TEST_PART")))))));
+
+ // partitioned <- partitioned (same table, affinity key change: single-node modify over spool and exchange).
+ assertPlan("INSERT INTO test_part SELECT id, aff_id + 1, val FROM
test_part", schema,
+
isInstanceOf(IgniteTableModify.class).and(hasDistribution(IgniteDistributions.single()))
+ .and(input(isInstanceOf(IgniteTableSpool.class)
+ .and(input(isInstanceOf(IgniteExchange.class)
+ .and(input(isTableScan("TEST_PART"))))))));
+
+ // partitioned <- partitioned (another affinity).
+ assertPlan("INSERT INTO test_part SELECT * FROM test_part2", schema,
+
hasChildThat(isInstanceOf(IgniteTableModify.class).and(hasDistribution(IgniteDistributions.random()))
+ .and(input(isTableScan("TEST_PART2")))));
+
+ // partitioned <- partitioned (another affinity, affinity key change).
+ assertPlan("INSERT INTO test_part SELECT id, aff_id + 1, val FROM
test_part2", schema,
+
hasChildThat(isInstanceOf(IgniteTableModify.class).and(hasDistribution(IgniteDistributions.random()))
+ .and(input(isTableScan("TEST_PART2")))));
+
+ // partitioned <- random.
+ assertPlan("INSERT INTO test_part SELECT * FROM test_rnd", schema,
+
hasChildThat(isInstanceOf(IgniteTableModify.class).and(hasDistribution(IgniteDistributions.random()))
+ .and(input(isTableScan("TEST_RND")))));
+
+ // partitioned <- broadcast.
+ assertPlan("INSERT INTO test_part SELECT * FROM test_repl", schema,
+
isInstanceOf(IgniteTableModify.class).and(hasDistribution(IgniteDistributions.single()))
+ .and(input(isTableScan("TEST_REPL"))));
+
+ // partitioned <- broadcast (force distributed).
+ assertPlan("INSERT INTO test_part SELECT * FROM test_repl", schema,
+
hasChildThat(isInstanceOf(IgniteTableModify.class).and(hasDistribution(IgniteDistributions.random()))
+ .and(input(isInstanceOf(IgniteTrimExchange.class)
+ .and(input(isTableScan("TEST_REPL")))))),
+ TableModifySingleNodeConverterRule.class.getSimpleName()
+ );
+
+ // broadcast <- partitioned.
+ assertPlan("INSERT INTO test_repl SELECT * FROM test_part", schema,
+
hasChildThat(isInstanceOf(IgniteTableModify.class).and(hasDistribution(IgniteDistributions.random()))
+ .and(input(isTableScan("TEST_PART")))));
+
+ // broadcast <- random.
+ assertPlan("INSERT INTO test_repl SELECT * FROM test_rnd", schema,
+
hasChildThat(isInstanceOf(IgniteTableModify.class).and(hasDistribution(IgniteDistributions.random()))
+ .and(input(isTableScan("TEST_RND")))));
+
+ // broadcast <- broadcast.
+ assertPlan("INSERT INTO test_repl SELECT * FROM test_repl2", schema,
+
isInstanceOf(IgniteTableModify.class).and(hasDistribution(IgniteDistributions.single()))
+ .and(input(isTableScan("TEST_REPL2"))));
+
+ // broadcast <- broadcast (force distributed).
+ assertPlan("INSERT INTO test_repl SELECT * FROM test_repl2", schema,
+
hasChildThat(isInstanceOf(IgniteTableModify.class).and(hasDistribution(IgniteDistributions.random()))
+ .and(input(isInstanceOf(IgniteTrimExchange.class)
+ .and(input(isTableScan("TEST_REPL2")))))),
+ TableModifySingleNodeConverterRule.class.getSimpleName()
+ );
+
+ // broadcast <- broadcast (same).
+ assertPlan("INSERT INTO test_repl SELECT * FROM test_repl", schema,
+
isInstanceOf(IgniteTableModify.class).and(hasDistribution(IgniteDistributions.single()))
+ .and(input(isInstanceOf(IgniteTableSpool.class)
+ .and(input(isTableScan("TEST_REPL"))))));
+
+ // broadcast <- broadcast (same, force distributed): planning is expected to fail with IgniteException.
+ GridTestUtils.assertThrows(null, () -> {
+ physicalPlan("INSERT INTO test_repl SELECT * FROM test_repl",
schema,
+ TableModifySingleNodeConverterRule.class.getSimpleName());
+ }, IgniteException.class, ""
+ );
+
+ // random <- partitioned.
+ assertPlan("INSERT INTO test_rnd SELECT * FROM test_part", schema,
+
hasChildThat(isInstanceOf(IgniteTableModify.class).and(hasDistribution(IgniteDistributions.random()))
+ .and(input(isTableScan("TEST_PART")))));
+
+ // random <- broadcast.
+ assertPlan("INSERT INTO test_rnd SELECT * FROM test_repl", schema,
+
isInstanceOf(IgniteTableModify.class).and(hasDistribution(IgniteDistributions.single()))
+ .and(input(isTableScan("TEST_REPL"))));
+
+ // random <- broadcast (force distributed).
+ assertPlan("INSERT INTO test_rnd SELECT * FROM test_repl", schema,
+
hasChildThat(isInstanceOf(IgniteTableModify.class).and(hasDistribution(IgniteDistributions.random()))
+ .and(input(isInstanceOf(IgniteTrimExchange.class)
+ .and(input(isTableScan("TEST_REPL")))))),
+ TableModifySingleNodeConverterRule.class.getSimpleName()
+ );
+
+ // Check UPDATE statements.
+
+ // partitioned.
+ assertPlan("UPDATE test_part SET val = val + 1", schema,
+
hasChildThat(isInstanceOf(IgniteTableModify.class).and(hasDistribution(IgniteDistributions.random()))
+ .and(input(isTableScan("TEST_PART")))));
+
+ // partitioned (the updated column is the one read by the index scan, so a spool is expected).
+ assertPlan("UPDATE test_part SET val = val + 1 WHERE val = 10", schema,
+
hasChildThat(isInstanceOf(IgniteTableModify.class).and(hasDistribution(IgniteDistributions.random()))
+ .and(input(isInstanceOf(IgniteTableSpool.class)
+ .and(input(isIndexScan("TEST_PART", "VAL_IDX")))))));
+
+ // broadcast.
+ assertPlan("UPDATE test_repl SET val = val + 1", schema,
+
isInstanceOf(IgniteTableModify.class).and(hasDistribution(IgniteDistributions.single()))
+ .and(input(isTableScan("TEST_REPL"))));
+
+ // broadcast (force distributed).
+ assertPlan("UPDATE test_repl SET val = val + 1", schema,
+
hasChildThat(isInstanceOf(IgniteTableModify.class).and(hasDistribution(IgniteDistributions.random()))
+ .and(input(isInstanceOf(IgniteTrimExchange.class)
+ .and(input(isTableScan("TEST_REPL")))))),
+ TableModifySingleNodeConverterRule.class.getSimpleName()
+ );
+
+ // random.
+ assertPlan("UPDATE test_rnd SET val = val + 1", schema,
+
hasChildThat(isInstanceOf(IgniteTableModify.class).and(hasDistribution(IgniteDistributions.random()))
+ .and(input(isTableScan("TEST_RND")))));
+
+ // Check DELETE statements.
+
+ // partitioned.
+ assertPlan("DELETE FROM test_part", schema,
+
hasChildThat(isInstanceOf(IgniteTableModify.class).and(hasDistribution(IgniteDistributions.random()))
+ .and(input(isTableScan("TEST_PART")))));
+
+ // broadcast.
+ assertPlan("DELETE FROM test_repl WHERE val = 10", schema,
+
isInstanceOf(IgniteTableModify.class).and(hasDistribution(IgniteDistributions.single()))
+ .and(input(isTableScan("TEST_REPL"))));
+
+ // broadcast (force distributed).
+ assertPlan("DELETE FROM test_repl WHERE val = 10", schema,
+
hasChildThat(isInstanceOf(IgniteTableModify.class).and(hasDistribution(IgniteDistributions.random()))
+ .and(input(isInstanceOf(IgniteTrimExchange.class)
+ .and(input(isTableScan("TEST_REPL")))))),
+ TableModifySingleNodeConverterRule.class.getSimpleName()
+ );
+
+ // random.
+ assertPlan("DELETE FROM test_rnd WHERE val = 10", schema,
+
hasChildThat(isInstanceOf(IgniteTableModify.class).and(hasDistribution(IgniteDistributions.random()))
+ .and(input(isTableScan("TEST_RND")))));
+ }
}
diff --git
a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/TestTable.java
b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/TestTable.java
index 45bcb3a6d76..744c351ddf6 100644
---
a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/TestTable.java
+++
b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/TestTable.java
@@ -19,18 +19,21 @@ package
org.apache.ignite.internal.processors.query.calcite.planner;
import java.lang.reflect.Type;
import java.util.Arrays;
+import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
+import java.util.function.Supplier;
import java.util.stream.Collectors;
import org.apache.calcite.config.CalciteConnectionConfig;
import org.apache.calcite.plan.RelOptCluster;
import org.apache.calcite.plan.RelOptTable;
import org.apache.calcite.rel.RelCollation;
import org.apache.calcite.rel.RelFieldCollation;
+import org.apache.calcite.rel.core.TableModify;
import org.apache.calcite.rel.hint.RelHint;
import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.rel.type.RelDataTypeFactory;
@@ -40,35 +43,48 @@ import org.apache.calcite.rel.type.RelProtoDataType;
import org.apache.calcite.rex.RexNode;
import org.apache.calcite.schema.Schema;
import org.apache.calcite.schema.Statistic;
+import org.apache.calcite.schema.Wrapper;
import org.apache.calcite.sql.SqlCall;
import org.apache.calcite.sql.SqlNode;
+import org.apache.calcite.sql2rel.NullInitializerExpressionFactory;
import org.apache.calcite.util.ImmutableBitSet;
+import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.internal.cache.query.index.IndexDefinition;
import org.apache.ignite.internal.cache.query.index.IndexName;
import org.apache.ignite.internal.cache.query.index.sorted.IndexKeyDefinition;
import org.apache.ignite.internal.cache.query.index.sorted.IndexKeyType;
import org.apache.ignite.internal.cache.query.index.sorted.client.ClientIndex;
import
org.apache.ignite.internal.cache.query.index.sorted.client.ClientIndexDefinition;
+import org.apache.ignite.internal.processors.cache.GridCacheContext;
+import org.apache.ignite.internal.processors.cache.GridCacheContextInfo;
+import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow;
+import org.apache.ignite.internal.processors.query.GridQueryTypeDescriptor;
import org.apache.ignite.internal.processors.query.QueryUtils;
import
org.apache.ignite.internal.processors.query.calcite.exec.ExecutionContext;
+import org.apache.ignite.internal.processors.query.calcite.exec.RowHandler;
import
org.apache.ignite.internal.processors.query.calcite.metadata.ColocationGroup;
import
org.apache.ignite.internal.processors.query.calcite.prepare.MappingQueryContext;
import
org.apache.ignite.internal.processors.query.calcite.rel.logical.IgniteLogicalTableScan;
import
org.apache.ignite.internal.processors.query.calcite.schema.CacheIndexImpl;
import
org.apache.ignite.internal.processors.query.calcite.schema.CacheTableDescriptor;
+import
org.apache.ignite.internal.processors.query.calcite.schema.ColumnDescriptor;
import
org.apache.ignite.internal.processors.query.calcite.schema.IgniteCacheTable;
import org.apache.ignite.internal.processors.query.calcite.schema.IgniteIndex;
import
org.apache.ignite.internal.processors.query.calcite.schema.IgniteStatisticsImpl;
+import org.apache.ignite.internal.processors.query.calcite.schema.ModifyTuple;
import
org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistribution;
import org.apache.ignite.internal.processors.query.calcite.trait.TraitUtils;
+import
org.apache.ignite.internal.processors.query.calcite.type.IgniteTypeFactory;
import org.apache.ignite.internal.processors.query.calcite.util.Commons;
import org.apache.ignite.internal.processors.query.stat.ObjectStatisticsImpl;
+import org.apache.ignite.internal.util.typedef.internal.CU;
import org.jetbrains.annotations.Nullable;
+import org.mockito.Mockito;
import static
org.apache.ignite.internal.processors.query.calcite.planner.AbstractPlannerTest.DEFAULT_SCHEMA;
/** */
-public class TestTable implements IgniteCacheTable {
+public class TestTable implements IgniteCacheTable, Wrapper {
/** */
private final String name;
@@ -109,7 +125,7 @@ public class TestTable implements IgniteCacheTable {
statistics = new IgniteStatisticsImpl(new
ObjectStatisticsImpl((long)rowCnt, Collections.emptyMap()));
this.name = name;
- desc = new AbstractPlannerTest.TestTableDescriptor(this::distribution,
type);
+ desc = new TestTableDescriptor(this::distribution, type);
}
/**
@@ -314,4 +330,171 @@ public class TestTable implements IgniteCacheTable {
@Override public void authorize(Operation op) {
// No-op.
}
+
+ /** {@inheritDoc} <p>Returns this table if it is an instance of {@code cls}, else the descriptor if it is, else {@code null}. */
+ @Override public <C> C unwrap(Class<C> cls) {
+ if (cls.isInstance(this))
+ return cls.cast(this);
+
+ if (cls.isInstance(desc))
+ return cls.cast(desc);
+
+ return null;
+ }
+
+ /** Table descriptor stub: serves a fixed row type, a supplied distribution and mocked cache info; several methods always throw {@link AssertionError}. */
+ static class TestTableDescriptor extends NullInitializerExpressionFactory
implements CacheTableDescriptor {
+ /** Supplier queried on every {@link #distribution()} call. */
+ private final Supplier<IgniteDistribution> distributionSupp;
+
+ /** Full row type, returned as is when no column subset is requested. */
+ private final RelDataType rowType;
+
+ /** Mocked cache info reporting the id of cache "TEST" and a config with eager TTL enabled. */
+ private final GridCacheContextInfo<?, ?> cacheInfo;
+
+ /** Stores the distribution supplier and row type and sets up the Mockito-based cache info. */
+ public TestTableDescriptor(Supplier<IgniteDistribution> distribution,
RelDataType rowType) {
+ distributionSupp = distribution;
+ this.rowType = rowType;
+ cacheInfo = Mockito.mock(GridCacheContextInfo.class);
+
+ CacheConfiguration cfg = Mockito.mock(CacheConfiguration.class);
+ Mockito.when(cfg.isEagerTtl()).thenReturn(true);
+
+ Mockito.when(cacheInfo.cacheId()).thenReturn(CU.cacheId("TEST"));
+ Mockito.when(cacheInfo.config()).thenReturn(cfg);
+ }
+
+ /** {@inheritDoc} <p>Returns the mock created in the constructor. */
+ @Override public GridCacheContextInfo<?, ?> cacheInfo() {
+ return cacheInfo;
+ }
+
+ /** {@inheritDoc} <p>This stub always throws {@link AssertionError}. */
+ @Override public GridCacheContext<?, ?> cacheContext() {
+ throw new AssertionError();
+ }
+
+ /** {@inheritDoc} <p>Delegates to the supplier passed to the constructor. */
+ @Override public IgniteDistribution distribution() {
+ return distributionSupp.get();
+ }
+
+ /** {@inheritDoc} <p>This stub always throws {@link AssertionError}. */
+ @Override public ColocationGroup colocationGroup(MappingQueryContext
ctx) {
+ throw new AssertionError();
+ }
+
+ /** {@inheritDoc} <p>Returns the full row type for {@code null} {@code usedColumns}, otherwise a type built from the fields at the set bits. */
+ @Override public RelDataType rowType(IgniteTypeFactory factory,
@Nullable ImmutableBitSet usedColumns) {
+ if (usedColumns == null)
+ return rowType;
+ else {
+ RelDataTypeFactory.Builder b = new
RelDataTypeFactory.Builder(factory);
+
+ for (int i = usedColumns.nextSetBit(0); i != -1; i =
usedColumns.nextSetBit(i + 1))
+ b.add(rowType.getFieldList().get(i));
+
+ return b.build();
+ }
+ }
+
+ /** {@inheritDoc} <p>This stub allows updating every column. */
+ @Override public boolean isUpdateAllowed(RelOptTable tbl, int colIdx) {
+ return true;
+ }
+
+ /** {@inheritDoc} <p>This stub always throws {@link AssertionError}. */
+ @Override public boolean match(CacheDataRow row) {
+ throw new AssertionError();
+ }
+
+ /** {@inheritDoc} <p>This stub always throws {@link AssertionError}. */
+ @Override public <Row> Row toRow(ExecutionContext<Row> ectx,
CacheDataRow row, RowHandler.RowFactory<Row> factory,
+ @Nullable ImmutableBitSet requiredColumns) {
+ throw new AssertionError();
+ }
+
+ /** {@inheritDoc} <p>This stub always throws {@link AssertionError}. */
+ @Override public <Row> ModifyTuple toTuple(ExecutionContext<Row> ectx,
Row row, TableModify.Operation op,
+ @Nullable Object arg) {
+ throw new AssertionError();
+ }
+
+ /** {@inheritDoc} <p>Looks the field up in the full row type, ignoring case; the field is assumed to exist (no {@code null} check). */
+ @Override public ColumnDescriptor columnDescriptor(String fieldName) {
+ RelDataTypeField field = rowType.getField(fieldName, false, false);
+ return new TestColumnDescriptor(field.getIndex(), fieldName);
+ }
+
+ /** {@inheritDoc} <p>Builds one {@link TestColumnDescriptor} per field of the full row type. */
+ @Override public Collection<ColumnDescriptor> columnDescriptors() {
+ return Commons.transform(rowType.getFieldList(), f -> new
TestColumnDescriptor(f.getIndex(), f.getName()));
+ }
+
+ /** {@inheritDoc} <p>This stub always throws {@link AssertionError}. */
+ @Override public GridQueryTypeDescriptor typeDescription() {
+ throw new AssertionError();
+ }
+
+ /** {@inheritDoc} <p>Returns the row type without the {@code QueryUtils.KEY_FIELD_NAME} and {@code QueryUtils.VAL_FIELD_NAME} columns. */
+ @Override public RelDataType insertRowType(IgniteTypeFactory factory) {
+ ImmutableBitSet.Builder bitSetBuilder = ImmutableBitSet.builder();
+
+ for (int i = 0; i < rowType.getFieldCount(); i++) {
+ String fldName = rowType.getFieldList().get(i).getName();
+
+ if (!QueryUtils.KEY_FIELD_NAME.equals(fldName) &&
!QueryUtils.VAL_FIELD_NAME.equals(fldName))
+ bitSetBuilder.set(i);
+ }
+
+ return rowType(factory, bitSetBuilder.build());
+ }
+ }
+
+ /** Column descriptor stub that knows only the column name and index; type and default-value accessors always throw {@link AssertionError}. */
+ static class TestColumnDescriptor implements ColumnDescriptor {
+ /** Column index returned by {@link #fieldIndex()}. */
+ private final int idx;
+
+ /** Column name returned by {@link #name()}. */
+ private final String name;
+
+ /** Creates a descriptor for the column with the given index and name. */
+ public TestColumnDescriptor(int idx, String name) {
+ this.idx = idx;
+ this.name = name;
+ }
+
+ /** {@inheritDoc} <p>This stub never reports a default value. */
+ @Override public boolean hasDefaultValue() {
+ return false;
+ }
+
+ /** {@inheritDoc} */
+ @Override public String name() {
+ return name;
+ }
+
+ /** {@inheritDoc} */
+ @Override public int fieldIndex() {
+ return idx;
+ }
+
+ /** {@inheritDoc} <p>This stub always throws {@link AssertionError}. */
+ @Override public RelDataType logicalType(IgniteTypeFactory f) {
+ throw new AssertionError();
+ }
+
+ /** {@inheritDoc} <p>This stub always throws {@link AssertionError}. */
+ @Override public Class<?> storageType() {
+ throw new AssertionError();
+ }
+
+ /** {@inheritDoc} <p>This stub always throws {@link AssertionError}. */
+ @Override public Object defaultValue() {
+ throw new AssertionError();
+ }
+ }
}