This is an automated email from the ASF dual-hosted git repository.

korlov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new e0c472d2116 IGNITE-26135 Sql. Support DELETE operation in optimized 
plans (#6378)
e0c472d2116 is described below

commit e0c472d211688a10f47a26891a200216d8b5cfbf
Author: korlov42 <[email protected]>
AuthorDate: Tue Aug 12 10:32:18 2025 +0300

    IGNITE-26135 Sql. Support DELETE operation in optimized plans (#6378)
---
 .../sql-reference/explain-operators-list.adoc      |   5 +-
 .../benchmark/AbstractMultiNodeBenchmark.java      |   4 +
 .../internal/benchmark/BulkDeleteBenchmark.java    | 364 +++++++++++++++++++++
 .../ignite/internal/sql/engine/ItDmlTest.java      |   5 +-
 .../sql/engine/ItSqlUsesKeyValueDeleteTest.java    | 114 +++++++
 ...tTest.java => ItSqlUsesKeyValueInsertTest.java} |   2 +-
 .../integrationTest/sql/group1/explain/modify.test |  27 --
 .../sql/group1/explain/specialized_operators.test  |  26 +-
 .../internal/sql/engine/exec/UpdatableTable.java   |  17 +
 .../sql/engine/exec/UpdatableTableImpl.java        |  11 +
 .../sql/engine/prepare/KeyValueModifyPlan.java     |  47 ++-
 .../internal/sql/engine/prepare/PlannerHelper.java |   2 +-
 .../internal/sql/engine/prepare/PlannerPhase.java  |   8 +-
 .../PartitionAwarenessMetadataExtractor.java       |   3 +-
 .../sql/engine/rel/IgniteKeyValueModify.java       |  23 +-
 .../rule/TableModifyToKeyValueDeleteRule.java      | 128 ++++++++
 ...e.java => TableModifyToKeyValueInsertRule.java} |  39 +--
 .../engine/rule/TableScanToKeyValueGetRule.java    |  50 +--
 .../engine/statistic/SqlStatisticManagerImpl.java  |   3 +-
 .../sql/engine/exec/DummyUpdatableTable.java       |   5 +
 .../sql/engine/framework/TestBuilders.java         |   5 +
 .../sql/engine/planner/AbstractPlannerTest.java    |   6 +-
 .../engine/planner/KeyValueModifyPlannerTest.java  |  15 +-
 .../planner/PrimaryKeyDeletePlannerTest.java       | 231 +++++++++++++
 .../PartitionAwarenessMetadataTest.java            |  31 +-
 25 files changed, 1066 insertions(+), 105 deletions(-)

diff --git a/docs/_docs/sql-reference/explain-operators-list.adoc 
b/docs/_docs/sql-reference/explain-operators-list.adoc
index b6fbe68b1ad..50f086856ce 100644
--- a/docs/_docs/sql-reference/explain-operators-list.adoc
+++ b/docs/_docs/sql-reference/explain-operators-list.adoc
@@ -371,9 +371,12 @@ Optimized operator which leverages Key-Value API in DML 
queries.
 Attributes:
 
 - `table`: Table being accessed.
-- `sourceExpression`: Source expressions used for row computations.
 - `type`: Type of data modification operation (e.g., INSERT, UPDATE, DELETE).
 - `est`: Estimated number of output rows.
+- `sourceExpression`: Source expressions used for row computations for INSERT 
operation.
+Optional.
+- `key`: Source expressions used for row computations for DELETE operation.
+Optional.
 - `fieldNames`: List of names of columns in produced rows.
 Optional.
 
diff --git 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/benchmark/AbstractMultiNodeBenchmark.java
 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/benchmark/AbstractMultiNodeBenchmark.java
index eef5c815306..eee8e28be48 100644
--- 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/benchmark/AbstractMultiNodeBenchmark.java
+++ 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/benchmark/AbstractMultiNodeBenchmark.java
@@ -194,6 +194,10 @@ public class AbstractMultiNodeBenchmark {
         IgniteUtils.closeAll(igniteServers.stream().map(node -> 
node::shutdown));
     }
 
