This is an automated email from the ASF dual-hosted git repository.

nic pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/kylin.git
commit bbe9be3326e3ef74d9ba04a3f069f66151286dc2 Author: Zhong, Yanghong <nju_y...@apache.org> AuthorDate: Tue Mar 10 13:02:25 2020 +0800 KYLIN-4410 Limit the number of simultaneous rpcs to HBase for project --- .../org/apache/kylin/common/KylinConfigBase.java | 4 ++ .../java/org/apache/kylin/common/QueryContext.java | 9 +++++ .../apache/kylin/rest/service/QueryService.java | 9 +++-- .../hbase/cube/v2/CubeHBaseEndpointRPC.java | 43 +++++++++++++++++++++- 4 files changed, 60 insertions(+), 5 deletions(-) diff --git a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java index dee57d1..3490b7a 100644 --- a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java +++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java @@ -1370,6 +1370,10 @@ public abstract class KylinConfigBase implements Serializable { return Integer.parseInt(getOptional("kylin.storage.hbase.max-hconnection-threads-per-query", "400")); } + public int getHBaseMaxConnectionThreadsPerProject() { + return Integer.parseInt(getOptional("kylin.storage.hbase.max-hconnection-threads-per-project", "800")); + } + // ============================================================================ // ENGINE.MR // ============================================================================ diff --git a/core-common/src/main/java/org/apache/kylin/common/QueryContext.java b/core-common/src/main/java/org/apache/kylin/common/QueryContext.java index 30d410d..5d349c3 100644 --- a/core-common/src/main/java/org/apache/kylin/common/QueryContext.java +++ b/core-common/src/main/java/org/apache/kylin/common/QueryContext.java @@ -54,6 +54,7 @@ public class QueryContext { private final String queryId; private String username; + private String project; private Set<String> groups; private AtomicLong scannedRows = new AtomicLong(); private AtomicLong returnedRows = new AtomicLong(); @@ -122,6 +123,14 @@ public 
class QueryContext { this.username = username; } + public String getProject() { + return project; + } + + public void setProject(String project) { + this.project = project; + } + public Set<String> getGroups() { return groups; } diff --git a/server-base/src/main/java/org/apache/kylin/rest/service/QueryService.java b/server-base/src/main/java/org/apache/kylin/rest/service/QueryService.java index de101d9..de0c9bc 100644 --- a/server-base/src/main/java/org/apache/kylin/rest/service/QueryService.java +++ b/server-base/src/main/java/org/apache/kylin/rest/service/QueryService.java @@ -404,7 +404,11 @@ public class QueryService extends BasicService { if (sqlRequest.getBackdoorToggles() != null) BackdoorToggles.addToggles(sqlRequest.getBackdoorToggles()); + // set initial info when starting a query final QueryContext queryContext = QueryContextFacade.current(); + queryContext.setUsername(SecurityContextHolder.getContext().getAuthentication().getName()); + queryContext.setGroups(AclPermissionUtil.getCurrentUserGroups()); + queryContext.setProject(sqlRequest.getProject()); try (SetThreadName ignored = new SetThreadName("Query %s", queryContext.getQueryId())) { SQLResponse sqlResponse = null; @@ -629,10 +633,7 @@ public class QueryService extends BasicService { try { conn = QueryConnection.getConnection(sqlRequest.getProject()); - String userInfo = SecurityContextHolder.getContext().getAuthentication().getName(); - QueryContext context = QueryContextFacade.current(); - context.setUsername(userInfo); - context.setGroups(AclPermissionUtil.getCurrentUserGroups()); + String userInfo = QueryContextFacade.current().getUsername(); final Collection<? 
extends GrantedAuthority> grantedAuthorities = SecurityContextHolder.getContext() .getAuthentication().getAuthorities(); for (GrantedAuthority grantedAuthority : grantedAuthorities) { diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java index dde4956..2eb6372 100644 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java +++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java @@ -39,6 +39,7 @@ import org.apache.hadoop.hbase.ipc.ServerRpcController; import org.apache.kylin.common.KylinConfig; import org.apache.kylin.common.QueryContext; import org.apache.kylin.common.QueryContext.CubeSegmentStatistics; +import org.apache.kylin.common.SubThreadPoolExecutor; import org.apache.kylin.common.debug.BackdoorToggles; import org.apache.kylin.common.exceptions.KylinTimeoutException; import org.apache.kylin.common.exceptions.ResourceLimitExceededException; @@ -57,7 +58,10 @@ import org.apache.kylin.gridtable.GTScanRange; import org.apache.kylin.gridtable.GTScanRequest; import org.apache.kylin.gridtable.GTUtil; import org.apache.kylin.gridtable.IGTScanner; +import org.apache.kylin.metadata.cachesync.Broadcaster; import org.apache.kylin.metadata.model.ISegment; +import org.apache.kylin.metadata.project.ProjectInstance; +import org.apache.kylin.metadata.project.ProjectManager; import org.apache.kylin.storage.StorageContext; import org.apache.kylin.storage.gtrecord.DummyPartitionStreamer; import org.apache.kylin.storage.gtrecord.StorageResponseGTScatter; @@ -71,6 +75,9 @@ import org.apache.kylin.storage.hbase.util.HBaseUnionUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.google.common.cache.CacheBuilder; +import com.google.common.cache.CacheLoader; +import com.google.common.cache.LoadingCache; import 
com.google.common.collect.Lists; import com.google.protobuf.ByteString; import com.google.protobuf.HBaseZeroCopyByteString; @@ -81,6 +88,38 @@ public class CubeHBaseEndpointRPC extends CubeHBaseRPC { private static ExecutorService executorService = new LoggableCachedThreadPool(); + private static LoadingCache<String, ExecutorService> projectThreadPoolMap = CacheBuilder.newBuilder() + .build(new CacheLoader<String, ExecutorService>() { + @Override + public ExecutorService load(String projName) throws Exception { + ExecutorService sharedPool = HBaseConnection.getCoprocessorPool(); + ProjectInstance projInst = ProjectManager.getInstance(KylinConfig.getInstanceFromEnv()) + .getProject(projName); + return new SubThreadPoolExecutor(sharedPool, "PROJECT", + projInst.getConfig().getHBaseMaxConnectionThreadsPerProject()); + } + }); + + private static class ProjectThreadPoolSyncListener extends Broadcaster.Listener { + @Override + public void onClearAll(Broadcaster broadcaster) throws IOException { + projectThreadPoolMap.invalidateAll(); + logger.info("Project level thread pools are cleared"); + } + + @Override + public void onEntityChange(Broadcaster broadcaster, String entity, Broadcaster.Event event, String cacheKey) + throws IOException { + projectThreadPoolMap.invalidate(cacheKey); + logger.info("Thread pool map for project {} is cleared", cacheKey); + } + } + + static { + Broadcaster.getInstance(KylinConfig.getInstanceFromEnv()) + .registerStaticListener(new ProjectThreadPoolSyncListener(), "project"); + } + public CubeHBaseEndpointRPC(ISegment segment, Cuboid cuboid, GTInfo fullGTInfo, StorageContext context) { super(segment, cuboid, fullGTInfo, context); } @@ -258,8 +297,10 @@ public class CubeHBaseEndpointRPC extends CubeHBaseRPC { try { final Connection conn = HBaseUnionUtil.getConnection(cubeSeg.getConfig(), cubeSeg.getStorageLocationIdentifier()); + ExecutorService projThreadPool = projectThreadPoolMap + .get(queryContext.getProject().toUpperCase(Locale.ROOT)); 
final Table table = conn.getTable(TableName.valueOf(cubeSeg.getStorageLocationIdentifier()), - queryContext.getConnectionPool(HBaseConnection.getCoprocessorPool())); + queryContext.getConnectionPool(projThreadPool)); table.coprocessorService(CubeVisitService.class, startKey, endKey, // new Batch.Call<CubeVisitService, CubeVisitResponse>() {