Vladsz83 commented on code in PR #12593:
URL: https://github.com/apache/ignite/pull/12593#discussion_r2646151254


##########
modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/AbstractPlannerTest.java:
##########
@@ -704,7 +682,7 @@ protected static TestTable createTable(String name, int 
size, IgniteDistribution
                 throw new IllegalArgumentException("'fields[" + i + "]' should 
be a class or a SqlTypeName");
 
             RelDataType type = fields[i + 1] instanceof Class ? 
TYPE_FACTORY.createJavaType((Class<?>)fields[i + 1]) :
-                TYPE_FACTORY.createSqlType((SqlTypeName)fields[i + 1]);
+                
TYPE_FACTORY.createTypeWithNullability(TYPE_FACTORY.createSqlType((SqlTypeName)fields[i
 + 1]), true);

Review Comment:
   Suggestion: let's split the lines like this:
   ```
   RelDataType type = fields[i + 1] instanceof Class
                   ? TYPE_FACTORY.createJavaType((Class<?>)fields[i + 1])
                   : TYPE_FACTORY.createTypeWithNullability(TYPE_FACTORY.createSqlType((SqlTypeName)fields[i + 1]), true);
   ```



##########
modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rule/TableModifyDistributedConverterRule.java:
##########
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.calcite.rule;
+
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelOptPlanner;
+import org.apache.calcite.plan.RelOptRule;
+import org.apache.calcite.plan.RelOptUtil;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.PhysicalNode;
+import org.apache.calcite.rel.RelCollations;
+import org.apache.calcite.rel.RelDistribution;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.logical.LogicalTableModify;
+import org.apache.calcite.rel.metadata.RelMetadataQuery;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rel.type.RelDataTypeField;
+import org.apache.calcite.sql.fun.SqlStdOperatorTable;
+import org.apache.calcite.tools.RelBuilder;
+import org.apache.calcite.util.ImmutableIntList;
+import 
org.apache.ignite.internal.processors.query.calcite.rel.IgniteConvention;
+import 
org.apache.ignite.internal.processors.query.calcite.rel.IgniteTableModify;
+import org.apache.ignite.internal.processors.query.calcite.schema.IgniteTable;
+import 
org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistribution;
+import 
org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistributions;
+import 
org.apache.ignite.internal.processors.query.calcite.trait.RewindabilityTrait;
+import org.apache.ignite.internal.processors.query.calcite.util.Commons;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Converts LogicalTableModify to distributed IgniteTableModify (Perform table 
modify on remote nodes,
+ * aggregate affected rows count and send result to the initiator node).
+ */
+public class TableModifyDistributedConverterRule extends 
AbstractIgniteConverterRule<LogicalTableModify> {
+    /** */
+    public static final RelOptRule INSTANCE = new 
TableModifyDistributedConverterRule();
+
+    /**
+     * Creates a ConverterRule.
+     */
+    public TableModifyDistributedConverterRule() {
+        super(LogicalTableModify.class, 
TableModifyDistributedConverterRule.class.getSimpleName());
+    }
+
+    /** {@inheritDoc} */
+    @Override protected @Nullable PhysicalNode convert(
+        RelOptPlanner planner,
+        RelMetadataQuery mq,
+        LogicalTableModify rel
+    ) {
+        RelOptCluster cluster = rel.getCluster();
+
+        // If transaction is explicitly started it's only allowed to perform 
table modify on initiator node.
+        if (Commons.queryTransactionVersion(planner.getContext()) != null)
+            return null;
+
+        RelDataType rowType = rel.getRowType();
+        IgniteTable table = rel.getTable().unwrap(IgniteTable.class);
+        IgniteDistribution inputDistribution = table.distribution();
+        boolean affectsSrc = false;
+
+        // Single distribution table modify is prefered in this case.
+        if (inputDistribution == IgniteDistributions.single())

Review Comment:
   Do we need any dedicated test for it?



##########
modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/TableDmlPlannerTest.java:
##########
@@ -190,4 +198,197 @@ public void testDuplicatedColumnNames() throws Exception {
         assertPlan("SELECT CITY.NAME, STREET.NAME FROM STREET JOIN CITY ON 
STREET.CITY_ID = CITY.ID ORDER BY STREET.ID",
             schema, hasColumns("NAME", "NAME1"));
     }
+
+    /** Tests that table modify can be executed on remote nodes. */
+    @Test
+    public void testDistributedTableModify() throws Exception {
+        IgniteSchema schema = createSchema(
+            createTable("TEST", IgniteDistributions.affinity(3, "test", 
"hash"),
+                QueryUtils.KEY_FIELD_NAME, OTHER,
+                QueryUtils.VAL_FIELD_NAME, OTHER,
+                "ID", INTEGER,
+                "AFF_ID", INTEGER,
+                "VAL", INTEGER
+            ),
+            createTable("TEST2", IgniteDistributions.affinity(2, "test2", 
"hash"),
+                QueryUtils.KEY_FIELD_NAME, OTHER,
+                QueryUtils.VAL_FIELD_NAME, OTHER,
+                "ID", INTEGER,
+                "AFF_ID", INTEGER,
+                "VAL", INTEGER
+            ),
+            createTable("TEST3", IgniteDistributions.random(),
+                QueryUtils.KEY_FIELD_NAME, OTHER,
+                QueryUtils.VAL_FIELD_NAME, OTHER,
+                "ID", INTEGER,
+                "AFF_ID", INTEGER,
+                "VAL", INTEGER
+            ),
+            createTable("TEST_REPL", IgniteDistributions.broadcast(),
+                QueryUtils.KEY_FIELD_NAME, OTHER,
+                QueryUtils.VAL_FIELD_NAME, OTHER,
+                "ID", INTEGER,
+                "AFF_ID", INTEGER,
+                "VAL", INTEGER
+            ),
+            createTable("TEST_REPL2", IgniteDistributions.broadcast(),
+                QueryUtils.KEY_FIELD_NAME, OTHER,
+                QueryUtils.VAL_FIELD_NAME, OTHER,
+                "ID", INTEGER,
+                "AFF_ID", INTEGER,
+                "VAL", INTEGER
+            )
+        );
+
+        // Check INSERT statements.
+
+        // partitioned <- values (broadcast).
+        assertPlan("INSERT INTO test VALUES (?, ?, ?)", schema,
+            
isInstanceOf(IgniteTableModify.class).and(hasDistribution(IgniteDistributions.single())));
+
+        // partitioned <- partitioned (same).
+        assertPlan("INSERT INTO test SELECT * FROM test", schema,
+            
hasChildThat(isInstanceOf(IgniteTableModify.class).and(hasDistribution(IgniteDistributions.random()))
+                .and(input(isInstanceOf(IgniteTableSpool.class)
+                    .and(input(isTableScan("TEST")))))));
+
+        // partitioned <- partitioned (same, affinity key change).
+        assertPlan("INSERT INTO test SELECT id, aff_id + 1, val FROM test", 
schema,
+            
isInstanceOf(IgniteTableModify.class).and(hasDistribution(IgniteDistributions.single()))
+                .and(input(isInstanceOf(IgniteTableSpool.class)
+                    .and(input(isInstanceOf(IgniteExchange.class)
+                        .and(input(isTableScan("TEST"))))))));
+
+        // partitioned <- partitioned (another affinity).
+        assertPlan("INSERT INTO test SELECT * FROM test2", schema,
+            
hasChildThat(isInstanceOf(IgniteTableModify.class).and(hasDistribution(IgniteDistributions.random()))
+                .and(input(isTableScan("TEST2")))));
+
+        // partitioned <- partitioned (another affinity, affinity key change).
+        assertPlan("INSERT INTO test SELECT id, aff_id + 1, val FROM test2", 
schema,
+            
hasChildThat(isInstanceOf(IgniteTableModify.class).and(hasDistribution(IgniteDistributions.random()))
+                .and(input(isTableScan("TEST2")))));
+
+        // partitioned <- random.
+        assertPlan("INSERT INTO test SELECT * FROM test3", schema,

Review Comment:
   Do we expect that some of the primary keys coming from table `TEST3` will
   be found on the current node for table `TEST`?



##########
modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/type/OtherType.java:
##########
@@ -35,4 +36,14 @@ public OtherType(boolean nullable) {
     @Override public Type storageType() {
         return Object.class;
     }
+
+    /** {@inheritDoc} */
+    @Override public boolean equals(@Nullable Object obj) {

Review Comment:
   Where does it fire?



##########
modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/TestTable.java:
##########
@@ -314,4 +330,171 @@ public TestTable addIndex(String name, int... keys) {
     @Override public void authorize(Operation op) {
         // No-op.
     }
+
+    /** {@inheritDoc} */
+    @Override public <C> C unwrap(Class<C> cls) {
+        if (cls.isInstance(this))
+            return cls.cast(this);
+
+        if (cls.isInstance(desc))
+            return cls.cast(desc);
+
+        return null;
+    }
+
+    /** */
+    static class TestTableDescriptor extends NullInitializerExpressionFactory 
implements CacheTableDescriptor {
+        /** */
+        private final Supplier<IgniteDistribution> distributionSupp;
+
+        /** */
+        private final RelDataType rowType;
+
+        /** */
+        private final GridCacheContextInfo<?, ?> cacheInfo;
+
+        /** */
+        public TestTableDescriptor(Supplier<IgniteDistribution> distribution, 
RelDataType rowType) {
+            distributionSupp = distribution;
+            this.rowType = rowType;
+            cacheInfo = Mockito.mock(GridCacheContextInfo.class);
+
+            CacheConfiguration cfg = Mockito.mock(CacheConfiguration.class);
+            Mockito.when(cfg.isEagerTtl()).thenReturn(true);
+
+            Mockito.when(cacheInfo.cacheId()).thenReturn(CU.cacheId("TEST"));
+            Mockito.when(cacheInfo.config()).thenReturn(cfg);
+        }
+
+        /** {@inheritDoc} */
+        @Override public GridCacheContextInfo<?, ?> cacheInfo() {
+            return cacheInfo;
+        }
+
+        /** {@inheritDoc} */
+        @Override public GridCacheContext<?, ?> cacheContext() {
+            throw new AssertionError();
+        }
+
+        /** {@inheritDoc} */
+        @Override public IgniteDistribution distribution() {
+            return distributionSupp.get();
+        }
+
+        /** {@inheritDoc} */
+        @Override public ColocationGroup colocationGroup(MappingQueryContext 
ctx) {
+            throw new AssertionError();
+        }
+
+        /** {@inheritDoc} */
+        @Override public RelDataType rowType(IgniteTypeFactory factory, 
ImmutableBitSet usedColumns) {

Review Comment:
   `usedColumns` might be `@Nullable`



##########
modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/TestTable.java:
##########
@@ -314,4 +330,171 @@ public TestTable addIndex(String name, int... keys) {
     @Override public void authorize(Operation op) {
         // No-op.
     }
+
+    /** {@inheritDoc} */
+    @Override public <C> C unwrap(Class<C> cls) {
+        if (cls.isInstance(this))
+            return cls.cast(this);
+
+        if (cls.isInstance(desc))
+            return cls.cast(desc);
+
+        return null;
+    }
+
+    /** */
+    static class TestTableDescriptor extends NullInitializerExpressionFactory 
implements CacheTableDescriptor {
+        /** */
+        private final Supplier<IgniteDistribution> distributionSupp;
+
+        /** */
+        private final RelDataType rowType;
+
+        /** */
+        private final GridCacheContextInfo<?, ?> cacheInfo;
+
+        /** */
+        public TestTableDescriptor(Supplier<IgniteDistribution> distribution, 
RelDataType rowType) {
+            distributionSupp = distribution;
+            this.rowType = rowType;
+            cacheInfo = Mockito.mock(GridCacheContextInfo.class);
+
+            CacheConfiguration cfg = Mockito.mock(CacheConfiguration.class);
+            Mockito.when(cfg.isEagerTtl()).thenReturn(true);
+
+            Mockito.when(cacheInfo.cacheId()).thenReturn(CU.cacheId("TEST"));
+            Mockito.when(cacheInfo.config()).thenReturn(cfg);
+        }
+
+        /** {@inheritDoc} */
+        @Override public GridCacheContextInfo<?, ?> cacheInfo() {
+            return cacheInfo;
+        }
+
+        /** {@inheritDoc} */
+        @Override public GridCacheContext<?, ?> cacheContext() {
+            throw new AssertionError();
+        }
+
+        /** {@inheritDoc} */
+        @Override public IgniteDistribution distribution() {
+            return distributionSupp.get();
+        }
+
+        /** {@inheritDoc} */
+        @Override public ColocationGroup colocationGroup(MappingQueryContext 
ctx) {
+            throw new AssertionError();
+        }
+
+        /** {@inheritDoc} */
+        @Override public RelDataType rowType(IgniteTypeFactory factory, 
ImmutableBitSet usedColumns) {
+            if (usedColumns == null)
+                return rowType;
+            else {
+                RelDataTypeFactory.Builder b = new 
RelDataTypeFactory.Builder(factory);
+
+                for (int i = usedColumns.nextSetBit(0); i != -1; i = 
usedColumns.nextSetBit(i + 1))
+                    b.add(rowType.getFieldList().get(i));
+
+                return b.build();
+            }
+
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean isUpdateAllowed(RelOptTable tbl, int colIdx) {
+            return true;
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean match(CacheDataRow row) {
+            throw new AssertionError();
+        }
+
+        /** {@inheritDoc} */
+        @Override public <Row> Row toRow(ExecutionContext<Row> ectx, 
CacheDataRow row, RowHandler.RowFactory<Row> factory,
+            @Nullable ImmutableBitSet requiredColumns) {
+            throw new AssertionError();
+        }
+
+        /** {@inheritDoc} */
+        @Override public <Row> ModifyTuple toTuple(ExecutionContext<Row> ectx, 
Row row, TableModify.Operation op,
+            @Nullable Object arg) {
+            throw new AssertionError();
+        }
+
+        /** {@inheritDoc} */
+        @Override public ColumnDescriptor columnDescriptor(String fieldName) {
+            RelDataTypeField field = rowType.getField(fieldName, false, false);
+            return new TestColumnDescriptor(field.getIndex(), fieldName);
+        }
+
+        /** {@inheritDoc} */
+        @Override public Collection<ColumnDescriptor> columnDescriptors() {
+            return Commons.transform(rowType.getFieldList(), f -> new 
TestColumnDescriptor(f.getIndex(), f.getName()));
+        }
+
+        /** {@inheritDoc} */
+        @Override public GridQueryTypeDescriptor typeDescription() {
+            throw new AssertionError();
+        }
+
+        /** {@inheritDoc} */
+        @Override public RelDataType insertRowType(IgniteTypeFactory factory) {
+            ImmutableBitSet.Builder bitSetBuilder = ImmutableBitSet.builder();
+            for (int i = 0; i < rowType.getFieldCount(); i++) {

Review Comment:
   Let's add an empty line above.



##########
modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rule/TableModifyDistributedConverterRule.java:
##########
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.calcite.rule;
+
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelOptPlanner;
+import org.apache.calcite.plan.RelOptRule;
+import org.apache.calcite.plan.RelOptUtil;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.PhysicalNode;
+import org.apache.calcite.rel.RelCollations;
+import org.apache.calcite.rel.RelDistribution;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.logical.LogicalTableModify;
+import org.apache.calcite.rel.metadata.RelMetadataQuery;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rel.type.RelDataTypeField;
+import org.apache.calcite.sql.fun.SqlStdOperatorTable;
+import org.apache.calcite.tools.RelBuilder;
+import org.apache.calcite.util.ImmutableIntList;
+import 
org.apache.ignite.internal.processors.query.calcite.rel.IgniteConvention;
+import 
org.apache.ignite.internal.processors.query.calcite.rel.IgniteTableModify;
+import org.apache.ignite.internal.processors.query.calcite.schema.IgniteTable;
+import 
org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistribution;
+import 
org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistributions;
+import 
org.apache.ignite.internal.processors.query.calcite.trait.RewindabilityTrait;
+import org.apache.ignite.internal.processors.query.calcite.util.Commons;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Converts LogicalTableModify to distributed IgniteTableModify (Perform table 
modify on remote nodes,
+ * aggregate affected rows count and send result to the initiator node).
+ */
+public class TableModifyDistributedConverterRule extends 
AbstractIgniteConverterRule<LogicalTableModify> {
+    /** */
+    public static final RelOptRule INSTANCE = new 
TableModifyDistributedConverterRule();
+
+    /**
+     * Creates a ConverterRule.
+     */
+    public TableModifyDistributedConverterRule() {
+        super(LogicalTableModify.class, 
TableModifyDistributedConverterRule.class.getSimpleName());
+    }
+
+    /** {@inheritDoc} */
+    @Override protected @Nullable PhysicalNode convert(
+        RelOptPlanner planner,
+        RelMetadataQuery mq,
+        LogicalTableModify rel
+    ) {
+        RelOptCluster cluster = rel.getCluster();
+
+        // If transaction is explicitly started it's only allowed to perform 
table modify on initiator node.
+        if (Commons.queryTransactionVersion(planner.getContext()) != null)
+            return null;
+
+        RelDataType rowType = rel.getRowType();
+        IgniteTable table = rel.getTable().unwrap(IgniteTable.class);
+        IgniteDistribution inputDistribution = table.distribution();
+        boolean affectsSrc = false;
+
+        // Single distribution table modify is prefered in this case.
+        if (inputDistribution == IgniteDistributions.single())
+            return null;
+
+        switch (rel.getOperation()) {
+            case MERGE:

Review Comment:
   Do we have tests for this case?



##########
modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/TestTable.java:
##########
@@ -314,4 +330,171 @@ public TestTable addIndex(String name, int... keys) {
     @Override public void authorize(Operation op) {
         // No-op.
     }
+
+    /** {@inheritDoc} */
+    @Override public <C> C unwrap(Class<C> cls) {
+        if (cls.isInstance(this))
+            return cls.cast(this);
+
+        if (cls.isInstance(desc))
+            return cls.cast(desc);
+
+        return null;
+    }
+
+    /** */
+    static class TestTableDescriptor extends NullInitializerExpressionFactory 
implements CacheTableDescriptor {
+        /** */
+        private final Supplier<IgniteDistribution> distributionSupp;
+
+        /** */
+        private final RelDataType rowType;
+
+        /** */
+        private final GridCacheContextInfo<?, ?> cacheInfo;
+
+        /** */
+        public TestTableDescriptor(Supplier<IgniteDistribution> distribution, 
RelDataType rowType) {
+            distributionSupp = distribution;
+            this.rowType = rowType;
+            cacheInfo = Mockito.mock(GridCacheContextInfo.class);
+
+            CacheConfiguration cfg = Mockito.mock(CacheConfiguration.class);
+            Mockito.when(cfg.isEagerTtl()).thenReturn(true);
+
+            Mockito.when(cacheInfo.cacheId()).thenReturn(CU.cacheId("TEST"));
+            Mockito.when(cacheInfo.config()).thenReturn(cfg);
+        }
+
+        /** {@inheritDoc} */
+        @Override public GridCacheContextInfo<?, ?> cacheInfo() {
+            return cacheInfo;
+        }
+
+        /** {@inheritDoc} */
+        @Override public GridCacheContext<?, ?> cacheContext() {
+            throw new AssertionError();
+        }
+
+        /** {@inheritDoc} */
+        @Override public IgniteDistribution distribution() {
+            return distributionSupp.get();
+        }
+
+        /** {@inheritDoc} */
+        @Override public ColocationGroup colocationGroup(MappingQueryContext 
ctx) {
+            throw new AssertionError();
+        }
+
+        /** {@inheritDoc} */
+        @Override public RelDataType rowType(IgniteTypeFactory factory, 
ImmutableBitSet usedColumns) {
+            if (usedColumns == null)
+                return rowType;
+            else {
+                RelDataTypeFactory.Builder b = new 
RelDataTypeFactory.Builder(factory);
+
+                for (int i = usedColumns.nextSetBit(0); i != -1; i = 
usedColumns.nextSetBit(i + 1))
+                    b.add(rowType.getFieldList().get(i));
+
+                return b.build();
+            }
+

Review Comment:
   Redundant empty line here; let's remove it.



##########
modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rule/TableModifyDistributedConverterRule.java:
##########
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.calcite.rule;
+
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelOptPlanner;
+import org.apache.calcite.plan.RelOptRule;
+import org.apache.calcite.plan.RelOptUtil;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.PhysicalNode;
+import org.apache.calcite.rel.RelCollations;
+import org.apache.calcite.rel.RelDistribution;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.logical.LogicalTableModify;
+import org.apache.calcite.rel.metadata.RelMetadataQuery;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rel.type.RelDataTypeField;
+import org.apache.calcite.sql.fun.SqlStdOperatorTable;
+import org.apache.calcite.tools.RelBuilder;
+import org.apache.calcite.util.ImmutableIntList;
+import 
org.apache.ignite.internal.processors.query.calcite.rel.IgniteConvention;
+import 
org.apache.ignite.internal.processors.query.calcite.rel.IgniteTableModify;
+import org.apache.ignite.internal.processors.query.calcite.schema.IgniteTable;
+import 
org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistribution;
+import 
org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistributions;
+import 
org.apache.ignite.internal.processors.query.calcite.trait.RewindabilityTrait;
+import org.apache.ignite.internal.processors.query.calcite.util.Commons;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Converts LogicalTableModify to distributed IgniteTableModify (Perform table 
modify on remote nodes,
+ * aggregate affected rows count and send result to the initiator node).
+ */
+public class TableModifyDistributedConverterRule extends 
AbstractIgniteConverterRule<LogicalTableModify> {
+    /** */
+    public static final RelOptRule INSTANCE = new 
TableModifyDistributedConverterRule();
+
+    /**
+     * Creates a ConverterRule.
+     */
+    public TableModifyDistributedConverterRule() {
+        super(LogicalTableModify.class, 
TableModifyDistributedConverterRule.class.getSimpleName());
+    }
+
+    /** {@inheritDoc} */
+    @Override protected @Nullable PhysicalNode convert(

Review Comment:
   Trivial code style: this might fit on a single line. Up to you.



##########
modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rule/TableModifySingleNodeConverterRule.java:
##########
@@ -32,17 +32,17 @@
 import 
org.apache.ignite.internal.processors.query.calcite.trait.RewindabilityTrait;
 
 /**
- *
+ * Converts LogicalTableModify to single distribution IgniteTableModify 
(perform table modify on initiator node).
  */
-public class TableModifyConverterRule extends 
AbstractIgniteConverterRule<LogicalTableModify> {
+public class TableModifySingleNodeConverterRule extends 
AbstractIgniteConverterRule<LogicalTableModify> {
     /** */
-    public static final RelOptRule INSTANCE = new TableModifyConverterRule();
+    public static final RelOptRule INSTANCE = new 
TableModifySingleNodeConverterRule();
 
     /**
      * Creates a ConverterRule.
      */
-    public TableModifyConverterRule() {

Review Comment:
   Suggestion. `Creates TableModifySingleNodeConverterRule`



##########
modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rule/TableModifyDistributedConverterRule.java:
##########
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.calcite.rule;
+
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelOptPlanner;
+import org.apache.calcite.plan.RelOptRule;
+import org.apache.calcite.plan.RelOptUtil;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.PhysicalNode;
+import org.apache.calcite.rel.RelCollations;
+import org.apache.calcite.rel.RelDistribution;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.logical.LogicalTableModify;
+import org.apache.calcite.rel.metadata.RelMetadataQuery;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rel.type.RelDataTypeField;
+import org.apache.calcite.sql.fun.SqlStdOperatorTable;
+import org.apache.calcite.tools.RelBuilder;
+import org.apache.calcite.util.ImmutableIntList;
+import 
org.apache.ignite.internal.processors.query.calcite.rel.IgniteConvention;
+import 
org.apache.ignite.internal.processors.query.calcite.rel.IgniteTableModify;
+import org.apache.ignite.internal.processors.query.calcite.schema.IgniteTable;
+import 
org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistribution;
+import 
org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistributions;
+import 
org.apache.ignite.internal.processors.query.calcite.trait.RewindabilityTrait;
+import org.apache.ignite.internal.processors.query.calcite.util.Commons;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Converts LogicalTableModify to distributed IgniteTableModify (Perform table 
modify on remote nodes,
+ * aggregate affected rows count and send result to the initiator node).
+ */
+public class TableModifyDistributedConverterRule extends 
AbstractIgniteConverterRule<LogicalTableModify> {
+    /** */
+    public static final RelOptRule INSTANCE = new 
TableModifyDistributedConverterRule();
+
+    /**
+     * Creates a ConverterRule.
+     */
+    public TableModifyDistributedConverterRule() {
+        super(LogicalTableModify.class, 
TableModifyDistributedConverterRule.class.getSimpleName());
+    }
+
+    /** {@inheritDoc} */
+    @Override protected @Nullable PhysicalNode convert(
+        RelOptPlanner planner,
+        RelMetadataQuery mq,
+        LogicalTableModify rel
+    ) {
+        RelOptCluster cluster = rel.getCluster();
+
+        // If transaction is explicitly started it's only allowed to perform 
table modify on initiator node.
+        if (Commons.queryTransactionVersion(planner.getContext()) != null)
+            return null;

Review Comment:
   Do we need any dedicated test for it?



##########
modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/TableDmlPlannerTest.java:
##########
@@ -190,4 +198,197 @@ public void testDuplicatedColumnNames() throws Exception {
         assertPlan("SELECT CITY.NAME, STREET.NAME FROM STREET JOIN CITY ON 
STREET.CITY_ID = CITY.ID ORDER BY STREET.ID",
             schema, hasColumns("NAME", "NAME1"));
     }
+
+    /** Tests that table modify can be executed on remote nodes. */
+    @Test
+    public void testDistributedTableModify() throws Exception {
+        IgniteSchema schema = createSchema(
+            createTable("TEST", IgniteDistributions.affinity(3, "test", 
"hash"),

Review Comment:
   To make the test clearer, I suggest renaming the tables:
   `TEST` -> `TEST_PART`
   `TEST2` -> `TEST_PART2`
   `TEST3` -> `TEST_RND`



##########
modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rule/TableModifyDistributedConverterRule.java:
##########
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.calcite.rule;
+
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelOptPlanner;
+import org.apache.calcite.plan.RelOptRule;
+import org.apache.calcite.plan.RelOptUtil;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.PhysicalNode;
+import org.apache.calcite.rel.RelCollations;
+import org.apache.calcite.rel.RelDistribution;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.logical.LogicalTableModify;
+import org.apache.calcite.rel.metadata.RelMetadataQuery;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rel.type.RelDataTypeField;
+import org.apache.calcite.sql.fun.SqlStdOperatorTable;
+import org.apache.calcite.tools.RelBuilder;
+import org.apache.calcite.util.ImmutableIntList;
+import 
org.apache.ignite.internal.processors.query.calcite.rel.IgniteConvention;
+import 
org.apache.ignite.internal.processors.query.calcite.rel.IgniteTableModify;
+import org.apache.ignite.internal.processors.query.calcite.schema.IgniteTable;
+import 
org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistribution;
+import 
org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistributions;
+import 
org.apache.ignite.internal.processors.query.calcite.trait.RewindabilityTrait;
+import org.apache.ignite.internal.processors.query.calcite.util.Commons;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Converts LogicalTableModify to distributed IgniteTableModify (Perform table 
modify on remote nodes,
+ * aggregate affected rows count and send result to the initiator node).
+ */
+public class TableModifyDistributedConverterRule extends 
AbstractIgniteConverterRule<LogicalTableModify> {
+    /** */
+    public static final RelOptRule INSTANCE = new 
TableModifyDistributedConverterRule();
+
+    /**
+     * Creates a ConverterRule.
+     */
+    public TableModifyDistributedConverterRule() {
+        super(LogicalTableModify.class, 
TableModifyDistributedConverterRule.class.getSimpleName());
+    }
+
+    /** {@inheritDoc} */
+    @Override protected @Nullable PhysicalNode convert(
+        RelOptPlanner planner,
+        RelMetadataQuery mq,
+        LogicalTableModify rel
+    ) {
+        RelOptCluster cluster = rel.getCluster();
+
+        // If transaction is explicitly started it's only allowed to perform 
table modify on initiator node.
+        if (Commons.queryTransactionVersion(planner.getContext()) != null)
+            return null;
+
+        RelDataType rowType = rel.getRowType();
+        IgniteTable table = rel.getTable().unwrap(IgniteTable.class);
+        IgniteDistribution inputDistribution = table.distribution();
+        boolean affectsSrc = false;
+
+        // Single distribution table modify is prefered in this case.
+        if (inputDistribution == IgniteDistributions.single())
+            return null;
+
+        switch (rel.getOperation()) {
+            case MERGE:
+                // Merge contains insert fields as well as update fields, it's 
impossible to check input distribution
+                // over these two fields sets in common case, only corner 
cases can be implemented, skip it for now.
+                return null;
+
+            case INSERT:
+                affectsSrc = 
RelOptUtil.findTables(rel).contains(rel.getTable());
+
+                if (inputDistribution.getType() != 
RelDistribution.Type.HASH_DISTRIBUTED) {
+                    if (affectsSrc)
+                        return null;
+                    else
+                        inputDistribution = 
IgniteDistributions.hash(ImmutableIntList.range(0, rowType.getFieldCount()));
+                }
+
+                break;
+
+            case UPDATE:
+                if (inputDistribution.getType() != 
RelDistribution.Type.HASH_DISTRIBUTED)
+                    inputDistribution = 
IgniteDistributions.hash(ImmutableIntList.of(0));
+
+                break;
+
+            case DELETE:
+                inputDistribution = 
IgniteDistributions.hash(ImmutableIntList.of(0));
+
+                break;
+
+            default:
+                throw new IllegalStateException("Unknown operation type: " + 
rel.getOperation());
+        }
+
+        // Create distributed table modify.
+        RelBuilder relBuilder = relBuilderFactory.create(rel.getCluster(), 
null);

Review Comment:
   Minor coding. Can be placed below, where it is used.



##########
modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rule/TableModifyDistributedConverterRule.java:
##########
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.calcite.rule;
+
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelOptPlanner;
+import org.apache.calcite.plan.RelOptRule;
+import org.apache.calcite.plan.RelOptUtil;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.PhysicalNode;
+import org.apache.calcite.rel.RelCollations;
+import org.apache.calcite.rel.RelDistribution;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.logical.LogicalTableModify;
+import org.apache.calcite.rel.metadata.RelMetadataQuery;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rel.type.RelDataTypeField;
+import org.apache.calcite.sql.fun.SqlStdOperatorTable;
+import org.apache.calcite.tools.RelBuilder;
+import org.apache.calcite.util.ImmutableIntList;
+import 
org.apache.ignite.internal.processors.query.calcite.rel.IgniteConvention;
+import 
org.apache.ignite.internal.processors.query.calcite.rel.IgniteTableModify;
+import org.apache.ignite.internal.processors.query.calcite.schema.IgniteTable;
+import 
org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistribution;
+import 
org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistributions;
+import 
org.apache.ignite.internal.processors.query.calcite.trait.RewindabilityTrait;
+import org.apache.ignite.internal.processors.query.calcite.util.Commons;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Converts LogicalTableModify to distributed IgniteTableModify (Perform table 
modify on remote nodes,
+ * aggregate affected rows count and send result to the initiator node).
+ */
+public class TableModifyDistributedConverterRule extends 
AbstractIgniteConverterRule<LogicalTableModify> {
+    /** */
+    public static final RelOptRule INSTANCE = new 
TableModifyDistributedConverterRule();
+
+    /**
+     * Creates a ConverterRule.
+     */
+    public TableModifyDistributedConverterRule() {
+        super(LogicalTableModify.class, 
TableModifyDistributedConverterRule.class.getSimpleName());
+    }
+
+    /** {@inheritDoc} */
+    @Override protected @Nullable PhysicalNode convert(
+        RelOptPlanner planner,
+        RelMetadataQuery mq,
+        LogicalTableModify rel
+    ) {
+        RelOptCluster cluster = rel.getCluster();
+
+        // If transaction is explicitly started it's only allowed to perform 
table modify on initiator node.
+        if (Commons.queryTransactionVersion(planner.getContext()) != null)
+            return null;
+
+        RelDataType rowType = rel.getRowType();
+        IgniteTable table = rel.getTable().unwrap(IgniteTable.class);
+        IgniteDistribution inputDistribution = table.distribution();
+        boolean affectsSrc = false;
+
+        // Single distribution table modify is prefered in this case.
+        if (inputDistribution == IgniteDistributions.single())
+            return null;
+
+        switch (rel.getOperation()) {
+            case MERGE:
+                // Merge contains insert fields as well as update fields, it's 
impossible to check input distribution

Review Comment:
   Trivial spelling. Let's fix the sentences. Use `.` instead of `,`.



##########
modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rule/HashAggregateConverterRule.java:
##########
@@ -55,8 +56,11 @@ private static class ColocatedHashAggregateConverterRule 
extends AbstractIgniteC
         }
 
         /** {@inheritDoc} */
-        @Override protected PhysicalNode convert(RelOptPlanner planner, 
RelMetadataQuery mq,
-            LogicalAggregate agg) {
+        @Override protected @Nullable PhysicalNode convert(

Review Comment:
   Trivial codestyle. Might be a single line. Up to you.



##########
modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rule/HashAggregateConverterRule.java:
##########
@@ -84,8 +88,11 @@ private static class MapReduceHashAggregateConverterRule 
extends AbstractIgniteC
         }
 
         /** {@inheritDoc} */
-        @Override protected PhysicalNode convert(RelOptPlanner planner, 
RelMetadataQuery mq,
-            LogicalAggregate agg) {
+        @Override protected @Nullable PhysicalNode convert(

Review Comment:
   Trivial codestyle. Might be a single line. Up to you.



##########
modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rule/TableModifyDistributedConverterRule.java:
##########
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.calcite.rule;
+
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelOptPlanner;
+import org.apache.calcite.plan.RelOptRule;
+import org.apache.calcite.plan.RelOptUtil;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.PhysicalNode;
+import org.apache.calcite.rel.RelCollations;
+import org.apache.calcite.rel.RelDistribution;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.logical.LogicalTableModify;
+import org.apache.calcite.rel.metadata.RelMetadataQuery;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rel.type.RelDataTypeField;
+import org.apache.calcite.sql.fun.SqlStdOperatorTable;
+import org.apache.calcite.tools.RelBuilder;
+import org.apache.calcite.util.ImmutableIntList;
+import 
org.apache.ignite.internal.processors.query.calcite.rel.IgniteConvention;
+import 
org.apache.ignite.internal.processors.query.calcite.rel.IgniteTableModify;
+import org.apache.ignite.internal.processors.query.calcite.schema.IgniteTable;
+import 
org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistribution;
+import 
org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistributions;
+import 
org.apache.ignite.internal.processors.query.calcite.trait.RewindabilityTrait;
+import org.apache.ignite.internal.processors.query.calcite.util.Commons;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Converts LogicalTableModify to distributed IgniteTableModify (Perform table 
modify on remote nodes,
+ * aggregate affected rows count and send result to the initiator node).
+ */
+public class TableModifyDistributedConverterRule extends 
AbstractIgniteConverterRule<LogicalTableModify> {
+    /** */
+    public static final RelOptRule INSTANCE = new 
TableModifyDistributedConverterRule();
+
+    /**
+     * Creates a ConverterRule.
+     */
+    public TableModifyDistributedConverterRule() {
+        super(LogicalTableModify.class, 
TableModifyDistributedConverterRule.class.getSimpleName());
+    }
+
+    /** {@inheritDoc} */
+    @Override protected @Nullable PhysicalNode convert(
+        RelOptPlanner planner,
+        RelMetadataQuery mq,
+        LogicalTableModify rel
+    ) {
+        RelOptCluster cluster = rel.getCluster();
+
+        // If transaction is explicitly started it's only allowed to perform 
table modify on initiator node.
+        if (Commons.queryTransactionVersion(planner.getContext()) != null)
+            return null;
+
+        RelDataType rowType = rel.getRowType();
+        IgniteTable table = rel.getTable().unwrap(IgniteTable.class);
+        IgniteDistribution inputDistribution = table.distribution();
+        boolean affectsSrc = false;
+
+        // Single distribution table modify is prefered in this case.
+        if (inputDistribution == IgniteDistributions.single())
+            return null;
+
+        switch (rel.getOperation()) {
+            case MERGE:
+                // Merge contains insert fields as well as update fields, it's 
impossible to check input distribution
+                // over these two fields sets in common case, only corner 
cases can be implemented, skip it for now.
+                return null;
+
+            case INSERT:
+                affectsSrc = 
RelOptUtil.findTables(rel).contains(rel.getTable());
+
+                if (inputDistribution.getType() != 
RelDistribution.Type.HASH_DISTRIBUTED) {
+                    if (affectsSrc)
+                        return null;
+                    else
+                        inputDistribution = 
IgniteDistributions.hash(ImmutableIntList.range(0, rowType.getFieldCount()));
+                }
+
+                break;
+
+            case UPDATE:
+                if (inputDistribution.getType() != 
RelDistribution.Type.HASH_DISTRIBUTED)
+                    inputDistribution = 
IgniteDistributions.hash(ImmutableIntList.of(0));
+
+                break;
+
+            case DELETE:
+                inputDistribution = 
IgniteDistributions.hash(ImmutableIntList.of(0));
+
+                break;
+
+            default:
+                throw new IllegalStateException("Unknown operation type: " + 
rel.getOperation());
+        }
+
+        // Create distributed table modify.
+        RelBuilder relBuilder = relBuilderFactory.create(rel.getCluster(), 
null);
+
+        RelTraitSet outputTraits = 
cluster.traitSetOf(IgniteConvention.INSTANCE)
+            .replace(IgniteDistributions.random())

Review Comment:
   Why exactly `random()` and not `hash()`?



##########
modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/TableDmlPlannerTest.java:
##########
@@ -190,4 +198,197 @@ public void testDuplicatedColumnNames() throws Exception {
         assertPlan("SELECT CITY.NAME, STREET.NAME FROM STREET JOIN CITY ON 
STREET.CITY_ID = CITY.ID ORDER BY STREET.ID",
             schema, hasColumns("NAME", "NAME1"));
     }
+
+    /** Tests that table modify can be executed on remote nodes. */
+    @Test
+    public void testDistributedTableModify() throws Exception {
+        IgniteSchema schema = createSchema(
+            createTable("TEST", IgniteDistributions.affinity(3, "test", 
"hash"),
+                QueryUtils.KEY_FIELD_NAME, OTHER,
+                QueryUtils.VAL_FIELD_NAME, OTHER,
+                "ID", INTEGER,
+                "AFF_ID", INTEGER,
+                "VAL", INTEGER
+            ),
+            createTable("TEST2", IgniteDistributions.affinity(2, "test2", 
"hash"),
+                QueryUtils.KEY_FIELD_NAME, OTHER,
+                QueryUtils.VAL_FIELD_NAME, OTHER,
+                "ID", INTEGER,
+                "AFF_ID", INTEGER,
+                "VAL", INTEGER
+            ),
+            createTable("TEST3", IgniteDistributions.random(),
+                QueryUtils.KEY_FIELD_NAME, OTHER,
+                QueryUtils.VAL_FIELD_NAME, OTHER,
+                "ID", INTEGER,
+                "AFF_ID", INTEGER,
+                "VAL", INTEGER
+            ),
+            createTable("TEST_REPL", IgniteDistributions.broadcast(),
+                QueryUtils.KEY_FIELD_NAME, OTHER,
+                QueryUtils.VAL_FIELD_NAME, OTHER,
+                "ID", INTEGER,
+                "AFF_ID", INTEGER,
+                "VAL", INTEGER
+            ),
+            createTable("TEST_REPL2", IgniteDistributions.broadcast(),
+                QueryUtils.KEY_FIELD_NAME, OTHER,
+                QueryUtils.VAL_FIELD_NAME, OTHER,
+                "ID", INTEGER,
+                "AFF_ID", INTEGER,
+                "VAL", INTEGER
+            )
+        );
+
+        // Check INSERT statements.
+
+        // partitioned <- values (broadcast).
+        assertPlan("INSERT INTO test VALUES (?, ?, ?)", schema,
+            
isInstanceOf(IgniteTableModify.class).and(hasDistribution(IgniteDistributions.single())));
+
+        // partitioned <- partitioned (same).
+        assertPlan("INSERT INTO test SELECT * FROM test", schema,
+            
hasChildThat(isInstanceOf(IgniteTableModify.class).and(hasDistribution(IgniteDistributions.random()))

Review Comment:
   If the table is the same and its distribution is `hash()`, why is it `random()` 
in the result?



##########
modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/TableDmlPlannerTest.java:
##########
@@ -190,4 +198,197 @@ public void testDuplicatedColumnNames() throws Exception {
         assertPlan("SELECT CITY.NAME, STREET.NAME FROM STREET JOIN CITY ON 
STREET.CITY_ID = CITY.ID ORDER BY STREET.ID",
             schema, hasColumns("NAME", "NAME1"));
     }
+
+    /** Tests that table modify can be executed on remote nodes. */
+    @Test
+    public void testDistributedTableModify() throws Exception {
+        IgniteSchema schema = createSchema(
+            createTable("TEST", IgniteDistributions.affinity(3, "test", 
"hash"),
+                QueryUtils.KEY_FIELD_NAME, OTHER,
+                QueryUtils.VAL_FIELD_NAME, OTHER,
+                "ID", INTEGER,
+                "AFF_ID", INTEGER,
+                "VAL", INTEGER
+            ),
+            createTable("TEST2", IgniteDistributions.affinity(2, "test2", 
"hash"),
+                QueryUtils.KEY_FIELD_NAME, OTHER,
+                QueryUtils.VAL_FIELD_NAME, OTHER,
+                "ID", INTEGER,
+                "AFF_ID", INTEGER,
+                "VAL", INTEGER
+            ),
+            createTable("TEST3", IgniteDistributions.random(),
+                QueryUtils.KEY_FIELD_NAME, OTHER,
+                QueryUtils.VAL_FIELD_NAME, OTHER,
+                "ID", INTEGER,
+                "AFF_ID", INTEGER,
+                "VAL", INTEGER
+            ),
+            createTable("TEST_REPL", IgniteDistributions.broadcast(),
+                QueryUtils.KEY_FIELD_NAME, OTHER,
+                QueryUtils.VAL_FIELD_NAME, OTHER,
+                "ID", INTEGER,
+                "AFF_ID", INTEGER,
+                "VAL", INTEGER
+            ),
+            createTable("TEST_REPL2", IgniteDistributions.broadcast(),
+                QueryUtils.KEY_FIELD_NAME, OTHER,
+                QueryUtils.VAL_FIELD_NAME, OTHER,
+                "ID", INTEGER,
+                "AFF_ID", INTEGER,
+                "VAL", INTEGER
+            )
+        );
+
+        // Check INSERT statements.
+
+        // partitioned <- values (broadcast).
+        assertPlan("INSERT INTO test VALUES (?, ?, ?)", schema,
+            
isInstanceOf(IgniteTableModify.class).and(hasDistribution(IgniteDistributions.single())));
+
+        // partitioned <- partitioned (same).
+        assertPlan("INSERT INTO test SELECT * FROM test", schema,
+            
hasChildThat(isInstanceOf(IgniteTableModify.class).and(hasDistribution(IgniteDistributions.random()))
+                .and(input(isInstanceOf(IgniteTableSpool.class)
+                    .and(input(isTableScan("TEST")))))));
+
+        // partitioned <- partitioned (same, affinity key change).
+        assertPlan("INSERT INTO test SELECT id, aff_id + 1, val FROM test", 
schema,

Review Comment:
   Maybe the aggregation should be checked too.



##########
modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rule/TableModifyDistributedConverterRule.java:
##########
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.calcite.rule;
+
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelOptPlanner;
+import org.apache.calcite.plan.RelOptRule;
+import org.apache.calcite.plan.RelOptUtil;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.PhysicalNode;
+import org.apache.calcite.rel.RelCollations;
+import org.apache.calcite.rel.RelDistribution;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.logical.LogicalTableModify;
+import org.apache.calcite.rel.metadata.RelMetadataQuery;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rel.type.RelDataTypeField;
+import org.apache.calcite.sql.fun.SqlStdOperatorTable;
+import org.apache.calcite.tools.RelBuilder;
+import org.apache.calcite.util.ImmutableIntList;
+import 
org.apache.ignite.internal.processors.query.calcite.rel.IgniteConvention;
+import 
org.apache.ignite.internal.processors.query.calcite.rel.IgniteTableModify;
+import org.apache.ignite.internal.processors.query.calcite.schema.IgniteTable;
+import 
org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistribution;
+import 
org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistributions;
+import 
org.apache.ignite.internal.processors.query.calcite.trait.RewindabilityTrait;
+import org.apache.ignite.internal.processors.query.calcite.util.Commons;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Converts LogicalTableModify to distributed IgniteTableModify (Perform table 
modify on remote nodes,
+ * aggregate affected rows count and send result to the initiator node).
+ */
+public class TableModifyDistributedConverterRule extends 
AbstractIgniteConverterRule<LogicalTableModify> {
+    /** */
+    public static final RelOptRule INSTANCE = new 
TableModifyDistributedConverterRule();
+
+    /**
+     * Creates a ConverterRule.
+     */
+    public TableModifyDistributedConverterRule() {
+        super(LogicalTableModify.class, 
TableModifyDistributedConverterRule.class.getSimpleName());
+    }
+
+    /** {@inheritDoc} */
+    @Override protected @Nullable PhysicalNode convert(
+        RelOptPlanner planner,
+        RelMetadataQuery mq,
+        LogicalTableModify rel
+    ) {
+        RelOptCluster cluster = rel.getCluster();

Review Comment:
   Can be put far away below, near the builder.



##########
modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/TableDmlPlannerTest.java:
##########
@@ -190,4 +198,197 @@ public void testDuplicatedColumnNames() throws Exception {
         assertPlan("SELECT CITY.NAME, STREET.NAME FROM STREET JOIN CITY ON 
STREET.CITY_ID = CITY.ID ORDER BY STREET.ID",
             schema, hasColumns("NAME", "NAME1"));
     }
+
+    /** Tests that table modify can be executed on remote nodes. */
+    @Test
+    public void testDistributedTableModify() throws Exception {
+        IgniteSchema schema = createSchema(
+            createTable("TEST", IgniteDistributions.affinity(3, "test", 
"hash"),
+                QueryUtils.KEY_FIELD_NAME, OTHER,
+                QueryUtils.VAL_FIELD_NAME, OTHER,
+                "ID", INTEGER,
+                "AFF_ID", INTEGER,
+                "VAL", INTEGER
+            ),
+            createTable("TEST2", IgniteDistributions.affinity(2, "test2", 
"hash"),
+                QueryUtils.KEY_FIELD_NAME, OTHER,
+                QueryUtils.VAL_FIELD_NAME, OTHER,
+                "ID", INTEGER,
+                "AFF_ID", INTEGER,
+                "VAL", INTEGER
+            ),
+            createTable("TEST3", IgniteDistributions.random(),
+                QueryUtils.KEY_FIELD_NAME, OTHER,
+                QueryUtils.VAL_FIELD_NAME, OTHER,
+                "ID", INTEGER,
+                "AFF_ID", INTEGER,
+                "VAL", INTEGER
+            ),
+            createTable("TEST_REPL", IgniteDistributions.broadcast(),
+                QueryUtils.KEY_FIELD_NAME, OTHER,
+                QueryUtils.VAL_FIELD_NAME, OTHER,
+                "ID", INTEGER,
+                "AFF_ID", INTEGER,
+                "VAL", INTEGER
+            ),
+            createTable("TEST_REPL2", IgniteDistributions.broadcast(),
+                QueryUtils.KEY_FIELD_NAME, OTHER,
+                QueryUtils.VAL_FIELD_NAME, OTHER,
+                "ID", INTEGER,
+                "AFF_ID", INTEGER,
+                "VAL", INTEGER
+            )
+        );
+
+        // Check INSERT statements.
+
+        // partitioned <- values (broadcast).
+        assertPlan("INSERT INTO test VALUES (?, ?, ?)", schema,

Review Comment:
   Do we need a `values` test for tables with other distributions?



##########
modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rule/TableModifyDistributedConverterRule.java:
##########
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.calcite.rule;
+
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelOptPlanner;
+import org.apache.calcite.plan.RelOptRule;
+import org.apache.calcite.plan.RelOptUtil;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.PhysicalNode;
+import org.apache.calcite.rel.RelCollations;
+import org.apache.calcite.rel.RelDistribution;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.logical.LogicalTableModify;
+import org.apache.calcite.rel.metadata.RelMetadataQuery;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rel.type.RelDataTypeField;
+import org.apache.calcite.sql.fun.SqlStdOperatorTable;
+import org.apache.calcite.tools.RelBuilder;
+import org.apache.calcite.util.ImmutableIntList;
+import 
org.apache.ignite.internal.processors.query.calcite.rel.IgniteConvention;
+import 
org.apache.ignite.internal.processors.query.calcite.rel.IgniteTableModify;
+import org.apache.ignite.internal.processors.query.calcite.schema.IgniteTable;
+import 
org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistribution;
+import 
org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistributions;
+import 
org.apache.ignite.internal.processors.query.calcite.trait.RewindabilityTrait;
+import org.apache.ignite.internal.processors.query.calcite.util.Commons;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Converts LogicalTableModify to distributed IgniteTableModify (Perform table 
modify on remote nodes,
+ * aggregate affected rows count and send result to the initiator node).
+ */
+public class TableModifyDistributedConverterRule extends 
AbstractIgniteConverterRule<LogicalTableModify> {
+    /** */
+    public static final RelOptRule INSTANCE = new 
TableModifyDistributedConverterRule();
+
+    /**
+     * Creates a ConverterRule.
+     */
+    public TableModifyDistributedConverterRule() {
+        super(LogicalTableModify.class, 
TableModifyDistributedConverterRule.class.getSimpleName());
+    }
+
+    /** {@inheritDoc} */
+    @Override protected @Nullable PhysicalNode convert(
+        RelOptPlanner planner,
+        RelMetadataQuery mq,
+        LogicalTableModify rel
+    ) {
+        RelOptCluster cluster = rel.getCluster();
+
+        // If transaction is explicitly started it's only allowed to perform 
table modify on initiator node.
+        if (Commons.queryTransactionVersion(planner.getContext()) != null)
+            return null;
+
+        RelDataType rowType = rel.getRowType();
+        IgniteTable table = rel.getTable().unwrap(IgniteTable.class);
+        IgniteDistribution inputDistribution = table.distribution();
+        boolean affectsSrc = false;
+
+        // Single distribution table modify is prefered in this case.
+        if (inputDistribution == IgniteDistributions.single())
+            return null;
+
+        switch (rel.getOperation()) {
+            case MERGE:
+                // Merge contains insert fields as well as update fields, it's 
impossible to check input distribution
+                // over these two fields sets in common case, only corner 
cases can be implemented, skip it for now.
+                return null;
+
+            case INSERT:
+                affectsSrc = 
RelOptUtil.findTables(rel).contains(rel.getTable());
+
+                if (inputDistribution.getType() != 
RelDistribution.Type.HASH_DISTRIBUTED) {
+                    if (affectsSrc)
+                        return null;
+                    else
+                        inputDistribution = 
IgniteDistributions.hash(ImmutableIntList.range(0, rowType.getFieldCount()));
+                }
+
+                break;
+
+            case UPDATE:
+                if (inputDistribution.getType() != 
RelDistribution.Type.HASH_DISTRIBUTED)
+                    inputDistribution = 
IgniteDistributions.hash(ImmutableIntList.of(0));
+
+                break;
+
+            case DELETE:
+                inputDistribution = 
IgniteDistributions.hash(ImmutableIntList.of(0));
+
+                break;
+
+            default:
+                throw new IllegalStateException("Unknown operation type: " + 
rel.getOperation());
+        }
+
+        // Create distributed table modify.
+        RelBuilder relBuilder = relBuilderFactory.create(rel.getCluster(), 
null);
+
+        RelTraitSet outputTraits = 
cluster.traitSetOf(IgniteConvention.INSTANCE)
+            .replace(IgniteDistributions.random())
+            .replace(RewindabilityTrait.ONE_WAY)
+            .replace(RelCollations.EMPTY);
+
+        RelTraitSet inputTraits = outputTraits.replace(inputDistribution);
+
+        RelNode input = convert(rel.getInput(), inputTraits);
+
+        RelNode tableModify = new IgniteTableModify(cluster, outputTraits, 
rel.getTable(), input, rel.getOperation(),
+            rel.getUpdateColumnList(), rel.getSourceExpressionList(), 
rel.isFlattened(), affectsSrc);
+
+        // Create aggregate to pass affected rows count to initiator node.
+        RelDataTypeField outFld = rowType.getFieldList().get(0);

Review Comment:
   Can the row type have more than one field? Maybe add an `assert`.



##########
modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rule/TableModifyDistributedConverterRule.java:
##########
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.calcite.rule;
+
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelOptPlanner;
+import org.apache.calcite.plan.RelOptRule;
+import org.apache.calcite.plan.RelOptUtil;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.PhysicalNode;
+import org.apache.calcite.rel.RelCollations;
+import org.apache.calcite.rel.RelDistribution;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.logical.LogicalTableModify;
+import org.apache.calcite.rel.metadata.RelMetadataQuery;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rel.type.RelDataTypeField;
+import org.apache.calcite.sql.fun.SqlStdOperatorTable;
+import org.apache.calcite.tools.RelBuilder;
+import org.apache.calcite.util.ImmutableIntList;
+import 
org.apache.ignite.internal.processors.query.calcite.rel.IgniteConvention;
+import 
org.apache.ignite.internal.processors.query.calcite.rel.IgniteTableModify;
+import org.apache.ignite.internal.processors.query.calcite.schema.IgniteTable;
+import 
org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistribution;
+import 
org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistributions;
+import 
org.apache.ignite.internal.processors.query.calcite.trait.RewindabilityTrait;
+import org.apache.ignite.internal.processors.query.calcite.util.Commons;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Converts LogicalTableModify to distributed IgniteTableModify (Perform table 
modify on remote nodes,
+ * aggregate affected rows count and send result to the initiator node).
+ */
+public class TableModifyDistributedConverterRule extends 
AbstractIgniteConverterRule<LogicalTableModify> {
+    /** */
+    public static final RelOptRule INSTANCE = new 
TableModifyDistributedConverterRule();
+
+    /**
+     * Creates a ConverterRule.
+     */
+    public TableModifyDistributedConverterRule() {
+        super(LogicalTableModify.class, 
TableModifyDistributedConverterRule.class.getSimpleName());
+    }
+
+    /** {@inheritDoc} */
+    @Override protected @Nullable PhysicalNode convert(
+        RelOptPlanner planner,
+        RelMetadataQuery mq,
+        LogicalTableModify rel
+    ) {
+        RelOptCluster cluster = rel.getCluster();
+
+        // If transaction is explicitly started it's only allowed to perform 
table modify on initiator node.
+        if (Commons.queryTransactionVersion(planner.getContext()) != null)
+            return null;
+
+        RelDataType rowType = rel.getRowType();
+        IgniteTable table = rel.getTable().unwrap(IgniteTable.class);
+        IgniteDistribution inputDistribution = table.distribution();
+        boolean affectsSrc = false;
+
+        // Single distribution table modify is prefered in this case.
+        if (inputDistribution == IgniteDistributions.single())
+            return null;
+
+        switch (rel.getOperation()) {
+            case MERGE:
+                // Merge contains insert fields as well as update fields, it's 
impossible to check input distribution
+                // over these two fields sets in common case, only corner 
cases can be implemented, skip it for now.
+                return null;
+
+            case INSERT:
+                affectsSrc = 
RelOptUtil.findTables(rel).contains(rel.getTable());

Review Comment:
   If `affectsSrc` is `true`, do we need to proceed? 
`IgniteModifyTable#deriveTraits` returns `null` in this case.



##########
modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rule/TableModifyDistributedConverterRule.java:
##########
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.calcite.rule;
+
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelOptPlanner;
+import org.apache.calcite.plan.RelOptRule;
+import org.apache.calcite.plan.RelOptUtil;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.PhysicalNode;
+import org.apache.calcite.rel.RelCollations;
+import org.apache.calcite.rel.RelDistribution;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.logical.LogicalTableModify;
+import org.apache.calcite.rel.metadata.RelMetadataQuery;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rel.type.RelDataTypeField;
+import org.apache.calcite.sql.fun.SqlStdOperatorTable;
+import org.apache.calcite.tools.RelBuilder;
+import org.apache.calcite.util.ImmutableIntList;
+import 
org.apache.ignite.internal.processors.query.calcite.rel.IgniteConvention;
+import 
org.apache.ignite.internal.processors.query.calcite.rel.IgniteTableModify;
+import org.apache.ignite.internal.processors.query.calcite.schema.IgniteTable;
+import 
org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistribution;
+import 
org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistributions;
+import 
org.apache.ignite.internal.processors.query.calcite.trait.RewindabilityTrait;
+import org.apache.ignite.internal.processors.query.calcite.util.Commons;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Converts LogicalTableModify to distributed IgniteTableModify (Perform table 
modify on remote nodes,
+ * aggregate affected rows count and send result to the initiator node).
+ */
+public class TableModifyDistributedConverterRule extends 
AbstractIgniteConverterRule<LogicalTableModify> {
+    /** */
+    public static final RelOptRule INSTANCE = new 
TableModifyDistributedConverterRule();
+
+    /**
+     * Creates a ConverterRule.
+     */
+    public TableModifyDistributedConverterRule() {
+        super(LogicalTableModify.class, 
TableModifyDistributedConverterRule.class.getSimpleName());
+    }
+
+    /** {@inheritDoc} */
+    @Override protected @Nullable PhysicalNode convert(
+        RelOptPlanner planner,
+        RelMetadataQuery mq,
+        LogicalTableModify rel
+    ) {
+        RelOptCluster cluster = rel.getCluster();
+
+        // If transaction is explicitly started it's only allowed to perform 
table modify on initiator node.
+        if (Commons.queryTransactionVersion(planner.getContext()) != null)
+            return null;
+
+        RelDataType rowType = rel.getRowType();
+        IgniteTable table = rel.getTable().unwrap(IgniteTable.class);
+        IgniteDistribution inputDistribution = table.distribution();
+        boolean affectsSrc = false;
+
+        // Single distribution table modify is prefered in this case.
+        if (inputDistribution == IgniteDistributions.single())
+            return null;
+
+        switch (rel.getOperation()) {
+            case MERGE:
+                // Merge contains insert fields as well as update fields, it's 
impossible to check input distribution
+                // over these two fields sets in common case, only corner 
cases can be implemented, skip it for now.
+                return null;
+
+            case INSERT:
+                affectsSrc = 
RelOptUtil.findTables(rel).contains(rel.getTable());
+
+                if (inputDistribution.getType() != 
RelDistribution.Type.HASH_DISTRIBUTED) {
+                    if (affectsSrc)
+                        return null;
+                    else
+                        inputDistribution = 
IgniteDistributions.hash(ImmutableIntList.range(0, rowType.getFieldCount()));
+                }
+
+                break;
+
+            case UPDATE:
+                if (inputDistribution.getType() != 
RelDistribution.Type.HASH_DISTRIBUTED)
+                    inputDistribution = 
IgniteDistributions.hash(ImmutableIntList.of(0));
+
+                break;
+
+            case DELETE:
+                inputDistribution = 
IgniteDistributions.hash(ImmutableIntList.of(0));
+
+                break;
+
+            default:
+                throw new IllegalStateException("Unknown operation type: " + 
rel.getOperation());
+        }
+
+        // Create distributed table modify.
+        RelBuilder relBuilder = relBuilderFactory.create(rel.getCluster(), 
null);
+
+        RelTraitSet outputTraits = 
cluster.traitSetOf(IgniteConvention.INSTANCE)
+            .replace(IgniteDistributions.random())
+            .replace(RewindabilityTrait.ONE_WAY)
+            .replace(RelCollations.EMPTY);
+
+        RelTraitSet inputTraits = outputTraits.replace(inputDistribution);
+
+        RelNode input = convert(rel.getInput(), inputTraits);
+
+        RelNode tableModify = new IgniteTableModify(cluster, outputTraits, 
rel.getTable(), input, rel.getOperation(),
+            rel.getUpdateColumnList(), rel.getSourceExpressionList(), 
rel.isFlattened(), affectsSrc);
+
+        // Create aggregate to pass affected rows count to initiator node.
+        RelDataTypeField outFld = rowType.getFieldList().get(0);
+
+        relBuilder.push(tableModify);
+        relBuilder.aggregate(relBuilder.groupKey(),
+            relBuilder.aggregateCall(SqlStdOperatorTable.SUM0, 
relBuilder.field(0)).as(outFld.getName()));
+
+        PhysicalNode agg = 
(PhysicalNode)HashAggregateConverterRule.MAP_REDUCE.convert(relBuilder.build());

Review Comment:
   Why not colocated?



##########
modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/TableDmlPlannerTest.java:
##########
@@ -190,4 +198,197 @@ public void testDuplicatedColumnNames() throws Exception {
         assertPlan("SELECT CITY.NAME, STREET.NAME FROM STREET JOIN CITY ON 
STREET.CITY_ID = CITY.ID ORDER BY STREET.ID",
             schema, hasColumns("NAME", "NAME1"));
     }
+
+    /** Tests that table modify can be executed on remote nodes. */
+    @Test
+    public void testDistributedTableModify() throws Exception {
+        IgniteSchema schema = createSchema(
+            createTable("TEST", IgniteDistributions.affinity(3, "test", 
"hash"),
+                QueryUtils.KEY_FIELD_NAME, OTHER,
+                QueryUtils.VAL_FIELD_NAME, OTHER,
+                "ID", INTEGER,
+                "AFF_ID", INTEGER,
+                "VAL", INTEGER
+            ),
+            createTable("TEST2", IgniteDistributions.affinity(2, "test2", 
"hash"),
+                QueryUtils.KEY_FIELD_NAME, OTHER,
+                QueryUtils.VAL_FIELD_NAME, OTHER,
+                "ID", INTEGER,
+                "AFF_ID", INTEGER,
+                "VAL", INTEGER
+            ),
+            createTable("TEST3", IgniteDistributions.random(),
+                QueryUtils.KEY_FIELD_NAME, OTHER,
+                QueryUtils.VAL_FIELD_NAME, OTHER,
+                "ID", INTEGER,
+                "AFF_ID", INTEGER,
+                "VAL", INTEGER
+            ),
+            createTable("TEST_REPL", IgniteDistributions.broadcast(),
+                QueryUtils.KEY_FIELD_NAME, OTHER,
+                QueryUtils.VAL_FIELD_NAME, OTHER,
+                "ID", INTEGER,
+                "AFF_ID", INTEGER,
+                "VAL", INTEGER
+            ),
+            createTable("TEST_REPL2", IgniteDistributions.broadcast(),
+                QueryUtils.KEY_FIELD_NAME, OTHER,
+                QueryUtils.VAL_FIELD_NAME, OTHER,
+                "ID", INTEGER,
+                "AFF_ID", INTEGER,
+                "VAL", INTEGER
+            )
+        );
+
+        // Check INSERT statements.
+
+        // partitioned <- values (broadcast).
+        assertPlan("INSERT INTO test VALUES (?, ?, ?)", schema,
+            
isInstanceOf(IgniteTableModify.class).and(hasDistribution(IgniteDistributions.single())));
+
+        // partitioned <- partitioned (same).
+        assertPlan("INSERT INTO test SELECT * FROM test", schema,
+            
hasChildThat(isInstanceOf(IgniteTableModify.class).and(hasDistribution(IgniteDistributions.random()))
+                .and(input(isInstanceOf(IgniteTableSpool.class)
+                    .and(input(isTableScan("TEST")))))));
+
+        // partitioned <- partitioned (same, affinity key change).
+        assertPlan("INSERT INTO test SELECT id, aff_id + 1, val FROM test", 
schema,
+            
isInstanceOf(IgniteTableModify.class).and(hasDistribution(IgniteDistributions.single()))
+                .and(input(isInstanceOf(IgniteTableSpool.class)
+                    .and(input(isInstanceOf(IgniteExchange.class)
+                        .and(input(isTableScan("TEST"))))))));
+
+        // partitioned <- partitioned (another affinity).
+        assertPlan("INSERT INTO test SELECT * FROM test2", schema,
+            
hasChildThat(isInstanceOf(IgniteTableModify.class).and(hasDistribution(IgniteDistributions.random()))
+                .and(input(isTableScan("TEST2")))));
+
+        // partitioned <- partitioned (another affinity, affinity key change).
+        assertPlan("INSERT INTO test SELECT id, aff_id + 1, val FROM test2", 
schema,
+            
hasChildThat(isInstanceOf(IgniteTableModify.class).and(hasDistribution(IgniteDistributions.random()))
+                .and(input(isTableScan("TEST2")))));
+
+        // partitioned <- random.
+        assertPlan("INSERT INTO test SELECT * FROM test3", schema,
+            
hasChildThat(isInstanceOf(IgniteTableModify.class).and(hasDistribution(IgniteDistributions.random()))
+                .and(input(isTableScan("TEST3")))));
+
+        // partitioned <- broadcast.
+        assertPlan("INSERT INTO test SELECT * FROM test_repl", schema,
+            
isInstanceOf(IgniteTableModify.class).and(hasDistribution(IgniteDistributions.single()))
+                .and(input(isTableScan("TEST_REPL"))));
+
+        // partitioned <- broadcast (force distributed).
+        assertPlan("INSERT INTO test SELECT * FROM test_repl", schema,
+            
hasChildThat(isInstanceOf(IgniteTableModify.class).and(hasDistribution(IgniteDistributions.random()))
+                .and(input(isInstanceOf(IgniteTrimExchange.class)
+                    .and(input(isTableScan("TEST_REPL")))))),
+            TableModifySingleNodeConverterRule.class.getSimpleName()
+        );
+
+        // broadcast <- partitioned.
+        assertPlan("INSERT INTO test_repl SELECT * FROM test", schema,

Review Comment:
   Will the `TableScan` always have the hash distribution? Do we need to check 
it?



##########
modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/TableDmlPlannerTest.java:
##########
@@ -190,4 +198,197 @@ public void testDuplicatedColumnNames() throws Exception {
         assertPlan("SELECT CITY.NAME, STREET.NAME FROM STREET JOIN CITY ON 
STREET.CITY_ID = CITY.ID ORDER BY STREET.ID",
             schema, hasColumns("NAME", "NAME1"));
     }
+
+    /** Tests that table modify can be executed on remote nodes. */
+    @Test
+    public void testDistributedTableModify() throws Exception {
+        IgniteSchema schema = createSchema(
+            createTable("TEST", IgniteDistributions.affinity(3, "test", 
"hash"),
+                QueryUtils.KEY_FIELD_NAME, OTHER,
+                QueryUtils.VAL_FIELD_NAME, OTHER,
+                "ID", INTEGER,
+                "AFF_ID", INTEGER,
+                "VAL", INTEGER
+            ),
+            createTable("TEST2", IgniteDistributions.affinity(2, "test2", 
"hash"),
+                QueryUtils.KEY_FIELD_NAME, OTHER,
+                QueryUtils.VAL_FIELD_NAME, OTHER,
+                "ID", INTEGER,
+                "AFF_ID", INTEGER,
+                "VAL", INTEGER
+            ),
+            createTable("TEST3", IgniteDistributions.random(),
+                QueryUtils.KEY_FIELD_NAME, OTHER,
+                QueryUtils.VAL_FIELD_NAME, OTHER,
+                "ID", INTEGER,
+                "AFF_ID", INTEGER,
+                "VAL", INTEGER
+            ),
+            createTable("TEST_REPL", IgniteDistributions.broadcast(),
+                QueryUtils.KEY_FIELD_NAME, OTHER,
+                QueryUtils.VAL_FIELD_NAME, OTHER,
+                "ID", INTEGER,
+                "AFF_ID", INTEGER,
+                "VAL", INTEGER
+            ),
+            createTable("TEST_REPL2", IgniteDistributions.broadcast(),
+                QueryUtils.KEY_FIELD_NAME, OTHER,
+                QueryUtils.VAL_FIELD_NAME, OTHER,
+                "ID", INTEGER,
+                "AFF_ID", INTEGER,
+                "VAL", INTEGER
+            )
+        );
+
+        // Check INSERT statements.
+
+        // partitioned <- values (broadcast).
+        assertPlan("INSERT INTO test VALUES (?, ?, ?)", schema,
+            
isInstanceOf(IgniteTableModify.class).and(hasDistribution(IgniteDistributions.single())));
+
+        // partitioned <- partitioned (same).
+        assertPlan("INSERT INTO test SELECT * FROM test", schema,
+            
hasChildThat(isInstanceOf(IgniteTableModify.class).and(hasDistribution(IgniteDistributions.random()))
+                .and(input(isInstanceOf(IgniteTableSpool.class)
+                    .and(input(isTableScan("TEST")))))));
+
+        // partitioned <- partitioned (same, affinity key change).
+        assertPlan("INSERT INTO test SELECT id, aff_id + 1, val FROM test", 
schema,
+            
isInstanceOf(IgniteTableModify.class).and(hasDistribution(IgniteDistributions.single()))
+                .and(input(isInstanceOf(IgniteTableSpool.class)
+                    .and(input(isInstanceOf(IgniteExchange.class)
+                        .and(input(isTableScan("TEST"))))))));
+
+        // partitioned <- partitioned (another affinity).
+        assertPlan("INSERT INTO test SELECT * FROM test2", schema,

Review Comment:
   Isn't it the same case as `aff_id + 1` above? Other affinities, other 
primaries. Will `ModifyNode` always map the values read on other nodes, primary 
and backup? Why not `single` then? Same for the query below.



##########
modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/TableDmlPlannerTest.java:
##########
@@ -190,4 +198,197 @@ public void testDuplicatedColumnNames() throws Exception {
         assertPlan("SELECT CITY.NAME, STREET.NAME FROM STREET JOIN CITY ON 
STREET.CITY_ID = CITY.ID ORDER BY STREET.ID",
             schema, hasColumns("NAME", "NAME1"));
     }
+
+    /** Tests that table modify can be executed on remote nodes. */
+    @Test
+    public void testDistributedTableModify() throws Exception {
+        IgniteSchema schema = createSchema(
+            createTable("TEST", IgniteDistributions.affinity(3, "test", 
"hash"),
+                QueryUtils.KEY_FIELD_NAME, OTHER,
+                QueryUtils.VAL_FIELD_NAME, OTHER,
+                "ID", INTEGER,
+                "AFF_ID", INTEGER,
+                "VAL", INTEGER
+            ),
+            createTable("TEST2", IgniteDistributions.affinity(2, "test2", 
"hash"),
+                QueryUtils.KEY_FIELD_NAME, OTHER,
+                QueryUtils.VAL_FIELD_NAME, OTHER,
+                "ID", INTEGER,
+                "AFF_ID", INTEGER,
+                "VAL", INTEGER
+            ),
+            createTable("TEST3", IgniteDistributions.random(),
+                QueryUtils.KEY_FIELD_NAME, OTHER,
+                QueryUtils.VAL_FIELD_NAME, OTHER,
+                "ID", INTEGER,
+                "AFF_ID", INTEGER,
+                "VAL", INTEGER
+            ),
+            createTable("TEST_REPL", IgniteDistributions.broadcast(),
+                QueryUtils.KEY_FIELD_NAME, OTHER,
+                QueryUtils.VAL_FIELD_NAME, OTHER,
+                "ID", INTEGER,
+                "AFF_ID", INTEGER,
+                "VAL", INTEGER
+            ),
+            createTable("TEST_REPL2", IgniteDistributions.broadcast(),
+                QueryUtils.KEY_FIELD_NAME, OTHER,
+                QueryUtils.VAL_FIELD_NAME, OTHER,
+                "ID", INTEGER,
+                "AFF_ID", INTEGER,
+                "VAL", INTEGER
+            )
+        );
+
+        // Check INSERT statements.
+
+        // partitioned <- values (broadcast).
+        assertPlan("INSERT INTO test VALUES (?, ?, ?)", schema,
+            
isInstanceOf(IgniteTableModify.class).and(hasDistribution(IgniteDistributions.single())));
+
+        // partitioned <- partitioned (same).
+        assertPlan("INSERT INTO test SELECT * FROM test", schema,
+            
hasChildThat(isInstanceOf(IgniteTableModify.class).and(hasDistribution(IgniteDistributions.random()))
+                .and(input(isInstanceOf(IgniteTableSpool.class)
+                    .and(input(isTableScan("TEST")))))));
+
+        // partitioned <- partitioned (same, affinity key change).
+        assertPlan("INSERT INTO test SELECT id, aff_id + 1, val FROM test", 
schema,
+            
isInstanceOf(IgniteTableModify.class).and(hasDistribution(IgniteDistributions.single()))
+                .and(input(isInstanceOf(IgniteTableSpool.class)
+                    .and(input(isInstanceOf(IgniteExchange.class)
+                        .and(input(isTableScan("TEST"))))))));
+
+        // partitioned <- partitioned (another affinity).
+        assertPlan("INSERT INTO test SELECT * FROM test2", schema,
+            
hasChildThat(isInstanceOf(IgniteTableModify.class).and(hasDistribution(IgniteDistributions.random()))
+                .and(input(isTableScan("TEST2")))));
+
+        // partitioned <- partitioned (another affinity, affinity key change).
+        assertPlan("INSERT INTO test SELECT id, aff_id + 1, val FROM test2", 
schema,
+            
hasChildThat(isInstanceOf(IgniteTableModify.class).and(hasDistribution(IgniteDistributions.random()))
+                .and(input(isTableScan("TEST2")))));
+
+        // partitioned <- random.
+        assertPlan("INSERT INTO test SELECT * FROM test3", schema,
+            
hasChildThat(isInstanceOf(IgniteTableModify.class).and(hasDistribution(IgniteDistributions.random()))
+                .and(input(isTableScan("TEST3")))));
+
+        // partitioned <- broadcast.
+        assertPlan("INSERT INTO test SELECT * FROM test_repl", schema,
+            
isInstanceOf(IgniteTableModify.class).and(hasDistribution(IgniteDistributions.single()))
+                .and(input(isTableScan("TEST_REPL"))));
+
+        // partitioned <- broadcast (force distributed).
+        assertPlan("INSERT INTO test SELECT * FROM test_repl", schema,
+            
hasChildThat(isInstanceOf(IgniteTableModify.class).and(hasDistribution(IgniteDistributions.random()))
+                .and(input(isInstanceOf(IgniteTrimExchange.class)
+                    .and(input(isTableScan("TEST_REPL")))))),
+            TableModifySingleNodeConverterRule.class.getSimpleName()
+        );
+
+        // broadcast <- partitioned.
+        assertPlan("INSERT INTO test_repl SELECT * FROM test", schema,
+            
hasChildThat(isInstanceOf(IgniteTableModify.class).and(hasDistribution(IgniteDistributions.random()))
+                .and(input(isTableScan("TEST")))));
+
+        // roadcast <- random.
+        assertPlan("INSERT INTO test_repl SELECT * FROM test3", schema,
+            
hasChildThat(isInstanceOf(IgniteTableModify.class).and(hasDistribution(IgniteDistributions.random()))
+                .and(input(isTableScan("TEST3")))));
+
+        // broadcast <- broadcast.
+        assertPlan("INSERT INTO test_repl SELECT * FROM test_repl2", schema,
+            
isInstanceOf(IgniteTableModify.class).and(hasDistribution(IgniteDistributions.single()))
+                .and(input(isTableScan("TEST_REPL2"))));
+
+        // broadcast <- broadcast (force distributed).
+        assertPlan("INSERT INTO test_repl SELECT * FROM test_repl2", schema,
+            
hasChildThat(isInstanceOf(IgniteTableModify.class).and(hasDistribution(IgniteDistributions.random()))
+                .and(input(isInstanceOf(IgniteTrimExchange.class)
+                    .and(input(isTableScan("TEST_REPL2")))))),
+            TableModifySingleNodeConverterRule.class.getSimpleName()
+        );
+
+        // broadcast <- broadcast (same).
+        assertPlan("INSERT INTO test_repl SELECT * FROM test_repl", schema,
+            
isInstanceOf(IgniteTableModify.class).and(hasDistribution(IgniteDistributions.single()))
+                .and(input(isInstanceOf(IgniteTableSpool.class)
+                    .and(input(isTableScan("TEST_REPL"))))));
+
+        // broadcast <- broadcast (same, force distributed).

Review Comment:
   If we allow this, can it duplicate data?



##########
modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/TableDmlPlannerTest.java:
##########
@@ -190,4 +198,197 @@ public void testDuplicatedColumnNames() throws Exception {
         assertPlan("SELECT CITY.NAME, STREET.NAME FROM STREET JOIN CITY ON 
STREET.CITY_ID = CITY.ID ORDER BY STREET.ID",
             schema, hasColumns("NAME", "NAME1"));
     }
+
+    /** Tests that table modify can be executed on remote nodes. */
+    @Test
+    public void testDistributedTableModify() throws Exception {
+        IgniteSchema schema = createSchema(
+            createTable("TEST", IgniteDistributions.affinity(3, "test", 
"hash"),
+                QueryUtils.KEY_FIELD_NAME, OTHER,
+                QueryUtils.VAL_FIELD_NAME, OTHER,
+                "ID", INTEGER,
+                "AFF_ID", INTEGER,
+                "VAL", INTEGER
+            ),
+            createTable("TEST2", IgniteDistributions.affinity(2, "test2", 
"hash"),
+                QueryUtils.KEY_FIELD_NAME, OTHER,
+                QueryUtils.VAL_FIELD_NAME, OTHER,
+                "ID", INTEGER,
+                "AFF_ID", INTEGER,
+                "VAL", INTEGER
+            ),
+            createTable("TEST3", IgniteDistributions.random(),
+                QueryUtils.KEY_FIELD_NAME, OTHER,
+                QueryUtils.VAL_FIELD_NAME, OTHER,
+                "ID", INTEGER,
+                "AFF_ID", INTEGER,
+                "VAL", INTEGER
+            ),
+            createTable("TEST_REPL", IgniteDistributions.broadcast(),
+                QueryUtils.KEY_FIELD_NAME, OTHER,
+                QueryUtils.VAL_FIELD_NAME, OTHER,
+                "ID", INTEGER,
+                "AFF_ID", INTEGER,
+                "VAL", INTEGER
+            ),
+            createTable("TEST_REPL2", IgniteDistributions.broadcast(),
+                QueryUtils.KEY_FIELD_NAME, OTHER,
+                QueryUtils.VAL_FIELD_NAME, OTHER,
+                "ID", INTEGER,
+                "AFF_ID", INTEGER,
+                "VAL", INTEGER
+            )
+        );
+
+        // Check INSERT statements.
+
+        // partitioned <- values (broadcast).
+        assertPlan("INSERT INTO test VALUES (?, ?, ?)", schema,
+            
isInstanceOf(IgniteTableModify.class).and(hasDistribution(IgniteDistributions.single())));
+
+        // partitioned <- partitioned (same).
+        assertPlan("INSERT INTO test SELECT * FROM test", schema,
+            
hasChildThat(isInstanceOf(IgniteTableModify.class).and(hasDistribution(IgniteDistributions.random()))
+                .and(input(isInstanceOf(IgniteTableSpool.class)
+                    .and(input(isTableScan("TEST")))))));
+
+        // partitioned <- partitioned (same, affinity key change).
+        assertPlan("INSERT INTO test SELECT id, aff_id + 1, val FROM test", 
schema,
+            
isInstanceOf(IgniteTableModify.class).and(hasDistribution(IgniteDistributions.single()))
+                .and(input(isInstanceOf(IgniteTableSpool.class)
+                    .and(input(isInstanceOf(IgniteExchange.class)
+                        .and(input(isTableScan("TEST"))))))));
+
+        // partitioned <- partitioned (another affinity).
+        assertPlan("INSERT INTO test SELECT * FROM test2", schema,
+            
hasChildThat(isInstanceOf(IgniteTableModify.class).and(hasDistribution(IgniteDistributions.random()))
+                .and(input(isTableScan("TEST2")))));
+
+        // partitioned <- partitioned (another affinity, affinity key change).
+        assertPlan("INSERT INTO test SELECT id, aff_id + 1, val FROM test2", 
schema,
+            
hasChildThat(isInstanceOf(IgniteTableModify.class).and(hasDistribution(IgniteDistributions.random()))
+                .and(input(isTableScan("TEST2")))));
+
+        // partitioned <- random.
+        assertPlan("INSERT INTO test SELECT * FROM test3", schema,
+            
hasChildThat(isInstanceOf(IgniteTableModify.class).and(hasDistribution(IgniteDistributions.random()))
+                .and(input(isTableScan("TEST3")))));
+
+        // partitioned <- broadcast.
+        assertPlan("INSERT INTO test SELECT * FROM test_repl", schema,
+            
isInstanceOf(IgniteTableModify.class).and(hasDistribution(IgniteDistributions.single()))
+                .and(input(isTableScan("TEST_REPL"))));
+
+        // partitioned <- broadcast (force distributed).
+        assertPlan("INSERT INTO test SELECT * FROM test_repl", schema,
+            
hasChildThat(isInstanceOf(IgniteTableModify.class).and(hasDistribution(IgniteDistributions.random()))
+                .and(input(isInstanceOf(IgniteTrimExchange.class)
+                    .and(input(isTableScan("TEST_REPL")))))),
+            TableModifySingleNodeConverterRule.class.getSimpleName()
+        );
+
+        // broadcast <- partitioned.
+        assertPlan("INSERT INTO test_repl SELECT * FROM test", schema,
+            
hasChildThat(isInstanceOf(IgniteTableModify.class).and(hasDistribution(IgniteDistributions.random()))
+                .and(input(isTableScan("TEST")))));
+
+        // roadcast <- random.
+        assertPlan("INSERT INTO test_repl SELECT * FROM test3", schema,
+            
hasChildThat(isInstanceOf(IgniteTableModify.class).and(hasDistribution(IgniteDistributions.random()))
+                .and(input(isTableScan("TEST3")))));
+
+        // broadcast <- broadcast.
+        assertPlan("INSERT INTO test_repl SELECT * FROM test_repl2", schema,
+            
isInstanceOf(IgniteTableModify.class).and(hasDistribution(IgniteDistributions.single()))
+                .and(input(isTableScan("TEST_REPL2"))));
+
+        // broadcast <- broadcast (force distributed).
+        assertPlan("INSERT INTO test_repl SELECT * FROM test_repl2", schema,
+            
hasChildThat(isInstanceOf(IgniteTableModify.class).and(hasDistribution(IgniteDistributions.random()))
+                .and(input(isInstanceOf(IgniteTrimExchange.class)
+                    .and(input(isTableScan("TEST_REPL2")))))),
+            TableModifySingleNodeConverterRule.class.getSimpleName()
+        );
+
+        // broadcast <- broadcast (same).
+        assertPlan("INSERT INTO test_repl SELECT * FROM test_repl", schema,
+            
isInstanceOf(IgniteTableModify.class).and(hasDistribution(IgniteDistributions.single()))
+                .and(input(isInstanceOf(IgniteTableSpool.class)
+                    .and(input(isTableScan("TEST_REPL"))))));
+
+        // broadcast <- broadcast (same, force distributed).
+        GridTestUtils.assertThrows(null, () -> {
+                physicalPlan("INSERT INTO test_repl SELECT * FROM test_repl", 
schema,
+                    TableModifySingleNodeConverterRule.class.getSimpleName());
+            }, IgniteException.class, ""
+        );
+
+        // random <- partitioned.
+        assertPlan("INSERT INTO test3 SELECT * FROM test", schema,
+            
hasChildThat(isInstanceOf(IgniteTableModify.class).and(hasDistribution(IgniteDistributions.random()))
+                .and(input(isTableScan("TEST")))));
+
+        // random <- broadcast.
+        assertPlan("INSERT INTO test3 SELECT * FROM test_repl", schema,
+            
isInstanceOf(IgniteTableModify.class).and(hasDistribution(IgniteDistributions.single()))
+                .and(input(isTableScan("TEST_REPL"))));
+
+        // random <- broadcast (force distributed).
+        assertPlan("INSERT INTO test3 SELECT * FROM test_repl", schema,
+            
hasChildThat(isInstanceOf(IgniteTableModify.class).and(hasDistribution(IgniteDistributions.random()))
+                .and(input(isInstanceOf(IgniteTrimExchange.class)
+                    .and(input(isTableScan("TEST_REPL")))))),
+            TableModifySingleNodeConverterRule.class.getSimpleName()
+        );
+
+        // Check UPDATE statements.
+
+        // partitioned.
+        assertPlan("UPDATE test SET val = val + 1", schema,

Review Comment:
   This check looks too short. Maybe we also need to search for 
`Exchange`/`Aggregate` nodes.



##########
modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/TableDmlPlannerTest.java:
##########
@@ -190,4 +198,197 @@ public void testDuplicatedColumnNames() throws Exception {
         assertPlan("SELECT CITY.NAME, STREET.NAME FROM STREET JOIN CITY ON 
STREET.CITY_ID = CITY.ID ORDER BY STREET.ID",
             schema, hasColumns("NAME", "NAME1"));
     }
+
+    /** Tests that table modify can be executed on remote nodes. */
+    @Test
+    public void testDistributedTableModify() throws Exception {
+        IgniteSchema schema = createSchema(
+            createTable("TEST", IgniteDistributions.affinity(3, "test", 
"hash"),
+                QueryUtils.KEY_FIELD_NAME, OTHER,
+                QueryUtils.VAL_FIELD_NAME, OTHER,
+                "ID", INTEGER,
+                "AFF_ID", INTEGER,
+                "VAL", INTEGER
+            ),
+            createTable("TEST2", IgniteDistributions.affinity(2, "test2", 
"hash"),
+                QueryUtils.KEY_FIELD_NAME, OTHER,
+                QueryUtils.VAL_FIELD_NAME, OTHER,
+                "ID", INTEGER,
+                "AFF_ID", INTEGER,
+                "VAL", INTEGER
+            ),
+            createTable("TEST3", IgniteDistributions.random(),
+                QueryUtils.KEY_FIELD_NAME, OTHER,
+                QueryUtils.VAL_FIELD_NAME, OTHER,
+                "ID", INTEGER,
+                "AFF_ID", INTEGER,
+                "VAL", INTEGER
+            ),
+            createTable("TEST_REPL", IgniteDistributions.broadcast(),
+                QueryUtils.KEY_FIELD_NAME, OTHER,
+                QueryUtils.VAL_FIELD_NAME, OTHER,
+                "ID", INTEGER,
+                "AFF_ID", INTEGER,
+                "VAL", INTEGER
+            ),
+            createTable("TEST_REPL2", IgniteDistributions.broadcast(),
+                QueryUtils.KEY_FIELD_NAME, OTHER,
+                QueryUtils.VAL_FIELD_NAME, OTHER,
+                "ID", INTEGER,
+                "AFF_ID", INTEGER,
+                "VAL", INTEGER
+            )
+        );
+
+        // Check INSERT statements.
+
+        // partitioned <- values (broadcast).
+        assertPlan("INSERT INTO test VALUES (?, ?, ?)", schema,
+            
isInstanceOf(IgniteTableModify.class).and(hasDistribution(IgniteDistributions.single())));
+
+        // partitioned <- partitioned (same).
+        assertPlan("INSERT INTO test SELECT * FROM test", schema,
+            
hasChildThat(isInstanceOf(IgniteTableModify.class).and(hasDistribution(IgniteDistributions.random()))
+                .and(input(isInstanceOf(IgniteTableSpool.class)
+                    .and(input(isTableScan("TEST")))))));
+
+        // partitioned <- partitioned (same, affinity key change).
+        assertPlan("INSERT INTO test SELECT id, aff_id + 1, val FROM test", 
schema,
+            
isInstanceOf(IgniteTableModify.class).and(hasDistribution(IgniteDistributions.single()))
+                .and(input(isInstanceOf(IgniteTableSpool.class)
+                    .and(input(isInstanceOf(IgniteExchange.class)
+                        .and(input(isTableScan("TEST"))))))));
+
+        // partitioned <- partitioned (another affinity).
+        assertPlan("INSERT INTO test SELECT * FROM test2", schema,
+            
hasChildThat(isInstanceOf(IgniteTableModify.class).and(hasDistribution(IgniteDistributions.random()))
+                .and(input(isTableScan("TEST2")))));
+
+        // partitioned <- partitioned (another affinity, affinity key change).
+        assertPlan("INSERT INTO test SELECT id, aff_id + 1, val FROM test2", 
schema,
+            
hasChildThat(isInstanceOf(IgniteTableModify.class).and(hasDistribution(IgniteDistributions.random()))
+                .and(input(isTableScan("TEST2")))));
+
+        // partitioned <- random.
+        assertPlan("INSERT INTO test SELECT * FROM test3", schema,
+            
hasChildThat(isInstanceOf(IgniteTableModify.class).and(hasDistribution(IgniteDistributions.random()))
+                .and(input(isTableScan("TEST3")))));
+
+        // partitioned <- broadcast.
+        assertPlan("INSERT INTO test SELECT * FROM test_repl", schema,
+            
isInstanceOf(IgniteTableModify.class).and(hasDistribution(IgniteDistributions.single()))
+                .and(input(isTableScan("TEST_REPL"))));
+
+        // partitioned <- broadcast (force distributed).
+        assertPlan("INSERT INTO test SELECT * FROM test_repl", schema,
+            
hasChildThat(isInstanceOf(IgniteTableModify.class).and(hasDistribution(IgniteDistributions.random()))
+                .and(input(isInstanceOf(IgniteTrimExchange.class)
+                    .and(input(isTableScan("TEST_REPL")))))),
+            TableModifySingleNodeConverterRule.class.getSimpleName()
+        );
+
+        // broadcast <- partitioned.
+        assertPlan("INSERT INTO test_repl SELECT * FROM test", schema,
+            
hasChildThat(isInstanceOf(IgniteTableModify.class).and(hasDistribution(IgniteDistributions.random()))
+                .and(input(isTableScan("TEST")))));
+
+        // roadcast <- random.
+        assertPlan("INSERT INTO test_repl SELECT * FROM test3", schema,
+            
hasChildThat(isInstanceOf(IgniteTableModify.class).and(hasDistribution(IgniteDistributions.random()))
+                .and(input(isTableScan("TEST3")))));
+
+        // broadcast <- broadcast.
+        assertPlan("INSERT INTO test_repl SELECT * FROM test_repl2", schema,
+            
isInstanceOf(IgniteTableModify.class).and(hasDistribution(IgniteDistributions.single()))
+                .and(input(isTableScan("TEST_REPL2"))));
+
+        // broadcast <- broadcast (force distributed).
+        assertPlan("INSERT INTO test_repl SELECT * FROM test_repl2", schema,
+            
hasChildThat(isInstanceOf(IgniteTableModify.class).and(hasDistribution(IgniteDistributions.random()))
+                .and(input(isInstanceOf(IgniteTrimExchange.class)
+                    .and(input(isTableScan("TEST_REPL2")))))),
+            TableModifySingleNodeConverterRule.class.getSimpleName()
+        );
+
+        // broadcast <- broadcast (same).
+        assertPlan("INSERT INTO test_repl SELECT * FROM test_repl", schema,
+            
isInstanceOf(IgniteTableModify.class).and(hasDistribution(IgniteDistributions.single()))
+                .and(input(isInstanceOf(IgniteTableSpool.class)
+                    .and(input(isTableScan("TEST_REPL"))))));
+
+        // broadcast <- broadcast (same, force distributed).
+        GridTestUtils.assertThrows(null, () -> {
+                physicalPlan("INSERT INTO test_repl SELECT * FROM test_repl", 
schema,
+                    TableModifySingleNodeConverterRule.class.getSimpleName());
+            }, IgniteException.class, ""
+        );
+
+        // random <- partitioned.
+        assertPlan("INSERT INTO test3 SELECT * FROM test", schema,
+            
hasChildThat(isInstanceOf(IgniteTableModify.class).and(hasDistribution(IgniteDistributions.random()))
+                .and(input(isTableScan("TEST")))));
+
+        // random <- broadcast.
+        assertPlan("INSERT INTO test3 SELECT * FROM test_repl", schema,
+            
isInstanceOf(IgniteTableModify.class).and(hasDistribution(IgniteDistributions.single()))
+                .and(input(isTableScan("TEST_REPL"))));
+
+        // random <- broadcast (force distributed).
+        assertPlan("INSERT INTO test3 SELECT * FROM test_repl", schema,

Review Comment:
   Why is this allowed? Can't it duplicate or corrupt data? Should we assert 
a failure here, like above?



##########
modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/TableDmlPlannerTest.java:
##########
@@ -190,4 +198,197 @@ public void testDuplicatedColumnNames() throws Exception {
         assertPlan("SELECT CITY.NAME, STREET.NAME FROM STREET JOIN CITY ON 
STREET.CITY_ID = CITY.ID ORDER BY STREET.ID",
             schema, hasColumns("NAME", "NAME1"));
     }
+
+    /** Tests that table modify can be executed on remote nodes. */
+    @Test
+    public void testDistributedTableModify() throws Exception {
+        IgniteSchema schema = createSchema(
+            createTable("TEST", IgniteDistributions.affinity(3, "test", 
"hash"),
+                QueryUtils.KEY_FIELD_NAME, OTHER,
+                QueryUtils.VAL_FIELD_NAME, OTHER,
+                "ID", INTEGER,
+                "AFF_ID", INTEGER,
+                "VAL", INTEGER
+            ),
+            createTable("TEST2", IgniteDistributions.affinity(2, "test2", 
"hash"),
+                QueryUtils.KEY_FIELD_NAME, OTHER,
+                QueryUtils.VAL_FIELD_NAME, OTHER,
+                "ID", INTEGER,
+                "AFF_ID", INTEGER,
+                "VAL", INTEGER
+            ),
+            createTable("TEST3", IgniteDistributions.random(),
+                QueryUtils.KEY_FIELD_NAME, OTHER,
+                QueryUtils.VAL_FIELD_NAME, OTHER,
+                "ID", INTEGER,
+                "AFF_ID", INTEGER,
+                "VAL", INTEGER
+            ),
+            createTable("TEST_REPL", IgniteDistributions.broadcast(),
+                QueryUtils.KEY_FIELD_NAME, OTHER,
+                QueryUtils.VAL_FIELD_NAME, OTHER,
+                "ID", INTEGER,
+                "AFF_ID", INTEGER,
+                "VAL", INTEGER
+            ),
+            createTable("TEST_REPL2", IgniteDistributions.broadcast(),
+                QueryUtils.KEY_FIELD_NAME, OTHER,
+                QueryUtils.VAL_FIELD_NAME, OTHER,
+                "ID", INTEGER,
+                "AFF_ID", INTEGER,
+                "VAL", INTEGER
+            )
+        );
+
+        // Check INSERT statements.
+
+        // partitioned <- values (broadcast).
+        assertPlan("INSERT INTO test VALUES (?, ?, ?)", schema,
+            
isInstanceOf(IgniteTableModify.class).and(hasDistribution(IgniteDistributions.single())));
+
+        // partitioned <- partitioned (same).
+        assertPlan("INSERT INTO test SELECT * FROM test", schema,
+            
hasChildThat(isInstanceOf(IgniteTableModify.class).and(hasDistribution(IgniteDistributions.random()))
+                .and(input(isInstanceOf(IgniteTableSpool.class)
+                    .and(input(isTableScan("TEST")))))));
+
+        // partitioned <- partitioned (same, affinity key change).
+        assertPlan("INSERT INTO test SELECT id, aff_id + 1, val FROM test", 
schema,
+            
isInstanceOf(IgniteTableModify.class).and(hasDistribution(IgniteDistributions.single()))
+                .and(input(isInstanceOf(IgniteTableSpool.class)
+                    .and(input(isInstanceOf(IgniteExchange.class)
+                        .and(input(isTableScan("TEST"))))))));
+
+        // partitioned <- partitioned (another affinity).
+        assertPlan("INSERT INTO test SELECT * FROM test2", schema,
+            
hasChildThat(isInstanceOf(IgniteTableModify.class).and(hasDistribution(IgniteDistributions.random()))
+                .and(input(isTableScan("TEST2")))));
+
+        // partitioned <- partitioned (another affinity, affinity key change).
+        assertPlan("INSERT INTO test SELECT id, aff_id + 1, val FROM test2", 
schema,
+            
hasChildThat(isInstanceOf(IgniteTableModify.class).and(hasDistribution(IgniteDistributions.random()))
+                .and(input(isTableScan("TEST2")))));
+
+        // partitioned <- random.
+        assertPlan("INSERT INTO test SELECT * FROM test3", schema,
+            
hasChildThat(isInstanceOf(IgniteTableModify.class).and(hasDistribution(IgniteDistributions.random()))
+                .and(input(isTableScan("TEST3")))));
+
+        // partitioned <- broadcast.
+        assertPlan("INSERT INTO test SELECT * FROM test_repl", schema,
+            
isInstanceOf(IgniteTableModify.class).and(hasDistribution(IgniteDistributions.single()))
+                .and(input(isTableScan("TEST_REPL"))));
+
+        // partitioned <- broadcast (force distributed).
+        assertPlan("INSERT INTO test SELECT * FROM test_repl", schema,
+            
hasChildThat(isInstanceOf(IgniteTableModify.class).and(hasDistribution(IgniteDistributions.random()))
+                .and(input(isInstanceOf(IgniteTrimExchange.class)
+                    .and(input(isTableScan("TEST_REPL")))))),
+            TableModifySingleNodeConverterRule.class.getSimpleName()
+        );
+
+        // broadcast <- partitioned.
+        assertPlan("INSERT INTO test_repl SELECT * FROM test", schema,
+            
hasChildThat(isInstanceOf(IgniteTableModify.class).and(hasDistribution(IgniteDistributions.random()))
+                .and(input(isTableScan("TEST")))));
+
+        // roadcast <- random.

Review Comment:
   `roadcast` -> `broadcast`



##########
modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/TableDmlPlannerTest.java:
##########
@@ -190,4 +198,197 @@ public void testDuplicatedColumnNames() throws Exception {
         assertPlan("SELECT CITY.NAME, STREET.NAME FROM STREET JOIN CITY ON 
STREET.CITY_ID = CITY.ID ORDER BY STREET.ID",
             schema, hasColumns("NAME", "NAME1"));
     }
+
+    /** Tests that table modify can be executed on remote nodes. */
+    @Test
+    public void testDistributedTableModify() throws Exception {
+        IgniteSchema schema = createSchema(
+            createTable("TEST", IgniteDistributions.affinity(3, "test", 
"hash"),
+                QueryUtils.KEY_FIELD_NAME, OTHER,
+                QueryUtils.VAL_FIELD_NAME, OTHER,
+                "ID", INTEGER,
+                "AFF_ID", INTEGER,
+                "VAL", INTEGER
+            ),
+            createTable("TEST2", IgniteDistributions.affinity(2, "test2", 
"hash"),
+                QueryUtils.KEY_FIELD_NAME, OTHER,
+                QueryUtils.VAL_FIELD_NAME, OTHER,
+                "ID", INTEGER,
+                "AFF_ID", INTEGER,
+                "VAL", INTEGER
+            ),
+            createTable("TEST3", IgniteDistributions.random(),
+                QueryUtils.KEY_FIELD_NAME, OTHER,
+                QueryUtils.VAL_FIELD_NAME, OTHER,
+                "ID", INTEGER,
+                "AFF_ID", INTEGER,
+                "VAL", INTEGER
+            ),
+            createTable("TEST_REPL", IgniteDistributions.broadcast(),
+                QueryUtils.KEY_FIELD_NAME, OTHER,
+                QueryUtils.VAL_FIELD_NAME, OTHER,
+                "ID", INTEGER,
+                "AFF_ID", INTEGER,
+                "VAL", INTEGER
+            ),
+            createTable("TEST_REPL2", IgniteDistributions.broadcast(),
+                QueryUtils.KEY_FIELD_NAME, OTHER,
+                QueryUtils.VAL_FIELD_NAME, OTHER,
+                "ID", INTEGER,
+                "AFF_ID", INTEGER,
+                "VAL", INTEGER
+            )
+        );
+
+        // Check INSERT statements.
+
+        // partitioned <- values (broadcast).
+        assertPlan("INSERT INTO test VALUES (?, ?, ?)", schema,
+            
isInstanceOf(IgniteTableModify.class).and(hasDistribution(IgniteDistributions.single())));
+
+        // partitioned <- partitioned (same).
+        assertPlan("INSERT INTO test SELECT * FROM test", schema,
+            
hasChildThat(isInstanceOf(IgniteTableModify.class).and(hasDistribution(IgniteDistributions.random()))
+                .and(input(isInstanceOf(IgniteTableSpool.class)
+                    .and(input(isTableScan("TEST")))))));
+
+        // partitioned <- partitioned (same, affinity key change).
+        assertPlan("INSERT INTO test SELECT id, aff_id + 1, val FROM test", 
schema,
+            
isInstanceOf(IgniteTableModify.class).and(hasDistribution(IgniteDistributions.single()))
+                .and(input(isInstanceOf(IgniteTableSpool.class)
+                    .and(input(isInstanceOf(IgniteExchange.class)
+                        .and(input(isTableScan("TEST"))))))));
+
+        // partitioned <- partitioned (another affinity).
+        assertPlan("INSERT INTO test SELECT * FROM test2", schema,
+            
hasChildThat(isInstanceOf(IgniteTableModify.class).and(hasDistribution(IgniteDistributions.random()))
+                .and(input(isTableScan("TEST2")))));
+
+        // partitioned <- partitioned (another affinity, affinity key change).
+        assertPlan("INSERT INTO test SELECT id, aff_id + 1, val FROM test2", 
schema,
+            
hasChildThat(isInstanceOf(IgniteTableModify.class).and(hasDistribution(IgniteDistributions.random()))
+                .and(input(isTableScan("TEST2")))));
+
+        // partitioned <- random.
+        assertPlan("INSERT INTO test SELECT * FROM test3", schema,
+            
hasChildThat(isInstanceOf(IgniteTableModify.class).and(hasDistribution(IgniteDistributions.random()))
+                .and(input(isTableScan("TEST3")))));
+
+        // partitioned <- broadcast.
+        assertPlan("INSERT INTO test SELECT * FROM test_repl", schema,
+            
isInstanceOf(IgniteTableModify.class).and(hasDistribution(IgniteDistributions.single()))
+                .and(input(isTableScan("TEST_REPL"))));
+
+        // partitioned <- broadcast (force distributed).
+        assertPlan("INSERT INTO test SELECT * FROM test_repl", schema,
+            
hasChildThat(isInstanceOf(IgniteTableModify.class).and(hasDistribution(IgniteDistributions.random()))
+                .and(input(isInstanceOf(IgniteTrimExchange.class)
+                    .and(input(isTableScan("TEST_REPL")))))),
+            TableModifySingleNodeConverterRule.class.getSimpleName()
+        );
+
+        // broadcast <- partitioned.
+        assertPlan("INSERT INTO test_repl SELECT * FROM test", schema,
+            
hasChildThat(isInstanceOf(IgniteTableModify.class).and(hasDistribution(IgniteDistributions.random()))
+                .and(input(isTableScan("TEST")))));
+
+        // roadcast <- random.
+        assertPlan("INSERT INTO test_repl SELECT * FROM test3", schema,
+            
hasChildThat(isInstanceOf(IgniteTableModify.class).and(hasDistribution(IgniteDistributions.random()))
+                .and(input(isTableScan("TEST3")))));
+
+        // broadcast <- broadcast.
+        assertPlan("INSERT INTO test_repl SELECT * FROM test_repl2", schema,
+            
isInstanceOf(IgniteTableModify.class).and(hasDistribution(IgniteDistributions.single()))
+                .and(input(isTableScan("TEST_REPL2"))));
+
+        // broadcast <- broadcast (force distributed).
+        assertPlan("INSERT INTO test_repl SELECT * FROM test_repl2", schema,
+            
hasChildThat(isInstanceOf(IgniteTableModify.class).and(hasDistribution(IgniteDistributions.random()))
+                .and(input(isInstanceOf(IgniteTrimExchange.class)
+                    .and(input(isTableScan("TEST_REPL2")))))),
+            TableModifySingleNodeConverterRule.class.getSimpleName()
+        );
+
+        // broadcast <- broadcast (same).
+        assertPlan("INSERT INTO test_repl SELECT * FROM test_repl", schema,
+            
isInstanceOf(IgniteTableModify.class).and(hasDistribution(IgniteDistributions.single()))
+                .and(input(isInstanceOf(IgniteTableSpool.class)
+                    .and(input(isTableScan("TEST_REPL"))))));
+
+        // broadcast <- broadcast (same, force distributed).
+        GridTestUtils.assertThrows(null, () -> {
+                physicalPlan("INSERT INTO test_repl SELECT * FROM test_repl", 
schema,
+                    TableModifySingleNodeConverterRule.class.getSimpleName());
+            }, IgniteException.class, ""
+        );
+
+        // random <- partitioned.
+        assertPlan("INSERT INTO test3 SELECT * FROM test", schema,
+            
hasChildThat(isInstanceOf(IgniteTableModify.class).and(hasDistribution(IgniteDistributions.random()))
+                .and(input(isTableScan("TEST")))));
+
+        // random <- broadcast.
+        assertPlan("INSERT INTO test3 SELECT * FROM test_repl", schema,
+            
isInstanceOf(IgniteTableModify.class).and(hasDistribution(IgniteDistributions.single()))
+                .and(input(isTableScan("TEST_REPL"))));
+
+        // random <- broadcast (force distributed).
+        assertPlan("INSERT INTO test3 SELECT * FROM test_repl", schema,
+            
hasChildThat(isInstanceOf(IgniteTableModify.class).and(hasDistribution(IgniteDistributions.random()))
+                .and(input(isInstanceOf(IgniteTrimExchange.class)
+                    .and(input(isTableScan("TEST_REPL")))))),
+            TableModifySingleNodeConverterRule.class.getSimpleName()
+        );
+
+        // Check UPDATE statements.
+
+        // partitioned.
+        assertPlan("UPDATE test SET val = val + 1", schema,
+            
hasChildThat(isInstanceOf(IgniteTableModify.class).and(hasDistribution(IgniteDistributions.random()))

Review Comment:
   Why `random()` and not `hash()`?



##########
modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rule/TableModifyDistributedConverterRule.java:
##########
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.calcite.rule;
+
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelOptPlanner;
+import org.apache.calcite.plan.RelOptRule;
+import org.apache.calcite.plan.RelOptUtil;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.PhysicalNode;
+import org.apache.calcite.rel.RelCollations;
+import org.apache.calcite.rel.RelDistribution;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.logical.LogicalTableModify;
+import org.apache.calcite.rel.metadata.RelMetadataQuery;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rel.type.RelDataTypeField;
+import org.apache.calcite.sql.fun.SqlStdOperatorTable;
+import org.apache.calcite.tools.RelBuilder;
+import org.apache.calcite.util.ImmutableIntList;
+import 
org.apache.ignite.internal.processors.query.calcite.rel.IgniteConvention;
+import 
org.apache.ignite.internal.processors.query.calcite.rel.IgniteTableModify;
+import org.apache.ignite.internal.processors.query.calcite.schema.IgniteTable;
+import 
org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistribution;
+import 
org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistributions;
+import 
org.apache.ignite.internal.processors.query.calcite.trait.RewindabilityTrait;
+import org.apache.ignite.internal.processors.query.calcite.util.Commons;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Converts LogicalTableModify to distributed IgniteTableModify (Perform table 
modify on remote nodes,
+ * aggregate affected rows count and send result to the initiator node).
+ */
+public class TableModifyDistributedConverterRule extends 
AbstractIgniteConverterRule<LogicalTableModify> {
+    /** */
+    public static final RelOptRule INSTANCE = new 
TableModifyDistributedConverterRule();
+
+    /**
+     * Creates a ConverterRule.
+     */
+    public TableModifyDistributedConverterRule() {
+        super(LogicalTableModify.class, 
TableModifyDistributedConverterRule.class.getSimpleName());
+    }
+
+    /** {@inheritDoc} */
+    @Override protected @Nullable PhysicalNode convert(
+        RelOptPlanner planner,
+        RelMetadataQuery mq,
+        LogicalTableModify rel
+    ) {
+        RelOptCluster cluster = rel.getCluster();
+
+        // If transaction is explicitly started it's only allowed to perform 
table modify on initiator node.
+        if (Commons.queryTransactionVersion(planner.getContext()) != null)
+            return null;
+
+        RelDataType rowType = rel.getRowType();
+        IgniteTable table = rel.getTable().unwrap(IgniteTable.class);
+        IgniteDistribution inputDistribution = table.distribution();
+        boolean affectsSrc = false;
+
+        // Single distribution table modify is prefered in this case.
+        if (inputDistribution == IgniteDistributions.single())
+            return null;
+
+        switch (rel.getOperation()) {
+            case MERGE:
+                // Merge contains insert fields as well as update fields, it's 
impossible to check input distribution
+                // over these two fields sets in common case, only corner 
cases can be implemented, skip it for now.
+                return null;
+
+            case INSERT:
+                affectsSrc = 
RelOptUtil.findTables(rel).contains(rel.getTable());
+
+                if (inputDistribution.getType() != 
RelDistribution.Type.HASH_DISTRIBUTED) {
+                    if (affectsSrc)
+                        return null;
+                    else
+                        inputDistribution = 
IgniteDistributions.hash(ImmutableIntList.range(0, rowType.getFieldCount()));

Review Comment:
   Can `rowType.getFieldCount()` be > 1? Maybe use `0` and an `assert` instead.



##########
modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/TableDmlPlannerTest.java:
##########
@@ -190,4 +198,197 @@ public void testDuplicatedColumnNames() throws Exception {
         assertPlan("SELECT CITY.NAME, STREET.NAME FROM STREET JOIN CITY ON 
STREET.CITY_ID = CITY.ID ORDER BY STREET.ID",
             schema, hasColumns("NAME", "NAME1"));
     }
+
+    /** Tests that table modify can be executed on remote nodes. */
+    @Test
+    public void testDistributedTableModify() throws Exception {

Review Comment:
   Do we need tests with filters (`WHERE`)?



##########
modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rule/TableModifyDistributedConverterRule.java:
##########
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.calcite.rule;
+
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelOptPlanner;
+import org.apache.calcite.plan.RelOptRule;
+import org.apache.calcite.plan.RelOptUtil;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.PhysicalNode;
+import org.apache.calcite.rel.RelCollations;
+import org.apache.calcite.rel.RelDistribution;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.logical.LogicalTableModify;
+import org.apache.calcite.rel.metadata.RelMetadataQuery;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rel.type.RelDataTypeField;
+import org.apache.calcite.sql.fun.SqlStdOperatorTable;
+import org.apache.calcite.tools.RelBuilder;
+import org.apache.calcite.util.ImmutableIntList;
+import 
org.apache.ignite.internal.processors.query.calcite.rel.IgniteConvention;
+import 
org.apache.ignite.internal.processors.query.calcite.rel.IgniteTableModify;
+import org.apache.ignite.internal.processors.query.calcite.schema.IgniteTable;
+import 
org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistribution;
+import 
org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistributions;
+import 
org.apache.ignite.internal.processors.query.calcite.trait.RewindabilityTrait;
+import org.apache.ignite.internal.processors.query.calcite.util.Commons;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Converts LogicalTableModify to distributed IgniteTableModify (Perform table 
modify on remote nodes,
+ * aggregate affected rows count and send result to the initiator node).
+ */
+public class TableModifyDistributedConverterRule extends 
AbstractIgniteConverterRule<LogicalTableModify> {

Review Comment:
   What if the underlying cache has a node filter? Do we need to consider it?



##########
modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/TableDmlPlannerTest.java:
##########
@@ -190,4 +198,197 @@ public void testDuplicatedColumnNames() throws Exception {
         assertPlan("SELECT CITY.NAME, STREET.NAME FROM STREET JOIN CITY ON 
STREET.CITY_ID = CITY.ID ORDER BY STREET.ID",
             schema, hasColumns("NAME", "NAME1"));
     }
+
+    /** Tests that table modify can be executed on remote nodes. */
+    @Test
+    public void testDistributedTableModify() throws Exception {
+        IgniteSchema schema = createSchema(
+            createTable("TEST", IgniteDistributions.affinity(3, "test", 
"hash"),
+                QueryUtils.KEY_FIELD_NAME, OTHER,
+                QueryUtils.VAL_FIELD_NAME, OTHER,
+                "ID", INTEGER,
+                "AFF_ID", INTEGER,
+                "VAL", INTEGER
+            ),
+            createTable("TEST2", IgniteDistributions.affinity(2, "test2", 
"hash"),
+                QueryUtils.KEY_FIELD_NAME, OTHER,
+                QueryUtils.VAL_FIELD_NAME, OTHER,
+                "ID", INTEGER,
+                "AFF_ID", INTEGER,
+                "VAL", INTEGER
+            ),
+            createTable("TEST3", IgniteDistributions.random(),
+                QueryUtils.KEY_FIELD_NAME, OTHER,
+                QueryUtils.VAL_FIELD_NAME, OTHER,
+                "ID", INTEGER,
+                "AFF_ID", INTEGER,
+                "VAL", INTEGER
+            ),
+            createTable("TEST_REPL", IgniteDistributions.broadcast(),
+                QueryUtils.KEY_FIELD_NAME, OTHER,
+                QueryUtils.VAL_FIELD_NAME, OTHER,
+                "ID", INTEGER,
+                "AFF_ID", INTEGER,
+                "VAL", INTEGER
+            ),
+            createTable("TEST_REPL2", IgniteDistributions.broadcast(),
+                QueryUtils.KEY_FIELD_NAME, OTHER,
+                QueryUtils.VAL_FIELD_NAME, OTHER,
+                "ID", INTEGER,
+                "AFF_ID", INTEGER,
+                "VAL", INTEGER
+            )
+        );
+
+        // Check INSERT statements.
+
+        // partitioned <- values (broadcast).
+        assertPlan("INSERT INTO test VALUES (?, ?, ?)", schema,
+            
isInstanceOf(IgniteTableModify.class).and(hasDistribution(IgniteDistributions.single())));
+
+        // partitioned <- partitioned (same).
+        assertPlan("INSERT INTO test SELECT * FROM test", schema,
+            
hasChildThat(isInstanceOf(IgniteTableModify.class).and(hasDistribution(IgniteDistributions.random()))
+                .and(input(isInstanceOf(IgniteTableSpool.class)
+                    .and(input(isTableScan("TEST")))))));
+
+        // partitioned <- partitioned (same, affinity key change).
+        assertPlan("INSERT INTO test SELECT id, aff_id + 1, val FROM test", 
schema,
+            
isInstanceOf(IgniteTableModify.class).and(hasDistribution(IgniteDistributions.single()))
+                .and(input(isInstanceOf(IgniteTableSpool.class)
+                    .and(input(isInstanceOf(IgniteExchange.class)
+                        .and(input(isTableScan("TEST"))))))));
+
+        // partitioned <- partitioned (another affinity).
+        assertPlan("INSERT INTO test SELECT * FROM test2", schema,
+            
hasChildThat(isInstanceOf(IgniteTableModify.class).and(hasDistribution(IgniteDistributions.random()))
+                .and(input(isTableScan("TEST2")))));
+
+        // partitioned <- partitioned (another affinity, affinity key change).
+        assertPlan("INSERT INTO test SELECT id, aff_id + 1, val FROM test2", 
schema,
+            
hasChildThat(isInstanceOf(IgniteTableModify.class).and(hasDistribution(IgniteDistributions.random()))
+                .and(input(isTableScan("TEST2")))));
+
+        // partitioned <- random.
+        assertPlan("INSERT INTO test SELECT * FROM test3", schema,
+            
hasChildThat(isInstanceOf(IgniteTableModify.class).and(hasDistribution(IgniteDistributions.random()))
+                .and(input(isTableScan("TEST3")))));
+
+        // partitioned <- broadcast.
+        assertPlan("INSERT INTO test SELECT * FROM test_repl", schema,
+            
isInstanceOf(IgniteTableModify.class).and(hasDistribution(IgniteDistributions.single()))
+                .and(input(isTableScan("TEST_REPL"))));
+
+        // partitioned <- broadcast (force distributed).
+        assertPlan("INSERT INTO test SELECT * FROM test_repl", schema,
+            
hasChildThat(isInstanceOf(IgniteTableModify.class).and(hasDistribution(IgniteDistributions.random()))
+                .and(input(isInstanceOf(IgniteTrimExchange.class)
+                    .and(input(isTableScan("TEST_REPL")))))),
+            TableModifySingleNodeConverterRule.class.getSimpleName()
+        );
+
+        // broadcast <- partitioned.
+        assertPlan("INSERT INTO test_repl SELECT * FROM test", schema,
+            
hasChildThat(isInstanceOf(IgniteTableModify.class).and(hasDistribution(IgniteDistributions.random()))
+                .and(input(isTableScan("TEST")))));
+
+        // broadcast <- random.
+        assertPlan("INSERT INTO test_repl SELECT * FROM test3", schema,
+            
hasChildThat(isInstanceOf(IgniteTableModify.class).and(hasDistribution(IgniteDistributions.random()))
+                .and(input(isTableScan("TEST3")))));
+
+        // broadcast <- broadcast.
+        assertPlan("INSERT INTO test_repl SELECT * FROM test_repl2", schema,
+            
isInstanceOf(IgniteTableModify.class).and(hasDistribution(IgniteDistributions.single()))
+                .and(input(isTableScan("TEST_REPL2"))));
+
+        // broadcast <- broadcast (force distributed).
+        assertPlan("INSERT INTO test_repl SELECT * FROM test_repl2", schema,
+            
hasChildThat(isInstanceOf(IgniteTableModify.class).and(hasDistribution(IgniteDistributions.random()))
+                .and(input(isInstanceOf(IgniteTrimExchange.class)
+                    .and(input(isTableScan("TEST_REPL2")))))),
+            TableModifySingleNodeConverterRule.class.getSimpleName()
+        );
+
+        // broadcast <- broadcast (same).
+        assertPlan("INSERT INTO test_repl SELECT * FROM test_repl", schema,
+            
isInstanceOf(IgniteTableModify.class).and(hasDistribution(IgniteDistributions.single()))
+                .and(input(isInstanceOf(IgniteTableSpool.class)
+                    .and(input(isTableScan("TEST_REPL"))))));
+
+        // broadcast <- broadcast (same, force distributed).
+        GridTestUtils.assertThrows(null, () -> {
+                physicalPlan("INSERT INTO test_repl SELECT * FROM test_repl", 
schema,
+                    TableModifySingleNodeConverterRule.class.getSimpleName());
+            }, IgniteException.class, ""
+        );
+
+        // random <- partitioned.
+        assertPlan("INSERT INTO test3 SELECT * FROM test", schema,
+            
hasChildThat(isInstanceOf(IgniteTableModify.class).and(hasDistribution(IgniteDistributions.random()))
+                .and(input(isTableScan("TEST")))));
+
+        // random <- broadcast.
+        assertPlan("INSERT INTO test3 SELECT * FROM test_repl", schema,
+            
isInstanceOf(IgniteTableModify.class).and(hasDistribution(IgniteDistributions.single()))
+                .and(input(isTableScan("TEST_REPL"))));
+
+        // random <- broadcast (force distributed).
+        assertPlan("INSERT INTO test3 SELECT * FROM test_repl", schema,
+            
hasChildThat(isInstanceOf(IgniteTableModify.class).and(hasDistribution(IgniteDistributions.random()))
+                .and(input(isInstanceOf(IgniteTrimExchange.class)
+                    .and(input(isTableScan("TEST_REPL")))))),
+            TableModifySingleNodeConverterRule.class.getSimpleName()
+        );
+
+        // Check UPDATE statements.
+
+        // partitioned.
+        assertPlan("UPDATE test SET val = val + 1", schema,
+            
hasChildThat(isInstanceOf(IgniteTableModify.class).and(hasDistribution(IgniteDistributions.random()))
+                .and(input(isTableScan("TEST")))));
+
+        // broadcast.
+        assertPlan("UPDATE test_repl SET val = val + 1", schema,
+            
isInstanceOf(IgniteTableModify.class).and(hasDistribution(IgniteDistributions.single()))
+                .and(input(isTableScan("TEST_REPL"))));
+
+        // broadcast (force distributed).
+        assertPlan("UPDATE test_repl SET val = val + 1", schema,

Review Comment:
   Can it duplicate or corrupt data? Or does `TrimExchange` prevent that?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to