This is an automated email from the ASF dual-hosted git repository.

vozerov pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git


The following commit(s) were added to refs/heads/master by this push:
     new 88fcc92  IGNITE-12275: SQL: Moved non-SELECT/DML commands processing 
to separate class. This closes #6071.
88fcc92 is described below

commit 88fcc92243df12bff1d18742211062d5e1d7dee4
Author: devozerov <ppoze...@gmail.com>
AuthorDate: Sat Feb 9 18:02:37 2019 +0300

    IGNITE-12275: SQL: Moved non-SELECT/DML commands processing to separate 
class. This closes #6071.
---
 ...tementsProcessor.java => CommandProcessor.java} | 323 ++++++++++++++++----
 .../processors/query/h2/CommandResult.java         |  58 ++++
 .../processors/query/h2/IgniteH2Indexing.java      | 324 +++++----------------
 .../processors/query/h2/ParsingResult.java         |  45 ++-
 .../cache/index/H2DynamicTableSelfTest.java        |   4 +-
 5 files changed, 435 insertions(+), 319 deletions(-)

diff --git 
a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/ddl/DdlStatementsProcessor.java
 
b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/CommandProcessor.java
similarity index 72%
rename from 
modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/ddl/DdlStatementsProcessor.java
rename to 
modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/CommandProcessor.java
index 84dfea2..8c52296 100644
--- 
a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/ddl/DdlStatementsProcessor.java
+++ 
b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/CommandProcessor.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.ignite.internal.processors.query.h2.ddl;
+package org.apache.ignite.internal.processors.query.h2;
 
 import java.util.ArrayList;
 import java.util.Collections;
@@ -28,28 +28,41 @@ import java.util.Set;
 import java.util.UUID;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteCluster;
+import org.apache.ignite.IgniteDataStreamer;
+import org.apache.ignite.IgniteLogger;
 import org.apache.ignite.IgniteSystemProperties;
 import org.apache.ignite.cache.CacheMode;
 import org.apache.ignite.cache.QueryEntity;
 import org.apache.ignite.cache.QueryIndex;
 import org.apache.ignite.cache.QueryIndexType;
+import org.apache.ignite.cache.query.BulkLoadContextCursor;
 import org.apache.ignite.cache.query.FieldsQueryCursor;
+import org.apache.ignite.cache.query.SqlFieldsQuery;
 import org.apache.ignite.configuration.CacheConfiguration;
 import org.apache.ignite.internal.GridKernalContext;
 import org.apache.ignite.internal.IgniteInternalFuture;
+import 
org.apache.ignite.internal.processors.bulkload.BulkLoadAckClientParameters;
+import org.apache.ignite.internal.processors.bulkload.BulkLoadCacheWriter;
+import org.apache.ignite.internal.processors.bulkload.BulkLoadParser;
+import org.apache.ignite.internal.processors.bulkload.BulkLoadProcessor;
+import org.apache.ignite.internal.processors.bulkload.BulkLoadStreamerWriter;
 import org.apache.ignite.internal.processors.cache.GridCacheContext;
-import org.apache.ignite.internal.processors.cache.QueryCursorImpl;
 import 
org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxLocal;
 import org.apache.ignite.internal.processors.cache.mvcc.MvccUtils;
 import org.apache.ignite.internal.processors.cache.query.IgniteQueryErrorCode;
+import org.apache.ignite.internal.processors.cache.query.SqlFieldsQueryEx;
 import org.apache.ignite.internal.processors.query.GridQueryProperty;
 import org.apache.ignite.internal.processors.query.GridQueryTypeDescriptor;
 import org.apache.ignite.internal.processors.query.IgniteSQLException;
+import org.apache.ignite.internal.processors.query.NestedTxMode;
 import org.apache.ignite.internal.processors.query.QueryEntityEx;
 import org.apache.ignite.internal.processors.query.QueryField;
 import org.apache.ignite.internal.processors.query.QueryUtils;
-import org.apache.ignite.internal.processors.query.h2.H2Utils;
-import org.apache.ignite.internal.processors.query.h2.SchemaManager;
+import org.apache.ignite.internal.processors.query.RunningQueryManager;
+import org.apache.ignite.internal.processors.query.SqlClientContext;
+import 
org.apache.ignite.internal.processors.query.h2.dml.DmlBulkLoadDataConverter;
+import org.apache.ignite.internal.processors.query.h2.dml.UpdatePlan;
+import org.apache.ignite.internal.processors.query.h2.dml.UpdatePlanBuilder;
 import org.apache.ignite.internal.processors.query.h2.opt.GridH2Table;
 import 
org.apache.ignite.internal.processors.query.h2.sql.GridSqlAlterTableAddColumn;
 import 
org.apache.ignite.internal.processors.query.h2.sql.GridSqlAlterTableDropColumn;
@@ -58,20 +71,26 @@ import 
org.apache.ignite.internal.processors.query.h2.sql.GridSqlCreateIndex;
 import org.apache.ignite.internal.processors.query.h2.sql.GridSqlCreateTable;
 import org.apache.ignite.internal.processors.query.h2.sql.GridSqlDropIndex;
 import org.apache.ignite.internal.processors.query.h2.sql.GridSqlDropTable;
-import org.apache.ignite.internal.processors.query.h2.sql.GridSqlQueryParser;
 import org.apache.ignite.internal.processors.query.h2.sql.GridSqlStatement;
 import 
org.apache.ignite.internal.processors.query.schema.SchemaOperationException;
 import org.apache.ignite.internal.sql.command.SqlAlterTableCommand;
 import org.apache.ignite.internal.sql.command.SqlAlterUserCommand;
+import org.apache.ignite.internal.sql.command.SqlBeginTransactionCommand;
+import org.apache.ignite.internal.sql.command.SqlBulkLoadCommand;
 import org.apache.ignite.internal.sql.command.SqlCommand;
+import org.apache.ignite.internal.sql.command.SqlCommitTransactionCommand;
 import org.apache.ignite.internal.sql.command.SqlCreateIndexCommand;
 import org.apache.ignite.internal.sql.command.SqlCreateUserCommand;
 import org.apache.ignite.internal.sql.command.SqlDropIndexCommand;
 import org.apache.ignite.internal.sql.command.SqlDropUserCommand;
 import org.apache.ignite.internal.sql.command.SqlIndexColumn;
+import org.apache.ignite.internal.sql.command.SqlRollbackTransactionCommand;
+import org.apache.ignite.internal.sql.command.SqlSetStreamingCommand;
 import org.apache.ignite.internal.util.future.GridFinishedFuture;
+import org.apache.ignite.internal.util.lang.IgniteClosureX;
 import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteBiTuple;
 import org.apache.ignite.plugin.security.SecurityPermission;
 import org.h2.command.Prepared;
 import org.h2.command.ddl.AlterTableAlterColumn;
@@ -82,20 +101,29 @@ import org.h2.command.ddl.DropTable;
 import org.h2.table.Column;
 import org.h2.value.DataType;
 import org.h2.value.Value;
+import org.jetbrains.annotations.NotNull;
+import org.jetbrains.annotations.Nullable;
 
