http://git-wip-us.apache.org/repos/asf/kylin/blob/826f23f1/source-hive/src/test/java/org/apache/kylin/source/hive/HiveCmdBuilderTest.java ---------------------------------------------------------------------- diff --git a/source-hive/src/test/java/org/apache/kylin/source/hive/HiveCmdBuilderTest.java b/source-hive/src/test/java/org/apache/kylin/source/hive/HiveCmdBuilderTest.java index da0c082..0eb02b4 100644 --- a/source-hive/src/test/java/org/apache/kylin/source/hive/HiveCmdBuilderTest.java +++ b/source-hive/src/test/java/org/apache/kylin/source/hive/HiveCmdBuilderTest.java @@ -40,13 +40,13 @@ public class HiveCmdBuilderTest { @After public void after() throws Exception { - System.clearProperty("kylin.hive.client"); - System.clearProperty("kylin.hive.beeline.params"); + System.clearProperty("kylin.source.hive.client"); + System.clearProperty("kylin.source.hive.beeline-params"); } @Test public void testHiveCLI() { - System.setProperty("kylin.hive.client", "cli"); + System.setProperty("kylin.source.hive.client", "cli"); HiveCmdBuilder hiveCmdBuilder = new HiveCmdBuilder(); hiveCmdBuilder.addStatement("USE default;"); @@ -59,8 +59,8 @@ public class HiveCmdBuilderTest { @Test public void testBeeline() throws IOException { String lineSeparator = java.security.AccessController.doPrivileged(new sun.security.action.GetPropertyAction("line.separator")); - System.setProperty("kylin.hive.client", "beeline"); - System.setProperty("kylin.hive.beeline.params", "-u jdbc_url"); + System.setProperty("kylin.source.hive.client", "beeline"); + System.setProperty("kylin.source.hive.beeline-params", "-u jdbc_url"); HiveCmdBuilder hiveCmdBuilder = new HiveCmdBuilder(); hiveCmdBuilder.addStatement("USE default;");
http://git-wip-us.apache.org/repos/asf/kylin/blob/826f23f1/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaFlatTableJob.java ---------------------------------------------------------------------- diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaFlatTableJob.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaFlatTableJob.java index 3033bfd..f4d54c5 100644 --- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaFlatTableJob.java +++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaFlatTableJob.java @@ -1,164 +1,164 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.kylin.source.kafka.hadoop; - -import org.apache.kylin.job.engine.JobEngineConfig; -import org.apache.kylin.source.kafka.util.KafkaClient; -import org.apache.commons.cli.Options; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.io.Text; -import org.apache.hadoop.mapreduce.Job; -import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; -import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat; -import org.apache.hadoop.util.ToolRunner; -import org.apache.kylin.common.KylinConfig; -import org.apache.kylin.cube.CubeInstance; -import org.apache.kylin.cube.CubeManager; -import org.apache.kylin.cube.CubeSegment; -import org.apache.kylin.engine.mr.common.AbstractHadoopJob; -import org.apache.kylin.engine.mr.common.BatchConstants; -import org.apache.kylin.source.kafka.KafkaConfigManager; -import org.apache.kylin.source.kafka.config.KafkaConfig; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.IOException; -import java.util.Collections; -import java.util.Map; - -/** - * Run a Hadoop Job to process the stream data in kafka; - * Modified from the kafka-hadoop-loader in https://github.com/amient/kafka-hadoop-loader - */ -public class KafkaFlatTableJob extends AbstractHadoopJob { - protected static final Logger logger = LoggerFactory.getLogger(KafkaFlatTableJob.class); - - public static final String CONFIG_KAFKA_PARITION_MIN = "kafka.partition.min"; - public static final String CONFIG_KAFKA_PARITION_MAX = "kafka.partition.max"; - public static final String CONFIG_KAFKA_PARITION_START = "kafka.partition.start."; - public static final String CONFIG_KAFKA_PARITION_END = "kafka.partition.end."; - - public static final String CONFIG_KAFKA_BROKERS = "kafka.brokers"; - public static final String CONFIG_KAFKA_TOPIC = "kafka.topic"; - public static final String CONFIG_KAFKA_TIMEOUT = "kafka.connect.timeout"; - public static final String CONFIG_KAFKA_BUFFER_SIZE = "kafka.connect.buffer.size"; - public static final String CONFIG_KAFKA_CONSUMER_GROUP = "kafka.consumer.group"; - public static final String CONFIG_KAFKA_INPUT_FORMAT = "input.format"; - public static final String CONFIG_KAFKA_PARSER_NAME = "kafka.parser.name"; - @Override - public int run(String[] args) throws Exception { - Options options = new Options(); - - try { - options.addOption(OPTION_JOB_NAME); - options.addOption(OPTION_CUBE_NAME); - options.addOption(OPTION_OUTPUT_PATH); - options.addOption(OPTION_SEGMENT_ID); - parseOptions(options, args); - - job = Job.getInstance(getConf(), getOptionValue(OPTION_JOB_NAME)); - String cubeName = getOptionValue(OPTION_CUBE_NAME); - Path output = new Path(getOptionValue(OPTION_OUTPUT_PATH)); - - String segmentId = getOptionValue(OPTION_SEGMENT_ID); - - // ---------------------------------------------------------------------------- - // add metadata to distributed cache - CubeManager cubeMgr = CubeManager.getInstance(KylinConfig.getInstanceFromEnv()); - CubeInstance cube = cubeMgr.getCube(cubeName); - - job.getConfiguration().set(BatchConstants.CFG_CUBE_NAME, cubeName); - job.getConfiguration().set(BatchConstants.CFG_CUBE_SEGMENT_ID, segmentId); - logger.info("Starting: " + job.getJobName()); - - setJobClasspath(job, cube.getConfig()); - - KafkaConfigManager kafkaConfigManager = KafkaConfigManager.getInstance(KylinConfig.getInstanceFromEnv()); - KafkaConfig kafkaConfig = kafkaConfigManager.getKafkaConfig(cube.getRootFactTable()); - String brokers = KafkaClient.getKafkaBrokers(kafkaConfig); - String topic = kafkaConfig.getTopic(); - - if (brokers == null || brokers.length() == 0 || topic == null) { - throw new IllegalArgumentException("Invalid Kafka information, brokers " + brokers + ", topic " + topic); - } - - JobEngineConfig jobEngineConfig = new JobEngineConfig(KylinConfig.getInstanceFromEnv()); - job.getConfiguration().addResource(new Path(jobEngineConfig.getHadoopJobConfFilePath(null))); - job.getConfiguration().set(CONFIG_KAFKA_BROKERS, brokers); - job.getConfiguration().set(CONFIG_KAFKA_TOPIC, topic); - job.getConfiguration().set(CONFIG_KAFKA_TIMEOUT, String.valueOf(kafkaConfig.getTimeout())); - job.getConfiguration().set(CONFIG_KAFKA_BUFFER_SIZE, String.valueOf(kafkaConfig.getBufferSize())); - job.getConfiguration().set(CONFIG_KAFKA_INPUT_FORMAT, "json"); - job.getConfiguration().set(CONFIG_KAFKA_PARSER_NAME, kafkaConfig.getParserName()); - job.getConfiguration().set(CONFIG_KAFKA_CONSUMER_GROUP, cubeName); // use cubeName as consumer group name - setupMapper(cube.getSegmentById(segmentId)); - job.setNumReduceTasks(0); - FileOutputFormat.setOutputPath(job, output); - FileOutputFormat.setCompressOutput(job, true); - org.apache.log4j.Logger.getRootLogger().info("Output hdfs location: " + output); - org.apache.log4j.Logger.getRootLogger().info("Output hdfs compression: " + true); - job.getConfiguration().set(BatchConstants.CFG_OUTPUT_PATH, output.toString()); - - deletePath(job.getConfiguration(), output); - - attachKylinPropsAndMetadata(cube, job.getConfiguration()); - - return waitForCompletion(job); - - } catch (Exception e) { - logger.error("error in KafkaFlatTableJob", e); - printUsage(options); - throw e; - } finally { - if (job != null) - cleanupTempConfFile(job.getConfiguration()); - } - - } - - private void setupMapper(CubeSegment cubeSeg) throws IOException { - // set the segment's offset info to job conf - Map<Integer, Long> offsetStart = cubeSeg.getSourcePartitionOffsetStart(); - Map<Integer, Long> offsetEnd = cubeSeg.getSourcePartitionOffsetEnd(); - - Integer minPartition = Collections.min(offsetStart.keySet()); - Integer maxPartition = Collections.max(offsetStart.keySet()); - job.getConfiguration().set(CONFIG_KAFKA_PARITION_MIN, minPartition.toString()); - job.getConfiguration().set(CONFIG_KAFKA_PARITION_MAX, maxPartition.toString()); - - for(Integer partition: offsetStart.keySet()) { - job.getConfiguration().set(CONFIG_KAFKA_PARITION_START + partition, offsetStart.get(partition).toString()); - job.getConfiguration().set(CONFIG_KAFKA_PARITION_END + partition, offsetEnd.get(partition).toString()); - } - - job.setMapperClass(KafkaFlatTableMapper.class); - job.setInputFormatClass(KafkaInputFormat.class); - job.setOutputKeyClass(Text.class); - job.setOutputValueClass(Text.class); - job.setOutputFormatClass(SequenceFileOutputFormat.class); - job.setNumReduceTasks(0); - } - - public static void main(String[] args) throws Exception { - KafkaFlatTableJob job = new KafkaFlatTableJob(); - int exitCode = ToolRunner.run(job, args); - System.exit(exitCode); - } - -} +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kylin.source.kafka.hadoop; + +import org.apache.kylin.job.engine.JobEngineConfig; +import org.apache.kylin.source.kafka.util.KafkaClient; +import org.apache.commons.cli.Options; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; +import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat; +import org.apache.hadoop.util.ToolRunner; +import org.apache.kylin.common.KylinConfig; +import org.apache.kylin.cube.CubeInstance; +import org.apache.kylin.cube.CubeManager; +import org.apache.kylin.cube.CubeSegment; +import org.apache.kylin.engine.mr.common.AbstractHadoopJob; +import org.apache.kylin.engine.mr.common.BatchConstants; +import org.apache.kylin.source.kafka.KafkaConfigManager; +import org.apache.kylin.source.kafka.config.KafkaConfig; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.Collections; +import java.util.Map; + +/** + * Run a Hadoop Job to process the stream data in kafka; + * Modified from the kafka-hadoop-loader in https://github.com/amient/kafka-hadoop-loader + */ +public class KafkaFlatTableJob extends AbstractHadoopJob { + protected static final Logger logger = LoggerFactory.getLogger(KafkaFlatTableJob.class); + + public static final String CONFIG_KAFKA_PARITION_MIN = "kafka.partition.min"; + public static final String CONFIG_KAFKA_PARITION_MAX = "kafka.partition.max"; + public static final String CONFIG_KAFKA_PARITION_START = "kafka.partition.start."; + public static final String CONFIG_KAFKA_PARITION_END = "kafka.partition.end."; + + public static final String CONFIG_KAFKA_BROKERS = "kafka.brokers"; + public static final String CONFIG_KAFKA_TOPIC = "kafka.topic"; + public static final String CONFIG_KAFKA_TIMEOUT = "kafka.connect.timeout"; + public static final String CONFIG_KAFKA_BUFFER_SIZE = "kafka.connect.buffer.size"; + public static final String CONFIG_KAFKA_CONSUMER_GROUP = "kafka.consumer.group"; + public static final String CONFIG_KAFKA_INPUT_FORMAT = "input.format"; + public static final String CONFIG_KAFKA_PARSER_NAME = "kafka.parser.name"; + @Override + public int run(String[] args) throws Exception { + Options options = new Options(); + + try { + options.addOption(OPTION_JOB_NAME); + options.addOption(OPTION_CUBE_NAME); + options.addOption(OPTION_OUTPUT_PATH); + options.addOption(OPTION_SEGMENT_ID); + parseOptions(options, args); + + job = Job.getInstance(getConf(), getOptionValue(OPTION_JOB_NAME)); + String cubeName = getOptionValue(OPTION_CUBE_NAME); + Path output = new Path(getOptionValue(OPTION_OUTPUT_PATH)); + + String segmentId = getOptionValue(OPTION_SEGMENT_ID); + + // ---------------------------------------------------------------------------- + // add metadata to distributed cache + CubeManager cubeMgr = CubeManager.getInstance(KylinConfig.getInstanceFromEnv()); + CubeInstance cube = cubeMgr.getCube(cubeName); + + job.getConfiguration().set(BatchConstants.CFG_CUBE_NAME, cubeName); + job.getConfiguration().set(BatchConstants.CFG_CUBE_SEGMENT_ID, segmentId); + logger.info("Starting: " + job.getJobName()); + + setJobClasspath(job, cube.getConfig()); + + KafkaConfigManager kafkaConfigManager = KafkaConfigManager.getInstance(KylinConfig.getInstanceFromEnv()); + KafkaConfig kafkaConfig = kafkaConfigManager.getKafkaConfig(cube.getRootFactTable()); + String brokers = KafkaClient.getKafkaBrokers(kafkaConfig); + String topic = kafkaConfig.getTopic(); + + if (brokers == null || brokers.length() == 0 || topic == null) { + throw new IllegalArgumentException("Invalid Kafka information, brokers " + brokers + ", topic " + topic); + } + + JobEngineConfig jobEngineConfig = new JobEngineConfig(KylinConfig.getInstanceFromEnv()); + job.getConfiguration().addResource(new Path(jobEngineConfig.getHadoopJobConfFilePath(null))); + job.getConfiguration().set(CONFIG_KAFKA_BROKERS, brokers); + job.getConfiguration().set(CONFIG_KAFKA_TOPIC, topic); + job.getConfiguration().set(CONFIG_KAFKA_TIMEOUT, String.valueOf(kafkaConfig.getTimeout())); + job.getConfiguration().set(CONFIG_KAFKA_BUFFER_SIZE, String.valueOf(kafkaConfig.getBufferSize())); + job.getConfiguration().set(CONFIG_KAFKA_INPUT_FORMAT, "json"); + job.getConfiguration().set(CONFIG_KAFKA_PARSER_NAME, kafkaConfig.getParserName()); + job.getConfiguration().set(CONFIG_KAFKA_CONSUMER_GROUP, cubeName); // use cubeName as consumer group name + setupMapper(cube.getSegmentById(segmentId)); + job.setNumReduceTasks(0); + FileOutputFormat.setOutputPath(job, output); + FileOutputFormat.setCompressOutput(job, true); + org.apache.log4j.Logger.getRootLogger().info("Output hdfs location: " + output); + org.apache.log4j.Logger.getRootLogger().info("Output hdfs compression: " + true); + job.getConfiguration().set(BatchConstants.CFG_OUTPUT_PATH, output.toString()); + + deletePath(job.getConfiguration(), output); + + attachKylinPropsAndMetadata(cube, job.getConfiguration()); + + return waitForCompletion(job); + + } catch (Exception e) { + logger.error("error in KafkaFlatTableJob", e); + printUsage(options); + throw e; + } finally { + if (job != null) + cleanupTempConfFile(job.getConfiguration()); + } + + } + + private void setupMapper(CubeSegment cubeSeg) throws IOException { + // set the segment's offset info to job conf + Map<Integer, Long> offsetStart = cubeSeg.getSourcePartitionOffsetStart(); + Map<Integer, Long> offsetEnd = cubeSeg.getSourcePartitionOffsetEnd(); + + Integer minPartition = Collections.min(offsetStart.keySet()); + Integer maxPartition = Collections.max(offsetStart.keySet()); + job.getConfiguration().set(CONFIG_KAFKA_PARITION_MIN, minPartition.toString()); + job.getConfiguration().set(CONFIG_KAFKA_PARITION_MAX, maxPartition.toString()); + + for(Integer partition: offsetStart.keySet()) { + job.getConfiguration().set(CONFIG_KAFKA_PARITION_START + partition, offsetStart.get(partition).toString()); + job.getConfiguration().set(CONFIG_KAFKA_PARITION_END + partition, offsetEnd.get(partition).toString()); + } + + job.setMapperClass(KafkaFlatTableMapper.class); + job.setInputFormatClass(KafkaInputFormat.class); + job.setOutputKeyClass(Text.class); + job.setOutputValueClass(Text.class); + job.setOutputFormatClass(SequenceFileOutputFormat.class); + job.setNumReduceTasks(0); + } + + public static void main(String[] args) throws Exception { + KafkaFlatTableJob job = new KafkaFlatTableJob(); + int exitCode = ToolRunner.run(job, args); + System.exit(exitCode); + } + +} http://git-wip-us.apache.org/repos/asf/kylin/blob/826f23f1/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/RegionScannerAdapter.java ---------------------------------------------------------------------- diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/RegionScannerAdapter.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/RegionScannerAdapter.java index 6342c5c..8a20c65 100644 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/RegionScannerAdapter.java +++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/RegionScannerAdapter.java @@ -1,97 +1,97 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. -*/ - -package org.apache.kylin.storage.hbase.cube.v1; - -import java.io.IOException; -import java.util.List; - -import org.apache.hadoop.hbase.Cell; -import org.apache.hadoop.hbase.HRegionInfo; -import org.apache.hadoop.hbase.client.Result; -import org.apache.hadoop.hbase.client.ResultScanner; -import org.apache.hadoop.hbase.regionserver.RegionScanner; - -/** - * @author yangli9 - * - */ -public class RegionScannerAdapter implements RegionScanner { - - private ResultScanner scanner; - - public RegionScannerAdapter(ResultScanner scanner) { - this.scanner = scanner; - } - - @Override - public boolean next(List<Cell> results) throws IOException { - Result result = scanner.next(); - if (result == null) // EOF - return false; - - results.addAll(result.listCells()); - return true; - } - - @Override - public boolean next(List<Cell> result, int limit) throws IOException { - return next(result); - } - - @Override - public boolean nextRaw(List<Cell> result) throws IOException { - return next(result); - } - - @Override - public boolean nextRaw(List<Cell> result, int limit) throws IOException { - return next(result); - } - - @Override - public void close() throws IOException { - scanner.close(); - } - - @Override - public HRegionInfo getRegionInfo() { - return null; - } - - @Override - public long getMaxResultSize() { - return Long.MAX_VALUE; - } - - @Override - public boolean isFilterDone() throws IOException { - return false; - } - - @Override - public boolean reseek(byte[] row) throws IOException { - throw new UnsupportedOperationException(); - } - - @Override - public long getMvccReadPoint() { - return Long.MAX_VALUE; - } - -} +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. +*/ + +package org.apache.kylin.storage.hbase.cube.v1; + +import java.io.IOException; +import java.util.List; + +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.HRegionInfo; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.ResultScanner; +import org.apache.hadoop.hbase.regionserver.RegionScanner; + +/** + * @author yangli9 + * + */ +public class RegionScannerAdapter implements RegionScanner { + + private ResultScanner scanner; + + public RegionScannerAdapter(ResultScanner scanner) { + this.scanner = scanner; + } + + @Override + public boolean next(List<Cell> results) throws IOException { + Result result = scanner.next(); + if (result == null) // EOF + return false; + + results.addAll(result.listCells()); + return true; + } + + @Override + public boolean next(List<Cell> result, int limit) throws IOException { + return next(result); + } + + @Override + public boolean nextRaw(List<Cell> result) throws IOException { + return next(result); + } + + @Override + public boolean nextRaw(List<Cell> result, int limit) throws IOException { + return next(result); + } + + @Override + public void close() throws IOException { + scanner.close(); + } + + @Override + public HRegionInfo getRegionInfo() { + return null; + } + + @Override + public long getMaxResultSize() { + return Long.MAX_VALUE; + } + + @Override + public boolean isFilterDone() throws IOException { + return false; + } + + @Override + public boolean reseek(byte[] row) throws IOException { + throw new UnsupportedOperationException(); + } + + @Override + public long getMvccReadPoint() { + return Long.MAX_VALUE; + } + +} http://git-wip-us.apache.org/repos/asf/kylin/blob/826f23f1/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/ResultScannerAdapter.java ---------------------------------------------------------------------- diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/ResultScannerAdapter.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/ResultScannerAdapter.java index 9d93e22..99058e7 100644 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/ResultScannerAdapter.java +++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/ResultScannerAdapter.java @@ -1,100 +1,100 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. -*/ - -package org.apache.kylin.storage.hbase.cube.v1; - -import java.io.IOException; -import java.util.Iterator; -import java.util.List; - -import org.apache.hadoop.hbase.Cell; -import org.apache.hadoop.hbase.client.Result; -import org.apache.hadoop.hbase.client.ResultScanner; -import org.apache.hadoop.hbase.regionserver.RegionScanner; - -import com.google.common.collect.Lists; - -/** - * @author yangli9 - * - */ -public class ResultScannerAdapter implements ResultScanner { - - private RegionScanner scanner; - - public ResultScannerAdapter(RegionScanner scanner) { - this.scanner = scanner; - } - - @Override - public Iterator<Result> iterator() { - return new Iterator<Result>() { - - Result next = null; - - @Override - public boolean hasNext() { - if (next == null) { - try { - next = ResultScannerAdapter.this.next(); - } catch (IOException e) { - throw new RuntimeException(e); - } - } - return next != null; - } - - @Override - public Result next() { - Result r = next; - next = null; - return r; - } - - @Override - public void remove() { - throw new UnsupportedOperationException(); - } - }; - } - - @Override - public Result next() throws IOException { - List<Cell> cells = Lists.newArrayList(); - scanner.next(cells); - if (cells.isEmpty()) - return null; - else - return Result.create(cells); - } - - @Override - public Result[] next(int nbRows) throws IOException { - throw new UnsupportedOperationException(); - } - - @Override - public void close() { - try { - scanner.close(); - } catch (IOException e) { - throw new RuntimeException(e); - } - } - -} +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. +*/ + +package org.apache.kylin.storage.hbase.cube.v1; + +import java.io.IOException; +import java.util.Iterator; +import java.util.List; + +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.ResultScanner; +import org.apache.hadoop.hbase.regionserver.RegionScanner; + +import com.google.common.collect.Lists; + +/** + * @author yangli9 + * + */ +public class ResultScannerAdapter implements ResultScanner { + + private RegionScanner scanner; + + public ResultScannerAdapter(RegionScanner scanner) { + this.scanner = scanner; + } + + @Override + public Iterator<Result> iterator() { + return new Iterator<Result>() { + + Result next = null; + + @Override + public boolean hasNext() { + if (next == null) { + try { + next = ResultScannerAdapter.this.next(); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + return next != null; + } + + @Override + public Result next() { + Result r = next; + next = null; + return r; + } + + @Override + public void remove() { + throw new UnsupportedOperationException(); + } + }; + } + + @Override + public Result next() throws IOException { + List<Cell> cells = Lists.newArrayList(); + scanner.next(cells); + if (cells.isEmpty()) + return null; + else + return Result.create(cells); + } + + @Override + public Result[] next(int nbRows) throws IOException { + throw new UnsupportedOperationException(); + } + + @Override + public void close() { + try { + scanner.close(); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + +} http://git-wip-us.apache.org/repos/asf/kylin/blob/826f23f1/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/coprocessor/observer/ObserverTuple.java ---------------------------------------------------------------------- diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/coprocessor/observer/ObserverTuple.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/coprocessor/observer/ObserverTuple.java index 06c6e2c..e2236d3 100644 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/coprocessor/observer/ObserverTuple.java +++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/coprocessor/observer/ObserverTuple.java @@ -1,71 +1,71 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. -*/ - -package org.apache.kylin.storage.hbase.cube.v1.coprocessor.observer; - -import org.apache.hadoop.hbase.io.ImmutableBytesWritable; -import org.apache.kylin.common.util.Dictionary; -import org.apache.kylin.metadata.model.TblColRef; -import org.apache.kylin.metadata.tuple.IEvaluatableTuple; -import org.apache.kylin.storage.hbase.common.coprocessor.CoprocessorRowType; - -/** - * A special kind of tuple that exposes column value (dictionary ID) directly on - * top of row key. - * - * @author yangli9 - */ -public class ObserverTuple implements IEvaluatableTuple { - - final CoprocessorRowType type; - - ImmutableBytesWritable rowkey; - String[] values; - - public ObserverTuple(CoprocessorRowType type) { - this.type = type; - this.rowkey = new ImmutableBytesWritable(); - this.values = new String[type.getColumnCount()]; - } - - public void setUnderlying(byte[] array, int offset, int length) { - rowkey.set(array, offset, length); - for (int i = 0; i < values.length; i++) { - values[i] = null; - } - } - - private String getValueAt(int i) { - int n = type.getColumnCount(); - if (i < 0 || i >= n) - return null; - - if (values[i] == null) { - values[i] = Dictionary.dictIdToString(rowkey.get(), rowkey.getOffset() + type.columnOffsets[i], type.columnSizes[i]); - } - - return values[i]; - } - - @Override - public Object getValue(TblColRef col) { - int i = type.getColIndexByTblColRef(col); - return getValueAt(i); - } - -} +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. +*/ + +package org.apache.kylin.storage.hbase.cube.v1.coprocessor.observer; + +import org.apache.hadoop.hbase.io.ImmutableBytesWritable; +import org.apache.kylin.common.util.Dictionary; +import org.apache.kylin.metadata.model.TblColRef; +import org.apache.kylin.metadata.tuple.IEvaluatableTuple; +import org.apache.kylin.storage.hbase.common.coprocessor.CoprocessorRowType; + +/** + * A special kind of tuple that exposes column value (dictionary ID) directly on + * top of row key. + * + * @author yangli9 + */ +public class ObserverTuple implements IEvaluatableTuple { + + final CoprocessorRowType type; + + ImmutableBytesWritable rowkey; + String[] values; + + public ObserverTuple(CoprocessorRowType type) { + this.type = type; + this.rowkey = new ImmutableBytesWritable(); + this.values = new String[type.getColumnCount()]; + } + + public void setUnderlying(byte[] array, int offset, int length) { + rowkey.set(array, offset, length); + for (int i = 0; i < values.length; i++) { + values[i] = null; + } + } + + private String getValueAt(int i) { + int n = type.getColumnCount(); + if (i < 0 || i >= n) + return null; + + if (values[i] == null) { + values[i] = Dictionary.dictIdToString(rowkey.get(), rowkey.getOffset() + type.columnOffsets[i], type.columnSizes[i]); + } + + return values[i]; + } + + @Override + public Object getValue(TblColRef col) { + int i = type.getColIndexByTblColRef(col); + return getValueAt(i); + } + +} http://git-wip-us.apache.org/repos/asf/kylin/blob/826f23f1/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CubeHFileJob.java ---------------------------------------------------------------------- diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CubeHFileJob.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CubeHFileJob.java index 9e78aae..9593372 100644 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CubeHFileJob.java +++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CubeHFileJob.java @@ -76,7 +76,7 @@ public class CubeHFileJob extends AbstractHadoopJob { job = Job.getInstance(getConf(), getOptionValue(OPTION_JOB_NAME)); setJobClasspath(job, cube.getConfig()); - // For separate HBase cluster, note the output is a qualified HDFS path if "kylin.hbase.cluster.fs" is configured, ref HBaseMRSteps.getHFilePath() + // For separate HBase cluster, note the output is a qualified HDFS path if "kylin.storage.hbase.cluster-fs" is configured, ref HBaseMRSteps.getHFilePath() HBaseConnection.addHBaseClusterNNHAConfiguration(job.getConfiguration()); addInputDirs(getOptionValue(OPTION_INPUT_PATH), job); http://git-wip-us.apache.org/repos/asf/kylin/blob/826f23f1/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/PingHBaseCLI.java ---------------------------------------------------------------------- diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/PingHBaseCLI.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/PingHBaseCLI.java index 81b79cb..e219c5a 100644 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/PingHBaseCLI.java +++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/PingHBaseCLI.java @@ -1,83 +1,83 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. -*/ - -package org.apache.kylin.storage.hbase.util; - -import java.io.IOException; - +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. +*/ + +package org.apache.kylin.storage.hbase.util; + +import java.io.IOException; + import org.apache.commons.io.IOUtils; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.client.HConnection; -import org.apache.hadoop.hbase.client.HConnectionManager; -import org.apache.hadoop.hbase.client.HTableInterface; -import org.apache.hadoop.hbase.client.Result; -import org.apache.hadoop.hbase.client.ResultScanner; -import org.apache.hadoop.hbase.client.Scan; -import org.apache.hadoop.hbase.security.User; -import org.apache.hadoop.hbase.security.token.TokenUtil; -import org.apache.hadoop.security.UserGroupInformation; -import org.apache.kylin.common.util.Bytes; -import org.apache.kylin.storage.hbase.HBaseConnection; - -/** - * @author yangli9 - * - */ -public class PingHBaseCLI { - - public static void main(String[] args) throws IOException { - String hbaseTable = args[0]; - - System.out.println("Hello friend."); - - Configuration hconf = HBaseConnection.getCurrentHBaseConfiguration(); - if (User.isHBaseSecurityEnabled(hconf)) { - try { - System.out.println("--------------Getting kerberos credential for user " + UserGroupInformation.getCurrentUser().getUserName()); - TokenUtil.obtainAndCacheToken(hconf, UserGroupInformation.getCurrentUser()); - } catch (InterruptedException e) { - System.out.println("--------------Error while getting kerberos credential for user " + UserGroupInformation.getCurrentUser().getUserName()); - } - } - - Scan scan = new Scan(); - int limit = 20; - - HConnection conn = null; - HTableInterface table = null; - ResultScanner scanner = null; - try { - conn = HConnectionManager.createConnection(hconf); - table = conn.getTable(hbaseTable); - scanner = table.getScanner(scan); - int count = 0; - for (Result r : scanner) { - byte[] rowkey = r.getRow(); - System.out.println(Bytes.toStringBinary(rowkey)); - count++; - if (count == limit) - break; - } - } finally { +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.client.HConnection; +import org.apache.hadoop.hbase.client.HConnectionManager; +import org.apache.hadoop.hbase.client.HTableInterface; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.ResultScanner; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.security.User; +import org.apache.hadoop.hbase.security.token.TokenUtil; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.kylin.common.util.Bytes; +import org.apache.kylin.storage.hbase.HBaseConnection; + +/** + * @author yangli9 + * + */ +public class PingHBaseCLI { + + public static void main(String[] args) throws IOException { + String hbaseTable = args[0]; + + System.out.println("Hello friend."); + + Configuration hconf = HBaseConnection.getCurrentHBaseConfiguration(); + if (User.isHBaseSecurityEnabled(hconf)) { + try { + System.out.println("--------------Getting kerberos credential for user " + UserGroupInformation.getCurrentUser().getUserName()); + TokenUtil.obtainAndCacheToken(hconf, UserGroupInformation.getCurrentUser()); + } catch (InterruptedException e) { + System.out.println("--------------Error while getting kerberos credential for user " + UserGroupInformation.getCurrentUser().getUserName()); + } + } + + Scan scan = new Scan(); + int limit = 20; + + HConnection conn = null; + HTableInterface table = null; + ResultScanner scanner = null; + try { + conn = HConnectionManager.createConnection(hconf); + table = conn.getTable(hbaseTable); + scanner = table.getScanner(scan); + int count = 0; + for (Result r : scanner) { + byte[] rowkey = r.getRow(); + System.out.println(Bytes.toStringBinary(rowkey)); + count++; + if (count == limit) + break; + } + } finally { IOUtils.closeQuietly(scanner); IOUtils.closeQuietly(table); IOUtils.closeQuietly(conn); - } - - } -} + } + + } +} http://git-wip-us.apache.org/repos/asf/kylin/blob/826f23f1/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/common/FuzzyValueCombinationTest.java ---------------------------------------------------------------------- diff --git a/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/common/FuzzyValueCombinationTest.java b/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/common/FuzzyValueCombinationTest.java index 14c14ba..afb0b33 100644 --- a/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/common/FuzzyValueCombinationTest.java +++ b/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/common/FuzzyValueCombinationTest.java @@ -1,130 +1,130 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. -*/ - -package org.apache.kylin.storage.hbase.common; - -import static org.junit.Assert.assertEquals; - -import java.util.Arrays; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Set; - -import org.apache.kylin.common.util.LocalFileMetadataTestCase; -import org.apache.kylin.metadata.model.TableDesc; -import org.apache.kylin.metadata.model.TblColRef; -import org.apache.kylin.storage.translate.FuzzyValueCombination; -import org.junit.AfterClass; -import org.junit.BeforeClass; -import org.junit.Test; - -import com.google.common.collect.Maps; - -/** - * @author yangli9 - * - */ -public class FuzzyValueCombinationTest extends LocalFileMetadataTestCase { - static final TableDesc table = new TableDesc(); - static TblColRef col1; - static TblColRef col2; - static TblColRef col3; - - static { - table.setName("table"); - table.setDatabase("default"); - } - - @BeforeClass - public static void setUp() throws Exception { - staticCreateTestMetadata(); - - col1 = col(1, table); - col2 = col(2, table); - col3 = col(3, table); - } - - @AfterClass - public static void after() throws Exception { - cleanAfterClass(); - } - - private static TblColRef col(int i, TableDesc t) { - return TblColRef.mockup(t, i, "Col" + i, "string"); - } - - @Test - public void testBasics() { - System.out.println("test basics ============================================================================"); - Map<TblColRef, Set<String>> values = Maps.newHashMap(); - values.put(col1, set("a", "b", "c")); - values.put(col2, set("x", "y", "z")); - List<Map<TblColRef, String>> result = FuzzyValueCombination.calculate(values, 10); - for (Map<TblColRef, String> item : result) { - System.out.println(item); - } - assertEquals(9, result.size()); - } - - @Test - public void testSomeNull() { - System.out.println("test some null ============================================================================"); - Map<TblColRef, Set<String>> values = Maps.newHashMap(); - values.put(col1, set("a", "b", "c")); - values.put(col2, set()); - values.put(col3, set("x", "y", "z")); - List<Map<TblColRef, String>> result = FuzzyValueCombination.calculate(values, 10); - for (Map<TblColRef, String> item : result) { - System.out.println(item); - } - assertEquals(9, result.size()); - } - - @Test - public void testAllNulls() { - System.out.println("test all nulls ============================================================================"); - Map<TblColRef, Set<String>> values = Maps.newHashMap(); - values.put(col1, set()); - values.put(col2, set()); - values.put(col3, set()); - List<Map<TblColRef, String>> result = FuzzyValueCombination.calculate(values, 10); - for (Map<TblColRef, String> item : result) { - System.out.println(item); - } - assertEquals(0, result.size()); - } - - @Test - public void testCap() { - System.out.println("test cap ============================================================================"); - Map<TblColRef, Set<String>> values = Maps.newHashMap(); - values.put(col1, set("1", "2", "3", "4")); - values.put(col2, set("a", "b", "c")); - values.put(col3, set("x", "y", "z")); - List<Map<TblColRef, String>> result = FuzzyValueCombination.calculate(values, 10); - for (Map<TblColRef, String> item : result) { - System.out.println(item); - } - assertEquals(0, result.size()); - } - - private Set<String> set(String... values) { - return new HashSet<String>(Arrays.asList(values)); - } -} +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. +*/ + +package org.apache.kylin.storage.hbase.common; + +import static org.junit.Assert.assertEquals; + +import java.util.Arrays; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import org.apache.kylin.common.util.LocalFileMetadataTestCase; +import org.apache.kylin.metadata.model.TableDesc; +import org.apache.kylin.metadata.model.TblColRef; +import org.apache.kylin.storage.translate.FuzzyValueCombination; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; + +import com.google.common.collect.Maps; + +/** + * @author yangli9 + * + */ +public class FuzzyValueCombinationTest extends LocalFileMetadataTestCase { + static final TableDesc table = new TableDesc(); + static TblColRef col1; + static TblColRef col2; + static TblColRef col3; + + static { + table.setName("table"); + table.setDatabase("default"); + } + + @BeforeClass + public static void setUp() throws Exception { + staticCreateTestMetadata(); + + col1 = col(1, table); + col2 = col(2, table); + col3 = col(3, table); + } + + @AfterClass + public static void after() throws Exception { + cleanAfterClass(); + } + + private static TblColRef col(int i, TableDesc t) { + return TblColRef.mockup(t, i, "Col" + i, "string"); + } + + @Test + public void testBasics() { + System.out.println("test basics ============================================================================"); + Map<TblColRef, Set<String>> values = Maps.newHashMap(); + values.put(col1, set("a", "b", "c")); + values.put(col2, set("x", "y", "z")); + List<Map<TblColRef, String>> result = FuzzyValueCombination.calculate(values, 10); + for (Map<TblColRef, String> item : result) { + System.out.println(item); + } + assertEquals(9, result.size()); + } + + @Test + public void testSomeNull() { + System.out.println("test some null ============================================================================"); + Map<TblColRef, Set<String>> values = Maps.newHashMap(); + values.put(col1, set("a", "b", "c")); + values.put(col2, set()); + values.put(col3, set("x", "y", "z")); + List<Map<TblColRef, String>> result = FuzzyValueCombination.calculate(values, 10); + for (Map<TblColRef, String> item : result) { + System.out.println(item); + } + assertEquals(9, result.size()); + } + + @Test + public void testAllNulls() { + System.out.println("test all nulls ============================================================================"); + Map<TblColRef, Set<String>> values = Maps.newHashMap(); + values.put(col1, set()); + values.put(col2, set()); + values.put(col3, set()); + List<Map<TblColRef, String>> result = FuzzyValueCombination.calculate(values, 10); + for (Map<TblColRef, String> item : result) { + System.out.println(item); + } + assertEquals(0, result.size()); + } + + @Test + public void testCap() { + System.out.println("test cap ============================================================================"); + Map<TblColRef, Set<String>> values = Maps.newHashMap(); + values.put(col1, set("1", "2", "3", "4")); + values.put(col2, set("a", "b", "c")); + values.put(col3, set("x", "y", "z")); + List<Map<TblColRef, String>> result = FuzzyValueCombination.calculate(values, 10); + for (Map<TblColRef, String> item : result) { + System.out.println(item); + } + assertEquals(0, result.size()); + } + + private Set<String> set(String... values) { + return new HashSet<String>(Arrays.asList(values)); + } +} http://git-wip-us.apache.org/repos/asf/kylin/blob/826f23f1/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/common/coprocessor/RowProjectorTest.java ---------------------------------------------------------------------- diff --git a/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/common/coprocessor/RowProjectorTest.java b/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/common/coprocessor/RowProjectorTest.java index 6be7808..2f0190e 100644 --- a/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/common/coprocessor/RowProjectorTest.java +++ b/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/common/coprocessor/RowProjectorTest.java @@ -1,88 +1,88 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. -*/ - -package org.apache.kylin.storage.hbase.common.coprocessor; - -import static org.junit.Assert.assertTrue; - -import java.util.ArrayList; -import java.util.Arrays; -import java.util.List; - -import org.apache.hadoop.hbase.Cell; -import org.apache.hadoop.hbase.KeyValue; -import org.apache.kylin.common.util.Bytes; -import org.junit.Test; - -import com.google.common.collect.Lists; - -/** - * @author yangli9 - * - */ -public class RowProjectorTest { - - byte[] mask = new byte[] { (byte) 0xff, 0x00, 0x00, (byte) 0xff }; - CoprocessorProjector sample = new CoprocessorProjector(mask, true); - - @Test - public void testSerialize() { - - byte[] bytes = CoprocessorProjector.serialize(sample); - CoprocessorProjector copy = CoprocessorProjector.deserialize(bytes); - - assertTrue(Arrays.equals(sample.groupByMask, copy.groupByMask)); - } - - @Test - public void testProject() { - byte[] bytes1 = new byte[] { -1, -2, -3, -4 }; - byte[] bytes2 = new byte[] { 1, 2, 3, 4 }; - byte[] bytes3 = new byte[] { 1, 99, 100, 4 }; - byte[] bytes4 = new byte[] { 1, 1, 1, 5 }; - - AggrKey rowKey = sample.getAggrKey(newCellWithRowKey(bytes1)); - AggrKey rowKey2 = sample.getAggrKey(newCellWithRowKey(bytes2)); - assertTrue(rowKey == rowKey2); // no extra object creation - assertTrue(Bytes.equals(rowKey.get(), rowKey.offset(), rowKey.length(), bytes2, 0, bytes2.length)); - - rowKey2 = rowKey.copy(); // explicit object creation - assertTrue(rowKey != rowKey2); - - rowKey = sample.getAggrKey(newCellWithRowKey(bytes1)); - assertTrue(rowKey.hashCode() != rowKey2.hashCode()); - assertTrue(rowKey.equals(rowKey2) == false); - assertTrue(rowKey.compareTo(rowKey2) > 0); // unsigned compare - - rowKey = sample.getAggrKey(newCellWithRowKey(bytes3)); - assertTrue(rowKey.hashCode() == rowKey2.hashCode()); - assertTrue(rowKey.equals(rowKey2) == true); - assertTrue(rowKey.compareTo(rowKey2) == 0); - - rowKey = sample.getAggrKey(newCellWithRowKey(bytes4)); - assertTrue(rowKey.hashCode() != rowKey2.hashCode()); - assertTrue(rowKey.equals(rowKey2) == false); - assertTrue(rowKey.compareTo(rowKey2) > 0); - } - - private List<Cell> newCellWithRowKey(byte[] rowkey) { - ArrayList<Cell> list = Lists.newArrayList(); - list.add(new KeyValue(rowkey, null, null, null)); - return list; - } -} +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. +*/ + +package org.apache.kylin.storage.hbase.common.coprocessor; + +import static org.junit.Assert.assertTrue; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; + +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.KeyValue; +import org.apache.kylin.common.util.Bytes; +import org.junit.Test; + +import com.google.common.collect.Lists; + +/** + * @author yangli9 + * + */ +public class RowProjectorTest { + + byte[] mask = new byte[] { (byte) 0xff, 0x00, 0x00, (byte) 0xff }; + CoprocessorProjector sample = new CoprocessorProjector(mask, true); + + @Test + public void testSerialize() { + + byte[] bytes = CoprocessorProjector.serialize(sample); + CoprocessorProjector copy = CoprocessorProjector.deserialize(bytes); + + assertTrue(Arrays.equals(sample.groupByMask, copy.groupByMask)); + } + + @Test + public void testProject() { + byte[] bytes1 = new byte[] { -1, -2, -3, -4 }; + byte[] bytes2 = new byte[] { 1, 2, 3, 4 }; + byte[] bytes3 = new byte[] { 1, 99, 100, 4 }; + byte[] bytes4 = new byte[] { 1, 1, 1, 5 }; + + AggrKey rowKey = sample.getAggrKey(newCellWithRowKey(bytes1)); + AggrKey rowKey2 = sample.getAggrKey(newCellWithRowKey(bytes2)); + assertTrue(rowKey == rowKey2); // no extra object creation + assertTrue(Bytes.equals(rowKey.get(), rowKey.offset(), rowKey.length(), bytes2, 0, bytes2.length)); + + rowKey2 = rowKey.copy(); // explicit object creation + assertTrue(rowKey != rowKey2); + + rowKey = sample.getAggrKey(newCellWithRowKey(bytes1)); + assertTrue(rowKey.hashCode() != rowKey2.hashCode()); + assertTrue(rowKey.equals(rowKey2) == false); + assertTrue(rowKey.compareTo(rowKey2) > 0); // unsigned compare + + rowKey = sample.getAggrKey(newCellWithRowKey(bytes3)); + assertTrue(rowKey.hashCode() == rowKey2.hashCode()); + assertTrue(rowKey.equals(rowKey2) == true); + assertTrue(rowKey.compareTo(rowKey2) == 0); + + rowKey = sample.getAggrKey(newCellWithRowKey(bytes4)); + assertTrue(rowKey.hashCode() != rowKey2.hashCode()); + assertTrue(rowKey.equals(rowKey2) == false); + assertTrue(rowKey.compareTo(rowKey2) > 0); + } + + private List<Cell> newCellWithRowKey(byte[] rowkey) { + ArrayList<Cell> list = Lists.newArrayList(); + list.add(new KeyValue(rowkey, null, null, null)); + return list; + } +} http://git-wip-us.apache.org/repos/asf/kylin/blob/826f23f1/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/cube/v1/coprocessor/observer/RowAggregatorsTest.java ---------------------------------------------------------------------- diff --git a/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/cube/v1/coprocessor/observer/RowAggregatorsTest.java b/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/cube/v1/coprocessor/observer/RowAggregatorsTest.java index d0a0710..da34ff6 100644 --- a/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/cube/v1/coprocessor/observer/RowAggregatorsTest.java +++ b/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/cube/v1/coprocessor/observer/RowAggregatorsTest.java @@ -1,62 +1,62 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. -*/ - -package org.apache.kylin.storage.hbase.cube.v1.coprocessor.observer; - -import static org.junit.Assert.assertTrue; - -import java.util.Arrays; - -import org.apache.kylin.common.util.Bytes; -import org.junit.Test; - -/** - * @author yangli9 - * - */ -public class RowAggregatorsTest { - - @Test - public void testSerialize() { - ObserverAggregators.HCol[] hcols = new ObserverAggregators.HCol[] { // - newHCol("f", "c1", new String[] { "SUM", "COUNT" }, new String[] { "decimal", "long" }), // - newHCol("f", "c2", new String[] { "SUM", "SUM" }, new String[] { "long", "long" }) }; - ObserverAggregators sample = new ObserverAggregators(hcols); - - byte[] bytes = ObserverAggregators.serialize(sample); - ObserverAggregators copy = ObserverAggregators.deserialize(bytes); - - assertTrue(sample.nHCols == copy.nHCols); - assertTrue(sample.nTotalMeasures == copy.nTotalMeasures); - assertEquals(sample.hcols[0], copy.hcols[0]); - assertEquals(sample.hcols[1], copy.hcols[1]); - } - - private static ObserverAggregators.HCol newHCol(String family, String qualifier, String[] funcNames, String[] dataTypes) { - return new ObserverAggregators.HCol(Bytes.toBytes(family), Bytes.toBytes(qualifier), funcNames, dataTypes); - } - - private static void assertEquals(ObserverAggregators.HCol a, ObserverAggregators.HCol b) { - assertTrue(a.nMeasures == b.nMeasures); - assertTrue(Arrays.equals(a.family, b.family)); - assertTrue(Arrays.equals(a.qualifier, b.qualifier)); - assertTrue(Arrays.equals(a.funcNames, b.funcNames)); - assertTrue(Arrays.equals(a.dataTypes, b.dataTypes)); - } - -} +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. +*/ + +package org.apache.kylin.storage.hbase.cube.v1.coprocessor.observer; + +import static org.junit.Assert.assertTrue; + +import java.util.Arrays; + +import org.apache.kylin.common.util.Bytes; +import org.junit.Test; + +/** + * @author yangli9 + * + */ +public class RowAggregatorsTest { + + @Test + public void testSerialize() { + ObserverAggregators.HCol[] hcols = new ObserverAggregators.HCol[] { // + newHCol("f", "c1", new String[] { "SUM", "COUNT" }, new String[] { "decimal", "long" }), // + newHCol("f", "c2", new String[] { "SUM", "SUM" }, new String[] { "long", "long" }) }; + ObserverAggregators sample = new ObserverAggregators(hcols); + + byte[] bytes = ObserverAggregators.serialize(sample); + ObserverAggregators copy = ObserverAggregators.deserialize(bytes); + + assertTrue(sample.nHCols == copy.nHCols); + assertTrue(sample.nTotalMeasures == copy.nTotalMeasures); + assertEquals(sample.hcols[0], copy.hcols[0]); + assertEquals(sample.hcols[1], copy.hcols[1]); + } + + private static ObserverAggregators.HCol newHCol(String family, String qualifier, String[] funcNames, String[] dataTypes) { + return new ObserverAggregators.HCol(Bytes.toBytes(family), Bytes.toBytes(qualifier), funcNames, dataTypes); + } + + private static void assertEquals(ObserverAggregators.HCol a, ObserverAggregators.HCol b) { + assertTrue(a.nMeasures == b.nMeasures); + assertTrue(Arrays.equals(a.family, b.family)); + assertTrue(Arrays.equals(a.qualifier, b.qualifier)); + assertTrue(Arrays.equals(a.funcNames, b.funcNames)); + assertTrue(Arrays.equals(a.dataTypes, b.dataTypes)); + } + +} http://git-wip-us.apache.org/repos/asf/kylin/blob/826f23f1/tool/src/main/java/org/apache/kylin/tool/MrJobInfoExtractor.java ---------------------------------------------------------------------- diff --git a/tool/src/main/java/org/apache/kylin/tool/MrJobInfoExtractor.java b/tool/src/main/java/org/apache/kylin/tool/MrJobInfoExtractor.java index 96f47d3..9dd4c81 100644 --- a/tool/src/main/java/org/apache/kylin/tool/MrJobInfoExtractor.java +++ b/tool/src/main/java/org/apache/kylin/tool/MrJobInfoExtractor.java @@ -71,7 +71,7 @@ public class MrJobInfoExtractor extends AbstractInfoExtractor { return m.group(1) + m.group(2) + ":19888"; } } - logger.info("kylin.job.yarn.app.rest.check.status.url" + " is not set read from hadoop configuration"); + logger.info("kylin.engine.mr.yarn-check-status-url" + " is not set read from hadoop configuration"); Configuration conf = HadoopUtil.getCurrentConfiguration(); String rmWebHost = HAUtil.getConfValueForRMInstance(YarnConfiguration.RM_WEBAPP_ADDRESS, YarnConfiguration.DEFAULT_RM_WEBAPP_ADDRESS, conf); http://git-wip-us.apache.org/repos/asf/kylin/blob/826f23f1/tool/src/main/java/org/apache/kylin/tool/StorageCleanupJob.java ---------------------------------------------------------------------- diff --git a/tool/src/main/java/org/apache/kylin/tool/StorageCleanupJob.java b/tool/src/main/java/org/apache/kylin/tool/StorageCleanupJob.java index 3c0ce1b..b51a850 100644 --- a/tool/src/main/java/org/apache/kylin/tool/StorageCleanupJob.java +++ b/tool/src/main/java/org/apache/kylin/tool/StorageCleanupJob.java @@ -333,7 +333,7 @@ public class StorageCleanupJob extends AbstractApplication { } cmdExec.execute(hiveCmdBuilder.build()); - //if kylin.hive.keep.flat.table, some intermediate table might be kept + //if kylin.source.hive.keep-flat-table, some intermediate table might be kept //delete external path for (String tableToDelete : allHiveTablesNeedToBeDeleted) { String uuid = tableToDelete.substring(tableToDelete.length() - uuidLength, tableToDelete.length()); @@ -347,7 +347,7 @@ public class StorageCleanupJob extends AbstractApplication { fs.delete(externalDataPath, true); logger.info("Hive table {}'s external path {} deleted", tableToDelete, path); } else { - logger.info("Hive table {}'s external path {} not exist. It's normal if kylin.hive.keep.flat.table set false (By default)", tableToDelete, path); + logger.info("Hive table {}'s external path {} not exist. It's normal if kylin.source.hive.keep-flat-table set false (By default)", tableToDelete, path); } } else { logger.warn("Hive table {}'s job ID not found, segmentId2JobId: {}", tableToDelete, segmentId2JobId.toString()); http://git-wip-us.apache.org/repos/asf/kylin/blob/826f23f1/webapp/app/js/controllers/admin.js ---------------------------------------------------------------------- diff --git a/webapp/app/js/controllers/admin.js b/webapp/app/js/controllers/admin.js index e77729a..0d36e0d 100644 --- a/webapp/app/js/controllers/admin.js +++ b/webapp/app/js/controllers/admin.js @@ -137,7 +137,7 @@ KylinApp.controller('AdminCtrl', function ($scope, AdminService, CacheService, T closeOnConfirm: true }, function (isConfirm) { if (isConfirm) { - AdminService.updateConfig({}, {key: 'kylin.query.cache.enabled', value: false}, function () { + AdminService.updateConfig({}, {key: 'kylin.query.cache-enabled', value: false}, function () { SweetAlert.swal('Success!', 'Cache disabled successfully!', 'success'); location.reload(); }, function (e) { @@ -166,7 +166,7 @@ KylinApp.controller('AdminCtrl', function ($scope, AdminService, CacheService, T closeOnConfirm: true }, function (isConfirm) { if (isConfirm) { - AdminService.updateConfig({}, {key: 'kylin.query.cache.enabled', value: true}, function () { + AdminService.updateConfig({}, {key: 'kylin.query.cache-enabled', value: true}, function () { SweetAlert.swal('Success!', 'Cache enabled successfully!', 'success'); location.reload(); }, function (e) { http://git-wip-us.apache.org/repos/asf/kylin/blob/826f23f1/webapp/app/js/services/kylinProperties.js ---------------------------------------------------------------------- diff --git a/webapp/app/js/services/kylinProperties.js b/webapp/app/js/services/kylinProperties.js index 7110b8c..1a12357 100644 --- a/webapp/app/js/services/kylinProperties.js +++ b/webapp/app/js/services/kylinProperties.js @@ -42,13 +42,13 @@ KylinApp.service('kylinConfig', function (AdminService, $log) { this.getTimeZone = function () { if (!this.timezone) { - this.timezone = this.getProperty("kylin.rest.timezone").trim(); + this.timezone = this.getProperty("kylin.web.timezone").trim(); } return this.timezone; } this.isCacheEnabled = function(){ - var status = this.getProperty("kylin.query.cache.enabled").trim(); + var status = this.getProperty("kylin.query.cache-enabled").trim(); if(status!=='false'){ return true; } @@ -57,7 +57,7 @@ KylinApp.service('kylinConfig', function (AdminService, $log) { //deprecated this.getDeployEnv = function () { - this.deployEnv = this.getProperty("deploy.env"); + this.deployEnv = this.getProperty("kylin.env"); if (!this.deployEnv) { return "DEV"; } @@ -73,7 +73,7 @@ KylinApp.service('kylinConfig', function (AdminService, $log) { } this.getStorageEng = function () { - this.StorageEng = this.getProperty("kylin.default.storage.engine").trim(); + this.StorageEng = this.getProperty("kylin.storage.default").trim(); if (!this.StorageEng) { return 2; } @@ -81,7 +81,7 @@ KylinApp.service('kylinConfig', function (AdminService, $log) { } this.getCubeEng = function () { - this.CubeEng = this.getProperty("kylin.default.cube.engine").trim(); + this.CubeEng = this.getProperty("kylin.engine.default").trim(); if (!this.CubeEng) { return 2; }