This is an automated email from the ASF dual-hosted git repository.

nju_yaho pushed a commit to tag ebay-3.1.0-release-20200701
in repository https://gitbox.apache.org/repos/asf/kylin.git
commit a692adf8c7064861206e3f1d282f9e06b72f68d1 Author: Zhong, Yanghong <nju_y...@apache.org> AuthorDate: Mon May 25 19:21:58 2020 +0800 KYLIN-4513 Refactor QueryContext constructor --- .../java/org/apache/kylin/common/QueryContext.java | 40 +++++++++------------- .../apache/kylin/common/QueryContextFacade.java | 36 +++++++++++-------- .../java/org/apache/kylin/query/KylinTestBase.java | 6 ++-- .../apache/kylin/storage/hbase/ITStorageTest.java | 3 ++ .../kylin/query/security/QueryACLTestUtil.java | 9 +++-- .../apache/kylin/rest/service/QueryService.java | 23 ++++++++----- .../kylin/rest/metrics/QueryMetricsTest.java | 9 +++-- .../kylin/rest/service/QueryInfoCollectorTest.java | 16 ++++++--- .../kylin/rest/service/QueryServiceTest.java | 7 ++-- .../server/rest/controller/QueryController.java | 3 ++ 10 files changed, 89 insertions(+), 63 deletions(-) diff --git a/core-common/src/main/java/org/apache/kylin/common/QueryContext.java b/core-common/src/main/java/org/apache/kylin/common/QueryContext.java index 680a694..0f6534f 100644 --- a/core-common/src/main/java/org/apache/kylin/common/QueryContext.java +++ b/core-common/src/main/java/org/apache/kylin/common/QueryContext.java @@ -30,11 +30,10 @@ import java.util.concurrent.atomic.AtomicReference; import org.apache.kylin.common.exceptions.KylinTimeoutException; import org.apache.kylin.common.util.RandomUtil; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - import org.apache.kylin.shaded.com.google.common.collect.Lists; import org.apache.kylin.shaded.com.google.common.collect.Maps; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * Holds per query information and statistics. 
@@ -49,12 +48,14 @@ public class QueryContext { public interface QueryStopListener { void stop(QueryContext query); } - - private long queryStartMillis; - + private final String queryId; - private String username; - private String project; + private final String project; + private final String sql; + private final String username; + private final int maxConnThreads; + private final long queryStartMillis; + private Set<String> groups; private AtomicLong scannedRows = new AtomicLong(); private AtomicLong returnedRows = new AtomicLong(); @@ -70,18 +71,19 @@ public class QueryContext { private List<RPCStatistics> rpcStatisticsList = Lists.newCopyOnWriteArrayList(); private Map<Integer, CubeSegmentStatisticsResult> cubeSegmentStatisticsResultMap = Maps.newConcurrentMap(); - final int maxConnThreads; - private ExecutorService connPool; - QueryContext(int maxConnThreads) { - this(maxConnThreads, System.currentTimeMillis()); + QueryContext(String projectName, String sql, String user, int maxConnThreads) { + this(projectName, sql, user, maxConnThreads, System.currentTimeMillis()); } - QueryContext(int maxConnThreads, long startMills) { - queryId = RandomUtil.randomUUID().toString(); - queryStartMillis = startMills; + QueryContext(String projectName, String sql, String user, int maxConnThreads, long startMills) { + this.queryId = RandomUtil.randomUUID().toString(); + this.project = projectName; + this.sql = sql; + this.username = user; this.maxConnThreads = maxConnThreads; + this.queryStartMillis = startMills; } public ExecutorService getConnectionPool(ExecutorService sharedConnPool) { @@ -119,18 +121,10 @@ public class QueryContext { return username; } - public void setUsername(String username) { - this.username = username; - } - public String getProject() { return project; } - public void setProject(String project) { - this.project = project; - } - public Set<String> getGroups() { return groups; } diff --git 
a/core-common/src/main/java/org/apache/kylin/common/QueryContextFacade.java b/core-common/src/main/java/org/apache/kylin/common/QueryContextFacade.java index e1cc1ec..293232f 100644 --- a/core-common/src/main/java/org/apache/kylin/common/QueryContextFacade.java +++ b/core-common/src/main/java/org/apache/kylin/common/QueryContextFacade.java @@ -24,29 +24,24 @@ import java.util.TreeSet; import java.util.concurrent.ConcurrentMap; import org.apache.kylin.common.threadlocal.InternalThreadLocal; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - import org.apache.kylin.shaded.com.google.common.collect.Maps; import org.apache.kylin.shaded.com.google.common.collect.Sets; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public class QueryContextFacade { private static final Logger logger = LoggerFactory.getLogger(QueryContextFacade.class); private static final ConcurrentMap<String, QueryContext> RUNNING_CTX_MAP = Maps.newConcurrentMap(); - private static final InternalThreadLocal<QueryContext> CURRENT_CTX = new InternalThreadLocal<QueryContext>() { - @Override - protected QueryContext initialValue() { - QueryContext queryContext = new QueryContext( - KylinConfig.getInstanceFromEnv().getHBaseMaxConnectionThreadsPerQuery()); - RUNNING_CTX_MAP.put(queryContext.getQueryId(), queryContext); - return queryContext; - } - }; + private static final InternalThreadLocal<QueryContext> CURRENT_CTX = new InternalThreadLocal<>(); public static QueryContext current() { - return CURRENT_CTX.get(); + QueryContext ret = CURRENT_CTX.get(); + if (ret == null) { + throw new RuntimeException("Query context hasn't been initialized!!!"); + } + return ret; } /** @@ -60,6 +55,17 @@ public class QueryContextFacade { } } + public static QueryContext startQuery(String project, String sql, String user) { + return startQuery(project, sql, user, KylinConfig.getInstanceFromEnv().getHBaseMaxConnectionThreadsPerQuery()); + } + + public static QueryContext startQuery(String project, String 
sql, String user, int maxHBaseConnectionThreads) { + QueryContext query = new QueryContext(project, sql, user, maxHBaseConnectionThreads); + CURRENT_CTX.set(query); + RUNNING_CTX_MAP.put(query.getQueryId(), query); + return query; + } + /** * invoked by user to let query stop early * @link resetCurrent() should be finally invoked @@ -97,8 +103,8 @@ public class QueryContextFacade { */ public static TreeSet<QueryContext> getLongRunningQueries(long runningTime) { SortedSet<QueryContext> allRunningQueries = getAllRunningQueries(); - QueryContext tmpCtx = new QueryContext(KylinConfig.getInstanceFromEnv().getHBaseMaxConnectionThreadsPerQuery(), - runningTime + 1L); // plus 1 to include those contexts in same accumulatedMills but different uuid + QueryContext tmpCtx = new QueryContext(null, null, null, + KylinConfig.getInstanceFromEnv().getHBaseMaxConnectionThreadsPerQuery(), runningTime + 1L); // plus 1 to include those contexts in same accumulatedMills but different uuid return (TreeSet<QueryContext>) allRunningQueries.headSet(tmpCtx); } } diff --git a/kylin-it/src/test/java/org/apache/kylin/query/KylinTestBase.java b/kylin-it/src/test/java/org/apache/kylin/query/KylinTestBase.java index c8dac65..98dacc2 100644 --- a/kylin-it/src/test/java/org/apache/kylin/query/KylinTestBase.java +++ b/kylin-it/src/test/java/org/apache/kylin/query/KylinTestBase.java @@ -240,7 +240,7 @@ public class KylinTestBase { protected ITable executeQuery(IDatabaseConnection dbConn, String queryName, String sql, boolean needSort) throws Exception { QueryContextFacade.resetCurrent(); - QueryContextFacade.current().setProject(ProjectInstance.DEFAULT_PROJECT_NAME); + QueryContextFacade.startQuery(ProjectInstance.DEFAULT_PROJECT_NAME, sql, "ADMIN"); // change join type to match current setting sql = changeJoinType(sql, joinType); @@ -262,7 +262,7 @@ public class KylinTestBase { protected int executeQuery(String sql, boolean needDisplay) throws Exception { QueryContextFacade.resetCurrent(); - 
QueryContextFacade.current().setProject(ProjectInstance.DEFAULT_PROJECT_NAME); + QueryContextFacade.startQuery(ProjectInstance.DEFAULT_PROJECT_NAME, sql, "ADMIN"); // change join type to match current setting sql = changeJoinType(sql, joinType); @@ -317,7 +317,7 @@ public class KylinTestBase { protected ITable executeDynamicQuery(IDatabaseConnection dbConn, String queryName, String sql, List<String> parameters, boolean needSort) throws Exception { QueryContextFacade.resetCurrent(); - QueryContextFacade.current().setProject(ProjectInstance.DEFAULT_PROJECT_NAME); + QueryContextFacade.startQuery(ProjectInstance.DEFAULT_PROJECT_NAME, sql, "ADMIN"); // change join type to match current setting sql = changeJoinType(sql, joinType); diff --git a/kylin-it/src/test/java/org/apache/kylin/storage/hbase/ITStorageTest.java b/kylin-it/src/test/java/org/apache/kylin/storage/hbase/ITStorageTest.java index 83e58f5..32dc23d 100644 --- a/kylin-it/src/test/java/org/apache/kylin/storage/hbase/ITStorageTest.java +++ b/kylin-it/src/test/java/org/apache/kylin/storage/hbase/ITStorageTest.java @@ -26,6 +26,7 @@ import java.util.HashSet; import java.util.List; import org.apache.kylin.common.KylinConfig; +import org.apache.kylin.common.QueryContextFacade; import org.apache.kylin.common.util.HBaseMetadataTestCase; import org.apache.kylin.cube.CubeInstance; import org.apache.kylin.cube.CubeManager; @@ -35,6 +36,7 @@ import org.apache.kylin.metadata.model.DynamicFunctionDesc; import org.apache.kylin.metadata.model.FunctionDesc; import org.apache.kylin.metadata.model.MeasureDesc; import org.apache.kylin.metadata.model.TblColRef; +import org.apache.kylin.metadata.project.ProjectInstance; import org.apache.kylin.metadata.realization.SQLDigest; import org.apache.kylin.metadata.realization.SQLDigest.SQLCall; import org.apache.kylin.metadata.tuple.ITuple; @@ -91,6 +93,7 @@ public class ITStorageTest extends HBaseMetadataTestCase { List<FunctionDesc> aggregations = mockup.buildAggregations(); 
TupleFilter filter = mockup.buildFilter1(groups.get(0)); + QueryContextFacade.startQuery(ProjectInstance.DEFAULT_PROJECT_NAME, "", "ADMIN"); int count = search(groups, aggregations, filter, context); assertTrue(count >= 0); } diff --git a/query/src/main/java/org/apache/kylin/query/security/QueryACLTestUtil.java b/query/src/main/java/org/apache/kylin/query/security/QueryACLTestUtil.java index 071e7c7..dda86ca 100644 --- a/query/src/main/java/org/apache/kylin/query/security/QueryACLTestUtil.java +++ b/query/src/main/java/org/apache/kylin/query/security/QueryACLTestUtil.java @@ -18,15 +18,16 @@ package org.apache.kylin.query.security; -import org.apache.kylin.query.QueryConnection; -import org.apache.kylin.query.relnode.OLAPContext; - import java.sql.Connection; import java.sql.SQLException; import java.sql.Statement; import java.util.HashMap; import java.util.Map; +import org.apache.kylin.common.QueryContextFacade; +import org.apache.kylin.query.QueryConnection; +import org.apache.kylin.query.relnode.OLAPContext; + public class QueryACLTestUtil { public static void setUser(String username) { Map<String, String> auth = new HashMap<>(); @@ -35,6 +36,8 @@ public class QueryACLTestUtil { } public static void mockQuery(String project, String sql) throws SQLException { + QueryContextFacade.startQuery(project, sql, "ADMIN"); + Connection conn = null; Statement statement = null; try { diff --git a/server-base/src/main/java/org/apache/kylin/rest/service/QueryService.java b/server-base/src/main/java/org/apache/kylin/rest/service/QueryService.java index 5a832ff..8a00066 100644 --- a/server-base/src/main/java/org/apache/kylin/rest/service/QueryService.java +++ b/server-base/src/main/java/org/apache/kylin/rest/service/QueryService.java @@ -118,6 +118,9 @@ import org.apache.kylin.rest.util.AclPermissionUtil; import org.apache.kylin.rest.util.QueryRequestLimits; import org.apache.kylin.rest.util.SQLResponseSignatureUtil; import org.apache.kylin.rest.util.TableauInterceptor; +import 
org.apache.kylin.shaded.com.google.common.base.Preconditions; +import org.apache.kylin.shaded.com.google.common.base.Strings; +import org.apache.kylin.shaded.com.google.common.collect.Lists; import org.apache.kylin.storage.hybrid.HybridInstance; import org.apache.kylin.storage.hybrid.HybridManager; import org.apache.kylin.storage.stream.StreamStorageQuery; @@ -132,9 +135,6 @@ import org.springframework.security.core.context.SecurityContextHolder; import org.springframework.stereotype.Component; import com.fasterxml.jackson.annotation.JsonProperty; -import org.apache.kylin.shaded.com.google.common.base.Preconditions; -import org.apache.kylin.shaded.com.google.common.base.Strings; -import org.apache.kylin.shaded.com.google.common.collect.Lists; /** * @author xduo @@ -413,7 +413,8 @@ public class QueryService extends BasicService { } // project not found ProjectManager mgr = ProjectManager.getInstance(KylinConfig.getInstanceFromEnv()); - if (mgr.getProject(sqlRequest.getProject()) == null) { + ProjectInstance projectInstance = mgr.getProject(sqlRequest.getProject()); + if (projectInstance == null) { throw new BadRequestException( String.format(Locale.ROOT, msg.getPROJECT_NOT_FOUND(), sqlRequest.getProject())); } @@ -425,18 +426,22 @@ public class QueryService extends BasicService { BackdoorToggles.addToggles(sqlRequest.getBackdoorToggles()); // set initial info when starting a query - final QueryContext queryContext = QueryContextFacade.current(); - queryContext.setUsername(SecurityContextHolder.getContext().getAuthentication().getName()); + String project = sqlRequest.getProject(); + String sql = sqlRequest.getSql(); + String userName = SecurityContextHolder.getContext().getAuthentication().getName(); + if (userName == null) { + userName = "unknown"; + } + final QueryContext queryContext = QueryContextFacade.startQuery(project, sql, userName, + projectInstance.getConfig().getHBaseMaxConnectionThreadsPerQuery()); 
queryContext.setGroups(AclPermissionUtil.getCurrentUserGroups()); - queryContext.setProject(sqlRequest.getProject()); try (SetThreadName ignored = new SetThreadName("Query %s", queryContext.getQueryId())) { // force clear the query context before a new query OLAPContext.clearThreadLocalContexts(); SQLResponse sqlResponse = null; - String sql = sqlRequest.getSql(); - String project = sqlRequest.getProject(); + boolean isQueryCacheEnabled = isQueryCacheEnabled(kylinConfig); logger.info("Using project: " + project); logger.info("The original query: " + sql); diff --git a/server/src/test/java/org/apache/kylin/rest/metrics/QueryMetricsTest.java b/server/src/test/java/org/apache/kylin/rest/metrics/QueryMetricsTest.java index 8cd7489..3f9476e 100644 --- a/server/src/test/java/org/apache/kylin/rest/metrics/QueryMetricsTest.java +++ b/server/src/test/java/org/apache/kylin/rest/metrics/QueryMetricsTest.java @@ -27,6 +27,7 @@ import javax.management.ObjectName; import org.apache.kylin.common.QueryContext; import org.apache.kylin.common.QueryContextFacade; +import org.apache.kylin.metadata.project.ProjectInstance; import org.apache.kylin.rest.request.SQLRequest; import org.apache.kylin.rest.response.SQLResponse; import org.apache.kylin.rest.service.ServiceTestBase; @@ -118,11 +119,13 @@ public class QueryMetricsTest extends ServiceTestBase { System.setProperty("kylin.metrics.reporter-query-enabled", "true"); QueryMetricsFacade.init(); + String project = ProjectInstance.DEFAULT_PROJECT_NAME; + String sql = "select * from TEST_KYLIN_FACT"; SQLRequest sqlRequest = new SQLRequest(); - sqlRequest.setSql("select * from TEST_KYLIN_FACT"); - sqlRequest.setProject("default"); + sqlRequest.setSql(sql); + sqlRequest.setProject(project); - QueryContext context = QueryContextFacade.current(); + QueryContext context = QueryContextFacade.startQuery(project, sql, "ADMIN"); SQLResponse sqlResponse = new SQLResponse(); sqlResponse.setDuration(10); diff --git 
a/server/src/test/java/org/apache/kylin/rest/service/QueryInfoCollectorTest.java b/server/src/test/java/org/apache/kylin/rest/service/QueryInfoCollectorTest.java index 91cf190..dda15d4 100644 --- a/server/src/test/java/org/apache/kylin/rest/service/QueryInfoCollectorTest.java +++ b/server/src/test/java/org/apache/kylin/rest/service/QueryInfoCollectorTest.java @@ -36,6 +36,7 @@ import org.apache.kylin.common.debug.BackdoorToggles; import org.apache.kylin.common.util.LocalFileMetadataTestCase; import org.apache.kylin.cube.CubeInstance; import org.apache.kylin.cube.CubeManager; +import org.apache.kylin.metadata.project.ProjectInstance; import org.apache.kylin.metadata.realization.RealizationStatusEnum; import org.apache.kylin.query.QueryConnection; import org.apache.kylin.query.util.QueryInfoCollector; @@ -65,11 +66,14 @@ public class QueryInfoCollectorTest extends LocalFileMetadataTestCase { enableCube("ci_inner_join_cube", "ci_left_join_cube"); try { - connection = QueryConnection.getConnection("default"); + String project = ProjectInstance.DEFAULT_PROJECT_NAME; + connection = QueryConnection.getConnection(project); statement = connection.createStatement(); - String sql = "select count(*) as cnt1 from test_kylin_fact inner join test_account on seller_id = account_id\n" + - "union all\n" + - "select count(*) as cnt2 from test_kylin_fact left join test_account on seller_id = account_id"; + String sql = "select count(*) as cnt1 from test_kylin_fact inner join test_account on seller_id = account_id\n" + + "union all\n" + + "select count(*) as cnt2 from test_kylin_fact left join test_account on seller_id = account_id"; + + QueryContextFacade.startQuery(project, sql, "ADMIN"); resultSet = statement.executeQuery(sql); Assert.assertNotNull(resultSet); @@ -91,7 +95,7 @@ public class QueryInfoCollectorTest extends LocalFileMetadataTestCase { ExecutorService executorService = Executors.newSingleThreadExecutor(); try { - String project = "default"; + String project = 
ProjectInstance.DEFAULT_PROJECT_NAME; String expectedCube = "CUBE[name=ci_left_join_cube]"; String sqlWithCube = "select count(*) from test_kylin_fact"; @@ -142,6 +146,8 @@ public class QueryInfoCollectorTest extends LocalFileMetadataTestCase { Statement statement = connection.createStatement(); try { + QueryContextFacade.startQuery(project, sql, "ADMIN"); + statement.executeQuery(sql); return QueryInfoCollector.current().getCubeNameString(); } catch (Exception e) { diff --git a/server/src/test/java/org/apache/kylin/rest/service/QueryServiceTest.java b/server/src/test/java/org/apache/kylin/rest/service/QueryServiceTest.java index acf9e19..38129b1 100644 --- a/server/src/test/java/org/apache/kylin/rest/service/QueryServiceTest.java +++ b/server/src/test/java/org/apache/kylin/rest/service/QueryServiceTest.java @@ -64,10 +64,13 @@ public class QueryServiceTest extends ServiceTestBase { // queryService.removeQuery(queryService.getQueries("ADMIN").get(0).getProperty("id")); // Assert.assertTrue(queryService.getQueries("ADMIN").size() == 0); + String project = ProjectInstance.DEFAULT_PROJECT_NAME; + String sql = "select * from test_table"; + SQLRequest request = new SQLRequest(); - request.setSql("select * from test_table"); + request.setSql(sql); request.setAcceptPartial(true); - QueryContext queryContext = QueryContextFacade.current(); + QueryContext queryContext = QueryContextFacade.startQuery(project, sql, "ADMIN"); SQLResponse response = new SQLResponse(); response.setHitExceptionCache(true); queryService.logQuery(queryContext.getQueryId(), request, response); diff --git a/stream-receiver/src/main/java/org/apache/kylin/stream/server/rest/controller/QueryController.java b/stream-receiver/src/main/java/org/apache/kylin/stream/server/rest/controller/QueryController.java index 187fc25..2949ea0 100644 --- a/stream-receiver/src/main/java/org/apache/kylin/stream/server/rest/controller/QueryController.java +++ 
b/stream-receiver/src/main/java/org/apache/kylin/stream/server/rest/controller/QueryController.java @@ -18,6 +18,7 @@ package org.apache.kylin.stream.server.rest.controller; +import org.apache.kylin.common.QueryContextFacade; import org.apache.kylin.common.debug.BackdoorToggles; import org.apache.kylin.stream.server.rest.exception.InternalErrorException; import org.apache.kylin.stream.server.rest.model.SQLRequest; @@ -71,6 +72,8 @@ public class QueryController extends BasicController { long startTime = System.currentTimeMillis(); SQLResponse sqlResponse; try { + QueryContextFacade.startQuery(project, sql, "system"); + sqlResponse = queryService.query(sqlRequest); sqlResponse.setDuration(System.currentTimeMillis() - startTime); logger.info(