Repository: kylin
Updated Branches:
  refs/heads/master aac4e7e49 -> 467226f40


Refactor sharding: resolve total shards and base shard per cuboid, and move findSegment into CubingExecutableUtil


Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/49b37efd
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/49b37efd
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/49b37efd

Branch: refs/heads/master
Commit: 49b37efd1c741d0d5900c4d500912f604df41260
Parents: aac4e7e
Author: Hongbin Ma <mahong...@apache.org>
Authored: Mon Jul 25 13:35:51 2016 +0800
Committer: Hongbin Ma <mahong...@apache.org>
Committed: Tue Jul 26 13:19:45 2016 +0800

----------------------------------------------------------------------
 .../java/org/apache/kylin/cube/CubeSegment.java | 38 ++++++++++++----
 .../org/apache/kylin/cube/kv/RowKeyEncoder.java |  2 +-
 .../kylin/engine/mr/common/CuboidShardUtil.java |  6 +--
 .../engine/mr/steps/CubingExecutableUtil.java   | 40 +++++++++++++++++
 .../engine/mr/steps/SaveStatisticsStep.java     | 46 +-------------------
 .../storage/hbase/cube/v1/CubeStorageQuery.java |  2 +-
 .../hbase/cube/v2/CubeHBaseEndpointRPC.java     |  2 +-
 .../storage/hbase/cube/v2/CubeHBaseScanRPC.java |  2 +-
 .../storage/hbase/steps/CreateHTableJob.java    |  1 +
 9 files changed, 79 insertions(+), 60 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kylin/blob/49b37efd/core-cube/src/main/java/org/apache/kylin/cube/CubeSegment.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/CubeSegment.java 
b/core-cube/src/main/java/org/apache/kylin/cube/CubeSegment.java
index dca4381..febfa86 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/CubeSegment.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/CubeSegment.java
@@ -41,17 +41,20 @@ import org.apache.kylin.metadata.model.IJoinedFlatTableDesc;
 import org.apache.kylin.metadata.model.SegmentStatusEnum;
 import org.apache.kylin.metadata.model.TblColRef;
 import org.apache.kylin.metadata.realization.IRealization;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import com.fasterxml.jackson.annotation.JsonAutoDetect;
