Repository: kylin Updated Branches: refs/heads/yaho-cube-planner 0c4b3ad57 -> 0b12df776
APACHE-KYLIN-2734: backend support for hot cuboids export & import Project: http://git-wip-us.apache.org/repos/asf/kylin/repo Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/7279427d Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/7279427d Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/7279427d Branch: refs/heads/yaho-cube-planner Commit: 7279427d40df7e9d6151ecf3f439c2c20f800dd1 Parents: 0c4b3ad Author: Zhong <nju_y...@apache.org> Authored: Wed Aug 30 14:51:19 2017 +0800 Committer: Zhong <nju_y...@apache.org> Committed: Wed Aug 30 14:51:19 2017 +0800 ---------------------------------------------------------------------- .../org/apache/kylin/cube/model/CubeDesc.java | 38 ++++++++ .../kylin/rest/controller/CubeController.java | 99 ++++++++++++++++++++ .../apache/kylin/rest/service/BasicService.java | 4 + .../apache/kylin/rest/service/CubeService.java | 27 ++++++ .../apache/kylin/rest/service/QueryService.java | 15 ++- 5 files changed, 180 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kylin/blob/7279427d/core-cube/src/main/java/org/apache/kylin/cube/model/CubeDesc.java ---------------------------------------------------------------------- diff --git a/core-cube/src/main/java/org/apache/kylin/cube/model/CubeDesc.java b/core-cube/src/main/java/org/apache/kylin/cube/model/CubeDesc.java index 510a7d5..7eefcd6 100644 --- a/core-cube/src/main/java/org/apache/kylin/cube/model/CubeDesc.java +++ b/core-cube/src/main/java/org/apache/kylin/cube/model/CubeDesc.java @@ -168,6 +168,9 @@ public class CubeDesc extends RootPersistentEntity implements IEngineAware { @JsonProperty("override_kylin_properties") private LinkedHashMap<String, String> overrideKylinProps = new LinkedHashMap<String, String>(); + @JsonProperty("mandatory_dimension_set_list") + private List<Set<String>> mandatoryDimensionSetList = Collections.emptyList(); + private LinkedHashSet<TblColRef> allColumns = new LinkedHashSet<>(); private LinkedHashSet<ColumnDesc> allColumnDescs = new LinkedHashSet<>(); private LinkedHashSet<TblColRef> dimensionColumns = new LinkedHashSet<>(); @@ -438,6 +441,14 @@ public class CubeDesc extends RootPersistentEntity implements IEngineAware { this.overrideKylinProps = overrideKylinProps; } + public List<Set<String>> getMandatoryDimensionSetList() { + return mandatoryDimensionSetList; + } + + public void setMandatoryDimensionSetList(List<Set<String>> mandatoryDimensionSetList) { + this.mandatoryDimensionSetList = mandatoryDimensionSetList; + } + @Override public boolean equals(Object o) { if (this == o) @@ -526,6 +537,13 @@ public class CubeDesc extends RootPersistentEntity implements IEngineAware { .append(JsonUtil.writeValueAsString(this.engineType)).append("|")// .append(JsonUtil.writeValueAsString(this.storageType)).append("|"); + if (mandatoryDimensionSetList != null && !mandatoryDimensionSetList.isEmpty()) { + for (Set<String> mandatoryDimensionSet : mandatoryDimensionSetList) { + TreeSet<String> sortedSet = Sets.newTreeSet(mandatoryDimensionSet); + sigString.append(JsonUtil.writeValueAsString(sortedSet)).append("|"); + } + } + String signatureInput = sigString.toString().replaceAll("\\s+", "").toLowerCase(); byte[] signature = md.digest(signatureInput.getBytes()); @@ -599,6 +617,26 @@ public class CubeDesc extends RootPersistentEntity implements IEngineAware { initDictionaryDesc(); amendAllColumns(); + + // check if mandatory dimension set list is valid + validateMandatoryDimensionSetList(); + } + + public void validateMandatoryDimensionSetList() { + Set<String> rowKeyColumns = Sets.newHashSet(); + for (RowKeyColDesc entry : getRowkey().getRowKeyColumns()) { + rowKeyColumns.add(entry.getColumn()); + } + + for (Set<String> mandatoryDimensionSet : this.mandatoryDimensionSetList) { + for (String columnName : mandatoryDimensionSet) { + if (!rowKeyColumns.contains(columnName)) { + logger.info("Column " + columnName + " in " + mandatoryDimensionSet + " does not exist"); + throw new IllegalStateException( + "Column " + columnName + " in " + mandatoryDimensionSet + " does not exist"); + } + } + } } public CuboidScheduler getInitialCuboidScheduler() { http://git-wip-us.apache.org/repos/asf/kylin/blob/7279427d/server-base/src/main/java/org/apache/kylin/rest/controller/CubeController.java ---------------------------------------------------------------------- diff --git a/server-base/src/main/java/org/apache/kylin/rest/controller/CubeController.java b/server-base/src/main/java/org/apache/kylin/rest/controller/CubeController.java index 8f821dd..c415212 100644 --- a/server-base/src/main/java/org/apache/kylin/rest/controller/CubeController.java +++ b/server-base/src/main/java/org/apache/kylin/rest/controller/CubeController.java @@ -21,13 +21,18 @@ package org.apache.kylin.rest.controller; import static org.apache.kylin.rest.service.CubeService.VALID_CUBENAME; import java.io.IOException; +import java.io.PrintWriter; import java.net.UnknownHostException; import java.util.ArrayList; import java.util.Collections; +import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.UUID; +import javax.servlet.http.HttpServletResponse; + import org.apache.commons.lang.StringUtils; import org.apache.kylin.common.util.JsonUtil; import org.apache.kylin.cube.CubeInstance; @@ -35,6 +40,7 @@ import org.apache.kylin.cube.CubeManager; import org.apache.kylin.cube.CubeSegment; import org.apache.kylin.cube.model.CubeBuildTypeEnum; import org.apache.kylin.cube.model.CubeDesc; +import org.apache.kylin.cube.model.RowKeyColDesc; import org.apache.kylin.dimension.DimensionEncodingFactory; import org.apache.kylin.engine.EngineFactory; import org.apache.kylin.job.JobInstance; @@ -44,6 +50,8 @@ import org.apache.kylin.metadata.model.IJoinedFlatTableDesc; import org.apache.kylin.metadata.model.ISourceAware; import org.apache.kylin.metadata.project.ProjectInstance; import org.apache.kylin.metadata.realization.RealizationStatusEnum; +import org.apache.kylin.metrics.MetricsManager; +import org.apache.kylin.metrics.property.QueryCubePropertyEnum; import org.apache.kylin.rest.exception.BadRequestException; import org.apache.kylin.rest.exception.ForbiddenException; import org.apache.kylin.rest.exception.InternalErrorException; @@ -52,11 +60,13 @@ import org.apache.kylin.rest.request.CubeRequest; import org.apache.kylin.rest.request.JobBuildRequest; import org.apache.kylin.rest.request.JobBuildRequest2; import org.apache.kylin.rest.request.JobOptimizeRequest; +import org.apache.kylin.rest.request.SQLRequest; import org.apache.kylin.rest.response.GeneralResponse; import org.apache.kylin.rest.response.HBaseResponse; import org.apache.kylin.rest.service.CubeService; import org.apache.kylin.rest.service.JobService; import org.apache.kylin.rest.service.ProjectService; +import org.apache.kylin.rest.service.QueryService; import org.apache.kylin.source.kafka.util.KafkaClient; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -78,6 +88,7 @@ import com.fasterxml.jackson.databind.JsonMappingException; import com.google.common.base.Joiner; import com.google.common.collect.Lists; import com.google.common.collect.Maps; +import com.google.common.collect.Sets; /** * CubeController is defined as Restful API entrance for UI. @@ -99,6 +110,10 @@ public class CubeController extends BasicController { @Qualifier("projectService") private ProjectService projectService; + @Autowired + @Qualifier("queryService") + private QueryService queryService; + @RequestMapping(value = "", method = { RequestMethod.GET }, produces = { "application/json" }) @ResponseBody public List<CubeInstance> getCubes(@RequestParam(value = "cubeName", required = false) String cubeName, @RequestParam(value = "modelName", required = false) String modelName, @RequestParam(value = "projectName", required = false) String projectName, @RequestParam(value = "limit", required = false) Integer limit, @RequestParam(value = "offset", required = false) Integer offset) { @@ -707,6 +722,90 @@ public class CubeController extends BasicController { } + @RequestMapping(value = "/{cubeName}/cuboids/export", method = RequestMethod.GET) + @ResponseBody + public void cuboidsExport(@PathVariable String cubeName, @RequestParam(value = "top") Integer top, + HttpServletResponse response) throws IOException { + CubeInstance cube = cubeService.getCubeManager().getCube(cubeName); + if (cube == null) { + logger.error("Get cube: [" + cubeName + "] failed when get recommend cuboids"); + throw new BadRequestException("Get cube: [" + cubeName + "] failed when get recommend cuboids"); + } + Map<Long, Long> cuboidList = getRecommendCuboidList(cube); + if (cuboidList == null || cuboidList.isEmpty()) { + logger.warn("Cannot get recommend cuboid list for cube " + cubeName); + } + if (cuboidList.size() < top) { + logger.info("Only recommend " + cuboidList.size() + " cuboids less than topn " + top); + } + Iterator<Long> cuboidIterator = cuboidList.keySet().iterator(); + RowKeyColDesc[] rowKeyColDescList = cube.getDescriptor().getRowkey().getRowKeyColumns(); + + List<Set<String>> dimensionSetList = Lists.newLinkedList(); + while (top-- > 0 && cuboidIterator.hasNext()) { + Set<String> dimensionSet = Sets.newHashSet(); + dimensionSetList.add(dimensionSet); + long cuboid = cuboidIterator.next(); + for (int i = 0; i < rowKeyColDescList.length; i++) { + if ((cuboid & (1L << rowKeyColDescList[i].getBitIndex())) > 0) { + dimensionSet.add(rowKeyColDescList[i].getColumn()); + } + } + } + + response.setContentType("text/json;charset=utf-8"); + response.setHeader("Content-Disposition", "attachment; filename=\"" + cubeName + ".json\""); + try (PrintWriter writer = response.getWriter()) { + writer.write(JsonUtil.writeValueAsString(dimensionSetList)); + } catch (IOException e) { + logger.error("", e); + throw new InternalErrorException("Failed to write: " + e.getLocalizedMessage()); + } + } + + private Map<Long, Long> getRecommendCuboidList(CubeInstance cube) throws IOException { + // Get cuboid source info + Map<Long, Long> optimizeHitFrequencyMap = getCuboidHitFrequency(cube.getName(), true); + Map<Long, Map<Long, Long>> rollingUpCountSourceMap = getCuboidRollingUpCount(cube.getName()); + return cubeService.getRecommendCuboidStatistics(cube, optimizeHitFrequencyMap, rollingUpCountSourceMap); + } + + private Map<Long, Long> getCuboidHitFrequency(String cubeName, boolean isCuboidSource) { + SQLRequest sqlRequest = new SQLRequest(); + sqlRequest.setProject(MetricsManager.SYSTEM_PROJECT); + String cuboidColumn = QueryCubePropertyEnum.CUBOID_SOURCE.toString(); + if (!isCuboidSource) { + cuboidColumn = QueryCubePropertyEnum.CUBOID_TARGET.toString(); + } + String hitMeasure = QueryCubePropertyEnum.WEIGHT_PER_HIT.toString(); + String table = cubeService.getMetricsManager() + .getSystemTableFromSubject(cubeService.getConfig().getKylinMetricsSubjectQueryCube()); + String sql = "select " + cuboidColumn + ", sum(" + hitMeasure + ") " // + + "from " + table// + + " where " + QueryCubePropertyEnum.CUBE.toString() + " = '" + cubeName + "' " // + + "group by " + cuboidColumn; + sqlRequest.setSql(sql); + List<List<String>> orgHitFrequency = queryService.queryWithoutSecure(sqlRequest).getResults(); + return cubeService.formatQueryCount(orgHitFrequency); + } + + private Map<Long, Map<Long, Long>> getCuboidRollingUpCount(String cubeName) { + SQLRequest sqlRequest = new SQLRequest(); + sqlRequest.setProject(MetricsManager.SYSTEM_PROJECT); + String cuboidSource = QueryCubePropertyEnum.CUBOID_SOURCE.toString(); + String cuboidTarget = QueryCubePropertyEnum.CUBOID_TARGET.toString(); + String aggCount = QueryCubePropertyEnum.AGGR_COUNT.toString(); + String table = cubeService.getMetricsManager() + .getSystemTableFromSubject(cubeService.getConfig().getKylinMetricsSubjectQueryCube()); + String sql = "select " + cuboidSource + ", " + cuboidTarget + ", sum(" + aggCount + ")/count(*) " // + + "from " + table // + + " where " + QueryCubePropertyEnum.CUBE.toString() + " = '" + cubeName + "' " // + + "group by " + cuboidSource + ", " + cuboidTarget; + sqlRequest.setSql(sql); + List<List<String>> orgRollingUpCount = queryService.queryWithoutSecure(sqlRequest).getResults(); + return cubeService.formatRollingUpCount(orgRollingUpCount); + } + /** * Initiate the very beginning of a streaming cube. Will seek the latest offests of each partition from streaming * source (kafka) and record in the cube descriptor; In the first build job, it will use these offests as the start point. http://git-wip-us.apache.org/repos/asf/kylin/blob/7279427d/server-base/src/main/java/org/apache/kylin/rest/service/BasicService.java ---------------------------------------------------------------------- diff --git a/server-base/src/main/java/org/apache/kylin/rest/service/BasicService.java b/server-base/src/main/java/org/apache/kylin/rest/service/BasicService.java index 66e7cfb..0c1054c 100644 --- a/server-base/src/main/java/org/apache/kylin/rest/service/BasicService.java +++ b/server-base/src/main/java/org/apache/kylin/rest/service/BasicService.java @@ -29,6 +29,7 @@ import org.apache.kylin.metadata.badquery.BadQueryHistoryManager; import org.apache.kylin.metadata.draft.DraftManager; import org.apache.kylin.metadata.project.ProjectManager; import org.apache.kylin.metadata.streaming.StreamingManager; +import org.apache.kylin.metrics.MetricsManager; import org.apache.kylin.source.kafka.KafkaConfigManager; import org.apache.kylin.storage.hybrid.HybridManager; @@ -84,4 +85,7 @@ public abstract class BasicService { return DraftManager.getInstance(getConfig()); } + public MetricsManager getMetricsManager() { + return MetricsManager.getInstance(); + } } http://git-wip-us.apache.org/repos/asf/kylin/blob/7279427d/server-base/src/main/java/org/apache/kylin/rest/service/CubeService.java ---------------------------------------------------------------------- diff --git a/server-base/src/main/java/org/apache/kylin/rest/service/CubeService.java b/server-base/src/main/java/org/apache/kylin/rest/service/CubeService.java index 93af53b..fdf7bec 100644 --- a/server-base/src/main/java/org/apache/kylin/rest/service/CubeService.java +++ b/server-base/src/main/java/org/apache/kylin/rest/service/CubeService.java @@ -24,6 +24,7 @@ import java.util.Collections; import java.util.Date; import java.util.EnumSet; import java.util.List; +import java.util.Map; import java.util.WeakHashMap; import org.apache.commons.lang.StringUtils; @@ -38,6 +39,7 @@ import org.apache.kylin.cube.cuboid.CuboidCLI; import org.apache.kylin.cube.model.CubeDesc; import org.apache.kylin.engine.EngineFactory; import org.apache.kylin.engine.mr.CubingJob; +import org.apache.kylin.engine.mr.common.CuboidRecommenderUtil; import org.apache.kylin.job.exception.JobException; import org.apache.kylin.job.execution.DefaultChainedExecutable; import org.apache.kylin.job.execution.ExecutableState; @@ -746,4 +748,29 @@ public class CubeService extends BasicService { return filtered; } + + /** cube planner services */ + public Map<Long, Long> formatQueryCount(List<List<String>> orgQueryCount) { + Map<Long, Long> formattedQueryCount = Maps.newLinkedHashMap(); + for (List<String> hit : orgQueryCount) { + formattedQueryCount.put(Long.parseLong(hit.get(0)), (long) Double.parseDouble(hit.get(1))); + } + return formattedQueryCount; + } + + public Map<Long, Map<Long, Long>> formatRollingUpCount(List<List<String>> orgRollingUpCount) { + Map<Long, Map<Long, Long>> formattedRollingUpCount = Maps.newLinkedHashMap(); + for (List<String> rollingUp : orgRollingUpCount) { + Map<Long, Long> childMap = Maps.newLinkedHashMap(); + childMap.put(Long.parseLong(rollingUp.get(1)), (long) Double.parseDouble(rollingUp.get(2))); + formattedRollingUpCount.put(Long.parseLong(rollingUp.get(0)), childMap); + } + return formattedRollingUpCount; + } + + @PreAuthorize(Constant.ACCESS_HAS_ROLE_ADMIN + " or hasPermission(#cube, 'ADMINISTRATION')") + public Map<Long, Long> getRecommendCuboidStatistics(CubeInstance cube, Map<Long, Long> hitFrequencyMap, + Map<Long, Map<Long, Long>> rollingUpCountSourceMap) throws IOException { + return CuboidRecommenderUtil.getRecommendCuboidList(cube, hitFrequencyMap, rollingUpCountSourceMap); + } } http://git-wip-us.apache.org/repos/asf/kylin/blob/7279427d/server-base/src/main/java/org/apache/kylin/rest/service/QueryService.java ---------------------------------------------------------------------- diff --git a/server-base/src/main/java/org/apache/kylin/rest/service/QueryService.java b/server-base/src/main/java/org/apache/kylin/rest/service/QueryService.java index 94b6196..8c767aa 100644 --- a/server-base/src/main/java/org/apache/kylin/rest/service/QueryService.java +++ b/server-base/src/main/java/org/apache/kylin/rest/service/QueryService.java @@ -342,7 +342,15 @@ public class QueryService extends BasicService { } } + public SQLResponse queryWithoutSecure(SQLRequest sqlRequest) { + return doQueryWithCache(sqlRequest, false); + } + public SQLResponse doQueryWithCache(SQLRequest sqlRequest) { + return doQueryWithCache(sqlRequest, true); + } + + public SQLResponse doQueryWithCache(SQLRequest sqlRequest, boolean secureEnabled) { Message msg = MsgPicker.getMsg(); KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv(); @@ -415,7 +423,7 @@ public class QueryService extends BasicService { sqlResponse.setTotalScanBytes(0); } - checkQueryAuth(sqlResponse, project); + checkQueryAuth(sqlResponse, project, secureEnabled); } catch (Throwable e) { // calcite may throw AssertError logger.error("Exception when execute sql", e); @@ -468,8 +476,9 @@ public class QueryService extends BasicService { return response; } - protected void checkQueryAuth(SQLResponse sqlResponse, String project) throws AccessDeniedException { - if (!sqlResponse.getIsException() && KylinConfig.getInstanceFromEnv().isQuerySecureEnabled()) { + protected void checkQueryAuth(SQLResponse sqlResponse, String project, boolean secureEnabled) + throws AccessDeniedException { + if (!sqlResponse.getIsException() && KylinConfig.getInstanceFromEnv().isQuerySecureEnabled() && secureEnabled) { checkAuthorization(sqlResponse, project); } }