korlov42 commented on code in PR #2192:
URL: https://github.com/apache/ignite-3/pull/2192#discussion_r1229460128
##########
modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/SqlQueryProcessor.java:
##########
@@ -383,121 +398,187 @@ private void registerIndexListener(IndexEvent evt,
AbstractIndexEventListener ls
private CompletableFuture<AsyncSqlCursor<List<Object>>> querySingle0(
SessionId sessionId,
- QueryContext context,
+ QueryContext queryContext,
String sql,
Object... params
) {
Session session = sessionManager.session(sessionId);
if (session == null) {
- return CompletableFuture.failedFuture(
+ return failedFuture(
new SqlException(SESSION_NOT_FOUND_ERR, format("Session
not found [{}]", sessionId)));
}
+ QueryCancel queryCancel = new QueryCancel();
+
+ try {
+ registerQueryCancel(session, new QueryCancel());
+ } catch (IllegalStateException ex) {
+ return failedFuture(new
IgniteInternalException(SESSION_EXPIRED_ERR,
+ format("Session has been expired [{}]",
session.sessionId()), ex));
+ }
+
String schemaName =
session.properties().get(QueryProperty.DEFAULT_SCHEMA);
- InternalTransaction outerTx =
context.unwrap(InternalTransaction.class);
+ InternalTransaction outerTx =
queryContext.unwrap(InternalTransaction.class);
+ boolean implicitTx = outerTx == null;
- QueryCancel queryCancel = new QueryCancel();
+ // TODO: IGNITE-19497 Switch to Catalog manager and uncomment next
lines.
+ long timestamp = outerTx == null ? clock.nowLong() :
outerTx.startTimestamp().longValue();
+ // int plannerCatalogVersion =
catalogManager.activeCatalogVersion(timestamp);
+ // int schemaId = catalogManager.schema(plannerCatalogVersion).id();
+ // SchemaPlus schema = sqlSchemaManager.schema(schemaName,
plannerCatalogVersion);
- AsyncCloseable closeableResource = () -> CompletableFuture.runAsync(
- queryCancel::cancel,
- taskExecutor
- );
+ int plannerCatalogVersion = 0;
+ SchemaPlus schema = sqlSchemaManager.schema(schemaName);
- queryCancel.add(() -> session.unregisterResource(closeableResource));
+ if (schema == null) {
+ return failedFuture(new SchemaNotFoundException(schemaName));
+ }
- try {
- session.registerResource(closeableResource);
- } catch (IllegalStateException ex) {
- return CompletableFuture.failedFuture(new
IgniteInternalException(SESSION_EXPIRED_ERR,
- format("Session has been expired [{}]",
session.sessionId()), ex));
+ CacheKey cacheKey = new CacheKey(plannerCatalogVersion, schemaName,
sql, params);
+
+ BaseQueryContext plannerContext = BaseQueryContext.builder()
+
.frameworkConfig(Frameworks.newConfigBuilder(FRAMEWORK_CONFIG).defaultSchema(schema).build())
+ .logger(LOG)
+ .cancel(queryCancel)
+ .parameters(params)
+ .plannerTimeout(PLANNER_TIMEOUT)
+ .build();
+
+ CompletableFuture<QueryPlan> planFuture = queryCache.get(cacheKey);
+
+ if (planFuture == null) {
+ planFuture = CompletableFuture.supplyAsync(() -> {
+ // Parse query
+ StatementParseResult parseResult = IgniteSqlParser.parse(sql,
StatementParseResult.MODE);
+ SqlNode sqlNode = parseResult.statement();
+
+ // Validate statement
+ validateParsedStatement(queryContext, parseResult, sqlNode,
params);
+
+ return sqlNode;
+ }, taskExecutor).thenCompose(sqlNode -> {
+ if (skipCache(Commons.getQueryType(sqlNode))) {
+ // Prepare query plan without caching.
+ //TODO: IGNITE-17765 Can we make a sync call here? Drop the
planning pool? Or move parsing to the planning pool?
+ return prepareSvc.prepareAsync(sqlNode, plannerContext);
+ }
+
+ // Try query plan for normalized query, or create a new one
asynchronously.
+ CacheKey normalizedQueryCacheKey = new
CacheKey(plannerCatalogVersion, schemaName, sqlNode.toString(), params);
+
+ CompletableFuture<QueryPlan> planFuture0 =
queryCache.computeIfAbsent(
+ normalizedQueryCacheKey,
+ k -> prepareSvc.prepareAsync(sqlNode, plannerContext)
+ );
+
+ queryCache.putIfAbsent(cacheKey, planFuture0);
+
+ // Copy shared plan.
+ return planFuture0.thenApply(QueryPlan::copy);
+ });
+ } else {
+ // Copy shared plan.
+ planFuture = planFuture.thenApply(QueryPlan::copy);
}
- CompletableFuture<Void> start = new CompletableFuture<>();
+ return planFuture
+ .thenComposeAsync(plan -> {
+ // Validate plan
+ if (SqlQueryType.DDL == plan.type() && outerTx != null) {
+ return failedFuture(new
SqlException(UNSUPPORTED_DDL_OPERATION_ERR, "DDL doesn't support
transactions."));
+ }
+
+ InternalTransaction tx = implicitTx ?
txManager.begin(plan.type() != DML) : outerTx;
+
+ try {
+ /* TODO IGNITE-19497 Restart planning if plan cannot
be used in transaction.
+ int txCatalogVersion =
catalogManager.activeCatalogVersion(tx.startTimestamp().longValue());
+
+ if (implicitTx && plannerCatalogVersion !=
txCatalogVersion) {
+ LOG.info("Retry query planning:
plannerCatalogVersion={}, txCatalogVersion={}",
+ plannerCatalogVersion, txCatalogVersion);
+
+ // TODO IGNITE-19497 Prevent infinite planning, e.g.
by setting max number of tries.
+ return tx.rollbackAsync()
+ .thenComposeAsync(ignore ->
querySingle0(sessionId, queryContext, sql, params), taskExecutor);
+ } */
Review Comment:
The same here: let's remove all unrelated code.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]