http://git-wip-us.apache.org/repos/asf/kylin/blob/b26b2489/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseScanRPC.java
----------------------------------------------------------------------
diff --git 
a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseScanRPC.java
 
b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseScanRPC.java
index ef53cb7..938145b 100644
--- 
a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseScanRPC.java
+++ 
b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseScanRPC.java
@@ -19,30 +19,35 @@
 package org.apache.kylin.storage.hbase.cube.v2;
 
 import java.io.IOException;
+import java.util.Arrays;
 import java.util.Iterator;
 import java.util.List;
 
-import javax.annotation.Nullable;
-
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.client.HConnection;
 import org.apache.hadoop.hbase.client.HTableInterface;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.ResultScanner;
 import org.apache.hadoop.hbase.client.Scan;
+import org.apache.kylin.common.util.BytesUtil;
 import org.apache.kylin.common.util.ImmutableBitSet;
+import org.apache.kylin.common.util.ShardingHash;
 import org.apache.kylin.cube.CubeSegment;
 import org.apache.kylin.cube.cuboid.Cuboid;
+import org.apache.kylin.cube.kv.RowConstants;
+import org.apache.kylin.dimension.DimensionEncoding;
 import org.apache.kylin.gridtable.GTInfo;
 import org.apache.kylin.gridtable.GTRecord;
 import org.apache.kylin.gridtable.GTScanRequest;
 import org.apache.kylin.gridtable.IGTScanner;
 import org.apache.kylin.gridtable.IGTStore;
+import org.apache.kylin.metadata.filter.UDF.MassInTupleFilter;
+import org.apache.kylin.metadata.model.TblColRef;
 import org.apache.kylin.storage.hbase.HBaseConnection;
+import 
org.apache.kylin.storage.hbase.cube.v2.filter.MassInValueProviderFactoryImpl;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.google.common.base.Function;
 import com.google.common.collect.Iterators;
 import com.google.common.collect.Lists;
 
@@ -84,53 +89,77 @@ public class CubeHBaseScanRPC extends CubeHBaseRPC {
         }
     }
 
