This is an automated email from the ASF dual-hosted git repository. xxyu pushed a commit to branch kylin5 in repository https://gitbox.apache.org/repos/asf/kylin.git
commit 4a889f08ab94df6bccb5743cdc51d32a5d1b2b66 Author: Jiawei Li <1019037...@qq.com> AuthorDate: Thu Feb 16 17:37:19 2023 +0800 KYLIN-5526 fix unique queue async query count more than setting * KYLIN-5526 add check when add unique queue async query count --- .../apache/kylin/rest/controller/NAsyncQueryController.java | 8 ++++++-- .../java/org/apache/kylin/rest/service/QueryService.java | 13 +++++-------- .../org/apache/kylin/rest/util/AsyncQueryRequestLimits.java | 8 +++++++- 3 files changed, 18 insertions(+), 11 deletions(-) diff --git a/src/query-server/src/main/java/org/apache/kylin/rest/controller/NAsyncQueryController.java b/src/query-server/src/main/java/org/apache/kylin/rest/controller/NAsyncQueryController.java index 8f84c63ddf..99a3f4ee7a 100644 --- a/src/query-server/src/main/java/org/apache/kylin/rest/controller/NAsyncQueryController.java +++ b/src/query-server/src/main/java/org/apache/kylin/rest/controller/NAsyncQueryController.java @@ -125,10 +125,11 @@ public class NAsyncQueryController extends NBasicController { if (StringUtils.isEmpty(sqlRequest.getSeparator())) { sqlRequest.setSeparator(","); } + AsyncQueryRequestLimits asyncQueryRequestLimits = null; if (NProjectManager.getProjectConfig(sqlRequest.getProject()).isUniqueAsyncQueryYarnQueue()) { - AsyncQueryRequestLimits.checkCount(); + asyncQueryRequestLimits = new AsyncQueryRequestLimits(); } - + AsyncQueryRequestLimits finalAsyncQueryRequestLimits = asyncQueryRequestLimits; executorService.submit(Objects.requireNonNull(TtlRunnable.get(() -> { String format = sqlRequest.getFormat().toLowerCase(Locale.ROOT); String encode = sqlRequest.getEncode().toLowerCase(Locale.ROOT); @@ -166,6 +167,9 @@ public class NAsyncQueryController extends NBasicController { throw new RuntimeException(e1); } } finally { + if (finalAsyncQueryRequestLimits != null) { + finalAsyncQueryRequestLimits.close(); + } logger.info("Async query with queryId: {} end", queryContext.getQueryId()); QueryContext.current().close(); } diff --git a/src/query-service/src/main/java/org/apache/kylin/rest/service/QueryService.java b/src/query-service/src/main/java/org/apache/kylin/rest/service/QueryService.java index 846e7b6260..42fa8425b5 100644 --- a/src/query-service/src/main/java/org/apache/kylin/rest/service/QueryService.java +++ b/src/query-service/src/main/java/org/apache/kylin/rest/service/QueryService.java @@ -142,7 +142,6 @@ import org.apache.kylin.rest.response.TableMetaCacheResultV2; import org.apache.kylin.rest.security.MutableAclRecord; import org.apache.kylin.rest.util.AclEvaluate; import org.apache.kylin.rest.util.AclPermissionUtil; -import org.apache.kylin.rest.util.AsyncQueryRequestLimits; import org.apache.kylin.rest.util.PrepareSQLUtils; import org.apache.kylin.rest.util.QueryCacheSignatureUtil; import org.apache.kylin.rest.util.QueryRequestLimits; @@ -304,13 +303,11 @@ public class QueryService extends BasicService implements CacheSignatureQuerySup if (StringUtils.isNotEmpty(sqlRequest.getSparkQueue())) { queryParams.setSparkQueue(sqlRequest.getSparkQueue()); } - try (AsyncQueryRequestLimits ignored = new AsyncQueryRequestLimits()) { - AsyncQueryJob asyncQueryJob = new AsyncQueryJob(); - asyncQueryJob.setProject(queryParams.getProject()); - asyncQueryJob.submit(queryParams); - return buildSqlResponse(false, Collections.emptyList(), 0, Lists.newArrayList(), - sqlRequest.getProject()); - } + AsyncQueryJob asyncQueryJob = new AsyncQueryJob(); + asyncQueryJob.setProject(queryParams.getProject()); + asyncQueryJob.submit(queryParams); + return buildSqlResponse(false, Collections.emptyList(), 0, Lists.newArrayList(), + sqlRequest.getProject()); } SQLResponse fakeResponse = TableauInterceptor.tableauIntercept(queryParams.getSql()); diff --git a/src/query-service/src/main/java/org/apache/kylin/rest/util/AsyncQueryRequestLimits.java b/src/query-service/src/main/java/org/apache/kylin/rest/util/AsyncQueryRequestLimits.java index ea3df6e9e4..382635f2aa 100644 --- a/src/query-service/src/main/java/org/apache/kylin/rest/util/AsyncQueryRequestLimits.java +++ b/src/query-service/src/main/java/org/apache/kylin/rest/util/AsyncQueryRequestLimits.java @@ -25,17 +25,23 @@ import java.util.concurrent.atomic.AtomicInteger; import org.apache.kylin.common.KylinConfig; import org.apache.kylin.common.exception.KylinException; import org.apache.kylin.common.msg.MsgPicker; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public class AsyncQueryRequestLimits implements AutoCloseable { + private static final Logger logger = LoggerFactory.getLogger(AsyncQueryRequestLimits.class); + private static volatile AtomicInteger asyncQueryCount = new AtomicInteger(0); private static final int MAX_COUNT = KylinConfig.getInstanceFromEnv().getAsyncQueryMaxConcurrentJobs(); - private static void openAsyncQueryRequest() { + private static synchronized void openAsyncQueryRequest() { if (MAX_COUNT <= 0) { return; } + checkCount(); asyncQueryCount.incrementAndGet(); + logger.debug("current async query job count is {}.", asyncQueryCount.get()); }