KYLIN-1340 CubeMetaExtractor support streaming case and skip segments
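(For context: with this change the extractor can skip segment data instead of refusing to run without it, and it picks up streaming/Kafka configs for streaming cubes. A minimal invocation sketch follows; the option names match the OPTION_* definitions in the new CubeMetaExtractor below, while the cube name and destDir value are made-up examples.)

    // Sketch only: extract one cube's metadata without segments or job outputs.
    // "-cube", "-includeSegments", "-includeJobs" and "-destDir" are the options
    // defined in the new class; "test_streaming_cube" and "/tmp/kylin_meta_dump"
    // are hypothetical values.
    String[] args = new String[] {
            "-cube", "test_streaming_cube",
            "-includeSegments", "false",
            "-includeJobs", "false",
            "-destDir", "/tmp/kylin_meta_dump"
    };
    CubeMetaExtractor.main(args);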
Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/4c08ded6
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/4c08ded6
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/4c08ded6

Branch: refs/heads/2.x-staging
Commit: 4c08ded63f78aad93eefa9814d48af2486725967
Parents: 2e1d2f6
Author: honma <ho...@ebay.com>
Authored: Wed Feb 24 15:45:38 2016 +0800
Committer: honma <ho...@ebay.com>
Committed: Fri Feb 26 17:54:37 2016 +0800

----------------------------------------------------------------------
 .../org/apache/kylin/job/CubeMetaExtractor.java | 327 +++++++++++++++++++
 .../kylin/common/persistence/ResourceTool.java  |   2 +-
 .../engine/streaming/StreamingManager.java      | 100 +-----
 .../kylin/source/kafka/KafkaConfigManager.java  |  47 +--
 .../kylin/source/kafka/config/KafkaConfig.java  |   4 +-
 .../storage/hbase/util/CubeMetaExtractor.java   | 284 ----------------
 6 files changed, 345 insertions(+), 419 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kylin/blob/4c08ded6/assembly/src/test/java/org/apache/kylin/job/CubeMetaExtractor.java
----------------------------------------------------------------------
diff --git a/assembly/src/test/java/org/apache/kylin/job/CubeMetaExtractor.java b/assembly/src/test/java/org/apache/kylin/job/CubeMetaExtractor.java
new file mode 100644
index 0000000..527ef0a
--- /dev/null
+++ b/assembly/src/test/java/org/apache/kylin/job/CubeMetaExtractor.java
@@ -0,0 +1,327 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */ + +package org.apache.kylin.job; + +import java.io.IOException; +import java.util.List; + +import org.apache.commons.cli.Option; +import org.apache.commons.cli.OptionBuilder; +import org.apache.commons.cli.OptionGroup; +import org.apache.commons.cli.Options; +import org.apache.commons.lang3.StringUtils; +import org.apache.kylin.common.KylinConfig; +import org.apache.kylin.common.persistence.ResourceStore; +import org.apache.kylin.common.persistence.ResourceTool; +import org.apache.kylin.common.util.AbstractApplication; +import org.apache.kylin.common.util.OptionsHelper; +import org.apache.kylin.cube.CubeDescManager; +import org.apache.kylin.cube.CubeInstance; +import org.apache.kylin.cube.CubeManager; +import org.apache.kylin.cube.CubeSegment; +import org.apache.kylin.cube.model.CubeDesc; +import org.apache.kylin.engine.streaming.StreamingConfig; +import org.apache.kylin.engine.streaming.StreamingManager; +import org.apache.kylin.invertedindex.IIInstance; +import org.apache.kylin.job.dao.ExecutableDao; +import org.apache.kylin.job.dao.ExecutablePO; +import org.apache.kylin.job.exception.PersistentException; +import org.apache.kylin.metadata.MetadataManager; +import org.apache.kylin.metadata.model.DataModelDesc; +import org.apache.kylin.metadata.model.SegmentStatusEnum; +import org.apache.kylin.metadata.model.TableDesc; +import org.apache.kylin.metadata.project.ProjectInstance; +import org.apache.kylin.metadata.project.ProjectManager; +import org.apache.kylin.metadata.project.RealizationEntry; +import org.apache.kylin.metadata.realization.IRealization; +import org.apache.kylin.metadata.realization.RealizationRegistry; +import org.apache.kylin.metadata.realization.RealizationType; +import org.apache.kylin.source.kafka.KafkaConfigManager; +import org.apache.kylin.source.kafka.config.KafkaConfig; +import org.apache.kylin.storage.hybrid.HybridInstance; +import org.apache.kylin.storage.hybrid.HybridManager; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.collect.Lists; + +/** + * extract cube related info for debugging/distributing purpose + * TODO: deal with II case + */ +public class CubeMetaExtractor extends AbstractApplication { + + private static final Logger logger = LoggerFactory.getLogger(CubeMetaExtractor.class); + + @SuppressWarnings("static-access") + private static final Option OPTION_CUBE = OptionBuilder.withArgName("cube").hasArg().isRequired(false).withDescription("Specify which cube to extract").create("cube"); + @SuppressWarnings("static-access") + private static final Option OPTION_HYBRID = OptionBuilder.withArgName("hybrid").hasArg().isRequired(false).withDescription("Specify which hybrid to extract").create("hybrid"); + @SuppressWarnings("static-access") + private static final Option OPTION_PROJECT = OptionBuilder.withArgName("project").hasArg().isRequired(false).withDescription("Specify realizations in which project to extract").create("project"); + + @SuppressWarnings("static-access") + private static final Option OPTION_INCLUDE_SEGMENTS = OptionBuilder.withArgName("includeSegments").hasArg().isRequired(false).withDescription("set this to true if want extract the segments info, related dicts, etc. Default true").create("includeSegments"); + @SuppressWarnings("static-access") + private static final Option OPTION_INCLUDE_JOB = OptionBuilder.withArgName("includeJobs").hasArg().isRequired(false).withDescription("set this to true if want to extract job info/outputs too. 
Default true").create("includeJobs"); + + @SuppressWarnings("static-access") + private static final Option OPTION_DEST = OptionBuilder.withArgName("destDir").hasArg().isRequired(false).withDescription("specify the dest dir to save the related metadata").create("destDir"); + + private Options options = null; + private KylinConfig kylinConfig; + private MetadataManager metadataManager; + private ProjectManager projectManager; + private HybridManager hybridManager; + private CubeManager cubeManager; + private StreamingManager streamingManager; + private KafkaConfigManager kafkaConfigManager; + private CubeDescManager cubeDescManager; + private ExecutableDao executableDao; + private RealizationRegistry realizationRegistry; + + boolean includeSegments; + boolean includeJobs; + + List<String> requiredResources = Lists.newArrayList(); + List<String> optionalResources = Lists.newArrayList(); + List<CubeInstance> cubesToTrimAndSave = Lists.newArrayList();//these cubes needs to be saved skipping segments + + public CubeMetaExtractor() { + options = new Options(); + + OptionGroup realizationOrProject = new OptionGroup(); + realizationOrProject.addOption(OPTION_CUBE); + realizationOrProject.addOption(OPTION_PROJECT); + realizationOrProject.addOption(OPTION_HYBRID); + realizationOrProject.setRequired(true); + + options.addOptionGroup(realizationOrProject); + options.addOption(OPTION_INCLUDE_SEGMENTS); + options.addOption(OPTION_INCLUDE_JOB); + options.addOption(OPTION_DEST); + + } + + @Override + protected Options getOptions() { + return options; + } + + @Override + protected void execute(OptionsHelper optionsHelper) throws Exception { + includeSegments = optionsHelper.hasOption(OPTION_INCLUDE_SEGMENTS) ? Boolean.valueOf(optionsHelper.getOptionValue(OPTION_INCLUDE_SEGMENTS)) : true; + includeJobs = optionsHelper.hasOption(OPTION_INCLUDE_JOB) ? 
Boolean.valueOf(optionsHelper.getOptionValue(OPTION_INCLUDE_JOB)) : true; + String dest = null; + if (optionsHelper.hasOption(OPTION_DEST)) { + dest = optionsHelper.getOptionValue(OPTION_DEST); + } + + kylinConfig = KylinConfig.getInstanceFromEnv(); + metadataManager = MetadataManager.getInstance(kylinConfig); + projectManager = ProjectManager.getInstance(kylinConfig); + hybridManager = HybridManager.getInstance(kylinConfig); + cubeManager = CubeManager.getInstance(kylinConfig); + cubeDescManager = CubeDescManager.getInstance(kylinConfig); + streamingManager = StreamingManager.getInstance(kylinConfig); + kafkaConfigManager = KafkaConfigManager.getInstance(kylinConfig); + executableDao = ExecutableDao.getInstance(kylinConfig); + realizationRegistry = RealizationRegistry.getInstance(kylinConfig); + + if (optionsHelper.hasOption(OPTION_PROJECT)) { + ProjectInstance projectInstance = projectManager.getProject(optionsHelper.getOptionValue(OPTION_PROJECT)); + if (projectInstance == null) { + throw new IllegalArgumentException("Project " + optionsHelper.getOptionValue(OPTION_PROJECT) + " does not exist"); + } + addRequired(requiredResources, ProjectInstance.concatResourcePath(projectInstance.getName())); + List<RealizationEntry> realizationEntries = projectInstance.getRealizationEntries(); + for (RealizationEntry realizationEntry : realizationEntries) { + retrieveResourcePath(getRealization(realizationEntry)); + } + } else if (optionsHelper.hasOption(OPTION_CUBE)) { + String cubeName = optionsHelper.getOptionValue(OPTION_CUBE); + IRealization realization; + + if ((realization = cubeManager.getRealization(cubeName)) != null) { + retrieveResourcePath(realization); + } else { + throw new IllegalArgumentException("No cube found with name of " + cubeName); + } + } else if (optionsHelper.hasOption(OPTION_HYBRID)) { + String hybridName = optionsHelper.getOptionValue(OPTION_HYBRID); + IRealization realization; + + if ((realization = hybridManager.getRealization(hybridName)) != null) { + retrieveResourcePath(realization); + } else { + throw new IllegalArgumentException("No hybrid found with name of" + hybridName); + } + } + + executeExtraction(dest); + } + + private void executeExtraction(String dest) { + logger.info("The resource paths going to be extracted:"); + for (String s : requiredResources) { + logger.info(s + "(required)"); + } + for (String s : optionalResources) { + logger.info(s + "(optional)"); + } + for (CubeInstance cube : cubesToTrimAndSave) { + logger.info("Cube {} will be trimmed and extracted", cube); + } + + if (dest == null) { + logger.info("Dest is not set, exit directly without extracting"); + } else { + try { + ResourceStore src = ResourceStore.getStore(KylinConfig.getInstanceFromEnv()); + ResourceStore dst = ResourceStore.getStore(KylinConfig.createInstanceFromUri(dest)); + + for (String path : requiredResources) { + ResourceTool.copyR(src, dst, path); + } + + for (String path : optionalResources) { + try { + ResourceTool.copyR(src, dst, path); + } catch (Exception e) { + logger.warn("Exception when copying optional resource {}. May be caused by resource missing. 
Ignore it."); + } + } + + for (CubeInstance cube : cubesToTrimAndSave) { + CubeInstance trimmedCube = CubeInstance.getCopyOf(cube); + trimmedCube.getSegments().clear(); + trimmedCube.setUuid(cube.getUuid()); + dst.putResource(trimmedCube.getResourcePath(), trimmedCube, CubeManager.CUBE_SERIALIZER); + } + + } catch (IOException e) { + throw new RuntimeException("IOException", e); + } + } + } + + private IRealization getRealization(RealizationEntry realizationEntry) { + return realizationRegistry.getRealization(realizationEntry.getType(), realizationEntry.getRealization()); + } + + private void dealWithStreaming(CubeInstance cube) { + for (StreamingConfig streamingConfig : streamingManager.listAllStreaming()) { + if (streamingConfig.getCubeName() != null && streamingConfig.getCubeName().equalsIgnoreCase(cube.getName())) { + requiredResources.add(StreamingConfig.concatResourcePath(streamingConfig.getName())); + requiredResources.add(KafkaConfig.concatResourcePath(streamingConfig.getName())); + } + } + } + + private void retrieveResourcePath(IRealization realization) { + + logger.info("Deal with realization {} of type {}", realization.getName(), realization.getType()); + + if (realization instanceof CubeInstance) { + CubeInstance cube = (CubeInstance) realization; + String descName = cube.getDescName(); + CubeDesc cubeDesc = cubeDescManager.getCubeDesc(descName); + String modelName = cubeDesc.getModelName(); + DataModelDesc modelDesc = metadataManager.getDataModelDesc(modelName); + + dealWithStreaming(cube); + + for (String tableName : modelDesc.getAllTables()) { + addRequired(requiredResources, TableDesc.concatResourcePath(tableName)); + addOptional(optionalResources, TableDesc.concatExdResourcePath(tableName)); + } + + addRequired(requiredResources, DataModelDesc.concatResourcePath(modelDesc.getName())); + addRequired(requiredResources, CubeDesc.concatResourcePath(cubeDesc.getName())); + + if (includeSegments) { + addRequired(requiredResources, CubeInstance.concatResourcePath(cube.getName())); + for (CubeSegment segment : cube.getSegments(SegmentStatusEnum.READY)) { + for (String dictPat : segment.getDictionaryPaths()) { + addRequired(requiredResources, dictPat); + } + for (String snapshotPath : segment.getSnapshotPaths()) { + addRequired(requiredResources, snapshotPath); + } + addRequired(requiredResources, segment.getStatisticsResourcePath()); + + if (includeJobs) { + String lastJobId = segment.getLastBuildJobID(); + if (!StringUtils.isEmpty(lastJobId)) { + throw new RuntimeException("No job exist for segment :" + segment); + } else { + try { + ExecutablePO executablePO = executableDao.getJob(lastJobId); + addRequired(requiredResources, ExecutableDao.pathOfJob(lastJobId)); + addRequired(requiredResources, ExecutableDao.pathOfJobOutput(lastJobId)); + for (ExecutablePO task : executablePO.getTasks()) { + addRequired(requiredResources, ExecutableDao.pathOfJob(task.getUuid())); + addRequired(requiredResources, ExecutableDao.pathOfJobOutput(task.getUuid())); + } + } catch (PersistentException e) { + throw new RuntimeException("PersistentException", e); + } + } + } else { + logger.info("Job info will not be extracted"); + } + } + } else { + if (includeJobs) { + logger.warn("It's useless to set includeJobs to true when includeSegments is set to false"); + } + + cubesToTrimAndSave.add(cube); + } + } else if (realization instanceof HybridInstance) { + HybridInstance hybridInstance = (HybridInstance) realization; + addRequired(requiredResources, 
HybridInstance.concatResourcePath(hybridInstance.getName())); + for (IRealization iRealization : hybridInstance.getRealizations()) { + if (iRealization.getType() != RealizationType.CUBE) { + throw new RuntimeException("Hybrid " + iRealization.getName() + " contains non cube child " + iRealization.getName() + " with type " + iRealization.getType()); + } + retrieveResourcePath(iRealization); + } + } else if (realization instanceof IIInstance) { + throw new IllegalStateException("Does not support extract II instance or hybrid that contains II"); + } else { + throw new IllegalStateException("Unknown realization type: " + realization.getType()); + } + } + + private void addRequired(List<String> resourcePaths, String record) { + logger.info("adding required resource {}", record); + resourcePaths.add(record); + } + + private void addOptional(List<String> optionalPaths, String record) { + logger.info("adding optional resource {}", record); + optionalPaths.add(record); + } + + public static void main(String[] args) { + CubeMetaExtractor extractor = new CubeMetaExtractor(); + extractor.execute(args); + } +} http://git-wip-us.apache.org/repos/asf/kylin/blob/4c08ded6/core-common/src/main/java/org/apache/kylin/common/persistence/ResourceTool.java ---------------------------------------------------------------------- diff --git a/core-common/src/main/java/org/apache/kylin/common/persistence/ResourceTool.java b/core-common/src/main/java/org/apache/kylin/common/persistence/ResourceTool.java index 3b8e0c1..489e45a 100644 --- a/core-common/src/main/java/org/apache/kylin/common/persistence/ResourceTool.java +++ b/core-common/src/main/java/org/apache/kylin/common/persistence/ResourceTool.java @@ -115,7 +115,7 @@ public class ResourceTool { copyR(src, dst, "/"); } - private static void copyR(ResourceStore src, ResourceStore dst, String path) throws IOException { + public static void copyR(ResourceStore src, ResourceStore dst, String path) throws IOException { ArrayList<String> children = src.listResources(path); // case of resource (not a folder) http://git-wip-us.apache.org/repos/asf/kylin/blob/4c08ded6/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/StreamingManager.java ---------------------------------------------------------------------- diff --git a/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/StreamingManager.java b/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/StreamingManager.java index af04a11..e0b086d 100644 --- a/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/StreamingManager.java +++ b/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/StreamingManager.java @@ -34,24 +34,14 @@ package org.apache.kylin.engine.streaming; -import java.io.BufferedReader; -import java.io.ByteArrayInputStream; -import java.io.ByteArrayOutputStream; import java.io.IOException; -import java.io.InputStream; -import java.io.InputStreamReader; import java.util.ArrayList; -import java.util.Collections; -import java.util.HashMap; import java.util.List; -import java.util.Map; import java.util.concurrent.ConcurrentHashMap; -import org.apache.commons.io.IOUtils; import org.apache.commons.lang3.StringUtils; import org.apache.kylin.common.KylinConfig; import org.apache.kylin.common.persistence.JsonSerializer; -import org.apache.kylin.common.persistence.RawResource; import org.apache.kylin.common.persistence.ResourceStore; import org.apache.kylin.common.persistence.Serializer; import org.apache.kylin.common.restclient.Broadcaster; @@ -60,13 +50,6 
@@ import org.apache.kylin.metadata.MetadataConstants; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.fasterxml.jackson.databind.JavaType; -import com.fasterxml.jackson.databind.ObjectMapper; -import com.fasterxml.jackson.databind.type.MapType; -import com.fasterxml.jackson.databind.type.SimpleType; -import com.google.common.base.Preconditions; -import com.google.common.collect.Lists; - /** */ public class StreamingManager { @@ -121,18 +104,6 @@ public class StreamingManager { } } - private String formatStreamingConfigPath(String name) { - return ResourceStore.STREAMING_RESOURCE_ROOT + "/" + name + ".json"; - } - - private String formatStreamingOutputPath(String streaming, int partition) { - return ResourceStore.STREAMING_OUTPUT_RESOURCE_ROOT + "/" + streaming + "_" + partition + ".json"; - } - - private String formatStreamingOutputPath(String streaming, List<Integer> partitions) { - return ResourceStore.STREAMING_OUTPUT_RESOURCE_ROOT + "/" + streaming + "_" + StringUtils.join(partitions, "_") + ".json"; - } - public StreamingConfig getStreamingConfig(String name) { return streamingMap.get(name); } @@ -214,77 +185,12 @@ public class StreamingManager { if (streamingMap.containsKey(streamingConfig.getName())) throw new IllegalArgumentException("StreamingConfig '" + streamingConfig.getName() + "' already exists"); - String path = formatStreamingConfigPath(streamingConfig.getName()); + String path = StreamingConfig.concatResourcePath(streamingConfig.getName()); getStore().putResource(path, streamingConfig, StreamingConfig.SERIALIZER); streamingMap.put(streamingConfig.getName(), streamingConfig); return streamingConfig; } - public long getOffset(String streaming, int shard) { - final String resPath = formatStreamingOutputPath(streaming, shard); - InputStream inputStream = null; - try { - final RawResource res = getStore().getResource(resPath); - if (res == null) { - return 0; - } else { - inputStream = res.inputStream; - final BufferedReader br = new BufferedReader(new InputStreamReader(inputStream)); - return Long.parseLong(br.readLine()); - } - } catch (Exception e) { - logger.error("error get offset, path:" + resPath, e); - throw new RuntimeException("error get offset, path:" + resPath, e); - } finally { - IOUtils.closeQuietly(inputStream); - } - } - - public void updateOffset(String streaming, int shard, long offset) { - Preconditions.checkArgument(offset >= 0, "offset cannot be smaller than 0"); - final String resPath = formatStreamingOutputPath(streaming, shard); - try { - getStore().putResource(resPath, new ByteArrayInputStream(Long.valueOf(offset).toString().getBytes()), getStore().getResourceTimestamp(resPath)); - } catch (IOException e) { - logger.error("error update offset, path:" + resPath, e); - throw new RuntimeException("error update offset, path:" + resPath, e); - } - } - - public Map<Integer, Long> getOffset(String streaming, List<Integer> partitions) { - Collections.sort(partitions); - final String resPath = formatStreamingOutputPath(streaming, partitions); - InputStream inputStream = null; - try { - RawResource res = getStore().getResource(resPath); - if (res == null) - return Collections.emptyMap(); - - inputStream = res.inputStream; - final HashMap<Integer, Long> result = mapper.readValue(inputStream, mapType); - return result; - } catch (IOException e) { - logger.error("error get offset, path:" + resPath, e); - throw new RuntimeException("error get offset, path:" + resPath, e); - } finally { - IOUtils.closeQuietly(inputStream); - } - } - - public void 
updateOffset(String streaming, HashMap<Integer, Long> offset) { - List<Integer> partitions = Lists.newLinkedList(offset.keySet()); - Collections.sort(partitions); - final String resPath = formatStreamingOutputPath(streaming, partitions); - try { - final ByteArrayOutputStream baos = new ByteArrayOutputStream(); - mapper.writeValue(baos, offset); - getStore().putResource(resPath, new ByteArrayInputStream(baos.toByteArray()), getStore().getResourceTimestamp(resPath)); - } catch (IOException e) { - logger.error("error update offset, path:" + resPath, e); - throw new RuntimeException("error update offset, path:" + resPath, e); - } - } - private StreamingConfig loadStreamingConfigAt(String path) throws IOException { ResourceStore store = getStore(); StreamingConfig streamingDesc = store.getResource(path, StreamingConfig.class, STREAMING_SERIALIZER); @@ -324,8 +230,4 @@ public class StreamingManager { logger.debug("Loaded " + streamingMap.size() + " StreamingConfig(s)"); } - - private final ObjectMapper mapper = new ObjectMapper(); - private final JavaType mapType = MapType.construct(HashMap.class, SimpleType.construct(Integer.class), SimpleType.construct(Long.class)); - } http://git-wip-us.apache.org/repos/asf/kylin/blob/4c08ded6/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaConfigManager.java ---------------------------------------------------------------------- diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaConfigManager.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaConfigManager.java index ac20fc3..1d07f23 100644 --- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaConfigManager.java +++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaConfigManager.java @@ -36,7 +36,6 @@ package org.apache.kylin.source.kafka; import java.io.IOException; import java.util.ArrayList; -import java.util.HashMap; import java.util.List; import java.util.concurrent.ConcurrentHashMap; @@ -52,11 +51,6 @@ import org.apache.kylin.source.kafka.config.KafkaConfig; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.fasterxml.jackson.databind.JavaType; -import com.fasterxml.jackson.databind.ObjectMapper; -import com.fasterxml.jackson.databind.type.MapType; -import com.fasterxml.jackson.databind.type.SimpleType; - /** */ public class KafkaConfigManager { @@ -87,7 +81,7 @@ public class KafkaConfigManager { return ResourceStore.getStore(this.config); } - public static KafkaConfigManager getInstance(KylinConfig config){ + public static KafkaConfigManager getInstance(KylinConfig config) { KafkaConfigManager r = CACHE.get(config); if (r != null) { return r; @@ -98,16 +92,16 @@ public class KafkaConfigManager { if (r != null) { return r; } - try{ - r = new KafkaConfigManager(config); - CACHE.put(config, r); - if (CACHE.size() > 1) { - logger.warn("More than one KafkaConfigManager singleton exist"); + try { + r = new KafkaConfigManager(config); + CACHE.put(config, r); + if (CACHE.size() > 1) { + logger.warn("More than one KafkaConfigManager singleton exist"); + } + return r; + } catch (IOException e) { + throw new IllegalStateException("Failed to init KafkaConfigManager from " + config, e); } - return r; - } catch (IOException e) { - throw new IllegalStateException("Failed to init KafkaConfigManager from " + config, e); - } } } @@ -125,7 +119,7 @@ public class KafkaConfigManager { public KafkaConfig reloadKafkaConfigLocal(String name) throws IOException { // Save Source - String path = 
KafkaConfig.getKafkaResourcePath(name); + String path = KafkaConfig.concatResourcePath(name); // Reload the KafkaConfig KafkaConfig ndesc = loadKafkaConfigAt(path); @@ -135,14 +129,6 @@ public class KafkaConfigManager { return ndesc; } - private boolean checkExistence(String name) { - return true; - } - - private String formatStreamingConfigPath(String name) { - return ResourceStore.KAFKA_RESOURCE_ROOT + "/" + name + ".json"; - } - public boolean createKafkaConfig(String name, KafkaConfig config) { if (config == null || StringUtils.isEmpty(config.getName())) { @@ -152,7 +138,7 @@ public class KafkaConfigManager { if (kafkaMap.containsKey(config.getName())) throw new IllegalArgumentException("KafkaConfig '" + config.getName() + "' already exists"); try { - getStore().putResource(formatStreamingConfigPath(name), config, KafkaConfig.SERIALIZER); + getStore().putResource(KafkaConfig.concatResourcePath(name), config, KafkaConfig.SERIALIZER); kafkaMap.put(config.getName(), config); return true; } catch (IOException e) { @@ -185,7 +171,7 @@ public class KafkaConfigManager { private KafkaConfig loadKafkaConfigAt(String path) throws IOException { ResourceStore store = getStore(); - KafkaConfig kafkaConfig = store.getResource(path, KafkaConfig.class,KAFKA_SERIALIZER ); + KafkaConfig kafkaConfig = store.getResource(path, KafkaConfig.class, KAFKA_SERIALIZER); if (StringUtils.isBlank(kafkaConfig.getName())) { throw new IllegalStateException("KafkaConfig name must not be blank"); @@ -193,7 +179,6 @@ public class KafkaConfigManager { return kafkaConfig; } - public KafkaConfig getKafkaConfig(String name) { return kafkaMap.get(name); } @@ -203,7 +188,7 @@ public class KafkaConfigManager { throw new IllegalArgumentException(); } - String path = formatStreamingConfigPath(kafkaConfig.getName()); + String path = KafkaConfig.concatResourcePath(kafkaConfig.getName()); getStore().putResource(path, kafkaConfig, KafkaConfig.SERIALIZER); } @@ -214,7 +199,6 @@ public class KafkaConfigManager { kafkaMap.remove(kafkaConfig.getName()); } - private void reloadAllKafkaConfig() throws IOException { ResourceStore store = getStore(); logger.info("Reloading Kafka Metadata from folder " + store.getReadableResourcePath(ResourceStore.KAFKA_RESOURCE_ROOT)); @@ -245,7 +229,4 @@ public class KafkaConfigManager { logger.debug("Loaded " + kafkaMap.size() + " KafkaConfig(s)"); } - private final ObjectMapper mapper = new ObjectMapper(); - private final JavaType mapType = MapType.construct(HashMap.class, SimpleType.construct(Integer.class), SimpleType.construct(Long.class)); - } http://git-wip-us.apache.org/repos/asf/kylin/blob/4c08ded6/source-kafka/src/main/java/org/apache/kylin/source/kafka/config/KafkaConfig.java ---------------------------------------------------------------------- diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/config/KafkaConfig.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/config/KafkaConfig.java index 100ca2d..1dce844 100644 --- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/config/KafkaConfig.java +++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/config/KafkaConfig.java @@ -85,10 +85,10 @@ public class KafkaConfig extends RootPersistentEntity { private String parserProperties; public String getResourcePath() { - return getKafkaResourcePath(name); + return concatResourcePath(name); } - public static String getKafkaResourcePath(String streamingName) { + public static String concatResourcePath(String streamingName) { return 
ResourceStore.KAFKA_RESOURCE_ROOT + "/" + streamingName + MetadataConstants.FILE_SURFIX; } http://git-wip-us.apache.org/repos/asf/kylin/blob/4c08ded6/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/CubeMetaExtractor.java ---------------------------------------------------------------------- diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/CubeMetaExtractor.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/CubeMetaExtractor.java deleted file mode 100644 index 680dff8..0000000 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/CubeMetaExtractor.java +++ /dev/null @@ -1,284 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.kylin.storage.hbase.util; - -import java.io.IOException; -import java.util.List; - -import org.apache.commons.cli.Option; -import org.apache.commons.cli.OptionBuilder; -import org.apache.commons.cli.OptionGroup; -import org.apache.commons.cli.Options; -import org.apache.commons.lang3.StringUtils; -import org.apache.kylin.common.KylinConfig; -import org.apache.kylin.common.persistence.ResourceTool; -import org.apache.kylin.common.util.AbstractApplication; -import org.apache.kylin.common.util.OptionsHelper; -import org.apache.kylin.cube.CubeDescManager; -import org.apache.kylin.cube.CubeInstance; -import org.apache.kylin.cube.CubeManager; -import org.apache.kylin.cube.CubeSegment; -import org.apache.kylin.cube.model.CubeDesc; -import org.apache.kylin.invertedindex.IIDescManager; -import org.apache.kylin.invertedindex.IIInstance; -import org.apache.kylin.invertedindex.IIManager; -import org.apache.kylin.job.dao.ExecutableDao; -import org.apache.kylin.job.dao.ExecutablePO; -import org.apache.kylin.job.exception.PersistentException; -import org.apache.kylin.metadata.MetadataManager; -import org.apache.kylin.metadata.model.DataModelDesc; -import org.apache.kylin.metadata.model.TableDesc; -import org.apache.kylin.metadata.project.ProjectInstance; -import org.apache.kylin.metadata.project.ProjectManager; -import org.apache.kylin.metadata.project.RealizationEntry; -import org.apache.kylin.metadata.realization.IRealization; -import org.apache.kylin.metadata.realization.RealizationRegistry; -import org.apache.kylin.storage.hybrid.HybridInstance; -import org.apache.kylin.storage.hybrid.HybridManager; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.google.common.collect.Lists; - -/** - * extract cube related info for debugging/distributing purpose - * TODO: deal with II case, deal with Streaming case - */ -public class CubeMetaExtractor extends AbstractApplication { - - private static final Logger logger = LoggerFactory.getLogger(CubeMetaExtractor.class); - - @SuppressWarnings("static-access") - 
private static final Option OPTION_CUBE = OptionBuilder.withArgName("cube").hasArg().isRequired(false).withDescription("Specify which cube to extract").create("cube"); - @SuppressWarnings("static-access") - private static final Option OPTION_HYBRID = OptionBuilder.withArgName("hybrid").hasArg().isRequired(false).withDescription("Specify which hybrid to extract").create("hybrid"); - @SuppressWarnings("static-access") - private static final Option OPTION_PROJECT = OptionBuilder.withArgName("project").hasArg().isRequired(false).withDescription("Specify realizations in which project to extract").create("project"); - - @SuppressWarnings("static-access") - private static final Option OPTION_INCLUDE_SEGMENTS = OptionBuilder.withArgName("includeSegments").hasArg().isRequired(false).withDescription("set this to true if want extract the segments info, related dicts, etc.").create("includeSegments"); - @SuppressWarnings("static-access") - private static final Option OPTION_INCLUDE_JOB = OptionBuilder.withArgName("includeJobs").hasArg().isRequired(false).withDescription("set this to true if want to extract job info/outputs too").create("includeJobs"); - - @SuppressWarnings("static-access") - private static final Option OPTION_DEST = OptionBuilder.withArgName("destDir").hasArg().isRequired(false).withDescription("specify the dest dir to save the related metadata").create("destDir"); - - private Options options = null; - private KylinConfig kylinConfig; - private MetadataManager metadataManager; - private ProjectManager projectManager; - private HybridManager hybridManager; - private CubeManager cubeManager; - private CubeDescManager cubeDescManager; - private IIManager iiManager; - private IIDescManager iiDescManager; - private ExecutableDao executableDao; - RealizationRegistry realizationRegistry; - - public CubeMetaExtractor() { - options = new Options(); - - OptionGroup realizationOrProject = new OptionGroup(); - realizationOrProject.addOption(OPTION_CUBE); - realizationOrProject.addOption(OPTION_PROJECT); - realizationOrProject.addOption(OPTION_HYBRID); - realizationOrProject.setRequired(true); - - options.addOptionGroup(realizationOrProject); - options.addOption(OPTION_INCLUDE_SEGMENTS); - options.addOption(OPTION_INCLUDE_JOB); - options.addOption(OPTION_DEST); - - } - - @Override - protected Options getOptions() { - return options; - } - - @Override - protected void execute(OptionsHelper optionsHelper) throws Exception { - boolean includeSegments = optionsHelper.hasOption(OPTION_INCLUDE_SEGMENTS) ? Boolean.valueOf(optionsHelper.getOptionValue(OPTION_INCLUDE_SEGMENTS)) : true; - boolean includeJobs = optionsHelper.hasOption(OPTION_INCLUDE_JOB) ? 
Boolean.valueOf(optionsHelper.getOptionValue(OPTION_INCLUDE_JOB)) : true; - String dest = null; - if (optionsHelper.hasOption(OPTION_DEST)) { - dest = optionsHelper.getOptionValue(OPTION_DEST); - } - - if (!includeSegments) { - throw new RuntimeException("Does not support skip segments for now"); - } - - kylinConfig = KylinConfig.getInstanceFromEnv(); - metadataManager = MetadataManager.getInstance(kylinConfig); - projectManager = ProjectManager.getInstance(kylinConfig); - hybridManager = HybridManager.getInstance(kylinConfig); - cubeManager = CubeManager.getInstance(kylinConfig); - cubeDescManager = CubeDescManager.getInstance(kylinConfig); - iiManager = IIManager.getInstance(kylinConfig); - iiDescManager = IIDescManager.getInstance(kylinConfig); - executableDao = ExecutableDao.getInstance(kylinConfig); - realizationRegistry = RealizationRegistry.getInstance(kylinConfig); - - List<String> requiredResources = Lists.newArrayList(); - List<String> optionalResources = Lists.newArrayList(); - - if (optionsHelper.hasOption(OPTION_PROJECT)) { - ProjectInstance projectInstance = projectManager.getProject(optionsHelper.getOptionValue(OPTION_PROJECT)); - if (projectInstance == null) { - throw new IllegalArgumentException("Project " + optionsHelper.getOptionValue(OPTION_PROJECT) + " does not exist"); - } - addRequired(requiredResources, ProjectInstance.concatResourcePath(projectInstance.getName())); - List<RealizationEntry> realizationEntries = projectInstance.getRealizationEntries(); - for (RealizationEntry realizationEntry : realizationEntries) { - retrieveResourcePath(getRealization(realizationEntry), includeSegments, includeJobs, requiredResources, optionalResources); - } - } else if (optionsHelper.hasOption(OPTION_CUBE)) { - String cubeName = optionsHelper.getOptionValue(OPTION_CUBE); - IRealization realization; - - if ((realization = cubeManager.getRealization(cubeName)) != null) { - retrieveResourcePath(realization, includeSegments, includeJobs, requiredResources, optionalResources); - } else { - throw new IllegalArgumentException("No cube found with name of " + cubeName); - } - } else if (optionsHelper.hasOption(OPTION_HYBRID)) { - String hybridName = optionsHelper.getOptionValue(OPTION_HYBRID); - IRealization realization; - - if ((realization = hybridManager.getRealization(hybridName)) != null) { - retrieveResourcePath(realization, includeSegments, includeJobs, requiredResources, optionalResources); - } else { - throw new IllegalArgumentException("No hybrid found with name of" + hybridName); - } - } - - executeExtraction(requiredResources, optionalResources, dest); - } - - private void executeExtraction(List<String> requiredPaths, List<String> optionalPaths, String dest) { - logger.info("The resource paths going to be extracted:"); - for (String s : requiredPaths) { - logger.info(s + "(required)"); - } - for (String s : optionalPaths) { - logger.info(s + "(optional)"); - } - - if (dest == null) { - logger.info("Dest is not set, exit directly without extracting"); - } else { - try { - ResourceTool.copy(KylinConfig.getInstanceFromEnv(), KylinConfig.createInstanceFromUri(dest)); - } catch (IOException e) { - throw new RuntimeException("IOException", e); - } - } - } - - private IRealization getRealization(RealizationEntry realizationEntry) { - return realizationRegistry.getRealization(realizationEntry.getType(), realizationEntry.getRealization()); - } - - private void retrieveResourcePath(IRealization realization, boolean includeSegments, boolean includeJobs, List<String> requiredResources, 
List<String> optionalResources) { - - logger.info("Deal with realization {} of type {}", realization.getName(), realization.getType()); - - if (realization instanceof CubeInstance) { - CubeInstance cube = (CubeInstance) realization; - String descName = cube.getDescName(); - CubeDesc cubeDesc = cubeDescManager.getCubeDesc(descName); - String modelName = cubeDesc.getModelName(); - DataModelDesc modelDesc = metadataManager.getDataModelDesc(modelName); - - for (String tableName : modelDesc.getAllTables()) { - addRequired(requiredResources, TableDesc.concatResourcePath(tableName)); - addOptional(optionalResources, TableDesc.concatExdResourcePath(tableName)); - } - - addRequired(requiredResources, DataModelDesc.concatResourcePath(modelDesc.getName())); - addRequired(requiredResources, CubeDesc.concatResourcePath(cubeDesc.getName())); - - if (includeSegments) { - addRequired(requiredResources, CubeInstance.concatResourcePath(cube.getName())); - for (CubeSegment segment : cube.getSegments()) { - for (String dictPat : segment.getDictionaryPaths()) { - addRequired(requiredResources, dictPat); - } - for (String snapshotPath : segment.getSnapshotPaths()) { - addRequired(requiredResources, snapshotPath); - } - addRequired(requiredResources, segment.getStatisticsResourcePath()); - - if (includeJobs) { - String lastJobId = segment.getLastBuildJobID(); - if (!StringUtils.isEmpty(lastJobId)) { - logger.warn("No job exist for segment {}", segment); - } else { - try { - ExecutablePO executablePO = executableDao.getJob(lastJobId); - addRequired(requiredResources, ExecutableDao.pathOfJob(lastJobId)); - addRequired(requiredResources, ExecutableDao.pathOfJobOutput(lastJobId)); - for (ExecutablePO task : executablePO.getTasks()) { - addRequired(requiredResources, ExecutableDao.pathOfJob(task.getUuid())); - addRequired(requiredResources, ExecutableDao.pathOfJobOutput(task.getUuid())); - } - } catch (PersistentException e) { - throw new RuntimeException("PersistentException", e); - } - } - } else { - logger.info("Job info will not be extracted"); - } - } - } else { - if (includeJobs) { - logger.warn("It's useless to set includeJobs to true when includeSegments is set to false"); - } - - throw new IllegalStateException("Does not support skip segments now"); - } - } else if (realization instanceof HybridInstance) { - HybridInstance hybridInstance = (HybridInstance) realization; - addRequired(requiredResources, HybridInstance.concatResourcePath(hybridInstance.getName())); - for (IRealization iRealization : hybridInstance.getRealizations()) { - retrieveResourcePath(iRealization, includeSegments, includeJobs, requiredResources, optionalResources); - } - } else if (realization instanceof IIInstance) { - throw new IllegalStateException("Does not support extract II instance or hybrid that contains II"); - } else { - throw new IllegalStateException("Unknown realization type: " + realization.getType()); - } - } - - private void addRequired(List<String> resourcePaths, String record) { - logger.info("adding required resource {}", record); - resourcePaths.add(record); - } - - private void addOptional(List<String> optionalPaths, String record) { - logger.info("adding optional resource {}", record); - optionalPaths.add(record); - } - - public static void main(String[] args) { - CubeMetaExtractor extractor = new CubeMetaExtractor(); - extractor.execute(args); - } -}
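For reference, the core of the new "skip segments" behaviour is the trim-and-save step in executeExtraction() of the new class: where the deleted class above threw "Does not support skip segments now", the tool now writes a copy of each affected cube with an empty segment list. A sketch of that step (dst is the destination ResourceStore opened in the same method):

    // Persist a trimmed copy of the cube so the extracted metadata does not
    // reference segment dictionaries, snapshots or build jobs.
    for (CubeInstance cube : cubesToTrimAndSave) {
        CubeInstance trimmedCube = CubeInstance.getCopyOf(cube); // copy the instance
        trimmedCube.getSegments().clear();                       // drop all segment entries
        trimmedCube.setUuid(cube.getUuid());                     // keep the original identity
        dst.putResource(trimmedCube.getResourcePath(), trimmedCube, CubeManager.CUBE_SERIALIZER);
    }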