Repository: incubator-kylin Updated Branches: refs/heads/KYLIN-942 eaef279c7 -> 7c4847678
minor change Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/7c484767 Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/7c484767 Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/7c484767 Branch: refs/heads/KYLIN-942 Commit: 7c4847678ec04cd4912e96558951e99d6792e53c Parents: eaef279 Author: honma <ho...@ebay.com> Authored: Fri Sep 25 10:18:24 2015 +0800 Committer: honma <ho...@ebay.com> Committed: Fri Sep 25 16:53:53 2015 +0800 ---------------------------------------------------------------------- .../apache/kylin/cube/kv/FuzzyMaskEncoder.java | 3 +- .../org/apache/kylin/cube/kv/RowKeyEncoder.java | 2 +- .../kylin/engine/mr/common/CuboidShardUtil.java | 8 +-- .../mr/steps/MapContextGTRecordWriter.java | 2 +- server/src/main/resources/log4j.properties | 2 +- .../storage/hbase/steps/CreateHTableJob.java | 52 ++++++++++++-------- .../kylin/storage/hbase/steps/MergeGCStep.java | 10 +++- 7 files changed, 50 insertions(+), 29 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/7c484767/core-cube/src/main/java/org/apache/kylin/cube/kv/FuzzyMaskEncoder.java ---------------------------------------------------------------------- diff --git a/core-cube/src/main/java/org/apache/kylin/cube/kv/FuzzyMaskEncoder.java b/core-cube/src/main/java/org/apache/kylin/cube/kv/FuzzyMaskEncoder.java index 1c2f4ee..254482c 100644 --- a/core-cube/src/main/java/org/apache/kylin/cube/kv/FuzzyMaskEncoder.java +++ b/core-cube/src/main/java/org/apache/kylin/cube/kv/FuzzyMaskEncoder.java @@ -37,8 +37,9 @@ public class FuzzyMaskEncoder extends RowKeyEncoder { @Override protected int fillHeader(byte[] bytes) { + Arrays.fill(bytes, 0, RowConstants.ROWKEY_SHARDID_LEN, RowConstants.FUZZY_MASK_ONE); // always fuzzy match cuboid ID to lock on the selected cuboid - Arrays.fill(bytes, 0, 
RowConstants.ROWKEY_HEADER_LEN, RowConstants.FUZZY_MASK_ZERO); + Arrays.fill(bytes, RowConstants.ROWKEY_SHARDID_LEN, RowConstants.ROWKEY_HEADER_LEN, RowConstants.FUZZY_MASK_ZERO); return this.headerLength; } http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/7c484767/core-cube/src/main/java/org/apache/kylin/cube/kv/RowKeyEncoder.java ---------------------------------------------------------------------- diff --git a/core-cube/src/main/java/org/apache/kylin/cube/kv/RowKeyEncoder.java b/core-cube/src/main/java/org/apache/kylin/cube/kv/RowKeyEncoder.java index 736a2b9..bc4a927 100644 --- a/core-cube/src/main/java/org/apache/kylin/cube/kv/RowKeyEncoder.java +++ b/core-cube/src/main/java/org/apache/kylin/cube/kv/RowKeyEncoder.java @@ -112,7 +112,7 @@ public class RowKeyEncoder extends AbstractRowKeyEncoder { if (this.headerLength != offset) { throw new IllegalStateException("Expected header length is " + headerLength + ". But the offset is " + offset); } - + return offset; } http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/7c484767/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CuboidShardUtil.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CuboidShardUtil.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CuboidShardUtil.java index 4839ab0..507f5c4 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CuboidShardUtil.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CuboidShardUtil.java @@ -34,11 +34,11 @@ import com.google.common.collect.Maps; public class CuboidShardUtil { protected static final Logger logger = LoggerFactory.getLogger(CuboidShardUtil.class); -// public static Map<Long, Short> loadCuboidShards(CubeSegment segment) { -// return DefaultedMap.defaultedMap(segment.getCuboidShards(), (short) 1); -// } + // public static Map<Long, Short> loadCuboidShards(CubeSegment segment) { + 
// return DefaultedMap.defaultedMap(segment.getCuboidShards(), (short) 1); + // } - public static void saveCuboidShards(CubeSegment segment, Map<Long, Short> cuboidShards,int totalShards) throws IOException { + public static void saveCuboidShards(CubeSegment segment, Map<Long, Short> cuboidShards, int totalShards) throws IOException { CubeManager cubeManager = CubeManager.getInstance(KylinConfig.getInstanceFromEnv()); Map<Long, Short> filered = Maps.filterEntries(cuboidShards, new Predicate<Map.Entry<Long, Short>>() { @Override http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/7c484767/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MapContextGTRecordWriter.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MapContextGTRecordWriter.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MapContextGTRecordWriter.java index 402bec0..7510c40 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MapContextGTRecordWriter.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MapContextGTRecordWriter.java @@ -71,7 +71,7 @@ public class MapContextGTRecordWriter implements ICuboidWriter { //fill shard short cuboidShardNum = cubeSegment.getCuboidShardNum(cuboidId); short shardOffset = ShardingHash.getShard(keyBuf, header, offSet - header, cuboidShardNum); - Short cuboidShardBase = cubeSegment.getCuboidBaseShard(cuboidId); + short cuboidShardBase = cubeSegment.getCuboidBaseShard(cuboidId); short finalShard = ShardingHash.getShard(cuboidShardBase, shardOffset, cubeSegment.getTotalShards()); BytesUtil.writeShort(finalShard, keyBuf, 0, RowConstants.ROWKEY_SHARDID_LEN); http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/7c484767/server/src/main/resources/log4j.properties ---------------------------------------------------------------------- diff --git a/server/src/main/resources/log4j.properties 
b/server/src/main/resources/log4j.properties index ef4bff4..b04538a 100644 --- a/server/src/main/resources/log4j.properties +++ b/server/src/main/resources/log4j.properties @@ -30,7 +30,7 @@ log4j.logger.org.springframework=WARN log4j.logger.org.apache.kylin.rest.controller.QueryController=DEBUG, query log4j.logger.org.apache.kylin.rest.service.QueryService=DEBUG, query log4j.logger.org.apache.kylin.query=DEBUG, query -log4j.logger.org.apache.kylin.storage=DEBUG, query +#log4j.logger.org.apache.kylin.storage=DEBUG, query //too many stuff in storage package now #job config log4j.logger.org.apache.kylin.rest.controller.JobController=DEBUG, job http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/7c484767/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CreateHTableJob.java ---------------------------------------------------------------------- diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CreateHTableJob.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CreateHTableJob.java index 45a5f96..2444c6a 100644 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CreateHTableJob.java +++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CreateHTableJob.java @@ -250,19 +250,19 @@ public class CreateHTableJob extends AbstractHadoopJob { logger.info("Cube capacity " + cubeCapacity.toString() + ", chosen cut for HTable is " + cut + "GB"); - long totalSizeInM = 0; + double totalSizeInM = 0; List<Long> allCuboids = Lists.newArrayList(); allCuboids.addAll(cubeRowCountMap.keySet()); Collections.sort(allCuboids); - Map<Long, Long> cubeSizeMap = Maps.transformEntries(cubeRowCountMap, new Maps.EntryTransformer<Long, Long, Long>() { + Map<Long, Double> cubeSizeMap = Maps.transformEntries(cubeRowCountMap, new Maps.EntryTransformer<Long, Long, Double>() { @Override - public Long transformEntry(@Nullable Long key, @Nullable Long value) { + public Double transformEntry(@Nullable 
Long key, @Nullable Long value) { return estimateCuboidStorageSize(cubeDesc, key, value, baseCuboidId, rowkeyColumnSize); } }); - for (Long cuboidSize : cubeSizeMap.values()) { + for (Double cuboidSize : cubeSizeMap.values()) { totalSizeInM += cuboidSize; } @@ -270,14 +270,22 @@ public class CreateHTableJob extends AbstractHadoopJob { nRegion = Math.max(kylinConfig.getHBaseRegionCountMin(), nRegion); nRegion = Math.min(kylinConfig.getHBaseRegionCountMax(), nRegion); - //TODO: remove nRegion > 1 so that even small cubes will have at least two regions - if (ENABLE_CUBOID_SHARDING && (nRegion > 1)) { + if (ENABLE_CUBOID_SHARDING) {//&& (nRegion > 1)) { //use prime nRegions to help random sharding + int original = nRegion; nRegion = Primes.nextPrime(nRegion);//return 2 for input 1 - logger.info("Region count is adjusted to " + nRegion + " to help random sharding"); + + if (nRegion > Short.MAX_VALUE) { + logger.info("Too many regions! reduce to " + Short.MAX_VALUE); + nRegion = Short.MAX_VALUE; + } + + if (nRegion != original) { + logger.info("Region count is adjusted from " + original + " to " + nRegion + " to help random sharding"); + } } - int mbPerRegion = (int) (totalSizeInM / (nRegion)); + int mbPerRegion = (int) (totalSizeInM / nRegion); mbPerRegion = Math.max(1, mbPerRegion); logger.info("Total size " + totalSizeInM + "M (estimated)"); @@ -287,17 +295,20 @@ public class CreateHTableJob extends AbstractHadoopJob { if (ENABLE_CUBOID_SHARDING) { //each cuboid will be split into different number of shards HashMap<Long, Short> cuboidShards = Maps.newHashMap(); - long[] regionSizes = new long[nRegion]; + double[] regionSizes = new double[nRegion]; for (long cuboidId : allCuboids) { - long estimatedSize = cubeSizeMap.get(cuboidId); - double magic = Math.PI; - int shard = (int) (1.0 * estimatedSize / mbPerRegion * magic); - if (shard == 0) { + double estimatedSize = cubeSizeMap.get(cuboidId); + double magic = 10; + int shard = (int) (1.0 * estimatedSize * magic / 
mbPerRegion); + if (shard < 1) { shard = 1; } - if (shard > Short.MAX_VALUE) { - logger.info(String.format("Cuboid %d 's estimated size %d MB will generate %d regions, reduce to %d", cuboidId, estimatedSize, shard, Short.MAX_VALUE)); - shard = Short.MAX_VALUE; + + if (shard > nRegion) { + logger.info(String.format("Cuboid %d 's estimated size %.2f MB will generate %d regions, reduce to %d", cuboidId, estimatedSize, shard, nRegion)); + shard = nRegion; + } else { + logger.info(String.format("Cuboid %d 's estimated size %.2f MB will generate %d regions", cuboidId, estimatedSize, shard)); } cuboidShards.put(cuboidId, (short) shard); @@ -309,7 +320,7 @@ public class CreateHTableJob extends AbstractHadoopJob { } for (int i = 0; i < nRegion; ++i) { - logger.info(String.format("Region %d's estimated size is %d MB, accounting for %0.2f percent", i, regionSizes[i], 100.0 * regionSizes[i] / totalSizeInM)); + logger.info(String.format("Region %d's estimated size is %.2f MB, accounting for %.2f percent", i, regionSizes[i], 100.0 * regionSizes[i] / totalSizeInM)); } CuboidShardUtil.saveCuboidShards(cubeSegment, cuboidShards, nRegion); @@ -353,7 +364,7 @@ public class CreateHTableJob extends AbstractHadoopJob { * @param rowCount * @return the cuboid size in M bytes */ - private static long estimateCuboidStorageSize(CubeDesc cubeDesc, long cuboidId, long rowCount, long baseCuboidId, List<Integer> rowKeyColumnLength) { + private static double estimateCuboidStorageSize(CubeDesc cubeDesc, long cuboidId, long rowCount, long baseCuboidId, List<Integer> rowKeyColumnLength) { int bytesLength = RowConstants.ROWKEY_HEADER_LEN; @@ -380,8 +391,9 @@ public class CreateHTableJob extends AbstractHadoopJob { bytesLength += space; logger.info("Cuboid " + cuboidId + " has " + rowCount + " rows, each row size is " + bytesLength + " bytes."); - logger.info("Cuboid " + cuboidId + " total size is " + (bytesLength * rowCount / (1024L * 1024L)) + "M."); - return bytesLength * rowCount / (1024L * 
1024L); + double ret = 1.0 * bytesLength * rowCount / (1024L * 1024L); + logger.info("Cuboid " + cuboidId + " total size is " + ret + "M."); + return ret; } public static void main(String[] args) throws Exception { http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/7c484767/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/MergeGCStep.java ---------------------------------------------------------------------- diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/MergeGCStep.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/MergeGCStep.java index df42560..a4a8a35 100644 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/MergeGCStep.java +++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/MergeGCStep.java @@ -56,8 +56,17 @@ public class MergeGCStep extends AbstractExecutable { @Override protected ExecuteResult doWork(ExecutableContext context) throws ExecuteException { + try { + logger.info("Sleep one minute before deleting the Htables"); + Thread.sleep(60000); + } catch (InterruptedException e) { + logger.warn("Thread interrupted"); + Thread.currentThread().interrupt(); + } + + logger.info("Start doing merge gc work"); + StringBuffer output = new StringBuffer(); - List<String> oldTables = getOldHTables(); if (oldTables != null && oldTables.size() > 0) { String metadataUrlPrefix = KylinConfig.getInstanceFromEnv().getMetadataUrlPrefix();