http://git-wip-us.apache.org/repos/asf/kylin/blob/1a124e68/server-base/src/main/java/org/apache/kylin/rest/service/BasicService.java ---------------------------------------------------------------------- diff --git a/server-base/src/main/java/org/apache/kylin/rest/service/BasicService.java b/server-base/src/main/java/org/apache/kylin/rest/service/BasicService.java new file mode 100644 index 0000000..7197f03 --- /dev/null +++ b/server-base/src/main/java/org/apache/kylin/rest/service/BasicService.java @@ -0,0 +1,158 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. +*/ + +package org.apache.kylin.rest.service; + +import java.io.IOException; +import java.util.EnumSet; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import org.apache.kylin.common.KylinConfig; +import org.apache.kylin.cube.CubeDescManager; +import org.apache.kylin.cube.CubeManager; +import org.apache.kylin.engine.mr.CubingJob; +import org.apache.kylin.engine.mr.steps.CubingExecutableUtil; +import org.apache.kylin.engine.streaming.StreamingManager; +import org.apache.kylin.invertedindex.IIDescManager; +import org.apache.kylin.invertedindex.IIManager; +import org.apache.kylin.job.execution.AbstractExecutable; +import org.apache.kylin.job.execution.ExecutableState; +import org.apache.kylin.job.execution.Output; +import org.apache.kylin.job.manager.ExecutableManager; +import org.apache.kylin.metadata.MetadataManager; +import org.apache.kylin.metadata.badquery.BadQueryHistoryManager; +import org.apache.kylin.metadata.project.ProjectInstance; +import org.apache.kylin.metadata.project.ProjectManager; +import org.apache.kylin.metadata.realization.RealizationType; +import org.apache.kylin.source.kafka.KafkaConfigManager; +import org.apache.kylin.storage.hybrid.HybridManager; + +import com.google.common.base.Function; +import com.google.common.base.Predicate; +import com.google.common.base.Predicates; +import com.google.common.collect.FluentIterable; +import com.google.common.collect.Lists; + +public abstract class BasicService { + + public KylinConfig getConfig() { + KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv(); + + if (kylinConfig == null) { + throw new IllegalArgumentException("Failed to load kylin config instance"); + } + + return kylinConfig; + } + + public MetadataManager getMetadataManager() { + return MetadataManager.getInstance(getConfig()); + } + + public CubeManager getCubeManager() { + return CubeManager.getInstance(getConfig()); + } + + public StreamingManager getStreamingManager() { + return StreamingManager.getInstance(getConfig()); + } + + public KafkaConfigManager getKafkaManager() throws IOException { + return KafkaConfigManager.getInstance(getConfig()); + } + + public CubeDescManager getCubeDescManager() { + return CubeDescManager.getInstance(getConfig()); + } + + public ProjectManager getProjectManager() { + return ProjectManager.getInstance(getConfig()); + } + + public HybridManager getHybridManager() { + return HybridManager.getInstance(getConfig()); + } + + public ExecutableManager getExecutableManager() { + return ExecutableManager.getInstance(getConfig()); + } + + public IIDescManager getIIDescManager() { + return IIDescManager.getInstance(getConfig()); + } + + public IIManager getIIManager() { + return IIManager.getInstance(getConfig()); + } + + public BadQueryHistoryManager getBadQueryHistoryManager() { + return BadQueryHistoryManager.getInstance(getConfig()); + } + + protected List<CubingJob> listAllCubingJobs(final String cubeName, final String projectName, final Set<ExecutableState> statusList, final Map<String, Output> allOutputs) { + return listAllCubingJobs(cubeName, projectName, statusList, -1L, -1L, allOutputs); + } + + protected List<CubingJob> listAllCubingJobs(final String cubeName, final String projectName, final Set<ExecutableState> statusList, long timeStartInMillis, long timeEndInMillis, final Map<String, Output> allOutputs) { + List<CubingJob> results = Lists.newArrayList(FluentIterable.from(getExecutableManager().getAllExecutables(timeStartInMillis, timeEndInMillis)).filter(new Predicate<AbstractExecutable>() { + @Override + public boolean apply(AbstractExecutable executable) { + if (executable instanceof CubingJob) { + if (cubeName == null) { + return true; + } + return CubingExecutableUtil.getCubeName(executable.getParams()).equalsIgnoreCase(cubeName); + } else { + return false; + } + } + }).transform(new Function<AbstractExecutable, CubingJob>() { + @Override + public CubingJob apply(AbstractExecutable executable) { + return (CubingJob) executable; + } + }).filter(Predicates.and(new Predicate<CubingJob>() { + @Override + public boolean apply(CubingJob executable) { + if (null == projectName || null == getProjectManager().getProject(projectName)) { + return true; + } else { + ProjectInstance project = getProjectManager().getProject(projectName); + return project.containsRealization(RealizationType.CUBE, CubingExecutableUtil.getCubeName(executable.getParams())); + } + } + }, new Predicate<CubingJob>() { + @Override + public boolean apply(CubingJob executable) { + return statusList.contains(allOutputs.get(executable.getId()).getState()); + } + }))); + return results; + } + + protected List<CubingJob> listAllCubingJobs(final String cubeName, final String projectName, final Set<ExecutableState> statusList) { + return listAllCubingJobs(cubeName, projectName, statusList, getExecutableManager().getAllOutputs()); + } + + protected List<CubingJob> listAllCubingJobs(final String cubeName, final String projectName) { + return listAllCubingJobs(cubeName, projectName, EnumSet.allOf(ExecutableState.class), getExecutableManager().getAllOutputs()); + } + +}
http://git-wip-us.apache.org/repos/asf/kylin/blob/1a124e68/server-base/src/main/java/org/apache/kylin/rest/service/CacheService.java ---------------------------------------------------------------------- diff --git a/server-base/src/main/java/org/apache/kylin/rest/service/CacheService.java b/server-base/src/main/java/org/apache/kylin/rest/service/CacheService.java new file mode 100644 index 0000000..9185544 --- /dev/null +++ b/server-base/src/main/java/org/apache/kylin/rest/service/CacheService.java @@ -0,0 +1,313 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. +*/ + +package org.apache.kylin.rest.service; + +import java.io.File; +import java.io.IOException; +import java.util.List; +import java.util.Properties; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; + +import javax.annotation.PostConstruct; +import javax.sql.DataSource; + +import org.apache.calcite.jdbc.Driver; +import org.apache.commons.io.FileUtils; +import org.apache.commons.lang3.StringUtils; +import org.apache.kylin.common.KylinConfig; +import org.apache.kylin.common.restclient.Broadcaster; +import org.apache.kylin.cube.CubeDescManager; +import org.apache.kylin.cube.CubeInstance; +import org.apache.kylin.cube.CubeManager; +import org.apache.kylin.dict.DictionaryManager; +import org.apache.kylin.engine.streaming.StreamingManager; +import org.apache.kylin.invertedindex.IIDescManager; +import org.apache.kylin.invertedindex.IIManager; +import org.apache.kylin.metadata.MetadataManager; +import org.apache.kylin.metadata.project.ProjectInstance; +import org.apache.kylin.metadata.project.ProjectManager; +import org.apache.kylin.metadata.realization.RealizationRegistry; +import org.apache.kylin.metadata.realization.RealizationType; +import org.apache.kylin.query.enumerator.OLAPQuery; +import org.apache.kylin.query.schema.OLAPSchemaFactory; +import org.apache.kylin.rest.controller.QueryController; +import org.apache.kylin.source.kafka.KafkaConfigManager; +import org.apache.kylin.storage.hbase.HBaseConnection; +import org.apache.kylin.storage.hybrid.HybridManager; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.jdbc.datasource.DriverManagerDataSource; +import org.springframework.stereotype.Component; + +import net.sf.ehcache.CacheManager; + +/** + */ +@Component("cacheService") +public class CacheService extends BasicService { + + private static final Logger logger = LoggerFactory.getLogger(CacheService.class); + + private static ConcurrentMap<String, DataSource> olapDataSources = new ConcurrentHashMap<String, DataSource>(); + + @Autowired + private CubeService cubeService; + + @Autowired + private CacheManager cacheManager; + + @PostConstruct + public void initCubeChangeListener() throws IOException { + CubeManager cubeMgr = CubeManager.getInstance(getConfig()); + cubeMgr.setCubeChangeListener(new CubeManager.CubeChangeListener() { + + @Override + public void afterCubeCreate(CubeInstance cube) { + // no cache need change + } + + @Override + public void afterCubeUpdate(CubeInstance cube) { + rebuildCubeCache(cube.getName()); + } + + @Override + public void afterCubeDelete(CubeInstance cube) { + removeCubeCache(cube.getName(), cube); + } + }); + } + + // for test + public void setCubeService(CubeService cubeService) { + this.cubeService = cubeService; + } + + protected void cleanDataCache(String storageUUID) { + if (cacheManager != null) { + logger.info("cleaning cache for " + storageUUID + " (currently remove all entries)"); + cacheManager.getCache(QueryController.SUCCESS_QUERY_CACHE).removeAll(); + cacheManager.getCache(QueryController.EXCEPTION_QUERY_CACHE).removeAll(); + } else { + logger.warn("skip cleaning cache for " + storageUUID); + } + } + + protected void cleanAllDataCache() { + if (cacheManager != null) { + logger.warn("cleaning all storage cache"); + cacheManager.clearAll(); + } else { + logger.warn("skip cleaning all storage cache"); + } + } + + private static void removeOLAPDataSource(String project) { + logger.info("removeOLAPDataSource is called for project " + project); + if (StringUtils.isEmpty(project)) + throw new IllegalArgumentException("removeOLAPDataSource: project name not given"); + + project = ProjectInstance.getNormalizedProjectName(project); + olapDataSources.remove(project); + } + + public static void removeAllOLAPDataSources() { + // brutal, yet simplest way + logger.info("removeAllOLAPDataSources is called."); + olapDataSources.clear(); + } + + public DataSource getOLAPDataSource(String project) { + + project = ProjectInstance.getNormalizedProjectName(project); + + DataSource ret = olapDataSources.get(project); + if (ret == null) { + logger.debug("Creating a new data source, OLAP data source pointing to " + getConfig()); + File modelJson = OLAPSchemaFactory.createTempOLAPJson(project, getConfig()); + + try { + String text = FileUtils.readFileToString(modelJson); + logger.debug("The new temp olap json is :" + text); + } catch (IOException e) { + e.printStackTrace(); // logging failure is not critical + } + + DriverManagerDataSource ds = new DriverManagerDataSource(); + Properties props = new Properties(); + props.setProperty(OLAPQuery.PROP_SCAN_THRESHOLD, String.valueOf(KylinConfig.getInstanceFromEnv().getScanThreshold())); + ds.setConnectionProperties(props); + ds.setDriverClassName(Driver.class.getName()); + ds.setUrl("jdbc:calcite:model=" + modelJson.getAbsolutePath()); + + ret = olapDataSources.putIfAbsent(project, ds); + if (ret == null) { + ret = ds; + } + } + return ret; + } + + public void rebuildCache(Broadcaster.TYPE cacheType, String cacheKey) { + final String log = "rebuild cache type: " + cacheType + " name:" + cacheKey; + logger.info(log); + try { + switch (cacheType) { + case CUBE: + rebuildCubeCache(cacheKey); + break; + case STREAMING: + getStreamingManager().reloadStreamingConfigLocal(cacheKey); + break; + case KAFKA: + getKafkaManager().reloadKafkaConfigLocal(cacheKey); + break; + case CUBE_DESC: + getCubeDescManager().reloadCubeDescLocal(cacheKey); + break; + case PROJECT: + reloadProjectCache(cacheKey); + break; + case INVERTED_INDEX: + //II update does not need to update storage cache because it is dynamic already + getIIManager().reloadIILocal(cacheKey); + getHybridManager().reloadHybridInstanceByChild(RealizationType.INVERTED_INDEX, cacheKey); + getProjectManager().clearL2Cache(); + break; + case INVERTED_INDEX_DESC: + getIIDescManager().reloadIIDescLocal(cacheKey); + break; + case TABLE: + getMetadataManager().reloadTableCache(cacheKey); + IIDescManager.clearCache(); + CubeDescManager.clearCache(); + break; + case EXTERNAL_FILTER: + getMetadataManager().reloadExtFilter(cacheKey); + IIDescManager.clearCache(); + CubeDescManager.clearCache(); + break; + case DATA_MODEL: + getMetadataManager().reloadDataModelDesc(cacheKey); + IIDescManager.clearCache(); + CubeDescManager.clearCache(); + break; + case ALL: + DictionaryManager.clearCache(); + MetadataManager.clearCache(); + CubeDescManager.clearCache(); + CubeManager.clearCache(); + IIDescManager.clearCache(); + IIManager.clearCache(); + HybridManager.clearCache(); + RealizationRegistry.clearCache(); + ProjectManager.clearCache(); + KafkaConfigManager.clearCache(); + StreamingManager.clearCache(); + HBaseConnection.clearConnCache(); + + cleanAllDataCache(); + removeAllOLAPDataSources(); + break; + default: + throw new RuntimeException("invalid cacheType:" + cacheType); + } + } catch (IOException e) { + throw new RuntimeException("error " + log, e); + } + } + + private void rebuildCubeCache(String cubeName) { + CubeInstance cube = getCubeManager().reloadCubeLocal(cubeName); + getHybridManager().reloadHybridInstanceByChild(RealizationType.CUBE, cubeName); + reloadProjectCache(getProjectManager().findProjects(RealizationType.CUBE, cubeName)); + //clean query related cache first + if (cube != null) { + cleanDataCache(cube.getUuid()); + } + cubeService.updateOnNewSegmentReady(cubeName); + } + + public void removeCache(Broadcaster.TYPE cacheType, String cacheKey) { + final String log = "remove cache type: " + cacheType + " name:" + cacheKey; + try { + switch (cacheType) { + case CUBE: + removeCubeCache(cacheKey, null); + break; + case CUBE_DESC: + getCubeDescManager().removeLocalCubeDesc(cacheKey); + break; + case PROJECT: + ProjectManager.clearCache(); + break; + case INVERTED_INDEX: + getIIManager().removeIILocal(cacheKey); + break; + case INVERTED_INDEX_DESC: + getIIDescManager().removeIIDescLocal(cacheKey); + break; + case TABLE: + throw new UnsupportedOperationException(log); + case EXTERNAL_FILTER: + throw new UnsupportedOperationException(log); + case DATA_MODEL: + getMetadataManager().removeModelCache(cacheKey); + break; + default: + throw new RuntimeException("invalid cacheType:" + cacheType); + } + } catch (IOException e) { + throw new RuntimeException("error " + log, e); + } + } + + private void removeCubeCache(String cubeName, CubeInstance cube) { + // you may not get the cube instance if it's already removed from metadata + if (cube == null) { + cube = getCubeManager().getCube(cubeName); + } + + getCubeManager().removeCubeLocal(cubeName); + getHybridManager().reloadHybridInstanceByChild(RealizationType.CUBE, cubeName); + reloadProjectCache(getProjectManager().findProjects(RealizationType.CUBE, cubeName)); + + if (cube != null) { + cleanDataCache(cube.getUuid()); + } + } + + private void reloadProjectCache(List<ProjectInstance> projects) { + for (ProjectInstance prj : projects) { + reloadProjectCache(prj.getName()); + } + } + + private void reloadProjectCache(String projectName) { + try { + getProjectManager().reloadProjectLocal(projectName); + } catch (IOException ex) { + logger.warn("Failed to reset project cache", ex); + } + removeOLAPDataSource(projectName); + } + +} http://git-wip-us.apache.org/repos/asf/kylin/blob/1a124e68/server-base/src/main/java/org/apache/kylin/rest/service/CubeService.java ---------------------------------------------------------------------- diff --git a/server-base/src/main/java/org/apache/kylin/rest/service/CubeService.java b/server-base/src/main/java/org/apache/kylin/rest/service/CubeService.java new file mode 100644 index 0000000..a9d4bfc --- /dev/null +++ b/server-base/src/main/java/org/apache/kylin/rest/service/CubeService.java @@ -0,0 +1,676 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. +*/ + +package org.apache.kylin.rest.service; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.Date; +import java.util.EnumSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.WeakHashMap; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.client.HTable; +import org.apache.kylin.common.KylinConfig; +import org.apache.kylin.common.util.Pair; +import org.apache.kylin.cube.CubeInstance; +import org.apache.kylin.cube.CubeManager; +import org.apache.kylin.cube.CubeSegment; +import org.apache.kylin.cube.CubeUpdate; +import org.apache.kylin.cube.cuboid.CuboidCLI; +import org.apache.kylin.cube.model.CubeDesc; +import org.apache.kylin.engine.EngineFactory; +import org.apache.kylin.engine.mr.CubingJob; +import org.apache.kylin.engine.mr.HadoopUtil; +import org.apache.kylin.engine.mr.common.HadoopShellExecutable; +import org.apache.kylin.engine.mr.common.MapReduceExecutable; +import org.apache.kylin.job.exception.JobException; +import org.apache.kylin.job.execution.DefaultChainedExecutable; +import org.apache.kylin.job.execution.ExecutableState; +import org.apache.kylin.metadata.MetadataConstants; +import org.apache.kylin.metadata.MetadataManager; +import org.apache.kylin.metadata.model.SegmentStatusEnum; +import org.apache.kylin.metadata.model.TableDesc; +import org.apache.kylin.metadata.project.ProjectInstance; +import org.apache.kylin.metadata.project.ProjectManager; +import org.apache.kylin.metadata.project.RealizationEntry; +import org.apache.kylin.metadata.realization.RealizationStatusEnum; +import org.apache.kylin.metadata.realization.RealizationType; +import org.apache.kylin.rest.constant.Constant; +import org.apache.kylin.rest.exception.InternalErrorException; +import org.apache.kylin.rest.request.MetricsRequest; +import org.apache.kylin.rest.response.HBaseResponse; +import org.apache.kylin.rest.response.MetricsResponse; +import org.apache.kylin.rest.security.AclPermission; +import org.apache.kylin.source.hive.HiveSourceTableLoader; +import org.apache.kylin.source.hive.cardinality.HiveColumnCardinalityJob; +import org.apache.kylin.source.hive.cardinality.HiveColumnCardinalityUpdateJob; +import org.apache.kylin.storage.hbase.HBaseConnection; +import org.apache.kylin.storage.hbase.util.HBaseRegionSizeCalculator; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.security.access.prepost.PostFilter; +import org.springframework.security.access.prepost.PreAuthorize; +import org.springframework.security.core.context.SecurityContextHolder; +import org.springframework.stereotype.Component; + +import com.google.common.collect.Lists; + +/** + * Stateless & lightweight service facade of cube management functions. + * + * @author yangli9 + */ +@Component("cubeMgmtService") +public class CubeService extends BasicService { + private static final String DESC_SUFFIX = "_desc"; + + private static final Logger logger = LoggerFactory.getLogger(CubeService.class); + + private WeakHashMap<String, HBaseResponse> htableInfoCache = new WeakHashMap<>(); + + @Autowired + private AccessService accessService; + + @PostFilter(Constant.ACCESS_POST_FILTER_READ) + public List<CubeInstance> listAllCubes(final String cubeName, final String projectName, final String modelName) { + List<CubeInstance> cubeInstances = null; + ProjectInstance project = (null != projectName) ? getProjectManager().getProject(projectName) : null; + + if (null == project) { + cubeInstances = getCubeManager().listAllCubes(); + } else { + cubeInstances = listAllCubes(projectName); + } + + List<CubeInstance> filterModelCubes = new ArrayList<CubeInstance>(); + + if (modelName != null) { + for (CubeInstance cubeInstance : cubeInstances) { + boolean isCubeMatch = cubeInstance.getDescriptor().getModelName().toLowerCase().equals(modelName.toLowerCase()); + if (isCubeMatch) { + filterModelCubes.add(cubeInstance); + } + } + } else { + filterModelCubes = cubeInstances; + } + + List<CubeInstance> filterCubes = new ArrayList<CubeInstance>(); + for (CubeInstance cubeInstance : filterModelCubes) { + boolean isCubeMatch = (null == cubeName) || cubeInstance.getName().toLowerCase().contains(cubeName.toLowerCase()); + + if (isCubeMatch) { + filterCubes.add(cubeInstance); + } + } + + return filterCubes; + } + + public List<CubeInstance> getCubes(final String cubeName, final String projectName, final String modelName, final Integer limit, final Integer offset) { + + List<CubeInstance> cubes; + cubes = listAllCubes(cubeName, projectName, modelName); + + int climit = (null == limit) ? cubes.size() : limit; + int coffset = (null == offset) ? 0 : offset; + + if (cubes.size() <= coffset) { + return Collections.emptyList(); + } + + if ((cubes.size() - coffset) < climit) { + return cubes.subList(coffset, cubes.size()); + } + + return cubes.subList(coffset, coffset + climit); + } + + @PreAuthorize(Constant.ACCESS_HAS_ROLE_ADMIN) + public CubeInstance updateCubeCost(String cubeName, int cost) throws IOException { + CubeInstance cube = getCubeManager().getCube(cubeName); + if (cube == null) { + throw new IOException("Cannot find cube " + cubeName); + } + if (cube.getCost() == cost) { + // Do nothing + return cube; + } + cube.setCost(cost); + + String owner = SecurityContextHolder.getContext().getAuthentication().getName(); + cube.setOwner(owner); + + CubeUpdate cubeBuilder = new CubeUpdate(cube).setOwner(owner).setCost(cost); + + return getCubeManager().updateCube(cubeBuilder); + } + + public CubeInstance createCubeAndDesc(String cubeName, String projectName, CubeDesc desc) throws IOException { + if (getCubeManager().getCube(cubeName) != null) { + throw new InternalErrorException("The cube named " + cubeName + " already exists"); + } + + if (getCubeDescManager().getCubeDesc(desc.getName()) != null) { + throw new InternalErrorException("The cube desc named " + desc.getName() + " already exists"); + } + + String owner = SecurityContextHolder.getContext().getAuthentication().getName(); + CubeDesc createdDesc; + CubeInstance createdCube; + + createdDesc = getCubeDescManager().createCubeDesc(desc); + + if (!createdDesc.getError().isEmpty()) { + getCubeDescManager().removeCubeDesc(createdDesc); + throw new InternalErrorException(createdDesc.getError().get(0)); + } + + try { + int cuboidCount = CuboidCLI.simulateCuboidGeneration(createdDesc, false); + logger.info("New cube " + cubeName + " has " + cuboidCount + " cuboids"); + } catch (Exception e) { + getCubeDescManager().removeCubeDesc(createdDesc); + throw new InternalErrorException("Failed to deal with the request.", e); + } + + createdCube = getCubeManager().createCube(cubeName, projectName, createdDesc, owner); + accessService.init(createdCube, AclPermission.ADMINISTRATION); + + ProjectInstance project = getProjectManager().getProject(projectName); + accessService.inherit(createdCube, project); + + return createdCube; + } + + public List<CubeInstance> listAllCubes(String projectName) { + ProjectManager projectManager = getProjectManager(); + ProjectInstance project = projectManager.getProject(projectName); + if (project == null) { + return Collections.emptyList(); + } + ArrayList<CubeInstance> result = new ArrayList<CubeInstance>(); + for (RealizationEntry projectDataModel : project.getRealizationEntries()) { + if (projectDataModel.getType() == RealizationType.CUBE) { + CubeInstance cube = getCubeManager().getCube(projectDataModel.getRealization()); + if (cube != null) + result.add(cube); + else + logger.error("Cube instance " + projectDataModel.getRealization() + " is failed to load"); + } + } + return result; + } + + private boolean isCubeInProject(String projectName, CubeInstance target) { + ProjectManager projectManager = getProjectManager(); + ProjectInstance project = projectManager.getProject(projectName); + if (project == null) { + return false; + } + for (RealizationEntry projectDataModel : project.getRealizationEntries()) { + if (projectDataModel.getType() == RealizationType.CUBE) { + CubeInstance cube = getCubeManager().getCube(projectDataModel.getRealization()); + if (cube == null) { + logger.error("Project " + projectName + " contains realization " + projectDataModel.getRealization() + " which is not found by CubeManager"); + continue; + } + if (cube.equals(target)) { + return true; + } + } + } + return false; + } + + @PreAuthorize(Constant.ACCESS_HAS_ROLE_ADMIN + " or hasPermission(#cube, 'ADMINISTRATION') or hasPermission(#cube, 'MANAGEMENT')") + public CubeDesc updateCubeAndDesc(CubeInstance cube, CubeDesc desc, String newProjectName, boolean forceUpdate) throws IOException, JobException { + + final List<CubingJob> cubingJobs = listAllCubingJobs(cube.getName(), null, EnumSet.of(ExecutableState.READY, ExecutableState.RUNNING)); + if (!cubingJobs.isEmpty()) { + throw new JobException("Cube schema shouldn't be changed with running job."); + } + + try { + //double check again + if (!forceUpdate && !cube.getDescriptor().consistentWith(desc)) { + throw new IllegalStateException("cube's desc is not consistent with the new desc"); + } + + CubeDesc updatedCubeDesc = getCubeDescManager().updateCubeDesc(desc); + int cuboidCount = CuboidCLI.simulateCuboidGeneration(updatedCubeDesc, false); + logger.info("Updated cube " + cube.getName() + " has " + cuboidCount + " cuboids"); + + ProjectManager projectManager = getProjectManager(); + if (!isCubeInProject(newProjectName, cube)) { + String owner = SecurityContextHolder.getContext().getAuthentication().getName(); + ProjectInstance newProject = projectManager.moveRealizationToProject(RealizationType.CUBE, cube.getName(), newProjectName, owner); + accessService.inherit(cube, newProject); + } + + return updatedCubeDesc; + } catch (IOException e) { + throw new InternalErrorException("Failed to deal with the request.", e); + } + } + + @PreAuthorize(Constant.ACCESS_HAS_ROLE_ADMIN + " or hasPermission(#cube, 'ADMINISTRATION') or hasPermission(#cube, 'MANAGEMENT')") + public void deleteCube(CubeInstance cube) throws IOException, JobException { + final List<CubingJob> cubingJobs = listAllCubingJobs(cube.getName(), null, EnumSet.of(ExecutableState.READY, ExecutableState.RUNNING)); + if (!cubingJobs.isEmpty()) { + throw new JobException("The cube " + cube.getName() + " has running job, please discard it and try again."); + } + + this.releaseAllSegments(cube); + getCubeManager().dropCube(cube.getName(), true); + accessService.clean(cube, true); + } + + public boolean isCubeDescFreeEditable(CubeDesc cd) { + List<CubeInstance> cubes = getCubeManager().getCubesByDesc(cd.getName()); + for (CubeInstance cube : cubes) { + if (cube.getSegments().size() != 0) { + logger.debug("cube '" + cube.getName() + " has " + cube.getSegments().size() + " segments, couldn't edit cube desc."); + return false; + } + } + return true; + } + + public static String getCubeDescNameFromCube(String cubeName) { + return cubeName + DESC_SUFFIX; + } + + public static String getCubeNameFromDesc(String descName) { + if (descName.toLowerCase().endsWith(DESC_SUFFIX)) { + return descName.substring(0, descName.toLowerCase().indexOf(DESC_SUFFIX)); + } else { + return descName; + } + } + + /** + * Stop all jobs belonging to this cube and clean out all segments + * + * @param cube + * @return + * @throws IOException + * @throws JobException + */ + @PreAuthorize(Constant.ACCESS_HAS_ROLE_ADMIN + " or hasPermission(#cube, 'ADMINISTRATION') or hasPermission(#cube, 'OPERATION') or hasPermission(#cube, 'MANAGEMENT')") + public CubeInstance purgeCube(CubeInstance cube) throws IOException, JobException { + + String cubeName = cube.getName(); + RealizationStatusEnum ostatus = cube.getStatus(); + if (null != ostatus && !RealizationStatusEnum.DISABLED.equals(ostatus)) { + throw new InternalErrorException("Only disabled cube can be purged, status of " + cubeName + " is " + ostatus); + } + + try { + this.releaseAllSegments(cube); + return cube; + } catch (IOException e) { + throw e; + } + + } + + /** + * Update a cube status from ready to disabled. + * + * @return + * @throws IOException + * @throws JobException + */ + @PreAuthorize(Constant.ACCESS_HAS_ROLE_ADMIN + " or hasPermission(#cube, 'ADMINISTRATION') or hasPermission(#cube, 'OPERATION') or hasPermission(#cube, 'MANAGEMENT')") + public CubeInstance disableCube(CubeInstance cube) throws IOException, JobException { + + String cubeName = cube.getName(); + + RealizationStatusEnum ostatus = cube.getStatus(); + if (null != ostatus && !RealizationStatusEnum.READY.equals(ostatus)) { + throw new InternalErrorException("Only ready cube can be disabled, status of " + cubeName + " is " + ostatus); + } + + cube.setStatus(RealizationStatusEnum.DISABLED); + + try { + CubeUpdate cubeBuilder = new CubeUpdate(cube); + cubeBuilder.setStatus(RealizationStatusEnum.DISABLED); + return getCubeManager().updateCube(cubeBuilder); + } catch (IOException e) { + cube.setStatus(ostatus); + throw e; + } + } + + /** + * Update a cube status from disable to ready. + * + * @return + * @throws IOException + * @throws JobException + */ + @PreAuthorize(Constant.ACCESS_HAS_ROLE_ADMIN + " or hasPermission(#cube, 'ADMINISTRATION') or hasPermission(#cube, 'OPERATION') or hasPermission(#cube, 'MANAGEMENT')") + public CubeInstance enableCube(CubeInstance cube) throws IOException, JobException { + String cubeName = cube.getName(); + + RealizationStatusEnum ostatus = cube.getStatus(); + if (!cube.getStatus().equals(RealizationStatusEnum.DISABLED)) { + throw new InternalErrorException("Only disabled cube can be enabled, status of " + cubeName + " is " + ostatus); + } + + if (cube.getSegments(SegmentStatusEnum.READY).size() == 0) { + throw new InternalErrorException("Cube " + cubeName + " dosen't contain any READY segment"); + } + + final List<CubingJob> cubingJobs = listAllCubingJobs(cube.getName(), null, EnumSet.of(ExecutableState.READY, ExecutableState.RUNNING)); + if (!cubingJobs.isEmpty()) { + throw new JobException("Enable is not allowed with a running job."); + } + if (!cube.getDescriptor().checkSignature()) { + throw new IllegalStateException("Inconsistent cube desc signature for " + cube.getDescriptor()); + } + + try { + CubeUpdate cubeBuilder = new CubeUpdate(cube); + cubeBuilder.setStatus(RealizationStatusEnum.READY); + return getCubeManager().updateCube(cubeBuilder); + } catch (IOException e) { + cube.setStatus(ostatus); + throw e; + } + } + + public MetricsResponse calculateMetrics(MetricsRequest request) { + List<CubeInstance> cubes = this.getCubeManager().listAllCubes(); + MetricsResponse metrics = new MetricsResponse(); + Date startTime = (null == request.getStartTime()) ? new Date(-1) : request.getStartTime(); + Date endTime = (null == request.getEndTime()) ? new Date() : request.getEndTime(); + metrics.increase("totalCubes", (float) 0); + metrics.increase("totalStorage", (float) 0); + + for (CubeInstance cube : cubes) { + Date createdDate = new Date(-1); + createdDate = (cube.getCreateTimeUTC() == 0) ? createdDate : new Date(cube.getCreateTimeUTC()); + + if (createdDate.getTime() > startTime.getTime() && createdDate.getTime() < endTime.getTime()) { + metrics.increase("totalCubes"); + } + } + + metrics.increase("aveStorage", (metrics.get("totalCubes") == 0) ? 0 : metrics.get("totalStorage") / metrics.get("totalCubes")); + + return metrics; + } + + /** + * Calculate size of each region for given table and other info of the + * table. + * + * @param tableName The table name. + * @return The HBaseResponse object contains table size, region count. null + * if error happens. + * @throws IOException Exception when HTable resource is not closed correctly. + */ + public HBaseResponse getHTableInfo(String tableName) throws IOException { + if (htableInfoCache.containsKey(tableName)) { + return htableInfoCache.get(tableName); + } + + Configuration hconf = HBaseConnection.getCurrentHBaseConfiguration(); + HTable table = null; + HBaseResponse hr = null; + long tableSize = 0; + int regionCount = 0; + + try { + table = new HTable(hconf, tableName); + + HBaseRegionSizeCalculator cal = new HBaseRegionSizeCalculator(table); + Map<byte[], Long> sizeMap = cal.getRegionSizeMap(); + + for (long s : sizeMap.values()) { + tableSize += s; + } + + regionCount = sizeMap.size(); + + // Set response. + hr = new HBaseResponse(); + hr.setTableSize(tableSize); + hr.setRegionCount(regionCount); + } finally { + if (null != table) { + table.close(); + } + } + + htableInfoCache.put(tableName, hr); + + return hr; + } + + /** + * Generate cardinality for table This will trigger a hadoop job + * The result will be merged into table exd info + * + * @param tableName + */ + public void calculateCardinality(String tableName, String submitter) { + String[] dbTableName = HadoopUtil.parseHiveTableName(tableName); + tableName = dbTableName[0] + "." + dbTableName[1]; + TableDesc table = getMetadataManager().getTableDesc(tableName); + final Map<String, String> tableExd = getMetadataManager().getTableDescExd(tableName); + if (tableExd == null || table == null) { + IllegalArgumentException e = new IllegalArgumentException("Cannot find table descirptor " + tableName); + logger.error("Cannot find table descirptor " + tableName, e); + throw e; + } + + DefaultChainedExecutable job = new DefaultChainedExecutable(); + job.setName("Hive Column Cardinality calculation for table '" + tableName + "'"); + job.setSubmitter(submitter); + + String outPath = HiveColumnCardinalityJob.OUTPUT_PATH + "/" + tableName; + String param = "-table " + tableName + " -output " + outPath; + + MapReduceExecutable step1 = new MapReduceExecutable(); + + step1.setMapReduceJobClass(HiveColumnCardinalityJob.class); + step1.setMapReduceParams(param); + + job.addTask(step1); + + HadoopShellExecutable step2 = new HadoopShellExecutable(); + + step2.setJobClass(HiveColumnCardinalityUpdateJob.class); + step2.setJobParams(param); + job.addTask(step2); + + getExecutableManager().addJob(job); + } + + @PreAuthorize(Constant.ACCESS_HAS_ROLE_ADMIN + " or hasPermission(#cube, 'ADMINISTRATION') or hasPermission(#cube, 'OPERATION') or hasPermission(#cube, 'MANAGEMENT')") + public void updateCubeNotifyList(CubeInstance cube, List<String> notifyList) throws IOException { + CubeDesc desc = cube.getDescriptor(); + desc.setNotifyList(notifyList); + getCubeDescManager().updateCubeDesc(desc); + } + + public CubeInstance rebuildLookupSnapshot(String cubeName, String segmentName, String lookupTable) throws IOException { + CubeManager cubeMgr = getCubeManager(); + CubeInstance cube = cubeMgr.getCube(cubeName); + CubeSegment seg = cube.getSegment(segmentName, SegmentStatusEnum.READY); + cubeMgr.buildSnapshotTable(seg, lookupTable); + + return cube; + } + + @PreAuthorize(Constant.ACCESS_HAS_ROLE_ADMIN + " or hasPermission(#cube, 'ADMINISTRATION') or hasPermission(#cube, 'OPERATION') or hasPermission(#cube, 'MANAGEMENT')") + public CubeInstance deleteSegment(CubeInstance cube, String segmentName) throws IOException { + + if (!segmentName.equals(cube.getSegments().get(0).getName()) && !segmentName.equals(cube.getSegments().get(cube.getSegments().size() - 1).getName())) { + throw new IllegalArgumentException("Cannot delete segment '" + segmentName + "' as it is neither the first nor the last segment."); + } + CubeSegment toDelete = null; + for (CubeSegment seg : cube.getSegments()) { + if (seg.getName().equals(segmentName)) { + toDelete = seg; + } + } + + if (toDelete.getStatus() != SegmentStatusEnum.READY) { + throw new IllegalArgumentException("Cannot delete segment '" + segmentName + "' as its status is not READY. Discard the on-going job for it."); + } + + CubeUpdate update = new CubeUpdate(cube); + update.setToRemoveSegs(new CubeSegment[] { toDelete }); + return CubeManager.getInstance(getConfig()).updateCube(update); + } + + /** + * purge the cube + * + * @throws IOException + * @throws JobException + */ + private CubeInstance releaseAllSegments(CubeInstance cube) throws IOException, JobException { + final List<CubingJob> cubingJobs = listAllCubingJobs(cube.getName(), null); + for (CubingJob cubingJob : cubingJobs) { + final ExecutableState status = cubingJob.getStatus(); + if (status != ExecutableState.SUCCEED && status != ExecutableState.STOPPED && status != ExecutableState.DISCARDED) { + getExecutableManager().discardJob(cubingJob.getId()); + } + } + CubeUpdate update = new CubeUpdate(cube); + update.setToRemoveSegs(cube.getSegments().toArray(new CubeSegment[cube.getSegments().size()])); + return CubeManager.getInstance(getConfig()).updateCube(update); + } + + @PreAuthorize(Constant.ACCESS_HAS_ROLE_MODELER + " or " + Constant.ACCESS_HAS_ROLE_ADMIN) + public String[] reloadHiveTable(String tables) throws IOException { + Set<String> loaded = HiveSourceTableLoader.reloadHiveTables(tables.split(","), getConfig()); + return (String[]) loaded.toArray(new String[loaded.size()]); + } + + @PreAuthorize(Constant.ACCESS_HAS_ROLE_ADMIN) + public void unLoadHiveTable(String tableName) throws IOException { + String[] dbTableName = HadoopUtil.parseHiveTableName(tableName); + tableName = dbTableName[0] + "." + dbTableName[1]; + HiveSourceTableLoader.unLoadHiveTable(tableName.toUpperCase()); + } + + @PreAuthorize(Constant.ACCESS_HAS_ROLE_ADMIN) + public void syncTableToProject(String[] tables, String project) throws IOException { + getProjectManager().addTableDescToProject(tables, project); + } + + @PreAuthorize(Constant.ACCESS_HAS_ROLE_ADMIN) + public void removeTableFromProject(String tableName, String projectName) throws IOException { + String[] dbTableName = HadoopUtil.parseHiveTableName(tableName); + tableName = dbTableName[0] + "." + dbTableName[1]; + getProjectManager().removeTableDescFromProject(tableName, projectName); + } + + @PreAuthorize(Constant.ACCESS_HAS_ROLE_MODELER + " or " + Constant.ACCESS_HAS_ROLE_ADMIN) + public void calculateCardinalityIfNotPresent(String[] tables, String submitter) throws IOException { + MetadataManager metaMgr = getMetadataManager(); + for (String table : tables) { + Map<String, String> exdMap = metaMgr.getTableDescExd(table); + if (exdMap == null || !exdMap.containsKey(MetadataConstants.TABLE_EXD_CARDINALITY)) { + calculateCardinality(table, submitter); + } + } + } + + public void updateOnNewSegmentReady(String cubeName) { + logger.debug("on updateOnNewSegmentReady: " + cubeName); + final KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv(); + String serverMode = kylinConfig.getServerMode(); + logger.debug("server mode: " + serverMode); + if (Constant.SERVER_MODE_JOB.equals(serverMode.toLowerCase()) || Constant.SERVER_MODE_ALL.equals(serverMode.toLowerCase())) { + keepCubeRetention(cubeName); + mergeCubeSegment(cubeName); + } + + } + + private void keepCubeRetention(String cubeName) { + logger.info("checking keepCubeRetention"); + CubeInstance cube = getCubeManager().getCube(cubeName); + CubeDesc desc = cube.getDescriptor(); + if (desc.getRetentionRange() <= 0) + return; + + synchronized (CubeService.class) { + cube = getCubeManager().getCube(cubeName); + List<CubeSegment> readySegs = cube.getSegments(SegmentStatusEnum.READY); + if (readySegs.isEmpty()) + return; + + List<CubeSegment> toRemoveSegs = Lists.newArrayList(); + long tail = readySegs.get(readySegs.size() - 1).getDateRangeEnd(); + long head = tail - desc.getRetentionRange(); + for (CubeSegment seg : readySegs) { + if (seg.getDateRangeEnd() <= head) + toRemoveSegs.add(seg); + } + + if (toRemoveSegs.size() > 0) { + CubeUpdate cubeBuilder = new CubeUpdate(cube); + cubeBuilder.setToRemoveSegs(toRemoveSegs.toArray(new CubeSegment[toRemoveSegs.size()])); + try { + this.getCubeManager().updateCube(cubeBuilder); + } catch (IOException e) { + logger.error("Failed to remove old segment from cube " + cubeName, e); + } + } + } + } + + private void mergeCubeSegment(String cubeName) { + CubeInstance cube = getCubeManager().getCube(cubeName); + if (!cube.needAutoMerge()) + return; + + synchronized (CubeService.class) { + try { + cube = getCubeManager().getCube(cubeName); + Pair<Long, Long> offsets = getCubeManager().autoMergeCubeSegments(cube); + if (offsets != null) { + CubeSegment newSeg = getCubeManager().mergeSegments(cube, 0, 0, offsets.getFirst(), offsets.getSecond(), true); + logger.debug("Will submit merge job on " + newSeg); + DefaultChainedExecutable job = EngineFactory.createBatchMergeJob(newSeg, "SYSTEM"); + getExecutableManager().addJob(job); + } else { + logger.debug("Not ready for merge on cube " + cubeName); + } + } catch (IOException e) { + logger.error("Failed to auto merge cube " + cubeName, e); + } + } + } + +} http://git-wip-us.apache.org/repos/asf/kylin/blob/1a124e68/server-base/src/main/java/org/apache/kylin/rest/service/DiagnosisService.java ---------------------------------------------------------------------- diff --git a/server-base/src/main/java/org/apache/kylin/rest/service/DiagnosisService.java b/server-base/src/main/java/org/apache/kylin/rest/service/DiagnosisService.java new file mode 100644 index 0000000..ba51ea1 --- /dev/null +++ b/server-base/src/main/java/org/apache/kylin/rest/service/DiagnosisService.java @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kylin.rest.service; + +import java.io.File; +import java.io.IOException; +import java.util.Arrays; + +import org.apache.kylin.metadata.badquery.BadQueryHistory; +import org.apache.kylin.rest.constant.Constant; +import org.apache.kylin.tool.DiagnosisInfoCLI; +import org.apache.kylin.tool.JobDiagnosisInfoCLI; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.security.access.prepost.PreAuthorize; +import org.springframework.stereotype.Component; + +import com.google.common.io.Files; + +@Component("diagnosisService") +public class DiagnosisService extends BasicService { + + private static final Logger logger = LoggerFactory.getLogger(DiagnosisService.class); + + private File getDumpDir() { + return Files.createTempDir(); + } + + @PreAuthorize(Constant.ACCESS_HAS_ROLE_ADMIN) + public BadQueryHistory getProjectBadQueryHistory(String project) throws IOException { + return getBadQueryHistoryManager().getBadQueriesForProject(project); + } + + @PreAuthorize(Constant.ACCESS_HAS_ROLE_ADMIN) + public String dumpProjectDiagnosisInfo(String project) throws IOException { + String[] args = { "-project", "-all", "-destDir", getDumpDir().getAbsolutePath() }; + logger.info("DiagnosisInfoCLI args: " + Arrays.toString(args)); + DiagnosisInfoCLI diagnosisInfoCli = new DiagnosisInfoCLI(); + diagnosisInfoCli.execute(args); + return diagnosisInfoCli.getExportDest(); + } + + @PreAuthorize(Constant.ACCESS_HAS_ROLE_ADMIN) + public String dumpJobDiagnosisInfo(String jobId) throws IOException { + String[] args = { "-jobId", jobId, "-destDir", getDumpDir().getAbsolutePath() }; + logger.info("JobDiagnosisInfoCLI args: " + Arrays.toString(args)); + JobDiagnosisInfoCLI jobInfoExtractor = new JobDiagnosisInfoCLI(); + jobInfoExtractor.execute(args); + return jobInfoExtractor.getExportDest(); + } + + public static void main(String[] args1) { + String[] args = { "-project", "-all", "-destDir", Files.createTempDir().getAbsolutePath() }; + logger.info("DiagnosisInfoCLI args: " + Arrays.toString(args)); + DiagnosisInfoCLI diagnosisInfoCli = new DiagnosisInfoCLI(); + diagnosisInfoCli.execute(args); + } +} http://git-wip-us.apache.org/repos/asf/kylin/blob/1a124e68/server-base/src/main/java/org/apache/kylin/rest/service/ExtFilterService.java ---------------------------------------------------------------------- diff --git a/server-base/src/main/java/org/apache/kylin/rest/service/ExtFilterService.java b/server-base/src/main/java/org/apache/kylin/rest/service/ExtFilterService.java new file mode 100644 index 0000000..0c98965 --- /dev/null +++ b/server-base/src/main/java/org/apache/kylin/rest/service/ExtFilterService.java @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. +*/ + +package org.apache.kylin.rest.service; + +import java.io.IOException; +import java.util.List; + +import org.apache.kylin.metadata.model.ExternalFilterDesc; +import org.apache.kylin.rest.constant.Constant; +import org.apache.kylin.rest.exception.InternalErrorException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.security.access.prepost.PreAuthorize; +import org.springframework.stereotype.Component; + +@Component("extFilterService") +public class ExtFilterService extends BasicService { + private static final Logger logger = LoggerFactory.getLogger(ExtFilterService.class); + + @Autowired + private AccessService accessService; + + @PreAuthorize(Constant.ACCESS_HAS_ROLE_ADMIN) + public void saveExternalFilter(ExternalFilterDesc desc) throws IOException { + if (getMetadataManager().getExtFilterDesc(desc.getName()) != null) { + throw new InternalErrorException("The filter named " + desc.getName() + " already exists"); + } + getMetadataManager().saveExternalFilter(desc); + } + + @PreAuthorize(Constant.ACCESS_HAS_ROLE_ADMIN) + public void updateExternalFilter(ExternalFilterDesc desc) throws IOException { + if (getMetadataManager().getExtFilterDesc(desc.getName()) == null) { + throw new InternalErrorException("The filter named " + desc.getName() + " does not exists"); + } + getMetadataManager().saveExternalFilter(desc); + } + + @PreAuthorize(Constant.ACCESS_HAS_ROLE_ADMIN) + public void removeExternalFilter(String name) throws IOException { + getMetadataManager().removeExternalFilter(name); + } + + @PreAuthorize(Constant.ACCESS_HAS_ROLE_ADMIN) + public void syncExtFilterToProject(String[] filters, String project) throws IOException { + getProjectManager().addExtFilterToProject(filters, project); + } + + @PreAuthorize(Constant.ACCESS_HAS_ROLE_ADMIN) + public void removeExtFilterFromProject(String filterName, String projectName) throws IOException { + getProjectManager().removeExtFilterFromProject(filterName, projectName); + } + + @PreAuthorize(Constant.ACCESS_HAS_ROLE_ADMIN) + public List<ExternalFilterDesc> listAllExternalFilters() { + return getMetadataManager().listAllExternalFilters(); + } + +} http://git-wip-us.apache.org/repos/asf/kylin/blob/1a124e68/server-base/src/main/java/org/apache/kylin/rest/service/JobService.java ---------------------------------------------------------------------- diff --git a/server-base/src/main/java/org/apache/kylin/rest/service/JobService.java b/server-base/src/main/java/org/apache/kylin/rest/service/JobService.java new file mode 100644 index 0000000..c868264 --- /dev/null +++ b/server-base/src/main/java/org/apache/kylin/rest/service/JobService.java @@ -0,0 +1,388 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. +*/ + +package org.apache.kylin.rest.service; + +import java.io.IOException; +import java.util.Calendar; +import java.util.Collections; +import java.util.Date; +import java.util.EnumSet; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import org.apache.commons.lang3.StringUtils; +import org.apache.kylin.cube.CubeInstance; +import org.apache.kylin.cube.CubeSegment; +import org.apache.kylin.cube.CubeUpdate; +import org.apache.kylin.cube.model.CubeBuildTypeEnum; +import org.apache.kylin.engine.EngineFactory; +import org.apache.kylin.engine.mr.CubingJob; +import org.apache.kylin.engine.mr.common.HadoopShellExecutable; +import org.apache.kylin.engine.mr.common.MapReduceExecutable; +import org.apache.kylin.engine.mr.steps.CubingExecutableUtil; +import org.apache.kylin.job.JobInstance; +import org.apache.kylin.job.common.ShellExecutable; +import org.apache.kylin.job.constant.JobStatusEnum; +import org.apache.kylin.job.constant.JobStepStatusEnum; +import org.apache.kylin.job.constant.JobTimeFilterEnum; +import org.apache.kylin.job.exception.JobException; +import org.apache.kylin.job.execution.AbstractExecutable; +import org.apache.kylin.job.execution.DefaultChainedExecutable; +import org.apache.kylin.job.execution.ExecutableState; +import org.apache.kylin.job.execution.Output; +import org.apache.kylin.metadata.model.SegmentStatusEnum; +import org.apache.kylin.rest.constant.Constant; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.security.access.prepost.PreAuthorize; +import org.springframework.stereotype.Component; + +import com.google.common.base.Function; +import com.google.common.base.Preconditions; +import com.google.common.collect.FluentIterable; +import com.google.common.collect.Lists; +import com.google.common.collect.Sets; + +/** + * @author ysong1 + */ +@Component("jobService") +public class JobService extends BasicService { + + @SuppressWarnings("unused") + private static final Logger logger = LoggerFactory.getLogger(JobService.class); + + @Autowired + private AccessService accessService; + + public List<JobInstance> listAllJobs(final String cubeName, final String projectName, final List<JobStatusEnum> statusList, final Integer limitValue, final Integer offsetValue, final JobTimeFilterEnum timeFilter) throws IOException, JobException { + Integer limit = (null == limitValue) ? 30 : limitValue; + Integer offset = (null == offsetValue) ? 0 : offsetValue; + List<JobInstance> jobs = listAllJobs(cubeName, projectName, statusList, timeFilter); + Collections.sort(jobs); + + if (jobs.size() <= offset) { + return Collections.emptyList(); + } + + if ((jobs.size() - offset) < limit) { + return jobs.subList(offset, jobs.size()); + } + + return jobs.subList(offset, offset + limit); + } + + public List<JobInstance> listAllJobs(final String cubeName, final String projectName, final List<JobStatusEnum> statusList, final JobTimeFilterEnum timeFilter) { + Calendar calendar = Calendar.getInstance(); + calendar.setTime(new Date()); + long currentTimeMillis = calendar.getTimeInMillis(); + long timeStartInMillis = getTimeStartInMillis(calendar, timeFilter); + return listCubeJobInstance(cubeName, projectName, statusList, timeStartInMillis, currentTimeMillis); + } + + @Deprecated + public List<JobInstance> listAllJobs(final String cubeName, final String projectName, final List<JobStatusEnum> statusList, final Integer limitValue, final Integer offsetValue) throws IOException, JobException { + Integer limit = (null == limitValue) ? 30 : limitValue; + Integer offset = (null == offsetValue) ? 0 : offsetValue; + List<JobInstance> jobs = listAllJobs(cubeName, projectName, statusList); + Collections.sort(jobs); + + if (jobs.size() <= offset) { + return Collections.emptyList(); + } + + if ((jobs.size() - offset) < limit) { + return jobs.subList(offset, jobs.size()); + } + + return jobs.subList(offset, offset + limit); + } + + public List<JobInstance> listAllJobs(final String cubeName, final String projectName, final List<JobStatusEnum> statusList) { + return listCubeJobInstance(cubeName, projectName, statusList); + } + + private List<JobInstance> listCubeJobInstance(final String cubeName, final String projectName, List<JobStatusEnum> statusList, final long timeStartInMillis, final long timeEndInMillis) { + Set<ExecutableState> states = convertStatusEnumToStates(statusList); + final Map<String, Output> allOutputs = getExecutableManager().getAllOutputs(timeStartInMillis, timeEndInMillis); + return Lists.newArrayList(FluentIterable.from(listAllCubingJobs(cubeName, projectName, states, timeStartInMillis, timeEndInMillis, allOutputs)).transform(new Function<CubingJob, JobInstance>() { + @Override + public JobInstance apply(CubingJob cubingJob) { + return parseToJobInstance(cubingJob, allOutputs); + } + })); + } + + private List<JobInstance> listCubeJobInstance(final String cubeName, final String projectName, List<JobStatusEnum> statusList) { + Set<ExecutableState> states = convertStatusEnumToStates(statusList); + final Map<String, Output> allOutputs = getExecutableManager().getAllOutputs(); + return Lists.newArrayList(FluentIterable.from(listAllCubingJobs(cubeName, projectName, states, allOutputs)).transform(new Function<CubingJob, JobInstance>() { + @Override + public JobInstance apply(CubingJob cubingJob) { + return parseToJobInstance(cubingJob, allOutputs); + } + })); + } + + private Set<ExecutableState> convertStatusEnumToStates(List<JobStatusEnum> statusList) { + Set<ExecutableState> states; + if (statusList == null || statusList.isEmpty()) { + states = EnumSet.allOf(ExecutableState.class); + } else { + states = Sets.newHashSet(); + for (JobStatusEnum status : statusList) { + states.add(parseToExecutableState(status)); + } + } + return states; + } + + private long getTimeStartInMillis(Calendar calendar, JobTimeFilterEnum timeFilter) { + switch (timeFilter) { + case LAST_ONE_DAY: + calendar.add(Calendar.DAY_OF_MONTH, -1); + return calendar.getTimeInMillis(); + case LAST_ONE_WEEK: + calendar.add(Calendar.WEEK_OF_MONTH, -1); + return calendar.getTimeInMillis(); + case LAST_ONE_MONTH: + calendar.add(Calendar.MONTH, -1); + return calendar.getTimeInMillis(); + case LAST_ONE_YEAR: + calendar.add(Calendar.YEAR, -1); + return calendar.getTimeInMillis(); + case ALL: + return 0; + default: + throw new RuntimeException("illegal timeFilter for job history:" + timeFilter); + } + } + + private ExecutableState parseToExecutableState(JobStatusEnum status) { + switch (status) { + case DISCARDED: + return ExecutableState.DISCARDED; + case ERROR: + return ExecutableState.ERROR; + case FINISHED: + return ExecutableState.SUCCEED; + case NEW: + return ExecutableState.READY; + case PENDING: + return ExecutableState.READY; + case RUNNING: + return ExecutableState.RUNNING; + default: + throw new RuntimeException("illegal status:" + status); + } + } + + @PreAuthorize(Constant.ACCESS_HAS_ROLE_ADMIN + " or hasPermission(#cube, 'ADMINISTRATION') or hasPermission(#cube, 'OPERATION') or hasPermission(#cube, 'MANAGEMENT')") + public JobInstance submitJob(CubeInstance cube, long startDate, long endDate, long startOffset, long endOffset, // + CubeBuildTypeEnum buildType, boolean force, String submitter) throws IOException, JobException { + + checkCubeDescSignature(cube); + checkNoRunningJob(cube); + + DefaultChainedExecutable job; + + if (buildType == CubeBuildTypeEnum.BUILD) { + CubeSegment newSeg = getCubeManager().appendSegment(cube, startDate, endDate, startOffset, endOffset); + job = EngineFactory.createBatchCubingJob(newSeg, submitter); + } else if (buildType == CubeBuildTypeEnum.MERGE) { + CubeSegment newSeg = getCubeManager().mergeSegments(cube, startDate, endDate, startOffset, endOffset, force); + job = EngineFactory.createBatchMergeJob(newSeg, submitter); + } else if (buildType == CubeBuildTypeEnum.REFRESH) { + CubeSegment refreshSeg = getCubeManager().refreshSegment(cube, startDate, endDate, startOffset, endOffset); + job = EngineFactory.createBatchCubingJob(refreshSeg, submitter); + } else { + throw new JobException("invalid build type:" + buildType); + } + getExecutableManager().addJob(job); + JobInstance jobInstance = getSingleJobInstance(job); + + accessService.init(jobInstance, null); + accessService.inherit(jobInstance, cube); + + return jobInstance; + } + + private void checkCubeDescSignature(CubeInstance cube) { + if (!cube.getDescriptor().checkSignature()) + throw new IllegalStateException("Inconsistent cube desc signature for " + cube.getDescriptor()); + } + + private void checkNoRunningJob(CubeInstance cube) throws JobException { + final List<CubingJob> cubingJobs = listAllCubingJobs(cube.getName(), null, EnumSet.allOf(ExecutableState.class)); + for (CubingJob job : cubingJobs) { + if (job.getStatus() == ExecutableState.READY || job.getStatus() == ExecutableState.RUNNING || job.getStatus() == ExecutableState.ERROR) { + throw new JobException("The cube " + cube.getName() + " has running job(" + job.getId() + ") please discard it and try again."); + } + } + } + + public JobInstance getJobInstance(String uuid) throws IOException, JobException { + return getSingleJobInstance(getExecutableManager().getJob(uuid)); + } + + public Output getOutput(String id) { + return getExecutableManager().getOutput(id); + } + + private JobInstance getSingleJobInstance(AbstractExecutable job) { + if (job == null) { + return null; + } + Preconditions.checkState(job instanceof CubingJob, "illegal job type, id:" + job.getId()); + CubingJob cubeJob = (CubingJob) job; + final JobInstance result = new JobInstance(); + result.setName(job.getName()); + result.setRelatedCube(CubingExecutableUtil.getCubeName(cubeJob.getParams())); + result.setRelatedSegment(CubingExecutableUtil.getSegmentId(cubeJob.getParams())); + result.setLastModified(cubeJob.getLastModified()); + result.setSubmitter(cubeJob.getSubmitter()); + result.setUuid(cubeJob.getId()); + result.setType(CubeBuildTypeEnum.BUILD); + result.setStatus(parseToJobStatus(job.getStatus())); + result.setMrWaiting(cubeJob.getMapReduceWaitTime() / 1000); + result.setDuration(cubeJob.getDuration() / 1000); + for (int i = 0; i < cubeJob.getTasks().size(); ++i) { + AbstractExecutable task = cubeJob.getTasks().get(i); + result.addStep(parseToJobStep(task, i, getExecutableManager().getOutput(task.getId()))); + } + return result; + } + + private JobInstance parseToJobInstance(AbstractExecutable job, Map<String, Output> outputs) { + if (job == null) { + return null; + } + Preconditions.checkState(job instanceof CubingJob, "illegal job type, id:" + job.getId()); + CubingJob cubeJob = (CubingJob) job; + Output output = outputs.get(job.getId()); + final JobInstance result = new JobInstance(); + result.setName(job.getName()); + result.setRelatedCube(CubingExecutableUtil.getCubeName(cubeJob.getParams())); + result.setRelatedSegment(CubingExecutableUtil.getSegmentId(cubeJob.getParams())); + result.setLastModified(output.getLastModified()); + result.setSubmitter(cubeJob.getSubmitter()); + result.setUuid(cubeJob.getId()); + result.setType(CubeBuildTypeEnum.BUILD); + result.setStatus(parseToJobStatus(output.getState())); + result.setMrWaiting(AbstractExecutable.getExtraInfoAsLong(output, CubingJob.MAP_REDUCE_WAIT_TIME, 0L) / 1000); + result.setDuration(AbstractExecutable.getDuration(AbstractExecutable.getStartTime(output), AbstractExecutable.getEndTime(output)) / 1000); + for (int i = 0; i < cubeJob.getTasks().size(); ++i) { + AbstractExecutable task = cubeJob.getTasks().get(i); + result.addStep(parseToJobStep(task, i, outputs.get(task.getId()))); + } + return result; + } + + private JobInstance.JobStep parseToJobStep(AbstractExecutable task, int i, Output stepOutput) { + Preconditions.checkNotNull(stepOutput); + JobInstance.JobStep result = new JobInstance.JobStep(); + result.setId(task.getId()); + result.setName(task.getName()); + result.setSequenceID(i); + result.setStatus(parseToJobStepStatus(stepOutput.getState())); + for (Map.Entry<String, String> entry : stepOutput.getExtra().entrySet()) { + if (entry.getKey() != null && entry.getValue() != null) { + result.putInfo(entry.getKey(), entry.getValue()); + } + } + result.setExecStartTime(AbstractExecutable.getStartTime(stepOutput)); + result.setExecEndTime(AbstractExecutable.getEndTime(stepOutput)); + if (task instanceof ShellExecutable) { + result.setExecCmd(((ShellExecutable) task).getCmd()); + } + if (task instanceof MapReduceExecutable) { + result.setExecCmd(((MapReduceExecutable) task).getMapReduceParams()); + result.setExecWaitTime(AbstractExecutable.getExtraInfoAsLong(stepOutput, MapReduceExecutable.MAP_REDUCE_WAIT_TIME, 0L) / 1000); + } + if (task instanceof HadoopShellExecutable) { + result.setExecCmd(((HadoopShellExecutable) task).getJobParams()); + } + return result; + } + + private JobStatusEnum parseToJobStatus(ExecutableState state) { + switch (state) { + case READY: + return JobStatusEnum.PENDING; + case RUNNING: + return JobStatusEnum.RUNNING; + case ERROR: + return JobStatusEnum.ERROR; + case DISCARDED: + return JobStatusEnum.DISCARDED; + case SUCCEED: + return JobStatusEnum.FINISHED; + case STOPPED: + default: + throw new RuntimeException("invalid state:" + state); + } + } + + private JobStepStatusEnum parseToJobStepStatus(ExecutableState state) { + switch (state) { + case READY: + return JobStepStatusEnum.PENDING; + case RUNNING: + return JobStepStatusEnum.RUNNING; + case ERROR: + return JobStepStatusEnum.ERROR; + case DISCARDED: + return JobStepStatusEnum.DISCARDED; + case SUCCEED: + return JobStepStatusEnum.FINISHED; + case STOPPED: + default: + throw new RuntimeException("invalid state:" + state); + } + } + + @PreAuthorize(Constant.ACCESS_HAS_ROLE_ADMIN + " or hasPermission(#job, 'ADMINISTRATION') or hasPermission(#job, 'OPERATION') or hasPermission(#job, 'MANAGEMENT')") + public void resumeJob(JobInstance job) throws IOException, JobException { + getExecutableManager().resumeJob(job.getId()); + } + + @PreAuthorize(Constant.ACCESS_HAS_ROLE_ADMIN + " or hasPermission(#job, 'ADMINISTRATION') or hasPermission(#job, 'OPERATION') or hasPermission(#job, 'MANAGEMENT')") + public JobInstance cancelJob(JobInstance job) throws IOException, JobException { + // CubeInstance cube = this.getCubeManager().getCube(job.getRelatedCube()); + // for (BuildCubeJob cubeJob: listAllCubingJobs(cube.getName(), null, EnumSet.of(ExecutableState.READY, ExecutableState.RUNNING))) { + // getExecutableManager().stopJob(cubeJob.getId()); + // } + CubeInstance cubeInstance = getCubeManager().getCube(job.getRelatedCube()); + final String segmentIds = job.getRelatedSegment(); + for (String segmentId : StringUtils.split(segmentIds)) { + final CubeSegment segment = cubeInstance.getSegmentById(segmentId); + if (segment != null && segment.getStatus() == SegmentStatusEnum.NEW) { + // Remove this segments + CubeUpdate cubeBuilder = new CubeUpdate(cubeInstance); + cubeBuilder.setToRemoveSegs(segment); + getCubeManager().updateCube(cubeBuilder); + } + } + getExecutableManager().discardJob(job.getId()); + return job; + } + +} http://git-wip-us.apache.org/repos/asf/kylin/blob/1a124e68/server-base/src/main/java/org/apache/kylin/rest/service/KafkaConfigService.java ---------------------------------------------------------------------- diff --git a/server-base/src/main/java/org/apache/kylin/rest/service/KafkaConfigService.java b/server-base/src/main/java/org/apache/kylin/rest/service/KafkaConfigService.java new file mode 100644 index 0000000..1f286e3 --- /dev/null +++ b/server-base/src/main/java/org/apache/kylin/rest/service/KafkaConfigService.java @@ -0,0 +1,93 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. +*/ + +package org.apache.kylin.rest.service; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +import org.apache.kylin.rest.constant.Constant; +import org.apache.kylin.rest.exception.InternalErrorException; +import org.apache.kylin.source.kafka.config.KafkaConfig; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.security.access.prepost.PostFilter; +import org.springframework.stereotype.Component; + +@Component("kafkaMgmtService") +public class KafkaConfigService extends BasicService { + + @Autowired + private AccessService accessService; + + @PostFilter(Constant.ACCESS_POST_FILTER_READ) + public List<KafkaConfig> listAllKafkaConfigs(final String kafkaConfigName) throws IOException { + List<KafkaConfig> kafkaConfigs = new ArrayList<KafkaConfig>(); + // CubeInstance cubeInstance = (null != cubeName) ? getCubeManager().getCube(cubeName) : null; + if (null == kafkaConfigName) { + kafkaConfigs = getKafkaManager().listAllKafkaConfigs(); + } else { + List<KafkaConfig> configs = getKafkaManager().listAllKafkaConfigs(); + for (KafkaConfig config : configs) { + if (kafkaConfigName.equals(config.getName())) { + kafkaConfigs.add(config); + } + } + } + + return kafkaConfigs; + } + + public List<KafkaConfig> getKafkaConfigs(final String kafkaConfigName, final Integer limit, final Integer offset) throws IOException { + + List<KafkaConfig> kafkaConfigs; + kafkaConfigs = listAllKafkaConfigs(kafkaConfigName); + + if (limit == null || offset == null) { + return kafkaConfigs; + } + + if ((kafkaConfigs.size() - offset) < limit) { + return kafkaConfigs.subList(offset, kafkaConfigs.size()); + } + + return kafkaConfigs.subList(offset, offset + limit); + } + + public KafkaConfig createKafkaConfig(KafkaConfig config) throws IOException { + if (getKafkaManager().getKafkaConfig(config.getName()) != null) { + throw new InternalErrorException("The kafkaConfig named " + config.getName() + " already exists"); + } + getKafkaManager().createKafkaConfig(config.getName(), config); + return config; + } + + // @PreAuthorize(Constant.ACCESS_HAS_ROLE_ADMIN + " or hasPermission(#desc, 'ADMINISTRATION') or hasPermission(#desc, 'MANAGEMENT')") + public KafkaConfig updateKafkaConfig(KafkaConfig config) throws IOException { + return getKafkaManager().updateKafkaConfig(config); + } + + public KafkaConfig getKafkaConfig(String configName) throws IOException { + return getKafkaManager().getKafkaConfig(configName); + } + + // @PreAuthorize(Constant.ACCESS_HAS_ROLE_ADMIN + " or hasPermission(#desc, 'ADMINISTRATION') or hasPermission(#desc, 'MANAGEMENT')") + public void dropKafkaConfig(KafkaConfig config) throws IOException { + getKafkaManager().removeKafkaConfig(config); + } +} http://git-wip-us.apache.org/repos/asf/kylin/blob/1a124e68/server-base/src/main/java/org/apache/kylin/rest/service/ModelService.java ---------------------------------------------------------------------- diff --git a/server-base/src/main/java/org/apache/kylin/rest/service/ModelService.java b/server-base/src/main/java/org/apache/kylin/rest/service/ModelService.java new file mode 100644 index 0000000..9d8ccfb --- /dev/null +++ b/server-base/src/main/java/org/apache/kylin/rest/service/ModelService.java @@ -0,0 +1,146 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. +*/ + +package org.apache.kylin.rest.service; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +import org.apache.kylin.cube.model.CubeDesc; +import org.apache.kylin.engine.mr.HadoopUtil; +import org.apache.kylin.invertedindex.model.IIDesc; +import org.apache.kylin.metadata.model.DataModelDesc; +import org.apache.kylin.metadata.project.ProjectInstance; +import org.apache.kylin.rest.constant.Constant; +import org.apache.kylin.rest.exception.InternalErrorException; +import org.apache.kylin.rest.security.AclPermission; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.security.access.prepost.PostFilter; +import org.springframework.security.access.prepost.PreAuthorize; +import org.springframework.security.core.context.SecurityContextHolder; +import org.springframework.stereotype.Component; + +/** + * @author jiazhong + */ +@Component("modelMgmtService") +public class ModelService extends BasicService { + + @Autowired + private AccessService accessService; + + @PostFilter(Constant.ACCESS_POST_FILTER_READ) + public List<DataModelDesc> listAllModels(final String modelName, final String projectName) throws IOException { + List<DataModelDesc> models = null; + ProjectInstance project = (null != projectName) ? getProjectManager().getProject(projectName) : null; + + if (null == project) { + models = getMetadataManager().getModels(); + } else { + models = getMetadataManager().getModels(projectName); + project.getModels(); + } + + List<DataModelDesc> filterModels = new ArrayList<DataModelDesc>(); + for (DataModelDesc modelDesc : models) { + boolean isModelMatch = (null == modelName) || modelDesc.getName().toLowerCase().contains(modelName.toLowerCase()); + + if (isModelMatch) { + filterModels.add(modelDesc); + } + } + + return filterModels; + } + + public List<DataModelDesc> getModels(final String modelName, final String projectName, final Integer limit, final Integer offset) throws IOException { + + List<DataModelDesc> modelDescs; + modelDescs = listAllModels(modelName, projectName); + + if (limit == null || offset == null) { + return modelDescs; + } + + if ((modelDescs.size() - offset) < limit) { + return modelDescs.subList(offset, modelDescs.size()); + } + + return modelDescs.subList(offset, offset + limit); + } + + public DataModelDesc createModelDesc(String projectName, DataModelDesc desc) throws IOException { + if (getMetadataManager().getDataModelDesc(desc.getName()) != null) { + throw new InternalErrorException("The model named " + desc.getName() + " already exists"); + } + DataModelDesc createdDesc = null; + String owner = SecurityContextHolder.getContext().getAuthentication().getName(); + createdDesc = getMetadataManager().createDataModelDesc(desc, projectName, owner); + + accessService.init(createdDesc, AclPermission.ADMINISTRATION); + ProjectInstance project = getProjectManager().getProject(projectName); + accessService.inherit(createdDesc, project); + return createdDesc; + } + + @PreAuthorize(Constant.ACCESS_HAS_ROLE_ADMIN + " or hasPermission(#desc, 'ADMINISTRATION') or hasPermission(#desc, 'MANAGEMENT')") + public DataModelDesc updateModelAndDesc(DataModelDesc desc) throws IOException { + + getMetadataManager().updateDataModelDesc(desc); + return desc; + } + + @PreAuthorize(Constant.ACCESS_HAS_ROLE_ADMIN + " or hasPermission(#desc, 'ADMINISTRATION') or hasPermission(#desc, 'MANAGEMENT')") + public void dropModel(DataModelDesc desc) throws IOException { + + //check cube desc exist + List<CubeDesc> cubeDescs = getCubeDescManager().listAllDesc(); + for (CubeDesc cubeDesc : cubeDescs) { + if (cubeDesc.getModelName().equals(desc.getName())) { + throw new InternalErrorException("Model referenced by cube,drop cubes under model and try again."); + } + } + + //check II desc exist + List<IIDesc> iiDescs = getIIDescManager().listAllDesc(); + for (IIDesc iidesc : iiDescs) { + if (iidesc.getModelName().equals(desc.getName())) { + throw new InternalErrorException("Model referenced by IIDesc."); + } + } + + getMetadataManager().dropModel(desc); + + accessService.clean(desc, true); + } + + @PreAuthorize(Constant.ACCESS_HAS_ROLE_ADMIN + " or hasPermission(#desc, 'ADMINISTRATION') or hasPermission(#desc, 'MANAGEMENT')") + public boolean isTableInAnyModel(String tableName) { + String[] dbTableName = HadoopUtil.parseHiveTableName(tableName); + tableName = dbTableName[0] + "." + dbTableName[1]; + return getMetadataManager().isTableInAnyModel(tableName); + } + + @PreAuthorize(Constant.ACCESS_HAS_ROLE_ADMIN + " or hasPermission(#desc, 'ADMINISTRATION') or hasPermission(#desc, 'MANAGEMENT')") + public boolean isTableInModel(String tableName, String projectName) throws IOException { + String[] dbTableName = HadoopUtil.parseHiveTableName(tableName); + tableName = dbTableName[0] + "." + dbTableName[1]; + return getMetadataManager().isTableInModel(tableName, projectName); + } +}
