http://git-wip-us.apache.org/repos/asf/kylin/blob/19585846/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CreateHTableJob.java ---------------------------------------------------------------------- diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CreateHTableJob.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CreateHTableJob.java index 9c29f38..fc52701 100644 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CreateHTableJob.java +++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CreateHTableJob.java @@ -95,8 +95,7 @@ public class CreateHTableJob extends AbstractHadoopJob { byte[][] splitKeys; if (statsEnabled) { final Map<Long, Double> cuboidSizeMap = new CubeStatsReader(cubeSegment, kylinConfig).getCuboidSizeMap(); - splitKeys = getRegionSplitsFromCuboidStatistics(cuboidSizeMap, kylinConfig, cubeSegment, - partitionFilePath.getParent()); + splitKeys = getRegionSplitsFromCuboidStatistics(cuboidSizeMap, kylinConfig, cubeSegment, partitionFilePath.getParent()); } else { splitKeys = getRegionSplits(conf, partitionFilePath); } @@ -150,9 +149,7 @@ public class CreateHTableJob extends AbstractHadoopJob { return result; } - public static byte[][] getRegionSplitsFromCuboidStatistics(final Map<Long, Double> cubeSizeMap, - final KylinConfig kylinConfig, final CubeSegment cubeSegment, final Path hfileSplitsOutputFolder) - throws IOException { + public static byte[][] getRegionSplitsFromCuboidStatistics(final Map<Long, Double> cubeSizeMap, final KylinConfig kylinConfig, final CubeSegment cubeSegment, final Path hfileSplitsOutputFolder) throws IOException { final CubeDesc cubeDesc = cubeSegment.getCubeDesc(); float cut = cubeDesc.getConfig().getKylinHBaseRegionCut(); @@ -185,8 +182,7 @@ public class CreateHTableJob extends AbstractHadoopJob { } if (nRegion != original) { - logger.info( - "Region count is adjusted from " + original + " to " + nRegion + " to help random 
sharding"); + logger.info("Region count is adjusted from " + original + " to " + nRegion + " to help random sharding"); } } @@ -217,13 +213,10 @@ public class CreateHTableJob extends AbstractHadoopJob { } if (shardNum > nRegion) { - logger.info( - String.format("Cuboid %d 's estimated size %.2f MB will generate %d regions, reduce to %d", - cuboidId, estimatedSize, shardNum, nRegion)); + logger.info(String.format("Cuboid %d 's estimated size %.2f MB will generate %d regions, reduce to %d", cuboidId, estimatedSize, shardNum, nRegion)); shardNum = nRegion; } else { - logger.info(String.format("Cuboid %d 's estimated size %.2f MB will generate %d regions", cuboidId, - estimatedSize, shardNum)); + logger.info(String.format("Cuboid %d 's estimated size %.2f MB will generate %d regions", cuboidId, estimatedSize, shardNum)); } cuboidShards.put(cuboidId, (short) shardNum); @@ -236,8 +229,7 @@ public class CreateHTableJob extends AbstractHadoopJob { } for (int i = 0; i < nRegion; ++i) { - logger.info(String.format("Region %d's estimated size is %.2f MB, accounting for %.2f percent", i, - regionSizes[i], 100.0 * regionSizes[i] / totalSizeInM)); + logger.info(String.format("Region %d's estimated size is %.2f MB, accounting for %.2f percent", i, regionSizes[i], 100.0 * regionSizes[i] / totalSizeInM)); } CuboidShardUtil.saveCuboidShards(cubeSegment, cuboidShards, nRegion); @@ -255,8 +247,7 @@ public class CreateHTableJob extends AbstractHadoopJob { if (size >= mbPerRegion || (size + cubeSizeMap.get(cuboidId)) >= mbPerRegion * 1.2) { // if the size already bigger than threshold, or it will exceed by 20%, cut for next region regionSplit.add(cuboidId); - logger.info("Region " + regionIndex + " will be " + size + " MB, contains cuboids < " + cuboidId - + " (" + cuboidCount + ") cuboids"); + logger.info("Region " + regionIndex + " will be " + size + " MB, contains cuboids < " + cuboidId + " (" + cuboidCount + ") cuboids"); size = 0; cuboidCount = 0; regionIndex++; @@ -274,8 +265,7 @@ 
public class CreateHTableJob extends AbstractHadoopJob { } } - protected static void saveHFileSplits(final List<HashMap<Long, Double>> innerRegionSplits, int mbPerRegion, - final Path outputFolder, final KylinConfig kylinConfig) throws IOException { + protected static void saveHFileSplits(final List<HashMap<Long, Double>> innerRegionSplits, int mbPerRegion, final Path outputFolder, final KylinConfig kylinConfig) throws IOException { if (outputFolder == null) { logger.warn("outputFolder for hfile split file is null, skip inner region split"); @@ -334,8 +324,7 @@ public class CreateHTableJob extends AbstractHadoopJob { logger.info(String.format("Region %d's hfile %d size is %.2f mb", i, j, accumulatedSize)); byte[] split = new byte[RowConstants.ROWKEY_SHARD_AND_CUBOID_LEN]; BytesUtil.writeUnsigned(i, split, 0, RowConstants.ROWKEY_SHARDID_LEN); - System.arraycopy(Bytes.toBytes(cuboid), 0, split, RowConstants.ROWKEY_SHARDID_LEN, - RowConstants.ROWKEY_CUBOIDID_LEN); + System.arraycopy(Bytes.toBytes(cuboid), 0, split, RowConstants.ROWKEY_SHARDID_LEN, RowConstants.ROWKEY_CUBOIDID_LEN); splits.add(split); accumulatedSize = 0; j++; @@ -345,10 +334,7 @@ public class CreateHTableJob extends AbstractHadoopJob { } - SequenceFile.Writer hfilePartitionWriter = SequenceFile.createWriter(hbaseConf, - SequenceFile.Writer.file(hfilePartitionFile), - SequenceFile.Writer.keyClass(ImmutableBytesWritable.class), - SequenceFile.Writer.valueClass(NullWritable.class)); + SequenceFile.Writer hfilePartitionWriter = SequenceFile.createWriter(hbaseConf, SequenceFile.Writer.file(hfilePartitionFile), SequenceFile.Writer.keyClass(ImmutableBytesWritable.class), SequenceFile.Writer.valueClass(NullWritable.class)); for (int i = 0; i < splits.size(); i++) { hfilePartitionWriter.append(new ImmutableBytesWritable(splits.get(i)), NullWritable.get());
http://git-wip-us.apache.org/repos/asf/kylin/blob/19585846/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CubeHTableUtil.java ---------------------------------------------------------------------- diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CubeHTableUtil.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CubeHTableUtil.java index 7a5f195..feb4842 100644 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CubeHTableUtil.java +++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CubeHTableUtil.java @@ -59,8 +59,7 @@ public class CubeHTableUtil { CubeDesc cubeDesc = cubeInstance.getDescriptor(); KylinConfig kylinConfig = cubeDesc.getConfig(); - HTableDescriptor tableDesc = new HTableDescriptor( - TableName.valueOf(cubeSegment.getStorageLocationIdentifier())); + HTableDescriptor tableDesc = new HTableDescriptor(TableName.valueOf(cubeSegment.getStorageLocationIdentifier())); tableDesc.setValue(HTableDescriptor.SPLIT_POLICY, DisabledRegionSplitPolicy.class.getName()); tableDesc.setValue(IRealizationConstants.HTableTag, kylinConfig.getMetadataUrlPrefix()); tableDesc.setValue(IRealizationConstants.HTableCreationTime, String.valueOf(System.currentTimeMillis())); @@ -104,8 +103,7 @@ public class CubeHTableUtil { DeployCoprocessorCLI.deployCoprocessor(tableDesc); admin.createTable(tableDesc, splitKeys); - Preconditions.checkArgument(admin.isTableAvailable(TableName.valueOf(tableName)), - "table " + tableName + " created, but is not available due to some reasons"); + Preconditions.checkArgument(admin.isTableAvailable(TableName.valueOf(tableName)), "table " + tableName + " created, but is not available due to some reasons"); logger.info("create hbase table " + tableName + " done."); } finally { IOUtils.closeQuietly(admin); @@ -146,8 +144,7 @@ public class CubeHTableUtil { logger.info("creating hbase table " + tableName); admin.createTable(tableDesc, null); - 
Preconditions.checkArgument(admin.isTableAvailable(tableName), - "table " + tableName + " created, but is not available due to some reasons"); + Preconditions.checkArgument(admin.isTableAvailable(tableName), "table " + tableName + " created, but is not available due to some reasons"); logger.info("create hbase table " + tableName + " done."); } finally { IOUtils.closeQuietly(admin); http://git-wip-us.apache.org/repos/asf/kylin/blob/19585846/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/DeprecatedGCStep.java ---------------------------------------------------------------------- diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/DeprecatedGCStep.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/DeprecatedGCStep.java index 42fb283..df3cf08 100644 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/DeprecatedGCStep.java +++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/DeprecatedGCStep.java @@ -87,8 +87,7 @@ public class DeprecatedGCStep extends AbstractExecutable { private void dropHiveTable(ExecutableContext context) throws IOException { final String hiveTable = this.getOldHiveTable(); if (StringUtils.isNotEmpty(hiveTable)) { - final String dropSQL = "USE " + context.getConfig().getHiveDatabaseForIntermediateTable() + ";" - + " DROP TABLE IF EXISTS " + hiveTable + ";"; + final String dropSQL = "USE " + context.getConfig().getHiveDatabaseForIntermediateTable() + ";" + " DROP TABLE IF EXISTS " + hiveTable + ";"; final HiveCmdBuilder hiveCmdBuilder = new HiveCmdBuilder(); hiveCmdBuilder.addStatement(dropSQL); context.getConfig().getCliCommandExecutor().execute(hiveCmdBuilder.build()); @@ -196,4 +195,4 @@ public class DeprecatedGCStep extends AbstractExecutable { return getParam(OLD_HIVE_TABLE); } -} +} \ No newline at end of file 
http://git-wip-us.apache.org/repos/asf/kylin/blob/19585846/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseCuboidWriter.java ---------------------------------------------------------------------- diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseCuboidWriter.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseCuboidWriter.java index bcea725..6587d4e 100644 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseCuboidWriter.java +++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseCuboidWriter.java @@ -101,8 +101,7 @@ public class HBaseCuboidWriter implements ICuboidWriter { final KeyValue keyValue = keyValueCreators.get(i).create(key, 0, key.length, values); final Put put = new Put(copy(key, 0, key.length)); byte[] family = copy(keyValue.getFamilyArray(), keyValue.getFamilyOffset(), keyValue.getFamilyLength()); - byte[] qualifier = copy(keyValue.getQualifierArray(), keyValue.getQualifierOffset(), - keyValue.getQualifierLength()); + byte[] qualifier = copy(keyValue.getQualifierArray(), keyValue.getQualifierOffset(), keyValue.getQualifierLength()); byte[] value = copy(keyValue.getValueArray(), keyValue.getValueOffset(), keyValue.getValueLength()); put.add(family, qualifier, value); puts.add(put); @@ -119,8 +118,7 @@ public class HBaseCuboidWriter implements ICuboidWriter { if (hTable != null) { hTable.put(puts); } - logger.info( - "commit total " + puts.size() + " puts, totally cost:" + (System.currentTimeMillis() - t) + "ms"); + logger.info("commit total " + puts.size() + " puts, totally cost:" + (System.currentTimeMillis() - t) + "ms"); puts.clear(); } } http://git-wip-us.apache.org/repos/asf/kylin/blob/19585846/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMROutput2Transition.java ---------------------------------------------------------------------- diff --git 
a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMROutput2Transition.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMROutput2Transition.java index fe53290..31cb189 100644 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMROutput2Transition.java +++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMROutput2Transition.java @@ -97,8 +97,7 @@ public class HBaseMROutput2Transition implements IMROutput2 { int reducerNum = 1; Class mapperClass = job.getMapperClass(); if (mapperClass == HiveToBaseCuboidMapper.class || mapperClass == NDCuboidMapper.class) { - reducerNum = ReducerNumSizing.getLayeredCubingReduceTaskNum(segment, - AbstractHadoopJob.getTotalMapInputMB(job), level); + reducerNum = ReducerNumSizing.getLayeredCubingReduceTaskNum(segment, AbstractHadoopJob.getTotalMapInputMB(job), level); } else if (mapperClass == InMemCuboidMapper.class) { reducerNum = ReducerNumSizing.getInmemCubingReduceTaskNum(segment); } @@ -121,10 +120,8 @@ public class HBaseMROutput2Transition implements IMROutput2 { } @Override - public void addStepPhase2_BuildCube(CubeSegment seg, List<CubeSegment> mergingSegments, - DefaultChainedExecutable jobFlow) { - jobFlow.addTask( - steps.createMergeCuboidDataStep(seg, mergingSegments, jobFlow.getId(), MergeCuboidJob.class)); + public void addStepPhase2_BuildCube(CubeSegment seg, List<CubeSegment> mergingSegments, DefaultChainedExecutable jobFlow) { + jobFlow.addTask(steps.createMergeCuboidDataStep(seg, mergingSegments, jobFlow.getId(), MergeCuboidJob.class)); jobFlow.addTask(steps.createConvertCuboidToHfileStep(jobFlow.getId())); jobFlow.addTask(steps.createBulkLoadStep(jobFlow.getId())); } @@ -141,10 +138,9 @@ public class HBaseMROutput2Transition implements IMROutput2 { }; } - public static class HBaseMergeMROutputFormat implements IMRMergeOutputFormat { + public static class HBaseMergeMROutputFormat implements IMRMergeOutputFormat{ - private 
static final Pattern JOB_NAME_PATTERN = Pattern - .compile("kylin-([0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12})"); + private static final Pattern JOB_NAME_PATTERN = Pattern.compile("kylin-([0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12})"); @Override public void configureJobInput(Job job, String input) throws Exception { @@ -153,8 +149,7 @@ public class HBaseMROutput2Transition implements IMROutput2 { @Override public void configureJobOutput(Job job, String output, CubeSegment segment) throws Exception { - int reducerNum = ReducerNumSizing.getLayeredCubingReduceTaskNum(segment, - AbstractHadoopJob.getTotalMapInputMB(job), -1); + int reducerNum = ReducerNumSizing.getLayeredCubingReduceTaskNum(segment, AbstractHadoopJob.getTotalMapInputMB(job), -1); job.setNumReduceTasks(reducerNum); Path outputPath = new Path(output); @@ -190,4 +185,4 @@ public class HBaseMROutput2Transition implements IMROutput2 { throw new IllegalStateException("No merging segment's last build job ID equals " + jobID); } } -} +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/kylin/blob/19585846/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMRSteps.java ---------------------------------------------------------------------- diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMRSteps.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMRSteps.java index a121c9c..6f69e8c 100644 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMRSteps.java +++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMRSteps.java @@ -67,8 +67,7 @@ public class HBaseMRSteps extends JobBuilderSupport { appendExecCmdParameters(cmd, BatchConstants.ARG_INPUT, inputPath); appendExecCmdParameters(cmd, BatchConstants.ARG_OUTPUT, getRowkeyDistributionOutputPath(jobId)); appendExecCmdParameters(cmd, BatchConstants.ARG_CUBE_NAME, seg.getRealization().getName()); 
- appendExecCmdParameters(cmd, BatchConstants.ARG_JOB_NAME, - "Kylin_Region_Splits_Calculator_" + seg.getRealization().getName() + "_Step"); + appendExecCmdParameters(cmd, BatchConstants.ARG_JOB_NAME, "Kylin_Region_Splits_Calculator_" + seg.getRealization().getName() + "_Step"); rowkeyDistributionStep.setMapReduceParams(cmd.toString()); rowkeyDistributionStep.setMapReduceJobClass(RangeKeyDistributionJob.class); @@ -89,8 +88,7 @@ public class HBaseMRSteps extends JobBuilderSupport { StringBuilder cmd = new StringBuilder(); appendExecCmdParameters(cmd, BatchConstants.ARG_CUBE_NAME, seg.getRealization().getName()); appendExecCmdParameters(cmd, BatchConstants.ARG_SEGMENT_ID, seg.getUuid()); - appendExecCmdParameters(cmd, BatchConstants.ARG_PARTITION, - getRowkeyDistributionOutputPath(jobId) + "/part-r-00000"); + appendExecCmdParameters(cmd, BatchConstants.ARG_PARTITION, getRowkeyDistributionOutputPath(jobId) + "/part-r-00000"); appendExecCmdParameters(cmd, BatchConstants.ARG_STATS_ENABLED, String.valueOf(withStats)); createHtableStep.setJobParams(cmd.toString()); @@ -99,8 +97,7 @@ public class HBaseMRSteps extends JobBuilderSupport { return createHtableStep; } - public MapReduceExecutable createMergeCuboidDataStep(CubeSegment seg, List<CubeSegment> mergingSegments, - String jobID, Class<? extends AbstractHadoopJob> clazz) { + public MapReduceExecutable createMergeCuboidDataStep(CubeSegment seg, List<CubeSegment> mergingSegments, String jobID, Class<? 
extends AbstractHadoopJob> clazz) { final List<String> mergingCuboidPaths = Lists.newArrayList(); for (CubeSegment merging : mergingSegments) { @@ -118,8 +115,7 @@ public class HBaseMRSteps extends JobBuilderSupport { appendExecCmdParameters(cmd, BatchConstants.ARG_SEGMENT_ID, seg.getUuid()); appendExecCmdParameters(cmd, BatchConstants.ARG_INPUT, formattedPath); appendExecCmdParameters(cmd, BatchConstants.ARG_OUTPUT, outputPath); - appendExecCmdParameters(cmd, BatchConstants.ARG_JOB_NAME, - "Kylin_Merge_Cuboid_" + seg.getCubeInstance().getName() + "_Step"); + appendExecCmdParameters(cmd, BatchConstants.ARG_JOB_NAME, "Kylin_Merge_Cuboid_" + seg.getCubeInstance().getName() + "_Step"); mergeCuboidDataStep.setMapReduceParams(cmd.toString()); mergeCuboidDataStep.setMapReduceJobClass(clazz); @@ -136,13 +132,11 @@ public class HBaseMRSteps extends JobBuilderSupport { appendMapReduceParameters(cmd); appendExecCmdParameters(cmd, BatchConstants.ARG_CUBE_NAME, seg.getRealization().getName()); - appendExecCmdParameters(cmd, BatchConstants.ARG_PARTITION, - getRowkeyDistributionOutputPath(jobId) + "/part-r-00000_hfile"); + appendExecCmdParameters(cmd, BatchConstants.ARG_PARTITION, getRowkeyDistributionOutputPath(jobId) + "/part-r-00000_hfile"); appendExecCmdParameters(cmd, BatchConstants.ARG_INPUT, inputPath); appendExecCmdParameters(cmd, BatchConstants.ARG_OUTPUT, getHFilePath(jobId)); appendExecCmdParameters(cmd, BatchConstants.ARG_HTABLE_NAME, seg.getStorageLocationIdentifier()); - appendExecCmdParameters(cmd, BatchConstants.ARG_JOB_NAME, - "Kylin_HFile_Generator_" + seg.getRealization().getName() + "_Step"); + appendExecCmdParameters(cmd, BatchConstants.ARG_JOB_NAME, "Kylin_HFile_Generator_" + seg.getRealization().getName() + "_Step"); createHFilesStep.setMapReduceParams(cmd.toString()); createHFilesStep.setMapReduceJobClass(CubeHFileJob.class); @@ -174,10 +168,8 @@ public class HBaseMRSteps extends JobBuilderSupport { } public List<String> getMergingHTables() { - final 
List<CubeSegment> mergingSegments = ((CubeInstance) seg.getRealization()) - .getMergingSegments((CubeSegment) seg); - Preconditions.checkState(mergingSegments.size() > 1, - "there should be more than 2 segments to merge, target segment " + seg); + final List<CubeSegment> mergingSegments = ((CubeInstance) seg.getRealization()).getMergingSegments((CubeSegment) seg); + Preconditions.checkState(mergingSegments.size() > 1, "there should be more than 2 segments to merge, target segment " + seg); final List<String> mergingHTables = Lists.newArrayList(); for (CubeSegment merging : mergingSegments) { mergingHTables.add(merging.getStorageLocationIdentifier()); @@ -186,10 +178,8 @@ public class HBaseMRSteps extends JobBuilderSupport { } public List<String> getMergingHDFSPaths() { - final List<CubeSegment> mergingSegments = ((CubeInstance) seg.getRealization()) - .getMergingSegments((CubeSegment) seg); - Preconditions.checkState(mergingSegments.size() > 1, - "there should be more than 2 segments to merge, target segment " + seg); + final List<CubeSegment> mergingSegments = ((CubeInstance) seg.getRealization()).getMergingSegments((CubeSegment) seg); + Preconditions.checkState(mergingSegments.size() > 1, "there should be more than 2 segments to merge, target segment " + seg); final List<String> mergingHDFSPaths = Lists.newArrayList(); for (CubeSegment merging : mergingSegments) { mergingHDFSPaths.add(getJobWorkingDir(merging.getLastBuildJobID())); @@ -198,13 +188,11 @@ public class HBaseMRSteps extends JobBuilderSupport { } public String getHFilePath(String jobId) { - return HBaseConnection.makeQualifiedPathInHBaseCluster( - getJobWorkingDir(jobId) + "/" + seg.getRealization().getName() + "/hfile/"); + return HBaseConnection.makeQualifiedPathInHBaseCluster(getJobWorkingDir(jobId) + "/" + seg.getRealization().getName() + "/hfile/"); } public String getRowkeyDistributionOutputPath(String jobId) { - return HBaseConnection.makeQualifiedPathInHBaseCluster( - getJobWorkingDir(jobId) + 
"/" + seg.getRealization().getName() + "/rowkey_stats"); + return HBaseConnection.makeQualifiedPathInHBaseCluster(getJobWorkingDir(jobId) + "/" + seg.getRealization().getName() + "/rowkey_stats"); } public void addMergingGarbageCollectionSteps(DefaultChainedExecutable jobFlow) { http://git-wip-us.apache.org/repos/asf/kylin/blob/19585846/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/RangeKeyDistributionJob.java ---------------------------------------------------------------------- diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/RangeKeyDistributionJob.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/RangeKeyDistributionJob.java index 1bdf58a..2876e3e 100644 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/RangeKeyDistributionJob.java +++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/RangeKeyDistributionJob.java @@ -99,8 +99,7 @@ public class RangeKeyDistributionJob extends AbstractHadoopJob { float hfileSizeGB = kylinConfig.getHBaseHFileSizeGB(); float regionSplitSize = kylinConfig.getKylinHBaseRegionCut(); - int compactionThreshold = Integer.valueOf( - HBaseConnection.getCurrentHBaseConfiguration().get("hbase.hstore.compactionThreshold", "3")); + int compactionThreshold = Integer.valueOf(HBaseConnection.getCurrentHBaseConfiguration().get("hbase.hstore.compactionThreshold", "3")); if (hfileSizeGB > 0 && hfileSizeGB * compactionThreshold < regionSplitSize) { hfileSizeGB = regionSplitSize / compactionThreshold; logger.info("Adjust hfile size' to " + hfileSizeGB); @@ -113,8 +112,7 @@ public class RangeKeyDistributionJob extends AbstractHadoopJob { job.getConfiguration().set(BatchConstants.CFG_REGION_NUMBER_MAX, String.valueOf(maxRegionCount)); job.getConfiguration().set(BatchConstants.CFG_REGION_NUMBER_MIN, String.valueOf(minRegionCount)); // The partition file for hfile is sequenece file consists of ImmutableBytesWritable and NullWritable - 
TableMapReduceUtil.addDependencyJars(job.getConfiguration(), ImmutableBytesWritable.class, - NullWritable.class); + TableMapReduceUtil.addDependencyJars(job.getConfiguration(), ImmutableBytesWritable.class, NullWritable.class); return waitForCompletion(job); } catch (Exception e) { http://git-wip-us.apache.org/repos/asf/kylin/blob/19585846/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/RangeKeyDistributionReducer.java ---------------------------------------------------------------------- diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/RangeKeyDistributionReducer.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/RangeKeyDistributionReducer.java index 8f7096a..63433dd 100644 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/RangeKeyDistributionReducer.java +++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/RangeKeyDistributionReducer.java @@ -77,16 +77,14 @@ public class RangeKeyDistributionReducer extends KylinReducer<Text, LongWritable maxRegionCount = Integer.valueOf(context.getConfiguration().get(BatchConstants.CFG_REGION_NUMBER_MAX)); } - logger.info("Chosen cut for htable is " + cut + ", max region count=" + maxRegionCount + ", min region count=" - + minRegionCount + ", hfile size=" + hfileSizeGB); + logger.info("Chosen cut for htable is " + cut + ", max region count=" + maxRegionCount + ", min region count=" + minRegionCount + ", hfile size=" + hfileSizeGB); // add empty key at position 0 gbPoints.add(new Text()); } @Override - public void doReduce(Text key, Iterable<LongWritable> values, Context context) - throws IOException, InterruptedException { + public void doReduce(Text key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException { for (LongWritable v : values) { bytesRead += v.get(); } @@ -117,9 +115,7 @@ public class RangeKeyDistributionReducer extends KylinReducer<Text, LongWritable 
System.out.println(hfilePerRegion + " hfile per region"); Path hfilePartitionFile = new Path(output + "/part-r-00000_hfile"); - SequenceFile.Writer hfilePartitionWriter = new SequenceFile.Writer( - hfilePartitionFile.getFileSystem(context.getConfiguration()), context.getConfiguration(), - hfilePartitionFile, ImmutableBytesWritable.class, NullWritable.class); + SequenceFile.Writer hfilePartitionWriter = new SequenceFile.Writer(hfilePartitionFile.getFileSystem(context.getConfiguration()), context.getConfiguration(), hfilePartitionFile, ImmutableBytesWritable.class, NullWritable.class); int hfileCountInOneRegion = 0; for (int i = hfileSizeGB; i < gbPoints.size(); i += hfileSizeGB) { hfilePartitionWriter.append(new ImmutableBytesWritable(gbPoints.get(i).getBytes()), NullWritable.get()); http://git-wip-us.apache.org/repos/asf/kylin/blob/19585846/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/SequenceFileCuboidWriter.java ---------------------------------------------------------------------- diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/SequenceFileCuboidWriter.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/SequenceFileCuboidWriter.java index 5a03985..5e6ad34 100644 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/SequenceFileCuboidWriter.java +++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/SequenceFileCuboidWriter.java @@ -68,9 +68,7 @@ public class SequenceFileCuboidWriter extends KVGTRecordWriter { Path cuboidFile = new Path(cuboidPath, "data.seq"); logger.debug("Cuboid is written to " + cuboidFile); - writer = SequenceFile.createWriter(HadoopUtil.getCurrentConfiguration(), - SequenceFile.Writer.file(cuboidFile), SequenceFile.Writer.keyClass(Text.class), - SequenceFile.Writer.valueClass(Text.class)); + writer = SequenceFile.createWriter(HadoopUtil.getCurrentConfiguration(), SequenceFile.Writer.file(cuboidFile), 
SequenceFile.Writer.keyClass(Text.class), SequenceFile.Writer.valueClass(Text.class)); } } http://git-wip-us.apache.org/repos/asf/kylin/blob/19585846/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/CubeMigrationCLI.java ---------------------------------------------------------------------- diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/CubeMigrationCLI.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/CubeMigrationCLI.java index 8d74eb3..2154ed1 100644 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/CubeMigrationCLI.java +++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/CubeMigrationCLI.java @@ -97,8 +97,7 @@ public class CubeMigrationCLI { private static final String ACL_INFO_FAMILY_PARENT_COLUMN = "p"; public static void main(String[] args) throws IOException, InterruptedException { - logger.warn( - "org.apache.kylin.storage.hbase.util.CubeMigrationCLI is deprecated, use org.apache.kylin.tool.CubeMigrationCLI instead"); + logger.warn("org.apache.kylin.storage.hbase.util.CubeMigrationCLI is deprecated, use org.apache.kylin.tool.CubeMigrationCLI instead"); if (args.length != 8) { usage(); @@ -109,22 +108,12 @@ public class CubeMigrationCLI { } private static void usage() { - System.out.println( - "Usage: CubeMigrationCLI srcKylinConfigUri dstKylinConfigUri cubeName projectName copyAclOrNot purgeOrNot overwriteIfExists realExecute"); - System.out.println(" srcKylinConfigUri: The KylinConfig of the cube’s source \n" - + "dstKylinConfigUri: The KylinConfig of the cube’s new home \n" - + "cubeName: the name of cube to be migrated. \n" - + "projectName: The target project in the target environment.(Make sure it exist) \n" - + "copyAclOrNot: true or false: whether copy cube ACL to target environment. \n" - + "purgeOrNot: true or false: whether purge the cube from src server after the migration. 
\n" - + "overwriteIfExists: overwrite cube if it already exists in the target environment. \n" - + "realExecute: if false, just print the operations to take, if true, do the real migration. \n"); + System.out.println("Usage: CubeMigrationCLI srcKylinConfigUri dstKylinConfigUri cubeName projectName copyAclOrNot purgeOrNot overwriteIfExists realExecute"); + System.out.println(" srcKylinConfigUri: The KylinConfig of the cube’s source \n" + "dstKylinConfigUri: The KylinConfig of the cube’s new home \n" + "cubeName: the name of cube to be migrated. \n" + "projectName: The target project in the target environment.(Make sure it exist) \n" + "copyAclOrNot: true or false: whether copy cube ACL to target environment. \n" + "purgeOrNot: true or false: whether purge the cube from src server after the migration. \n" + "overwriteIfExists: overwrite cube if it already exists in the target environment. \n" + "realExecute: if false, just print the operations to take, if true, do the real migration. \n"); } - public static void moveCube(KylinConfig srcCfg, KylinConfig dstCfg, String cubeName, String projectName, - String copyAcl, String purgeAndDisable, String overwriteIfExists, String realExecute) - throws IOException, InterruptedException { + public static void moveCube(KylinConfig srcCfg, KylinConfig dstCfg, String cubeName, String projectName, String copyAcl, String purgeAndDisable, String overwriteIfExists, String realExecute) throws IOException, InterruptedException { srcConfig = srcCfg; srcStore = ResourceStore.getStore(srcConfig); @@ -174,16 +163,12 @@ public class CubeMigrationCLI { } } - public static void moveCube(String srcCfgUri, String dstCfgUri, String cubeName, String projectName, String copyAcl, - String purgeAndDisable, String overwriteIfExists, String realExecute) - throws IOException, InterruptedException { + public static void moveCube(String srcCfgUri, String dstCfgUri, String cubeName, String projectName, String copyAcl, String purgeAndDisable, String 
overwriteIfExists, String realExecute) throws IOException, InterruptedException { - moveCube(KylinConfig.createInstanceFromUri(srcCfgUri), KylinConfig.createInstanceFromUri(dstCfgUri), cubeName, - projectName, copyAcl, purgeAndDisable, overwriteIfExists, realExecute); + moveCube(KylinConfig.createInstanceFromUri(srcCfgUri), KylinConfig.createInstanceFromUri(dstCfgUri), cubeName, projectName, copyAcl, purgeAndDisable, overwriteIfExists, realExecute); } - public static void checkMigrationSuccess(KylinConfig kylinConfig, String cubeName, Boolean ifFix) - throws IOException { + public static void checkMigrationSuccess(KylinConfig kylinConfig, String cubeName, Boolean ifFix) throws IOException { CubeMigrationCheckCLI checkCLI = new CubeMigrationCheckCLI(kylinConfig, ifFix); checkCLI.execute(cubeName); } @@ -213,14 +198,12 @@ public class CubeMigrationCLI { private static void changeHtableHost(CubeInstance cube) { for (CubeSegment segment : cube.getSegments()) { - operations - .add(new Opt(OptType.CHANGE_HTABLE_HOST, new Object[] { segment.getStorageLocationIdentifier() })); + operations.add(new Opt(OptType.CHANGE_HTABLE_HOST, new Object[] { segment.getStorageLocationIdentifier() })); } } private static void copyACL(CubeInstance cube, String projectName) { - operations.add(new Opt(OptType.COPY_ACL, - new Object[] { cube.getUuid(), cube.getDescriptor().getModel().getUuid(), projectName })); + operations.add(new Opt(OptType.COPY_ACL, new Object[] { cube.getUuid(), cube.getDescriptor().getModel().getUuid(), projectName })); } private static void copyFilesInMetaStore(CubeInstance cube, String overwriteIfExists) throws IOException { @@ -230,8 +213,7 @@ public class CubeMigrationCLI { listCubeRelatedResources(cube, metaItems, dictAndSnapshot); if (dstStore.exists(cube.getResourcePath()) && !overwriteIfExists.equalsIgnoreCase("true")) - throw new IllegalStateException("The cube named " + cube.getName() - + " already exists on target metadata store. 
Use overwriteIfExists to overwrite it"); + throw new IllegalStateException("The cube named " + cube.getName() + " already exists on target metadata store. Use overwriteIfExists to overwrite it"); for (String item : metaItems) { operations.add(new Opt(OptType.COPY_FILE_IN_META, new Object[] { item })); @@ -242,8 +224,7 @@ public class CubeMigrationCLI { } } - private static void addCubeAndModelIntoProject(CubeInstance srcCube, String cubeName, String projectName) - throws IOException { + private static void addCubeAndModelIntoProject(CubeInstance srcCube, String cubeName, String projectName) throws IOException { String projectResPath = ProjectInstance.concatResourcePath(projectName); if (!dstStore.exists(projectResPath)) throw new IllegalStateException("The target project " + projectName + "does not exist"); @@ -255,8 +236,7 @@ public class CubeMigrationCLI { operations.add(new Opt(OptType.PURGE_AND_DISABLE, new Object[] { cubeName })); } - private static void listCubeRelatedResources(CubeInstance cube, List<String> metaResource, - Set<String> dictAndSnapshot) throws IOException { + private static void listCubeRelatedResources(CubeInstance cube, List<String> metaResource, Set<String> dictAndSnapshot) throws IOException { CubeDesc cubeDesc = cube.getDescriptor(); metaResource.add(cube.getResourcePath()); @@ -463,10 +443,8 @@ public class CubeMigrationCLI { Table srcAclHtable = null; Table destAclHtable = null; try { - srcAclHtable = HBaseConnection.get(srcConfig.getStorageUrl()) - .getTable(TableName.valueOf(srcConfig.getMetadataUrlPrefix() + ACL_TABLE_NAME)); - destAclHtable = HBaseConnection.get(dstConfig.getStorageUrl()) - .getTable(TableName.valueOf(dstConfig.getMetadataUrlPrefix() + ACL_TABLE_NAME)); + srcAclHtable = HBaseConnection.get(srcConfig.getStorageUrl()).getTable(TableName.valueOf(srcConfig.getMetadataUrlPrefix() + ACL_TABLE_NAME)); + destAclHtable = 
HBaseConnection.get(dstConfig.getStorageUrl()).getTable(TableName.valueOf(dstConfig.getMetadataUrlPrefix() + ACL_TABLE_NAME)); // cube acl Result result = srcAclHtable.get(new Get(Bytes.toBytes(cubeId))); @@ -477,10 +455,8 @@ public class CubeMigrationCLI { byte[] value = CellUtil.cloneValue(cell); // use the target project uuid as the parent - if (Bytes.toString(family).equals(ACL_INFO_FAMILY) - && Bytes.toString(column).equals(ACL_INFO_FAMILY_PARENT_COLUMN)) { - String valueString = "{\"id\":\"" + projUUID - + "\",\"type\":\"org.apache.kylin.metadata.project.ProjectInstance\"}"; + if (Bytes.toString(family).equals(ACL_INFO_FAMILY) && Bytes.toString(column).equals(ACL_INFO_FAMILY_PARENT_COLUMN)) { + String valueString = "{\"id\":\"" + projUUID + "\",\"type\":\"org.apache.kylin.metadata.project.ProjectInstance\"}"; value = Bytes.toBytes(valueString); } Put put = new Put(Bytes.toBytes(cubeId)); @@ -555,8 +531,7 @@ public class CubeMigrationCLI { String modelId = (String) opt.params[1]; Table destAclHtable = null; try { - destAclHtable = HBaseConnection.get(dstConfig.getStorageUrl()) - .getTable(TableName.valueOf(dstConfig.getMetadataUrlPrefix() + ACL_TABLE_NAME)); + destAclHtable = HBaseConnection.get(dstConfig.getStorageUrl()).getTable(TableName.valueOf(dstConfig.getMetadataUrlPrefix() + ACL_TABLE_NAME)); destAclHtable.delete(new Delete(Bytes.toBytes(cubeId))); destAclHtable.delete(new Delete(Bytes.toBytes(modelId))); http://git-wip-us.apache.org/repos/asf/kylin/blob/19585846/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/CubeMigrationCheckCLI.java ---------------------------------------------------------------------- diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/CubeMigrationCheckCLI.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/CubeMigrationCheckCLI.java index ec3764b..20d0f7d 100644 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/CubeMigrationCheckCLI.java +++ 
b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/CubeMigrationCheckCLI.java @@ -54,14 +54,11 @@ public class CubeMigrationCheckCLI { private static final Logger logger = LoggerFactory.getLogger(CubeMigrationCheckCLI.class); - private static final Option OPTION_FIX = OptionBuilder.withArgName("fix").hasArg().isRequired(false) - .withDescription("Fix the inconsistent cube segments' HOST").create("fix"); + private static final Option OPTION_FIX = OptionBuilder.withArgName("fix").hasArg().isRequired(false).withDescription("Fix the inconsistent cube segments' HOST").create("fix"); - private static final Option OPTION_DST_CFG_URI = OptionBuilder.withArgName("dstCfgUri").hasArg().isRequired(false) - .withDescription("The KylinConfig of the cubeâs new home").create("dstCfgUri"); + private static final Option OPTION_DST_CFG_URI = OptionBuilder.withArgName("dstCfgUri").hasArg().isRequired(false).withDescription("The KylinConfig of the cubeâs new home").create("dstCfgUri"); - private static final Option OPTION_CUBE = OptionBuilder.withArgName("cube").hasArg().isRequired(false) - .withDescription("The name of cube migrated").create("cube"); + private static final Option OPTION_CUBE = OptionBuilder.withArgName("cube").hasArg().isRequired(false).withDescription("The name of cube migrated").create("cube"); private KylinConfig dstCfg; private Admin hbaseAdmin; @@ -72,8 +69,7 @@ public class CubeMigrationCheckCLI { private boolean ifFix = false; public static void main(String[] args) throws ParseException, IOException { - logger.warn( - "org.apache.kylin.storage.hbase.util.CubeMigrationCheckCLI is deprecated, use org.apache.kylin.tool.CubeMigrationCheckCLI instead"); + logger.warn("org.apache.kylin.storage.hbase.util.CubeMigrationCheckCLI is deprecated, use org.apache.kylin.tool.CubeMigrationCheckCLI instead"); OptionsHelper optionsHelper = new OptionsHelper(); @@ -191,9 +187,7 @@ public class CubeMigrationCheckCLI { for (String segFullName : inconsistentHTables) { 
String[] sepNameList = segFullName.split(","); HTableDescriptor desc = hbaseAdmin.getTableDescriptor(TableName.valueOf(sepNameList[0])); - logger.info("Change the host of htable " + sepNameList[0] + "belonging to cube " + sepNameList[1] - + " from " + desc.getValue(IRealizationConstants.HTableTag) + " to " - + dstCfg.getMetadataUrlPrefix()); + logger.info("Change the host of htable " + sepNameList[0] + "belonging to cube " + sepNameList[1] + " from " + desc.getValue(IRealizationConstants.HTableTag) + " to " + dstCfg.getMetadataUrlPrefix()); hbaseAdmin.disableTable(TableName.valueOf(sepNameList[0])); desc.setValue(IRealizationConstants.HTableTag, dstCfg.getMetadataUrlPrefix()); hbaseAdmin.modifyTable(TableName.valueOf(sepNameList[0]), desc); @@ -213,8 +207,7 @@ public class CubeMigrationCheckCLI { logger.info("------ HTables exist issues in hbase : not existing, metadata broken ------"); for (String segFullName : issueExistHTables) { String[] sepNameList = segFullName.split(","); - logger.error(sepNameList[0] + " belonging to cube " + sepNameList[1] - + " has some issues and cannot be read successfully!!!"); + logger.error(sepNameList[0] + " belonging to cube " + sepNameList[1] + " has some issues and cannot be read successfully!!!"); } logger.info("----------------------------------------------------"); } http://git-wip-us.apache.org/repos/asf/kylin/blob/19585846/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/DeployCoprocessorCLI.java ---------------------------------------------------------------------- diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/DeployCoprocessorCLI.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/DeployCoprocessorCLI.java index f5fb304..c437e66 100644 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/DeployCoprocessorCLI.java +++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/DeployCoprocessorCLI.java @@ -137,14 +137,10 @@ public class 
DeployCoprocessorCLI { private static void printUsageAndExit() { logger.info("Usage: "); - logger.info( - "$KYLIN_HOME/bin/kylin.sh org.apache.kylin.storage.hbase.util.DeployCoprocessorCLI $KYLIN_HOME/lib/kylin-coprocessor-*.jar all"); - logger.info( - "$KYLIN_HOME/bin/kylin.sh org.apache.kylin.storage.hbase.util.DeployCoprocessorCLI $KYLIN_HOME/lib/kylin-coprocessor-*.jar -table tableName1 tableName2 ..."); - logger.info( - "$KYLIN_HOME/bin/kylin.sh org.apache.kylin.storage.hbase.util.DeployCoprocessorCLI $KYLIN_HOME/lib/kylin-coprocessor-*.jar -cube cubeName1 cubeName2 ... "); - logger.info( - "$KYLIN_HOME/bin/kylin.sh org.apache.kylin.storage.hbase.util.DeployCoprocessorCLI $KYLIN_HOME/lib/kylin-coprocessor-*.jar -project projectName1 projectName2 ..."); + logger.info("$KYLIN_HOME/bin/kylin.sh org.apache.kylin.storage.hbase.util.DeployCoprocessorCLI $KYLIN_HOME/lib/kylin-coprocessor-*.jar all"); + logger.info("$KYLIN_HOME/bin/kylin.sh org.apache.kylin.storage.hbase.util.DeployCoprocessorCLI $KYLIN_HOME/lib/kylin-coprocessor-*.jar -table tableName1 tableName2 ..."); + logger.info("$KYLIN_HOME/bin/kylin.sh org.apache.kylin.storage.hbase.util.DeployCoprocessorCLI $KYLIN_HOME/lib/kylin-coprocessor-*.jar -cube cubeName1 cubeName2 ... 
"); + logger.info("$KYLIN_HOME/bin/kylin.sh org.apache.kylin.storage.hbase.util.DeployCoprocessorCLI $KYLIN_HOME/lib/kylin-coprocessor-*.jar -project projectName1 projectName2 ..."); System.exit(0); } @@ -161,7 +157,7 @@ public class DeployCoprocessorCLI { ProjectInstance projectInstance = projectManager.getProject(p); List<RealizationEntry> cubeList = projectInstance.getRealizationEntries(RealizationType.CUBE); - for (RealizationEntry cube : cubeList) { + for (RealizationEntry cube: cubeList) { CubeInstance cubeInstance = cubeManager.getCube(cube.getRealization()); for (CubeSegment segment : cubeInstance.getSegments()) { String tableName = segment.getStorageLocationIdentifier(); @@ -234,8 +230,7 @@ public class DeployCoprocessorCLI { desc.addCoprocessor(CubeEndpointClass, hdfsCoprocessorJar, 1001, null); } - public static boolean resetCoprocessor(String tableName, Admin hbaseAdmin, Path hdfsCoprocessorJar) - throws IOException { + public static boolean resetCoprocessor(String tableName, Admin hbaseAdmin, Path hdfsCoprocessorJar) throws IOException { KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv(); HTableDescriptor desc = hbaseAdmin.getTableDescriptor(TableName.valueOf(tableName)); @@ -284,15 +279,13 @@ public class DeployCoprocessorCLI { return true; } - private static List<String> resetCoprocessorOnHTables(final Admin hbaseAdmin, final Path hdfsCoprocessorJar, - List<String> tableNames) throws IOException { + private static List<String> resetCoprocessorOnHTables(final Admin hbaseAdmin, final Path hdfsCoprocessorJar, List<String> tableNames) throws IOException { List<String> processedTables = Collections.synchronizedList(new ArrayList<String>()); ExecutorService coprocessorPool = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors() * 2); CountDownLatch countDownLatch = new CountDownLatch(tableNames.size()); for (final String tableName : tableNames) { - coprocessorPool.execute(new ResetCoprocessorWorker(countDownLatch, hbaseAdmin, 
hdfsCoprocessorJar, - tableName, processedTables)); + coprocessorPool.execute(new ResetCoprocessorWorker(countDownLatch, hbaseAdmin, hdfsCoprocessorJar, tableName, processedTables)); } try { @@ -312,8 +305,7 @@ public class DeployCoprocessorCLI { private final String tableName; private final List<String> processedTables; - public ResetCoprocessorWorker(CountDownLatch countDownLatch, Admin hbaseAdmin, Path hdfsCoprocessorJar, - String tableName, List<String> processedTables) { + public ResetCoprocessorWorker(CountDownLatch countDownLatch, Admin hbaseAdmin, Path hdfsCoprocessorJar, String tableName, List<String> processedTables) { this.countDownLatch = countDownLatch; this.hbaseAdmin = hbaseAdmin; this.hdfsCoprocessorJar = hdfsCoprocessorJar; @@ -358,8 +350,7 @@ public class DeployCoprocessorCLI { return path; } - public synchronized static Path uploadCoprocessorJar(String localCoprocessorJar, FileSystem fileSystem, - Set<String> oldJarPaths) throws IOException { + public synchronized static Path uploadCoprocessorJar(String localCoprocessorJar, FileSystem fileSystem, Set<String> oldJarPaths) throws IOException { Path uploadPath = null; File localCoprocessorFile = new File(localCoprocessorJar); @@ -417,8 +408,7 @@ public class DeployCoprocessorCLI { } private static boolean isSame(File localCoprocessorFile, FileStatus fileStatus) { - return fileStatus.getLen() == localCoprocessorFile.length() - && fileStatus.getModificationTime() == localCoprocessorFile.lastModified(); + return fileStatus.getLen() == localCoprocessorFile.length() && fileStatus.getModificationTime() == localCoprocessorFile.lastModified(); } private static String getBaseFileName(String localCoprocessorJar) { @@ -476,8 +466,7 @@ public class DeployCoprocessorCLI { ArrayList<String> result = new ArrayList<String>(); for (CubeInstance cube : cubeMgr.listAllCubes()) { - if (cube.getStorageType() == IStorageAware.ID_HBASE - || cube.getStorageType() == IStorageAware.ID_SHARDED_HBASE) { + if 
(cube.getStorageType() == IStorageAware.ID_HBASE || cube.getStorageType() == IStorageAware.ID_SHARDED_HBASE) { for (CubeSegment seg : cube.getSegments(SegmentStatusEnum.READY)) { String tableName = seg.getStorageLocationIdentifier(); if (StringUtils.isBlank(tableName) == false) { http://git-wip-us.apache.org/repos/asf/kylin/blob/19585846/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/ExtendCubeToHybridCLI.java ---------------------------------------------------------------------- diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/ExtendCubeToHybridCLI.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/ExtendCubeToHybridCLI.java index 0c2b251..1cdb2f8 100644 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/ExtendCubeToHybridCLI.java +++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/ExtendCubeToHybridCLI.java @@ -86,8 +86,7 @@ public class ExtendCubeToHybridCLI { } public static void main(String[] args) throws Exception { - logger.warn( - "org.apache.kylin.storage.hbase.util.ExtendCubeToHybridCLI is deprecated, use org.apache.kylin.tool.ExtendCubeToHybridCLI instead"); + logger.warn("org.apache.kylin.storage.hbase.util.ExtendCubeToHybridCLI is deprecated, use org.apache.kylin.tool.ExtendCubeToHybridCLI instead"); if (args.length != 2 && args.length != 3) { System.out.println("Usage: ExtendCubeToHybridCLI project cube [partition_date]"); @@ -123,8 +122,7 @@ public class ExtendCubeToHybridCLI { } public void createFromCube(String projectName, String cubeName, String partitionDateStr) throws Exception { - logger.info("Create hybrid for cube[" + cubeName + "], project[" + projectName + "], partition_date[" - + partitionDateStr + "]."); + logger.info("Create hybrid for cube[" + cubeName + "], project[" + projectName + "], partition_date[" + partitionDateStr + "]."); CubeInstance cubeInstance = cubeManager.getCube(cubeName); if (!validateCubeInstance(cubeInstance)) { @@ 
-158,8 +156,7 @@ public class ExtendCubeToHybridCLI { CubeSegment currentSeg = null; while (segmentIterator.hasNext()) { currentSeg = segmentIterator.next(); - if (partitionDateStr != null && (currentSeg.getDateRangeStart() >= partitionDate - || currentSeg.getDateRangeEnd() > partitionDate)) { + if (partitionDateStr != null && (currentSeg.getDateRangeStart() >= partitionDate || currentSeg.getDateRangeEnd() > partitionDate)) { segmentIterator.remove(); logger.info("CubeSegment[" + currentSeg + "] was removed."); } @@ -202,11 +199,9 @@ public class ExtendCubeToHybridCLI { List<RealizationEntry> realizationEntries = Lists.newArrayListWithCapacity(2); realizationEntries.add(RealizationEntry.create(RealizationType.CUBE, cubeInstance.getName())); realizationEntries.add(RealizationEntry.create(RealizationType.CUBE, newCubeInstance.getName())); - HybridInstance hybridInstance = HybridInstance.create(kylinConfig, renameHybrid(cubeInstance.getName()), - realizationEntries); + HybridInstance hybridInstance = HybridInstance.create(kylinConfig, renameHybrid(cubeInstance.getName()), realizationEntries); store.putResource(hybridInstance.getResourcePath(), hybridInstance, HybridManager.HYBRID_SERIALIZER); - ProjectManager.getInstance(kylinConfig).moveRealizationToProject(RealizationType.HYBRID, - hybridInstance.getName(), projectName, owner); + ProjectManager.getInstance(kylinConfig).moveRealizationToProject(RealizationType.HYBRID, hybridInstance.getName(), projectName, owner); logger.info("HybridInstance was saved at: " + hybridInstance.getResourcePath()); // copy Acl from old cube to new cube @@ -243,8 +238,7 @@ public class ExtendCubeToHybridCLI { String projUUID = project.getUuid(); Table aclHtable = null; try { - aclHtable = HBaseConnection.get(kylinConfig.getStorageUrl()) - .getTable(TableName.valueOf(kylinConfig.getMetadataUrlPrefix() + "_acl")); + aclHtable = HBaseConnection.get(kylinConfig.getStorageUrl()).getTable(TableName.valueOf(kylinConfig.getMetadataUrlPrefix() + 
"_acl")); // cube acl Result result = aclHtable.get(new Get(Bytes.toBytes(origCubeId))); @@ -255,10 +249,8 @@ public class ExtendCubeToHybridCLI { byte[] value = CellUtil.cloneValue(cell); // use the target project uuid as the parent - if (Bytes.toString(family).equals(ACL_INFO_FAMILY) - && Bytes.toString(column).equals(ACL_INFO_FAMILY_PARENT_COLUMN)) { - String valueString = "{\"id\":\"" + projUUID - + "\",\"type\":\"org.apache.kylin.metadata.project.ProjectInstance\"}"; + if (Bytes.toString(family).equals(ACL_INFO_FAMILY) && Bytes.toString(column).equals(ACL_INFO_FAMILY_PARENT_COLUMN)) { + String valueString = "{\"id\":\"" + projUUID + "\",\"type\":\"org.apache.kylin.metadata.project.ProjectInstance\"}"; value = Bytes.toBytes(valueString); } Put put = new Put(Bytes.toBytes(newCubeId)); http://git-wip-us.apache.org/repos/asf/kylin/blob/19585846/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/GridTableHBaseBenchmark.java ---------------------------------------------------------------------- diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/GridTableHBaseBenchmark.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/GridTableHBaseBenchmark.java index 56ac814..a317110 100644 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/GridTableHBaseBenchmark.java +++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/GridTableHBaseBenchmark.java @@ -83,8 +83,7 @@ public class GridTableHBaseBenchmark { Hits hits = new Hits(N_ROWS, hitRatio, indexRatio); for (int i = 0; i < ROUND; i++) { - System.out.println("==================================== ROUND " + (i + 1) - + " ========================================"); + System.out.println("==================================== ROUND " + (i + 1) + " ========================================"); testRowScanWithIndex(conn, hits.getHitsForRowScanWithIndex()); testRowScanNoIndexFullScan(conn, hits.getHitsForRowScanNoIndex()); 
testRowScanNoIndexSkipScan(conn, hits.getHitsForRowScanNoIndex()); @@ -387,8 +386,7 @@ public class GridTableHBaseBenchmark { public void markEnd() { endTime = System.currentTimeMillis(); System.out.println(); - System.out.println(name + " ends, " + (endTime - startTime) + " ms, " + rowsRead + " rows read, " - + bytesRead + " bytes read"); + System.out.println(name + " ends, " + (endTime - startTime) + " ms, " + rowsRead + " rows read, " + bytesRead + " bytes read"); } } http://git-wip-us.apache.org/repos/asf/kylin/blob/19585846/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/HBaseClean.java ---------------------------------------------------------------------- diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/HBaseClean.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/HBaseClean.java index 34c5fa5..940d64a 100644 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/HBaseClean.java +++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/HBaseClean.java @@ -44,12 +44,10 @@ import com.google.common.collect.Lists; public class HBaseClean extends AbstractApplication { @SuppressWarnings("static-access") - private static final Option OPTION_DELETE = OptionBuilder.withArgName("delete").hasArg().isRequired(true) - .withDescription("actually delete or not").create("delete"); + private static final Option OPTION_DELETE = OptionBuilder.withArgName("delete").hasArg().isRequired(true).withDescription("actually delete or not").create("delete"); @SuppressWarnings("static-access") - private static final Option OPTION_TAG = OptionBuilder.withArgName("tag").hasArg().isRequired(true) - .withDescription("the tag of HTable").create("tag"); + private static final Option OPTION_TAG = OptionBuilder.withArgName("tag").hasArg().isRequired(true).withDescription("the tag of HTable").create("tag"); protected static final Logger logger = LoggerFactory.getLogger(HBaseClean.class); 
http://git-wip-us.apache.org/repos/asf/kylin/blob/19585846/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/HbaseStreamingInput.java ---------------------------------------------------------------------- diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/HbaseStreamingInput.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/HbaseStreamingInput.java index 693be33..8dd2164 100644 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/HbaseStreamingInput.java +++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/HbaseStreamingInput.java @@ -197,8 +197,7 @@ public class HbaseStreamingInput { logger.error("value size invalid!!!!!"); } - hash += Arrays.hashCode(Arrays.copyOfRange(value, cell.getValueOffset(), - cell.getValueLength() + cell.getValueOffset())); + hash += Arrays.hashCode(Arrays.copyOfRange(value, cell.getValueOffset(), cell.getValueLength() + cell.getValueOffset())); rowCount++; } scanner.close(); http://git-wip-us.apache.org/repos/asf/kylin/blob/19585846/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/HtableAlterMetadataCLI.java ---------------------------------------------------------------------- diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/HtableAlterMetadataCLI.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/HtableAlterMetadataCLI.java index 11aaf9c..ea05ab2 100644 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/HtableAlterMetadataCLI.java +++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/HtableAlterMetadataCLI.java @@ -40,12 +40,9 @@ import org.slf4j.LoggerFactory; @SuppressWarnings("static-access") public class HtableAlterMetadataCLI extends AbstractApplication { - private static final Option OPTION_METADATA_KEY = OptionBuilder.withArgName("key").hasArg().isRequired(true) - .withDescription("The metadata key").create("key"); - private static final 
Option OPTION_METADATA_VALUE = OptionBuilder.withArgName("value").hasArg().isRequired(true) - .withDescription("The metadata value").create("value"); - protected static final Option OPTION_HTABLE_NAME = OptionBuilder.withArgName(BatchConstants.ARG_HTABLE_NAME) - .hasArg().isRequired(true).withDescription("HTable name").create(BatchConstants.ARG_HTABLE_NAME); + private static final Option OPTION_METADATA_KEY = OptionBuilder.withArgName("key").hasArg().isRequired(true).withDescription("The metadata key").create("key"); + private static final Option OPTION_METADATA_VALUE = OptionBuilder.withArgName("value").hasArg().isRequired(true).withDescription("The metadata value").create("value"); + protected static final Option OPTION_HTABLE_NAME = OptionBuilder.withArgName(BatchConstants.ARG_HTABLE_NAME).hasArg().isRequired(true).withDescription("HTable name").create(BatchConstants.ARG_HTABLE_NAME); protected static final Logger logger = LoggerFactory.getLogger(HtableAlterMetadataCLI.class); http://git-wip-us.apache.org/repos/asf/kylin/blob/19585846/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/OrphanHBaseCleanJob.java ---------------------------------------------------------------------- diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/OrphanHBaseCleanJob.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/OrphanHBaseCleanJob.java index 0a524f8..df4e912 100644 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/OrphanHBaseCleanJob.java +++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/OrphanHBaseCleanJob.java @@ -46,11 +46,9 @@ import org.slf4j.LoggerFactory; public class OrphanHBaseCleanJob extends AbstractApplication { @SuppressWarnings("static-access") - private static final Option OPTION_DELETE = OptionBuilder.withArgName("delete").hasArg().isRequired(false) - .withDescription("Delete the unused storage").create("delete"); + private static final Option OPTION_DELETE = 
OptionBuilder.withArgName("delete").hasArg().isRequired(false).withDescription("Delete the unused storage").create("delete"); @SuppressWarnings("static-access") - private static final Option OPTION_WHITELIST = OptionBuilder.withArgName("whitelist").hasArg().isRequired(true) - .withDescription("metadata store whitelist, separated with comma").create("whitelist"); + private static final Option OPTION_WHITELIST = OptionBuilder.withArgName("whitelist").hasArg().isRequired(true).withDescription("metadata store whitelist, separated with comma").create("whitelist"); protected static final Logger logger = LoggerFactory.getLogger(OrphanHBaseCleanJob.class); http://git-wip-us.apache.org/repos/asf/kylin/blob/19585846/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/PingHBaseCLI.java ---------------------------------------------------------------------- diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/PingHBaseCLI.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/PingHBaseCLI.java index 1377dd9..bba6745 100644 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/PingHBaseCLI.java +++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/PingHBaseCLI.java @@ -49,13 +49,11 @@ public class PingHBaseCLI { Configuration hconf = HBaseConnection.getCurrentHBaseConfiguration(); if (User.isHBaseSecurityEnabled(hconf)) { try { - System.out.println("--------------Getting kerberos credential for user " - + UserGroupInformation.getCurrentUser().getUserName()); + System.out.println("--------------Getting kerberos credential for user " + UserGroupInformation.getCurrentUser().getUserName()); TokenUtil.obtainAndCacheToken(hconf, UserGroupInformation.getCurrentUser()); } catch (InterruptedException e) { Thread.currentThread().interrupt(); - System.out.println("--------------Error while getting kerberos credential for user " - + UserGroupInformation.getCurrentUser().getUserName()); + 
System.out.println("--------------Error while getting kerberos credential for user " + UserGroupInformation.getCurrentUser().getUserName()); } } http://git-wip-us.apache.org/repos/asf/kylin/blob/19585846/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/PrintHBaseConfig.java ---------------------------------------------------------------------- diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/PrintHBaseConfig.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/PrintHBaseConfig.java index bd965dc..f9b7daf 100644 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/PrintHBaseConfig.java +++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/PrintHBaseConfig.java @@ -31,7 +31,7 @@ public class PrintHBaseConfig { public static void main(String[] args) throws IOException { MyConfig config = new MyConfig(HBaseConfiguration.create()); - + if (args.length == 0) { for (Map.Entry<Object, Object> item : config.getProps().entrySet()) { System.out.println(item.getKey() + "=" + item.getValue()); @@ -43,18 +43,18 @@ public class PrintHBaseConfig { System.out.println(config.get(args[0])); System.exit(0); } - + for (String arg : args) { System.out.println(arg + "=" + config.get(arg)); } System.exit(0); } - + private static class MyConfig extends Configuration { MyConfig(Configuration other) { super(other); } - + protected synchronized Properties getProps() { return super.getProps(); } http://git-wip-us.apache.org/repos/asf/kylin/blob/19585846/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/Results.java ---------------------------------------------------------------------- diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/Results.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/Results.java index 59bebc9..f619007 100644 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/Results.java +++ 
b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/Results.java @@ -37,10 +37,8 @@ public class Results { return null; } else { for (Cell c : cells) { - if (Bytes.compareTo(cf, 0, cf.length, c.getFamilyArray(), c.getFamilyOffset(), c.getFamilyLength()) == 0 - && // - Bytes.compareTo(cq, 0, cq.length, c.getQualifierArray(), c.getQualifierOffset(), - c.getQualifierLength()) == 0) { + if (Bytes.compareTo(cf, 0, cf.length, c.getFamilyArray(), c.getFamilyOffset(), c.getFamilyLength()) == 0 && // + Bytes.compareTo(cq, 0, cq.length, c.getQualifierArray(), c.getQualifierOffset(), c.getQualifierLength()) == 0) { return ByteBuffer.wrap(c.getValueArray(), c.getValueOffset(), c.getValueLength()); } } http://git-wip-us.apache.org/repos/asf/kylin/blob/19585846/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/RowCounterCLI.java ---------------------------------------------------------------------- diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/RowCounterCLI.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/RowCounterCLI.java index 6d58c3c..db516bb 100644 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/RowCounterCLI.java +++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/RowCounterCLI.java @@ -42,8 +42,7 @@ public class RowCounterCLI { public static void main(String[] args) throws IOException { if (args == null || args.length != 3) { - System.out.println( - "Usage: hbase org.apache.hadoop.util.RunJar kylin-job-latest.jar org.apache.kylin.job.tools.RowCounterCLI [HTABLE_NAME] [STARTKEY] [ENDKEY]"); + System.out.println("Usage: hbase org.apache.hadoop.util.RunJar kylin-job-latest.jar org.apache.kylin.job.tools.RowCounterCLI [HTABLE_NAME] [STARTKEY] [ENDKEY]"); } System.out.println(args[0]); http://git-wip-us.apache.org/repos/asf/kylin/blob/19585846/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/StorageCleanupJob.java 
---------------------------------------------------------------------- diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/StorageCleanupJob.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/StorageCleanupJob.java index 7eef298..f6b65ab 100644 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/StorageCleanupJob.java +++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/StorageCleanupJob.java @@ -67,18 +67,15 @@ import org.slf4j.LoggerFactory; public class StorageCleanupJob extends AbstractApplication { @SuppressWarnings("static-access") - protected static final Option OPTION_DELETE = OptionBuilder.withArgName("delete").hasArg().isRequired(false) - .withDescription("Delete the unused storage").create("delete"); - protected static final Option OPTION_FORCE = OptionBuilder.withArgName("force").hasArg().isRequired(false) - .withDescription("Warning: will delete all kylin intermediate hive tables").create("force"); + protected static final Option OPTION_DELETE = OptionBuilder.withArgName("delete").hasArg().isRequired(false).withDescription("Delete the unused storage").create("delete"); + protected static final Option OPTION_FORCE = OptionBuilder.withArgName("force").hasArg().isRequired(false).withDescription("Warning: will delete all kylin intermediate hive tables").create("force"); protected static final Logger logger = LoggerFactory.getLogger(StorageCleanupJob.class); public static final int deleteTimeout = 10; // Unit minute protected boolean delete = false; protected boolean force = false; - protected static ExecutableManager executableManager = ExecutableManager - .getInstance(KylinConfig.getInstanceFromEnv()); + protected static ExecutableManager executableManager = ExecutableManager.getInstance(KylinConfig.getInstanceFromEnv()); private void cleanUnusedHBaseTables(Configuration conf) throws IOException { CubeManager cubeMgr = CubeManager.getInstance(KylinConfig.getInstanceFromEnv()); 
@@ -102,8 +99,7 @@ public class StorageCleanupJob extends AbstractApplication { String tablename = seg.getStorageLocationIdentifier(); if (allTablesNeedToBeDropped.contains(tablename)) { allTablesNeedToBeDropped.remove(tablename); - logger.info("Exclude table " + tablename + " from drop list, as the table belongs to cube " - + cube.getName() + " with status " + cube.getStatus()); + logger.info("Exclude table " + tablename + " from drop list, as the table belongs to cube " + cube.getName() + " with status " + cube.getStatus()); } } } @@ -117,8 +113,7 @@ public class StorageCleanupJob extends AbstractApplication { try { futureTask.get(deleteTimeout, TimeUnit.MINUTES); } catch (TimeoutException e) { - logger.warn("It fails to delete htable " + htableName + ", for it cost more than " + deleteTimeout - + " minutes!"); + logger.warn("It fails to delete htable " + htableName + ", for it cost more than " + deleteTimeout + " minutes!"); futureTask.cancel(true); } catch (Exception e) { e.printStackTrace(); @@ -213,8 +208,7 @@ public class StorageCleanupJob extends AbstractApplication { if (!state.isFinalState()) { String path = JobBuilderSupport.getJobWorkingDir(engineConfig.getHdfsWorkingDirectory(), jobId); allHdfsPathsNeedToBeDeleted.remove(path); - logger.info("Skip " + path + " from deletion list, as the path belongs to job " + jobId - + " with status " + state); + logger.info("Skip " + path + " from deletion list, as the path belongs to job " + jobId + " with status " + state); } } @@ -225,8 +219,7 @@ public class StorageCleanupJob extends AbstractApplication { if (jobUuid != null && jobUuid.equals("") == false) { String path = JobBuilderSupport.getJobWorkingDir(engineConfig.getHdfsWorkingDirectory(), jobUuid); allHdfsPathsNeedToBeDeleted.remove(path); - logger.info("Skip " + path + " from deletion list, as the path belongs to segment " + seg - + " of cube " + cube.getName()); + logger.info("Skip " + path + " from deletion list, as the path belongs to segment " + seg + 
" of cube " + cube.getName()); } } } @@ -363,8 +356,7 @@ public class StorageCleanupJob extends AbstractApplication { } public static void main(String[] args) throws Exception { - logger.warn( - "org.apache.kylin.storage.hbase.util.StorageCleanupJob is deprecated, use org.apache.kylin.tool.StorageCleanupJob instead"); + logger.warn("org.apache.kylin.storage.hbase.util.StorageCleanupJob is deprecated, use org.apache.kylin.tool.StorageCleanupJob instead"); StorageCleanupJob cli = new StorageCleanupJob(); cli.execute(args); http://git-wip-us.apache.org/repos/asf/kylin/blob/19585846/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/TarGZUtil.java ---------------------------------------------------------------------- diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/TarGZUtil.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/TarGZUtil.java index f749247..f0c4c5b 100644 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/TarGZUtil.java +++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/TarGZUtil.java @@ -35,8 +35,7 @@ public class TarGZUtil { dest.mkdir(); TarArchiveInputStream tarIn = null; - tarIn = new TarArchiveInputStream( - new GzipCompressorInputStream(new BufferedInputStream(new FileInputStream(tarFile)))); + tarIn = new TarArchiveInputStream(new GzipCompressorInputStream(new BufferedInputStream(new FileInputStream(tarFile)))); TarArchiveEntry tarEntry = tarIn.getNextTarEntry(); // tarIn is a TarArchiveInputStream http://git-wip-us.apache.org/repos/asf/kylin/blob/19585846/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/ZookeeperDistributedLock.java ---------------------------------------------------------------------- diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/ZookeeperDistributedLock.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/ZookeeperDistributedLock.java index 6edb970..63ffda0 100644 --- 
a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/ZookeeperDistributedLock.java +++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/ZookeeperDistributedLock.java @@ -182,8 +182,7 @@ public class ZookeeperDistributedLock implements DistributedLock, JobLock { } if (lock(lockPath)) { - logger.debug(client + " waited " + (System.currentTimeMillis() - waitStart) + " ms for lock path " - + lockPath); + logger.debug(client + " waited " + (System.currentTimeMillis() - waitStart) + " ms for lock path " + lockPath); return true; } } @@ -224,11 +223,9 @@ public class ZookeeperDistributedLock implements DistributedLock, JobLock { String owner = peekLock(lockPath); if (owner == null) - throw new IllegalStateException( - client + " cannot unlock path " + lockPath + " which is not locked currently"); + throw new IllegalStateException(client + " cannot unlock path " + lockPath + " which is not locked currently"); if (client.equals(owner) == false) - throw new IllegalStateException( - client + " cannot unlock path " + lockPath + " which is locked by " + owner); + throw new IllegalStateException(client + " cannot unlock path " + lockPath + " which is locked by " + owner); try { curator.delete().guaranteed().deletingChildrenIfNeeded().forPath(lockPath); @@ -266,12 +263,10 @@ public class ZookeeperDistributedLock implements DistributedLock, JobLock { public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception { switch (event.getType()) { case CHILD_ADDED: - watcher.onLock(event.getData().getPath(), - new String(event.getData().getData(), Charset.forName("UTF-8"))); + watcher.onLock(event.getData().getPath(), new String(event.getData().getData(), Charset.forName("UTF-8"))); break; case CHILD_REMOVED: - watcher.onUnlock(event.getData().getPath(), - new String(event.getData().getData(), Charset.forName("UTF-8"))); + watcher.onUnlock(event.getData().getPath(), new String(event.getData().getData(), 
Charset.forName("UTF-8"))); break; default: break; http://git-wip-us.apache.org/repos/asf/kylin/blob/19585846/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/ZookeeperJobLock.java ---------------------------------------------------------------------- diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/ZookeeperJobLock.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/ZookeeperJobLock.java index 3c2d497..991a750 100644 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/ZookeeperJobLock.java +++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/ZookeeperJobLock.java @@ -29,9 +29,8 @@ import org.apache.kylin.job.lock.JobLock; */ public class ZookeeperJobLock implements DistributedLock, JobLock { - private ZookeeperDistributedLock lock = (ZookeeperDistributedLock) new ZookeeperDistributedLock.Factory() - .lockForCurrentProcess(); - + private ZookeeperDistributedLock lock = (ZookeeperDistributedLock) new ZookeeperDistributedLock.Factory().lockForCurrentProcess(); + @Override public String getClient() { return lock.getClient(); @@ -61,7 +60,7 @@ public class ZookeeperJobLock implements DistributedLock, JobLock { public boolean isLockedByMe(String lockPath) { return lock.isLockedByMe(lockPath); } - + @Override public void unlock(String lockPath) { lock.unlock(lockPath); @@ -71,7 +70,6 @@ public class ZookeeperJobLock implements DistributedLock, JobLock { public void purgeLocks(String lockPathRoot) { lock.purgeLocks(lockPathRoot); } - @Override public Closeable watchLocks(String lockPathRoot, Executor executor, Watcher watcher) { return lock.watchLocks(lockPathRoot, executor, watcher); http://git-wip-us.apache.org/repos/asf/kylin/blob/19585846/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/ZookeeperUtil.java ---------------------------------------------------------------------- diff --git 
a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/ZookeeperUtil.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/ZookeeperUtil.java index b884b44..b5ebe89 100644 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/ZookeeperUtil.java +++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/ZookeeperUtil.java @@ -41,13 +41,12 @@ public class ZookeeperUtil { Configuration conf = HBaseConnection.getCurrentHBaseConfiguration(); final String serverList = conf.get(HConstants.ZOOKEEPER_QUORUM); final String port = conf.get(HConstants.ZOOKEEPER_CLIENT_PORT); - return StringUtils - .join(Iterables.transform(Arrays.asList(serverList.split(",")), new Function<String, String>() { - @Nullable - @Override - public String apply(String input) { - return input + ":" + port; - } - }), ","); + return StringUtils.join(Iterables.transform(Arrays.asList(serverList.split(",")), new Function<String, String>() { + @Nullable + @Override + public String apply(String input) { + return input + ":" + port; + } + }), ","); } } http://git-wip-us.apache.org/repos/asf/kylin/blob/19585846/storage-hbase/src/main/java/org/apache/kylin/storage/hdfs/HDFSResourceStore.java ---------------------------------------------------------------------- diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hdfs/HDFSResourceStore.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hdfs/HDFSResourceStore.java index aa235ae..fe1ad4e 100644 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hdfs/HDFSResourceStore.java +++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hdfs/HDFSResourceStore.java @@ -53,7 +53,7 @@ public class HDFSResourceStore extends ResourceStore { public HDFSResourceStore(KylinConfig kylinConfig) throws Exception { super(kylinConfig); StorageURL metadataUrl = kylinConfig.getMetadataUrl(); - + if (!metadataUrl.getScheme().equals("hdfs")) throw new IOException("kylin.metadata.url not recognized for 
HDFSResourceStore:" + metadataUrl); @@ -101,8 +101,7 @@ public class HDFSResourceStore extends ResourceStore { } @Override - protected List<RawResource> getAllResourcesImpl(String folderPath, long timeStart, long timeEndExclusive) - throws IOException { + protected List<RawResource> getAllResourcesImpl(String folderPath, long timeStart, long timeEndExclusive) throws IOException { NavigableSet<String> resources = listResources(folderPath); if (resources == null) return Collections.emptyList(); @@ -178,20 +177,17 @@ public class HDFSResourceStore extends ResourceStore { } @Override - protected long checkAndPutResourceImpl(String resPath, byte[] content, long oldTS, long newTS) - throws IOException, IllegalStateException { + protected long checkAndPutResourceImpl(String resPath, byte[] content, long oldTS, long newTS) throws IOException, IllegalStateException { Path p = getRealHDFSPath(resPath); if (!fs.exists(p)) { if (oldTS != 0) { - throw new IllegalStateException( - "For not exist file. OldTS have to be 0. but Actual oldTS is : " + oldTS); + throw new IllegalStateException("For not exist file. OldTS have to be 0. 
but Actual oldTS is : " + oldTS); } } else { long realLastModify = getResourceTimestamp(resPath); if (realLastModify != oldTS) { - throw new IllegalStateException("Overwriting conflict " + resPath + ", expect old TS " + oldTS - + ", but found " + realLastModify); + throw new IllegalStateException("Overwriting conflict " + resPath + ", expect old TS " + oldTS + ", but found " + realLastModify); } } putResourceImpl(resPath, new ByteArrayInputStream(content), newTS); http://git-wip-us.apache.org/repos/asf/kylin/blob/19585846/storage-hbase/src/main/java/org/apache/kylin/storage/hdfs/LockManager.java ---------------------------------------------------------------------- diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hdfs/LockManager.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hdfs/LockManager.java index 7f8df78..96ec653 100644 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hdfs/LockManager.java +++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hdfs/LockManager.java @@ -83,6 +83,7 @@ public class LockManager { } + public String getLockPath(String resourceName) { if (!resourceName.startsWith("/")) resourceName = "/" + resourceName; http://git-wip-us.apache.org/repos/asf/kylin/blob/19585846/storage-hbase/src/main/java/org/apache/kylin/storage/hdfs/ResourceLock.java ---------------------------------------------------------------------- diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hdfs/ResourceLock.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hdfs/ResourceLock.java index 0b553d8..ee5a415 100644 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hdfs/ResourceLock.java +++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hdfs/ResourceLock.java @@ -17,9 +17,10 @@ */ package org.apache.kylin.storage.hdfs; +import org.apache.curator.framework.recipes.locks.InterProcessMutex; + import java.util.concurrent.TimeUnit; -import 
org.apache.curator.framework.recipes.locks.InterProcessMutex; public class ResourceLock { @@ -34,13 +35,13 @@ public class ResourceLock { public void acquire(long time, TimeUnit unit) throws Exception { boolean success = lock.acquire(time, unit); - if (!success) { + if(!success){ throw new IllegalStateException("Fail to get Zookeeper lock"); } } - public void acquire() throws Exception { - lock.acquire(); + public void acquire() throws Exception{ + lock.acquire(); } protected void release() throws Exception {