+    public IgniteImpl node(int idx) {
+        return unwrapIgniteImpl(igniteServers.get(idx).api());
+    }
+
     private void startCluster() throws Exception {
         if (remote) {
             throw new AssertionError("Can't start the cluster in remote mode");
diff --git 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/benchmark/BulkDeleteBenchmark.java
 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/benchmark/BulkDeleteBenchmark.java
new file mode 100644
index 00000000000..f956ba19cee
--- /dev/null
+++ 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/benchmark/BulkDeleteBenchmark.java
@@ -0,0 +1,364 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.benchmark;
+
+import static org.apache.ignite.internal.lang.IgniteStringFormatter.format;
+import static org.apache.ignite.internal.util.IgniteUtils.closeAll;
+
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import org.apache.ignite.client.IgniteClient;
+import org.apache.ignite.internal.sql.engine.SqlQueryProcessor;
+import org.apache.ignite.internal.sql.engine.statistic.SqlStatisticManagerImpl;
+import org.apache.ignite.internal.util.CompletableFutures;
+import org.apache.ignite.sql.IgniteSql;
+import org.apache.ignite.sql.ResultSet;
+import org.apache.ignite.sql.Statement;
+import org.apache.ignite.table.KeyValueView;
+import org.apache.ignite.table.Tuple;
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.BenchmarkMode;
+import org.openjdk.jmh.annotations.Fork;
+import org.openjdk.jmh.annotations.Mode;
+import org.openjdk.jmh.annotations.OutputTimeUnit;
+import org.openjdk.jmh.annotations.Param;
+import org.openjdk.jmh.annotations.Scope;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.annotations.State;
+import org.openjdk.jmh.annotations.TearDown;
+import org.openjdk.jmh.annotations.Threads;
+import org.openjdk.jmh.runner.Runner;
+import org.openjdk.jmh.runner.RunnerException;
+import org.openjdk.jmh.runner.options.Options;
+import org.openjdk.jmh.runner.options.OptionsBuilder;
+
+/**
+ * Benchmark for Deletion operation, comparing KV, JDBC and SQL APIs.
+ */
+@State(Scope.Benchmark)
+@Fork(1)
+@Threads(1)
+@BenchmarkMode(Mode.SingleShotTime)
+@OutputTimeUnit(TimeUnit.SECONDS)
+public class BulkDeleteBenchmark extends AbstractMultiNodeBenchmark {
+    private static final int TABLE_SIZE = 30_000;
+
+    @Param({"1", "3"})
+    private static int clusterSize;
+
+    @Param({"32"})
+    private int partitionCount;
+
+    @Param({"1", "3"})
+    private int replicaCount;
+
+    /**
+     * Fills the table with data.
+     */
+    @Setup
+    public void setUp() throws IOException, ExecutionException, 
InterruptedException, TimeoutException {
+        int id = 0;
+
+        KeyValueView<Tuple, Tuple> keyValueView = 
publicIgnite.tables().table(TABLE_NAME).keyValueView();
+
+        for (int i = 0; i < TABLE_SIZE; i++) {
+            Tuple t = Tuple.create();
+            for (int j = 1; j <= 10; j++) {
+                t.set("field" + j, FIELD_VAL);
+            }
+
+            keyValueView.put(null, Tuple.create().set("ycsb_key", id++), t);
+        }
+
+        List<CompletableFuture<?>> futs = new ArrayList<>();
+        for (int i = 0; i < clusterSize; i++) {
+            SqlStatisticManagerImpl statisticManager = 
(SqlStatisticManagerImpl) ((SqlQueryProcessor) node(i).queryEngine())
+                    .sqlStatisticManager();
+
+            statisticManager.forceUpdateAll();
+            futs.add(statisticManager.lastUpdateStatisticFuture());
+        }
+
+        CompletableFutures.allOf(futs).get(10, TimeUnit.SECONDS);
+    }
+
+    /**
+     * Benchmark for SQL Delete via embedded client.
+     */
+    @Benchmark
+    public void sqlPreparedDelete(SqlState state) {
+        for (int i = TABLE_SIZE - 1; i >= 0; i--) {
+            state.executeQuery(i);
+        }
+    }
+
+    /**
+     * Benchmark for SQL Delete via embedded client.
+     */
+    @Benchmark
+    public void sqlInlinedDelete(SqlState state) {
+        for (int i = TABLE_SIZE - 1; i >= 0; i--) {
+            state.executeInlinedQuery(i);
+        }
+    }
+
+    /**
+     * Benchmark for KV Delete via embedded client.
+     */
+    @Benchmark
+    public void kvDelete(KvState state) {
+        for (int i = TABLE_SIZE - 1; i >= 0; i--) {
+            state.executeQuery(i);
+        }
+    }
+
+    /**
+     * Benchmark for JDBC Delete.
+     */
+    @Benchmark
+    public void jdbcDelete(JdbcState state) throws SQLException {
+        for (int i = TABLE_SIZE - 1; i >= 0; i--) {
+            state.executeQuery(i);
+        }
+    }
+
+    /**
+     * Benchmark for SQL Delete via thin client.
+     */
+    @Benchmark
+    public void sqlThinDelete(SqlThinState state) {
+        for (int i = TABLE_SIZE - 1; i >= 0; i--) {
+            state.executeQuery(i);
+        }
+    }
+
+    /**
+     * Benchmark for KV Delete via thin client.
+     */
+    @Benchmark
+    public void kvThinDelete(KvThinState state) {
+        for (int i = TABLE_SIZE - 1; i >= 0; i--) {
+            state.executeQuery(i);
+        }
+    }
+
+    /**
+     * Benchmark's entry point.
+     */
+    public static void main(String[] args) throws RunnerException {
+        Options opt = new OptionsBuilder()
+                .include(".*[.]" + BulkDeleteBenchmark.class.getSimpleName() + 
".*")
+                .build();
+
+        new Runner(opt).run();
+    }
+
+    /**
+     * Benchmark state for {@link #sqlPreparedDelete(SqlState)}.
+     *
+     * <p>Holds {@link Statement}.
+     */
+    @State(Scope.Benchmark)
+    public static class SqlState {
+        private Statement statement;
+        private IgniteSql sql;
+
+        /**
+         * Initializes session and statement.
+         */
+        @Setup
+        public void setUp() {
+            String queryStr = createDeleteStatement();
+
+            sql = publicIgnite.sql();
+            statement = sql.createStatement(queryStr);
+        }
+
+        void executeQuery(int key) {
+            try (ResultSet<?> rs = sql.execute(null, statement, key)) {
+                // NO-OP
+            }
+        }
+
+        void executeInlinedQuery(int key) {
+            try (ResultSet<?> rs = sql.execute(null, 
createDeleteStatement(key))) {
+                // NO-OP
+            }
+        }
+    }
+
+    /**
+     * Benchmark state for {@link #sqlThinDelete(SqlThinState)}.
+     *
+     * <p>Holds {@link IgniteClient} and {@link Statement}.
+     */
+    @State(Scope.Benchmark)
+    public static class SqlThinState {
+        private IgniteClient client;
+        private Statement statement;
+        private IgniteSql sql;
+
+        /**
+         * Initializes session and statement.
+         */
+        @Setup
+        public void setUp() {
+            String queryStr = createDeleteStatement();
+
+            String[] clientAddrs = getServerEndpoints(clusterSize);
+
+            client = IgniteClient.builder().addresses(clientAddrs).build();
+
+            sql = client.sql();
+
+            statement = sql.createStatement(queryStr);
+        }
+
+        /**
+         * Closes resources.
+         */
+        @TearDown
+        public void tearDown() throws Exception {
+            closeAll(client);
+        }
+
+        void executeQuery(int key) {
+            sql.execute(null, statement, key);
+        }
+    }
+
+    /**
+     * Benchmark state for {@link #jdbcDelete(JdbcState)}.
+     *
+     * <p>Holds {@link Connection} and {@link PreparedStatement}.
+     */
+    @State(Scope.Benchmark)
+    public static class JdbcState {
+        private Connection conn;
+
+        private PreparedStatement stmt;
+
+        /**
+         * Initializes connection and prepared statement.
+         */
+        @Setup
+        public void setUp() throws SQLException {
+            String queryStr = createDeleteStatement();
+
+            //noinspection CallToDriverManagerGetConnection
+            conn = 
DriverManager.getConnection("jdbc:ignite:thin://127.0.0.1:10800/");
+
+            stmt = conn.prepareStatement(queryStr);
+        }
+
+        /**
+         * Closes resources.
+         */
+        @TearDown
+        public void tearDown() throws Exception {
+            closeAll(stmt, conn);
+        }
+
+        void executeQuery(int key) throws SQLException {
+            stmt.setInt(1, key);
+            stmt.executeUpdate();
+        }
+    }
+
+    /**
+     * Benchmark state for {@link #kvDelete(KvState)}.
+     *
+     * <p>Holds {@link Tuple} and {@link KeyValueView} for the table.
+     */
+    @State(Scope.Benchmark)
+    public static class KvState {
+        private final KeyValueView<Tuple, Tuple> kvView = 
publicIgnite.tables().table(TABLE_NAME).keyValueView();
+
+        void executeQuery(int key) {
+            kvView.remove(null, Tuple.create().set("ycsb_key", key));
+        }
+    }
+
+    /**
+     * Benchmark state for {@link #kvThinDelete(KvThinState)}.
+     *
+     * <p>Holds {@link Tuple}, {@link IgniteClient}, and {@link KeyValueView} 
for the table.
+     */
+    @State(Scope.Benchmark)
+    public static class KvThinState {
+        private IgniteClient client;
+        private KeyValueView<Tuple, Tuple> kvView;
+
+        /**
+         * Initializes the tuple.
+         */
+        @Setup
+        public void setUp() {
+            String[] clientAddrs = getServerEndpoints(clusterSize);
+
+            client = IgniteClient.builder().addresses(clientAddrs).build();
+
+            kvView = client.tables().table(TABLE_NAME).keyValueView();
+        }
+
+        @TearDown
+        public void tearDown() throws Exception {
+            closeAll(client);
+        }
+
+        void executeQuery(int key) {
+            kvView.remove(null, Tuple.create().set("ycsb_key", key));
+        }
+    }
+
+    private static String createDeleteStatement() {
+        String deleteQueryTemplate = "DELETE FROM {} WHERE ycsb_key = ?";
+
+        return format(deleteQueryTemplate, TABLE_NAME, "ycsb_key");
+    }
+
+    private static String createDeleteStatement(int key) {
+        String deleteQueryTemplate = "DELETE FROM {} WHERE ycsb_key = {}";
+
+        return format(deleteQueryTemplate, TABLE_NAME, "ycsb_key", key);
+    }
+
+    @Override
+    protected int nodes() {
+        return clusterSize;
+    }
+
+    @Override
+    protected int partitionCount() {
+        return partitionCount;
+    }
+
+    @Override
+    protected int replicaCount() {
+        return replicaCount;
+    }
+}
diff --git 
a/modules/sql-engine/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItDmlTest.java
 
b/modules/sql-engine/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItDmlTest.java
index 749a384ab0f..4e762ef92a2 100644
--- 
a/modules/sql-engine/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItDmlTest.java
+++ 
b/modules/sql-engine/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItDmlTest.java
@@ -1043,12 +1043,11 @@ public class ItDmlTest extends BaseSqlIntegrationTest {
                     .check();
         }
 
-        // although it's basically kv delete, such optimization is not 
supported
-        // at the moment, thus expected plan should contain IgniteTableModify
+        // kv delete
         for (int i = 0; i < tableSize; i++) {
             assertQuery("DELETE FROM test1 WHERE id1=? AND id2=?")
                     .withParams(i, i)
-                    .matches(containsSubPlan("TableModify"))
+                    .matches(containsSubPlan("KeyValueModify"))
                     .returns(1L)
                     .check();
         }
diff --git 
a/modules/sql-engine/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItSqlUsesKeyValueDeleteTest.java
 
b/modules/sql-engine/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItSqlUsesKeyValueDeleteTest.java
new file mode 100644
index 00000000000..3664c79c38c
--- /dev/null
+++ 
b/modules/sql-engine/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItSqlUsesKeyValueDeleteTest.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.sql.engine;
+
+import static 
org.apache.ignite.internal.sql.engine.util.QueryChecker.containsSubPlan;
+
+import java.util.concurrent.ThreadLocalRandom;
+import org.apache.ignite.internal.sql.BaseSqlIntegrationTest;
+import org.apache.ignite.internal.tx.InternalTransaction;
+import org.apache.ignite.tx.Transaction;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.RepeatedTest;
+import org.junit.jupiter.api.Test;
+
+/**
+ * Tests to verify e2e cases of optimized lookup by primary key.
+ */
+public class ItSqlUsesKeyValueDeleteTest extends BaseSqlIntegrationTest {
+    private static final int TABLE_SIZE = 10;
+
+    @BeforeEach
+    @SuppressWarnings("ConcatenationWithEmptyString")
+    void initSchema() {
+        CLUSTER.aliveNode().sql().executeScript(""
+                + "DROP TABLE IF EXISTS simple_key;"
+                + "DROP TABLE IF EXISTS complex_key_normal_order;"
+                + "DROP TABLE IF EXISTS complex_key_revers_order;" 
+                + ""
+                + "CREATE TABLE simple_key (id INT PRIMARY KEY, val INT);"
+                + "CREATE TABLE complex_key_normal_order (id1 INT, id2 INT, 
val INT, PRIMARY KEY(id1, id2));"
+                + "CREATE TABLE complex_key_revers_order (id1 INT, id2 INT, 
val INT, PRIMARY KEY(id2, id1));"
+                + ""
+                + "INSERT INTO simple_key SELECT x, x FROM 
TABLE(system_range(1, ?));"
+                + "INSERT INTO complex_key_normal_order SELECT x, 2 * x, x 
FROM TABLE(system_range(1, ?));"
+                + "INSERT INTO complex_key_revers_order SELECT x, 2 * x, x 
FROM TABLE(system_range(1, ?));",
+                TABLE_SIZE, TABLE_SIZE, TABLE_SIZE
+        );
+    }
+
+    @RepeatedTest(3)
+    void deleteBySimpleKey() {
+        int key = randomKey();
+
+        assertQuery("DELETE FROM simple_key WHERE id = ?")
+                .matches(containsSubPlan("KeyValueModify"))
+                .withParams(key)
+                .returns(1L)
+                .check();
+    }
+
+    @RepeatedTest(3)
+    void deleteByComplexNormalKey() {
+        int key = randomKey();
+
+        assertQuery("DELETE FROM complex_key_normal_order WHERE id1 = ? AND 
id2 = ?")
+                .matches(containsSubPlan("KeyValueModify"))
+                .withParams(key, 2 * key)
+                .returns(1L)
+                .check();
+    }
+
+    @RepeatedTest(3)
+    void deleteByComplexReversedKey() {
+        int key = randomKey();
+
+        assertQuery("DELETE FROM complex_key_revers_order WHERE id1 = ? AND 
id2 = ?")
+                .matches(containsSubPlan("KeyValueModify"))
+                .withParams(key, 2 * key)
+                .returns(1L)
+                .check();
+    }
+
+    @Test
+    void lookupOnOutOfRangeKey() {
+        Transaction tx = CLUSTER.aliveNode().transactions().begin();
+
+        try {
+            sql(tx, "INSERT INTO simple_key VALUES (2147483647, 0), 
(-2147483648, 0);");
+
+            assertQuery((InternalTransaction) tx, "DELETE FROM simple_key 
WHERE id = 2147483648")
+                    .returns(0L)
+                    .check();
+
+            assertQuery((InternalTransaction) tx, "DELETE FROM simple_key 
WHERE id = -2147483649")
+                    .returns(0L)
+                    .check();
+        } finally {
+            tx.rollback();
+        }
+    }
+
+    private static int randomKey() {
+        int key = ThreadLocalRandom.current().nextInt(TABLE_SIZE) + 1;
+
+        System.out.println("Key is " + key);
+
+        return key;
+    }
+}
diff --git 
a/modules/sql-engine/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItSqlUsesKeyValuePutTest.java
 
b/modules/sql-engine/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItSqlUsesKeyValueInsertTest.java
similarity index 99%
rename from 
modules/sql-engine/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItSqlUsesKeyValuePutTest.java
rename to 
modules/sql-engine/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItSqlUsesKeyValueInsertTest.java
index 6a8fc5ba7de..5a636789c7a 100644
--- 
a/modules/sql-engine/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItSqlUsesKeyValuePutTest.java
+++ 
b/modules/sql-engine/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItSqlUsesKeyValueInsertTest.java
@@ -31,7 +31,7 @@ import org.junit.jupiter.api.Test;
 /**
  * Tests to verify e2e cases of optimized insert.
  */
-public class ItSqlUsesKeyValuePutTest extends BaseSqlIntegrationTest {
+public class ItSqlUsesKeyValueInsertTest extends BaseSqlIntegrationTest {
     private static final int TABLE_SIZE = 10;
 
     @BeforeAll
diff --git 
a/modules/sql-engine/src/integrationTest/sql/group1/explain/modify.test 
b/modules/sql-engine/src/integrationTest/sql/group1/explain/modify.test
index 6382de12860..6620d04c852 100644
--- a/modules/sql-engine/src/integrationTest/sql/group1/explain/modify.test
+++ b/modules/sql-engine/src/integrationTest/sql/group1/explain/modify.test
@@ -82,33 +82,6 @@ Project
                   tuples: [[0]]
                   est: (rows=1)
 
-# delete by key simple
-explain plan
-DELETE FROM test_table WHERE c1 = 1
-----
-Project
-    fieldNames: [ROWCOUNT]
-    projection: [CAST($f0):BIGINT NOT NULL]
-    est: (rows=1)
-  ColocatedHashAggregate
-      fieldNames: [$f0]
-      group: []
-      aggregation: [$SUM0(ROWCOUNT)]
-      est: (rows=1)
-    Exchange
-        distribution: single
-        est: (rows=1)
-      TableModify
-          table: PUBLIC.TEST_TABLE
-          fieldNames: [ROWCOUNT]
-          type: DELETE
-          est: (rows=1)
-        TableScan
-            table: PUBLIC.TEST_TABLE
-            predicate: =(C1, 1)
-            fieldNames: [C1]
-            est: (rows=1)
-
 # delete by key complex
 explain plan
 DELETE FROM test_table WHERE c1 in (1, 2, 3)
diff --git 
a/modules/sql-engine/src/integrationTest/sql/group1/explain/specialized_operators.test
 
b/modules/sql-engine/src/integrationTest/sql/group1/explain/specialized_operators.test
index 4589d147113..14435fe41b2 100644
--- 
a/modules/sql-engine/src/integrationTest/sql/group1/explain/specialized_operators.test
+++ 
b/modules/sql-engine/src/integrationTest/sql/group1/explain/specialized_operators.test
@@ -73,7 +73,7 @@ KeyValueGet
     est: (rows=1)
 
 
-## Tests on KeyValueModify node
+## Tests on KeyValueModify#INSERT node
 
 # insert single tuple with literals
 explain plan
@@ -108,6 +108,30 @@ KeyValueModify
     type: INSERT
     est: (rows=1)
 
+## Tests on KeyValueModify#DELETE node
+
+# delete single tuple with literals
+explain plan
+DELETE FROM test_table WHERE c1 = 1
+----
+KeyValueModify
+    table: PUBLIC.TEST_TABLE
+    fieldNames: [ROWCOUNT]
+    type: DELETE
+    key: [1]
+    est: (rows=1)
+
+# delete single tuple by functional key expression
+explain plan
+DELETE FROM test_table WHERE c1 = EXTRACT(DAY FROM CURRENT_DATE)::INTEGER;
+----
+KeyValueModify
+    table: PUBLIC.TEST_TABLE
+    fieldNames: [ROWCOUNT]
+    type: DELETE
+    key: [CAST(EXTRACT(FLAG(DAY), CURRENT_DATE)):INTEGER NOT NULL]
+    est: (rows=1)
+
 ## Tests on SelectCount node
 
 # count all
diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/UpdatableTable.java
 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/UpdatableTable.java
index 79fee23edad..692fef24f6a 100644
--- 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/UpdatableTable.java
+++ 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/UpdatableTable.java
@@ -82,6 +82,23 @@ public interface UpdatableTable {
             ColocationGroup colocationGroup
     );
 
+    /**
+     * Performs a delete by primary key.
+     *
+     * <p>Note: this operation may be performed on initiator node only since 
it requires an
+     * original transaction rather than attributes, and transaction is only 
available on
+     * initiator node.
+     *
+     * @param <RowT> A type of row.
+     * @param explicitTx Transaction to use to perform delete.
+     * @param ectx Execution context.
+     * @param key A key to delete.
+     * @return A future representing result of operation.
+     */
+    <RowT> CompletableFuture<Boolean> delete(
+            @Nullable InternalTransaction explicitTx, ExecutionContext<RowT> 
ectx, RowT key
+    );
+
     /**
      * Removes rows from the table if they are exactly the same as any of the 
specified rows.
      *
diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/UpdatableTableImpl.java
 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/UpdatableTableImpl.java
index fe36e0acba1..b6e6f63b4b6 100644
--- 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/UpdatableTableImpl.java
+++ 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/UpdatableTableImpl.java
@@ -65,6 +65,7 @@ import 
org.apache.ignite.internal.table.distributed.storage.RowBatch;
 import org.apache.ignite.internal.tx.InternalTransaction;
 import org.apache.ignite.internal.util.IgniteUtils;
 import org.apache.ignite.sql.SqlException;
+import org.jetbrains.annotations.Nullable;
 
 /**
  * Ignite table implementation.
@@ -314,6 +315,16 @@ public final class UpdatableTableImpl implements 
UpdatableTable {
         return rowBatchByPartitionId;
     }
 
+    /** {@inheritDoc} */
+    @Override
+    public <RowT> CompletableFuture<Boolean> delete(@Nullable 
InternalTransaction explicitTx, ExecutionContext<RowT> ectx, RowT key) {
+        assert explicitTx != null;
+
+        BinaryRowEx keyRow = rowConverter.toKeyRow(ectx, key);
+
+        return table.delete(keyRow, explicitTx);
+    }
+
     /** {@inheritDoc} */
     @Override
     public <RowT> CompletableFuture<?> deleteAll(
diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/KeyValueModifyPlan.java
 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/KeyValueModifyPlan.java
index 0b3bee355dc..79d375286cb 100644
--- 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/KeyValueModifyPlan.java
+++ 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/KeyValueModifyPlan.java
@@ -42,6 +42,8 @@ import org.apache.ignite.internal.sql.engine.util.Cloner;
 import org.apache.ignite.internal.sql.engine.util.Commons;
 import org.apache.ignite.internal.sql.engine.util.IteratorToDataCursorAdapter;
 import org.apache.ignite.internal.tx.InternalTransaction;
+import org.apache.ignite.lang.ErrorGroups.Common;
+import org.apache.ignite.lang.IgniteException;
 import org.apache.ignite.sql.ResultSetMetadata;
 import org.jetbrains.annotations.Nullable;
 
@@ -57,7 +59,7 @@ public class KeyValueModifyPlan implements ExplainablePlan, 
ExecutablePlan {
     @Nullable
     private final PartitionAwarenessMetadata partitionAwarenessMetadata;
 
-    private volatile InsertExecution<?> operation;
+    private volatile Performable<?> operation;
 
     KeyValueModifyPlan(
             PlanId id,
@@ -121,8 +123,8 @@ public class KeyValueModifyPlan implements ExplainablePlan, 
ExecutablePlan {
         return ExplainUtils.toString(clonedRoot);
     }
 
-    private <RowT> InsertExecution<RowT> operation(ExecutionContext<RowT> ctx, 
ExecutableTableRegistry tableRegistry) {
-        InsertExecution<RowT> operation = cast(this.operation);
+    private <RowT> Performable<RowT> operation(ExecutionContext<RowT> ctx, 
ExecutableTableRegistry tableRegistry) {
+        Performable<RowT> operation = cast(this.operation);
 
         if (operation != null) {
             return operation;
@@ -138,7 +140,16 @@ public class KeyValueModifyPlan implements 
ExplainablePlan, ExecutablePlan {
 
         UpdatableTable table = execTable.updatableTable();
 
-        operation = new InsertExecution<>(table, rowSupplier);
+        switch (modifyNode.operation()) {
+            case INSERT:
+                operation = new InsertExecution<>(table, rowSupplier);
+                break;
+            case DELETE:
+                operation = new DeleteExecution<>(table, rowSupplier);
+                break;
+            default:
+                throw new IgniteException(Common.INTERNAL_ERR, "Unsupported 
operation " + modifyNode.operation());
+        }
 
         this.operation = operation;
 
@@ -151,7 +162,7 @@ public class KeyValueModifyPlan implements ExplainablePlan, 
ExecutablePlan {
             InternalTransaction tx,
             ExecutableTableRegistry tableRegistry
     ) {
-        InsertExecution<RowT> operation = operation(ctx, tableRegistry);
+        Performable<RowT> operation = operation(ctx, tableRegistry);
 
         CompletableFuture<Iterator<InternalSqlRow>> result = 
operation.perform(ctx, tx);
 
@@ -163,7 +174,11 @@ public class KeyValueModifyPlan implements 
ExplainablePlan, ExecutablePlan {
         return modifyNode;
     }
 
-    private static class InsertExecution<RowT> {
+    private abstract static class Performable<RowT> {
+        abstract CompletableFuture<Iterator<InternalSqlRow>> 
perform(ExecutionContext<RowT> ctx, @Nullable InternalTransaction tx);
+    }
+
+    private static class InsertExecution<RowT> extends Performable<RowT> {
         private final UpdatableTable table;
         private final SqlRowProvider<RowT> rowSupplier;
 
@@ -175,12 +190,32 @@ public class KeyValueModifyPlan implements 
ExplainablePlan, ExecutablePlan {
             this.rowSupplier = rowSupplier;
         }
 
+        @Override
         CompletableFuture<Iterator<InternalSqlRow>> 
perform(ExecutionContext<RowT> ctx, InternalTransaction tx) {
             return table.insert(tx, ctx, rowSupplier.get(ctx))
                     .thenApply(none -> List.<InternalSqlRow>of(new 
InternalSqlRowSingleLong(1L)).iterator());
         }
     }
 
+    private static class DeleteExecution<RowT> extends Performable<RowT> {
+        private final UpdatableTable table;
+        private final SqlRowProvider<RowT> rowSupplier;
+
+        private DeleteExecution(
+                UpdatableTable table,
+                SqlRowProvider<RowT> rowSupplier
+        ) {
+            this.table = table;
+            this.rowSupplier = rowSupplier;
+        }
+
+        @Override
+        CompletableFuture<Iterator<InternalSqlRow>> 
perform(ExecutionContext<RowT> ctx, InternalTransaction tx) {
+            return table.delete(tx, ctx, rowSupplier.get(ctx))
+                    .thenApply(deleted -> List.<InternalSqlRow>of(new 
InternalSqlRowSingleLong(deleted ? 1L : 0L)).iterator());
+        }
+    }
+
     public int catalogVersion() {
         return catalogVersion;
     }
diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/PlannerHelper.java
 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/PlannerHelper.java
index 1e5a93e237e..b084e9127da 100644
--- 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/PlannerHelper.java
+++ 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/PlannerHelper.java
@@ -342,7 +342,7 @@ public final class PlannerHelper {
                 planner.cluster(),
                 planner.cluster().traitSetOf(IgniteConvention.INSTANCE),
                 targetTable,
-                Operation.PUT,
+                Operation.INSERT,
                 expressions
         );
     }
diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/PlannerPhase.java
 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/PlannerPhase.java
index f78758e837b..f7fbe232936 100644
--- 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/PlannerPhase.java
+++ 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/PlannerPhase.java
@@ -66,7 +66,8 @@ import 
org.apache.ignite.internal.sql.engine.rule.SortMergeRule;
 import org.apache.ignite.internal.sql.engine.rule.SortRemoveRule;
 import 
org.apache.ignite.internal.sql.engine.rule.TableFunctionScanConverterRule;
 import org.apache.ignite.internal.sql.engine.rule.TableModifyConverterRule;
-import org.apache.ignite.internal.sql.engine.rule.TableModifyToKeyValuePutRule;
+import 
org.apache.ignite.internal.sql.engine.rule.TableModifyToKeyValueDeleteRule;
+import 
org.apache.ignite.internal.sql.engine.rule.TableModifyToKeyValueInsertRule;
 import org.apache.ignite.internal.sql.engine.rule.TableScanToKeyValueGetRule;
 import org.apache.ignite.internal.sql.engine.rule.UnionConverterRule;
 import org.apache.ignite.internal.sql.engine.rule.ValuesConverterRule;
@@ -101,8 +102,9 @@ public enum PlannerPhase {
     HEP_TO_SIMPLE_KEY_VALUE_OPERATION(
             "Heuristic phase to convert relational tree to simple Key-Value 
operation",
             TableScanToKeyValueGetRule.INSTANCE,
-            TableModifyToKeyValuePutRule.PROJECT,
-            TableModifyToKeyValuePutRule.VALUES
+            TableModifyToKeyValueInsertRule.PROJECT,
+            TableModifyToKeyValueInsertRule.VALUES,
+            TableModifyToKeyValueDeleteRule.INSTANCE
     ) {
         /** {@inheritDoc} */
         @Override
diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/partitionawareness/PartitionAwarenessMetadataExtractor.java
 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/partitionawareness/PartitionAwarenessMetadataExtractor.java
index e1a2eb88ff1..9495ba411c0 100644
--- 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/partitionawareness/PartitionAwarenessMetadataExtractor.java
+++ 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/partitionawareness/PartitionAwarenessMetadataExtractor.java
@@ -24,6 +24,7 @@ import org.apache.calcite.rex.RexNode;
 import org.apache.calcite.util.ImmutableIntList;
 import org.apache.ignite.internal.sql.engine.rel.IgniteKeyValueGet;
 import org.apache.ignite.internal.sql.engine.rel.IgniteKeyValueModify;
+import 
org.apache.ignite.internal.sql.engine.rel.IgniteKeyValueModify.Operation;
 import org.apache.ignite.internal.sql.engine.schema.IgniteTable;
 import org.jetbrains.annotations.Nullable;
 
@@ -80,7 +81,7 @@ public class PartitionAwarenessMetadataExtractor {
 
         List<RexNode> expressions = kv.expressions();
 
-        return buildMetadata(optTable, true, expressions, 
DirectTxMode.SUPPORTED_TRACKING_REQUIRED);
+        return buildMetadata(optTable, kv.operation() == Operation.INSERT, 
expressions, DirectTxMode.SUPPORTED_TRACKING_REQUIRED);
     }
 
     private static @Nullable PartitionAwarenessMetadata buildMetadata(
diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/rel/IgniteKeyValueModify.java
 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/rel/IgniteKeyValueModify.java
index 67dfbcea1b6..ad76aee26e8 100644
--- 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/rel/IgniteKeyValueModify.java
+++ 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/rel/IgniteKeyValueModify.java
@@ -56,11 +56,14 @@ public class IgniteKeyValueModify extends AbstractRelNode 
implements IgniteRel {
 
     /** Enumeration of supported modification operations. */
     public enum Operation {
-        PUT(TableModify.Operation.INSERT);
+        INSERT(SqlKind.INSERT, TableModify.Operation.INSERT),
+        DELETE(SqlKind.DELETE, TableModify.Operation.DELETE);
 
+        private final SqlKind kind;
         private final TableModify.Operation op;
 
-        Operation(TableModify.Operation op) {
+        Operation(SqlKind kind, TableModify.Operation op) {
+            this.kind = kind;
             this.op = op;
         }
     }
@@ -93,9 +96,12 @@ public class IgniteKeyValueModify extends AbstractRelNode 
implements IgniteRel {
         this.expressions = expressions;
     }
 
+    public Operation operation() {
+        return operation;
+    }
+
     @Override public RelDataType deriveRowType() {
-        return RelOptUtil.createDmlRowType(
-                SqlKind.INSERT, getCluster().getTypeFactory());
+        return RelOptUtil.createDmlRowType(operation.kind, 
getCluster().getTypeFactory());
     }
 
     /** {@inheritDoc} */
@@ -154,9 +160,16 @@ public class IgniteKeyValueModify extends AbstractRelNode 
implements IgniteRel {
 
     @Override
     public IgniteRelWriter explain(IgniteRelWriter writer) {
+        if (operation == Operation.DELETE) {
+            writer.addKeyExpression(expressions);
+        } else {
+            assert operation == Operation.INSERT : operation;
+
+            writer.addSourceExpressions(expressions);
+        }
+
         return writer
                 .addTable(table)
-                .addSourceExpressions(expressions)
                 .addModifyOperationType(operation.op);
     }
 }
diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/rule/TableModifyToKeyValueDeleteRule.java
 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/rule/TableModifyToKeyValueDeleteRule.java
new file mode 100644
index 00000000000..0f56256e14c
--- /dev/null
+++ 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/rule/TableModifyToKeyValueDeleteRule.java
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.sql.engine.rule;
+
+import java.util.List;
+import org.apache.calcite.plan.RelOptRule;
+import org.apache.calcite.plan.RelOptRuleCall;
+import org.apache.calcite.plan.RelOptTable;
+import org.apache.calcite.plan.RelRule;
+import org.apache.calcite.rel.core.TableModify;
+import org.apache.calcite.rel.core.TableScan;
+import org.apache.calcite.rex.RexNode;
+import org.apache.ignite.internal.sql.engine.rel.IgniteConvention;
+import org.apache.ignite.internal.sql.engine.rel.IgniteKeyValueModify;
+import 
org.apache.ignite.internal.sql.engine.rel.IgniteKeyValueModify.Operation;
+import 
org.apache.ignite.internal.sql.engine.rel.ProjectableFilterableTableScan;
+import org.apache.ignite.internal.sql.engine.schema.IgniteIndex;
+import org.apache.ignite.internal.sql.engine.schema.IgniteTable;
+import org.apache.ignite.internal.sql.engine.trait.IgniteDistributions;
+import org.apache.ignite.internal.util.Pair;
+import org.immutables.value.Value;
+
+/**
+ * Rule that converts {@link TableModify} representing DELETE operation with a 
determined source
+ * to a Key-Value DELETE operation.
+ *
+ * <p>Conversion will be successful if: <ol>
+ *     <li>there is condition</li>
+ *     <li>table has primary key index (i.e. there is an index for which 
{@link IgniteIndex#primaryKey()} returns {@code true})</li>
+ *     <li>condition covers all columns of primary key index</li>
+ *     <li>condition doesn't involve other columns</li>
+ *     <li>only single search key is derived from condition</li>
+ * </ol>
+ */
[email protected]
+public class TableModifyToKeyValueDeleteRule extends 
RelRule<TableModifyToKeyValueDeleteRule.Config> {
+    public static final RelOptRule INSTANCE = Config.INSTANCE.toRule();
+
+    private TableModifyToKeyValueDeleteRule(Config cfg) {
+        super(cfg);
+    }
+
+    @Override
+    public boolean matches(RelOptRuleCall call) {
+        TableModify modify = call.rel(0);
+        TableScan scan = call.rel(1);
+
+        RelOptTable relTableToModify = modify.getTable();
+        RelOptTable relTableToScan = scan.getTable();
+
+        assert relTableToModify != null;
+        assert relTableToScan != null;
+
+        IgniteTable igniteTableToModify = 
relTableToModify.unwrap(IgniteTable.class);
+        IgniteTable igniteTableToScan = 
relTableToScan.unwrap(IgniteTable.class);
+        return igniteTableToModify != null
+                && igniteTableToScan != null
+                && igniteTableToModify.id() == igniteTableToScan.id();
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public void onMatch(RelOptRuleCall call) {
+        ProjectableFilterableTableScan scan = call.rel(1);
+
+        Pair<List<RexNode>, RexNode> expressionsAndPostFilter = 
TableScanToKeyValueGetRule.analyzeCondition(scan);
+
+        if (expressionsAndPostFilter == null || 
expressionsAndPostFilter.getFirst() == null) {
+            return;
+        }
+
+        // post-condition is not allowed.
+        if (expressionsAndPostFilter.getSecond() != null) {
+            return;
+        }
+
+        call.transformTo(
+                new IgniteKeyValueModify(
+                        scan.getCluster(),
+                        scan.getTraitSet()
+                                .replace(IgniteConvention.INSTANCE)
+                                .replace(IgniteDistributions.single()),
+                        scan.getTable(),
+                        Operation.DELETE,
+                        expressionsAndPostFilter.getFirst()
+                )
+        );
+    }
+
+    /**
+     * Configuration.
+     */
+    @SuppressWarnings({"ClassNameSameAsAncestorName", 
"InnerClassFieldHidesOuterClassField"})
+    @Value.Immutable
+    public interface Config extends RelRule.Config {
+        Config INSTANCE = ImmutableTableModifyToKeyValueDeleteRule.Config.of()
+                .withDescription("TableModifyToKeyValueDeleteRule")
+                .withOperandSupplier(o0 ->
+                        o0.operand(TableModify.class)
+                                .predicate(TableModify::isDelete)
+                                .oneInput(o1 ->
+                                        
o1.operand(ProjectableFilterableTableScan.class)
+                                                .predicate(scan -> 
scan.condition() != null)
+                                                .noInputs()))
+                .as(Config.class);
+
+        /** {@inheritDoc} */
+        @Override
+        default TableModifyToKeyValueDeleteRule toRule() {
+            return new TableModifyToKeyValueDeleteRule(this);
+        }
+    }
+}
diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/rule/TableModifyToKeyValuePutRule.java
 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/rule/TableModifyToKeyValueInsertRule.java
similarity index 80%
rename from 
modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/rule/TableModifyToKeyValuePutRule.java
rename to 
modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/rule/TableModifyToKeyValueInsertRule.java
index b19e25eb970..1dda290c37d 100644
--- 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/rule/TableModifyToKeyValuePutRule.java
+++ 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/rule/TableModifyToKeyValueInsertRule.java
@@ -21,7 +21,6 @@ import java.util.List;
 import org.apache.calcite.plan.RelOptRule;
 import org.apache.calcite.plan.RelOptRuleCall;
 import org.apache.calcite.plan.RelRule;
-import org.apache.calcite.rel.RelNode;
 import org.apache.calcite.rel.core.Project;
 import org.apache.calcite.rel.core.TableModify;
 import org.apache.calcite.rel.core.Values;
@@ -37,39 +36,39 @@ import org.immutables.value.Value;
 
 /**
  * Rule that converts {@link TableModify} representing INSERT operation with a 
determined source
- * to a Key-Value PUT operation.
+ * to a Key-Value INSERT operation.
  *
  * <p>Note: at the moment, this rule support only single row insert.
  */
 @Value.Enclosing
-public class TableModifyToKeyValuePutRule extends 
RelRule<TableModifyToKeyValuePutRule.Config> {
+public class TableModifyToKeyValueInsertRule extends 
RelRule<TableModifyToKeyValueInsertRule.Config> {
     public static final RelOptRule VALUES = Config.VALUES.toRule();
     public static final RelOptRule PROJECT = Config.PROJECT.toRule();
 
-    private TableModifyToKeyValuePutRule(Config cfg) {
+    private TableModifyToKeyValueInsertRule(Config cfg) {
         super(cfg);
     }
 
     /** {@inheritDoc} */
     @Override
     public void onMatch(RelOptRuleCall call) {
-        List<RelNode> operands = call.getRelList();
+        int operandsCount = call.getRelList().size();
 
-        TableModify modify = cast(operands.get(0));
+        TableModify modify = call.rel(0);
 
         assert modify.getOperation() == TableModify.Operation.INSERT : 
modify.getOperation();
 
         List<RexNode> expressions;
-        if (operands.size() == 2) {
-            Values values = cast(operands.get(1));
+        if (operandsCount == 2) {
+            Values values = call.rel(1);
 
             assert values.getTuples().size() == 1 : "Expected exactly one 
tuple, but was " + values.getTuples().size();
 
             expressions = List.copyOf(values.getTuples().get(0));
         } else {
-            assert operands.size() == 3 : operands;
+            assert operandsCount == 3 : call.getRelList();
 
-            Values values = cast(operands.get(2));
+            Values values = call.rel(2);
 
             assert values.getTuples().size() == 1 : "Expected exactly one 
tuple, but was " + values.getTuples().size();
 
@@ -82,7 +81,7 @@ public class TableModifyToKeyValuePutRule extends 
RelRule<TableModifyToKeyValueP
                 }
             };
 
-            Project project = cast(operands.get(1));
+            Project project = call.rel(1);
 
             expressions = inputInliner.visitList(project.getProjects());
         }
@@ -94,24 +93,20 @@ public class TableModifyToKeyValuePutRule extends 
RelRule<TableModifyToKeyValueP
                                 .replace(IgniteConvention.INSTANCE)
                                 .replace(IgniteDistributions.single()),
                         modify.getTable(),
-                        Operation.PUT,
+                        Operation.INSERT,
                         expressions
                 )
         );
     }
 
-    private static <T extends RelNode> T cast(RelNode node) {
-        return (T) node;
-    }
-
     /**
      * Configuration.
      */
     @SuppressWarnings({"ClassNameSameAsAncestorName", 
"InnerClassFieldHidesOuterClassField"})
     @Value.Immutable
     public interface Config extends RelRule.Config {
-        Config VALUES = ImmutableTableModifyToKeyValuePutRule.Config.of()
-                .withDescription("TableModifyToKeyValuePutRule:VALUES")
+        Config VALUES = ImmutableTableModifyToKeyValueInsertRule.Config.of()
+                .withDescription("TableModifyToKeyValueInsertRule:VALUES")
                 .withOperandSupplier(o0 ->
                         o0.operand(TableModify.class)
                                 .predicate(TableModify::isInsert)
@@ -121,8 +116,8 @@ public class TableModifyToKeyValuePutRule extends 
RelRule<TableModifyToKeyValueP
                                                 .noInputs()))
                 .as(Config.class);
 
-        Config PROJECT = ImmutableTableModifyToKeyValuePutRule.Config.of()
-                .withDescription("TableModifyToKeyValuePutRule:PROJECT")
+        Config PROJECT = ImmutableTableModifyToKeyValueInsertRule.Config.of()
+                .withDescription("TableModifyToKeyValueInsertRule:PROJECT")
                 .withOperandSupplier(o0 ->
                         o0.operand(TableModify.class)
                                 .predicate(TableModify::isInsert)
@@ -135,8 +130,8 @@ public class TableModifyToKeyValuePutRule extends 
RelRule<TableModifyToKeyValueP
 
         /** {@inheritDoc} */
         @Override
-        default TableModifyToKeyValuePutRule toRule() {
-            return new TableModifyToKeyValuePutRule(this);
+        default TableModifyToKeyValueInsertRule toRule() {
+            return new TableModifyToKeyValueInsertRule(this);
         }
     }
 }
diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/rule/TableScanToKeyValueGetRule.java
 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/rule/TableScanToKeyValueGetRule.java
index 364f0ec6c47..893552b8023 100644
--- 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/rule/TableScanToKeyValueGetRule.java
+++ 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/rule/TableScanToKeyValueGetRule.java
@@ -41,12 +41,14 @@ import 
org.apache.ignite.internal.sql.engine.prepare.bounds.ExactBounds;
 import org.apache.ignite.internal.sql.engine.prepare.bounds.SearchBounds;
 import org.apache.ignite.internal.sql.engine.rel.IgniteConvention;
 import org.apache.ignite.internal.sql.engine.rel.IgniteKeyValueGet;
+import 
org.apache.ignite.internal.sql.engine.rel.ProjectableFilterableTableScan;
 import 
org.apache.ignite.internal.sql.engine.rel.logical.IgniteLogicalTableScan;
 import org.apache.ignite.internal.sql.engine.schema.IgniteIndex;
 import org.apache.ignite.internal.sql.engine.schema.IgniteTable;
 import org.apache.ignite.internal.sql.engine.trait.IgniteDistributions;
 import org.apache.ignite.internal.sql.engine.util.Commons;
 import org.apache.ignite.internal.sql.engine.util.RexUtils;
+import org.apache.ignite.internal.util.Pair;
 import org.immutables.value.Value;
 import org.jetbrains.annotations.Nullable;
 
@@ -73,10 +75,34 @@ public class TableScanToKeyValueGetRule extends 
RelRule<TableScanToKeyValueGetRu
     public void onMatch(RelOptRuleCall call) {
         IgniteLogicalTableScan scan = cast(call.rel(0));
 
+        Pair<List<RexNode>, RexNode> expressionsAndPostFilter = 
analyzeCondition(scan);
+
+        if (expressionsAndPostFilter == null || 
expressionsAndPostFilter.getFirst() == null) {
+            return;
+        }
+
+        call.transformTo(
+                new IgniteKeyValueGet(
+                        scan.getCluster(),
+                        scan.getTraitSet()
+                                .replace(IgniteConvention.INSTANCE)
+                                .replace(IgniteDistributions.single()),
+                        scan.getTable(),
+                        scan.getHints(),
+                        expressionsAndPostFilter.getFirst(),
+                        scan.fieldNames(), 
+                        scan.projects(),
+                        expressionsAndPostFilter.getSecond(),
+                        scan.requiredColumns()
+                )
+        );
+    }
+
+    static @Nullable Pair<List<RexNode>, @Nullable RexNode> 
analyzeCondition(ProjectableFilterableTableScan scan) {
         List<SearchBounds> bounds = deriveSearchBounds(scan);
 
         if (nullOrEmpty(bounds)) {
-            return;
+            return null;
         }
 
         List<RexNode> expressions = new ArrayList<>(bounds.size());
@@ -90,7 +116,7 @@ public class TableScanToKeyValueGetRule extends 
RelRule<TableScanToKeyValueGetRu
             // iteration over a number of search keys are not supported yet,
             // thus we need to make sure only single key was derived
             if (!(bound instanceof ExactBounds)) {
-                return;
+                return null;
             }
 
             condition.remove(bound.condition());
@@ -98,7 +124,7 @@ public class TableScanToKeyValueGetRule extends 
RelRule<TableScanToKeyValueGetRu
         }
 
         if (nullOrEmpty(expressions)) {
-            return;
+            return null;
         }
 
         RexNode resultingCondition = RexUtil.composeConjunction(rexBuilder, 
condition);
@@ -106,24 +132,10 @@ public class TableScanToKeyValueGetRule extends 
RelRule<TableScanToKeyValueGetRu
             resultingCondition = null;
         }
 
-        call.transformTo(
-                new IgniteKeyValueGet(
-                        cluster,
-                        scan.getTraitSet()
-                                .replace(IgniteConvention.INSTANCE)
-                                .replace(IgniteDistributions.single()),
-                        scan.getTable(),
-                        scan.getHints(),
-                        expressions,
-                        scan.fieldNames(), 
-                        scan.projects(),
-                        resultingCondition,
-                        scan.requiredColumns()
-                )
-        );
+        return new Pair<>(expressions, resultingCondition);
     }
 
-    private static @Nullable List<SearchBounds> 
deriveSearchBounds(IgniteLogicalTableScan scan) {
+    private static @Nullable List<SearchBounds> 
deriveSearchBounds(ProjectableFilterableTableScan scan) {
         RexNode condition = scan.condition();
 
         if (condition == null) {
diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/statistic/SqlStatisticManagerImpl.java
 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/statistic/SqlStatisticManagerImpl.java
index 1e8b0d50fff..aa1f51e5361 100644
--- 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/statistic/SqlStatisticManagerImpl.java
+++ 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/statistic/SqlStatisticManagerImpl.java
@@ -25,7 +25,6 @@ import java.util.List;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReference;
 import org.apache.ignite.internal.catalog.CatalogService;
@@ -232,7 +231,7 @@ public class SqlStatisticManagerImpl implements 
SqlStatisticManager {
      * Returns feature for the last run update statistics to have ability wait 
update statistics.
      */
     @TestOnly
-    public Future<Void> lastUpdateStatisticFuture() {
+    public CompletableFuture<Void> lastUpdateStatisticFuture() {
         return latestUpdateFut.get();
     }
 
diff --git 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/DummyUpdatableTable.java
 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/DummyUpdatableTable.java
index 0697717bf54..a02b2747e5d 100644
--- 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/DummyUpdatableTable.java
+++ 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/DummyUpdatableTable.java
@@ -49,6 +49,11 @@ class DummyUpdatableTable implements UpdatableTable {
         return null;
     }
 
+    @Override
+    public <RowT> CompletableFuture<Boolean> delete(@Nullable 
InternalTransaction explicitTx, ExecutionContext<RowT> ectx, RowT key) {
+        return new CompletableFuture<>();
+    }
+
     @Override
     public <RowT> CompletableFuture<?> deleteAll(ExecutionContext<RowT> ectx, 
List<RowT> rows,
             ColocationGroup colocationGroup) {
diff --git 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/TestBuilders.java
 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/TestBuilders.java
index d2781f6bc9a..75965a34164 100644
--- 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/TestBuilders.java
+++ 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/TestBuilders.java
@@ -1957,6 +1957,11 @@ public class TestBuilders {
             return nullCompletedFuture();
         }
 
+        @Override
+        public <RowT> CompletableFuture<Boolean> delete(@Nullable 
InternalTransaction explicitTx, ExecutionContext<RowT> ectx, RowT key) {
+            return CompletableFuture.completedFuture(false);
+        }
+
         @Override
         public <RowT> CompletableFuture<?> deleteAll(ExecutionContext<RowT> 
ectx, List<RowT> rows, ColocationGroup colocationGroup) {
             return nullCompletedFuture();
diff --git 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/AbstractPlannerTest.java
 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/AbstractPlannerTest.java
index 6253cc923e0..d4679a92967 100644
--- 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/AbstractPlannerTest.java
+++ 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/AbstractPlannerTest.java
@@ -106,7 +106,7 @@ import org.apache.ignite.internal.sql.engine.rel.IgniteRel;
 import org.apache.ignite.internal.sql.engine.rel.IgniteSystemViewScan;
 import org.apache.ignite.internal.sql.engine.rel.IgniteTableScan;
 import org.apache.ignite.internal.sql.engine.rel.explain.ExplainUtils;
-import org.apache.ignite.internal.sql.engine.rule.TableModifyToKeyValuePutRule;
+import 
org.apache.ignite.internal.sql.engine.rule.TableModifyToKeyValueInsertRule;
 import org.apache.ignite.internal.sql.engine.schema.ColumnDescriptor;
 import org.apache.ignite.internal.sql.engine.schema.DefaultValueStrategy;
 import org.apache.ignite.internal.sql.engine.schema.IgniteIndex.Collation;
@@ -130,8 +130,8 @@ import org.jetbrains.annotations.Nullable;
  */
 public abstract class AbstractPlannerTest extends IgniteAbstractTest {
     protected static final String[] DISABLE_KEY_VALUE_MODIFY_RULES = {
-            TableModifyToKeyValuePutRule.VALUES.toString(),
-            TableModifyToKeyValuePutRule.PROJECT.toString(),
+            TableModifyToKeyValueInsertRule.VALUES.toString(),
+            TableModifyToKeyValueInsertRule.PROJECT.toString(),
     };
 
     protected static final IgniteTypeFactory TYPE_FACTORY = 
Commons.typeFactory();
diff --git 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/KeyValueModifyPlannerTest.java
 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/KeyValueModifyPlannerTest.java
index 746e04e9036..9db87d2f8cf 100644
--- 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/KeyValueModifyPlannerTest.java
+++ 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/KeyValueModifyPlannerTest.java
@@ -36,6 +36,7 @@ import 
org.apache.ignite.internal.sql.engine.framework.TestCluster;
 import org.apache.ignite.internal.sql.engine.framework.TestNode;
 import org.apache.ignite.internal.sql.engine.prepare.KeyValueModifyPlan;
 import org.apache.ignite.internal.sql.engine.prepare.QueryPlan;
+import 
org.apache.ignite.internal.sql.engine.rel.IgniteKeyValueModify.Operation;
 import org.junit.jupiter.api.AfterAll;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeAll;
@@ -91,12 +92,11 @@ public class KeyValueModifyPlannerTest extends 
AbstractPlannerTest {
     void optimizedInsertUsedForLiterals(String insertStatement) {
         node.initSchema("CREATE TABLE test (id INT PRIMARY KEY, val INT)");
 
-        {
-            QueryPlan plan = node.prepare(insertStatement);
+        QueryPlan plan = node.prepare(insertStatement);
 
-            assertThat(plan, instanceOf(KeyValueModifyPlan.class));
-            assertExpressions((KeyValueModifyPlan) plan, "10", "20");
-        }
+        assertThat(plan, instanceOf(KeyValueModifyPlan.class));
+        assertThat(((KeyValueModifyPlan) plan).getRel().operation(), 
equalTo(Operation.INSERT));
+        assertExpressions((KeyValueModifyPlan) plan, "10", "20");
     }
 
     @Test
@@ -107,6 +107,7 @@ public class KeyValueModifyPlannerTest extends 
AbstractPlannerTest {
             QueryPlan plan = node.prepare("INSERT INTO test VALUES (?, ?)");
 
             assertThat(plan, instanceOf(KeyValueModifyPlan.class));
+            assertThat(((KeyValueModifyPlan) plan).getRel().operation(), 
equalTo(Operation.INSERT));
             assertExpressions((KeyValueModifyPlan) plan, "?0", "?1");
         }
 
@@ -114,6 +115,7 @@ public class KeyValueModifyPlannerTest extends 
AbstractPlannerTest {
             QueryPlan plan = node.prepare("INSERT INTO test(id, val) VALUES 
(?, ?)");
 
             assertThat(plan, instanceOf(KeyValueModifyPlan.class));
+            assertThat(((KeyValueModifyPlan) plan).getRel().operation(), 
equalTo(Operation.INSERT));
             assertExpressions((KeyValueModifyPlan) plan, "?0", "?1");
         }
 
@@ -121,6 +123,7 @@ public class KeyValueModifyPlannerTest extends 
AbstractPlannerTest {
             QueryPlan plan = node.prepare("INSERT INTO test(val, id) VALUES 
(?, ?)");
 
             assertThat(plan, instanceOf(KeyValueModifyPlan.class));
+            assertThat(((KeyValueModifyPlan) plan).getRel().operation(), 
equalTo(Operation.INSERT));
             assertExpressions((KeyValueModifyPlan) plan, "?1", "?0");
         }
     }
@@ -132,6 +135,7 @@ public class KeyValueModifyPlannerTest extends 
AbstractPlannerTest {
         QueryPlan plan = node.prepare("INSERT INTO test VALUES (?, 1, 
CAST(CURRENT_DATE as VARCHAR(128)))");
 
         assertThat(plan, instanceOf(KeyValueModifyPlan.class));
+        assertThat(((KeyValueModifyPlan) plan).getRel().operation(), 
equalTo(Operation.INSERT));
         assertExpressions((KeyValueModifyPlan) plan, "?0", "1", 
"CAST(CURRENT_DATE):VARCHAR(128) CHARACTER SET \"UTF-8\" NOT NULL");
     }
 
@@ -146,6 +150,7 @@ public class KeyValueModifyPlannerTest extends 
AbstractPlannerTest {
         QueryPlan plan = node.prepare(insertStatement);
 
         assertThat(plan, instanceOf(KeyValueModifyPlan.class));
+        assertThat(((KeyValueModifyPlan) plan).getRel().operation(), 
equalTo(Operation.INSERT));
         assertExpressions((KeyValueModifyPlan) plan, "1", "10", "_UTF-8'a'");
     }
 
diff --git 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/PrimaryKeyDeletePlannerTest.java
 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/PrimaryKeyDeletePlannerTest.java
new file mode 100644
index 00000000000..13b64248dff
--- /dev/null
+++ 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/PrimaryKeyDeletePlannerTest.java
@@ -0,0 +1,231 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.sql.engine.planner;
+
+import static java.util.stream.Collectors.toList;
+import static org.apache.ignite.internal.testframework.IgniteTestUtils.await;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.instanceOf;
+import static org.hamcrest.Matchers.not;
+
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.calcite.rex.RexNode;
+import org.apache.ignite.internal.catalog.CatalogCommand;
+import org.apache.ignite.internal.catalog.commands.DropTableCommand;
+import org.apache.ignite.internal.catalog.descriptors.CatalogTableDescriptor;
+import org.apache.ignite.internal.sql.SqlCommon;
+import org.apache.ignite.internal.sql.engine.framework.TestBuilders;
+import org.apache.ignite.internal.sql.engine.framework.TestCluster;
+import org.apache.ignite.internal.sql.engine.framework.TestNode;
+import org.apache.ignite.internal.sql.engine.prepare.KeyValueModifyPlan;
+import org.apache.ignite.internal.sql.engine.prepare.MultiStepPlan;
+import org.apache.ignite.internal.sql.engine.prepare.QueryPlan;
+import 
org.apache.ignite.internal.sql.engine.rel.IgniteKeyValueModify.Operation;
+import org.apache.ignite.internal.sql.engine.rel.IgniteValues;
+import org.hamcrest.Matchers;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+
+/**
+ * Test cases to very KV delete optimized plans.
+ */
+public class PrimaryKeyDeletePlannerTest extends AbstractPlannerTest {
+    private static final String NODE_NAME = "N1";
+
+    private static final TestCluster CLUSTER = TestBuilders.cluster()
+            .nodes(NODE_NAME)
+            .build();
+
+    private final TestNode node = CLUSTER.node(NODE_NAME);
+
+    @BeforeAll
+    static void start() {
+        CLUSTER.start();
+    }
+
+    @AfterAll
+    static void stop() throws Exception {
+        CLUSTER.stop();
+    }
+
+    @AfterEach
+    void clearCatalog() {
+        int version = CLUSTER.catalogManager().latestCatalogVersion();
+
+        List<CatalogCommand> commands = new ArrayList<>();
+        for (CatalogTableDescriptor table : 
CLUSTER.catalogManager().catalog(version).tables()) {
+            commands.add(
+                    DropTableCommand.builder()
+                            .schemaName(SqlCommon.DEFAULT_SCHEMA_NAME)
+                            .tableName(table.name())
+                            .build()
+            );
+        }
+
+        await(CLUSTER.catalogManager().execute(commands));
+    }
+
+    @Test
+    void optimizedDeleteUsedForSingleOptionConditions() {
+        node.initSchema("CREATE TABLE test (id INT PRIMARY KEY, val INT)");
+
+        {
+            QueryPlan plan = node.prepare("DELETE FROM test WHERE id = 1");
+
+            assertThat(plan, instanceOf(KeyValueModifyPlan.class));
+            assertThat(((KeyValueModifyPlan) plan).getRel().operation(), 
equalTo(Operation.DELETE));
+            assertKeyExpressions((KeyValueModifyPlan) plan, "1");
+        }
+
+        {
+            QueryPlan plan = node.prepare("DELETE FROM test WHERE id IN(1)");
+
+            assertThat(plan, instanceOf(KeyValueModifyPlan.class));
+            assertThat(((KeyValueModifyPlan) plan).getRel().operation(), 
equalTo(Operation.DELETE));
+            assertKeyExpressions((KeyValueModifyPlan) plan, "1");
+        }
+    }
+
+    @Test
+    void optimizedDeleteNotUsedForConditionWithPostFiltration() {
+        node.initSchema("CREATE TABLE test (id INT PRIMARY KEY, val INT)");
+
+        QueryPlan plan = node.prepare("DELETE FROM test WHERE id = 1 AND val > 
10");
+
+        assertThat(plan, not(instanceOf(KeyValueModifyPlan.class)));
+    }
+
+    @Test
+    void optimizedDeleteNotUsedForRange() {
+        node.initSchema("CREATE TABLE test (id INT PRIMARY KEY, val INT)");
+
+        QueryPlan plan = node.prepare("DELETE FROM test WHERE id > 1");
+
+        assertThat(plan, not(instanceOf(KeyValueModifyPlan.class)));
+    }
+
+    @Test
+    void optimizedDeleteNotUsedForMultiBounds() {
+        node.initSchema("CREATE TABLE test (id INT PRIMARY KEY, val INT)");
+
+        {
+            QueryPlan plan = node.prepare("DELETE FROM test WHERE id = 1 OR id 
= 2");
+
+            assertThat(plan, not(instanceOf(KeyValueModifyPlan.class)));
+        }
+
+        {
+            QueryPlan plan = node.prepare("DELETE FROM test WHERE id IN (1, 
2)");
+
+            assertThat(plan, not(instanceOf(KeyValueModifyPlan.class)));
+        }
+    }
+
+    @Test
+    void optimizedDeleteNotUsedForPartiallyCoveredKey() {
+        node.initSchema("CREATE TABLE test (id1 INT, id2 INT, val INT, PRIMARY 
KEY (id1, id2))");
+
+        {
+            QueryPlan plan = node.prepare("DELETE FROM test WHERE id1 = 1");
+
+            assertThat(plan, not(instanceOf(KeyValueModifyPlan.class)));
+        }
+    }
+
+    @Test
+    void optimizedDeleteUsedWithComplexKeysNormalOrder() {
+        node.initSchema("CREATE TABLE test (id1 INT, id2 INT, val INT, PRIMARY 
KEY(id1, id2))");
+
+        QueryPlan plan = node.prepare("DELETE FROM test WHERE id1 = 1 AND id2 
IN (2)");
+
+        assertThat(plan, instanceOf(KeyValueModifyPlan.class));
+        assertThat(((KeyValueModifyPlan) plan).getRel().operation(), 
equalTo(Operation.DELETE));
+        assertKeyExpressions((KeyValueModifyPlan) plan, "1", "2");
+    }
+
+    @Test
+    void optimizedDeleteUsedWithComplexKeysReversOrder() {
+        node.initSchema("CREATE TABLE test (id1 INT, id2 INT, val INT, PRIMARY 
KEY(id2, id1))");
+
+        QueryPlan plan = node.prepare("DELETE FROM test WHERE id1 = 1 AND id2 
IN (2)");
+
+        assertThat(plan, instanceOf(KeyValueModifyPlan.class));
+        assertThat(((KeyValueModifyPlan) plan).getRel().operation(), 
equalTo(Operation.DELETE));
+        assertKeyExpressions((KeyValueModifyPlan) plan, "2", "1");
+    }
+
+    @Test
+    void optimizedDeleteWithOutOfRangeKey() {
+        node.initSchema("CREATE TABLE test (id TINYINT PRIMARY KEY, val INT)");
+
+        // TODO: https://issues.apache.org/jira/browse/IGNITE-26158
+        // // Out of range: TINYINT_MAX + 1.
+        // {
+        //     QueryPlan plan = node.prepare("DELETE FROM test WHERE id = 
128");
+        // 
+        //     assertThat(plan, instanceOf(MultiStepPlan.class));
+        //     assertEmptyValuesNode((MultiStepPlan) plan);
+        // }
+        // 
+        // // Out of range: TINYINT_MIN - 1.
+        // {
+        //     QueryPlan plan = node.prepare("DELETE FROM test WHERE id = 
-129");
+        // 
+        //     assertThat(plan, instanceOf(MultiStepPlan.class));
+        //     assertEmptyValuesNode((MultiStepPlan) plan);
+        // }
+
+        // TINYINT_MAX
+        {
+            QueryPlan plan = node.prepare("DELETE FROM test WHERE id = 127");
+
+            assertThat(plan, instanceOf(KeyValueModifyPlan.class));
+            assertThat(((KeyValueModifyPlan) plan).getRel().operation(), 
equalTo(Operation.DELETE));
+            assertKeyExpressions((KeyValueModifyPlan) plan, "127:TINYINT");
+        }
+
+        // TINYINT_MIN
+        {
+            QueryPlan plan = node.prepare("DELETE FROM test WHERE id = -128");
+
+            assertThat(plan, instanceOf(KeyValueModifyPlan.class));
+            assertThat(((KeyValueModifyPlan) plan).getRel().operation(), 
equalTo(Operation.DELETE));
+            assertKeyExpressions((KeyValueModifyPlan) plan, "-128:TINYINT");
+        }
+    }
+
+    private static void assertEmptyValuesNode(MultiStepPlan plan) {
+        assertThat(plan.getRel(), instanceOf(IgniteValues.class));
+        assertThat(((IgniteValues) plan.getRel()).tuples, Matchers.empty());
+    }
+
+    private static void assertKeyExpressions(KeyValueModifyPlan plan, 
String... expectedExpressions) {
+        List<String> keyExpressions = (plan.getRel()).expressions().stream()
+                .map(RexNode::toString)
+                .collect(toList());
+
+        assertThat(
+                keyExpressions,
+                equalTo(List.of(expectedExpressions))
+        );
+    }
+}
diff --git 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/prepare/partitionawareness/PartitionAwarenessMetadataTest.java
 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/prepare/partitionawareness/PartitionAwarenessMetadataTest.java
index 3093581ee8b..304b192f1ce 100644
--- 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/prepare/partitionawareness/PartitionAwarenessMetadataTest.java
+++ 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/prepare/partitionawareness/PartitionAwarenessMetadataTest.java
@@ -98,7 +98,6 @@ public class PartitionAwarenessMetadataTest extends 
BaseIgniteAbstractTest {
             "SELECT * FROM table(system_range(1, 10)) WHERE x = 1",
             "SELECT count(*) FROM t WHERE c1=?",
             "UPDATE t SET c2=1 WHERE c1=?",
-            "DELETE FROM t WHERE c1=?",
     })
     public void noMetadata(String query) {
         node.initSchema("CREATE TABLE t (c1 INT PRIMARY KEY, c2 INT)");
@@ -136,7 +135,12 @@ public class PartitionAwarenessMetadataTest extends 
BaseIgniteAbstractTest {
                 Arguments.of("INSERT INTO t VALUES(1+1, ?)", null),
                 Arguments.of("INSERT INTO t(c2, c1) VALUES(?, ?)", 
dynamicParamsTrackingRequired(1)),
                 Arguments.of("INSERT INTO t(c2, c1) VALUES(1, ?)", 
dynamicParamsTrackingRequired(0)),
-                Arguments.of("INSERT INTO t(c2, c1) VALUES(?, 1)", null)
+                Arguments.of("INSERT INTO t(c2, c1) VALUES(?, 1)", null),
+
+                // KV DELETE
+                Arguments.of("DELETE FROM t WHERE c1=?", 
dynamicParamsTrackingRequired(0)),
+                Arguments.of("DELETE FROM t WHERE c1=1", null),
+                Arguments.of("DELETE FROM t WHERE c1=1+1", null)
         );
     }
 
@@ -156,7 +160,6 @@ public class PartitionAwarenessMetadataTest extends 
BaseIgniteAbstractTest {
                 // KV GET
                 Arguments.of("SELECT * FROM t WHERE c3=? and c2=?", 
dynamicParams(0)),
                 Arguments.of("SELECT * FROM t WHERE c2=? and c3=?", 
dynamicParams(1)),
-                Arguments.of("SELECT * FROM t WHERE c3=? and c2=?", 
dynamicParams(0)),
                 Arguments.of("SELECT * FROM t WHERE c1=? and c2=? and c3=?", 
dynamicParams(2)),
                 Arguments.of("SELECT * FROM t WHERE c3=? and c1=? and c2=?", 
dynamicParams(0)),
                 Arguments.of("SELECT * FROM t WHERE c3=? and c2=1", 
dynamicParams(0)),
@@ -173,7 +176,15 @@ public class PartitionAwarenessMetadataTest extends 
BaseIgniteAbstractTest {
 
                 Arguments.of("INSERT INTO t (c1, c2, c3) VALUES (?, ?, 3)", 
null),
                 Arguments.of("INSERT INTO t (c1, c3, c2) VALUES (?, 3, ?)", 
null),
-                Arguments.of("INSERT INTO t (c3, c1, c2) VALUES (3, ?, ?)", 
null)
+                Arguments.of("INSERT INTO t (c3, c1, c2) VALUES (3, ?, ?)", 
null),
+
+                // KV DELETE
+                Arguments.of("SELECT * FROM t WHERE c3=? and c2=?", 
dynamicParams(0)),
+                Arguments.of("SELECT * FROM t WHERE c2=? and c3=?", 
dynamicParams(1)),
+                Arguments.of("SELECT * FROM t WHERE c3=? and c2=1", 
dynamicParams(0)),
+
+                Arguments.of("SELECT * FROM t WHERE c3=3", null),
+                Arguments.of("SELECT * FROM t WHERE c2=? and c3=3", null)
         );
     }
 
@@ -206,7 +217,17 @@ public class PartitionAwarenessMetadataTest extends 
BaseIgniteAbstractTest {
                 // KV PUT
                 Arguments.of("INSERT INTO t VALUES (?, ?, ?, ?)",  
dynamicParamsTrackingRequired(2, 0, 1)),
                 Arguments.of("INSERT INTO t (c3, c2, c4, c1) VALUES (?, ?, ?, 
?)", dynamicParamsTrackingRequired(0, 3, 1)),
-                Arguments.of("INSERT INTO t (c3, c2, c4, c1) VALUES (?, ?, 1, 
?)", dynamicParamsTrackingRequired(0, 2, 1))
+                Arguments.of("INSERT INTO t (c3, c2, c4, c1) VALUES (?, ?, 1, 
?)", dynamicParamsTrackingRequired(0, 2, 1)),
+
+                // KV DELETE
+                Arguments.of("DELETE FROM t WHERE c1=? and c2=? and c3=?", 
dynamicParamsTrackingRequired(2, 0, 1)),
+                Arguments.of("DELETE FROM t WHERE c3=? and c1=? and c2=?", 
dynamicParamsTrackingRequired(0, 1, 2)),
+                Arguments.of("DELETE FROM t WHERE c3=? and c2=? and c1=?", 
dynamicParamsTrackingRequired(0, 2, 1)),
+
+                Arguments.of("DELETE FROM t WHERE c1=1 and c2=? and c3=?", 
null),
+                Arguments.of("DELETE FROM t WHERE c1=? and c2=2 and c3=?", 
null),
+                Arguments.of("DELETE FROM t WHERE c1=? and c2=? and c3=3", 
null),
+                Arguments.of("DELETE FROM t WHERE c1=1 and c2=2 and c3=3", 
null)
         );
     }
 

Reply via email to