-import com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility;
 import com.fasterxml.jackson.annotation.JsonBackReference;
 import com.fasterxml.jackson.annotation.JsonInclude;
 import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 
 @JsonAutoDetect(fieldVisibility = Visibility.NONE, getterVisibility = 
Visibility.NONE, isGetterVisibility = Visibility.NONE, setterVisibility = 
Visibility.NONE)
 public class CubeSegment implements Comparable<CubeSegment>, IBuildable {
+    private static final Logger logger = 
LoggerFactory.getLogger(CubeSegment.class);
 
     @JsonBackReference
     private CubeInstance cubeInstance;
@@ -85,7 +88,7 @@ public class CubeSegment implements Comparable<CubeSegment>, 
IBuildable {
     private long createTimeUTC;
     @JsonProperty("cuboid_shard_nums")
     private Map<Long, Short> cuboidShardNums = Maps.newHashMap();
-    @JsonProperty("total_shards")
+    @JsonProperty("total_shards") //it is only valid when all cuboids are 
squashed into some shards (like the HBASE_STORAGE case); otherwise it'll stay 0
     private int totalShards = 0;
     @JsonProperty("blackout_cuboids")
     private List<Long> blackoutCuboids = Lists.newArrayList();
@@ -489,8 +492,16 @@ public class CubeSegment implements 
Comparable<CubeSegment>, IBuildable {
         this.cuboidShardNums = newCuboidShards;
     }
 
-    public int getTotalShards() {
-        return totalShards;
+    public int getTotalShards(long cuboidId) {
+        if (totalShards > 0) {
+            //shard squashed case
+            logger.info("total shards for {} is {}", cuboidId, totalShards);
+            return totalShards;
+        } else {
+            int ret = getCuboidShardNum(cuboidId);
+            logger.info("total shards for {} is {}", cuboidId, ret);
+            return ret;
+        }
     }
 
     public void setTotalShards(int totalShards) {
@@ -498,12 +509,21 @@ public class CubeSegment implements 
Comparable<CubeSegment>, IBuildable {
     }
 
     public short getCuboidBaseShard(Long cuboidId) {
-        Short ret = cuboidBaseShards.get(cuboidId);
-        if (ret == null) {
-            ret = ShardingHash.getShard(cuboidId, totalShards);
-            cuboidBaseShards.put(cuboidId, ret);
+        if (totalShards > 0) {
+            //shard squashed case
+
+            Short ret = cuboidBaseShards.get(cuboidId);
+            if (ret == null) {
+                ret = ShardingHash.getShard(cuboidId, totalShards);
+                cuboidBaseShards.put(cuboidId, ret);
+            }
+
+            logger.info("base for cuboid {} is {}", cuboidId, ret);
+            return ret;
+        } else {
+            logger.info("base for cuboid {} is {}", cuboidId, 0);
+            return 0;
         }
-        return ret;
     }
 
     public List<Long> getBlackoutCuboids() {

http://git-wip-us.apache.org/repos/asf/kylin/blob/49b37efd/core-cube/src/main/java/org/apache/kylin/cube/kv/RowKeyEncoder.java
----------------------------------------------------------------------
diff --git 
a/core-cube/src/main/java/org/apache/kylin/cube/kv/RowKeyEncoder.java 
b/core-cube/src/main/java/org/apache/kylin/cube/kv/RowKeyEncoder.java
index ebcbadd..ff37752 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/kv/RowKeyEncoder.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/kv/RowKeyEncoder.java
@@ -76,7 +76,7 @@ public class RowKeyEncoder extends AbstractRowKeyEncoder {
             int shardSeedLength = UHCLength == -1 ? bodyLength : UHCLength;
             short cuboidShardNum = cubeSeg.getCuboidShardNum(cuboid.getId());
             short shardOffset = ShardingHash.getShard(key, 
RowConstants.ROWKEY_SHARD_AND_CUBOID_LEN + shardSeedOffset, shardSeedLength, 
cuboidShardNum);
-            return 
ShardingHash.normalize(cubeSeg.getCuboidBaseShard(cuboid.getId()), shardOffset, 
cubeSeg.getTotalShards());
+            return 
ShardingHash.normalize(cubeSeg.getCuboidBaseShard(cuboid.getId()), shardOffset, 
cubeSeg.getTotalShards(cuboid.getId()));
         } else {
             throw new RuntimeException("If enableSharding false, you should 
never calculate shard");
         }

http://git-wip-us.apache.org/repos/asf/kylin/blob/49b37efd/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CuboidShardUtil.java
----------------------------------------------------------------------
diff --git 
a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CuboidShardUtil.java
 
b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CuboidShardUtil.java
index 7b65ec6..70aa628 100644
--- 
a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CuboidShardUtil.java
+++ 
b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CuboidShardUtil.java
@@ -35,17 +35,17 @@ public class CuboidShardUtil {
     public static void saveCuboidShards(CubeSegment segment, Map<Long, Short> 
cuboidShards, int totalShards) throws IOException {
         CubeManager cubeManager = CubeManager.getInstance(segment.getConfig());
 
-        Map<Long, Short> filered = Maps.newHashMap();
+        Map<Long, Short> filtered = Maps.newHashMap();
         for (Map.Entry<Long, Short> entry : cuboidShards.entrySet()) {
             if (entry.getValue() <= 1) {
                 logger.info("Cuboid {} has {} shards, skip saving it as an 
optimization", entry.getKey(), entry.getValue());
             } else {
                 logger.info("Cuboid {} has {} shards, saving it", 
entry.getKey(), entry.getValue());
-                filered.put(entry.getKey(), entry.getValue());
+                filtered.put(entry.getKey(), entry.getValue());
             }
         }
 
-        segment.setCuboidShardNums(filered);
+        segment.setCuboidShardNums(filtered);
         segment.setTotalShards(totalShards);
 
         CubeUpdate cubeBuilder = new CubeUpdate(segment.getCubeInstance());

http://git-wip-us.apache.org/repos/asf/kylin/blob/49b37efd/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CubingExecutableUtil.java
----------------------------------------------------------------------
diff --git 
a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CubingExecutableUtil.java
 
b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CubingExecutableUtil.java
index 9e46ded..b0d5a89 100644
--- 
a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CubingExecutableUtil.java
+++ 
b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CubingExecutableUtil.java
@@ -23,9 +23,17 @@ import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 
+import com.google.common.base.Function;
+import com.google.common.collect.Iterables;
 import org.apache.commons.lang.StringUtils;
 
 import com.google.common.collect.Lists;
+import org.apache.kylin.cube.CubeInstance;
+import org.apache.kylin.cube.CubeManager;
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.job.execution.ExecutableContext;
+
+import javax.annotation.Nullable;
 
 public class CubingExecutableUtil {
 
@@ -65,6 +73,38 @@ public class CubingExecutableUtil {
         params.put(MERGING_SEGMENT_IDS, StringUtils.join(ids, ","));
     }
 
+    public static CubeSegment findSegment(ExecutableContext context, String 
cubeName, String segmentId) {
+        final CubeManager mgr = CubeManager.getInstance(context.getConfig());
+        final CubeInstance cube = mgr.getCube(cubeName);
+
+        if (cube == null) {
+            String cubeList = 
StringUtils.join(Iterables.transform(mgr.listAllCubes(), new 
Function<CubeInstance, String>() {
+                @Nullable
+                @Override
+                public String apply(@Nullable CubeInstance input) {
+                    return input.getName();
+                }
+            }).iterator(), ",");
+
+            throw new IllegalStateException("target cube name: " + cubeName + 
" cube list: " + cubeList);
+        }
+
+        final CubeSegment newSegment = cube.getSegmentById(segmentId);
+
+        if (newSegment == null) {
+            String segmentList = 
StringUtils.join(Iterables.transform(cube.getSegments(), new 
Function<CubeSegment, String>() {
+                @Nullable
+                @Override
+                public String apply(@Nullable CubeSegment input) {
+                    return input.getUuid();
+                }
+            }).iterator(), ",");
+
+            throw new IllegalStateException("target segment id: " + segmentId 
+ " segment list: " + segmentList);
+        }
+        return newSegment;
+    }
+
     public static List<String> getMergingSegmentIds(Map<String, String> 
params) {
         final String ids = params.get(MERGING_SEGMENT_IDS);
         if (ids != null) {

http://git-wip-us.apache.org/repos/asf/kylin/blob/49b37efd/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/SaveStatisticsStep.java
----------------------------------------------------------------------
diff --git 
a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/SaveStatisticsStep.java
 
b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/SaveStatisticsStep.java
index d1772fd..3cace64 100644
--- 
a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/SaveStatisticsStep.java
+++ 
b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/SaveStatisticsStep.java
@@ -21,21 +21,16 @@ package org.apache.kylin.engine.mr.steps;
 import java.io.IOException;
 import java.util.Random;
 
-import javax.annotation.Nullable;
-
-import org.apache.commons.lang.StringUtils;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.persistence.ResourceStore;
-import org.apache.kylin.cube.CubeInstance;
-import org.apache.kylin.cube.CubeManager;
 import org.apache.kylin.cube.CubeSegment;
 import org.apache.kylin.engine.mr.CubingJob;
-import org.apache.kylin.engine.mr.CubingJob.AlgorithmEnum;
 import org.apache.kylin.engine.mr.HadoopUtil;
+import org.apache.kylin.engine.mr.CubingJob.AlgorithmEnum;
 import org.apache.kylin.engine.mr.common.BatchConstants;
 import org.apache.kylin.engine.mr.common.CubeStatsReader;
 import org.apache.kylin.job.exception.ExecuteException;
@@ -46,9 +41,6 @@ import org.apache.kylin.metadata.model.MeasureDesc;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.google.common.base.Function;
-import com.google.common.collect.Iterables;
-
 /**
  * Save the cube segment statistic to Kylin metadata store
  */
@@ -60,43 +52,9 @@ public class SaveStatisticsStep extends AbstractExecutable {
         super();
     }
 
-    private CubeSegment findSegment(ExecutableContext context, String 
cubeName, String segmentId) {
-        final CubeManager mgr = CubeManager.getInstance(context.getConfig());
-        final CubeInstance cube = mgr.getCube(cubeName);
-
-        if (cube == null) {
-            String cubeList = 
StringUtils.join(Iterables.transform(mgr.listAllCubes(), new 
Function<CubeInstance, String>() {
-                @Nullable
-                @Override
-                public String apply(@Nullable CubeInstance input) {
-                    return input.getName();
-                }
-            }).iterator(), ",");
-
-            logger.info("target cube name: {}, cube list: {}", cubeName, 
cubeList);
-            throw new IllegalStateException();
-        }
-
-        final CubeSegment newSegment = cube.getSegmentById(segmentId);
-
-        if (newSegment == null) {
-            String segmentList = 
StringUtils.join(Iterables.transform(cube.getSegments(), new 
Function<CubeSegment, String>() {
-                @Nullable
-                @Override
-                public String apply(@Nullable CubeSegment input) {
-                    return input.getUuid();
-                }
-            }).iterator(), ",");
-
-            logger.info("target segment id: {}, segment list: {}", segmentId, 
segmentList);
-            throw new IllegalStateException();
-        }
-        return newSegment;
-    }
-
     @Override
     protected ExecuteResult doWork(ExecutableContext context) throws 
ExecuteException {
-        CubeSegment newSegment = findSegment(context, 
CubingExecutableUtil.getCubeName(this.getParams()), 
CubingExecutableUtil.getSegmentId(this.getParams()));
+        CubeSegment newSegment = CubingExecutableUtil.findSegment(context, 
CubingExecutableUtil.getCubeName(this.getParams()), 
CubingExecutableUtil.getSegmentId(this.getParams()));
         KylinConfig kylinConf = newSegment.getConfig();
 
         ResourceStore rs = ResourceStore.getStore(kylinConf);

http://git-wip-us.apache.org/repos/asf/kylin/blob/49b37efd/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/CubeStorageQuery.java
----------------------------------------------------------------------
diff --git 
a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/CubeStorageQuery.java
 
b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/CubeStorageQuery.java
index 672bcbe..46f16fe 100644
--- 
a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/CubeStorageQuery.java
+++ 
b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/CubeStorageQuery.java
@@ -697,7 +697,7 @@ public class CubeStorageQuery implements IStorageQuery {
             short cuboidShardNum = 
segment.getCuboidShardNum(scan.getCuboid().getId());
             short cuboidShardBase = 
segment.getCuboidBaseShard(scan.getCuboid().getId());
             for (short i = 0; i < cuboidShardNum; ++i) {
-                short newShard = ShardingHash.normalize(cuboidShardBase, i, 
segment.getTotalShards());
+                short newShard = ShardingHash.normalize(cuboidShardBase, i, 
segment.getTotalShards(scan.getCuboid().getId()));
                 byte[] newStartKey = duplicateKeyAndChangeShard(newShard, 
startKey);
                 byte[] newStopKey = duplicateKeyAndChangeShard(newShard, 
stopKey);
                 HBaseKeyRange newRange = new HBaseKeyRange(segment, 
scan.getCuboid(), newStartKey, newStopKey, //

http://git-wip-us.apache.org/repos/asf/kylin/blob/49b37efd/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java
----------------------------------------------------------------------
diff --git 
a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java
 
b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java
index 78ad18d..163226b 100644
--- 
a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java
+++ 
b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java
@@ -270,7 +270,7 @@ public class CubeHBaseEndpointRPC extends CubeHBaseRPC {
         Pair<Short, Short> shardNumAndBaseShard = getShardNumAndBaseShard();
         short shardNum = shardNumAndBaseShard.getFirst();
         short cuboidBaseShard = shardNumAndBaseShard.getSecond();
-        int totalShards = cubeSeg.getTotalShards();
+        int totalShards = cubeSeg.getTotalShards(cuboid.getId());
 
         ByteString scanRequestByteString = null;
         ByteString rawScanByteString = null;

http://git-wip-us.apache.org/repos/asf/kylin/blob/49b37efd/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseScanRPC.java
----------------------------------------------------------------------
diff --git 
a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseScanRPC.java
 
b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseScanRPC.java
index 938145b..dcedf76 100644
--- 
a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseScanRPC.java
+++ 
b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseScanRPC.java
@@ -137,7 +137,7 @@ public class CubeHBaseScanRPC extends CubeHBaseRPC {
         } else {
             List<byte[]> ret = Lists.newArrayList();
             for (short i = 0; i < cuboidShardNum; ++i) {
-                short shard = 
ShardingHash.normalize(cubeSeg.getCuboidBaseShard(cuboid.getId()), i, 
cubeSeg.getTotalShards());
+                short shard = 
ShardingHash.normalize(cubeSeg.getCuboidBaseShard(cuboid.getId()), i, 
cubeSeg.getTotalShards(cuboid.getId()));
                 byte[] cookedKey = Arrays.copyOf(halfCookedKey, 
halfCookedKey.length);
                 BytesUtil.writeShort(shard, cookedKey, 0, 
RowConstants.ROWKEY_SHARDID_LEN);
                 ret.add(cookedKey);

http://git-wip-us.apache.org/repos/asf/kylin/blob/49b37efd/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CreateHTableJob.java
----------------------------------------------------------------------
diff --git 
a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CreateHTableJob.java
 
b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CreateHTableJob.java
index d1c31de..b93e0a1 100644
--- 
a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CreateHTableJob.java
+++ 
b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CreateHTableJob.java
@@ -210,6 +210,7 @@ public class CreateHTableJob extends AbstractHadoopJob {
             for (int i = 0; i < nRegion; i++) {
                 innerRegionSplits.add(new HashMap<Long, Double>());
             }
+            
             double[] regionSizes = new double[nRegion];
             for (long cuboidId : allCuboids) {
                 double estimatedSize = cubeSizeMap.get(cuboidId);

Reply via email to