-    public CubeHBaseScanRPC(CubeSegment cubeSeg, Cuboid cuboid, GTInfo 
fullGTInfo) {
+    public CubeHBaseScanRPC(CubeSegment cubeSeg, Cuboid cuboid, final GTInfo 
fullGTInfo) {
         super(cubeSeg, cuboid, fullGTInfo);
+        MassInTupleFilter.VALUE_PROVIDER_FACTORY = new 
MassInValueProviderFactoryImpl(new MassInValueProviderFactoryImpl.DimEncAware() 
{
+            @Override
+            public DimensionEncoding getDimEnc(TblColRef col) {
+                return 
fullGTInfo.getCodeSystem().getDimEnc(col.getColumnDesc().getZeroBasedIndex());
+            }
+        });
     }
 
     @Override
-    public IGTScanner getGTScanner(final List<GTScanRequest> scanRequests) 
throws IOException {
-        final List<IGTScanner> scanners = Lists.newArrayList();
-        for (GTScanRequest request : scanRequests) {
-            scanners.add(getGTScanner(request));
-        }
+    public IGTScanner getGTScanner(final GTScanRequest scanRequest) throws 
IOException {
+        final IGTScanner scanner = getGTScannerInternal(scanRequest);
 
         return new IGTScanner() {
             @Override
             public GTInfo getInfo() {
-                return scanners.get(0).getInfo();
+                return scanner.getInfo();
             }
 
             @Override
             public int getScannedRowCount() {
                 int sum = 0;
-                for (IGTScanner s : scanners) {
-                    sum += s.getScannedRowCount();
-                }
+                sum += scanner.getScannedRowCount();
                 return sum;
             }
 
             @Override
             public void close() throws IOException {
-                for (IGTScanner s : scanners) {
-                    s.close();
-                }
+                scanner.close();
             }
 
             @Override
             public Iterator<GTRecord> iterator() {
-                return 
Iterators.concat(Iterators.transform(scanners.iterator(), new 
Function<IGTScanner, Iterator<GTRecord>>() {
-                    @Nullable
-                    @Override
-                    public Iterator<GTRecord> apply(IGTScanner input) {
-                        return input.iterator();
-                    }
-                }));
+                return scanner.iterator();
             }
         };
     }
 
-    private IGTScanner getGTScanner(final GTScanRequest scanRequest) throws 
IOException {
+    //for non-sharding cases it will only return one byte[] with no shard at 
beginning
+    private List<byte[]> getRowKeysDifferentShards(byte[] halfCookedKey) {
+        final short cuboidShardNum = cubeSeg.getCuboidShardNum(cuboid.getId());
+
+        if (!cubeSeg.isEnableSharding()) {
+            return Lists.newArrayList(halfCookedKey);//no shard to append at 
head, so it is already well cooked
+        } else {
+            List<byte[]> ret = Lists.newArrayList();
+            for (short i = 0; i < cuboidShardNum; ++i) {
+                short shard = 
ShardingHash.normalize(cubeSeg.getCuboidBaseShard(cuboid.getId()), i, 
cubeSeg.getTotalShards());
+                byte[] cookedKey = Arrays.copyOf(halfCookedKey, 
halfCookedKey.length);
+                BytesUtil.writeShort(shard, cookedKey, 0, 
RowConstants.ROWKEY_SHARDID_LEN);
+                ret.add(cookedKey);
+            }
+            return ret;
+        }
+    }
+
+    private List<RawScan> spawnRawScansForAllShards(RawScan rawScan) {
+        List<RawScan> ret = Lists.newArrayList();
+        List<byte[]> startKeys = getRowKeysDifferentShards(rawScan.startKey);
+        List<byte[]> endKeys = getRowKeysDifferentShards(rawScan.endKey);
+        for (int i = 0; i < startKeys.size(); i++) {
+            RawScan temp = new RawScan(rawScan);
+            temp.startKey = startKeys.get(i);
+            temp.endKey = endKeys.get(i);
+            ret.add(temp);
+        }
+        return ret;
+    }
+
+    private IGTScanner getGTScannerInternal(final GTScanRequest scanRequest) 
throws IOException {
 
         // primary key (also the 0th column block) is always selected
         final ImmutableBitSet selectedColBlocks = 
scanRequest.getSelectedColBlocks().set(0);
@@ -138,22 +167,23 @@ public class CubeHBaseScanRPC extends CubeHBaseRPC {
         HConnection hbaseConn = 
HBaseConnection.get(cubeSeg.getCubeInstance().getConfig().getStorageUrl());
         final HTableInterface hbaseTable = 
hbaseConn.getTable(cubeSeg.getStorageLocationIdentifier());
 
-        List<RawScan> rawScans = preparedHBaseScans(scanRequest.getPkStart(), 
scanRequest.getPkEnd(), scanRequest.getFuzzyKeys(), selectedColBlocks);
+        List<RawScan> rawScans = 
preparedHBaseScans(scanRequest.getGTScanRanges(), selectedColBlocks);
         List<List<Integer>> hbaseColumnsToGT = 
getHBaseColumnsGTMapping(selectedColBlocks);
 
         final List<ResultScanner> scanners = Lists.newArrayList();
         final List<Iterator<Result>> resultIterators = Lists.newArrayList();
 
         for (RawScan rawScan : rawScans) {
+            for (RawScan rawScanWithShard : 
spawnRawScansForAllShards(rawScan)) {
+                logScan(rawScanWithShard, 
cubeSeg.getStorageLocationIdentifier());
+                Scan hbaseScan = buildScan(rawScanWithShard);
 
-            logScan(rawScan, cubeSeg.getStorageLocationIdentifier());
-            Scan hbaseScan = buildScan(rawScan);
-
-            final ResultScanner scanner = hbaseTable.getScanner(hbaseScan);
-            final Iterator<Result> iterator = scanner.iterator();
+                final ResultScanner scanner = hbaseTable.getScanner(hbaseScan);
+                final Iterator<Result> iterator = scanner.iterator();
 
-            scanners.add(scanner);
-            resultIterators.add(iterator);
+                scanners.add(scanner);
+                resultIterators.add(iterator);
+            }
         }
 
         final Iterator<Result> allResultsIterator = 
Iterators.concat(resultIterators.iterator());

http://git-wip-us.apache.org/repos/asf/kylin/blob/b26b2489/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeSegmentScanner.java
----------------------------------------------------------------------
diff --git 
a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeSegmentScanner.java
 
b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeSegmentScanner.java
index 6bbb9c0..9ed914a 100644
--- 
a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeSegmentScanner.java
+++ 
b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeSegmentScanner.java
@@ -19,204 +19,48 @@
 package org.apache.kylin.storage.hbase.cube.v2;
 
 import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.BitSet;
 import java.util.Collection;
-import java.util.Collections;
-import java.util.Comparator;
 import java.util.Iterator;
-import java.util.List;
 import java.util.Set;
 
-import org.apache.commons.lang3.StringUtils;
-import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.common.debug.BackdoorToggles;
-import org.apache.kylin.common.util.ByteArray;
-import org.apache.kylin.common.util.DateFormat;
-import org.apache.kylin.common.util.ImmutableBitSet;
-import org.apache.kylin.common.util.Pair;
 import org.apache.kylin.cube.CubeSegment;
 import org.apache.kylin.cube.cuboid.Cuboid;
-import org.apache.kylin.cube.gridtable.CubeGridTable;
-import org.apache.kylin.cube.gridtable.CuboidToGridTableMapping;
-import org.apache.kylin.cube.model.CubeDesc;
 import org.apache.kylin.dict.BuildInFunctionTransformer;
-import org.apache.kylin.dimension.DimensionEncoding;
-import org.apache.kylin.gridtable.EmptyGTScanner;
 import org.apache.kylin.gridtable.GTInfo;
 import org.apache.kylin.gridtable.GTRecord;
-import org.apache.kylin.gridtable.GTScanRange;
 import org.apache.kylin.gridtable.GTScanRangePlanner;
 import org.apache.kylin.gridtable.GTScanRequest;
-import org.apache.kylin.gridtable.GTUtil;
 import org.apache.kylin.gridtable.IGTScanner;
-import org.apache.kylin.metadata.datatype.DataType;
+import org.apache.kylin.gridtable.ScannerWorker;
 import org.apache.kylin.metadata.filter.ITupleFilterTransformer;
 import org.apache.kylin.metadata.filter.TupleFilter;
-import org.apache.kylin.metadata.filter.UDF.MassInTupleFilter;
 import org.apache.kylin.metadata.model.FunctionDesc;
 import org.apache.kylin.metadata.model.TblColRef;
-import 
org.apache.kylin.storage.hbase.cube.v2.filter.MassInValueProviderFactoryImpl;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.google.common.collect.Lists;
-import com.google.common.collect.Sets;
-
 public class CubeSegmentScanner implements IGTScanner {
 
     private static final Logger logger = 
LoggerFactory.getLogger(CubeSegmentScanner.class);
 
-    private static final int MAX_SCAN_RANGES = 200;
-
     final CubeSegment cubeSeg;
-    final GTInfo info;
-    final List<GTScanRequest> scanRequests;
-    final Scanner scanner;
+    final ScannerWorker scanner;
     final Cuboid cuboid;
 
+    final GTScanRequest scanRequest;
+
     public CubeSegmentScanner(CubeSegment cubeSeg, Cuboid cuboid, 
Set<TblColRef> dimensions, Set<TblColRef> groups, //
             Collection<FunctionDesc> metrics, TupleFilter filter, boolean 
allowPreAggregate) {
         this.cuboid = cuboid;
         this.cubeSeg = cubeSeg;
-        this.info = CubeGridTable.newGTInfo(cubeSeg, cuboid.getId());
-
-        CuboidToGridTableMapping mapping = 
cuboid.getCuboidToGridTableMapping();
 
         // translate FunctionTupleFilter to IN clause
         ITupleFilterTransformer translator = new 
BuildInFunctionTransformer(cubeSeg.getDimensionEncodingMap());
         filter = translator.transform(filter);
 
-        //replace the constant values in filter to dictionary codes 
-        TupleFilter gtFilter = GTUtil.convertFilterColumnsAndConstants(filter, 
info, mapping.getCuboidDimensionsInGTOrder(), groups);
-
-        ImmutableBitSet gtDimensions = makeGridTableColumns(mapping, 
dimensions);
-        ImmutableBitSet gtAggrGroups = makeGridTableColumns(mapping, 
replaceDerivedColumns(groups, cubeSeg.getCubeDesc()));
-        ImmutableBitSet gtAggrMetrics = makeGridTableColumns(mapping, metrics);
-        String[] gtAggrFuncs = makeAggrFuncs(mapping, metrics);
-
-        GTScanRangePlanner scanRangePlanner;
-        if 
(cubeSeg.getCubeDesc().getModel().getPartitionDesc().isPartitioned()) {
-            TblColRef tblColRef = 
cubeSeg.getCubeDesc().getModel().getPartitionDesc().getPartitionDateColumnRef();
-            TblColRef partitionColOfGT = null;
-            Pair<ByteArray, ByteArray> segmentStartAndEnd = null;
-            int index = mapping.getIndexOf(tblColRef);
-            if (index >= 0) {
-                segmentStartAndEnd = getSegmentStartAndEnd(index);
-                partitionColOfGT = info.colRef(index);
-            }
-            scanRangePlanner = new GTScanRangePlanner(info, 
segmentStartAndEnd, partitionColOfGT);
-        } else {
-            scanRangePlanner = new GTScanRangePlanner(info, null, null);
-        }
-        List<GTScanRange> scanRanges = 
scanRangePlanner.planScanRanges(gtFilter, MAX_SCAN_RANGES);
-
-        scanRequests = Lists.newArrayListWithCapacity(scanRanges.size());
-
-        KylinConfig config = cubeSeg.getCubeInstance().getConfig();
-        for (GTScanRange range : scanRanges) {
-            GTScanRequest req = new GTScanRequest(info, range, gtDimensions, 
gtAggrGroups, gtAggrMetrics, gtAggrFuncs, gtFilter, allowPreAggregate, 
config.getQueryCoprocessorMemGB());
-            scanRequests.add(req);
-        }
-
-        scanner = new Scanner();
-    }
-
-    private Pair<ByteArray, ByteArray> getSegmentStartAndEnd(int index) {
-        ByteArray start;
-        if (cubeSeg.getDateRangeStart() != Long.MIN_VALUE) {
-            start = encodeTime(cubeSeg.getDateRangeStart(), index, 1);
-        } else {
-            start = new ByteArray();
-        }
-
-        ByteArray end;
-        if (cubeSeg.getDateRangeEnd() != Long.MAX_VALUE) {
-            end = encodeTime(cubeSeg.getDateRangeEnd(), index, -1);
-        } else {
-            end = new ByteArray();
-        }
-        return Pair.newPair(start, end);
-
-    }
-
-    private ByteArray encodeTime(long ts, int index, int roundingFlag) {
-        String value;
-        DataType partitionColType = info.getColumnType(index);
-        if (partitionColType.isDate()) {
-            value = DateFormat.formatToDateStr(ts);
-        } else if (partitionColType.isDatetime() || 
partitionColType.isTimestamp()) {
-            value = DateFormat.formatToTimeWithoutMilliStr(ts);
-        } else if (partitionColType.isStringFamily()) {
-            String partitionDateFormat = 
cubeSeg.getCubeDesc().getModel().getPartitionDesc().getPartitionDateFormat();
-            if (StringUtils.isEmpty(partitionDateFormat))
-                partitionDateFormat = DateFormat.DEFAULT_DATE_PATTERN;
-            value = DateFormat.formatToDateStr(ts, partitionDateFormat);
-        } else {
-            throw new RuntimeException("Type " + partitionColType + " is not 
valid partition column type");
-        }
-
-        ByteBuffer buffer = ByteBuffer.allocate(info.getMaxColumnLength());
-        info.getCodeSystem().encodeColumnValue(index, value, roundingFlag, 
buffer);
-
-        return ByteArray.copyOf(buffer.array(), 0, buffer.position());
-    }
-
-    private Set<TblColRef> replaceDerivedColumns(Set<TblColRef> input, 
CubeDesc cubeDesc) {
-        Set<TblColRef> ret = Sets.newHashSet();
-        for (TblColRef col : input) {
-            if (cubeDesc.hasHostColumn(col)) {
-                for (TblColRef host : cubeDesc.getHostInfo(col).columns) {
-                    ret.add(host);
-                }
-            } else {
-                ret.add(col);
-            }
-        }
-        return ret;
-    }
-
-    private ImmutableBitSet makeGridTableColumns(CuboidToGridTableMapping 
mapping, Set<TblColRef> dimensions) {
-        BitSet result = new BitSet();
-        for (TblColRef dim : dimensions) {
-            int idx = mapping.getIndexOf(dim);
-            if (idx >= 0)
-                result.set(idx);
-        }
-        return new ImmutableBitSet(result);
-    }
-
-    private ImmutableBitSet makeGridTableColumns(CuboidToGridTableMapping 
mapping, Collection<FunctionDesc> metrics) {
-        BitSet result = new BitSet();
-        for (FunctionDesc metric : metrics) {
-            int idx = mapping.getIndexOf(metric);
-            if (idx < 0)
-                throw new IllegalStateException(metric + " not found in " + 
mapping);
-            result.set(idx);
-        }
-        return new ImmutableBitSet(result);
-    }
-
-    private String[] makeAggrFuncs(final CuboidToGridTableMapping mapping, 
Collection<FunctionDesc> metrics) {
-
-        //metrics are represented in ImmutableBitSet, which loses order 
information
-        //sort the aggrFuns to align with metrics natural order 
-        List<FunctionDesc> metricList = Lists.newArrayList(metrics);
-        Collections.sort(metricList, new Comparator<FunctionDesc>() {
-            @Override
-            public int compare(FunctionDesc o1, FunctionDesc o2) {
-                int a = mapping.getIndexOf(o1);
-                int b = mapping.getIndexOf(o2);
-                return a - b;
-            }
-        });
-
-        String[] result = new String[metricList.size()];
-        int i = 0;
-        for (FunctionDesc metric : metricList) {
-            result[i++] = metric.getExpression();
-        }
-        return result;
+        GTScanRangePlanner scanRangePlanner = new GTScanRangePlanner(cubeSeg, 
cuboid, filter, dimensions, groups, metrics);
+        scanRequest = scanRangePlanner.planScanRequest(allowPreAggregate);
+        scanner = new ScannerWorker(cubeSeg, cuboid, scanRequest);
     }
 
     @Override
@@ -231,7 +75,7 @@ public class CubeSegmentScanner implements IGTScanner {
 
     @Override
     public GTInfo getInfo() {
-        return info;
+        return scanRequest == null ? null : scanRequest.getInfo();
     }
 
     @Override
@@ -239,47 +83,4 @@ public class CubeSegmentScanner implements IGTScanner {
         return scanner.getScannedRowCount();
     }
 
-    private class Scanner {
-        IGTScanner internal = null;
-
-        public Scanner() {
-            CubeHBaseRPC rpc;
-            if 
("scan".equalsIgnoreCase(BackdoorToggles.getHbaseCubeQueryProtocol())) {
-                MassInTupleFilter.VALUE_PROVIDER_FACTORY = new 
MassInValueProviderFactoryImpl(new MassInValueProviderFactoryImpl.DimEncAware() 
{
-                    @Override
-                    public DimensionEncoding getDimEnc(TblColRef col) {
-                        return 
info.getCodeSystem().getDimEnc(col.getColumnDesc().getZeroBasedIndex());
-                    }
-                });
-                rpc = new CubeHBaseScanRPC(cubeSeg, cuboid, info); // for 
local debug
-            } else {
-                rpc = new CubeHBaseEndpointRPC(cubeSeg, cuboid, info); // 
default behavior
-            }
-
-            try {
-                if (scanRequests.size() == 0) {
-                    logger.info("Segment {} will be skipped", cubeSeg);
-                    internal = new EmptyGTScanner(0);
-                } else {
-                    internal = rpc.getGTScanner(scanRequests);
-                }
-            } catch (IOException e) {
-                throw new RuntimeException("error", e);
-            }
-        }
-
-        public Iterator<GTRecord> iterator() {
-            return internal.iterator();
-        }
-
-        public void close() throws IOException {
-            internal.close();
-        }
-
-        public int getScannedRowCount() {
-            return internal.getScannedRowCount();
-        }
-
-    }
-
 }

http://git-wip-us.apache.org/repos/asf/kylin/blob/b26b2489/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeStorageQuery.java
----------------------------------------------------------------------
diff --git 
a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeStorageQuery.java
 
b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeStorageQuery.java
index 863dd67..e0e6d83 100644
--- 
a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeStorageQuery.java
+++ 
b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeStorageQuery.java
@@ -79,26 +79,26 @@ public class CubeStorageQuery implements IStorageQuery {
         Set<FunctionDesc> metrics = new LinkedHashSet<FunctionDesc>();
         buildDimensionsAndMetrics(sqlDigest, dimensions, metrics);
 
-        // all dimensions = groups + filter dimensions
-        Set<TblColRef> filterDims = Sets.newHashSet(dimensions);
-        filterDims.removeAll(groups);
+        // all dimensions = groups + other(like filter) dimensions
+        Set<TblColRef> otherDims = Sets.newHashSet(dimensions);
+        otherDims.removeAll(groups);
 
         // expand derived (xxxD means contains host columns only, derived 
columns were translated)
         Set<TblColRef> derivedPostAggregation = Sets.newHashSet();
         Set<TblColRef> groupsD = expandDerived(groups, derivedPostAggregation);
-        Set<TblColRef> filterDimsD = expandDerived(filterDims, 
derivedPostAggregation);
-        filterDimsD.removeAll(groupsD);
+        Set<TblColRef> otherDimsD = expandDerived(otherDims, 
derivedPostAggregation);
+        otherDimsD.removeAll(groupsD);
 
         // identify cuboid
         Set<TblColRef> dimensionsD = new LinkedHashSet<TblColRef>();
         dimensionsD.addAll(groupsD);
-        dimensionsD.addAll(filterDimsD);
-        Cuboid cuboid = identifyCuboid(dimensionsD, metrics);
+        dimensionsD.addAll(otherDimsD);
+        Cuboid cuboid = Cuboid.identifyCuboid(cubeDesc,dimensionsD, metrics);
         context.setCuboid(cuboid);
 
         // isExactAggregation? meaning: tuples returned from storage requires 
no further aggregation in query engine
         Set<TblColRef> singleValuesD = findSingleValueColumns(filter);
-        boolean isExactAggregation = isExactAggregation(cuboid, groups, 
filterDimsD, singleValuesD, derivedPostAggregation);
+        boolean isExactAggregation = isExactAggregation(cuboid, groups, 
otherDimsD, singleValuesD, derivedPostAggregation);
         context.setExactAggregation(isExactAggregation);
 
         // replace derived columns in filter with host columns; columns on 
loosened condition must be added to group by
@@ -169,19 +169,7 @@ public class CubeStorageQuery implements IStorageQuery {
         return expanded;
     }
 
-    private Cuboid identifyCuboid(Set<TblColRef> dimensions, 
Collection<FunctionDesc> metrics) {
-        for (FunctionDesc metric : metrics) {
-            if (metric.getMeasureType().onlyAggrInBaseCuboid())
-                return Cuboid.getBaseCuboid(cubeDesc);
-        }
-
-        long cuboidID = 0;
-        for (TblColRef column : dimensions) {
-            int index = cubeDesc.getRowkey().getColumnBitIndex(column);
-            cuboidID |= 1L << index;
-        }
-        return Cuboid.findById(cubeDesc, cuboidID);
-    }
+   
 
     @SuppressWarnings("unchecked")
     private Set<TblColRef> findSingleValueColumns(TupleFilter filter) {

http://git-wip-us.apache.org/repos/asf/kylin/blob/b26b2489/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/RawScan.java
----------------------------------------------------------------------
diff --git 
a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/RawScan.java
 
b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/RawScan.java
index c2ffdba..c2eacc1 100644
--- 
a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/RawScan.java
+++ 
b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/RawScan.java
@@ -47,6 +47,16 @@ public class RawScan {
         this.hbaseMaxResultSize = hbaseMaxResultSize;
     }
 
+    public RawScan(RawScan other) {
+
+        this.startKey = other.startKey;
+        this.endKey = other.endKey;
+        this.hbaseColumns = other.hbaseColumns;
+        this.fuzzyKeys = other.fuzzyKeys;
+        this.hbaseCaching = other.hbaseCaching;
+        this.hbaseMaxResultSize = other.hbaseMaxResultSize;
+    }
+
     public String getStartKeyAsString() {
         return BytesUtil.toHex(this.startKey);
     }

http://git-wip-us.apache.org/repos/asf/kylin/blob/b26b2489/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/CubeVisitService.java
----------------------------------------------------------------------
diff --git 
a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/CubeVisitService.java
 
b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/CubeVisitService.java
index 1e7b1b5..9e8e251 100644
--- 
a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/CubeVisitService.java
+++ 
b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/CubeVisitService.java
@@ -23,6 +23,7 @@ import java.io.IOException;
 import java.lang.management.ManagementFactory;
 import java.net.InetAddress;
 import java.nio.ByteBuffer;
+import java.util.Iterator;
 import java.util.List;
 
 import org.apache.commons.io.IOUtils;
@@ -38,6 +39,7 @@ import org.apache.hadoop.hbase.protobuf.ResponseConverter;
 import org.apache.hadoop.hbase.regionserver.HRegion;
 import org.apache.hadoop.hbase.regionserver.RegionScanner;
 import org.apache.kylin.common.util.Bytes;
+import org.apache.kylin.common.util.BytesUtil;
 import org.apache.kylin.common.util.CompressionUtils;
 import org.apache.kylin.cube.kv.RowConstants;
 import org.apache.kylin.dimension.DimensionEncoding;
@@ -58,6 +60,7 @@ import 
org.apache.kylin.storage.hbase.cube.v2.filter.MassInValueProviderFactoryI
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.google.common.collect.Iterators;
 import com.google.common.collect.Lists;
 import com.google.protobuf.HBaseZeroCopyByteString;
 import com.google.protobuf.RpcCallback;
@@ -138,8 +141,21 @@ public class CubeVisitService extends 
CubeVisitProtos.CubeVisitService implement
         Bytes.putBytes(rawScan.endKey, 0, regionStartKey, 0, shardLength);
     }
 
-    private void appendProfileInfo(StringBuilder sb) {
+    private List<RawScan> deserializeRawScans(ByteBuffer in) {
+        int rawScanCount = BytesUtil.readVInt(in);
+        List<RawScan> ret = Lists.newArrayList();
+        for (int i = 0; i < rawScanCount; i++) {
+            RawScan temp = RawScan.serializer.deserialize(in);
+            ret.add(temp);
+        }
+        return ret;
+    }
+
+    private void appendProfileInfo(StringBuilder sb, String info) {
         sb.append(System.currentTimeMillis() - this.serviceStartTime);
+        if (info != null) {
+            sb.append(":").append(info);
+        }
         sb.append(",");
     }
 
@@ -159,7 +175,12 @@ public class CubeVisitService extends 
CubeVisitProtos.CubeVisitService implement
             region.startRegionOperation();
 
             final GTScanRequest scanReq = 
GTScanRequest.serializer.deserialize(ByteBuffer.wrap(HBaseZeroCopyByteString.zeroCopyGetBytes(request.getGtScanRequest())));
-            final RawScan hbaseRawScan = 
RawScan.serializer.deserialize(ByteBuffer.wrap(HBaseZeroCopyByteString.zeroCopyGetBytes(request.getHbaseRawScan())));
+            List<List<Integer>> hbaseColumnsToGT = Lists.newArrayList();
+            for (IntList intList : request.getHbaseColumnsToGTList()) {
+                hbaseColumnsToGT.add(intList.getIntsList());
+            }
+            CoprocessorBehavior behavior = 
CoprocessorBehavior.valueOf(request.getBehavior());
+            final List<RawScan> hbaseRawScans = 
deserializeRawScans(ByteBuffer.wrap(HBaseZeroCopyByteString.zeroCopyGetBytes(request.getHbaseRawScan())));
 
             MassInTupleFilter.VALUE_PROVIDER_FACTORY = new 
MassInValueProviderFactoryImpl(new MassInValueProviderFactoryImpl.DimEncAware() 
{
                 @Override
@@ -168,40 +189,61 @@ public class CubeVisitService extends 
CubeVisitProtos.CubeVisitService implement
                 }
             });
 
-            List<List<Integer>> hbaseColumnsToGT = Lists.newArrayList();
-            for (IntList intList : request.getHbaseColumnsToGTList()) {
-                hbaseColumnsToGT.add(intList.getIntsList());
-            }
-
-            if (request.getRowkeyPreambleSize() - 
RowConstants.ROWKEY_CUBOIDID_LEN > 0) {
-                //if has shard, fill region shard to raw scan start/end
-                updateRawScanByCurrentRegion(hbaseRawScan, region, 
request.getRowkeyPreambleSize() - RowConstants.ROWKEY_CUBOIDID_LEN);
-            }
-
-            Scan scan = CubeHBaseRPC.buildScan(hbaseRawScan);
+            final List<InnerScannerAsIterator> cellListsForeachRawScan = 
Lists.newArrayList();
+            for (RawScan hbaseRawScan : hbaseRawScans) {
+                if (request.getRowkeyPreambleSize() - 
RowConstants.ROWKEY_CUBOIDID_LEN > 0) {
+                    //if has shard, fill region shard to raw scan start/end
+                    updateRawScanByCurrentRegion(hbaseRawScan, region, 
request.getRowkeyPreambleSize() - RowConstants.ROWKEY_CUBOIDID_LEN);
+                }
 
-            appendProfileInfo(sb);
+                Scan scan = CubeHBaseRPC.buildScan(hbaseRawScan);
+                innerScanner = region.getScanner(scan);
 
-            innerScanner = region.getScanner(scan);
-            CoprocessorBehavior behavior = 
CoprocessorBehavior.valueOf(request.getBehavior());
+                InnerScannerAsIterator cellListIterator = new 
InnerScannerAsIterator(innerScanner);
+                cellListsForeachRawScan.add(cellListIterator);
+            }
+            
+            final Iterator<List<Cell>> allCellLists = 
Iterators.concat(cellListsForeachRawScan.iterator());
 
             if (behavior.ordinal() < CoprocessorBehavior.SCAN.ordinal()) {
+                //this is only for CoprocessorBehavior.RAW_SCAN case to 
profile hbase scan speed
                 List<Cell> temp = Lists.newArrayList();
                 int counter = 0;
                 while (innerScanner.nextRaw(temp)) {
                     counter++;
                 }
-                sb.append("Scanned " + counter + " rows in " + 
(System.currentTimeMillis() - serviceStartTime) + ",");
+                appendProfileInfo(sb, "scanned " + counter);
             }
 
-            InnerScannerAsIterator cellListIterator = new 
InnerScannerAsIterator(innerScanner);
             if (behavior.ordinal() < 
CoprocessorBehavior.SCAN_FILTER_AGGR_CHECKMEM.ordinal()) {
                 scanReq.setAggrCacheGB(0); // disable mem check if so told
             }
 
-            IGTStore store = new HBaseReadonlyStore(cellListIterator, scanReq, 
hbaseRawScan.hbaseColumns, hbaseColumnsToGT, request.getRowkeyPreambleSize());
-            IGTScanner rawScanner = store.scan(scanReq);
+            IGTStore store = new HBaseReadonlyStore(new CellListIterator() {
+                @Override
+                public void close() throws IOException {
+                    for (CellListIterator closeable : cellListsForeachRawScan) 
{
+                        closeable.close();
+                    }
+                }
+
+                @Override
+                public boolean hasNext() {
+                    return allCellLists.hasNext();
+                }
+
+                @Override
+                public List<Cell> next() {
+                    return allCellLists.next();
+                }
 
+                @Override
+                public void remove() {
+                    throw new UnsupportedOperationException();
+                }
+            }, scanReq, hbaseRawScans.get(0).hbaseColumns, hbaseColumnsToGT, 
request.getRowkeyPreambleSize());
+
+            IGTScanner rawScanner = store.scan(scanReq);
             IGTScanner finalScanner = scanReq.decorateScanner(rawScanner, //
                     behavior.ordinal() >= 
CoprocessorBehavior.SCAN_FILTER.ordinal(), //
                     behavior.ordinal() >= 
CoprocessorBehavior.SCAN_FILTER_AGGR.ordinal());
@@ -219,20 +261,20 @@ public class CubeVisitService extends 
CubeVisitProtos.CubeVisitService implement
                 finalRowCount++;
             }
 
-            appendProfileInfo(sb);
+            appendProfileInfo(sb, "agg done");
 
             //outputStream.close() is not necessary
             allRows = outputStream.toByteArray();
             byte[] compressedAllRows = CompressionUtils.compress(allRows);
 
-            appendProfileInfo(sb);
+            appendProfileInfo(sb, "compress done");
 
             OperatingSystemMXBean operatingSystemMXBean = 
(OperatingSystemMXBean) ManagementFactory.getOperatingSystemMXBean();
             double systemCpuLoad = operatingSystemMXBean.getSystemCpuLoad();
             double freePhysicalMemorySize = 
operatingSystemMXBean.getFreePhysicalMemorySize();
             double freeSwapSpaceSize = 
operatingSystemMXBean.getFreeSwapSpaceSize();
 
-            appendProfileInfo(sb);
+            appendProfileInfo(sb, "server stats done");
 
             CubeVisitProtos.CubeVisitResponse.Builder responseBuilder = 
CubeVisitProtos.CubeVisitResponse.newBuilder();
             done.run(responseBuilder.//

http://git-wip-us.apache.org/repos/asf/kylin/blob/b26b2489/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/steps/SandboxMetastoreCLI.java
----------------------------------------------------------------------
diff --git 
a/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/steps/SandboxMetastoreCLI.java
 
b/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/steps/SandboxMetastoreCLI.java
index b12451a..fa1687b 100644
--- 
a/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/steps/SandboxMetastoreCLI.java
+++ 
b/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/steps/SandboxMetastoreCLI.java
@@ -42,7 +42,7 @@ public class SandboxMetastoreCLI {
     public static void main(String[] args) throws Exception {
         logger.info("Adding to classpath: " + new 
File(HBaseMetadataTestCase.SANDBOX_TEST_DATA).getAbsolutePath());
         ClassUtil.addClasspath(new 
File(HBaseMetadataTestCase.SANDBOX_TEST_DATA).getAbsolutePath());
-        System.setProperty(KylinConfig.KYLIN_CONF, 
"../examples/test_case_data/sandbox");
+        System.setProperty(KylinConfig.KYLIN_CONF, 
HBaseMetadataTestCase.SANDBOX_TEST_DATA);
         if (StringUtils.isEmpty(System.getProperty("hdp.version"))) {
             throw new RuntimeException("No hdp.version set; Please set 
hdp.version in your jvm option, for example: -Dhdp.version=2.2.4.2-2");
         }

Reply via email to