http://git-wip-us.apache.org/repos/asf/kylin/blob/74e167f0/tool/src/main/java/org/apache/kylin/tool/metrics/systemcube/SCCreator.java ---------------------------------------------------------------------- diff --git a/tool/src/main/java/org/apache/kylin/tool/metrics/systemcube/SCCreator.java b/tool/src/main/java/org/apache/kylin/tool/metrics/systemcube/SCCreator.java new file mode 100644 index 0000000..fb2fbb5 --- /dev/null +++ b/tool/src/main/java/org/apache/kylin/tool/metrics/systemcube/SCCreator.java @@ -0,0 +1,292 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. +*/ + +package org.apache.kylin.tool.metrics.systemcube; + +import java.io.BufferedInputStream; +import java.io.BufferedWriter; +import java.io.ByteArrayOutputStream; +import java.io.DataOutputStream; +import java.io.File; +import java.io.FileInputStream; +import java.io.FileWriter; +import java.io.IOException; +import java.util.Collection; +import java.util.HashSet; +import java.util.List; +import java.util.Set; + +import org.apache.commons.cli.Option; +import org.apache.commons.cli.OptionBuilder; +import org.apache.commons.cli.Options; +import org.apache.kylin.common.KylinConfig; +import org.apache.kylin.common.persistence.RootPersistentEntity; +import org.apache.kylin.common.persistence.Serializer; +import org.apache.kylin.common.util.AbstractApplication; +import org.apache.kylin.common.util.OptionsHelper; +import org.apache.kylin.cube.CubeDescManager; +import org.apache.kylin.cube.CubeInstance; +import org.apache.kylin.cube.CubeManager; +import org.apache.kylin.cube.model.CubeDesc; +import org.apache.kylin.metadata.MetadataManager; +import org.apache.kylin.metadata.model.DataModelDesc; +import org.apache.kylin.metadata.model.TableDesc; +import org.apache.kylin.metadata.project.ProjectInstance; +import org.apache.kylin.metrics.lib.SinkTool; +import org.apache.kylin.tool.metrics.systemcube.util.HiveSinkTool; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.base.Strings; +import com.google.common.collect.Lists; + +public class SCCreator extends AbstractApplication { + + private static final Logger logger = LoggerFactory.getLogger(SCCreator.class); + + private static final Option OPTION_OWNER = OptionBuilder.withArgName("owner").hasArg().isRequired(false) + .withDescription("Specify the owner who creates the metadata").create("owner"); + private static final Option OPTION_INPUT_CONFIG = OptionBuilder.withArgName("inputConfig").hasArg() + .isRequired(false).withDescription("Specify the input configuration file").create("inputConfig"); + private static final Option OPTION_OUTPUT = OptionBuilder.withArgName("output").hasArg().isRequired(true) + .withDescription("Specify the output where the generated metadata will be saved").create("output"); + + private static final String D_CUBE_INSTANCE = "cube/"; + private static final String D_CUBE_DESC = "cube_desc/"; + private static final String D_PROJECT = "project/"; + private static final String D_TABLE = "table/"; + private static final String D_MODEL_DESC = "model_desc/"; + // private static final String D_STREAMING = "streaming/"; + + private static final String F_HIVE_SQL = "create_hive_tables_for_system_cubes"; + + protected final Options options; + + private final KylinConfig config; + + private final ObjectMapper mapper; + + public SCCreator() { + mapper = new ObjectMapper(); + mapper.enableDefaultTyping(); + + config = KylinConfig.getInstanceFromEnv(); + + options = new Options(); + options.addOption(OPTION_OWNER); + options.addOption(OPTION_OUTPUT); + options.addOption(OPTION_INPUT_CONFIG); + } + + public static void main(String[] args) { + SCCreator cli = new SCCreator(); + cli.execute(args); + } + + protected Options getOptions() { + return options; + } + + protected void execute(OptionsHelper optionsHelper) throws Exception { + String owner = optionsHelper.getOptionValue(OPTION_OWNER); + String output = optionsHelper.getOptionValue(OPTION_OUTPUT); + String inputConfig = optionsHelper.getOptionValue(OPTION_INPUT_CONFIG); + if (Strings.isNullOrEmpty(inputConfig)) { + throw new RuntimeException("Input configuration file should be specified!!!"); + } + + execute(owner, output, inputConfig); + } + + public void execute(String owner, String output, String inputConfig) throws Exception { + if (Strings.isNullOrEmpty(owner)) { + owner = "ADMIN"; + } + if (!output.endsWith("/")) { + output += "/"; + } + + Set<SinkTool> sourceToolSet = mapper + .readValue(new BufferedInputStream(new FileInputStream(new File(inputConfig))), HashSet.class); + run(owner, output, sourceToolSet); + } + + private void run(String owner, String output, Collection<SinkTool> sinkToolSet) throws IOException { + List<TableDesc> kylinTables = Lists.newArrayList(); + List<DataModelDesc> kylinModels = Lists.newArrayList(); + List<CubeDesc> kylinCubeDescs = Lists.newArrayList(); + List<CubeInstance> kylinCubeInstances = Lists.newArrayList(); + // List<StreamingConfig> kylinStreamings = Lists.newArrayList(); + + boolean ifHive = false; + for (SinkTool sourceTool : sinkToolSet) { + if (sourceTool instanceof HiveSinkTool) { + ifHive = true; + } else { + logger.warn("current version only support hive sink!!!"); + continue; + } + kylinTables.addAll(generateKylinTableForSystemCube(sourceTool)); + kylinModels.addAll(generateKylinModelForSystemCube(owner, sourceTool)); + kylinCubeDescs.addAll(generateKylinCubeDescForSystemCube(sourceTool)); + kylinCubeInstances.addAll(generateKylinCubeInstanceForSystemCube(owner, sourceTool)); + // if (sourceTool instanceof StreamSinkTool) { + // kylinStreamings.addAll(generateKylinStreamingForSystemCube((StreamSinkTool) sourceTool)); + // } + } + + if (ifHive) { + generateHiveTableSQLFileForSystemCube(output); + } + + ProjectInstance projectInstance = ProjectCreator.generateKylinProjectInstance(owner, kylinTables, kylinModels, + kylinCubeDescs); + generateKylinProjectFileForSystemCube(output, projectInstance); + for (TableDesc tableDesc : kylinTables) { + generateKylinTableFileForSystemCube(output, tableDesc); + } + for (DataModelDesc dataModelDesc : kylinModels) { + generateKylinModelFileForSystemCube(output, dataModelDesc); + } + for (CubeDesc cubeDesc : kylinCubeDescs) { + generateKylinCubeDescFileForSystemCube(output, cubeDesc); + } + for (CubeInstance cubeInstance : kylinCubeInstances) { + generateKylinCubeInstanceFileForSystemCube(output, cubeInstance); + } + // for (StreamingConfig streaming : kylinStreamings) { + // generateKylinStreamingFileForSystemCube(output, streaming); + // } + } + + private List<TableDesc> generateKylinTableForSystemCube(SinkTool sinkTool) { + List<TableDesc> result = Lists.newLinkedList(); + result.add(KylinTableCreator.generateKylinTableForMetricsQuery(config, sinkTool)); + result.add(KylinTableCreator.generateKylinTableForMetricsQueryCube(config, sinkTool)); + result.add(KylinTableCreator.generateKylinTableForMetricsQueryRPC(config, sinkTool)); + result.add(KylinTableCreator.generateKylinTableForMetricsJob(config, sinkTool)); + result.add(KylinTableCreator.generateKylinTableForMetricsJobException(config, sinkTool)); + + return result; + } + + private List<DataModelDesc> generateKylinModelForSystemCube(String owner, SinkTool sinkTool) { + List<DataModelDesc> result = Lists.newLinkedList(); + result.add(ModelCreator.generateKylinModelForMetricsQuery(owner, config, sinkTool)); + result.add(ModelCreator.generateKylinModelForMetricsQueryCube(owner, config, sinkTool)); + result.add(ModelCreator.generateKylinModelForMetricsQueryRPC(owner, config, sinkTool)); + result.add(ModelCreator.generateKylinModelForMetricsJob(owner, config, sinkTool)); + result.add(ModelCreator.generateKylinModelForMetricsJobException(owner, config, sinkTool)); + + return result; + } + + private List<CubeDesc> generateKylinCubeDescForSystemCube(SinkTool sinkTool) { + List<CubeDesc> result = Lists.newLinkedList(); + result.add(CubeDescCreator.generateKylinCubeDescForMetricsQuery(config, sinkTool)); + result.add(CubeDescCreator.generateKylinCubeDescForMetricsQueryCube(config, sinkTool)); + result.add(CubeDescCreator.generateKylinCubeDescForMetricsQueryRPC(config, sinkTool)); + result.add(CubeDescCreator.generateKylinCubeDescForMetricsJob(config, sinkTool)); + result.add(CubeDescCreator.generateKylinCubeDescForMetricsJobException(config, sinkTool)); + + return result; + } + + private List<CubeInstance> generateKylinCubeInstanceForSystemCube(String owner, SinkTool sinkTool) { + List<CubeInstance> result = Lists.newLinkedList(); + result.add(CubeInstanceCreator.generateKylinCubeInstanceForMetricsQuery(owner, config, sinkTool)); + result.add(CubeInstanceCreator.generateKylinCubeInstanceForMetricsQueryCube(owner, config, sinkTool)); + result.add(CubeInstanceCreator.generateKylinCubeInstanceForMetricsQueryRPC(owner, config, sinkTool)); + result.add(CubeInstanceCreator.generateKylinCubeInstanceForMetricsJob(owner, config, sinkTool)); + result.add(CubeInstanceCreator.generateKylinCubeInstanceForMetricsJobException(owner, config, sinkTool)); + + return result; + } + + // private List<StreamingConfig> generateKylinStreamingForSystemCube(StreamSinkTool sinkTool) { + // List<StreamingConfig> result = Lists.newLinkedList(); + // result.add(StreamingCreator.generateKylinStreamingForMetricsQuery(config, sinkTool)); + // result.add(StreamingCreator.generateKylinStreamingForMetricsQueryCube(config, sinkTool)); + // result.add(StreamingCreator.generateKylinStreamingForMetricsQueryRPC(config, sinkTool)); + // result.add(StreamingCreator.generateKylinStreamingForMetricsJob(config, sinkTool)); + // result.add(StreamingCreator.generateKylinStreamingForMetricsJobException(config, sinkTool)); + // + // return result; + // } + + private void generateHiveTableSQLFileForSystemCube(String output) throws IOException { + String contents = HiveTableCreator.generateAllSQL(config); + saveToFile(output + F_HIVE_SQL + ".sql", contents); + } + + private void generateKylinTableFileForSystemCube(String output, TableDesc kylinTable) throws IOException { + saveSystemCubeMetadataToFile(output + D_TABLE + kylinTable.getIdentity() + ".json", kylinTable, + MetadataManager.TABLE_SERIALIZER); + } + + private void generateKylinModelFileForSystemCube(String output, DataModelDesc modelDesc) throws IOException { + saveSystemCubeMetadataToFile(output + D_MODEL_DESC + modelDesc.getName() + ".json", modelDesc, + MetadataManager.MODELDESC_SERIALIZER); + } + + private void generateKylinCubeInstanceFileForSystemCube(String output, CubeInstance cubeInstance) + throws IOException { + saveSystemCubeMetadataToFile(output + D_CUBE_INSTANCE + cubeInstance.getName() + ".json", cubeInstance, + CubeManager.CUBE_SERIALIZER); + } + + private void generateKylinCubeDescFileForSystemCube(String output, CubeDesc cubeDesc) throws IOException { + saveSystemCubeMetadataToFile(output + D_CUBE_DESC + cubeDesc.getName() + ".json", cubeDesc, + CubeDescManager.CUBE_DESC_SERIALIZER); + } + + private void generateKylinProjectFileForSystemCube(String output, ProjectInstance projectInstance) + throws IOException { + saveSystemCubeMetadataToFile(output + D_PROJECT + projectInstance.getName() + ".json", projectInstance, + CubeDescManager.CUBE_DESC_SERIALIZER); + } + + // private void generateKylinStreamingFileForSystemCube(String output, StreamingConfig streamingConfig) + // throws IOException { + // saveSystemCubeMetadataToFile(output + D_STREAMING + streamingConfig.getName() + ".json", streamingConfig, + // StreamingConfig.SERIALIZER); + // } + + private <T extends RootPersistentEntity> void saveSystemCubeMetadataToFile(String fileName, T metadata, + Serializer serializer) throws IOException { + ByteArrayOutputStream buf = new ByteArrayOutputStream(); + DataOutputStream dout = new DataOutputStream(buf); + serializer.serialize(metadata, dout); + dout.close(); + buf.close(); + + saveToFile(fileName, buf.toString()); + } + + private void saveToFile(String fileName, String contents) throws IOException { + File parentDir = new File(fileName).getParentFile(); + if (!parentDir.exists()) { + parentDir.mkdirs(); + } + + BufferedWriter bufferedWriter = new BufferedWriter(new FileWriter(fileName)); + bufferedWriter.append(contents); + bufferedWriter.close(); + } +}
http://git-wip-us.apache.org/repos/asf/kylin/blob/74e167f0/tool/src/main/java/org/apache/kylin/tool/metrics/systemcube/StreamingCreator.java ---------------------------------------------------------------------- diff --git a/tool/src/main/java/org/apache/kylin/tool/metrics/systemcube/StreamingCreator.java b/tool/src/main/java/org/apache/kylin/tool/metrics/systemcube/StreamingCreator.java new file mode 100644 index 0000000..d5dfc94 --- /dev/null +++ b/tool/src/main/java/org/apache/kylin/tool/metrics/systemcube/StreamingCreator.java @@ -0,0 +1,138 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. +*/ + +package org.apache.kylin.tool.metrics.systemcube; + +import java.io.ByteArrayOutputStream; +import java.io.DataOutputStream; +import java.util.Map; +import java.util.UUID; + +import org.apache.kylin.common.KylinConfig; +import org.apache.kylin.common.persistence.JsonSerializer; +import org.apache.kylin.common.persistence.RootPersistentEntity; +import org.apache.kylin.common.persistence.Serializer; +import org.apache.kylin.tool.metrics.systemcube.util.KafkaSinkTool; +import org.apache.kylin.tool.metrics.systemcube.util.StreamSinkTool; + +import com.fasterxml.jackson.annotation.JsonProperty; +import com.google.common.collect.Maps; + +public class StreamingCreator { + + public static void main(String[] args) throws Exception { + // KylinConfig.setSandboxEnvIfPossible(); + KylinConfig config = KylinConfig.getInstanceFromEnv(); + + StreamingConfig streamingConfig = generateKylinStreamingForMetricsQuery(config, new KafkaSinkTool()); + ByteArrayOutputStream buf = new ByteArrayOutputStream(); + DataOutputStream dout = new DataOutputStream(buf); + StreamingConfig.SERIALIZER.serialize(streamingConfig, dout); + dout.close(); + buf.close(); + System.out.println(buf.toString()); + } + + public static StreamingConfig generateKylinStreamingForMetricsQuery(KylinConfig config, StreamSinkTool sinkTool) { + String tableName = sinkTool.getTableNameForMetrics(config.getKylinMetricsSubjectQuery()); + return generateKylinStreaming(tableName, sinkTool.getTopicName(tableName), sinkTool.getStreamingType(), + sinkTool.getStreamingProperties()); + } + + public static StreamingConfig generateKylinStreamingForMetricsQueryCube(KylinConfig config, + StreamSinkTool sinkTool) { + String tableName = sinkTool.getTableNameForMetrics(config.getKylinMetricsSubjectQueryCube()); + return generateKylinStreaming(tableName, sinkTool.getTopicName(tableName), sinkTool.getStreamingType(), + sinkTool.getStreamingProperties()); + } + + public static StreamingConfig generateKylinStreamingForMetricsQueryRPC(KylinConfig config, + StreamSinkTool sinkTool) { + String tableName = sinkTool.getTableNameForMetrics(config.getKylinMetricsSubjectQueryRpcCall()); + return generateKylinStreaming(tableName, sinkTool.getTopicName(tableName), sinkTool.getStreamingType(), + sinkTool.getStreamingProperties()); + } + + public static StreamingConfig generateKylinStreamingForMetricsJob(KylinConfig config, StreamSinkTool sinkTool) { + String tableName = sinkTool.getTableNameForMetrics(config.getKylinMetricsSubjectJob()); + return generateKylinStreaming(tableName, sinkTool.getTopicName(tableName), sinkTool.getStreamingType(), + sinkTool.getStreamingProperties()); + } + + public static StreamingConfig generateKylinStreamingForMetricsJobException(KylinConfig config, + StreamSinkTool sinkTool) { + String tableName = sinkTool.getTableNameForMetrics(config.getKylinMetricsSubjectJobException()); + return generateKylinStreaming(tableName, sinkTool.getTopicName(tableName), sinkTool.getStreamingType(), + sinkTool.getStreamingProperties()); + } + + public static StreamingConfig generateKylinStreaming(String tableName, String topicName, String type, + Map<String, String> properties) { + StreamingConfig streamingConfig = new StreamingConfig(); + streamingConfig.setUuid(UUID.randomUUID().toString()); + streamingConfig.setName(tableName); + streamingConfig.setLastModified(0L); + streamingConfig.setType(type); + + Map<String, String> streamingProperties = Maps.newHashMap(properties); + streamingProperties.put("topic", topicName); + streamingConfig.setProperties(streamingProperties); + + return streamingConfig; + } + + public static class StreamingConfig extends RootPersistentEntity { + + public static final String STREAMING_TYPE_KAFKA = "kafka"; + public static Serializer<StreamingConfig> SERIALIZER = new JsonSerializer<StreamingConfig>( + StreamingConfig.class); + @JsonProperty("name") + private String name; + + @JsonProperty("type") + private String type = STREAMING_TYPE_KAFKA; + + @JsonProperty("properties") + private Map<String, String> properties = Maps.newLinkedHashMap(); + + public String getType() { + return type; + } + + public void setType(String type) { + this.type = type; + } + + public String getName() { + return name; + } + + public void setName(String name) { + this.name = name; + } + + public Map<String, String> getProperties() { + return properties; + } + + public void setProperties(Map<String, String> properties) { + this.properties = properties; + } + + } +} http://git-wip-us.apache.org/repos/asf/kylin/blob/74e167f0/tool/src/main/java/org/apache/kylin/tool/metrics/systemcube/util/HiveSinkTool.java ---------------------------------------------------------------------- diff --git a/tool/src/main/java/org/apache/kylin/tool/metrics/systemcube/util/HiveSinkTool.java b/tool/src/main/java/org/apache/kylin/tool/metrics/systemcube/util/HiveSinkTool.java new file mode 100644 index 0000000..ef46163 --- /dev/null +++ b/tool/src/main/java/org/apache/kylin/tool/metrics/systemcube/util/HiveSinkTool.java @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. +*/ + +package org.apache.kylin.tool.metrics.systemcube.util; + +import java.util.Map; + +import org.apache.kylin.metadata.model.ISourceAware; +import org.apache.kylin.metadata.model.IStorageAware; +import org.apache.kylin.metrics.lib.SinkTool; +import org.apache.kylin.metrics.lib.impl.hive.HiveReservoirReporter; + +import com.fasterxml.jackson.annotation.JsonAutoDetect; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.google.common.collect.Maps; + +@SuppressWarnings("serial") +@JsonAutoDetect(fieldVisibility = JsonAutoDetect.Visibility.NONE, getterVisibility = JsonAutoDetect.Visibility.NONE, isGetterVisibility = JsonAutoDetect.Visibility.NONE, setterVisibility = JsonAutoDetect.Visibility.NONE) +public class HiveSinkTool implements SinkTool { + + @JsonProperty("storageType") + protected int storageType = IStorageAware.ID_SHARDED_HBASE; + + @JsonProperty("cubeDescOverrideProperties") + protected Map<String, String> cubeDescOverrideProperties = Maps.newHashMap(); + + public int getStorageType() { + return storageType; + } + + public int getSourceType() { + return ISourceAware.ID_HIVE; + } + + public String getTableNameForMetrics(String subject) { + return HiveReservoirReporter.getTableFromSubject(subject); + } + + public Map<String, String> getCubeDescOverrideProperties() { + return cubeDescOverrideProperties; + } + + public void setCubeDescOverrideProperties(Map<String, String> cubeDescOverrideProperties) { + this.cubeDescOverrideProperties = cubeDescOverrideProperties; + } +} http://git-wip-us.apache.org/repos/asf/kylin/blob/74e167f0/tool/src/main/java/org/apache/kylin/tool/metrics/systemcube/util/KafkaSinkTool.java ---------------------------------------------------------------------- diff --git a/tool/src/main/java/org/apache/kylin/tool/metrics/systemcube/util/KafkaSinkTool.java b/tool/src/main/java/org/apache/kylin/tool/metrics/systemcube/util/KafkaSinkTool.java new file mode 100644 index 0000000..6caa5ec --- /dev/null +++ b/tool/src/main/java/org/apache/kylin/tool/metrics/systemcube/util/KafkaSinkTool.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. +*/ + +package org.apache.kylin.tool.metrics.systemcube.util; + +import org.apache.kylin.metrics.lib.impl.kafka.KafkaReservoirReporter; + +public class KafkaSinkTool extends StreamSinkTool { + + public String getTableNameForMetrics(String subject) { + return KafkaReservoirReporter.getTableFromSubject(subject); + } + + public String getStreamingType() { + return "kafka"; + } +} http://git-wip-us.apache.org/repos/asf/kylin/blob/74e167f0/tool/src/main/java/org/apache/kylin/tool/metrics/systemcube/util/StreamSinkTool.java ---------------------------------------------------------------------- diff --git a/tool/src/main/java/org/apache/kylin/tool/metrics/systemcube/util/StreamSinkTool.java b/tool/src/main/java/org/apache/kylin/tool/metrics/systemcube/util/StreamSinkTool.java new file mode 100644 index 0000000..2f6f034 --- /dev/null +++ b/tool/src/main/java/org/apache/kylin/tool/metrics/systemcube/util/StreamSinkTool.java @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. +*/ + +package org.apache.kylin.tool.metrics.systemcube.util; + +import java.util.Map; + +import org.apache.kylin.metadata.model.ISourceAware; +import org.apache.kylin.metadata.model.IStorageAware; +import org.apache.kylin.metrics.lib.impl.RecordEvent; + +import com.fasterxml.jackson.annotation.JsonProperty; +import com.google.common.collect.Maps; + +public abstract class StreamSinkTool { + + @JsonProperty("cubeDescOverrideProperties") + protected Map<String, String> cubeDescOverrideProperties = Maps.newHashMap(); + + @JsonProperty("streamingProperties") + protected Map<String, String> streamingProperties = Maps.newHashMap(); + + public abstract String getTableNameForMetrics(String subject); + + public int getStorageType() { + return IStorageAware.ID_STREAM; + } + + public abstract String getStreamingType(); + + public String getTopicName(String tableName) { + return tableName.replace('.', '_'); + } + + public int getSourceType() { + return ISourceAware.ID_STREAMING; + } + + public Map<String, String> getCubeDescOverrideProperties() { + return cubeDescOverrideProperties; + } + + public void setCubeDescOverrideProperties(Map<String, String> cubeDescOverrideProperties) { + this.cubeDescOverrideProperties = cubeDescOverrideProperties; + } + + public void setStreamingProperties(Map<String, String> streamingProperties) { + this.streamingProperties = streamingProperties; + } + + public Map<String, String> getStreamingProperties() { + return streamingProperties; + } + + public void init() { + streamingProperties.put("parserProperties", + "tsColName=" + RecordEvent.RecordReserveKeyEnum.TIME.toString() + ";"); + } +} http://git-wip-us.apache.org/repos/asf/kylin/blob/74e167f0/tool/src/main/resources/SCSinkTools.json ---------------------------------------------------------------------- diff --git a/tool/src/main/resources/SCSinkTools.json b/tool/src/main/resources/SCSinkTools.json new file mode 100644 index 0000000..15a715f --- /dev/null +++ b/tool/src/main/resources/SCSinkTools.json @@ -0,0 +1,14 @@ +[ + [ + "org.apache.kylin.tool.metrics.systemcube.util.HiveSinkTool", + { + "storageType": 2, + "cubeDescOverrideProperties": [ + "java.util.HashMap", + { + "kylin.cube.algorithm": "INMEM" + } + ] + } + ] +] \ No newline at end of file http://git-wip-us.apache.org/repos/asf/kylin/blob/74e167f0/tool/src/test/java/org/apache/kylin/tool/metrics/systemcube/SCCreatorTest.java ---------------------------------------------------------------------- diff --git a/tool/src/test/java/org/apache/kylin/tool/metrics/systemcube/SCCreatorTest.java b/tool/src/test/java/org/apache/kylin/tool/metrics/systemcube/SCCreatorTest.java new file mode 100644 index 0000000..6718723 --- /dev/null +++ b/tool/src/test/java/org/apache/kylin/tool/metrics/systemcube/SCCreatorTest.java @@ -0,0 +1,91 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. +*/ + +package org.apache.kylin.tool.metrics.systemcube; + +import java.io.BufferedInputStream; +import java.io.BufferedOutputStream; +import java.io.FileInputStream; +import java.io.FileOutputStream; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; + +import org.apache.kylin.common.util.LocalFileMetadataTestCase; +import org.apache.kylin.metadata.MetadataManager; +import org.apache.kylin.tool.metrics.systemcube.util.HiveSinkTool; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.collect.Maps; +import com.google.common.collect.Sets; + +public class SCCreatorTest extends LocalFileMetadataTestCase { + + @Before + public void setUp() throws Exception { + this.createTestMetadata(); + MetadataManager.clearCache(); + } + + @After + public void after() throws Exception { + this.cleanupTestMetadata(); + } + + @Test + public void testExecute() throws Exception { + String outputPath = "../examples/system"; + String inputPath = "src/main/resources/SCSinkTools.json"; + + SCCreator cli = new SCCreator(); + cli.execute("ADMIN", outputPath, inputPath); + } + + @Test + public void testWriteSinkToolsJson() throws Exception { + Map<String, String> cubeDescOverrideProperties = Maps.newHashMap(); + cubeDescOverrideProperties.put("kylin.cube.algorithm", "INMEM"); + + HiveSinkTool hiveSinkTool = new HiveSinkTool(); + hiveSinkTool.setCubeDescOverrideProperties(cubeDescOverrideProperties); + + try (BufferedOutputStream os = new BufferedOutputStream( + new FileOutputStream("src/test/resources/SCSinkTools.json"))) { + ObjectMapper mapper = new ObjectMapper(); + mapper.enableDefaultTyping(); + mapper.writeValue(os, Sets.newHashSet(hiveSinkTool)); + } + } + + @Test + public void testReadSinkToolsJson() throws Exception { + try (BufferedInputStream is = new BufferedInputStream( + new FileInputStream("src/main/resources/SCSinkTools.json"))) { + ObjectMapper mapper = new ObjectMapper(); + mapper.enableDefaultTyping(); + Set<HiveSinkTool> sinkToolSet = mapper.readValue(is, HashSet.class); + for (HiveSinkTool entry : sinkToolSet) { + System.out.println(entry.getCubeDescOverrideProperties()); + } + } + } + +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/kylin/blob/74e167f0/tool/src/test/resources/SCSinkTools.json ---------------------------------------------------------------------- diff --git a/tool/src/test/resources/SCSinkTools.json b/tool/src/test/resources/SCSinkTools.json new file mode 100644 index 0000000..15a715f --- /dev/null +++ b/tool/src/test/resources/SCSinkTools.json @@ -0,0 +1,14 @@ +[ + [ + "org.apache.kylin.tool.metrics.systemcube.util.HiveSinkTool", + { + "storageType": 2, + "cubeDescOverrideProperties": [ + "java.util.HashMap", + { + "kylin.cube.algorithm": "INMEM" + } + ] + } + ] +] \ No newline at end of file