http://git-wip-us.apache.org/repos/asf/kylin/blob/4d50b269/server-base/src/main/java/org/apache/kylin/rest/service/QueryService.java ---------------------------------------------------------------------- diff --cc server-base/src/main/java/org/apache/kylin/rest/service/QueryService.java index 9afec83,17f6b58..9789b70 --- a/server-base/src/main/java/org/apache/kylin/rest/service/QueryService.java +++ b/server-base/src/main/java/org/apache/kylin/rest/service/QueryService.java @@@ -102,11 -98,8 +102,12 @@@ import org.apache.kylin.rest.request.Pr import org.apache.kylin.rest.request.SQLRequest; import org.apache.kylin.rest.response.SQLResponse; import org.apache.kylin.rest.util.AclEvaluate; +import org.apache.kylin.rest.util.AclPermissionUtil; + import org.apache.kylin.rest.util.QueryRequestLimits; import org.apache.kylin.rest.util.TableauInterceptor; +import org.apache.kylin.shaded.htrace.org.apache.htrace.Sampler; +import org.apache.kylin.shaded.htrace.org.apache.htrace.Trace; +import org.apache.kylin.shaded.htrace.org.apache.htrace.TraceScope; import org.apache.kylin.storage.hybrid.HybridInstance; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@@ -403,163 -403,121 +404,176 @@@ public class QueryService extends Basic final QueryContext queryContext = QueryContext.current(); + TraceScope scope = null; - if (KylinConfig.getInstanceFromEnv().isHtraceTracingEveryQuery() || BackdoorToggles.getHtraceEnabled()) { ++ if (kylinConfig.isHtraceTracingEveryQuery() || BackdoorToggles.getHtraceEnabled()) { + logger.info("Current query is under tracing"); + HtraceInit.init(); + scope = Trace.startSpan("query life cycle for " + queryContext.getQueryId(), Sampler.ALWAYS); + } + String traceUrl = getTraceUrl(scope); + try (SetThreadName ignored = new SetThreadName("Query %s", queryContext.getQueryId())) { - ++ long startTime = System.currentTimeMillis(); ++ ++ SQLResponse sqlResponse = null; String sql = sqlRequest.getSql(); String project = sqlRequest.getProject(); ++ 
boolean isQueryCacheEnabled = isQueryCacheEnabled(kylinConfig); logger.info("Using project: " + project); logger.info("The original query: " + sql); - final boolean isSelect = QueryUtil.isSelectStatement(sql); - final boolean isPushDownUpdateEnabled = kylinConfig.isPushDownEnabled() - && kylinConfig.isPushDownUpdateEnabled(); - final int maxConcurrentQuery = projectInstance.getConfig().getQueryConcurrentRunningThresholdForProject(); + sql = QueryUtil.removeCommentInSql(sql); - if (!isSelect && !isPushDownUpdateEnabled) { - logger.debug("Directly return exception as the sql is unsupported, and query pushdown is disabled"); - throw new BadRequestException(msg.getNOT_SUPPORTED_SQL()); - } - - SQLResponse sqlResponse = null; + Pair<Boolean, String> result = TempStatementUtil.handleTempStatement(sql, kylinConfig); - - boolean isTempStatement = result.getFirst(); ++ boolean isCreateTempStatement = result.getFirst(); + sql = result.getSecond(); + sqlRequest.setSql(sql); - final boolean isSelect = QueryUtil.isSelectStatement(sql); - - long startTime = System.currentTimeMillis(); - - SQLResponse sqlResponse = null; - boolean queryCacheEnabled = checkCondition(kylinConfig.isQueryCacheEnabled(), - "query cache disabled in KylinConfig") && // - checkCondition(!BackdoorToggles.getDisableCache(), "query cache disabled in BackdoorToggles"); - - if (queryCacheEnabled) { - try { - // Check project level query request concurrency limitation per query server - if (!QueryRequestLimits.openQueryRequest(projectInstance.getName(), maxConcurrentQuery)) { - logger.warn( - "Directly return exception as too many concurrent query requests for project:" + project); - throw new BadRequestException(msg.getQUERY_TOO_MANY_RUNNING()); ++ // try some cheap executions ++ if (sqlResponse == null && isQueryInspect) { ++ sqlResponse = new SQLResponse(null, null, 0, false, sqlRequest.getSql()); ++ } ++ ++ if (sqlResponse == null && isCreateTempStatement) { ++ sqlResponse = new SQLResponse(null, null, 0, 
false, null); ++ } ++ ++ if (sqlResponse == null && isQueryCacheEnabled) { + sqlResponse = searchQueryInCache(sqlRequest); + Trace.addTimelineAnnotation("query cache searched"); - } else { - Trace.addTimelineAnnotation("query cache skip search"); + } - - try { - if (null == sqlResponse) { - if (isQueryInspect) { - // set query sql to exception message string - sqlResponse = new SQLResponse(null, null, 0, false, sqlRequest.getSql()); - } else if (isTempStatement) { - sqlResponse = new SQLResponse(null, null, 0, false, null); - } else if (isSelect) { - sqlResponse = query(sqlRequest); - Trace.addTimelineAnnotation("query almost done"); - } else if (kylinConfig.isPushDownEnabled() && kylinConfig.isPushDownUpdateEnabled()) { - sqlResponse = update(sqlRequest); - Trace.addTimelineAnnotation("update query almost done"); - } else { - logger.debug( - "Directly return exception as the sql is unsupported, and query pushdown is disabled"); - throw new BadRequestException(msg.getNOT_SUPPORTED_SQL()); - } - - long durationThreshold = kylinConfig.getQueryDurationCacheThreshold(); - long scanCountThreshold = kylinConfig.getQueryScanCountCacheThreshold(); - long scanBytesThreshold = kylinConfig.getQueryScanBytesCacheThreshold(); - sqlResponse.setDuration(System.currentTimeMillis() - startTime); - logger.info("Stats of SQL response: isException: {}, duration: {}, total scan count {}", // - String.valueOf(sqlResponse.getIsException()), String.valueOf(sqlResponse.getDuration()), - String.valueOf(sqlResponse.getTotalScanCount())); - if (checkCondition(queryCacheEnabled, "query cache is disabled") // - && checkCondition(!sqlResponse.getIsException(), "query has exception") // - && checkCondition(!(sqlResponse.isPushDown() - && (isSelect == false || kylinConfig.isPushdownQueryCacheEnabled() == false)), - "query is executed with pushdown, but it is non-select, or the cache for pushdown is disabled") // - && checkCondition( - sqlResponse.getDuration() > durationThreshold - || 
sqlResponse.getTotalScanCount() > scanCountThreshold - || sqlResponse.getTotalScanBytes() > scanBytesThreshold, // - "query is too lightweight with duration: {} (threshold {}), scan count: {} (threshold {}), scan bytes: {} (threshold {})", - sqlResponse.getDuration(), durationThreshold, sqlResponse.getTotalScanCount(), - scanCountThreshold, sqlResponse.getTotalScanBytes(), scanBytesThreshold) - && checkCondition(sqlResponse.getResults().size() < kylinConfig.getLargeQueryThreshold(), - "query response is too large: {} ({})", sqlResponse.getResults().size(), - kylinConfig.getLargeQueryThreshold())) { - cacheManager.getCache(SUCCESS_QUERY_CACHE) - .put(new Element(sqlRequest.getCacheKey(), sqlResponse)); - } - Trace.addTimelineAnnotation("response from execution"); - - } else { - sqlResponse.setDuration(System.currentTimeMillis() - startTime); - sqlResponse.setTotalScanCount(0); - sqlResponse.setTotalScanBytes(0); - Trace.addTimelineAnnotation("response from cache"); ++ ++ // real execution if required ++ if (sqlResponse == null) { ++ try (QueryRequestLimits limit = new QueryRequestLimits(sqlRequest.getProject())) { ++ sqlResponse = queryAndUpdateCache(sqlRequest, startTime, isQueryCacheEnabled); } ++ } else { ++ Trace.addTimelineAnnotation("response without real execution"); ++ } - long startTime = System.currentTimeMillis(); - - // force clear the query context before a new query - OLAPContext.clearThreadLocalContexts(); ++ // check authorization before return, since the response may come from cache ++ if (!sqlResponse.getIsException()) + checkQueryAuth(sqlResponse, project); - } catch (Throwable e) { // calcite may throw AssertError - logger.error("Exception while executing query", e); - String errMsg = makeErrorMsgUserFriendly(e); - - sqlResponse = new SQLResponse(null, null, null, 0, true, errMsg, false, false); - sqlResponse.setTotalScanCount(queryContext.getScannedRows()); - sqlResponse.setTotalScanBytes(queryContext.getScannedBytes()); - - if 
(queryCacheEnabled && e.getCause() != null - && ExceptionUtils.getRootCause(e) instanceof ResourceLimitExceededException) { - Cache exceptionCache = cacheManager.getCache(EXCEPTION_QUERY_CACHE); - exceptionCache.put(new Element(sqlRequest.getCacheKey(), sqlResponse)); - boolean queryCacheEnabled = checkCondition(kylinConfig.isQueryCacheEnabled(), - "query cache disabled in KylinConfig") && // - checkCondition(!BackdoorToggles.getDisableCache(), "query cache disabled in BackdoorToggles"); - if (queryCacheEnabled) { - sqlResponse = searchQueryInCache(sqlRequest); -- } - Trace.addTimelineAnnotation("error response"); - } - ++ sqlResponse.setDuration(System.currentTimeMillis() - startTime); + sqlResponse.setTraceUrl(traceUrl); + logQuery(sqlRequest, sqlResponse); + try { + recordMetric(sqlRequest, sqlResponse); + } catch (Throwable th) { + logger.warn("Write metric error.", th); + } + if (sqlResponse.getIsException()) + throw new InternalErrorException(sqlResponse.getExceptionMessage()); - try { - if (null == sqlResponse) { - if (isSelect) { - sqlResponse = query(sqlRequest); - } else if (isPushDownUpdateEnabled) { - sqlResponse = update(sqlRequest); - } + return sqlResponse; - long durationThreshold = kylinConfig.getQueryDurationCacheThreshold(); - long scanCountThreshold = kylinConfig.getQueryScanCountCacheThreshold(); - long scanBytesThreshold = kylinConfig.getQueryScanBytesCacheThreshold(); - sqlResponse.setDuration(System.currentTimeMillis() - startTime); - logger.info("Stats of SQL response: isException: {}, duration: {}, total scan count {}", // - String.valueOf(sqlResponse.getIsException()), String.valueOf(sqlResponse.getDuration()), - String.valueOf(sqlResponse.getTotalScanCount())); - if (checkCondition(queryCacheEnabled, "query cache is disabled") // - && checkCondition(!sqlResponse.getIsException(), "query has exception") // - && checkCondition(!(sqlResponse.isPushDown() - && (isSelect == false || kylinConfig.isPushdownQueryCacheEnabled() == false)), - 
"query is executed with pushdown, but it is non-select, or the cache for pushdown is disabled") // - && checkCondition( - sqlResponse.getDuration() > durationThreshold - || sqlResponse.getTotalScanCount() > scanCountThreshold - || sqlResponse.getTotalScanBytes() > scanBytesThreshold, // - "query is too lightweight with duration: {} (threshold {}), scan count: {} (threshold {}), scan bytes: {} (threshold {})", - sqlResponse.getDuration(), durationThreshold, sqlResponse.getTotalScanCount(), - scanCountThreshold, sqlResponse.getTotalScanBytes(), scanBytesThreshold) - && checkCondition( - sqlResponse.getResults().size() < kylinConfig.getLargeQueryThreshold(), - "query response is too large: {} ({})", sqlResponse.getResults().size(), - kylinConfig.getLargeQueryThreshold())) { - cacheManager.getCache(SUCCESS_QUERY_CACHE) - .put(new Element(sqlRequest.getCacheKey(), sqlResponse)); - } + } finally { + BackdoorToggles.cleanToggles(); + QueryContext.reset(); + if (scope != null) { + scope.close(); + } + } + } - } else { - sqlResponse.setDuration(System.currentTimeMillis() - startTime); - sqlResponse.setTotalScanCount(0); - sqlResponse.setTotalScanBytes(0); - } ++ private SQLResponse queryAndUpdateCache(SQLRequest sqlRequest, long startTime, boolean queryCacheEnabled) { ++ KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv(); ++ Message msg = MsgPicker.getMsg(); ++ ++ SQLResponse sqlResponse = null; ++ try { ++ final boolean isSelect = QueryUtil.isSelectStatement(sqlRequest.getSql()); ++ if (isSelect) { ++ sqlResponse = query(sqlRequest); ++ Trace.addTimelineAnnotation("query almost done"); ++ } else if (kylinConfig.isPushDownEnabled() && kylinConfig.isPushDownUpdateEnabled()) { ++ sqlResponse = update(sqlRequest); ++ Trace.addTimelineAnnotation("update query almost done"); ++ } else { ++ logger.debug("Directly return exception as the sql is unsupported, and query pushdown is disabled"); ++ throw new BadRequestException(msg.getNOT_SUPPORTED_SQL()); ++ } + - 
checkQueryAuth(sqlResponse, project, secureEnabled); ++ long durationThreshold = kylinConfig.getQueryDurationCacheThreshold(); ++ long scanCountThreshold = kylinConfig.getQueryScanCountCacheThreshold(); ++ long scanBytesThreshold = kylinConfig.getQueryScanBytesCacheThreshold(); ++ sqlResponse.setDuration(System.currentTimeMillis() - startTime); ++ logger.info("Stats of SQL response: isException: {}, duration: {}, total scan count {}", // ++ String.valueOf(sqlResponse.getIsException()), String.valueOf(sqlResponse.getDuration()), ++ String.valueOf(sqlResponse.getTotalScanCount())); ++ if (checkCondition(queryCacheEnabled, "query cache is disabled") // ++ && checkCondition(!sqlResponse.getIsException(), "query has exception") // ++ && checkCondition( ++ !(sqlResponse.isPushDown() ++ && (isSelect == false || kylinConfig.isPushdownQueryCacheEnabled() == false)), ++ "query is executed with pushdown, but it is non-select, or the cache for pushdown is disabled") // ++ && checkCondition( ++ sqlResponse.getDuration() > durationThreshold ++ || sqlResponse.getTotalScanCount() > scanCountThreshold ++ || sqlResponse.getTotalScanBytes() > scanBytesThreshold, // ++ "query is too lightweight with duration: {} (threshold {}), scan count: {} (threshold {}), scan bytes: {} (threshold {})", ++ sqlResponse.getDuration(), durationThreshold, sqlResponse.getTotalScanCount(), ++ scanCountThreshold, sqlResponse.getTotalScanBytes(), scanBytesThreshold) ++ && checkCondition(sqlResponse.getResults().size() < kylinConfig.getLargeQueryThreshold(), ++ "query response is too large: {} ({})", sqlResponse.getResults().size(), ++ kylinConfig.getLargeQueryThreshold())) { ++ cacheManager.getCache(SUCCESS_QUERY_CACHE).put(new Element(sqlRequest.getCacheKey(), sqlResponse)); ++ } ++ Trace.addTimelineAnnotation("response from execution"); + - } catch (Throwable e) { // calcite may throw AssertError - logger.error("Exception while executing query", e); - String errMsg = makeErrorMsgUserFriendly(e); ++ } 
catch (Throwable e) { // calcite may throw AssertError ++ logger.error("Exception while executing query", e); ++ String errMsg = makeErrorMsgUserFriendly(e); + - sqlResponse = new SQLResponse(null, null, 0, true, errMsg); - sqlResponse.setThrowable(e.getCause() == null ? e : ExceptionUtils.getRootCause(e)); - sqlResponse.setTotalScanCount(queryContext.getScannedRows()); - sqlResponse.setTotalScanBytes(queryContext.getScannedBytes()); - sqlResponse.setCubeSegmentStatisticsList(queryContext.getCubeSegmentStatisticsResultList()); ++ sqlResponse = new SQLResponse(null, null, null, 0, true, errMsg, false, false); ++ QueryContext queryContext = QueryContext.current(); ++ sqlResponse.setTotalScanCount(queryContext.getScannedRows()); ++ sqlResponse.setTotalScanBytes(queryContext.getScannedBytes()); + - if (queryCacheEnabled && e.getCause() != null - && ExceptionUtils.getRootCause(e) instanceof ResourceLimitExceededException) { - Cache exceptionCache = cacheManager.getCache(EXCEPTION_QUERY_CACHE); - exceptionCache.put(new Element(sqlRequest.getCacheKey(), sqlResponse)); - } - } - } finally { - QueryRequestLimits.closeQueryRequest(projectInstance.getName(), maxConcurrentQuery); ++ if (queryCacheEnabled && e.getCause() != null ++ && ExceptionUtils.getRootCause(e) instanceof ResourceLimitExceededException) { ++ Cache exceptionCache = cacheManager.getCache(EXCEPTION_QUERY_CACHE); ++ exceptionCache.put(new Element(sqlRequest.getCacheKey(), sqlResponse)); + } ++ Trace.addTimelineAnnotation("error response"); ++ } ++ return sqlResponse; ++ } + - logQuery(sqlRequest, sqlResponse); ++ private boolean isQueryCacheEnabled(KylinConfig kylinConfig) { ++ return checkCondition(kylinConfig.isQueryCacheEnabled(), ++ "query cache disabled in KylinConfig") && // ++ checkCondition(!BackdoorToggles.getDisableCache(), "query cache disabled in BackdoorToggles"); ++ } + - QueryMetricsFacade.updateMetrics(sqlRequest, sqlResponse); - QueryMetrics2Facade.updateMetrics(sqlRequest, sqlResponse); + 
protected void recordMetric(SQLRequest sqlRequest, SQLResponse sqlResponse) throws UnknownHostException { + QueryMetricsFacade.updateMetrics(sqlRequest, sqlResponse); + QueryMetrics2Facade.updateMetrics(sqlRequest, sqlResponse); + } - if (sqlResponse.getIsException()) - throw new InternalErrorException(sqlResponse.getExceptionMessage()); + private String getTraceUrl(TraceScope scope) { + if (scope == null) { + return null; + } - return sqlResponse; + String hostname = System.getProperty("zipkin.collector-hostname"); + if (StringUtils.isEmpty(hostname)) { + try { + hostname = InetAddress.getLocalHost().getHostName(); + } catch (UnknownHostException e) { + logger.debug("failed to get trace url due to " + e.getMessage()); + return null; + } + } - } finally { - BackdoorToggles.cleanToggles(); - QueryContext.reset(); + String port = System.getProperty("zipkin.web-ui-port"); + if (StringUtils.isEmpty(port)) { + port = "9411"; } + + return "http://" + hostname + ":" + port + "/zipkin/traces/" + Long.toHexString(scope.getSpan().getTraceId()); } private String getUserName() {
http://git-wip-us.apache.org/repos/asf/kylin/blob/4d50b269/server-base/src/main/java/org/apache/kylin/rest/service/TableService.java ---------------------------------------------------------------------- diff --cc server-base/src/main/java/org/apache/kylin/rest/service/TableService.java index d218bc2,901ac46..8a6dd96 --- a/server-base/src/main/java/org/apache/kylin/rest/service/TableService.java +++ b/server-base/src/main/java/org/apache/kylin/rest/service/TableService.java @@@ -111,12 -115,27 +111,11 @@@ public class TableService extends Basic public String[] loadHiveTablesToProject(String[] tables, String project) throws Exception { aclEvaluate.checkProjectAdminPermission(project); - // de-dup - SetMultimap<String, String> db2tables = LinkedHashMultimap.create(); - for (String fullTableName : tables) { - String[] parts = HadoopUtil.parseHiveTableName(fullTableName); - db2tables.put(parts[0], parts[1]); - } - - // load all tables first - List<Pair<TableDesc, TableExtDesc>> allMeta = Lists.newArrayList(); - ISourceMetadataExplorer explr = SourceFactory.getDefaultSource().getSourceMetadataExplorer(); - for (Map.Entry<String, String> entry : db2tables.entries()) { - Pair<TableDesc, TableExtDesc> pair = explr.loadTableMetadata(entry.getKey(), entry.getValue(), project); - TableDesc tableDesc = pair.getFirst(); - Preconditions.checkState(tableDesc.getDatabase().equals(entry.getKey().toUpperCase())); - Preconditions.checkState(tableDesc.getName().equals(entry.getValue().toUpperCase())); - Preconditions.checkState(tableDesc.getIdentity().equals(entry.getKey().toUpperCase() + "." 
+ entry.getValue().toUpperCase())); - TableExtDesc extDesc = pair.getSecond(); - Preconditions.checkState(tableDesc.getIdentity().equals(extDesc.getIdentity())); - allMeta.add(pair); - } + List<Pair<TableDesc, TableExtDesc>> allMeta = getAllMeta(tables, project); + return loadHiveTablesToProject(project, allMeta); + } + String[] loadHiveTablesToProject(String project, List<Pair<TableDesc, TableExtDesc>> allMeta) throws Exception { - // do schema check TableMetadataManager metaMgr = getTableManager(); CubeManager cubeMgr = getCubeManager(); http://git-wip-us.apache.org/repos/asf/kylin/blob/4d50b269/server-base/src/main/java/org/apache/kylin/rest/util/QueryRequestLimits.java ---------------------------------------------------------------------- diff --cc server-base/src/main/java/org/apache/kylin/rest/util/QueryRequestLimits.java index 0000000,cddaa12..eb8eee8 mode 000000,100644..100644 --- a/server-base/src/main/java/org/apache/kylin/rest/util/QueryRequestLimits.java +++ b/server-base/src/main/java/org/apache/kylin/rest/util/QueryRequestLimits.java @@@ -1,0 -1,90 +1,121 @@@ + /* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + + package org.apache.kylin.rest.util; + + import java.util.concurrent.ExecutionException; + import java.util.concurrent.TimeUnit; + import java.util.concurrent.atomic.AtomicInteger; + ++import org.apache.kylin.common.KylinConfig; ++import org.apache.kylin.metadata.project.ProjectInstance; ++import org.apache.kylin.metadata.project.ProjectManager; ++import org.apache.kylin.rest.exception.BadRequestException; ++import org.apache.kylin.rest.msg.Message; ++import org.apache.kylin.rest.msg.MsgPicker; + import org.slf4j.Logger; + import org.slf4j.LoggerFactory; + + import com.google.common.cache.CacheBuilder; + import com.google.common.cache.CacheLoader; + import com.google.common.cache.LoadingCache; + import com.google.common.cache.RemovalListener; + import com.google.common.cache.RemovalNotification; + -public class QueryRequestLimits { ++public class QueryRequestLimits implements AutoCloseable { + private static final Logger logger = LoggerFactory.getLogger(QueryRequestLimits.class); + + private static LoadingCache<String, AtomicInteger> runningStats = CacheBuilder.newBuilder() + .removalListener(new RemovalListener<String, AtomicInteger>() { + @Override + public void onRemoval(RemovalNotification<String, AtomicInteger> notification) { + logger.info("Current running query number " + notification.getValue().get() + " for project " + + notification.getKey() + " is removed due to " + notification.getCause()); + } + }).expireAfterWrite(1, TimeUnit.DAYS).build(new CacheLoader<String, AtomicInteger>() { + @Override + public AtomicInteger load(String s) throws Exception { + return new AtomicInteger(0); + } + }); + - public static boolean openQueryRequest(String project, int maxConcurrentQuery) { ++ static boolean openQueryRequest(String project, int maxConcurrentQuery) { + if (maxConcurrentQuery == 0) { + return true; + } + try { + AtomicInteger nRunningQueries = runningStats.get(project); + for (;;) { + int nRunning = nRunningQueries.get(); + if (nRunning < 
maxConcurrentQuery) { + if (nRunningQueries.compareAndSet(nRunning, nRunning + 1)) { + return true; + } + } else { + return false; + } + } + } catch (ExecutionException e) { + throw new RuntimeException(e); + } + } + - public static void closeQueryRequest(String project, int maxConcurrentQuery) { ++ static void closeQueryRequest(String project, int maxConcurrentQuery) { + if (maxConcurrentQuery == 0) { + return; + } + AtomicInteger nRunningQueries = runningStats.getIfPresent(project); + if (nRunningQueries != null) { + nRunningQueries.decrementAndGet(); + } + } + + public static Integer getCurrentRunningQuery(String project) { + AtomicInteger nRunningQueries = runningStats.getIfPresent(project); + if (nRunningQueries != null) { + return nRunningQueries.get(); + } else { + return null; + } + } ++ ++ // ============================================================================ ++ ++ final private String project; ++ final private int maxConcurrentQuery; ++ ++ public QueryRequestLimits(String project) { ++ this.project = project; ++ ++ ProjectManager mgr = ProjectManager.getInstance(KylinConfig.getInstanceFromEnv()); ++ ProjectInstance prj = mgr.getProject(project); ++ this.maxConcurrentQuery = prj.getConfig().getQueryConcurrentRunningThresholdForProject(); ++ ++ boolean ok = openQueryRequest(project, maxConcurrentQuery); ++ if (!ok) { ++ Message msg = MsgPicker.getMsg(); ++ logger.warn("Directly return exception as too many concurrent query requests for project:" + project); ++ throw new BadRequestException(msg.getQUERY_TOO_MANY_RUNNING()); ++ } ++ } ++ ++ @Override ++ public void close() { ++ closeQueryRequest(project, maxConcurrentQuery); ++ } + } http://git-wip-us.apache.org/repos/asf/kylin/blob/4d50b269/server-base/src/main/java/org/apache/kylin/rest/util/ValidateUtil.java ---------------------------------------------------------------------- diff --cc server-base/src/main/java/org/apache/kylin/rest/util/ValidateUtil.java index ada78cd,7e513ae..be20419 --- 
a/server-base/src/main/java/org/apache/kylin/rest/util/ValidateUtil.java +++ b/server-base/src/main/java/org/apache/kylin/rest/util/ValidateUtil.java @@@ -169,4 -108,31 +169,5 @@@ public class ValidateUtil } return cols; } + - public Set<String> getAllUsers(String project) throws IOException { - Set<String> allUsers = new HashSet<>(); - // add users that is global admin - for (ManagedUser managedUser : userService.listUsers()) { - if (managedUser.getAuthorities().contains(new SimpleGrantedAuthority(Constant.ROLE_ADMIN))) { - allUsers.add(managedUser.getUsername()); - } - } - - // add users that has project permission - ProjectInstance prj = projectService.getProjectManager().getProject(project); - AclEntity ae = accessService.getAclEntity("ProjectInstance", prj.getUuid()); - Acl acl = accessService.getAcl(ae); - if (acl != null && acl.getEntries() != null) { - for (AccessControlEntry ace : acl.getEntries()) { - allUsers.add(((PrincipalSid) ace.getSid()).getPrincipal()); - } - } - return allUsers; - } - - public void validateArgs(String... 
args) { - for (String arg : args) { - Preconditions.checkState(!StringUtils.isEmpty(arg)); - } - } } http://git-wip-us.apache.org/repos/asf/kylin/blob/4d50b269/server/src/main/resources/kylinSecurity.xml ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kylin/blob/4d50b269/server/src/test/java/org/apache/kylin/rest/controller/CubeControllerTest.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kylin/blob/4d50b269/server/src/test/java/org/apache/kylin/rest/security/QueryWithTableACLTest.java ---------------------------------------------------------------------- diff --cc server/src/test/java/org/apache/kylin/rest/security/QueryWithTableACLTest.java index fc97eff,414a241..528b18e --- a/server/src/test/java/org/apache/kylin/rest/security/QueryWithTableACLTest.java +++ b/server/src/test/java/org/apache/kylin/rest/security/QueryWithTableACLTest.java @@@ -23,10 -23,9 +23,10 @@@ import java.sql.SQLException import org.apache.kylin.common.KylinConfig; import org.apache.kylin.common.util.LocalFileMetadataTestCase; +import org.apache.kylin.metadata.MetadataConstants; import org.apache.kylin.metadata.acl.TableACLManager; import org.apache.kylin.query.security.AccessDeniedException; - import org.apache.kylin.query.security.QuerACLTestUtil; + import org.apache.kylin.query.security.QueryACLTestUtil; import org.hamcrest.CoreMatchers; import org.junit.After; import org.junit.Before; @@@ -59,26 -55,26 +59,26 @@@ public class QueryWithTableACLTest exte @Test public void testFailQuery() throws SQLException, IOException { - QuerACLTestUtil.setUser(MODELER); - QuerACLTestUtil.mockQuery(PROJECT, "select * from STREAMING_TABLE"); + QueryACLTestUtil.setUser(MODELER); + QueryACLTestUtil.mockQuery(PROJECT, "select * from STREAMING_TABLE"); - QuerACLTestUtil.setUser(ADMIN); + QueryACLTestUtil.setUser(ADMIN); - 
TableACLManager.getInstance(KylinConfig.getInstanceFromEnv()).addTableACL(PROJECT, "ADMIN", STREAMING_TABLE); + TableACLManager.getInstance(KylinConfig.getInstanceFromEnv()).addTableACL(PROJECT, "ADMIN", STREAMING_TABLE, MetadataConstants.TYPE_USER); thrown.expectCause(CoreMatchers.isA(AccessDeniedException.class)); thrown.expectMessage(CoreMatchers.containsString("Query failed.Access table:DEFAULT.STREAMING_TABLE denied")); - QuerACLTestUtil.mockQuery(PROJECT, "select * from STREAMING_TABLE"); + QueryACLTestUtil.mockQuery(PROJECT, "select * from STREAMING_TABLE"); } @Test public void testFailQueryWithCountStar() throws SQLException, IOException { - QuerACLTestUtil.setUser(MODELER); - QuerACLTestUtil.mockQuery(PROJECT, "select count(*) from STREAMING_TABLE"); + QueryACLTestUtil.setUser(MODELER); + QueryACLTestUtil.mockQuery(PROJECT, "select count(*) from STREAMING_TABLE"); - QuerACLTestUtil.setUser(ADMIN); + QueryACLTestUtil.setUser(ADMIN); - TableACLManager.getInstance(KylinConfig.getInstanceFromEnv()).addTableACL(PROJECT, "ADMIN", STREAMING_TABLE); + TableACLManager.getInstance(KylinConfig.getInstanceFromEnv()).addTableACL(PROJECT, "ADMIN", STREAMING_TABLE, MetadataConstants.TYPE_USER); thrown.expectCause(CoreMatchers.isA(AccessDeniedException.class)); thrown.expectMessage(CoreMatchers.containsString("Query failed.Access table:DEFAULT.STREAMING_TABLE denied")); - QuerACLTestUtil.mockQuery(PROJECT, "select count(*) from STREAMING_TABLE"); + QueryACLTestUtil.mockQuery(PROJECT, "select count(*) from STREAMING_TABLE"); } @After http://git-wip-us.apache.org/repos/asf/kylin/blob/4d50b269/server/src/test/java/org/apache/kylin/rest/service/AdminServiceTest.java ---------------------------------------------------------------------- diff --cc server/src/test/java/org/apache/kylin/rest/service/AdminServiceTest.java index 75b8f21,0000000..3ee14e0 mode 100644,000000..100644 --- a/server/src/test/java/org/apache/kylin/rest/service/AdminServiceTest.java +++ 
b/server/src/test/java/org/apache/kylin/rest/service/AdminServiceTest.java @@@ -1,72 -1,0 +1,62 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. +*/ + +package org.apache.kylin.rest.service; + +import java.io.File; +import java.io.IOException; + +import org.apache.commons.io.FileUtils; +import org.apache.kylin.common.KylinConfig; +import org.junit.Assert; +import org.junit.Test; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Qualifier; + +/** + * + */ +public class AdminServiceTest extends ServiceTestBase { + + @Autowired + @Qualifier("adminService") + private AdminService adminService; + + @Test + public void testGetPublicConfig() throws IOException { + //set ../examples/test_metadata/kylin.properties empty + File file = FileUtils.getFile(LOCALMETA_TEMP_DATA + "kylin.properties"); + FileUtils.deleteQuietly(file); + FileUtils.touch(file); + String path = Thread.currentThread().getContextClassLoader().getResource("kylin.properties").getPath(); + KylinConfig.setKylinConfigThreadLocal(KylinConfig.createInstanceFromUri(path)); - String expected = "kylin.web.link-streaming-guide=http://kylin.apache.org/\n" + - "kylin.web.contact-mail=\n" + - 
"kylin.query.cache-enabled=true\n" + - "kylin.web.link-diagnostic=\n" + - "kylin.web.help.length=4\n" + - "kylin.web.timezone=GMT+8\n" + - "kylin.server.external-acl-provider=\n" + - "kylin.storage.default=2\n" + - "kylin.web.help=\n" + - "kylin.web.export-allow-other=true\n" + - "kylin.web.link-hadoop=\n" + - "kylin.web.hide-measures=\n" + - "kylin.htrace.show-gui-trace-toggle=false\n" + - "kylin.web.export-allow-admin=true\n" + - "kylin.env=QA\n" + - "kylin.web.hive-limit=20\n" + - "kylin.engine.default=2\n" + - "kylin.web.help.3=onboard|Cube Design Tutorial|\n" + - "kylin.web.help.2=tableau|Tableau Guide|\n" + - "kylin.web.help.1=odbc|ODBC Driver|\n" + - "kylin.web.help.0=start|Getting Started|\n" + - "kylin.security.profile=testing\n"; - Assert.assertEquals(expected, adminService.getPublicConfig()); ++ ++ String publicConfig = adminService.getPublicConfig(); ++ ++ Assert.assertFalse(publicConfig.contains("kylin.metadata.data-model-manager-impl")); ++ Assert.assertFalse(publicConfig.contains("kylin.dictionary.use-forest-trie")); ++ Assert.assertFalse(publicConfig.contains("kylin.cube.segment-advisor")); ++ Assert.assertFalse(publicConfig.contains("kylin.job.use-remote-cli")); ++ Assert.assertFalse(publicConfig.contains("kylin.job.scheduler.provider")); ++ Assert.assertFalse(publicConfig.contains("kylin.engine.mr.job-jar")); ++ Assert.assertFalse(publicConfig.contains("kylin.engine.spark.sanity-check-enabled")); ++ Assert.assertFalse(publicConfig.contains("kylin.storage.provider")); ++ Assert.assertFalse(publicConfig.contains("kylin.query.convert-create-table-to-with")); ++ Assert.assertFalse(publicConfig.contains("kylin.server.init-tasks")); + } +} http://git-wip-us.apache.org/repos/asf/kylin/blob/4d50b269/server/src/test/java/org/apache/kylin/rest/service/CacheServiceTest.java ---------------------------------------------------------------------- 
http://git-wip-us.apache.org/repos/asf/kylin/blob/4d50b269/source-hive/src/main/java/org/apache/kylin/source/hive/CreateFlatHiveTableStep.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kylin/blob/4d50b269/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java ---------------------------------------------------------------------- diff --cc source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java index 0b23121,c700d82..e4564d0 --- a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java +++ b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java @@@ -223,8 -224,10 +224,10 @@@ public class HiveMRInput implements IMR .append("CREATE EXTERNAL TABLE IF NOT EXISTS " + intermediate + " LIKE " + identity + "\n"); createIntermediateTableHql.append("LOCATION '" + jobWorkingDir + "/" + intermediate + "';\n"); createIntermediateTableHql + .append("ALTER TABLE " + intermediate + " SET TBLPROPERTIES('auto.purge'='true');\n"); + createIntermediateTableHql - .append("INSERT OVERWRITE TABLE " + intermediate + " SELECT * FROM " + identity + ";\n"); - hiveCmdBuilder.addStatement(createIntermediateTableHql.toString()); + .append("INSERT OVERWRITE TABLE " + intermediate + " SELECT * FROM " + identity + "\n"); + hiveCmdBuilder.addStatementWithRedistributeBy(createIntermediateTableHql); hiveViewIntermediateTables = hiveViewIntermediateTables + intermediate + ";"; } } http://git-wip-us.apache.org/repos/asf/kylin/blob/4d50b269/source-hive/src/main/java/org/apache/kylin/source/jdbc/JdbcHiveMRInput.java ---------------------------------------------------------------------- diff --cc source-hive/src/main/java/org/apache/kylin/source/jdbc/JdbcHiveMRInput.java index 59780e6,d05f14e..7fc26de --- a/source-hive/src/main/java/org/apache/kylin/source/jdbc/JdbcHiveMRInput.java +++ b/source-hive/src/main/java/org/apache/kylin/source/jdbc/JdbcHiveMRInput.java @@@ -133,7 
-134,7 +134,7 @@@ public class JdbcHiveMRInput extends Hi if (partitionDesc.isPartitioned()) { partCol = partitionDesc.getPartitionDateColumn();//tablename.colname partitionString = partitionDesc.getPartitionConditionBuilder().buildDateRangeCondition(partitionDesc, - flatDesc.getSegment(), flatDesc.getSegRange()); - flatDesc.getSegRange()); ++ flatDesc.getSegment(), flatDesc.getSegRange()); } String splitTable; http://git-wip-us.apache.org/repos/asf/kylin/blob/4d50b269/source-kafka/src/main/java/org/apache/kylin/source/kafka/job/MergeOffsetStep.java ---------------------------------------------------------------------- diff --cc source-kafka/src/main/java/org/apache/kylin/source/kafka/job/MergeOffsetStep.java index 37bf8ff,fe5812b..c045ff7 --- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/job/MergeOffsetStep.java +++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/job/MergeOffsetStep.java @@@ -63,20 -63,20 +63,20 @@@ public class MergeOffsetStep extends Ab final CubeSegment first = mergingSegs.get(0); final CubeSegment last = mergingSegs.get(mergingSegs.size() - 1); - segment.setSegRange(new SegmentRange(first.getSegRange().start, last.getSegRange().end)); - segment.setSourcePartitionOffsetStart(first.getSourcePartitionOffsetStart()); - segment.setSourcePartitionOffsetEnd(last.getSourcePartitionOffsetEnd()); + segCopy.setSegRange(new SegmentRange(first.getSegRange().start, last.getSegRange().end)); + segCopy.setSourcePartitionOffsetStart(first.getSourcePartitionOffsetStart()); + segCopy.setSourcePartitionOffsetEnd(last.getSourcePartitionOffsetEnd()); - segment.setTSRange(new TSRange(mergingSegs.getTSStart(), mergingSegs.getTSEnd())); + segCopy.setTSRange(new TSRange(mergingSegs.getTSStart(), mergingSegs.getTSEnd())); - CubeUpdate cubeBuilder = new CubeUpdate(cube); - cubeBuilder.setToUpdateSegs(segment); + CubeUpdate update = new CubeUpdate(cubeCopy); + update.setToUpdateSegs(segCopy); try { - cubeManager.updateCube(cubeBuilder); - return 
new ExecuteResult(); + cubeManager.updateCube(update); - return new ExecuteResult(ExecuteResult.State.SUCCEED, "succeed"); ++ return ExecuteResult.createSucceed(); } catch (IOException e) { logger.error("fail to update cube segment offset", e); - return new ExecuteResult(ExecuteResult.State.ERROR, e.getLocalizedMessage()); + return ExecuteResult.createError(e); } } http://git-wip-us.apache.org/repos/asf/kylin/blob/4d50b269/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kylin/blob/4d50b269/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CreateHTableJob.java ---------------------------------------------------------------------- diff --cc storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CreateHTableJob.java index 073f183,838112f..68aa172 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CreateHTableJob.java +++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CreateHTableJob.java @@@ -28,9 -30,14 +29,9 @@@ import org.apache.commons.cli.Options import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; - import org.apache.hadoop.hbase.io.ImmutableBytesWritable; + import org.apache.hadoop.hbase.KeyValue; -import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.SequenceFile; -import org.apache.hadoop.io.Text; -import org.apache.hadoop.io.Writable; -import org.apache.hadoop.util.ReflectionUtils; -import org.apache.hadoop.util.StringUtils; import org.apache.hadoop.util.ToolRunner; import org.apache.kylin.common.KylinConfig; import org.apache.kylin.common.util.Bytes; @@@ -71,6 -79,8 +73,7 @@@ public class CreateHTableJob extends Ab options.addOption(OPTION_CUBE_NAME); options.addOption(OPTION_SEGMENT_ID); 
options.addOption(OPTION_PARTITION_FILE_PATH); - options.addOption(OPTION_STATISTICS_ENABLED); + options.addOption(OPTION_CUBOID_MODE); parseOptions(options, args); partitionFilePath = new Path(getOptionValue(OPTION_PARTITION_FILE_PATH)); @@@ -81,12 -92,31 +84,29 @@@ cubeDesc = cube.getDescriptor(); kylinConfig = cube.getConfig(); segmentID = getOptionValue(OPTION_SEGMENT_ID); + cuboidModeName = getOptionValue(OPTION_CUBOID_MODE); CubeSegment cubeSegment = cube.getSegmentById(segmentID); - Configuration conf = HBaseConnection.getCurrentHBaseConfiguration(); - byte[][] splitKeys; - final Map<Long, Double> cuboidSizeMap = new CubeStatsReader(cubeSegment, kylinConfig).getCuboidSizeMap(); - if (statsEnabled) { - Map<Long, Double> cuboidSizeMap = new CubeStatsReader(cubeSegment, null, kylinConfig).getCuboidSizeMap(); - Set<Long> buildingCuboids = cube.getCuboidsByMode(cuboidModeName); - if (buildingCuboids != null && !buildingCuboids.isEmpty()) { - Map<Long, Double> optimizedCuboidSizeMap = Maps.newHashMapWithExpectedSize(buildingCuboids.size()); - for (Long cuboid : buildingCuboids) { - Double cuboidSize = cuboidSizeMap.get(cuboid); - if (cuboidSize == null) { - logger.warn(cuboid + "cuboid's size is null will replace by 0"); - cuboidSize = 0.0; - } - optimizedCuboidSizeMap.put(cuboid, cuboidSize); ++ Map<Long, Double> cuboidSizeMap = new CubeStatsReader(cubeSegment, kylinConfig).getCuboidSizeMap(); ++ ++ // for cube planner, will keep cuboidSizeMap unchanged if cube planner is disabled ++ Set<Long> buildingCuboids = cube.getCuboidsByMode(cuboidModeName); ++ if (buildingCuboids != null && !buildingCuboids.isEmpty()) { ++ Map<Long, Double> optimizedCuboidSizeMap = Maps.newHashMapWithExpectedSize(buildingCuboids.size()); ++ for (Long cuboid : buildingCuboids) { ++ Double cuboidSize = cuboidSizeMap.get(cuboid); ++ if (cuboidSize == null) { ++ logger.warn(cuboid + "cuboid's size is null will replace by 0"); ++ cuboidSize = 0.0; + } - cuboidSizeMap = optimizedCuboidSizeMap; 
++ optimizedCuboidSizeMap.put(cuboid, cuboidSize); + } - splitKeys = getRegionSplitsFromCuboidStatistics(cuboidSizeMap, kylinConfig, cubeSegment, partitionFilePath.getParent()); - } else { - splitKeys = getRegionSplits(conf, partitionFilePath); ++ cuboidSizeMap = optimizedCuboidSizeMap; + } ++ + splitKeys = getRegionSplitsFromCuboidStatistics(cuboidSizeMap, kylinConfig, cubeSegment, + partitionFilePath.getParent()); CubeHTableUtil.createHTable(cubeSegment, splitKeys); return 0; http://git-wip-us.apache.org/repos/asf/kylin/blob/4d50b269/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMROutput2Transition.java ---------------------------------------------------------------------- diff --cc storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMROutput2Transition.java index 06c0923,8817cb2..ea372d9 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMROutput2Transition.java +++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMROutput2Transition.java @@@ -185,4 -199,30 +199,30 @@@ public class HBaseMROutput2Transition i throw new IllegalStateException("No merging segment's last build job ID equals " + jobID); } } + + public IMRBatchOptimizeOutputSide2 getBatchOptimizeOutputSide(final CubeSegment seg) { + return new IMRBatchOptimizeOutputSide2() { + HBaseMRSteps steps = new HBaseMRSteps(seg); + + @Override + public void addStepPhase2_CreateHTable(DefaultChainedExecutable jobFlow) { - jobFlow.addTask(steps.createCreateHTableStepWithStats(jobFlow.getId(), CuboidModeEnum.RECOMMEND)); ++ jobFlow.addTask(steps.createCreateHTableStep(jobFlow.getId(), CuboidModeEnum.RECOMMEND)); + } + + @Override + public void addStepPhase3_BuildCube(DefaultChainedExecutable jobFlow) { + jobFlow.addTask(steps.createConvertCuboidToHfileStep(jobFlow.getId())); + jobFlow.addTask(steps.createBulkLoadStep(jobFlow.getId())); + } + + public void addStepPhase4_Cleanup(DefaultChainedExecutable jobFlow) { + 
steps.addOptimizeGarbageCollectionSteps(jobFlow); + } + + @Override + public void addStepPhase5_Cleanup(DefaultChainedExecutable jobFlow) { + steps.addCheckpointGarbageCollectionSteps(jobFlow); + } + }; + } } http://git-wip-us.apache.org/repos/asf/kylin/blob/4d50b269/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMRSteps.java ---------------------------------------------------------------------- diff --cc storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMRSteps.java index 3dd61d8,67c94ad..99cc9a7 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMRSteps.java +++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMRSteps.java @@@ -74,14 -76,31 +76,18 @@@ public class HBaseMRSteps extends JobBu return rowkeyDistributionStep; } - public HadoopShellExecutable createCreateHTableStep(String jobId) { - return createCreateHTableStep(jobId, false); ++ return createCreateHTableStep(jobId, CuboidModeEnum.CURRENT); + } - - public HadoopShellExecutable createCreateHTableStepWithStats(String jobId) { - return createCreateHTableStep(jobId, true); - } - - public HadoopShellExecutable createCreateHTableStepWithStats(String jobId, CuboidModeEnum cuboidMode) { - return createCreateHTableStep(jobId, true, cuboidMode); - } - - private HadoopShellExecutable createCreateHTableStep(String jobId, boolean withStats) { - return createCreateHTableStep(jobId, withStats, CuboidModeEnum.CURRENT); - } - - private HadoopShellExecutable createCreateHTableStep(String jobId, boolean withStats, CuboidModeEnum cuboidMode) { ++ ++ public HadoopShellExecutable createCreateHTableStep(String jobId, CuboidModeEnum cuboidMode) { HadoopShellExecutable createHtableStep = new HadoopShellExecutable(); createHtableStep.setName(ExecutableConstants.STEP_NAME_CREATE_HBASE_TABLE); StringBuilder cmd = new StringBuilder(); appendExecCmdParameters(cmd, BatchConstants.ARG_CUBE_NAME, seg.getRealization().getName()); 
appendExecCmdParameters(cmd, BatchConstants.ARG_SEGMENT_ID, seg.getUuid()); appendExecCmdParameters(cmd, BatchConstants.ARG_PARTITION, getRowkeyDistributionOutputPath(jobId) + "/part-r-00000"); - appendExecCmdParameters(cmd, BatchConstants.ARG_STATS_ENABLED, String.valueOf(withStats)); + appendExecCmdParameters(cmd, BatchConstants.ARG_CUBOID_MODE, cuboidMode.toString()); createHtableStep.setJobParams(cmd.toString()); createHtableStep.setJobClass(CreateHTableJob.class); http://git-wip-us.apache.org/repos/asf/kylin/blob/4d50b269/tool/src/main/java/org/apache/kylin/tool/metrics/systemcube/CubeDescCreator.java ---------------------------------------------------------------------- diff --cc tool/src/main/java/org/apache/kylin/tool/metrics/systemcube/CubeDescCreator.java index 0000000,2be381c..666d23a mode 000000,100644..100644 --- a/tool/src/main/java/org/apache/kylin/tool/metrics/systemcube/CubeDescCreator.java +++ b/tool/src/main/java/org/apache/kylin/tool/metrics/systemcube/CubeDescCreator.java @@@ -1,0 -1,673 +1,656 @@@ + /* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + + package org.apache.kylin.tool.metrics.systemcube; + -import java.io.ByteArrayOutputStream; -import java.io.DataOutputStream; + import java.util.List; + import java.util.Map; + import java.util.Set; + + import org.apache.kylin.common.KylinConfig; + import org.apache.kylin.common.util.Pair; -import org.apache.kylin.cube.CubeDescManager; + import org.apache.kylin.cube.model.AggregationGroup; + import org.apache.kylin.cube.model.CubeDesc; + import org.apache.kylin.cube.model.DimensionDesc; + import org.apache.kylin.cube.model.HBaseColumnDesc; + import org.apache.kylin.cube.model.HBaseColumnFamilyDesc; + import org.apache.kylin.cube.model.HBaseMappingDesc; + import org.apache.kylin.cube.model.RowKeyColDesc; + import org.apache.kylin.cube.model.RowKeyDesc; + import org.apache.kylin.cube.model.SelectRule; + import org.apache.kylin.dimension.DictionaryDimEnc; + import org.apache.kylin.job.constant.JobStatusEnum; + import org.apache.kylin.measure.percentile.PercentileMeasureType; + import org.apache.kylin.metadata.model.FunctionDesc; + import org.apache.kylin.metadata.model.IEngineAware; + import org.apache.kylin.metadata.model.MeasureDesc; + import org.apache.kylin.metadata.model.ParameterDesc; + import org.apache.kylin.metrics.lib.SinkTool; + import org.apache.kylin.metrics.lib.impl.RecordEvent; + import org.apache.kylin.metrics.lib.impl.TimePropertyEnum; + import org.apache.kylin.metrics.property.JobPropertyEnum; + import org.apache.kylin.metrics.property.QueryCubePropertyEnum; + import org.apache.kylin.metrics.property.QueryPropertyEnum; + import org.apache.kylin.metrics.property.QueryRPCPropertyEnum; -import org.apache.kylin.tool.metrics.systemcube.util.HiveSinkTool; + + import com.google.common.collect.Lists; + import com.google.common.collect.Maps; + import com.google.common.collect.Sets; + + public class CubeDescCreator { + - public static void main(String[] args) throws Exception { - // KylinConfig.setSandboxEnvIfPossible(); - KylinConfig config = 
KylinConfig.getInstanceFromEnv(); - - CubeDesc kylinCubeDesc = generateKylinCubeDescForMetricsQuery(config, new HiveSinkTool()); - ByteArrayOutputStream buf = new ByteArrayOutputStream(); - DataOutputStream dout = new DataOutputStream(buf); - CubeDescManager.CUBE_DESC_SERIALIZER.serialize(kylinCubeDesc, dout); - dout.close(); - buf.close(); - System.out.println(buf.toString()); - } - + public static CubeDesc generateKylinCubeDescForMetricsQuery(KylinConfig config, SinkTool sinkTool) { + String tableName = sinkTool.getTableNameForMetrics(config.getKylinMetricsSubjectQuery()); + + //Set for dimensions + List<String> dimensions = ModelCreator.getDimensionsForMetricsQuery(); + dimensions.remove(TimePropertyEnum.DAY_TIME.toString()); + dimensions.remove(RecordEvent.RecordReserveKeyEnum.TIME.toString()); + + List<DimensionDesc> dimensionDescList = Lists.newArrayListWithExpectedSize(dimensions.size()); + for (String dimensionName : dimensions) { + dimensionDescList.add(getDimensionDesc(tableName, dimensionName)); + } + + //Set for measures + List<String> measures = ModelCreator.getMeasuresForMetricsQuery(); + measures.remove(QueryPropertyEnum.ID_CODE.toString()); + List<MeasureDesc> measureDescList = Lists.newArrayListWithExpectedSize(measures.size() * 2 + 1 + 1); + + List<Pair<String, String>> measureTypeList = HiveTableCreator.getHiveColumnsForMetricsQuery(); + Map<String, String> measureTypeMap = Maps.newHashMapWithExpectedSize(measureTypeList.size()); + for (Pair<String, String> entry : measureTypeList) { + measureTypeMap.put(entry.getKey(), entry.getValue()); + } + measureDescList.add(getMeasureCount()); + measureDescList.add(getMeasureMin(QueryPropertyEnum.TIME_COST.toString(), + measureTypeMap.get(QueryPropertyEnum.TIME_COST.toString()))); + for (String measure : measures) { + measureDescList.add(getMeasureSum(measure, measureTypeMap.get(measure))); + measureDescList.add(getMeasureMax(measure, measureTypeMap.get(measure))); + } + 
measureDescList.add(getMeasureHLL(QueryPropertyEnum.ID_CODE.toString())); + measureDescList.add(getMeasurePercentile(QueryPropertyEnum.TIME_COST.toString())); + + //Set for row key + RowKeyColDesc[] rowKeyColDescs = new RowKeyColDesc[dimensionDescList.size()]; + int idx = getTimeRowKeyColDesc(tableName, rowKeyColDescs); + rowKeyColDescs[idx] = getRowKeyColDesc(tableName, QueryPropertyEnum.USER.toString(), idx + 1); + idx++; + rowKeyColDescs[idx] = getRowKeyColDesc(tableName, QueryPropertyEnum.PROJECT.toString(), idx + 1); + idx++; + rowKeyColDescs[idx] = getRowKeyColDesc(tableName, QueryPropertyEnum.REALIZATION.toString(), idx + 1); + idx++; + rowKeyColDescs[idx] = getRowKeyColDesc(tableName, QueryPropertyEnum.REALIZATION_TYPE.toString(), idx + 1); + idx++; + rowKeyColDescs[idx] = getRowKeyColDesc(tableName, QueryPropertyEnum.EXCEPTION.toString(), idx + 1); + idx++; + rowKeyColDescs[idx] = getRowKeyColDesc(tableName, QueryPropertyEnum.TYPE.toString(), idx + 1); + idx++; + rowKeyColDescs[idx] = getRowKeyColDesc(tableName, RecordEvent.RecordReserveKeyEnum.HOST.toString(), idx + 1); + idx++; + + RowKeyDesc rowKeyDesc = new RowKeyDesc(); + rowKeyDesc.setRowkeyColumns(rowKeyColDescs); + + //Set for aggregation group + String[][] hierarchy_dims = new String[2][]; + hierarchy_dims[0] = getTimeHierarchy(); + hierarchy_dims[1] = new String[2]; + hierarchy_dims[1][0] = QueryPropertyEnum.REALIZATION_TYPE.toString(); + hierarchy_dims[1][1] = QueryPropertyEnum.REALIZATION.toString(); + for (int i = 0; i < hierarchy_dims.length; i++) { + hierarchy_dims[i] = refineColumnWithTable(tableName, hierarchy_dims[i]); + } + + SelectRule selectRule = new SelectRule(); + selectRule.mandatoryDims = new String[0]; + selectRule.hierarchyDims = hierarchy_dims; + selectRule.jointDims = new String[0][0]; + + AggregationGroup aggGroup = new AggregationGroup(); + aggGroup.setIncludes(refineColumnWithTable(tableName, dimensions)); + aggGroup.setSelectRule(selectRule); + + //Set for hbase mapping + 
HBaseMappingDesc hBaseMapping = new HBaseMappingDesc(); + hBaseMapping.setColumnFamily(getHBaseColumnFamily(measureDescList)); + + return generateKylinCubeDesc(tableName, sinkTool.getStorageType(), dimensionDescList, measureDescList, + rowKeyDesc, aggGroup, hBaseMapping, sinkTool.getCubeDescOverrideProperties()); + } + + public static CubeDesc generateKylinCubeDescForMetricsQueryCube(KylinConfig config, SinkTool sinkTool) { + String tableName = sinkTool.getTableNameForMetrics(config.getKylinMetricsSubjectQueryCube()); + + //Set for dimensions + List<String> dimensions = ModelCreator.getDimensionsForMetricsQueryCube(); + dimensions.remove(TimePropertyEnum.DAY_TIME.toString()); + dimensions.remove(RecordEvent.RecordReserveKeyEnum.TIME.toString()); + dimensions.remove(RecordEvent.RecordReserveKeyEnum.HOST.toString()); + dimensions.remove(QueryCubePropertyEnum.PROJECT.toString()); + + List<DimensionDesc> dimensionDescList = Lists.newArrayListWithExpectedSize(dimensions.size()); + for (String dimensionName : dimensions) { + dimensionDescList.add(getDimensionDesc(tableName, dimensionName)); + } + + //Set for measures + List<String> measures = ModelCreator.getMeasuresForMetricsQueryCube(); + List<MeasureDesc> measureDescList = Lists.newArrayListWithExpectedSize(measures.size() * 2); + + List<Pair<String, String>> measureTypeList = HiveTableCreator.getHiveColumnsForMetricsQueryCube(); + Map<String, String> measureTypeMap = Maps.newHashMapWithExpectedSize(measureTypeList.size()); + for (Pair<String, String> entry : measureTypeList) { + measureTypeMap.put(entry.getKey(), entry.getValue()); + } + measureDescList.add(getMeasureCount()); + for (String measure : measures) { + measureDescList.add(getMeasureSum(measure, measureTypeMap.get(measure))); + if (!measure.equals(QueryCubePropertyEnum.WEIGHT_PER_HIT.toString())) { + measureDescList.add(getMeasureMax(measure, measureTypeMap.get(measure))); + } + } + + //Set for row key + RowKeyColDesc[] rowKeyColDescs = new 
RowKeyColDesc[dimensionDescList.size()]; + int idx = getTimeRowKeyColDesc(tableName, rowKeyColDescs); + rowKeyColDescs[idx] = getRowKeyColDesc(tableName, QueryCubePropertyEnum.CUBE.toString(), idx + 1); + idx++; + rowKeyColDescs[idx] = getRowKeyColDesc(tableName, QueryCubePropertyEnum.SEGMENT.toString(), idx + 1); + idx++; + rowKeyColDescs[idx] = getRowKeyColDesc(tableName, QueryCubePropertyEnum.CUBOID_SOURCE.toString(), idx + 1); + idx++; + rowKeyColDescs[idx] = getRowKeyColDesc(tableName, QueryCubePropertyEnum.CUBOID_TARGET.toString(), idx + 1); + idx++; + rowKeyColDescs[idx] = getRowKeyColDesc(tableName, QueryCubePropertyEnum.FILTER_MASK.toString(), idx + 1); + idx++; + rowKeyColDescs[idx] = getRowKeyColDesc(tableName, QueryCubePropertyEnum.IF_MATCH.toString(), idx + 1); + idx++; + rowKeyColDescs[idx] = getRowKeyColDesc(tableName, QueryCubePropertyEnum.IF_SUCCESS.toString(), idx + 1); + idx++; + + RowKeyDesc rowKeyDesc = new RowKeyDesc(); + rowKeyDesc.setRowkeyColumns(rowKeyColDescs); + + //Set for aggregation group + String[] mandatory_dims = new String[] { QueryCubePropertyEnum.CUBE.toString() }; + mandatory_dims = refineColumnWithTable(tableName, mandatory_dims); + + String[][] hierarchy_dims = new String[1][]; + hierarchy_dims[0] = getTimeHierarchy(); + for (int i = 0; i < hierarchy_dims.length; i++) { + hierarchy_dims[i] = refineColumnWithTable(tableName, hierarchy_dims[i]); + } + + String[][] joint_dims = new String[1][]; + joint_dims[0] = new String[] { QueryCubePropertyEnum.CUBOID_SOURCE.toString(), + QueryCubePropertyEnum.CUBOID_TARGET.toString() }; + for (int i = 0; i < joint_dims.length; i++) { + joint_dims[i] = refineColumnWithTable(tableName, joint_dims[i]); + } + + SelectRule selectRule = new SelectRule(); + selectRule.mandatoryDims = mandatory_dims; + selectRule.hierarchyDims = hierarchy_dims; + selectRule.jointDims = joint_dims; + + AggregationGroup aggGroup = new AggregationGroup(); + aggGroup.setIncludes(refineColumnWithTable(tableName, 
dimensions)); + aggGroup.setSelectRule(selectRule); + + //Set for hbase mapping + HBaseMappingDesc hBaseMapping = new HBaseMappingDesc(); + hBaseMapping.setColumnFamily(getHBaseColumnFamily(measureDescList)); + + return generateKylinCubeDesc(tableName, sinkTool.getStorageType(), dimensionDescList, measureDescList, + rowKeyDesc, aggGroup, hBaseMapping, sinkTool.getCubeDescOverrideProperties()); + } + + public static CubeDesc generateKylinCubeDescForMetricsQueryRPC(KylinConfig config, SinkTool sinkTool) { + String tableName = sinkTool.getTableNameForMetrics(config.getKylinMetricsSubjectQueryRpcCall()); + + //Set for dimensions + List<String> dimensions = ModelCreator.getDimensionsForMetricsQueryRPC(); + dimensions.remove(TimePropertyEnum.DAY_TIME.toString()); + dimensions.remove(RecordEvent.RecordReserveKeyEnum.TIME.toString()); + + List<DimensionDesc> dimensionDescList = Lists.newArrayListWithExpectedSize(dimensions.size()); + for (String dimensionName : dimensions) { + dimensionDescList.add(getDimensionDesc(tableName, dimensionName)); + } + + //Set for measures + List<String> measures = ModelCreator.getMeasuresForMetricsQueryRPC(); + List<MeasureDesc> measureDescList = Lists.newArrayListWithExpectedSize(measures.size() * 2 + 1 + 1); + + List<Pair<String, String>> measureTypeList = HiveTableCreator.getHiveColumnsForMetricsQueryRPC(); + Map<String, String> measureTypeMap = Maps.newHashMapWithExpectedSize(measureTypeList.size()); + for (Pair<String, String> entry : measureTypeList) { + measureTypeMap.put(entry.getKey(), entry.getValue()); + } + measureDescList.add(getMeasureCount()); + for (String measure : measures) { + measureDescList.add(getMeasureSum(measure, measureTypeMap.get(measure))); + measureDescList.add(getMeasureMax(measure, measureTypeMap.get(measure))); + } + measureDescList.add(getMeasurePercentile(QueryRPCPropertyEnum.CALL_TIME.toString())); + + //Set for row key + RowKeyColDesc[] rowKeyColDescs = new RowKeyColDesc[dimensionDescList.size()]; + int idx 
= getTimeRowKeyColDesc(tableName, rowKeyColDescs); + rowKeyColDescs[idx] = getRowKeyColDesc(tableName, QueryRPCPropertyEnum.PROJECT.toString(), idx + 1); + idx++; + rowKeyColDescs[idx] = getRowKeyColDesc(tableName, QueryRPCPropertyEnum.REALIZATION.toString(), idx + 1); + idx++; + rowKeyColDescs[idx] = getRowKeyColDesc(tableName, QueryRPCPropertyEnum.RPC_SERVER.toString(), idx + 1); + idx++; + rowKeyColDescs[idx] = getRowKeyColDesc(tableName, RecordEvent.RecordReserveKeyEnum.HOST.toString(), idx + 1); + idx++; + rowKeyColDescs[idx] = getRowKeyColDesc(tableName, QueryRPCPropertyEnum.EXCEPTION.toString(), idx + 1); + idx++; + + RowKeyDesc rowKeyDesc = new RowKeyDesc(); + rowKeyDesc.setRowkeyColumns(rowKeyColDescs); + + //Set for aggregation group + String[][] hierarchy_dims = new String[1][]; + hierarchy_dims[0] = getTimeHierarchy(); + for (int i = 0; i < hierarchy_dims.length; i++) { + hierarchy_dims[i] = refineColumnWithTable(tableName, hierarchy_dims[i]); + } + + SelectRule selectRule = new SelectRule(); + selectRule.mandatoryDims = new String[0]; + selectRule.hierarchyDims = hierarchy_dims; + selectRule.jointDims = new String[0][0]; + + AggregationGroup aggGroup = new AggregationGroup(); + aggGroup.setIncludes(refineColumnWithTable(tableName, dimensions)); + aggGroup.setSelectRule(selectRule); + + //Set for hbase mapping + HBaseMappingDesc hBaseMapping = new HBaseMappingDesc(); + hBaseMapping.setColumnFamily(getHBaseColumnFamily(measureDescList)); + + return generateKylinCubeDesc(tableName, sinkTool.getStorageType(), dimensionDescList, measureDescList, + rowKeyDesc, aggGroup, hBaseMapping, sinkTool.getCubeDescOverrideProperties()); + } + + public static CubeDesc generateKylinCubeDescForMetricsJob(KylinConfig config, SinkTool sinkTool) { + String tableName = sinkTool.getTableNameForMetrics(config.getKylinMetricsSubjectJob()); + + //Set for dimensions + List<String> dimensions = ModelCreator.getDimensionsForMetricsJob(); + 
dimensions.remove(TimePropertyEnum.DAY_TIME.toString()); + dimensions.remove(RecordEvent.RecordReserveKeyEnum.TIME.toString()); + dimensions.remove(RecordEvent.RecordReserveKeyEnum.HOST.toString()); + + List<DimensionDesc> dimensionDescList = Lists.newArrayListWithExpectedSize(dimensions.size()); + for (String dimensionName : dimensions) { + dimensionDescList.add(getDimensionDesc(tableName, dimensionName)); + } + + //Set for measures + List<String> measures = ModelCreator.getMeasuresForMetricsJob(); + List<MeasureDesc> measureDescList = Lists.newArrayListWithExpectedSize((measures.size() - 4) * 3 + 1 + 1 + 4); + + Set<String> stepDuration = Sets.newHashSet(); + stepDuration.add(JobPropertyEnum.STEP_DURATION_DISTINCT_COLUMNS.toString()); + stepDuration.add(JobPropertyEnum.STEP_DURATION_DICTIONARY.toString()); + stepDuration.add(JobPropertyEnum.STEP_DURATION_INMEM_CUBING.toString()); + stepDuration.add(JobPropertyEnum.STEP_DURATION_HFILE_CONVERT.toString()); + + List<Pair<String, String>> measureTypeList = HiveTableCreator.getHiveColumnsForMetricsJob(); + Map<String, String> measureTypeMap = Maps.newHashMapWithExpectedSize(measureTypeList.size()); + for (Pair<String, String> entry : measureTypeList) { + measureTypeMap.put(entry.getKey(), entry.getValue()); + } + measureDescList.add(getMeasureCount()); + for (String measure : measures) { + measureDescList.add(getMeasureSum(measure, measureTypeMap.get(measure))); + measureDescList.add(getMeasureMax(measure, measureTypeMap.get(measure))); + if (!stepDuration.contains(measure)) { + measureDescList.add(getMeasureMin(measure, measureTypeMap.get(measure))); + } + } + measureDescList.add(getMeasurePercentile(JobPropertyEnum.BUILD_DURATION.toString())); + + //Set for row key + RowKeyColDesc[] rowKeyColDescs = new RowKeyColDesc[dimensionDescList.size()]; + int idx = getTimeRowKeyColDesc(tableName, rowKeyColDescs); + rowKeyColDescs[idx] = getRowKeyColDesc(tableName, JobPropertyEnum.USER.toString(), idx + 1); + idx++; + 
rowKeyColDescs[idx] = getRowKeyColDesc(tableName, JobPropertyEnum.PROJECT.toString(), idx + 1); + idx++; + rowKeyColDescs[idx] = getRowKeyColDesc(tableName, JobPropertyEnum.CUBE.toString(), idx + 1); + idx++; + rowKeyColDescs[idx] = getRowKeyColDesc(tableName, JobPropertyEnum.ALGORITHM.toString(), idx + 1); + idx++; + rowKeyColDescs[idx] = getRowKeyColDesc(tableName, JobPropertyEnum.TYPE.toString(), idx + 1); + idx++; + + RowKeyDesc rowKeyDesc = new RowKeyDesc(); + rowKeyDesc.setRowkeyColumns(rowKeyColDescs); + + //Set for aggregation group + String[][] hierarchy_dims = new String[1][]; + hierarchy_dims[0] = getTimeHierarchy(); + for (int i = 0; i < hierarchy_dims.length; i++) { + hierarchy_dims[i] = refineColumnWithTable(tableName, hierarchy_dims[i]); + } + + SelectRule selectRule = new SelectRule(); + selectRule.mandatoryDims = new String[0]; + selectRule.hierarchyDims = hierarchy_dims; + selectRule.jointDims = new String[0][0]; + + AggregationGroup aggGroup = new AggregationGroup(); + aggGroup.setIncludes(refineColumnWithTable(tableName, dimensions)); + aggGroup.setSelectRule(selectRule); + + //Set for hbase mapping + HBaseMappingDesc hBaseMapping = new HBaseMappingDesc(); + hBaseMapping.setColumnFamily(getHBaseColumnFamily(measureDescList)); + + return generateKylinCubeDesc(tableName, sinkTool.getStorageType(), dimensionDescList, measureDescList, + rowKeyDesc, aggGroup, hBaseMapping, sinkTool.getCubeDescOverrideProperties()); + } + + public static CubeDesc generateKylinCubeDescForMetricsJobException(KylinConfig config, SinkTool sinkTool) { + String tableName = sinkTool.getTableNameForMetrics(config.getKylinMetricsSubjectJobException()); + + //Set for dimensions + List<String> dimensions = ModelCreator.getDimensionsForMetricsJobException(); + dimensions.remove(TimePropertyEnum.DAY_TIME.toString()); + dimensions.remove(RecordEvent.RecordReserveKeyEnum.TIME.toString()); + dimensions.remove(RecordEvent.RecordReserveKeyEnum.HOST.toString()); + + List<DimensionDesc> 
dimensionDescList = Lists.newArrayListWithExpectedSize(dimensions.size()); + for (String dimensionName : dimensions) { + dimensionDescList.add(getDimensionDesc(tableName, dimensionName)); + } + + //Set for measures + List<String> measures = ModelCreator.getMeasuresForMetricsJobException(); + measures.remove(JobPropertyEnum.ID_CODE.toString()); + List<MeasureDesc> measureDescList = Lists.newArrayListWithExpectedSize(1); + + measureDescList.add(getMeasureCount()); + + //Set for row key + RowKeyColDesc[] rowKeyColDescs = new RowKeyColDesc[dimensionDescList.size()]; + int idx = getTimeRowKeyColDesc(tableName, rowKeyColDescs); + rowKeyColDescs[idx] = getRowKeyColDesc(tableName, JobPropertyEnum.USER.toString(), idx + 1); + idx++; + rowKeyColDescs[idx] = getRowKeyColDesc(tableName, JobPropertyEnum.PROJECT.toString(), idx + 1); + idx++; + rowKeyColDescs[idx] = getRowKeyColDesc(tableName, JobPropertyEnum.CUBE.toString(), idx + 1); + idx++; + rowKeyColDescs[idx] = getRowKeyColDesc(tableName, JobPropertyEnum.ALGORITHM.toString(), idx + 1); + idx++; + rowKeyColDescs[idx] = getRowKeyColDesc(tableName, JobPropertyEnum.TYPE.toString(), idx + 1); + idx++; + rowKeyColDescs[idx] = getRowKeyColDesc(tableName, JobPropertyEnum.EXCEPTION.toString(), idx + 1); + idx++; + + RowKeyDesc rowKeyDesc = new RowKeyDesc(); + rowKeyDesc.setRowkeyColumns(rowKeyColDescs); + + //Set for aggregation group + String[][] hierarchy_dims = new String[1][]; + hierarchy_dims[0] = getTimeHierarchy(); + for (int i = 0; i < hierarchy_dims.length; i++) { + hierarchy_dims[i] = refineColumnWithTable(tableName, hierarchy_dims[i]); + } + + SelectRule selectRule = new SelectRule(); + selectRule.mandatoryDims = new String[0]; + selectRule.hierarchyDims = hierarchy_dims; + selectRule.jointDims = new String[0][0]; + + AggregationGroup aggGroup = new AggregationGroup(); + aggGroup.setIncludes(refineColumnWithTable(tableName, dimensions)); + aggGroup.setSelectRule(selectRule); + + //Set for hbase mapping + 
HBaseMappingDesc hBaseMapping = new HBaseMappingDesc(); + hBaseMapping.setColumnFamily(getHBaseColumnFamily(measureDescList)); + + return generateKylinCubeDesc(tableName, sinkTool.getStorageType(), dimensionDescList, measureDescList, + rowKeyDesc, aggGroup, hBaseMapping, sinkTool.getCubeDescOverrideProperties()); + } + + /** Assembles a CubeDesc from pre-built parts. Both the cube name and the model name are tableName with every '.' replaced by '_'. The description is empty, lastModified is 0, the notify list is empty, the status-need-notify list contains only ERROR, the engine type is IEngineAware.ID_MR_V2, aggGroup is the sole aggregation group, and overrideProperties are copied into the desc's override map. The signature is calculated and stored before updateRandomUuid() is called. */ public static CubeDesc generateKylinCubeDesc(String tableName, int storageType, + List<DimensionDesc> dimensionDescList, List<MeasureDesc> measureDescList, RowKeyDesc rowKeyDesc, + AggregationGroup aggGroup, HBaseMappingDesc hBaseMapping, Map<String, String> overrideProperties) { + CubeDesc desc = new CubeDesc(); + desc.setName(tableName.replace('.', '_')); + desc.setModelName(tableName.replace('.', '_')); + desc.setDescription(""); + desc.setLastModified(0L); + desc.setDimensions(dimensionDescList); + desc.setMeasures(measureDescList); + desc.setRowkey(rowKeyDesc); + desc.setHbaseMapping(hBaseMapping); + desc.setNotifyList(Lists.<String> newArrayList()); + desc.setStatusNeedNotify(Lists.newArrayList(JobStatusEnum.ERROR.toString())); + /* presumably milliseconds, i.e. 1, 7 and 28 days - TODO confirm the unit expected by setAutoMergeTimeRanges */ desc.setAutoMergeTimeRanges(new long[] { 86400000L, 604800000L, 2419200000L }); + desc.setEngineType(IEngineAware.ID_MR_V2); + desc.setStorageType(storageType); + desc.setAggregationGroups(Lists.newArrayList(aggGroup)); + desc.getOverrideKylinProps().putAll(overrideProperties); + desc.setSignature(desc.calculateSignature()); + desc.updateRandomUuid(); + return desc; + } + + /** Groups measures into column families. A measure whose function is count-distinct, or whose expression equals PercentileMeasureType.FUNC_PERCENTILE, gets a column family of its own; all other measures share one family. Families are named "F1", "F2", ... with the shared family (if any) first, and every column uses the qualifier "M". */ public static HBaseColumnFamilyDesc[] getHBaseColumnFamily(List<MeasureDesc> measureDescList) { + List<String> normalMeasureList = Lists.newLinkedList(); + List<String> largeMeasureList = Lists.newLinkedList(); + for (MeasureDesc measureDesc : measureDescList) { + if (measureDesc.getFunction().isCountDistinct() + || measureDesc.getFunction().getExpression().equals(PercentileMeasureType.FUNC_PERCENTILE)) { + largeMeasureList.add(measureDesc.getName()); + } else { + normalMeasureList.add(measureDesc.getName()); + } + } + List<HBaseColumnFamilyDesc> 
columnFamilyDescList = Lists.newLinkedList(); + int idx = 1; + /* the shared family is only emitted when at least one ordinary measure exists */ if (normalMeasureList.size() > 0) { + HBaseColumnDesc columnDesc = new HBaseColumnDesc(); + columnDesc.setQualifier("M"); + columnDesc.setMeasureRefs(normalMeasureList.toArray(new String[normalMeasureList.size()])); + HBaseColumnFamilyDesc columnFamilyDesc = new HBaseColumnFamilyDesc(); + columnFamilyDesc.setName("F" + idx++); + columnFamilyDesc.setColumns(new HBaseColumnDesc[] { columnDesc }); + + columnFamilyDescList.add(columnFamilyDesc); + } + /* one dedicated family per large measure, numbered after the shared one */ for (String largeMeasure : largeMeasureList) { + HBaseColumnDesc columnDesc = new HBaseColumnDesc(); + columnDesc.setQualifier("M"); + columnDesc.setMeasureRefs(new String[] { largeMeasure }); + HBaseColumnFamilyDesc columnFamilyDesc = new HBaseColumnFamilyDesc(); + columnFamilyDesc.setName("F" + idx++); + columnFamilyDesc.setColumns(new HBaseColumnDesc[] { columnDesc }); + + columnFamilyDescList.add(columnFamilyDesc); + } + + return columnFamilyDescList.toArray(new HBaseColumnFamilyDesc[columnFamilyDescList.size()]); + } + + /** Returns the time hierarchy column names in the order YEAR, MONTH, WEEK_BEGIN_DATE, DAY_DATE. */ public static String[] getTimeHierarchy() { + String[] result = new String[4]; + result[0] = TimePropertyEnum.YEAR.toString(); + result[1] = TimePropertyEnum.MONTH.toString(); + result[2] = TimePropertyEnum.WEEK_BEGIN_DATE.toString(); + result[3] = TimePropertyEnum.DAY_DATE.toString(); + return result; + } + + /** Returns a new array in which every column is prefixed with the part of tableName after its last '.' (the whole name when it contains no '.') followed by ".". Order is preserved. */ public static String[] refineColumnWithTable(String tableName, List<String> columns) { + String[] dimensions = new String[columns.size()]; + for (int i = 0; i < dimensions.length; i++) { + dimensions[i] = tableName.substring(tableName.lastIndexOf(".") + 1) + "." + columns.get(i); + } + return dimensions; + } + + /** Array overload: returns a new array in which every column is prefixed with the part of tableName after its last '.' (the whole name when it contains no '.') followed by ".". The input array is not modified. */ public static String[] refineColumnWithTable(String tableName, String[] columns) { + String[] dimensions = new String[columns.length]; + for (int i = 0; i < dimensions.length; i++) { + dimensions[i] = tableName.substring(tableName.lastIndexOf(".") + 1) + "." 
+ columns[i]; + } + return dimensions; + } + + /** Fills slots 0..5 of rowKeyColDescs with row-key columns for DAY_DATE, WEEK_BEGIN_DATE, MONTH, YEAR, TIME_HOUR and TIME_MINUTE (ids 1..6, in that order) and returns the next free slot index, which is 6. The array must therefore have at least six elements. */ public static int getTimeRowKeyColDesc(String tableName, RowKeyColDesc[] rowKeyColDescs) { + int idx = 0; + rowKeyColDescs[idx] = getRowKeyColDesc(tableName, TimePropertyEnum.DAY_DATE.toString(), idx + 1); + idx++; + rowKeyColDescs[idx] = getRowKeyColDesc(tableName, TimePropertyEnum.WEEK_BEGIN_DATE.toString(), idx + 1); + idx++; + rowKeyColDescs[idx] = getRowKeyColDesc(tableName, TimePropertyEnum.MONTH.toString(), idx + 1); + idx++; + rowKeyColDescs[idx] = getRowKeyColDesc(tableName, TimePropertyEnum.YEAR.toString(), idx + 1); + idx++; + rowKeyColDescs[idx] = getRowKeyColDesc(tableName, TimePropertyEnum.TIME_HOUR.toString(), idx + 1); + idx++; + rowKeyColDescs[idx] = getRowKeyColDesc(tableName, TimePropertyEnum.TIME_MINUTE.toString(), idx + 1); + idx++; + return idx; + } + + /** Creates a row-key column whose index is id rendered as a string, whose column is the part of tableName after its last '.' joined to column with ".", whose encoding is DictionaryDimEnc.ENCODING_NAME, and whose shardBy flag is false. */ public static RowKeyColDesc getRowKeyColDesc(String tableName, String column, int id) { + RowKeyColDesc rowKeyColDesc = new RowKeyColDesc(); + rowKeyColDesc.setIndex(Integer.toString(id)); + rowKeyColDesc.setColumn(tableName.substring(tableName.lastIndexOf(".") + 1) + "." 
+ column); + rowKeyColDesc.setEncoding(DictionaryDimEnc.ENCODING_NAME); + rowKeyColDesc.setShardBy(false); + return rowKeyColDesc; + } + + /** Creates a DimensionDesc whose name and column are both the given dimension and whose table is the part of tableName after its last '.'. */ public static DimensionDesc getDimensionDesc(String tableName, String dimension) { + DimensionDesc dimensionDesc = new DimensionDesc(); + dimensionDesc.setName(dimension); + dimensionDesc.setTable(tableName.substring(tableName.lastIndexOf(".") + 1)); + dimensionDesc.setColumn(dimension); + return dimensionDesc; + } + + /** Creates the measure named "_COUNT_": FunctionDesc.FUNC_COUNT applied to the constant parameter "1", with return type HiveTypeEnum.HBIGINT. */ public static MeasureDesc getMeasureCount() { + ParameterDesc parameterDesc = new ParameterDesc(); + parameterDesc.setValue("1"); + parameterDesc.setType(FunctionDesc.PARAMETER_TYPE_CONSTANT); + + FunctionDesc function = new FunctionDesc(); + function.setExpression(FunctionDesc.FUNC_COUNT); + function.setParameter(parameterDesc); + function.setReturnType(HiveTableCreator.HiveTypeEnum.HBIGINT.toString()); + + MeasureDesc result = new MeasureDesc(); + result.setName("_COUNT_"); + result.setFunction(function); + return result; + } + + /** Creates the measure named column + "_SUM": FunctionDesc.FUNC_SUM over the given column. The return type is dataType, except that the HDOUBLE type name is replaced by HDECIMAL. */ public static MeasureDesc getMeasureSum(String column, String dataType) { + ParameterDesc parameterDesc = new ParameterDesc(); + parameterDesc.setValue(column); + parameterDesc.setType(FunctionDesc.PARAMETER_TYPE_COLUMN); + + FunctionDesc function = new FunctionDesc(); + function.setExpression(FunctionDesc.FUNC_SUM); + function.setParameter(parameterDesc); + function.setReturnType(dataType.equals(HiveTableCreator.HiveTypeEnum.HDOUBLE.toString()) + ? 
HiveTableCreator.HiveTypeEnum.HDECIMAL.toString() + : dataType); + + MeasureDesc result = new MeasureDesc(); + result.setName(column + "_SUM"); + result.setFunction(function); + return result; + } + + /** Creates the measure named column + "_MAX": FunctionDesc.FUNC_MAX over the given column, with dataType as the return type. */ public static MeasureDesc getMeasureMax(String column, String dataType) { + ParameterDesc parameterDesc = new ParameterDesc(); + parameterDesc.setValue(column); + parameterDesc.setType(FunctionDesc.PARAMETER_TYPE_COLUMN); + + FunctionDesc function = new FunctionDesc(); + function.setExpression(FunctionDesc.FUNC_MAX); + function.setParameter(parameterDesc); + function.setReturnType(dataType); + + MeasureDesc result = new MeasureDesc(); + result.setName(column + "_MAX"); + result.setFunction(function); + return result; + } + + /** Creates the measure named column + "_MIN": FunctionDesc.FUNC_MIN over the given column, with dataType as the return type. */ public static MeasureDesc getMeasureMin(String column, String dataType) { + ParameterDesc parameterDesc = new ParameterDesc(); + parameterDesc.setValue(column); + parameterDesc.setType(FunctionDesc.PARAMETER_TYPE_COLUMN); + + FunctionDesc function = new FunctionDesc(); + function.setExpression(FunctionDesc.FUNC_MIN); + function.setParameter(parameterDesc); + function.setReturnType(dataType); + + MeasureDesc result = new MeasureDesc(); + result.setName(column + "_MIN"); + result.setFunction(function); + return result; + } + + /** Creates the measure named column + "_HLL": FunctionDesc.FUNC_COUNT_DISTINCT over the given column, with the fixed return type "hllc12". */ public static MeasureDesc getMeasureHLL(String column) { + ParameterDesc parameterDesc = new ParameterDesc(); + parameterDesc.setValue(column); + parameterDesc.setType(FunctionDesc.PARAMETER_TYPE_COLUMN); + + FunctionDesc function = new FunctionDesc(); + function.setExpression(FunctionDesc.FUNC_COUNT_DISTINCT); + function.setParameter(parameterDesc); + function.setReturnType("hllc12"); + + MeasureDesc result = new MeasureDesc(); + result.setName(column + "_HLL"); + result.setFunction(function); + return result; + } + + /** Creates the measure named column + "_PERCENTILE": PercentileMeasureType.FUNC_PERCENTILE over the given column, with the fixed return type "percentile(100)". */ public static MeasureDesc getMeasurePercentile(String column) { + ParameterDesc parameterDesc = new ParameterDesc(); + parameterDesc.setValue(column); + parameterDesc.setType(FunctionDesc.PARAMETER_TYPE_COLUMN); 
+ + FunctionDesc function = new FunctionDesc(); + function.setExpression(PercentileMeasureType.FUNC_PERCENTILE); + function.setParameter(parameterDesc); + function.setReturnType("percentile(100)"); + + MeasureDesc result = new MeasureDesc(); + result.setName(column + "_PERCENTILE"); + result.setFunction(function); + return result; + } + }