This is an automated email from the ASF dual-hosted git repository. vozerov pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push: new 88fcc92 IGNITE-12275: SQL: Moved non-SELECT/DML commands processing to separate class. This closes #6071. 88fcc92 is described below commit 88fcc92243df12bff1d18742211062d5e1d7dee4 Author: devozerov <ppoze...@gmail.com> AuthorDate: Sat Feb 9 18:02:37 2019 +0300 IGNITE-12275: SQL: Moved non-SELECT/DML commands processing to separate class. This closes #6071. --- ...tementsProcessor.java => CommandProcessor.java} | 323 ++++++++++++++++---- .../processors/query/h2/CommandResult.java | 58 ++++ .../processors/query/h2/IgniteH2Indexing.java | 324 +++++---------------- .../processors/query/h2/ParsingResult.java | 45 ++- .../cache/index/H2DynamicTableSelfTest.java | 4 +- 5 files changed, 435 insertions(+), 319 deletions(-) diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/ddl/DdlStatementsProcessor.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/CommandProcessor.java similarity index 72% rename from modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/ddl/DdlStatementsProcessor.java rename to modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/CommandProcessor.java index 84dfea2..8c52296 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/ddl/DdlStatementsProcessor.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/CommandProcessor.java @@ -15,7 +15,7 @@ * limitations under the License. 
*/ -package org.apache.ignite.internal.processors.query.h2.ddl; +package org.apache.ignite.internal.processors.query.h2; import java.util.ArrayList; import java.util.Collections; @@ -28,28 +28,41 @@ import java.util.Set; import java.util.UUID; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteCluster; +import org.apache.ignite.IgniteDataStreamer; +import org.apache.ignite.IgniteLogger; import org.apache.ignite.IgniteSystemProperties; import org.apache.ignite.cache.CacheMode; import org.apache.ignite.cache.QueryEntity; import org.apache.ignite.cache.QueryIndex; import org.apache.ignite.cache.QueryIndexType; +import org.apache.ignite.cache.query.BulkLoadContextCursor; import org.apache.ignite.cache.query.FieldsQueryCursor; +import org.apache.ignite.cache.query.SqlFieldsQuery; import org.apache.ignite.configuration.CacheConfiguration; import org.apache.ignite.internal.GridKernalContext; import org.apache.ignite.internal.IgniteInternalFuture; +import org.apache.ignite.internal.processors.bulkload.BulkLoadAckClientParameters; +import org.apache.ignite.internal.processors.bulkload.BulkLoadCacheWriter; +import org.apache.ignite.internal.processors.bulkload.BulkLoadParser; +import org.apache.ignite.internal.processors.bulkload.BulkLoadProcessor; +import org.apache.ignite.internal.processors.bulkload.BulkLoadStreamerWriter; import org.apache.ignite.internal.processors.cache.GridCacheContext; -import org.apache.ignite.internal.processors.cache.QueryCursorImpl; import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxLocal; import org.apache.ignite.internal.processors.cache.mvcc.MvccUtils; import org.apache.ignite.internal.processors.cache.query.IgniteQueryErrorCode; +import org.apache.ignite.internal.processors.cache.query.SqlFieldsQueryEx; import org.apache.ignite.internal.processors.query.GridQueryProperty; import org.apache.ignite.internal.processors.query.GridQueryTypeDescriptor; import 
org.apache.ignite.internal.processors.query.IgniteSQLException; +import org.apache.ignite.internal.processors.query.NestedTxMode; import org.apache.ignite.internal.processors.query.QueryEntityEx; import org.apache.ignite.internal.processors.query.QueryField; import org.apache.ignite.internal.processors.query.QueryUtils; -import org.apache.ignite.internal.processors.query.h2.H2Utils; -import org.apache.ignite.internal.processors.query.h2.SchemaManager; +import org.apache.ignite.internal.processors.query.RunningQueryManager; +import org.apache.ignite.internal.processors.query.SqlClientContext; +import org.apache.ignite.internal.processors.query.h2.dml.DmlBulkLoadDataConverter; +import org.apache.ignite.internal.processors.query.h2.dml.UpdatePlan; +import org.apache.ignite.internal.processors.query.h2.dml.UpdatePlanBuilder; import org.apache.ignite.internal.processors.query.h2.opt.GridH2Table; import org.apache.ignite.internal.processors.query.h2.sql.GridSqlAlterTableAddColumn; import org.apache.ignite.internal.processors.query.h2.sql.GridSqlAlterTableDropColumn; @@ -58,20 +71,26 @@ import org.apache.ignite.internal.processors.query.h2.sql.GridSqlCreateIndex; import org.apache.ignite.internal.processors.query.h2.sql.GridSqlCreateTable; import org.apache.ignite.internal.processors.query.h2.sql.GridSqlDropIndex; import org.apache.ignite.internal.processors.query.h2.sql.GridSqlDropTable; -import org.apache.ignite.internal.processors.query.h2.sql.GridSqlQueryParser; import org.apache.ignite.internal.processors.query.h2.sql.GridSqlStatement; import org.apache.ignite.internal.processors.query.schema.SchemaOperationException; import org.apache.ignite.internal.sql.command.SqlAlterTableCommand; import org.apache.ignite.internal.sql.command.SqlAlterUserCommand; +import org.apache.ignite.internal.sql.command.SqlBeginTransactionCommand; +import org.apache.ignite.internal.sql.command.SqlBulkLoadCommand; import org.apache.ignite.internal.sql.command.SqlCommand; +import 
org.apache.ignite.internal.sql.command.SqlCommitTransactionCommand; import org.apache.ignite.internal.sql.command.SqlCreateIndexCommand; import org.apache.ignite.internal.sql.command.SqlCreateUserCommand; import org.apache.ignite.internal.sql.command.SqlDropIndexCommand; import org.apache.ignite.internal.sql.command.SqlDropUserCommand; import org.apache.ignite.internal.sql.command.SqlIndexColumn; +import org.apache.ignite.internal.sql.command.SqlRollbackTransactionCommand; +import org.apache.ignite.internal.sql.command.SqlSetStreamingCommand; import org.apache.ignite.internal.util.future.GridFinishedFuture; +import org.apache.ignite.internal.util.lang.IgniteClosureX; import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.lang.IgniteBiTuple; import org.apache.ignite.plugin.security.SecurityPermission; import org.h2.command.Prepared; import org.h2.command.ddl.AlterTableAlterColumn; @@ -82,20 +101,29 @@ import org.h2.command.ddl.DropTable; import org.h2.table.Column; import org.h2.value.DataType; import org.h2.value.Value; +import org.jetbrains.annotations.NotNull; +import org.jetbrains.annotations.Nullable; -import static org.apache.ignite.internal.processors.query.h2.IgniteH2Indexing.UPDATE_RESULT_META; +import static org.apache.ignite.internal.processors.cache.mvcc.MvccUtils.mvccEnabled; +import static org.apache.ignite.internal.processors.cache.mvcc.MvccUtils.tx; +import static org.apache.ignite.internal.processors.cache.mvcc.MvccUtils.txStart; import static org.apache.ignite.internal.processors.query.h2.sql.GridSqlQueryParser.PARAM_WRAP_VALUE; /** - * DDL statements processor.<p> - * Contains higher level logic to handle operations as a whole and communicate with the client. + * Processor responsible for execution of all non-SELECT and non-DML commands. */ -public class DdlStatementsProcessor { +public class CommandProcessor { /** Kernal context. 
*/ - private GridKernalContext ctx; + private final GridKernalContext ctx; - /** Indexing. */ - private SchemaManager schemaMgr; + /** Schema manager. */ + private final SchemaManager schemaMgr; + + /** Running query manager. */ + private final RunningQueryManager runningQryMgr; + + /** Logger. */ + private final IgniteLogger log; /** Is backward compatible handling of UUID through DDL enabled. */ private static final boolean handleUuidAsByte = @@ -107,27 +135,19 @@ public class DdlStatementsProcessor { * @param ctx Kernal context. * @param schemaMgr Schema manager. */ - public DdlStatementsProcessor(GridKernalContext ctx, SchemaManager schemaMgr) { + public CommandProcessor(GridKernalContext ctx, SchemaManager schemaMgr, RunningQueryManager runningQryMgr) { this.ctx = ctx; this.schemaMgr = schemaMgr; - } + this.runningQryMgr = runningQryMgr; - /** - * Initialize message handlers and this' fields needed for further operation. - * - * @param ctx Kernal context. - * @param schemaMgr Schema manager. - */ - public void start(final GridKernalContext ctx, SchemaManager schemaMgr) { - this.ctx = ctx; - this.schemaMgr = schemaMgr; + log = ctx.log(CommandProcessor.class); } /** * @param cmd Command. * @return {@code True} if this is supported DDL command. */ - public static boolean isDdlCommand(SqlCommand cmd) { + private static boolean isDdl(SqlCommand cmd) { return cmd instanceof SqlCreateIndexCommand || cmd instanceof SqlDropIndexCommand || cmd instanceof SqlAlterTableCommand @@ -137,13 +157,54 @@ public class DdlStatementsProcessor { } /** + * Execute command. + * + * @param qry Query. + * @param cmdNative Native command (if any). + * @param cmdH2 H2 command (if any). + * @param cliCtx Client context. + * @param qryId Running query ID. + * @return Result. 
+ */ + public CommandResult runCommand(SqlFieldsQuery qry, SqlCommand cmdNative, GridSqlStatement cmdH2, + @Nullable SqlClientContext cliCtx, Long qryId) throws IgniteCheckedException { + assert cmdNative != null || cmdH2 != null; + + // Do execute. + FieldsQueryCursor<List<?>> res = H2Utils.zeroCursor(); + boolean unregister = true; + + if (cmdNative != null) { + assert cmdH2 == null; + + if (isDdl(cmdNative)) + runCommandNativeDdl(qry.getSql(), cmdNative); + else if (cmdNative instanceof SqlBulkLoadCommand) { + res = processBulkLoadCommand((SqlBulkLoadCommand) cmdNative, qryId); + + unregister = false; + } + else if (cmdNative instanceof SqlSetStreamingCommand) + processSetStreamingCommand((SqlSetStreamingCommand)cmdNative, cliCtx); + else + processTxCommand(cmdNative, qry); + } + else { + assert cmdH2 != null; + + runCommandH2(qry.getSql(), cmdH2); + } + + return new CommandResult(res, unregister); + } + + /** * Run DDL statement. * * @param sql Original SQL. * @param cmd Command. - * @return Result. */ - public FieldsQueryCursor<List<?>> runDdlStatement(String sql, SqlCommand cmd) { + private void runCommandNativeDdl(String sql, SqlCommand cmd) { IgniteInternalFuture fut = null; try { @@ -260,8 +321,6 @@ public class DdlStatementsProcessor { if (fut != null) fut.get(); - - return H2Utils.zeroCursor(); } catch (SchemaOperationException e) { throw convert(e); @@ -278,20 +337,17 @@ public class DdlStatementsProcessor { * Execute DDL statement. * * @param sql SQL. - * @param prepared Prepared. - * @return Cursor on query results. + * @param cmdH2 Command. 
*/ @SuppressWarnings({"unchecked"}) - public FieldsQueryCursor<List<?>> runDdlStatement(String sql, Prepared prepared) { + private void runCommandH2(String sql, GridSqlStatement cmdH2) { IgniteInternalFuture fut = null; try { finishActiveTxIfNecessary(); - GridSqlStatement stmt0 = new GridSqlQueryParser(false).parse(prepared); - - if (stmt0 instanceof GridSqlCreateIndex) { - GridSqlCreateIndex cmd = (GridSqlCreateIndex)stmt0; + if (cmdH2 instanceof GridSqlCreateIndex) { + GridSqlCreateIndex cmd = (GridSqlCreateIndex)cmdH2; isDdlOnSchemaSupported(cmd.schemaName()); @@ -329,8 +385,8 @@ public class DdlStatementsProcessor { fut = ctx.query().dynamicIndexCreate(tbl.cacheName(), cmd.schemaName(), typeDesc.tableName(), newIdx, cmd.ifNotExists(), 0); } - else if (stmt0 instanceof GridSqlDropIndex) { - GridSqlDropIndex cmd = (GridSqlDropIndex) stmt0; + else if (cmdH2 instanceof GridSqlDropIndex) { + GridSqlDropIndex cmd = (GridSqlDropIndex) cmdH2; isDdlOnSchemaSupported(cmd.schemaName()); @@ -350,10 +406,10 @@ public class DdlStatementsProcessor { cmd.indexName()); } } - else if (stmt0 instanceof GridSqlCreateTable) { + else if (cmdH2 instanceof GridSqlCreateTable) { ctx.security().authorize(null, SecurityPermission.CACHE_CREATE, null); - GridSqlCreateTable cmd = (GridSqlCreateTable)stmt0; + GridSqlCreateTable cmd = (GridSqlCreateTable)cmdH2; isDdlOnSchemaSupported(cmd.schemaName()); @@ -383,10 +439,10 @@ public class DdlStatementsProcessor { cmd.writeSynchronizationMode(), cmd.backups(), cmd.ifNotExists(), cmd.encrypted()); } } - else if (stmt0 instanceof GridSqlDropTable) { + else if (cmdH2 instanceof GridSqlDropTable) { ctx.security().authorize(null, SecurityPermission.CACHE_DESTROY, null); - GridSqlDropTable cmd = (GridSqlDropTable)stmt0; + GridSqlDropTable cmd = (GridSqlDropTable)cmdH2; isDdlOnSchemaSupported(cmd.schemaName()); @@ -400,8 +456,8 @@ public class DdlStatementsProcessor { else ctx.query().dynamicTableDrop(tbl.cacheName(), cmd.tableName(), cmd.ifExists()); 
} - else if (stmt0 instanceof GridSqlAlterTableAddColumn) { - GridSqlAlterTableAddColumn cmd = (GridSqlAlterTableAddColumn)stmt0; + else if (cmdH2 instanceof GridSqlAlterTableAddColumn) { + GridSqlAlterTableAddColumn cmd = (GridSqlAlterTableAddColumn)cmdH2; isDdlOnSchemaSupported(cmd.schemaName()); @@ -455,8 +511,8 @@ public class DdlStatementsProcessor { } } } - else if (stmt0 instanceof GridSqlAlterTableDropColumn) { - GridSqlAlterTableDropColumn cmd = (GridSqlAlterTableDropColumn)stmt0; + else if (cmdH2 instanceof GridSqlAlterTableDropColumn) { + GridSqlAlterTableDropColumn cmd = (GridSqlAlterTableDropColumn)cmdH2; isDdlOnSchemaSupported(cmd.schemaName()); @@ -519,13 +575,6 @@ public class DdlStatementsProcessor { if (fut != null) fut.get(); - - QueryCursorImpl<List<?>> resCur = (QueryCursorImpl<List<?>>)new QueryCursorImpl(Collections.singletonList - (Collections.singletonList(0L)), null, false); - - resCur.fieldsMeta(UPDATE_RESULT_META); - - return resCur; } catch (SchemaOperationException e) { U.error(null, "DDL operation failure", e); @@ -742,7 +791,7 @@ public class DdlStatementsProcessor { * @param cmd Statement. * @return Whether {@code cmd} is a DDL statement we're able to handle. */ - public static boolean isDdlStatement(Prepared cmd) { + public static boolean isCommand(Prepared cmd) { return cmd instanceof CreateIndex || cmd instanceof DropIndex || cmd instanceof CreateTable || cmd instanceof DropTable || cmd instanceof AlterTableAlterColumn; } @@ -765,4 +814,170 @@ public class DdlStatementsProcessor { return DataType.getTypeClassName(type); } } -} + + /** + * Process transactional command. + * @param cmd Command. + * @param qry Query. + * @throws IgniteCheckedException if failed. + */ + private void processTxCommand(SqlCommand cmd, SqlFieldsQuery qry) + throws IgniteCheckedException { + NestedTxMode nestedTxMode = qry instanceof SqlFieldsQueryEx ? 
((SqlFieldsQueryEx)qry).getNestedTxMode() : + NestedTxMode.DEFAULT; + + GridNearTxLocal tx = tx(ctx); + + if (cmd instanceof SqlBeginTransactionCommand) { + if (!mvccEnabled(ctx)) + throw new IgniteSQLException("MVCC must be enabled in order to start transaction.", + IgniteQueryErrorCode.MVCC_DISABLED); + + if (tx != null) { + if (nestedTxMode == null) + nestedTxMode = NestedTxMode.DEFAULT; + + switch (nestedTxMode) { + case COMMIT: + doCommit(tx); + + txStart(ctx, qry.getTimeout()); + + break; + + case IGNORE: + log.warning("Transaction has already been started, ignoring BEGIN command."); + + break; + + case ERROR: + throw new IgniteSQLException("Transaction has already been started.", + IgniteQueryErrorCode.TRANSACTION_EXISTS); + + default: + throw new IgniteSQLException("Unexpected nested transaction handling mode: " + + nestedTxMode.name()); + } + } + else + txStart(ctx, qry.getTimeout()); + } + else if (cmd instanceof SqlCommitTransactionCommand) { + // Do nothing if there's no transaction. + if (tx != null) + doCommit(tx); + } + else { + assert cmd instanceof SqlRollbackTransactionCommand; + + // Do nothing if there's no transaction. + if (tx != null) + doRollback(tx); + } + } + + /** + * Commit and properly close transaction. + * @param tx Transaction. + * @throws IgniteCheckedException if failed. + */ + private void doCommit(@NotNull GridNearTxLocal tx) throws IgniteCheckedException { + try { + tx.commit(); + } + finally { + closeTx(tx); + } + } + + /** + * Rollback and properly close transaction. + * @param tx Transaction. + * @throws IgniteCheckedException if failed. + */ + public void doRollback(@NotNull GridNearTxLocal tx) throws IgniteCheckedException { + try { + tx.rollback(); + } + finally { + closeTx(tx); + } + } + + /** + * Properly close transaction. + * @param tx Transaction. + * @throws IgniteCheckedException if failed. 
+ */ + private void closeTx(@NotNull GridNearTxLocal tx) throws IgniteCheckedException { + try { + tx.close(); + } + finally { + ctx.cache().context().tm().resetContext(); + } + } + + /** + * Process SET STREAMING command. + * + * @param cmd Command. + * @param cliCtx Client context. + */ + private void processSetStreamingCommand(SqlSetStreamingCommand cmd, + @Nullable SqlClientContext cliCtx) { + if (cliCtx == null) + throw new IgniteSQLException("SET STREAMING command can only be executed from JDBC or ODBC driver."); + + if (cmd.isTurnOn()) + cliCtx.enableStreaming( + cmd.allowOverwrite(), + cmd.flushFrequency(), + cmd.perNodeBufferSize(), + cmd.perNodeParallelOperations(), + cmd.isOrdered() + ); + else + cliCtx.disableStreaming(); + } + + /** + * Process bulk load COPY command. + * + * @param cmd The command. + * @param qryId Query id. + * @return The context (which is the result of the first request/response). + * @throws IgniteCheckedException If something failed. + */ + private FieldsQueryCursor<List<?>> processBulkLoadCommand(SqlBulkLoadCommand cmd, Long qryId) + throws IgniteCheckedException { + if (cmd.packetSize() == null) + cmd.packetSize(BulkLoadAckClientParameters.DFLT_PACKET_SIZE); + + GridH2Table tbl = schemaMgr.dataTable(cmd.schemaName(), cmd.tableName()); + + if (tbl == null) { + throw new IgniteSQLException("Table does not exist: " + cmd.tableName(), + IgniteQueryErrorCode.TABLE_NOT_FOUND); + } + + H2Utils.checkAndStartNotStartedCache(ctx, tbl); + + UpdatePlan plan = UpdatePlanBuilder.planForBulkLoad(cmd, tbl); + + IgniteClosureX<List<?>, IgniteBiTuple<?, ?>> dataConverter = new DmlBulkLoadDataConverter(plan); + + IgniteDataStreamer<Object, Object> streamer = ctx.grid().dataStreamer(tbl.cacheName()); + + BulkLoadCacheWriter outputWriter = new BulkLoadStreamerWriter(streamer); + + BulkLoadParser inputParser = BulkLoadParser.createParser(cmd.inputFormat()); + + BulkLoadProcessor processor = new BulkLoadProcessor(inputParser, dataConverter, 
outputWriter, + runningQryMgr, qryId); + + BulkLoadAckClientParameters params = new BulkLoadAckClientParameters(cmd.localFileName(), cmd.packetSize()); + + return new BulkLoadContextCursor(processor, params); + } +} \ No newline at end of file diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/CommandResult.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/CommandResult.java new file mode 100644 index 0000000..a025bb1 --- /dev/null +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/CommandResult.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.query.h2; + +import org.apache.ignite.cache.query.FieldsQueryCursor; + +import java.util.List; + +/** + * Command execution result. + */ +public class CommandResult { + /** Cursor. */ + private final FieldsQueryCursor<List<?>> cur; + + /** Whether running query should be unregistered. */ + private final boolean unregisterRunningQry; + + /** + * Constructor. + * + * @param cur Cursor. + * @param unregisterRunningQry Whether running query should be unregistered. 
+ */ + public CommandResult(FieldsQueryCursor<List<?>> cur, boolean unregisterRunningQry) { + this.cur = cur; + this.unregisterRunningQry = unregisterRunningQry; + } + + /** + * @return Cursor. + */ + public FieldsQueryCursor<List<?>> cursor() { + return cur; + } + + /** + * @return Whether running query should be unregistered. + */ + public boolean unregisterRunningQuery() { + return unregisterRunningQry; + } +} diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java index afdd9e5..f6867d8 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java @@ -42,7 +42,6 @@ import org.apache.ignite.IgniteException; import org.apache.ignite.IgniteLogger; import org.apache.ignite.IgniteSystemProperties; import org.apache.ignite.cache.CacheServerNotFoundException; -import org.apache.ignite.cache.query.BulkLoadContextCursor; import org.apache.ignite.cache.query.FieldsQueryCursor; import org.apache.ignite.cache.query.QueryCancelledException; import org.apache.ignite.cache.query.SqlFieldsQuery; @@ -54,11 +53,6 @@ import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.cluster.ClusterTopologyServerNotFoundException; import org.apache.ignite.internal.pagemem.store.IgnitePageStoreManager; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; -import org.apache.ignite.internal.processors.bulkload.BulkLoadAckClientParameters; -import org.apache.ignite.internal.processors.bulkload.BulkLoadCacheWriter; -import org.apache.ignite.internal.processors.bulkload.BulkLoadParser; -import org.apache.ignite.internal.processors.bulkload.BulkLoadProcessor; -import 
org.apache.ignite.internal.processors.bulkload.BulkLoadStreamerWriter; import org.apache.ignite.internal.processors.cache.CacheObjectValueContext; import org.apache.ignite.internal.processors.cache.CacheOperationContext; import org.apache.ignite.internal.processors.cache.DynamicCacheDescriptor; @@ -96,7 +90,6 @@ import org.apache.ignite.internal.processors.query.GridQueryRowCacheCleaner; import org.apache.ignite.internal.processors.query.GridQueryTypeDescriptor; import org.apache.ignite.internal.processors.query.GridRunningQueryInfo; import org.apache.ignite.internal.processors.query.IgniteSQLException; -import org.apache.ignite.internal.processors.query.NestedTxMode; import org.apache.ignite.internal.processors.query.QueryField; import org.apache.ignite.internal.processors.query.QueryHistoryMetrics; import org.apache.ignite.internal.processors.query.QueryHistoryMetricsKey; @@ -106,7 +99,6 @@ import org.apache.ignite.internal.processors.query.RunningQueryManager; import org.apache.ignite.internal.processors.query.SqlClientContext; import org.apache.ignite.internal.processors.query.UpdateSourceIterator; import org.apache.ignite.internal.processors.query.h2.affinity.H2PartitionResolver; -import org.apache.ignite.internal.processors.query.h2.dml.DmlBulkLoadDataConverter; import org.apache.ignite.internal.processors.query.h2.dml.DmlDistributedPlanInfo; import org.apache.ignite.internal.processors.query.h2.dml.DmlUpdateResultsIterator; import org.apache.ignite.internal.processors.query.h2.dml.DmlUpdateSingleEntryIterator; @@ -125,7 +117,6 @@ import org.apache.ignite.internal.processors.query.h2.database.io.H2InnerIO; import org.apache.ignite.internal.processors.query.h2.database.io.H2LeafIO; import org.apache.ignite.internal.processors.query.h2.database.io.H2MvccInnerIO; import org.apache.ignite.internal.processors.query.h2.database.io.H2MvccLeafIO; -import org.apache.ignite.internal.processors.query.h2.ddl.DdlStatementsProcessor; import 
org.apache.ignite.internal.processors.query.h2.dml.DmlUtils; import org.apache.ignite.internal.processors.query.h2.dml.UpdatePlan; import org.apache.ignite.internal.processors.query.h2.opt.GridH2IndexBase; @@ -165,7 +156,6 @@ import org.apache.ignite.internal.util.GridSpinBusyLock; import org.apache.ignite.internal.util.IgniteUtils; import org.apache.ignite.internal.util.lang.GridCloseableIterator; import org.apache.ignite.internal.util.lang.GridPlainRunnable; -import org.apache.ignite.internal.util.lang.IgniteClosureX; import org.apache.ignite.internal.util.lang.IgniteInClosure2X; import org.apache.ignite.internal.util.lang.IgniteSingletonIterator; import org.apache.ignite.internal.util.typedef.F; @@ -194,7 +184,6 @@ import org.h2.engine.Session; import org.h2.engine.SysProperties; import org.h2.table.IndexColumn; import org.h2.util.JdbcUtils; -import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; import static java.lang.Boolean.FALSE; @@ -251,7 +240,7 @@ public class IgniteH2Indexing implements GridQueryIndexing { Boolean.getBoolean(IgniteSystemProperties.IGNITE_ALLOW_DML_INSIDE_TRANSACTION); /** Update plans cache. */ - private final ConcurrentMap<H2CachedStatementKey, UpdatePlan> updatePlanCache = + private volatile ConcurrentMap<H2CachedStatementKey, UpdatePlan> updatePlanCache = new GridBoundedConcurrentLinkedHashMap<>(UPDATE_PLAN_CACHE_SIZE); /** Logger. */ @@ -285,8 +274,8 @@ public class IgniteH2Indexing implements GridQueryIndexing { /** Query context registry. */ private final QueryContextRegistry qryCtxRegistry = new QueryContextRegistry(); - /** */ - private DdlStatementsProcessor ddlProc; + /** Processor to execute commands which are neither SELECT, nor DML. */ + private CommandProcessor cmdProc; /** Partition reservation manager. 
*/ private PartitionReservationManager partReservationMgr; @@ -295,7 +284,7 @@ public class IgniteH2Indexing implements GridQueryIndexing { private PartitionExtractor partExtractor; /** Running query manager. */ - private RunningQueryManager runningQueryMgr; + private RunningQueryManager runningQryMgr; /** */ private volatile GridBoundedConcurrentLinkedHashMap<H2TwoStepCachedQueryKey, H2TwoStepCachedQuery> twoStepCache = @@ -414,7 +403,7 @@ public class IgniteH2Indexing implements GridQueryIndexing { boolean ifTblExists, boolean ifColNotExists) throws IgniteCheckedException { schemaMgr.addColumn(schemaName, tblName, cols, ifTblExists, ifColNotExists); - clearCachedQueries(); + clearPlanCache(); } /** {@inheritDoc} */ @@ -422,7 +411,7 @@ public class IgniteH2Indexing implements GridQueryIndexing { boolean ifColExists) throws IgniteCheckedException { schemaMgr.dropColumn(schemaName, tblName, cols, ifTblExists, ifColExists); - clearCachedQueries(); + clearPlanCache(); } /** @@ -554,7 +543,7 @@ public class IgniteH2Indexing implements GridQueryIndexing { return new GridQueryFieldsResultAdapter(UPDATE_RESULT_META, new IgniteSingletonIterator<>(updResRow)); } - else if (DdlStatementsProcessor.isDdlStatement(p)) { + else if (CommandProcessor.isCommand(p)) { throw new IgniteSQLException("DDL statements are supported for the whole cluster only.", IgniteQueryErrorCode.UNSUPPORTED_OPERATION); } @@ -823,7 +812,7 @@ public class IgniteH2Indexing implements GridQueryIndexing { @SuppressWarnings({"unchecked", "Anonymous2MethodRef"}) private long streamQuery0(String qry, String schemaName, IgniteDataStreamer streamer, PreparedStatement stmt, final Object[] args) throws IgniteCheckedException { - Long qryId = runningQueryMgr.register(qry, GridCacheQueryType.SQL_FIELDS, schemaName, true, null); + Long qryId = runningQryMgr.register(qry, GridCacheQueryType.SQL_FIELDS, schemaName, true, null); boolean fail = false; @@ -912,7 +901,7 @@ public class IgniteH2Indexing implements 
GridQueryIndexing { throw e; } finally { - runningQueryMgr.unregister(qryId, fail); + runningQryMgr.unregister(qryId, fail); } } @@ -1439,7 +1428,7 @@ public class IgniteH2Indexing implements GridQueryIndexing { SqlFieldsQuery newQry = cloneFieldsQuery(qry).setSql(parser.lastCommandSql()); - return new ParsingResult(newQry, leadingCmd, parser.remainingSql()); + return new ParsingResult(newQry, leadingCmd, null, parser.remainingSql()); } catch (SqlStrictParseException e) { throw new IgniteSQLException(e.getMessage(), IgniteQueryErrorCode.PARSING, e); @@ -1463,38 +1452,37 @@ public class IgniteH2Indexing implements GridQueryIndexing { } /** - * Executes a query natively. + * Execute command. * * @param schemaName Schema name. * @param qry Query. - * @param cmd Parsed command corresponding to query. - * @param cliCtx Client context, or {@code null} if not applicable. - * @return Result cursors. + * @param cliCtx Client context. + * @param cmdNative Command (native). + * @param cmdH2 Command (H2). + * @return Result. */ - private List<FieldsQueryCursor<List<?>>> queryDistributedSqlFieldsNative(String schemaName, SqlFieldsQuery qry, - SqlCommand cmd, @Nullable SqlClientContext cliCtx) { - boolean fail = false; - boolean unregister = true; + public FieldsQueryCursor<List<?>> executeCommand( + String schemaName, + SqlFieldsQuery qry, + @Nullable SqlClientContext cliCtx, + SqlCommand cmdNative, + GridSqlStatement cmdH2 + ) { + if (qry.isLocal()) { + throw new IgniteSQLException("DDL statements are not supported for LOCAL caches", + IgniteQueryErrorCode.UNSUPPORTED_OPERATION); + } Long qryId = registerRunningQuery(schemaName, null, qry.getSql(), qry.isLocal(), true); - try { - FieldsQueryCursor<List<?>> cur; + boolean fail = false; - if (DdlStatementsProcessor.isDdlCommand(cmd)) - cur = ddlProc.runDdlStatement(qry.getSql(), cmd); - else if (cmd instanceof SqlBulkLoadCommand) { - // Query will be unregistered when cursor is closed. 
- unregister = false; + CommandResult res = null; - cur = processBulkLoadCommand((SqlBulkLoadCommand) cmd, qryId); - } - else if (cmd instanceof SqlSetStreamingCommand) - cur = processSetStreamingCommand((SqlSetStreamingCommand)cmd, cliCtx); - else - cur = processTxCommand(cmd, qry); + try { + res = cmdProc.runCommand(qry, cmdNative, cmdH2, cliCtx, qryId); - return Collections.singletonList(cur); + return res.cursor(); } catch (IgniteCheckedException e) { fail = true; @@ -1503,38 +1491,12 @@ public class IgniteH2Indexing implements GridQueryIndexing { ", err=" + e.getMessage() + ']', e); } finally { - if (unregister || fail) - runningQueryMgr.unregister(qryId, fail); + if (fail || (res != null && res.unregisterRunningQuery())) + runningQryMgr.unregister(qryId, fail); } } /** - * Process SET STREAMING command. - * - * @param cmd Command. - * @param cliCtx Client context. - * @return Cursor. - */ - private FieldsQueryCursor<List<?>> processSetStreamingCommand(SqlSetStreamingCommand cmd, - @Nullable SqlClientContext cliCtx) { - if (cliCtx == null) - throw new IgniteSQLException("SET STREAMING command can only be executed from JDBC or ODBC driver."); - - if (cmd.isTurnOn()) - cliCtx.enableStreaming( - cmd.allowOverwrite(), - cmd.flushFrequency(), - cmd.perNodeBufferSize(), - cmd.perNodeParallelOperations(), - cmd.isOrdered() - ); - else - cliCtx.disableStreaming(); - - return H2Utils.zeroCursor(); - } - - /** * Check expected statement type (when it is set by JDBC) and given statement type. * * @param qry Query. @@ -1548,151 +1510,6 @@ public class IgniteH2Indexing implements GridQueryIndexing { IgniteQueryErrorCode.STMT_TYPE_MISMATCH); } - /** - * Process transactional command. - * @param cmd Command. - * @param qry Query. - * @throws IgniteCheckedException if failed. - */ - private FieldsQueryCursor<List<?>> processTxCommand(SqlCommand cmd, SqlFieldsQuery qry) - throws IgniteCheckedException { - NestedTxMode nestedTxMode = qry instanceof SqlFieldsQueryEx ? 
((SqlFieldsQueryEx)qry).getNestedTxMode() : - NestedTxMode.DEFAULT; - - GridNearTxLocal tx = tx(ctx); - - if (cmd instanceof SqlBeginTransactionCommand) { - if (!mvccEnabled(ctx)) - throw new IgniteSQLException("MVCC must be enabled in order to start transaction.", - IgniteQueryErrorCode.MVCC_DISABLED); - - if (tx != null) { - if (nestedTxMode == null) - nestedTxMode = NestedTxMode.DEFAULT; - - switch (nestedTxMode) { - case COMMIT: - doCommit(tx); - - txStart(ctx, qry.getTimeout()); - - break; - - case IGNORE: - log.warning("Transaction has already been started, ignoring BEGIN command."); - - break; - - case ERROR: - throw new IgniteSQLException("Transaction has already been started.", - IgniteQueryErrorCode.TRANSACTION_EXISTS); - - default: - throw new IgniteSQLException("Unexpected nested transaction handling mode: " + - nestedTxMode.name()); - } - } - else - txStart(ctx, qry.getTimeout()); - } - else if (cmd instanceof SqlCommitTransactionCommand) { - // Do nothing if there's no transaction. - if (tx != null) - doCommit(tx); - } - else { - assert cmd instanceof SqlRollbackTransactionCommand; - - // Do nothing if there's no transaction. - if (tx != null) - doRollback(tx); - } - - return H2Utils.zeroCursor(); - } - - /** - * Commit and properly close transaction. - * @param tx Transaction. - * @throws IgniteCheckedException if failed. - */ - private void doCommit(@NotNull GridNearTxLocal tx) throws IgniteCheckedException { - try { - tx.commit(); - } - finally { - closeTx(tx); - } - } - - /** - * Rollback and properly close transaction. - * @param tx Transaction. - * @throws IgniteCheckedException if failed. - */ - private void doRollback(@NotNull GridNearTxLocal tx) throws IgniteCheckedException { - try { - tx.rollback(); - } - finally { - closeTx(tx); - } - } - - /** - * Properly close transaction. - * @param tx Transaction. - * @throws IgniteCheckedException if failed. 
- */ - private void closeTx(@NotNull GridNearTxLocal tx) throws IgniteCheckedException { - try { - tx.close(); - } - finally { - ctx.cache().context().tm().resetContext(); - } - } - - /** - * Process bulk load COPY command. - * - * @param cmd The command. - * @param qryId Query id. - * @return The context (which is the result of the first request/response). - * @throws IgniteCheckedException If something failed. - */ - private FieldsQueryCursor<List<?>> processBulkLoadCommand(SqlBulkLoadCommand cmd, Long qryId) - throws IgniteCheckedException { - if (cmd.packetSize() == null) - cmd.packetSize(BulkLoadAckClientParameters.DFLT_PACKET_SIZE); - - GridH2Table tbl = schemaMgr.dataTable(cmd.schemaName(), cmd.tableName()); - - if (tbl == null) { - throw new IgniteSQLException("Table does not exist: " + cmd.tableName(), - IgniteQueryErrorCode.TABLE_NOT_FOUND); - } - - H2Utils.checkAndStartNotStartedCache(ctx, tbl); - - UpdatePlan plan = UpdatePlanBuilder.planForBulkLoad(cmd, tbl); - - IgniteClosureX<List<?>, IgniteBiTuple<?, ?>> dataConverter = new DmlBulkLoadDataConverter(plan); - - IgniteDataStreamer<Object, Object> streamer = ctx.grid().dataStreamer(tbl.cacheName()); - - BulkLoadCacheWriter outputWriter = new BulkLoadStreamerWriter(streamer); - - BulkLoadParser inputParser = BulkLoadParser.createParser(cmd.inputFormat()); - - BulkLoadProcessor processor = new BulkLoadProcessor(inputParser, dataConverter, outputWriter, - runningQueryMgr, qryId); - - BulkLoadAckClientParameters params = new BulkLoadAckClientParameters(cmd.localFileName(), cmd.packetSize()); - - return new BulkLoadContextCursor(processor, params); - } - /** {@inheritDoc} */ @SuppressWarnings({"StringEquality", "unchecked"}) @Override public List<FieldsQueryCursor<List<?>>> querySqlFields(String schemaName, SqlFieldsQuery qry, @@ -1712,8 +1529,6 @@ public class IgniteH2Indexing implements GridQueryIndexing { ParsingResult parseRes = parse(schemaName, remainingQry, firstArg); - SqlCommand nativeCmd = 
parseRes.nativeCommand(); - remainingSql = parseRes.remainingSql(); if (remainingSql != null && failOnMultipleStmts) @@ -1723,6 +1538,9 @@ public class IgniteH2Indexing implements GridQueryIndexing { assert newQry.getSql() != null; + // Check if operation is performed on an active cluster. + SqlCommand nativeCmd = parseRes.commandNative(); + if (!(nativeCmd instanceof SqlCommitTransactionCommand || nativeCmd instanceof SqlRollbackTransactionCommand) && !ctx.state().publicApiActiveState(true)) { throw new IgniteException("Can not perform the operation because the cluster is inactive. Note, " + @@ -1730,9 +1548,20 @@ public class IgniteH2Indexing implements GridQueryIndexing { "let all the nodes join the cluster. To activate the cluster call Ignite.active(true)."); } - if (nativeCmd != null) - res.addAll(queryDistributedSqlFieldsNative(schemaName, newQry, nativeCmd, cliCtx)); - else { + if (parseRes.isCommand()) { + // Execute command. + FieldsQueryCursor<List<?>> cmdRes = executeCommand( + schemaName, + newQry, + cliCtx, + parseRes.commandNative(), + parseRes.commandH2() + ); + + res.add(cmdRes); + } + else { + // Execute query or DML. 
List<GridQueryFieldMetadata> meta = parseRes.meta(); GridCacheTwoStepQuery twoStepQry = parseRes.twoStepQuery(); @@ -1861,17 +1690,6 @@ public class IgniteH2Indexing implements GridQueryIndexing { } } - if (DdlStatementsProcessor.isDdlStatement(prepared)) { - if (loc) { - fail = true; - - throw new IgniteSQLException("DDL statements are not supported for LOCAL caches", - IgniteQueryErrorCode.UNSUPPORTED_OPERATION); - } - - return Collections.singletonList(ddlProc.runDdlStatement(sqlQry, prepared)); - } - if (prepared instanceof NoOperation) return Collections.singletonList(H2Utils.zeroCursor()); @@ -1881,7 +1699,7 @@ public class IgniteH2Indexing implements GridQueryIndexing { IgniteQueryErrorCode.UNSUPPORTED_OPERATION); } finally { - runningQueryMgr.unregister(qryId, fail); + runningQryMgr.unregister(qryId, fail); } } @@ -1905,7 +1723,7 @@ public class IgniteH2Indexing implements GridQueryIndexing { return Collections.singletonList(executeQueryLocal(schemaName, qry, keepBinary, filter, cancel, qryId)); } catch (IgniteCheckedException e) { - runningQueryMgr.unregister(qryId, true); + runningQryMgr.unregister(qryId, true); throw new IgniteSQLException("Failed to execute local statement [stmt=" + sqlQry + ", params=" + Arrays.deepToString(qry.getArgs()) + "]", e); @@ -1923,7 +1741,7 @@ public class IgniteH2Indexing implements GridQueryIndexing { private Long registerRunningQuery(String schemaName, GridQueryCancel cancel, String qry, boolean loc, boolean registerAsNewQry) { if (registerAsNewQry) - return runningQueryMgr.register(qry, GridCacheQueryType.SQL_FIELDS, schemaName, loc, cancel); + return runningQryMgr.register(qry, GridCacheQueryType.SQL_FIELDS, schemaName, loc, cancel); return null; } @@ -1987,7 +1805,6 @@ public class IgniteH2Indexing implements GridQueryIndexing { int paramsCnt = prepared.getParameters().size(); Object[] argsOrig = qry.getArgs(); - Object[] args = null; if (!DmlUtils.isBatched(qry) && paramsCnt > 0) { @@ -2042,6 +1859,12 @@ public class 
IgniteH2Indexing implements GridQueryIndexing { SqlFieldsQuery newQry = cloneFieldsQuery(qry).setSql(prepared.getSQL()).setArgs(args); + if (CommandProcessor.isCommand(prepared)) { + GridSqlStatement cmdH2 = new GridSqlQueryParser(false).parse(prepared); + + return new ParsingResult(newQry, null, cmdH2, remainingSql); + } + boolean hasTwoStep = !loc && prepared.isQuery(); // Let's not cache multiple statements and distributed queries as whole two step query will be cached later on. @@ -2090,8 +1913,6 @@ public class IgniteH2Indexing implements GridQueryIndexing { } } - checkQueryType(qry, true); - return new ParsingResult(prepared, newQry, remainingSql, cachedQry.query(), cachedQryKey, cachedQry.meta()); } @@ -2294,7 +2115,7 @@ public class IgniteH2Indexing implements GridQueryIndexing { } finally { if (!cursorCreated) - runningQueryMgr.unregister(qryId, failed); + runningQryMgr.unregister(qryId, failed); } } @@ -2556,7 +2377,7 @@ public class IgniteH2Indexing implements GridQueryIndexing { * @return Running query manager. 
*/ public RunningQueryManager runningQueryManager() { - return runningQueryMgr; + return runningQryMgr; } /** {@inheritDoc} */ @@ -2593,10 +2414,10 @@ public class IgniteH2Indexing implements GridQueryIndexing { mapQryExec.start(ctx, this); rdcQryExec.start(ctx, this); - ddlProc = new DdlStatementsProcessor(ctx, schemaMgr); - + runningQryMgr = new RunningQueryManager(ctx); partExtractor = new PartitionExtractor(new H2PartitionResolver(this)); - runningQueryMgr = new RunningQueryManager(ctx); + + cmdProc = new CommandProcessor(ctx, schemaMgr, runningQryMgr); if (JdbcUtils.serializer != null) U.warn(log, "Custom H2 serialization is already configured, will override."); @@ -2733,7 +2554,7 @@ public class IgniteH2Indexing implements GridQueryIndexing { qryCtxRegistry.clearSharedOnLocalNodeStop(); - runningQueryMgr.stop(); + runningQryMgr.stop(); schemaMgr.stop(); connMgr.stop(); @@ -2749,7 +2570,7 @@ public class IgniteH2Indexing implements GridQueryIndexing { GridNearTxLocal tx = tx(ctx); if (tx != null) - doRollback(tx); + cmdProc.doRollback(tx); } /** {@inheritDoc} */ @@ -2818,8 +2639,9 @@ public class IgniteH2Indexing implements GridQueryIndexing { /** * Remove all cached queries from cached two-steps queries. */ - private void clearCachedQueries() { + private void clearPlanCache() { twoStepCache = new GridBoundedConcurrentLinkedHashMap<>(TWO_STEP_QRY_CACHE_SIZE); + updatePlanCache = new GridBoundedConcurrentLinkedHashMap<>(UPDATE_PLAN_CACHE_SIZE); } /** {@inheritDoc} */ @@ -2873,12 +2695,12 @@ public class IgniteH2Indexing implements GridQueryIndexing { * @return SQL running queries. 
*/ public List<GridRunningQueryInfo> runningSqlQueries() { - return runningQueryMgr.runningSqlQueries(); + return runningQryMgr.runningSqlQueries(); } /** {@inheritDoc} */ @Override public Collection<GridRunningQueryInfo> runningQueries(long duration) { - return runningQueryMgr.longRunningQueries(duration); + return runningQryMgr.longRunningQueries(duration); } /** @@ -2887,21 +2709,21 @@ public class IgniteH2Indexing implements GridQueryIndexing { * @return Queries history metrics. */ public Map<QueryHistoryMetricsKey, QueryHistoryMetrics> queryHistoryMetrics() { - return runningQueryMgr.queryHistoryMetrics(); + return runningQryMgr.queryHistoryMetrics(); } /** * Reset query history metrics. */ public void resetQueryHistoryMetrics() { - runningQueryMgr.resetQueryHistoryMetrics(); + runningQryMgr.resetQueryHistoryMetrics(); } /** {@inheritDoc} */ @Override public void cancelQueries(Collection<Long> queries) { if (!F.isEmpty(queries)) { for (Long qryId : queries) - runningQueryMgr.cancel(qryId); + runningQryMgr.cancel(qryId); } } diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/ParsingResult.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/ParsingResult.java index de2a50e..0358f64 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/ParsingResult.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/ParsingResult.java @@ -21,6 +21,7 @@ import java.util.List; import org.apache.ignite.cache.query.SqlFieldsQuery; import org.apache.ignite.internal.processors.cache.query.GridCacheTwoStepQuery; import org.apache.ignite.internal.processors.query.GridQueryFieldMetadata; +import org.apache.ignite.internal.processors.query.h2.sql.GridSqlStatement; import org.apache.ignite.internal.sql.command.SqlCommand; import org.h2.command.Prepared; @@ -46,8 +47,11 @@ final class ParsingResult { /** Metadata for two-step query, or {@code} 
null if this result is for local query. */ private final List<GridQueryFieldMetadata> meta; - /** Parsed native command. */ - private final SqlCommand nativeCmd; + /** Command (native). */ + private final SqlCommand cmdNative; + + /** Command (H2). */ + private final GridSqlStatement cmdH2; /** * Simple constructor. @@ -59,7 +63,8 @@ final class ParsingResult { GridCacheTwoStepQuery twoStepQry, H2TwoStepCachedQueryKey twoStepQryKey, List<GridQueryFieldMetadata> meta, - SqlCommand nativeCmd + SqlCommand cmdNative, + GridSqlStatement cmdH2 ) { this.prepared = prepared; this.newQry = newQry; @@ -67,7 +72,8 @@ final class ParsingResult { this.twoStepQry = twoStepQry; this.twoStepQryKey = twoStepQryKey; this.meta = meta; - this.nativeCmd = nativeCmd; + this.cmdNative = cmdNative; + this.cmdH2 = cmdH2; } /** @@ -81,18 +87,19 @@ final class ParsingResult { H2TwoStepCachedQueryKey twoStepQryKey, List<GridQueryFieldMetadata> meta ) { - this(prepared, newQry, remainingSql, twoStepQry, twoStepQryKey, meta, null); + this(prepared, newQry, remainingSql, twoStepQry, twoStepQryKey, meta, null, null); } /** * Construct parsing result in case of native parsing. * * @param newQry leading sql statement of the original multi-statement query. - * @param nativeCmd parsed sql command. Represents newQry. + * @param cmdNative Command (native). + * @param cmdH2 Command (H2). * @param remainingSql the rest of the original query. */ - public ParsingResult(SqlFieldsQuery newQry, SqlCommand nativeCmd, String remainingSql) { - this(null, newQry, remainingSql, null, null, null, nativeCmd); + public ParsingResult(SqlFieldsQuery newQry, SqlCommand cmdNative, GridSqlStatement cmdH2, String remainingSql) { + this(null, newQry, remainingSql, null, null, null, cmdNative, cmdH2); } /** @@ -103,7 +110,7 @@ final class ParsingResult { * @param remainingSql the rest of the original query. 
*/ public ParsingResult(Prepared prepared, SqlFieldsQuery newQry, String remainingSql) { - this(prepared, newQry, remainingSql, null, null, null, null); + this(prepared, newQry, remainingSql, null, null, null, null, null); } /** @@ -121,10 +128,17 @@ final class ParsingResult { } /** - * Sql command produced by native sql parser. + * Command (native). + */ + public SqlCommand commandNative() { + return cmdNative; + } + + /** + * @return Command (H2). */ - public SqlCommand nativeCommand() { - return nativeCmd; + public GridSqlStatement commandH2() { + return cmdH2; } /** @@ -162,4 +176,11 @@ final class ParsingResult { public int parametersCount() { return prepared != null ? prepared.getParameters().size() : twoStepQry.parametersCount(); } + + /** + * @return Check whether this is a command. + */ + public boolean isCommand() { + return cmdNative != null || cmdH2 != null; + } } diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/H2DynamicTableSelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/H2DynamicTableSelfTest.java index 2204c596..d97ef08 100644 --- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/H2DynamicTableSelfTest.java +++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/H2DynamicTableSelfTest.java @@ -57,7 +57,7 @@ import org.apache.ignite.internal.processors.query.QueryTypeDescriptorImpl; import org.apache.ignite.internal.processors.query.QueryUtils; import org.apache.ignite.internal.processors.query.h2.H2TableDescriptor; import org.apache.ignite.internal.processors.query.h2.IgniteH2Indexing; -import org.apache.ignite.internal.processors.query.h2.ddl.DdlStatementsProcessor; +import org.apache.ignite.internal.processors.query.h2.CommandProcessor; import org.apache.ignite.internal.processors.query.h2.opt.GridH2Table; import org.apache.ignite.internal.processors.query.schema.SchemaOperationException; 
import org.apache.ignite.internal.util.GridStringBuilder; @@ -933,7 +933,7 @@ public class H2DynamicTableSelfTest extends AbstractSchemaSelfTest { } /** - * Tests table name conflict check in {@link DdlStatementsProcessor}. + * Tests table name conflict check in {@link CommandProcessor}. * @throws Exception if failed. */ @Test