Repository: kylin Updated Branches: refs/heads/master aac4e7e49 -> 467226f40
refactor on sharding Project: http://git-wip-us.apache.org/repos/asf/kylin/repo Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/49b37efd Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/49b37efd Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/49b37efd Branch: refs/heads/master Commit: 49b37efd1c741d0d5900c4d500912f604df41260 Parents: aac4e7e Author: Hongbin Ma <mahong...@apache.org> Authored: Mon Jul 25 13:35:51 2016 +0800 Committer: Hongbin Ma <mahong...@apache.org> Committed: Tue Jul 26 13:19:45 2016 +0800 ---------------------------------------------------------------------- .../java/org/apache/kylin/cube/CubeSegment.java | 38 ++++++++++++---- .../org/apache/kylin/cube/kv/RowKeyEncoder.java | 2 +- .../kylin/engine/mr/common/CuboidShardUtil.java | 6 +-- .../engine/mr/steps/CubingExecutableUtil.java | 40 +++++++++++++++++ .../engine/mr/steps/SaveStatisticsStep.java | 46 +------------------- .../storage/hbase/cube/v1/CubeStorageQuery.java | 2 +- .../hbase/cube/v2/CubeHBaseEndpointRPC.java | 2 +- .../storage/hbase/cube/v2/CubeHBaseScanRPC.java | 2 +- .../storage/hbase/steps/CreateHTableJob.java | 1 + 9 files changed, 79 insertions(+), 60 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kylin/blob/49b37efd/core-cube/src/main/java/org/apache/kylin/cube/CubeSegment.java ---------------------------------------------------------------------- diff --git a/core-cube/src/main/java/org/apache/kylin/cube/CubeSegment.java b/core-cube/src/main/java/org/apache/kylin/cube/CubeSegment.java index dca4381..febfa86 100644 --- a/core-cube/src/main/java/org/apache/kylin/cube/CubeSegment.java +++ b/core-cube/src/main/java/org/apache/kylin/cube/CubeSegment.java @@ -41,17 +41,20 @@ import org.apache.kylin.metadata.model.IJoinedFlatTableDesc; import org.apache.kylin.metadata.model.SegmentStatusEnum; import org.apache.kylin.metadata.model.TblColRef; import 
org.apache.kylin.metadata.realization.IRealization; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import com.fasterxml.jackson.annotation.JsonAutoDetect; -import com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility; import com.fasterxml.jackson.annotation.JsonBackReference; import com.fasterxml.jackson.annotation.JsonInclude; import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility; import com.google.common.collect.Lists; import com.google.common.collect.Maps; @JsonAutoDetect(fieldVisibility = Visibility.NONE, getterVisibility = Visibility.NONE, isGetterVisibility = Visibility.NONE, setterVisibility = Visibility.NONE) public class CubeSegment implements Comparable<CubeSegment>, IBuildable { + private static final Logger logger = LoggerFactory.getLogger(CubeSegment.class); @JsonBackReference private CubeInstance cubeInstance; @@ -85,7 +88,7 @@ public class CubeSegment implements Comparable<CubeSegment>, IBuildable { private long createTimeUTC; @JsonProperty("cuboid_shard_nums") private Map<Long, Short> cuboidShardNums = Maps.newHashMap(); - @JsonProperty("total_shards") + @JsonProperty("total_shards") //it is only valid when all cuboids are squashed into some shards.
like the HBASE_STORAGE case, otherwise it'll stay 0 private int totalShards = 0; @JsonProperty("blackout_cuboids") private List<Long> blackoutCuboids = Lists.newArrayList(); @@ -489,8 +492,16 @@ public class CubeSegment implements Comparable<CubeSegment>, IBuildable { this.cuboidShardNums = newCuboidShards; } - public int getTotalShards() { - return totalShards; + public int getTotalShards(long cuboidId) { + if (totalShards > 0) { + //shard squashed case + logger.info("total shards for {} is {}", cuboidId, totalShards); + return totalShards; + } else { + int ret = getCuboidShardNum(cuboidId); + logger.info("total shards for {} is {}", cuboidId, ret); + return ret; + } } public void setTotalShards(int totalShards) { @@ -498,12 +509,21 @@ public class CubeSegment implements Comparable<CubeSegment>, IBuildable { } public short getCuboidBaseShard(Long cuboidId) { - Short ret = cuboidBaseShards.get(cuboidId); - if (ret == null) { - ret = ShardingHash.getShard(cuboidId, totalShards); - cuboidBaseShards.put(cuboidId, ret); + if (totalShards > 0) { + //shard squashed case + + Short ret = cuboidBaseShards.get(cuboidId); + if (ret == null) { + ret = ShardingHash.getShard(cuboidId, totalShards); + cuboidBaseShards.put(cuboidId, ret); + } + + logger.info("base for cuboid {} is {}", cuboidId, ret); + return ret; + } else { + logger.info("base for cuboid {} is {}", cuboidId, 0); + return 0; } - return ret; } public List<Long> getBlackoutCuboids() { http://git-wip-us.apache.org/repos/asf/kylin/blob/49b37efd/core-cube/src/main/java/org/apache/kylin/cube/kv/RowKeyEncoder.java ---------------------------------------------------------------------- diff --git a/core-cube/src/main/java/org/apache/kylin/cube/kv/RowKeyEncoder.java b/core-cube/src/main/java/org/apache/kylin/cube/kv/RowKeyEncoder.java index ebcbadd..ff37752 100644 --- a/core-cube/src/main/java/org/apache/kylin/cube/kv/RowKeyEncoder.java +++ b/core-cube/src/main/java/org/apache/kylin/cube/kv/RowKeyEncoder.java @@ -76,7 
+76,7 @@ public class RowKeyEncoder extends AbstractRowKeyEncoder { int shardSeedLength = UHCLength == -1 ? bodyLength : UHCLength; short cuboidShardNum = cubeSeg.getCuboidShardNum(cuboid.getId()); short shardOffset = ShardingHash.getShard(key, RowConstants.ROWKEY_SHARD_AND_CUBOID_LEN + shardSeedOffset, shardSeedLength, cuboidShardNum); - return ShardingHash.normalize(cubeSeg.getCuboidBaseShard(cuboid.getId()), shardOffset, cubeSeg.getTotalShards()); + return ShardingHash.normalize(cubeSeg.getCuboidBaseShard(cuboid.getId()), shardOffset, cubeSeg.getTotalShards(cuboid.getId())); } else { throw new RuntimeException("If enableSharding false, you should never calculate shard"); } http://git-wip-us.apache.org/repos/asf/kylin/blob/49b37efd/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CuboidShardUtil.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CuboidShardUtil.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CuboidShardUtil.java index 7b65ec6..70aa628 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CuboidShardUtil.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CuboidShardUtil.java @@ -35,17 +35,17 @@ public class CuboidShardUtil { public static void saveCuboidShards(CubeSegment segment, Map<Long, Short> cuboidShards, int totalShards) throws IOException { CubeManager cubeManager = CubeManager.getInstance(segment.getConfig()); - Map<Long, Short> filered = Maps.newHashMap(); + Map<Long, Short> filtered = Maps.newHashMap(); for (Map.Entry<Long, Short> entry : cuboidShards.entrySet()) { if (entry.getValue() <= 1) { logger.info("Cuboid {} has {} shards, skip saving it as an optimization", entry.getKey(), entry.getValue()); } else { logger.info("Cuboid {} has {} shards, saving it", entry.getKey(), entry.getValue()); - filered.put(entry.getKey(), entry.getValue()); + filtered.put(entry.getKey(), 
entry.getValue()); } } - segment.setCuboidShardNums(filered); + segment.setCuboidShardNums(filtered); segment.setTotalShards(totalShards); CubeUpdate cubeBuilder = new CubeUpdate(segment.getCubeInstance()); http://git-wip-us.apache.org/repos/asf/kylin/blob/49b37efd/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CubingExecutableUtil.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CubingExecutableUtil.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CubingExecutableUtil.java index 9e46ded..b0d5a89 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CubingExecutableUtil.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CubingExecutableUtil.java @@ -23,9 +23,17 @@ import java.util.Collections; import java.util.List; import java.util.Map; +import com.google.common.base.Function; +import com.google.common.collect.Iterables; import org.apache.commons.lang.StringUtils; import com.google.common.collect.Lists; +import org.apache.kylin.cube.CubeInstance; +import org.apache.kylin.cube.CubeManager; +import org.apache.kylin.cube.CubeSegment; +import org.apache.kylin.job.execution.ExecutableContext; + +import javax.annotation.Nullable; public class CubingExecutableUtil { @@ -65,6 +73,38 @@ public class CubingExecutableUtil { params.put(MERGING_SEGMENT_IDS, StringUtils.join(ids, ",")); } + public static CubeSegment findSegment(ExecutableContext context, String cubeName, String segmentId) { + final CubeManager mgr = CubeManager.getInstance(context.getConfig()); + final CubeInstance cube = mgr.getCube(cubeName); + + if (cube == null) { + String cubeList = StringUtils.join(Iterables.transform(mgr.listAllCubes(), new Function<CubeInstance, String>() { + @Nullable + @Override + public String apply(@Nullable CubeInstance input) { + return input.getName(); + } + }).iterator(), ","); + + throw new IllegalStateException("target cube 
name: " + cubeName + " cube list: " + cubeList); + } + + final CubeSegment newSegment = cube.getSegmentById(segmentId); + + if (newSegment == null) { + String segmentList = StringUtils.join(Iterables.transform(cube.getSegments(), new Function<CubeSegment, String>() { + @Nullable + @Override + public String apply(@Nullable CubeSegment input) { + return input.getUuid(); + } + }).iterator(), ","); + + throw new IllegalStateException("target segment id: " + segmentId + " segment list: " + segmentList); + } + return newSegment; + } + public static List<String> getMergingSegmentIds(Map<String, String> params) { final String ids = params.get(MERGING_SEGMENT_IDS); if (ids != null) { http://git-wip-us.apache.org/repos/asf/kylin/blob/49b37efd/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/SaveStatisticsStep.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/SaveStatisticsStep.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/SaveStatisticsStep.java index d1772fd..3cace64 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/SaveStatisticsStep.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/SaveStatisticsStep.java @@ -21,21 +21,16 @@ package org.apache.kylin.engine.mr.steps; import java.io.IOException; import java.util.Random; -import javax.annotation.Nullable; - -import org.apache.commons.lang.StringUtils; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IOUtils; import org.apache.kylin.common.KylinConfig; import org.apache.kylin.common.persistence.ResourceStore; -import org.apache.kylin.cube.CubeInstance; -import org.apache.kylin.cube.CubeManager; import org.apache.kylin.cube.CubeSegment; import org.apache.kylin.engine.mr.CubingJob; -import org.apache.kylin.engine.mr.CubingJob.AlgorithmEnum; import 
org.apache.kylin.engine.mr.HadoopUtil; +import org.apache.kylin.engine.mr.CubingJob.AlgorithmEnum; import org.apache.kylin.engine.mr.common.BatchConstants; import org.apache.kylin.engine.mr.common.CubeStatsReader; import org.apache.kylin.job.exception.ExecuteException; @@ -46,9 +41,6 @@ import org.apache.kylin.metadata.model.MeasureDesc; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.google.common.base.Function; -import com.google.common.collect.Iterables; - /** * Save the cube segment statistic to Kylin metadata store */ @@ -60,43 +52,9 @@ public class SaveStatisticsStep extends AbstractExecutable { super(); } - private CubeSegment findSegment(ExecutableContext context, String cubeName, String segmentId) { - final CubeManager mgr = CubeManager.getInstance(context.getConfig()); - final CubeInstance cube = mgr.getCube(cubeName); - - if (cube == null) { - String cubeList = StringUtils.join(Iterables.transform(mgr.listAllCubes(), new Function<CubeInstance, String>() { - @Nullable - @Override - public String apply(@Nullable CubeInstance input) { - return input.getName(); - } - }).iterator(), ","); - - logger.info("target cube name: {}, cube list: {}", cubeName, cubeList); - throw new IllegalStateException(); - } - - final CubeSegment newSegment = cube.getSegmentById(segmentId); - - if (newSegment == null) { - String segmentList = StringUtils.join(Iterables.transform(cube.getSegments(), new Function<CubeSegment, String>() { - @Nullable - @Override - public String apply(@Nullable CubeSegment input) { - return input.getUuid(); - } - }).iterator(), ","); - - logger.info("target segment id: {}, segment list: {}", segmentId, segmentList); - throw new IllegalStateException(); - } - return newSegment; - } - @Override protected ExecuteResult doWork(ExecutableContext context) throws ExecuteException { - CubeSegment newSegment = findSegment(context, CubingExecutableUtil.getCubeName(this.getParams()), CubingExecutableUtil.getSegmentId(this.getParams())); + 
CubeSegment newSegment = CubingExecutableUtil.findSegment(context, CubingExecutableUtil.getCubeName(this.getParams()), CubingExecutableUtil.getSegmentId(this.getParams())); KylinConfig kylinConf = newSegment.getConfig(); ResourceStore rs = ResourceStore.getStore(kylinConf); http://git-wip-us.apache.org/repos/asf/kylin/blob/49b37efd/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/CubeStorageQuery.java ---------------------------------------------------------------------- diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/CubeStorageQuery.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/CubeStorageQuery.java index 672bcbe..46f16fe 100644 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/CubeStorageQuery.java +++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/CubeStorageQuery.java @@ -697,7 +697,7 @@ public class CubeStorageQuery implements IStorageQuery { short cuboidShardNum = segment.getCuboidShardNum(scan.getCuboid().getId()); short cuboidShardBase = segment.getCuboidBaseShard(scan.getCuboid().getId()); for (short i = 0; i < cuboidShardNum; ++i) { - short newShard = ShardingHash.normalize(cuboidShardBase, i, segment.getTotalShards()); + short newShard = ShardingHash.normalize(cuboidShardBase, i, segment.getTotalShards(scan.getCuboid().getId())); byte[] newStartKey = duplicateKeyAndChangeShard(newShard, startKey); byte[] newStopKey = duplicateKeyAndChangeShard(newShard, stopKey); HBaseKeyRange newRange = new HBaseKeyRange(segment, scan.getCuboid(), newStartKey, newStopKey, // http://git-wip-us.apache.org/repos/asf/kylin/blob/49b37efd/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java ---------------------------------------------------------------------- diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java 
b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java index 78ad18d..163226b 100644 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java +++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java @@ -270,7 +270,7 @@ public class CubeHBaseEndpointRPC extends CubeHBaseRPC { Pair<Short, Short> shardNumAndBaseShard = getShardNumAndBaseShard(); short shardNum = shardNumAndBaseShard.getFirst(); short cuboidBaseShard = shardNumAndBaseShard.getSecond(); - int totalShards = cubeSeg.getTotalShards(); + int totalShards = cubeSeg.getTotalShards(cuboid.getId()); ByteString scanRequestByteString = null; ByteString rawScanByteString = null; http://git-wip-us.apache.org/repos/asf/kylin/blob/49b37efd/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseScanRPC.java ---------------------------------------------------------------------- diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseScanRPC.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseScanRPC.java index 938145b..dcedf76 100644 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseScanRPC.java +++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseScanRPC.java @@ -137,7 +137,7 @@ public class CubeHBaseScanRPC extends CubeHBaseRPC { } else { List<byte[]> ret = Lists.newArrayList(); for (short i = 0; i < cuboidShardNum; ++i) { - short shard = ShardingHash.normalize(cubeSeg.getCuboidBaseShard(cuboid.getId()), i, cubeSeg.getTotalShards()); + short shard = ShardingHash.normalize(cubeSeg.getCuboidBaseShard(cuboid.getId()), i, cubeSeg.getTotalShards(cuboid.getId())); byte[] cookedKey = Arrays.copyOf(halfCookedKey, halfCookedKey.length); BytesUtil.writeShort(shard, cookedKey, 0, RowConstants.ROWKEY_SHARDID_LEN); ret.add(cookedKey); 
http://git-wip-us.apache.org/repos/asf/kylin/blob/49b37efd/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CreateHTableJob.java ---------------------------------------------------------------------- diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CreateHTableJob.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CreateHTableJob.java index d1c31de..b93e0a1 100644 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CreateHTableJob.java +++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CreateHTableJob.java @@ -210,6 +210,7 @@ public class CreateHTableJob extends AbstractHadoopJob { for (int i = 0; i < nRegion; i++) { innerRegionSplits.add(new HashMap<Long, Double>()); } + double[] regionSizes = new double[nRegion]; for (long cuboidId : allCuboids) { double estimatedSize = cubeSizeMap.get(cuboidId);