lowka commented on code in PR #2192:
URL: https://github.com/apache/ignite-3/pull/2192#discussion_r1229388536
##########
modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/SqlQueryProcessor.java:
##########
@@ -374,121 +389,190 @@ private void registerIndexListener(IndexEvent evt,
AbstractIndexEventListener ls
private CompletableFuture<AsyncSqlCursor<List<Object>>> querySingle0(
SessionId sessionId,
- QueryContext context,
+ QueryContext queryContext,
String sql,
Object... params
) {
Session session = sessionManager.session(sessionId);
if (session == null) {
- return CompletableFuture.failedFuture(
+ return failedFuture(
new SqlException(SESSION_NOT_FOUND_ERR, format("Session
not found [{}]", sessionId)));
}
+ QueryCancel queryCancel;
+
+ try {
+ queryCancel = createQueryCancel(session);
+ } catch (IllegalStateException ex) {
+ return failedFuture(new
IgniteInternalException(SESSION_EXPIRED_ERR,
+ format("Session has been expired [{}]",
session.sessionId()), ex));
+ }
+
String schemaName =
session.properties().get(QueryProperty.DEFAULT_SCHEMA);
- InternalTransaction outerTx =
context.unwrap(InternalTransaction.class);
+ InternalTransaction outerTx =
queryContext.unwrap(InternalTransaction.class);
+ long timestamp = outerTx == null ? clock.nowLong() :
outerTx.startTimestamp().longValue();
- QueryCancel queryCancel = new QueryCancel();
+ boolean implicitTx = outerTx == null;
- AsyncCloseable closeableResource = () -> CompletableFuture.runAsync(
- queryCancel::cancel,
- taskExecutor
- );
+ // TODO: IGNITE-19497 Switch to Catalog manager and uncomment next
lines.
+ // int plannerCatalogVersion =
catalogManager.activeCatalogVersion(timestamp);
+ // int schemaId = catalogManager.schema(plannerCatalogVersion).id();
+ // SchemaPlus schema = sqlSchemaManager.schema(schemaName,
plannerCatalogVersion);
- queryCancel.add(() -> session.unregisterResource(closeableResource));
+ int plannerCatalogVersion = 0;
+ SchemaPlus schema = sqlSchemaManager.schema(schemaName);
- try {
- session.registerResource(closeableResource);
- } catch (IllegalStateException ex) {
- return CompletableFuture.failedFuture(new
IgniteInternalException(SESSION_EXPIRED_ERR,
- format("Session has been expired [{}]",
session.sessionId()), ex));
+ if (schema == null) {
+ return failedFuture(new SchemaNotFoundException(schemaName));
}
- CompletableFuture<Void> start = new CompletableFuture<>();
+ CacheKey cacheKey = new CacheKey(plannerCatalogVersion, schemaName,
sql, params);
+
+ BaseQueryContext plannerContext = BaseQueryContext.builder()
+
.frameworkConfig(Frameworks.newConfigBuilder(FRAMEWORK_CONFIG).defaultSchema(schema).build())
+ .logger(LOG)
+ .cancel(queryCancel)
+ .parameters(params)
+ .plannerTimeout(PLANNER_TIMEOUT)
+ .build();
+
+ CompletableFuture<QueryPlan> planFuture = queryCache.get(cacheKey);
+
+ if (planFuture == null) {
+ planFuture = CompletableFuture.supplyAsync(() -> {
Review Comment:
I think it would be better to specify an executor here since
`CompletableFuture.supplyAsync(task)` uses ForkJoin's common pool.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]