-import static 
org.apache.ignite.internal.processors.query.h2.IgniteH2Indexing.UPDATE_RESULT_META;
+import static 
org.apache.ignite.internal.processors.cache.mvcc.MvccUtils.mvccEnabled;
+import static org.apache.ignite.internal.processors.cache.mvcc.MvccUtils.tx;
+import static 
org.apache.ignite.internal.processors.cache.mvcc.MvccUtils.txStart;
 import static 
org.apache.ignite.internal.processors.query.h2.sql.GridSqlQueryParser.PARAM_WRAP_VALUE;
 
 /**
- * DDL statements processor.<p>
- * Contains higher level logic to handle operations as a whole and communicate 
with the client.
+ * Processor responsible for execution of all non-SELECT and non-DML commands.
  */
-public class DdlStatementsProcessor {
+public class CommandProcessor {
     /** Kernal context. */
-    private GridKernalContext ctx;
+    private final GridKernalContext ctx;
 
-    /** Indexing. */
-    private SchemaManager schemaMgr;
+    /** Schema manager. */
+    private final SchemaManager schemaMgr;
+
+    /** Running query manager. */
+    private final RunningQueryManager runningQryMgr;
+
+    /** Logger. */
+    private final IgniteLogger log;
 
     /** Is backward compatible handling of UUID through DDL enabled. */
     private static final boolean handleUuidAsByte =
@@ -107,27 +135,19 @@ public class DdlStatementsProcessor {
      * @param ctx Kernal context.
      * @param schemaMgr Schema manager.
      */
-    public DdlStatementsProcessor(GridKernalContext ctx, SchemaManager 
schemaMgr) {
+    public CommandProcessor(GridKernalContext ctx, SchemaManager schemaMgr, 
RunningQueryManager runningQryMgr) {
         this.ctx = ctx;
         this.schemaMgr = schemaMgr;
-    }
+        this.runningQryMgr = runningQryMgr;
 
-    /**
-     * Initialize message handlers and this' fields needed for further 
operation.
-     *
-     * @param ctx Kernal context.
-     * @param schemaMgr Schema manager.
-     */
-    public void start(final GridKernalContext ctx, SchemaManager schemaMgr) {
-        this.ctx = ctx;
-        this.schemaMgr = schemaMgr;
+        log = ctx.log(CommandProcessor.class);
     }
 
     /**
      * @param cmd Command.
      * @return {@code True} if this is supported DDL command.
      */
-    public static boolean isDdlCommand(SqlCommand cmd) {
+    private static boolean isDdl(SqlCommand cmd) {
         return cmd instanceof SqlCreateIndexCommand
             || cmd instanceof SqlDropIndexCommand
             || cmd instanceof SqlAlterTableCommand
@@ -137,13 +157,54 @@ public class DdlStatementsProcessor {
     }
 
     /**
+     * Execute command.
+     *
+     * @param qry Query.
+     * @param cmdNative Native command (if any).
+     * @param cmdH2 H2 command (if any).
+     * @param cliCtx Client context.
+     * @param qryId Running query ID.
+     * @return Result.
+     */
+    public CommandResult runCommand(SqlFieldsQuery qry, SqlCommand cmdNative, 
GridSqlStatement cmdH2,
+        @Nullable SqlClientContext cliCtx, Long qryId) throws 
IgniteCheckedException {
+        assert cmdNative != null || cmdH2 != null;
+
+        // Do execute.
+        FieldsQueryCursor<List<?>> res = H2Utils.zeroCursor();
+        boolean unregister = true;
+
+        if (cmdNative != null) {
+            assert cmdH2 == null;
+
+            if (isDdl(cmdNative))
+                runCommandNativeDdl(qry.getSql(), cmdNative);
+            else if (cmdNative instanceof SqlBulkLoadCommand) {
+                res = processBulkLoadCommand((SqlBulkLoadCommand) cmdNative, 
qryId);
+
+                unregister = false;
+            }
+            else if (cmdNative instanceof SqlSetStreamingCommand)
+                processSetStreamingCommand((SqlSetStreamingCommand)cmdNative, 
cliCtx);
+            else
+                processTxCommand(cmdNative, qry);
+        }
+        else {
+            assert cmdH2 != null;
+
+            runCommandH2(qry.getSql(), cmdH2);
+        }
+
+        return new CommandResult(res, unregister);
+    }
+
+    /**
      * Run DDL statement.
      *
      * @param sql Original SQL.
      * @param cmd Command.
-     * @return Result.
      */
-    public FieldsQueryCursor<List<?>> runDdlStatement(String sql, SqlCommand 
cmd) {
+    private void runCommandNativeDdl(String sql, SqlCommand cmd) {
         IgniteInternalFuture fut = null;
 
         try {
@@ -260,8 +321,6 @@ public class DdlStatementsProcessor {
 
             if (fut != null)
                 fut.get();
-
-            return H2Utils.zeroCursor();
         }
         catch (SchemaOperationException e) {
             throw convert(e);
@@ -278,20 +337,17 @@ public class DdlStatementsProcessor {
      * Execute DDL statement.
      *
      * @param sql SQL.
-     * @param prepared Prepared.
-     * @return Cursor on query results.
+     * @param cmdH2 Command.
      */
     @SuppressWarnings({"unchecked"})
-    public FieldsQueryCursor<List<?>> runDdlStatement(String sql, Prepared 
prepared) {
+    private void runCommandH2(String sql, GridSqlStatement cmdH2) {
         IgniteInternalFuture fut = null;
 
         try {
             finishActiveTxIfNecessary();
 
-            GridSqlStatement stmt0 = new 
GridSqlQueryParser(false).parse(prepared);
-
-            if (stmt0 instanceof GridSqlCreateIndex) {
-                GridSqlCreateIndex cmd = (GridSqlCreateIndex)stmt0;
+            if (cmdH2 instanceof GridSqlCreateIndex) {
+                GridSqlCreateIndex cmd = (GridSqlCreateIndex)cmdH2;
 
                 isDdlOnSchemaSupported(cmd.schemaName());
 
@@ -329,8 +385,8 @@ public class DdlStatementsProcessor {
                 fut = ctx.query().dynamicIndexCreate(tbl.cacheName(), 
cmd.schemaName(), typeDesc.tableName(),
                     newIdx, cmd.ifNotExists(), 0);
             }
-            else if (stmt0 instanceof GridSqlDropIndex) {
-                GridSqlDropIndex cmd = (GridSqlDropIndex) stmt0;
+            else if (cmdH2 instanceof GridSqlDropIndex) {
+                GridSqlDropIndex cmd = (GridSqlDropIndex) cmdH2;
 
                 isDdlOnSchemaSupported(cmd.schemaName());
 
@@ -350,10 +406,10 @@ public class DdlStatementsProcessor {
                             cmd.indexName());
                 }
             }
-            else if (stmt0 instanceof GridSqlCreateTable) {
+            else if (cmdH2 instanceof GridSqlCreateTable) {
                 ctx.security().authorize(null, 
SecurityPermission.CACHE_CREATE, null);
 
-                GridSqlCreateTable cmd = (GridSqlCreateTable)stmt0;
+                GridSqlCreateTable cmd = (GridSqlCreateTable)cmdH2;
 
                 isDdlOnSchemaSupported(cmd.schemaName());
 
@@ -383,10 +439,10 @@ public class DdlStatementsProcessor {
                         cmd.writeSynchronizationMode(), cmd.backups(), 
cmd.ifNotExists(), cmd.encrypted());
                 }
             }
-            else if (stmt0 instanceof GridSqlDropTable) {
+            else if (cmdH2 instanceof GridSqlDropTable) {
                 ctx.security().authorize(null, 
SecurityPermission.CACHE_DESTROY, null);
 
-                GridSqlDropTable cmd = (GridSqlDropTable)stmt0;
+                GridSqlDropTable cmd = (GridSqlDropTable)cmdH2;
 
                 isDdlOnSchemaSupported(cmd.schemaName());
 
@@ -400,8 +456,8 @@ public class DdlStatementsProcessor {
                 else
                     ctx.query().dynamicTableDrop(tbl.cacheName(), 
cmd.tableName(), cmd.ifExists());
             }
-            else if (stmt0 instanceof GridSqlAlterTableAddColumn) {
-                GridSqlAlterTableAddColumn cmd = 
(GridSqlAlterTableAddColumn)stmt0;
+            else if (cmdH2 instanceof GridSqlAlterTableAddColumn) {
+                GridSqlAlterTableAddColumn cmd = 
(GridSqlAlterTableAddColumn)cmdH2;
 
                 isDdlOnSchemaSupported(cmd.schemaName());
 
@@ -455,8 +511,8 @@ public class DdlStatementsProcessor {
                     }
                 }
             }
-            else if (stmt0 instanceof GridSqlAlterTableDropColumn) {
-                GridSqlAlterTableDropColumn cmd = 
(GridSqlAlterTableDropColumn)stmt0;
+            else if (cmdH2 instanceof GridSqlAlterTableDropColumn) {
+                GridSqlAlterTableDropColumn cmd = 
(GridSqlAlterTableDropColumn)cmdH2;
 
                 isDdlOnSchemaSupported(cmd.schemaName());
 
@@ -519,13 +575,6 @@ public class DdlStatementsProcessor {
 
             if (fut != null)
                 fut.get();
-
-            QueryCursorImpl<List<?>> resCur = (QueryCursorImpl<List<?>>)new 
QueryCursorImpl(Collections.singletonList
-                (Collections.singletonList(0L)), null, false);
-
-            resCur.fieldsMeta(UPDATE_RESULT_META);
-
-            return resCur;
         }
         catch (SchemaOperationException e) {
             U.error(null, "DDL operation failure", e);
@@ -742,7 +791,7 @@ public class DdlStatementsProcessor {
      * @param cmd Statement.
      * @return Whether {@code cmd} is a DDL statement we're able to handle.
      */
-    public static boolean isDdlStatement(Prepared cmd) {
+    public static boolean isCommand(Prepared cmd) {
         return cmd instanceof CreateIndex || cmd instanceof DropIndex || cmd 
instanceof CreateTable ||
             cmd instanceof DropTable || cmd instanceof AlterTableAlterColumn;
     }
@@ -765,4 +814,170 @@ public class DdlStatementsProcessor {
                 return DataType.getTypeClassName(type);
         }
     }
-}
+
+    /**
+     * Process transactional command.
+     * @param cmd Command.
+     * @param qry Query.
+     * @throws IgniteCheckedException if failed.
+     */
+    private void processTxCommand(SqlCommand cmd, SqlFieldsQuery qry)
+        throws IgniteCheckedException {
+        NestedTxMode nestedTxMode = qry instanceof SqlFieldsQueryEx ? 
((SqlFieldsQueryEx)qry).getNestedTxMode() :
+            NestedTxMode.DEFAULT;
+
+        GridNearTxLocal tx = tx(ctx);
+
+        if (cmd instanceof SqlBeginTransactionCommand) {
+            if (!mvccEnabled(ctx))
+                throw new IgniteSQLException("MVCC must be enabled in order to 
start transaction.",
+                    IgniteQueryErrorCode.MVCC_DISABLED);
+
+            if (tx != null) {
+                if (nestedTxMode == null)
+                    nestedTxMode = NestedTxMode.DEFAULT;
+
+                switch (nestedTxMode) {
+                    case COMMIT:
+                        doCommit(tx);
+
+                        txStart(ctx, qry.getTimeout());
+
+                        break;
+
+                    case IGNORE:
+                        log.warning("Transaction has already been started, 
ignoring BEGIN command.");
+
+                        break;
+
+                    case ERROR:
+                        throw new IgniteSQLException("Transaction has already 
been started.",
+                            IgniteQueryErrorCode.TRANSACTION_EXISTS);
+
+                    default:
+                        throw new IgniteSQLException("Unexpected nested 
transaction handling mode: " +
+                            nestedTxMode.name());
+                }
+            }
+            else
+                txStart(ctx, qry.getTimeout());
+        }
+        else if (cmd instanceof SqlCommitTransactionCommand) {
+            // Do nothing if there's no transaction.
+            if (tx != null)
+                doCommit(tx);
+        }
+        else {
+            assert cmd instanceof SqlRollbackTransactionCommand;
+
+            // Do nothing if there's no transaction.
+            if (tx != null)
+                doRollback(tx);
+        }
+    }
+
+    /**
+     * Commit and properly close transaction.
+     * @param tx Transaction.
+     * @throws IgniteCheckedException if failed.
+     */
+    private void doCommit(@NotNull GridNearTxLocal tx) throws 
IgniteCheckedException {
+        try {
+            tx.commit();
+        }
+        finally {
+            closeTx(tx);
+        }
+    }
+
+    /**
+     * Rollback and properly close transaction.
+     * @param tx Transaction.
+     * @throws IgniteCheckedException if failed.
+     */
+    public void doRollback(@NotNull GridNearTxLocal tx) throws 
IgniteCheckedException {
+        try {
+            tx.rollback();
+        }
+        finally {
+            closeTx(tx);
+        }
+    }
+
+    /**
+     * Properly close transaction.
+     * @param tx Transaction.
+     * @throws IgniteCheckedException if failed.
+     */
+    private void closeTx(@NotNull GridNearTxLocal tx) throws 
IgniteCheckedException {
+        try {
+            tx.close();
+        }
+        finally {
+            ctx.cache().context().tm().resetContext();
+        }
+    }
+
+    /**
+     * Process SET STREAMING command.
+     *
+     * @param cmd Command.
+     * @param cliCtx Client context.
+     */
+    private void processSetStreamingCommand(SqlSetStreamingCommand cmd,
+        @Nullable SqlClientContext cliCtx) {
+        if (cliCtx == null)
+            throw new IgniteSQLException("SET STREAMING command can only be 
executed from JDBC or ODBC driver.");
+
+        if (cmd.isTurnOn())
+            cliCtx.enableStreaming(
+                cmd.allowOverwrite(),
+                cmd.flushFrequency(),
+                cmd.perNodeBufferSize(),
+                cmd.perNodeParallelOperations(),
+                cmd.isOrdered()
+            );
+        else
+            cliCtx.disableStreaming();
+    }
+
+    /**
+     * Process bulk load COPY command.
+     *
+     * @param cmd The command.
+     * @param qryId Query id.
+     * @return The context (which is the result of the first request/response).
+     * @throws IgniteCheckedException If something failed.
+     */
+    private FieldsQueryCursor<List<?>> 
processBulkLoadCommand(SqlBulkLoadCommand cmd, Long qryId)
+        throws IgniteCheckedException {
+        if (cmd.packetSize() == null)
+            cmd.packetSize(BulkLoadAckClientParameters.DFLT_PACKET_SIZE);
+
+        GridH2Table tbl = schemaMgr.dataTable(cmd.schemaName(), 
cmd.tableName());
+
+        if (tbl == null) {
+            throw new IgniteSQLException("Table does not exist: " + 
cmd.tableName(),
+                IgniteQueryErrorCode.TABLE_NOT_FOUND);
+        }
+
+        H2Utils.checkAndStartNotStartedCache(ctx, tbl);
+
+        UpdatePlan plan = UpdatePlanBuilder.planForBulkLoad(cmd, tbl);
+
+        IgniteClosureX<List<?>, IgniteBiTuple<?, ?>> dataConverter = new 
DmlBulkLoadDataConverter(plan);
+
+        IgniteDataStreamer<Object, Object> streamer = 
ctx.grid().dataStreamer(tbl.cacheName());
+
+        BulkLoadCacheWriter outputWriter = new 
BulkLoadStreamerWriter(streamer);
+
+        BulkLoadParser inputParser = 
BulkLoadParser.createParser(cmd.inputFormat());
+
+        BulkLoadProcessor processor = new BulkLoadProcessor(inputParser, 
dataConverter, outputWriter,
+            runningQryMgr, qryId);
+
+        BulkLoadAckClientParameters params = new 
BulkLoadAckClientParameters(cmd.localFileName(), cmd.packetSize());
+
+        return new BulkLoadContextCursor(processor, params);
+    }
+}
\ No newline at end of file
diff --git 
a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/CommandResult.java
 
b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/CommandResult.java
new file mode 100644
index 0000000..a025bb1
--- /dev/null
+++ 
b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/CommandResult.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.h2;
+
+import org.apache.ignite.cache.query.FieldsQueryCursor;
+
+import java.util.List;
+
+/**
+ * Command execution result.
+ */
+public class CommandResult {
+    /** Cursor. */
+    private final FieldsQueryCursor<List<?>> cur;
+
+    /** Whether running query should be unregistered. */
+    private final boolean unregisterRunningQry;
+
+    /**
+     * Constructor.
+     *
+     * @param cur Cursor.
+     * @param unregisterRunningQry Whether running query should be 
unregistered.
+     */
+    public CommandResult(FieldsQueryCursor<List<?>> cur, boolean 
unregisterRunningQry) {
+        this.cur = cur;
+        this.unregisterRunningQry = unregisterRunningQry;
+    }
+
+    /**
+     * @return Cursor.
+     */
+    public FieldsQueryCursor<List<?>> cursor() {
+        return cur;
+    }
+
+    /**
+     * @return Whether running query should be unregistered.
+     */
+    public boolean unregisterRunningQuery() {
+        return unregisterRunningQry;
+    }
+}
diff --git 
a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
 
b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
index afdd9e5..f6867d8 100644
--- 
a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
+++ 
b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
@@ -42,7 +42,6 @@ import org.apache.ignite.IgniteException;
 import org.apache.ignite.IgniteLogger;
 import org.apache.ignite.IgniteSystemProperties;
 import org.apache.ignite.cache.CacheServerNotFoundException;
-import org.apache.ignite.cache.query.BulkLoadContextCursor;
 import org.apache.ignite.cache.query.FieldsQueryCursor;
 import org.apache.ignite.cache.query.QueryCancelledException;
 import org.apache.ignite.cache.query.SqlFieldsQuery;
@@ -54,11 +53,6 @@ import org.apache.ignite.internal.IgniteInternalFuture;
 import 
org.apache.ignite.internal.cluster.ClusterTopologyServerNotFoundException;
 import org.apache.ignite.internal.pagemem.store.IgnitePageStoreManager;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
-import 
org.apache.ignite.internal.processors.bulkload.BulkLoadAckClientParameters;
-import org.apache.ignite.internal.processors.bulkload.BulkLoadCacheWriter;
-import org.apache.ignite.internal.processors.bulkload.BulkLoadParser;
-import org.apache.ignite.internal.processors.bulkload.BulkLoadProcessor;
-import org.apache.ignite.internal.processors.bulkload.BulkLoadStreamerWriter;
 import org.apache.ignite.internal.processors.cache.CacheObjectValueContext;
 import org.apache.ignite.internal.processors.cache.CacheOperationContext;
 import org.apache.ignite.internal.processors.cache.DynamicCacheDescriptor;
@@ -96,7 +90,6 @@ import 
org.apache.ignite.internal.processors.query.GridQueryRowCacheCleaner;
 import org.apache.ignite.internal.processors.query.GridQueryTypeDescriptor;
 import org.apache.ignite.internal.processors.query.GridRunningQueryInfo;
 import org.apache.ignite.internal.processors.query.IgniteSQLException;
-import org.apache.ignite.internal.processors.query.NestedTxMode;
 import org.apache.ignite.internal.processors.query.QueryField;
 import org.apache.ignite.internal.processors.query.QueryHistoryMetrics;
 import org.apache.ignite.internal.processors.query.QueryHistoryMetricsKey;
@@ -106,7 +99,6 @@ import 
org.apache.ignite.internal.processors.query.RunningQueryManager;
 import org.apache.ignite.internal.processors.query.SqlClientContext;
 import org.apache.ignite.internal.processors.query.UpdateSourceIterator;
 import 
org.apache.ignite.internal.processors.query.h2.affinity.H2PartitionResolver;
-import 
org.apache.ignite.internal.processors.query.h2.dml.DmlBulkLoadDataConverter;
 import 
org.apache.ignite.internal.processors.query.h2.dml.DmlDistributedPlanInfo;
 import 
org.apache.ignite.internal.processors.query.h2.dml.DmlUpdateResultsIterator;
 import 
org.apache.ignite.internal.processors.query.h2.dml.DmlUpdateSingleEntryIterator;
@@ -125,7 +117,6 @@ import 
org.apache.ignite.internal.processors.query.h2.database.io.H2InnerIO;
 import org.apache.ignite.internal.processors.query.h2.database.io.H2LeafIO;
 import 
org.apache.ignite.internal.processors.query.h2.database.io.H2MvccInnerIO;
 import org.apache.ignite.internal.processors.query.h2.database.io.H2MvccLeafIO;
-import 
org.apache.ignite.internal.processors.query.h2.ddl.DdlStatementsProcessor;
 import org.apache.ignite.internal.processors.query.h2.dml.DmlUtils;
 import org.apache.ignite.internal.processors.query.h2.dml.UpdatePlan;
 import org.apache.ignite.internal.processors.query.h2.opt.GridH2IndexBase;
@@ -165,7 +156,6 @@ import org.apache.ignite.internal.util.GridSpinBusyLock;
 import org.apache.ignite.internal.util.IgniteUtils;
 import org.apache.ignite.internal.util.lang.GridCloseableIterator;
 import org.apache.ignite.internal.util.lang.GridPlainRunnable;
-import org.apache.ignite.internal.util.lang.IgniteClosureX;
 import org.apache.ignite.internal.util.lang.IgniteInClosure2X;
 import org.apache.ignite.internal.util.lang.IgniteSingletonIterator;
 import org.apache.ignite.internal.util.typedef.F;
@@ -194,7 +184,6 @@ import org.h2.engine.Session;
 import org.h2.engine.SysProperties;
 import org.h2.table.IndexColumn;
 import org.h2.util.JdbcUtils;
-import org.jetbrains.annotations.NotNull;
 import org.jetbrains.annotations.Nullable;
 
 import static java.lang.Boolean.FALSE;
@@ -251,7 +240,7 @@ public class IgniteH2Indexing implements GridQueryIndexing {
         
Boolean.getBoolean(IgniteSystemProperties.IGNITE_ALLOW_DML_INSIDE_TRANSACTION);
 
     /** Update plans cache. */
-    private final ConcurrentMap<H2CachedStatementKey, UpdatePlan> 
updatePlanCache =
+    private volatile ConcurrentMap<H2CachedStatementKey, UpdatePlan> 
updatePlanCache =
         new GridBoundedConcurrentLinkedHashMap<>(UPDATE_PLAN_CACHE_SIZE);
 
     /** Logger. */
@@ -285,8 +274,8 @@ public class IgniteH2Indexing implements GridQueryIndexing {
     /** Query context registry. */
     private final QueryContextRegistry qryCtxRegistry = new 
QueryContextRegistry();
 
-    /** */
-    private DdlStatementsProcessor ddlProc;
+    /** Processor to execute commands which are neither SELECT, nor DML. */
+    private CommandProcessor cmdProc;
 
     /** Partition reservation manager. */
     private PartitionReservationManager partReservationMgr;
@@ -295,7 +284,7 @@ public class IgniteH2Indexing implements GridQueryIndexing {
     private PartitionExtractor partExtractor;
 
     /** Running query manager. */
-    private RunningQueryManager runningQueryMgr;
+    private RunningQueryManager runningQryMgr;
 
     /** */
     private volatile 
GridBoundedConcurrentLinkedHashMap<H2TwoStepCachedQueryKey, 
H2TwoStepCachedQuery> twoStepCache =
@@ -414,7 +403,7 @@ public class IgniteH2Indexing implements GridQueryIndexing {
         boolean ifTblExists, boolean ifColNotExists) throws 
IgniteCheckedException {
         schemaMgr.addColumn(schemaName, tblName, cols, ifTblExists, 
ifColNotExists);
 
-        clearCachedQueries();
+        clearPlanCache();
     }
 
     /** {@inheritDoc} */
@@ -422,7 +411,7 @@ public class IgniteH2Indexing implements GridQueryIndexing {
         boolean ifColExists) throws IgniteCheckedException {
         schemaMgr.dropColumn(schemaName, tblName, cols, ifTblExists, 
ifColExists);
 
-        clearCachedQueries();
+        clearPlanCache();
     }
 
     /**
@@ -554,7 +543,7 @@ public class IgniteH2Indexing implements GridQueryIndexing {
 
                 return new GridQueryFieldsResultAdapter(UPDATE_RESULT_META, 
new IgniteSingletonIterator<>(updResRow));
             }
-            else if (DdlStatementsProcessor.isDdlStatement(p)) {
+            else if (CommandProcessor.isCommand(p)) {
                 throw new IgniteSQLException("DDL statements are supported for 
the whole cluster only.",
                     IgniteQueryErrorCode.UNSUPPORTED_OPERATION);
             }
@@ -823,7 +812,7 @@ public class IgniteH2Indexing implements GridQueryIndexing {
     @SuppressWarnings({"unchecked", "Anonymous2MethodRef"})
     private long streamQuery0(String qry, String schemaName, 
IgniteDataStreamer streamer, PreparedStatement stmt,
         final Object[] args) throws IgniteCheckedException {
-        Long qryId = runningQueryMgr.register(qry, 
GridCacheQueryType.SQL_FIELDS, schemaName, true, null);
+        Long qryId = runningQryMgr.register(qry, 
GridCacheQueryType.SQL_FIELDS, schemaName, true, null);
 
         boolean fail = false;
 
@@ -912,7 +901,7 @@ public class IgniteH2Indexing implements GridQueryIndexing {
             throw e;
         }
         finally {
-            runningQueryMgr.unregister(qryId, fail);
+            runningQryMgr.unregister(qryId, fail);
         }
     }
 
@@ -1439,7 +1428,7 @@ public class IgniteH2Indexing implements 
GridQueryIndexing {
 
             SqlFieldsQuery newQry = 
cloneFieldsQuery(qry).setSql(parser.lastCommandSql());
 
-            return new ParsingResult(newQry, leadingCmd, 
parser.remainingSql());
+            return new ParsingResult(newQry, leadingCmd, null, 
parser.remainingSql());
         }
         catch (SqlStrictParseException e) {
             throw new IgniteSQLException(e.getMessage(), 
IgniteQueryErrorCode.PARSING, e);
@@ -1463,38 +1452,37 @@ public class IgniteH2Indexing implements 
GridQueryIndexing {
     }
 
     /**
-     * Executes a query natively.
+     * Execute command.
      *
      * @param schemaName Schema name.
      * @param qry Query.
-     * @param cmd Parsed command corresponding to query.
-     * @param cliCtx Client context, or {@code null} if not applicable.
-     * @return Result cursors.
+     * @param cliCtx CLient context.
+     * @param cmdNative Command (native).
+     * @param cmdH2 Command (H2).
+     * @return Result.
      */
-    private List<FieldsQueryCursor<List<?>>> 
queryDistributedSqlFieldsNative(String schemaName, SqlFieldsQuery qry,
-        SqlCommand cmd, @Nullable SqlClientContext cliCtx) {
-        boolean fail = false;
-        boolean unregister = true;
+    public FieldsQueryCursor<List<?>> executeCommand(
+        String schemaName,
+        SqlFieldsQuery qry,
+        @Nullable SqlClientContext cliCtx,
+        SqlCommand cmdNative,
+        GridSqlStatement cmdH2
+    ) {
+        if (qry.isLocal()) {
+            throw new IgniteSQLException("DDL statements are not supported for 
LOCAL caches",
+                IgniteQueryErrorCode.UNSUPPORTED_OPERATION);
+        }
 
         Long qryId = registerRunningQuery(schemaName, null, qry.getSql(), 
qry.isLocal(), true);
 
-        try {
-            FieldsQueryCursor<List<?>> cur;
+        boolean fail = false;
 
-            if (DdlStatementsProcessor.isDdlCommand(cmd))
-                cur = ddlProc.runDdlStatement(qry.getSql(), cmd);
-            else if (cmd instanceof SqlBulkLoadCommand) {
-                // Query will be unregistered when cursor is closed.
-                unregister = false;
+        CommandResult res = null;
 
-                cur = processBulkLoadCommand((SqlBulkLoadCommand) cmd, qryId);
-            }
-            else if (cmd instanceof SqlSetStreamingCommand)
-                cur = processSetStreamingCommand((SqlSetStreamingCommand)cmd, 
cliCtx);
-            else
-                cur = processTxCommand(cmd, qry);
+        try {
+            res = cmdProc.runCommand(qry, cmdNative, cmdH2, cliCtx, qryId);
 
-            return Collections.singletonList(cur);
+            return res.cursor();
         }
         catch (IgniteCheckedException e) {
             fail = true;
@@ -1503,38 +1491,12 @@ public class IgniteH2Indexing implements 
GridQueryIndexing {
                 ", err=" + e.getMessage() + ']', e);
         }
         finally {
-            if (unregister || fail)
-                runningQueryMgr.unregister(qryId, fail);
+            if (fail || (res != null && res.unregisterRunningQuery()))
+                runningQryMgr.unregister(qryId, fail);
         }
     }
 
     /**
-     * Process SET STREAMING command.
-     *
-     * @param cmd Command.
-     * @param cliCtx Client context.
-     * @return Cursor.
-     */
-    private FieldsQueryCursor<List<?>> 
processSetStreamingCommand(SqlSetStreamingCommand cmd,
-        @Nullable SqlClientContext cliCtx) {
-        if (cliCtx == null)
-            throw new IgniteSQLException("SET STREAMING command can only be 
executed from JDBC or ODBC driver.");
-
-        if (cmd.isTurnOn())
-            cliCtx.enableStreaming(
-                cmd.allowOverwrite(),
-                cmd.flushFrequency(),
-                cmd.perNodeBufferSize(),
-                cmd.perNodeParallelOperations(),
-                cmd.isOrdered()
-            );
-        else
-            cliCtx.disableStreaming();
-
-        return H2Utils.zeroCursor();
-    }
-
-    /**
      * Check expected statement type (when it is set by JDBC) and given 
statement type.
      *
      * @param qry Query.
@@ -1548,151 +1510,6 @@ public class IgniteH2Indexing implements 
GridQueryIndexing {
                 IgniteQueryErrorCode.STMT_TYPE_MISMATCH);
     }
 
-    /**
-     * Process transactional command.
-     * @param cmd Command.
-     * @param qry Query.
-     * @throws IgniteCheckedException if failed.
-     */
-    private FieldsQueryCursor<List<?>> processTxCommand(SqlCommand cmd, 
SqlFieldsQuery qry)
-        throws IgniteCheckedException {
-        NestedTxMode nestedTxMode = qry instanceof SqlFieldsQueryEx ? 
((SqlFieldsQueryEx)qry).getNestedTxMode() :
-            NestedTxMode.DEFAULT;
-
-        GridNearTxLocal tx = tx(ctx);
-
-        if (cmd instanceof SqlBeginTransactionCommand) {
-            if (!mvccEnabled(ctx))
-                throw new IgniteSQLException("MVCC must be enabled in order to 
start transaction.",
-                    IgniteQueryErrorCode.MVCC_DISABLED);
-
-            if (tx != null) {
-                if (nestedTxMode == null)
-                    nestedTxMode = NestedTxMode.DEFAULT;
-
-                switch (nestedTxMode) {
-                    case COMMIT:
-                        doCommit(tx);
-
-                        txStart(ctx, qry.getTimeout());
-
-                        break;
-
-                    case IGNORE:
-                        log.warning("Transaction has already been started, 
ignoring BEGIN command.");
-
-                        break;
-
-                    case ERROR:
-                        throw new IgniteSQLException("Transaction has already 
been started.",
-                            IgniteQueryErrorCode.TRANSACTION_EXISTS);
-
-                    default:
-                        throw new IgniteSQLException("Unexpected nested 
transaction handling mode: " +
-                            nestedTxMode.name());
-                }
-            }
-            else
-                txStart(ctx, qry.getTimeout());
-        }
-        else if (cmd instanceof SqlCommitTransactionCommand) {
-            // Do nothing if there's no transaction.
-            if (tx != null)
-                doCommit(tx);
-        }
-        else {
-            assert cmd instanceof SqlRollbackTransactionCommand;
-
-            // Do nothing if there's no transaction.
-            if (tx != null)
-                doRollback(tx);
-        }
-
-        return H2Utils.zeroCursor();
-    }
-
-    /**
-     * Commit and properly close transaction.
-     * @param tx Transaction.
-     * @throws IgniteCheckedException if failed.
-     */
-    private void doCommit(@NotNull GridNearTxLocal tx) throws 
IgniteCheckedException {
-        try {
-            tx.commit();
-        }
-        finally {
-            closeTx(tx);
-        }
-    }
-
-    /**
-     * Rollback and properly close transaction.
-     * @param tx Transaction.
-     * @throws IgniteCheckedException if failed.
-     */
-    private void doRollback(@NotNull GridNearTxLocal tx) throws 
IgniteCheckedException {
-        try {
-            tx.rollback();
-        }
-        finally {
-            closeTx(tx);
-        }
-    }
-
-    /**
-     * Properly close transaction.
-     * @param tx Transaction.
-     * @throws IgniteCheckedException if failed.
-     */
-    private void closeTx(@NotNull GridNearTxLocal tx) throws 
IgniteCheckedException {
-        try {
-            tx.close();
-        }
-        finally {
-            ctx.cache().context().tm().resetContext();
-        }
-    }
-
-    /**
-     * Process bulk load COPY command.
-     *
-     * @param cmd The command.
-     * @param qryId Query id.
-     * @return The context (which is the result of the first request/response).
-     * @throws IgniteCheckedException If something failed.
-     */
-    private FieldsQueryCursor<List<?>> 
processBulkLoadCommand(SqlBulkLoadCommand cmd, Long qryId)
-        throws IgniteCheckedException {
-        if (cmd.packetSize() == null)
-            cmd.packetSize(BulkLoadAckClientParameters.DFLT_PACKET_SIZE);
-
-        GridH2Table tbl = schemaMgr.dataTable(cmd.schemaName(), 
cmd.tableName());
-
-        if (tbl == null) {
-            throw new IgniteSQLException("Table does not exist: " + 
cmd.tableName(),
-                IgniteQueryErrorCode.TABLE_NOT_FOUND);
-        }
-
-        H2Utils.checkAndStartNotStartedCache(ctx, tbl);
-
-        UpdatePlan plan = UpdatePlanBuilder.planForBulkLoad(cmd, tbl);
-
-        IgniteClosureX<List<?>, IgniteBiTuple<?, ?>> dataConverter = new 
DmlBulkLoadDataConverter(plan);
-
-        IgniteDataStreamer<Object, Object> streamer = 
ctx.grid().dataStreamer(tbl.cacheName());
-
-        BulkLoadCacheWriter outputWriter = new 
BulkLoadStreamerWriter(streamer);
-
-        BulkLoadParser inputParser = 
BulkLoadParser.createParser(cmd.inputFormat());
-
-        BulkLoadProcessor processor = new BulkLoadProcessor(inputParser, 
dataConverter, outputWriter,
-            runningQueryMgr, qryId);
-
-        BulkLoadAckClientParameters params = new 
BulkLoadAckClientParameters(cmd.localFileName(), cmd.packetSize());
-
-        return new BulkLoadContextCursor(processor, params);
-    }
-
     /** {@inheritDoc} */
     @SuppressWarnings({"StringEquality", "unchecked"})
     @Override public List<FieldsQueryCursor<List<?>>> querySqlFields(String 
schemaName, SqlFieldsQuery qry,
@@ -1712,8 +1529,6 @@ public class IgniteH2Indexing implements 
GridQueryIndexing {
 
                 ParsingResult parseRes = parse(schemaName, remainingQry, 
firstArg);
 
-                SqlCommand nativeCmd = parseRes.nativeCommand();
-
                 remainingSql = parseRes.remainingSql();
 
                 if (remainingSql != null && failOnMultipleStmts)
@@ -1723,6 +1538,9 @@ public class IgniteH2Indexing implements 
GridQueryIndexing {
 
                 assert newQry.getSql() != null;
 
+                // Check if operation is performed on an active cluster.
+                SqlCommand nativeCmd = parseRes.commandNative();
+
                 if (!(nativeCmd instanceof SqlCommitTransactionCommand || 
nativeCmd instanceof SqlRollbackTransactionCommand)
                     && !ctx.state().publicApiActiveState(true)) {
                     throw new IgniteException("Can not perform the operation 
because the cluster is inactive. Note, " +
@@ -1730,9 +1548,20 @@ public class IgniteH2Indexing implements 
GridQueryIndexing {
                         "let all the nodes join the cluster. To activate the 
cluster call Ignite.active(true).");
                 }
 
-                if (nativeCmd != null)
-                    res.addAll(queryDistributedSqlFieldsNative(schemaName, 
newQry, nativeCmd, cliCtx));
-                else  {
+                if (parseRes.isCommand()) {
+                    // Execute command.
+                    FieldsQueryCursor<List<?>> cmdRes = executeCommand(
+                        schemaName,
+                        newQry,
+                        cliCtx,
+                        parseRes.commandNative(),
+                        parseRes.commandH2()
+                    );
+
+                    res.add(cmdRes);
+                }
+                else {
+                    // Execute query or DML.
                     List<GridQueryFieldMetadata> meta = parseRes.meta();
 
                     GridCacheTwoStepQuery twoStepQry = parseRes.twoStepQuery();
@@ -1861,17 +1690,6 @@ public class IgniteH2Indexing implements 
GridQueryIndexing {
                     }
                 }
 
-                if (DdlStatementsProcessor.isDdlStatement(prepared)) {
-                    if (loc) {
-                        fail = true;
-
-                        throw new IgniteSQLException("DDL statements are not 
supported for LOCAL caches",
-                            IgniteQueryErrorCode.UNSUPPORTED_OPERATION);
-                    }
-
-                    return 
Collections.singletonList(ddlProc.runDdlStatement(sqlQry, prepared));
-                }
-
                 if (prepared instanceof NoOperation)
                     return Collections.singletonList(H2Utils.zeroCursor());
 
@@ -1881,7 +1699,7 @@ public class IgniteH2Indexing implements 
GridQueryIndexing {
                     IgniteQueryErrorCode.UNSUPPORTED_OPERATION);
             }
             finally {
-                runningQueryMgr.unregister(qryId, fail);
+                runningQryMgr.unregister(qryId, fail);
             }
         }
 
@@ -1905,7 +1723,7 @@ public class IgniteH2Indexing implements 
GridQueryIndexing {
             return Collections.singletonList(executeQueryLocal(schemaName, 
qry, keepBinary, filter, cancel, qryId));
         }
         catch (IgniteCheckedException e) {
-            runningQueryMgr.unregister(qryId, true);
+            runningQryMgr.unregister(qryId, true);
 
             throw new IgniteSQLException("Failed to execute local statement 
[stmt=" + sqlQry +
                 ", params=" + Arrays.deepToString(qry.getArgs()) + "]", e);
@@ -1923,7 +1741,7 @@ public class IgniteH2Indexing implements 
GridQueryIndexing {
     private Long registerRunningQuery(String schemaName, GridQueryCancel 
cancel, String qry, boolean loc,
         boolean registerAsNewQry) {
         if (registerAsNewQry)
-            return runningQueryMgr.register(qry, 
GridCacheQueryType.SQL_FIELDS, schemaName, loc, cancel);
+            return runningQryMgr.register(qry, GridCacheQueryType.SQL_FIELDS, 
schemaName, loc, cancel);
 
         return null;
     }
@@ -1987,7 +1805,6 @@ public class IgniteH2Indexing implements 
GridQueryIndexing {
         int paramsCnt = prepared.getParameters().size();
 
         Object[] argsOrig = qry.getArgs();
-
         Object[] args = null;
 
         if (!DmlUtils.isBatched(qry) && paramsCnt > 0) {
@@ -2042,6 +1859,12 @@ public class IgniteH2Indexing implements 
GridQueryIndexing {
 
         SqlFieldsQuery newQry = 
cloneFieldsQuery(qry).setSql(prepared.getSQL()).setArgs(args);
 
+        if (CommandProcessor.isCommand(prepared)) {
+            GridSqlStatement cmdH2 = new 
GridSqlQueryParser(false).parse(prepared);
+
+            return new ParsingResult(newQry, null, cmdH2, remainingSql);
+        }
+
         boolean hasTwoStep = !loc && prepared.isQuery();
 
         // Let's not cache multiple statements and distributed queries as 
whole two step query will be cached later on.
@@ -2090,8 +1913,6 @@ public class IgniteH2Indexing implements 
GridQueryIndexing {
             }
         }
 
-        checkQueryType(qry, true);
-
         return new ParsingResult(prepared, newQry, remainingSql, 
cachedQry.query(), cachedQryKey, cachedQry.meta());
     }
 
@@ -2294,7 +2115,7 @@ public class IgniteH2Indexing implements 
GridQueryIndexing {
         }
         finally {
             if (!cursorCreated)
-                runningQueryMgr.unregister(qryId, failed);
+                runningQryMgr.unregister(qryId, failed);
         }
     }
 
@@ -2556,7 +2377,7 @@ public class IgniteH2Indexing implements 
GridQueryIndexing {
      * @return Running query manager.
      */
     public RunningQueryManager runningQueryManager() {
-        return runningQueryMgr;
+        return runningQryMgr;
     }
 
     /** {@inheritDoc} */
@@ -2593,10 +2414,10 @@ public class IgniteH2Indexing implements 
GridQueryIndexing {
         mapQryExec.start(ctx, this);
         rdcQryExec.start(ctx, this);
 
-        ddlProc = new DdlStatementsProcessor(ctx, schemaMgr);
-
+        runningQryMgr = new RunningQueryManager(ctx);
         partExtractor = new PartitionExtractor(new H2PartitionResolver(this));
-        runningQueryMgr = new RunningQueryManager(ctx);
+
+        cmdProc = new CommandProcessor(ctx, schemaMgr, runningQryMgr);
 
         if (JdbcUtils.serializer != null)
             U.warn(log, "Custom H2 serialization is already configured, will 
override.");
@@ -2733,7 +2554,7 @@ public class IgniteH2Indexing implements 
GridQueryIndexing {
 
         qryCtxRegistry.clearSharedOnLocalNodeStop();
 
-        runningQueryMgr.stop();
+        runningQryMgr.stop();
         schemaMgr.stop();
         connMgr.stop();
 
@@ -2749,7 +2570,7 @@ public class IgniteH2Indexing implements 
GridQueryIndexing {
         GridNearTxLocal tx = tx(ctx);
 
         if (tx != null)
-            doRollback(tx);
+            cmdProc.doRollback(tx);
     }
 
     /** {@inheritDoc} */
@@ -2818,8 +2639,9 @@ public class IgniteH2Indexing implements 
GridQueryIndexing {
     /**
      * Remove all cached queries from cached two-steps queries.
      */
-    private void clearCachedQueries() {
+    private void clearPlanCache() {
         twoStepCache = new 
GridBoundedConcurrentLinkedHashMap<>(TWO_STEP_QRY_CACHE_SIZE);
+        updatePlanCache = new 
GridBoundedConcurrentLinkedHashMap<>(UPDATE_PLAN_CACHE_SIZE);
     }
 
     /** {@inheritDoc} */
@@ -2873,12 +2695,12 @@ public class IgniteH2Indexing implements 
GridQueryIndexing {
      * @return SQL running queries.
      */
     public List<GridRunningQueryInfo> runningSqlQueries() {
-        return runningQueryMgr.runningSqlQueries();
+        return runningQryMgr.runningSqlQueries();
     }
 
     /** {@inheritDoc} */
     @Override public Collection<GridRunningQueryInfo> runningQueries(long 
duration) {
-        return runningQueryMgr.longRunningQueries(duration);
+        return runningQryMgr.longRunningQueries(duration);
     }
 
     /**
@@ -2887,21 +2709,21 @@ public class IgniteH2Indexing implements 
GridQueryIndexing {
      * @return Queries history metrics.
      */
     public Map<QueryHistoryMetricsKey, QueryHistoryMetrics> 
queryHistoryMetrics() {
-        return runningQueryMgr.queryHistoryMetrics();
+        return runningQryMgr.queryHistoryMetrics();
     }
 
     /**
      * Reset query history metrics.
      */
     public void resetQueryHistoryMetrics() {
-        runningQueryMgr.resetQueryHistoryMetrics();
+        runningQryMgr.resetQueryHistoryMetrics();
     }
 
     /** {@inheritDoc} */
     @Override public void cancelQueries(Collection<Long> queries) {
         if (!F.isEmpty(queries)) {
             for (Long qryId : queries)
-                runningQueryMgr.cancel(qryId);
+                runningQryMgr.cancel(qryId);
         }
     }
 
diff --git 
a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/ParsingResult.java
 
b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/ParsingResult.java
index de2a50e..0358f64 100644
--- 
a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/ParsingResult.java
+++ 
b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/ParsingResult.java
@@ -21,6 +21,7 @@ import java.util.List;
 import org.apache.ignite.cache.query.SqlFieldsQuery;
 import org.apache.ignite.internal.processors.cache.query.GridCacheTwoStepQuery;
 import org.apache.ignite.internal.processors.query.GridQueryFieldMetadata;
+import org.apache.ignite.internal.processors.query.h2.sql.GridSqlStatement;
 import org.apache.ignite.internal.sql.command.SqlCommand;
 import org.h2.command.Prepared;
 
@@ -46,8 +47,11 @@ final class ParsingResult {
     /** Metadata for two-step query, or {@code} null if this result is for 
local query. */
     private final List<GridQueryFieldMetadata> meta;
 
-    /** Parsed native command. */
-    private final SqlCommand nativeCmd;
+    /** Command (native). */
+    private final SqlCommand cmdNative;
+
+    /** Command (H2). */
+    private final GridSqlStatement cmdH2;
 
     /**
      * Simple constructor.
@@ -59,7 +63,8 @@ final class ParsingResult {
         GridCacheTwoStepQuery twoStepQry,
         H2TwoStepCachedQueryKey twoStepQryKey,
         List<GridQueryFieldMetadata> meta,
-        SqlCommand nativeCmd
+        SqlCommand cmdNative,
+        GridSqlStatement cmdH2
     ) {
         this.prepared = prepared;
         this.newQry = newQry;
@@ -67,7 +72,8 @@ final class ParsingResult {
         this.twoStepQry = twoStepQry;
         this.twoStepQryKey = twoStepQryKey;
         this.meta = meta;
-        this.nativeCmd = nativeCmd;
+        this.cmdNative = cmdNative;
+        this.cmdH2 = cmdH2;
     }
 
     /**
@@ -81,18 +87,19 @@ final class ParsingResult {
         H2TwoStepCachedQueryKey twoStepQryKey,
         List<GridQueryFieldMetadata> meta
     ) {
-        this(prepared, newQry, remainingSql, twoStepQry, twoStepQryKey, meta, 
null);
+        this(prepared, newQry, remainingSql, twoStepQry, twoStepQryKey, meta, 
null, null);
     }
 
     /**
      * Construct parsing result in case of native parsing.
      *
      * @param newQry leading sql statement of the original multi-statement 
query.
-     * @param nativeCmd parsed sql command. Represents newQry.
+     * @param cmdNative Command (native).
+     * @param cmdH2 Command (H2).
      * @param remainingSql the rest of the original query.
      */
-    public ParsingResult(SqlFieldsQuery newQry, SqlCommand nativeCmd, String 
remainingSql) {
-        this(null, newQry, remainingSql, null, null, null, nativeCmd);
+    public ParsingResult(SqlFieldsQuery newQry, SqlCommand cmdNative, 
GridSqlStatement cmdH2, String remainingSql) {
+        this(null, newQry, remainingSql, null, null, null, cmdNative, cmdH2);
     }
 
     /**
@@ -103,7 +110,7 @@ final class ParsingResult {
      * @param remainingSql the rest of the original query.
      */
     public ParsingResult(Prepared prepared, SqlFieldsQuery newQry, String 
remainingSql) {
-        this(prepared, newQry, remainingSql, null, null, null, null);
+        this(prepared, newQry, remainingSql, null, null, null, null, null);
     }
 
     /**
@@ -121,10 +128,17 @@ final class ParsingResult {
     }
 
     /**
-     * Sql command produced by native sql parser.
+     * Command (native).
+     */
+    public SqlCommand commandNative() {
+        return cmdNative;
+    }
+
+    /**
+     * @return Command (H2).
      */
-    public SqlCommand nativeCommand() {
-        return nativeCmd;
+    public GridSqlStatement commandH2() {
+        return cmdH2;
     }
 
     /**
@@ -162,4 +176,11 @@ final class ParsingResult {
     public int parametersCount() {
         return prepared != null ? prepared.getParameters().size() : 
twoStepQry.parametersCount();
     }
+
+    /**
+     * @return Check whether this is a command.
+     */
+    public boolean isCommand() {
+        return cmdNative != null || cmdH2 != null;
+    }
 }
diff --git 
a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/H2DynamicTableSelfTest.java
 
b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/H2DynamicTableSelfTest.java
index 2204c596..d97ef08 100644
--- 
a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/H2DynamicTableSelfTest.java
+++ 
b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/H2DynamicTableSelfTest.java
@@ -57,7 +57,7 @@ import 
org.apache.ignite.internal.processors.query.QueryTypeDescriptorImpl;
 import org.apache.ignite.internal.processors.query.QueryUtils;
 import org.apache.ignite.internal.processors.query.h2.H2TableDescriptor;
 import org.apache.ignite.internal.processors.query.h2.IgniteH2Indexing;
-import 
org.apache.ignite.internal.processors.query.h2.ddl.DdlStatementsProcessor;
+import org.apache.ignite.internal.processors.query.h2.CommandProcessor;
 import org.apache.ignite.internal.processors.query.h2.opt.GridH2Table;
 import 
org.apache.ignite.internal.processors.query.schema.SchemaOperationException;
 import org.apache.ignite.internal.util.GridStringBuilder;
@@ -933,7 +933,7 @@ public class H2DynamicTableSelfTest extends 
AbstractSchemaSelfTest {
     }
 
     /**
-     * Tests table name conflict check in {@link DdlStatementsProcessor}.
+     * Tests table name conflict check in {@link CommandProcessor}.
      * @throws Exception if failed.
      */
     @Test

Reply via email to