KYLIN-1308 query storage v2 enable parallel cube visiting

Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/eb1d276c
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/eb1d276c
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/eb1d276c

Branch: refs/heads/KYLIN-1122
Commit: eb1d276c7721bd3056328e4256b2b54ffa95346d
Parents: 06f79e6
Author: honma <ho...@ebay.com>
Authored: Wed Jan 13 17:53:40 2016 +0800
Committer: Xiaoyu Wang <wangxia...@apache.org>
Committed: Mon Jan 18 13:38:30 2016 +0800

----------------------------------------------------------------------
 .../apache/kylin/cube/kv/LazyRowKeyEncoder.java |  26 --
 .../apache/kylin/gridtable/GTScanRequest.java   | 139 +++++----
 .../hbase/cube/v2/CubeHBaseEndpointRPC.java     | 223 ++++++++------
 .../storage/hbase/cube/v2/CubeHBaseRPC.java     |  31 +-
 .../storage/hbase/cube/v2/CubeHBaseScanRPC.java | 304 +++++++++----------
 .../hbase/cube/v2/CubeSegmentScanner.java       | 105 ++-----
 .../storage/hbase/cube/v2/CubeStorageQuery.java |   1 +
 .../coprocessor/endpoint/CubeVisitService.java  |  40 ++-
 8 files changed, 416 insertions(+), 453 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kylin/blob/eb1d276c/core-cube/src/main/java/org/apache/kylin/cube/kv/LazyRowKeyEncoder.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/kv/LazyRowKeyEncoder.java b/core-cube/src/main/java/org/apache/kylin/cube/kv/LazyRowKeyEncoder.java
index 7c70fff..0fca726 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/kv/LazyRowKeyEncoder.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/kv/LazyRowKeyEncoder.java
@@ -18,16 +18,9 @@
 
 package org.apache.kylin.cube.kv;
 
-import java.util.Arrays;
-import java.util.List;
-
-import org.apache.kylin.common.util.BytesUtil;
-import org.apache.kylin.common.util.ShardingHash;
 import org.apache.kylin.cube.CubeSegment;
 import org.apache.kylin.cube.cuboid.Cuboid;
 
-import com.google.common.collect.Lists;
-
 /**
  * A LazyRowKeyEncoder will not try to calculate shard
  * It works for both enableSharding or non-enableSharding scenario
@@ -45,23 +38,4 @@ public class LazyRowKeyEncoder extends RowKeyEncoder {
             throw new RuntimeException("If enableSharding false, you should 
never calculate shard");
         }
     }
-
-    //for non-sharding cases it will only return one byte[] with not shard at 
beginning
-    public List<byte[]> getRowKeysDifferentShards(byte[] halfCookedKey) {
-        final short cuboidShardNum = cubeSeg.getCuboidShardNum(cuboid.getId());
-
-        if (!enableSharding) {
-            return Lists.newArrayList(halfCookedKey);//not shard to append at 
head, so it is already well cooked
-        } else {
-            List<byte[]> ret = Lists.newArrayList();
-            for (short i = 0; i < cuboidShardNum; ++i) {
-                short shard = 
ShardingHash.normalize(cubeSeg.getCuboidBaseShard(cuboid.getId()), i, 
cubeSeg.getTotalShards());
-                byte[] cookedKey = Arrays.copyOf(halfCookedKey, 
halfCookedKey.length);
-                BytesUtil.writeShort(shard, cookedKey, 0, 
RowConstants.ROWKEY_SHARDID_LEN);
-                ret.add(cookedKey);
-            }
-            return ret;
-        }
-
-    }
 }

http://git-wip-us.apache.org/repos/asf/kylin/blob/eb1d276c/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanRequest.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanRequest.java b/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanRequest.java
index abaec85..8b0779d 100644
--- a/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanRequest.java
+++ b/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanRequest.java
@@ -35,73 +35,6 @@ public class GTScanRequest {
     private boolean allowPreAggregation = true;
     private double aggrCacheGB = 0; // no limit
 
-    public static final BytesSerializer<GTScanRequest> serializer = new 
BytesSerializer<GTScanRequest>() {
-        @Override
-        public void serialize(GTScanRequest value, ByteBuffer out) {
-            GTInfo.serializer.serialize(value.info, out);
-
-            serializeGTRecord(value.range.pkStart, out);
-            serializeGTRecord(value.range.pkEnd, out);
-            BytesUtil.writeVInt(value.range.fuzzyKeys.size(), out);
-            for (GTRecord f : value.range.fuzzyKeys) {
-                serializeGTRecord(f, out);
-            }
-
-            ImmutableBitSet.serializer.serialize(value.columns, out);
-            
BytesUtil.writeByteArray(GTUtil.serializeGTFilter(value.filterPushDown, 
value.info), out);
-
-            ImmutableBitSet.serializer.serialize(value.aggrGroupBy, out);
-            ImmutableBitSet.serializer.serialize(value.aggrMetrics, out);
-            BytesUtil.writeAsciiStringArray(value.aggrMetricsFuncs, out);
-            BytesUtil.writeVInt(value.allowPreAggregation ? 1 : 0, out);
-            out.putDouble(value.aggrCacheGB);
-        }
-
-        @Override
-        public GTScanRequest deserialize(ByteBuffer in) {
-            GTInfo sInfo = GTInfo.serializer.deserialize(in);
-
-            GTRecord sPkStart = deserializeGTRecord(in, sInfo);
-            GTRecord sPkEnd = deserializeGTRecord(in, sInfo);
-            List<GTRecord> sFuzzyKeys = Lists.newArrayList();
-            int sFuzzyKeySize = BytesUtil.readVInt(in);
-            for (int i = 0; i < sFuzzyKeySize; i++) {
-                sFuzzyKeys.add(deserializeGTRecord(in, sInfo));
-            }
-            GTScanRange sRange = new GTScanRange(sPkStart, sPkEnd, sFuzzyKeys);
-
-            ImmutableBitSet sColumns = 
ImmutableBitSet.serializer.deserialize(in);
-            TupleFilter sGTFilter = 
GTUtil.deserializeGTFilter(BytesUtil.readByteArray(in), sInfo);
-
-            ImmutableBitSet sAggGroupBy = 
ImmutableBitSet.serializer.deserialize(in);
-            ImmutableBitSet sAggrMetrics = 
ImmutableBitSet.serializer.deserialize(in);
-            String[] sAggrMetricFuncs = BytesUtil.readAsciiStringArray(in);
-            boolean sAllowPreAggr = (BytesUtil.readVInt(in) == 1);
-            double sAggrCacheGB = in.getDouble();
-
-            return new GTScanRequest(sInfo, sRange, sColumns, sAggGroupBy, 
sAggrMetrics, sAggrMetricFuncs, sGTFilter, sAllowPreAggr, sAggrCacheGB);
-        }
-
-        private void serializeGTRecord(GTRecord gtRecord, ByteBuffer out) {
-            BytesUtil.writeVInt(gtRecord.cols.length, out);
-            for (ByteArray col : gtRecord.cols) {
-                col.exportData(out);
-            }
-            
ImmutableBitSet.serializer.serialize(gtRecord.maskForEqualHashComp, out);
-        }
-
-        private GTRecord deserializeGTRecord(ByteBuffer in, GTInfo sInfo) {
-            int colLength = BytesUtil.readVInt(in);
-            ByteArray[] sCols = new ByteArray[colLength];
-            for (int i = 0; i < colLength; i++) {
-                sCols[i] = ByteArray.importData(in);
-            }
-            ImmutableBitSet sMaskForEqualHashComp = 
ImmutableBitSet.serializer.deserialize(in);
-            return new GTRecord(sInfo, sMaskForEqualHashComp, sCols);
-        }
-
-    };
-
     public GTScanRequest(GTInfo info, GTScanRange range, ImmutableBitSet 
columns, TupleFilter filterPushDown) {
         this.info = info;
         this.range = range == null ? new GTScanRange(new GTRecord(info), new 
GTRecord(info)) : range;
@@ -128,9 +61,6 @@ public class GTScanRequest {
     }
 
     private void validate(GTInfo info) {
-        if (range == null)
-            range = new GTScanRange(null, null);
-
         if (hasAggregation()) {
             if (aggrGroupBy.intersects(aggrMetrics))
                 throw new IllegalStateException();
@@ -292,4 +222,73 @@ public class GTScanRequest {
         return "GTScanRequest [range=" + range + ", columns=" + columns + ", 
filterPushDown=" + filterPushDown + ", aggrGroupBy=" + aggrGroupBy + ", 
aggrMetrics=" + aggrMetrics + ", aggrMetricsFuncs=" + 
Arrays.toString(aggrMetricsFuncs) + "]";
     }
 
+
+    public static final BytesSerializer<GTScanRequest> serializer = new 
BytesSerializer<GTScanRequest>() {
+        @Override
+        public void serialize(GTScanRequest value, ByteBuffer out) {
+            GTInfo.serializer.serialize(value.info, out);
+
+            serializeGTRecord(value.range.pkStart, out);
+            serializeGTRecord(value.range.pkEnd, out);
+            BytesUtil.writeVInt(value.range.fuzzyKeys.size(), out);
+            for (GTRecord f : value.range.fuzzyKeys) {
+                serializeGTRecord(f, out);
+            }
+
+            ImmutableBitSet.serializer.serialize(value.columns, out);
+            
BytesUtil.writeByteArray(GTUtil.serializeGTFilter(value.filterPushDown, 
value.info), out);
+
+            ImmutableBitSet.serializer.serialize(value.aggrGroupBy, out);
+            ImmutableBitSet.serializer.serialize(value.aggrMetrics, out);
+            BytesUtil.writeAsciiStringArray(value.aggrMetricsFuncs, out);
+            BytesUtil.writeVInt(value.allowPreAggregation ? 1 : 0, out);
+            out.putDouble(value.aggrCacheGB);
+        }
+
+        @Override
+        public GTScanRequest deserialize(ByteBuffer in) {
+            GTInfo sInfo = GTInfo.serializer.deserialize(in);
+
+            GTRecord sPkStart = deserializeGTRecord(in, sInfo);
+            GTRecord sPkEnd = deserializeGTRecord(in, sInfo);
+            List<GTRecord> sFuzzyKeys = Lists.newArrayList();
+            int sFuzzyKeySize = BytesUtil.readVInt(in);
+            for (int i = 0; i < sFuzzyKeySize; i++) {
+                sFuzzyKeys.add(deserializeGTRecord(in, sInfo));
+            }
+            GTScanRange sRange = new GTScanRange(sPkStart, sPkEnd, sFuzzyKeys);
+
+            ImmutableBitSet sColumns = 
ImmutableBitSet.serializer.deserialize(in);
+            TupleFilter sGTFilter = 
GTUtil.deserializeGTFilter(BytesUtil.readByteArray(in), sInfo);
+
+            ImmutableBitSet sAggGroupBy = 
ImmutableBitSet.serializer.deserialize(in);
+            ImmutableBitSet sAggrMetrics = 
ImmutableBitSet.serializer.deserialize(in);
+            String[] sAggrMetricFuncs = BytesUtil.readAsciiStringArray(in);
+            boolean sAllowPreAggr = (BytesUtil.readVInt(in) == 1);
+            double sAggrCacheGB = in.getDouble();
+
+            return new GTScanRequest(sInfo, sRange, sColumns, sAggGroupBy, 
sAggrMetrics, sAggrMetricFuncs, sGTFilter, sAllowPreAggr, sAggrCacheGB);
+        }
+
+        private void serializeGTRecord(GTRecord gtRecord, ByteBuffer out) {
+            BytesUtil.writeVInt(gtRecord.cols.length, out);
+            for (ByteArray col : gtRecord.cols) {
+                col.exportData(out);
+            }
+            
ImmutableBitSet.serializer.serialize(gtRecord.maskForEqualHashComp, out);
+        }
+
+        private GTRecord deserializeGTRecord(ByteBuffer in, GTInfo sInfo) {
+            int colLength = BytesUtil.readVInt(in);
+            ByteArray[] sCols = new ByteArray[colLength];
+            for (int i = 0; i < colLength; i++) {
+                sCols[i] = ByteArray.importData(in);
+            }
+            ImmutableBitSet sMaskForEqualHashComp = 
ImmutableBitSet.serializer.deserialize(in);
+            return new GTRecord(sInfo, sMaskForEqualHashComp, sCols);
+        }
+
+    };
+
+
 }

http://git-wip-us.apache.org/repos/asf/kylin/blob/eb1d276c/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java
index a193d10..05c9ffe 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java
@@ -20,31 +20,30 @@ package org.apache.kylin.storage.hbase.cube.v2;
 
 import java.io.IOException;
 import java.nio.ByteBuffer;
-import java.util.Collection;
-import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
-import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.zip.DataFormatException;
 
 import javax.annotation.Nullable;
 
-import org.apache.hadoop.hbase.client.HConnection;
+import org.apache.commons.lang.NotImplementedException;
 import org.apache.hadoop.hbase.client.HTableInterface;
 import org.apache.hadoop.hbase.client.coprocessor.Batch;
 import org.apache.hadoop.hbase.ipc.BlockingRpcCallback;
 import org.apache.hadoop.hbase.ipc.ServerRpcController;
 import org.apache.kylin.common.debug.BackdoorToggles;
+import org.apache.kylin.common.util.Bytes;
 import org.apache.kylin.common.util.BytesSerializer;
+import org.apache.kylin.common.util.BytesUtil;
 import org.apache.kylin.common.util.CompressionUtils;
 import org.apache.kylin.common.util.ImmutableBitSet;
+import org.apache.kylin.common.util.Pair;
 import org.apache.kylin.cube.CubeSegment;
 import org.apache.kylin.cube.cuboid.Cuboid;
 import org.apache.kylin.gridtable.GTInfo;
@@ -56,15 +55,14 @@ import 
org.apache.kylin.storage.hbase.common.coprocessor.CoprocessorBehavior;
 import 
org.apache.kylin.storage.hbase.cube.v2.coprocessor.endpoint.generated.CubeVisitProtos;
 import 
org.apache.kylin.storage.hbase.cube.v2.coprocessor.endpoint.generated.CubeVisitProtos.CubeVisitRequest.IntList;
 import 
org.apache.kylin.storage.hbase.cube.v2.coprocessor.endpoint.generated.CubeVisitProtos.CubeVisitResponse.Stats;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import com.google.common.base.Function;
-import com.google.common.collect.Collections2;
 import com.google.common.collect.Iterators;
 import com.google.common.collect.Lists;
 import com.google.protobuf.ByteString;
 import com.google.protobuf.HBaseZeroCopyByteString;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 public class CubeHBaseEndpointRPC extends CubeHBaseRPC {
 
@@ -72,6 +70,49 @@ public class CubeHBaseEndpointRPC extends CubeHBaseRPC {
 
     private static ExecutorService executorService = 
Executors.newCachedThreadPool();
 
+    static class ExpectedSizeIterator implements Iterator<byte[]> {
+
+        int expectedSize;
+        int current = 0;
+        BlockingQueue<byte[]> queue;
+
+        public ExpectedSizeIterator(int expectedSize) {
+            this.expectedSize = expectedSize;
+            this.queue = new ArrayBlockingQueue<byte[]>(expectedSize);
+        }
+
+        @Override
+        public boolean hasNext() {
+            return (current < expectedSize);
+        }
+
+        @Override
+        public byte[] next() {
+            if (current >= expectedSize) {
+                throw new IllegalStateException("Won't have more data");
+            }
+            try {
+                current++;
+                return queue.take();
+            } catch (InterruptedException e) {
+                throw new RuntimeException("error when waiting queue", e);
+            }
+        }
+
+        @Override
+        public void remove() {
+            throw new NotImplementedException();
+        }
+
+        public void append(byte[] data) {
+            try {
+                queue.put(data);
+            } catch (InterruptedException e) {
+                throw new RuntimeException("error when waiting queue", e);
+            }
+        }
+    }
+
     static class EndpointResultsAsGTScanner implements IGTScanner {
         private GTInfo info;
         private Iterator<byte[]> blocks;
@@ -141,112 +182,112 @@ public class CubeHBaseEndpointRPC extends CubeHBaseRPC {
         super(cubeSeg, cuboid, fullGTInfo);
     }
 
+    private byte[] getByteArrayForShort(short v) {
+        byte[] split = new byte[Bytes.SIZEOF_SHORT];
+        BytesUtil.writeUnsigned(v, split, 0, Bytes.SIZEOF_SHORT);
+        return split;
+    }
+
+    private List<Pair<byte[], byte[]>> getEPKeyRanges(short baseShard, short 
shardNum, int totalShards) {
+        if (baseShard + shardNum <= totalShards) {
+            //endpoint end key is inclusive, so no need to append 0 or anything
+            return 
Lists.newArrayList(Pair.newPair(getByteArrayForShort(baseShard), 
getByteArrayForShort((short) (baseShard + shardNum - 1))));
+        } else {
+            //0,1,2,3,4 wants 4,0
+            return 
Lists.newArrayList(Pair.newPair(getByteArrayForShort(baseShard), 
getByteArrayForShort((short) (totalShards - 1))),//
+                    Pair.newPair(getByteArrayForShort((short) 0), 
getByteArrayForShort((short) (baseShard + shardNum - totalShards - 1))));
+        }
+    }
+
     @Override
-    public IGTScanner getGTScanner(final GTScanRequest scanRequest) throws 
IOException {
+    public IGTScanner getGTScanner(final List<GTScanRequest> scanRequests) 
throws IOException {
+
+        final String toggle = BackdoorToggles.getCoprocessorBehavior() == null 
? CoprocessorBehavior.SCAN_FILTER_AGGR_CHECKMEM.toString() : 
BackdoorToggles.getCoprocessorBehavior();
+        logger.debug("The execution of this query will use " + toggle + " as 
endpoint's behavior");
+
+        short cuboidBaseShard = 
cubeSeg.getCuboidBaseShard(this.cuboid.getId());
+        short shardNum = cubeSeg.getCuboidShardNum(this.cuboid.getId());
+        int totalShards = cubeSeg.getTotalShards();
+
+        final List<ByteString> scanRequestByteStrings = Lists.newArrayList();
+        final List<ByteString> rawScanByteStrings = Lists.newArrayList();
 
         // primary key (also the 0th column block) is always selected
-        final ImmutableBitSet selectedColBlocks = 
scanRequest.getSelectedColBlocks().set(0);
+        final ImmutableBitSet selectedColBlocks = 
scanRequests.get(0).getSelectedColBlocks().set(0);
+
         // globally shared connection, does not require close
-        HConnection hbaseConn = 
HBaseConnection.get(cubeSeg.getCubeInstance().getConfig().getStorageUrl());
-        final HTableInterface hbaseTable = 
hbaseConn.getTable(cubeSeg.getStorageLocationIdentifier());
+        final HTableInterface hbaseTable = 
HBaseConnection.get(cubeSeg.getCubeInstance().getConfig().getStorageUrl()).getTable(cubeSeg.getStorageLocationIdentifier());
 
-        List<RawScan> rawScans = preparedHBaseScan(scanRequest.getPkStart(), 
scanRequest.getPkEnd(), scanRequest.getFuzzyKeys(), selectedColBlocks);
-        List<List<Integer>> hbaseColumnsToGT = 
getHBaseColumnsGTMapping(selectedColBlocks);
         final List<IntList> hbaseColumnsToGTIntList = Lists.newArrayList();
+        List<List<Integer>> hbaseColumnsToGT = 
getHBaseColumnsGTMapping(selectedColBlocks);
         for (List<Integer> list : hbaseColumnsToGT) {
             
hbaseColumnsToGTIntList.add(IntList.newBuilder().addAllInts(list).build());
         }
 
-        ByteBuffer buffer = 
ByteBuffer.allocate(BytesSerializer.SERIALIZE_BUFFER_SIZE);
-        GTScanRequest.serializer.serialize(scanRequest, buffer);
-        buffer.flip();
-        final ByteString scanRequestBytesString = 
HBaseZeroCopyByteString.wrap(buffer.array(), buffer.position(), buffer.limit());
-        logger.debug("Serialized scanRequestBytes's size is " + 
(buffer.limit() - buffer.position()));
+        for (GTScanRequest req : scanRequests) {
+            ByteBuffer buffer = 
ByteBuffer.allocate(BytesSerializer.SERIALIZE_BUFFER_SIZE);
+            GTScanRequest.serializer.serialize(req, buffer);
+            buffer.flip();
+            
scanRequestByteStrings.add(HBaseZeroCopyByteString.wrap(buffer.array(), 
buffer.position(), buffer.limit()));
 
-        final List<byte[]> rowBlocks = 
Collections.synchronizedList(Lists.<byte[]> newArrayList());
+            RawScan rawScan = preparedHBaseScan(req.getPkStart(), 
req.getPkEnd(), req.getFuzzyKeys(), selectedColBlocks);
 
-        logger.debug("Total RawScan range count: " + rawScans.size());
-        for (RawScan rawScan : rawScans) {
+            ByteBuffer rawScanBuffer = 
ByteBuffer.allocate(BytesSerializer.SERIALIZE_BUFFER_SIZE);
+            RawScan.serializer.serialize(rawScan, rawScanBuffer);
+            rawScanBuffer.flip();
+            
rawScanByteStrings.add(HBaseZeroCopyByteString.wrap(rawScanBuffer.array(), 
rawScanBuffer.position(), rawScanBuffer.limit()));
+
+            logger.debug("Serialized scanRequestBytes {} bytes, 
rawScanBytesString {} bytes", buffer.limit() - buffer.position(), 
rawScanBuffer.limit() - rawScanBuffer.position());
             logScan(rawScan, cubeSeg.getStorageLocationIdentifier());
         }
 
-        final AtomicInteger totalScannedCount = new AtomicInteger(0);
-        final String toggle = BackdoorToggles.getCoprocessorBehavior() == null 
? CoprocessorBehavior.SCAN_FILTER_AGGR_CHECKMEM.toString() : 
BackdoorToggles.getCoprocessorBehavior();
-        logger.debug("The execution of this query will use " + toggle + " as 
endpoint's behavior");
-        List<Future<?>> futures = Lists.newArrayList();
-
-        for (int i = 0; i < rawScans.size(); ++i) {
-            final int shardIndex = i;
-            final RawScan rawScan = rawScans.get(i);
-
-            Future<?> future = executorService.submit(new Runnable() {
-                @Override
-                public void run() {
-
-                    ByteBuffer rawScanBuffer = 
ByteBuffer.allocate(BytesSerializer.SERIALIZE_BUFFER_SIZE);
-                    RawScan.serializer.serialize(rawScan, rawScanBuffer);
-                    rawScanBuffer.flip();
-
-                    CubeVisitProtos.CubeVisitRequest.Builder builder = 
CubeVisitProtos.CubeVisitRequest.newBuilder();
-                    
builder.setGtScanRequest(scanRequestBytesString).setHbaseRawScan(HBaseZeroCopyByteString.wrap(rawScanBuffer.array(),
 rawScanBuffer.position(), rawScanBuffer.limit()));
-                    for (IntList intList : hbaseColumnsToGTIntList) {
-                        builder.addHbaseColumnsToGT(intList);
-                    }
-                    
builder.setRowkeyPreambleSize(cubeSeg.getRowKeyPreambleSize());
-                    builder.setBehavior(toggle);
-
-                    Collection<CubeVisitProtos.CubeVisitResponse> results;
-                    try {
-                        results = getResults(builder.build(), hbaseTable, 
rawScan.startKey, rawScan.endKey);
-                    } catch (Throwable throwable) {
-                        throw new RuntimeException("Error when visiting cubes 
by endpoint:", throwable);
-                    }
+        logger.debug("Start to executing: {} shards starting from {}", 
shardNum, cuboidBaseShard);
 
-                    //results.size() supposed to be 1;
-                    if (results.size() != 1) {
-                        logger.info("{} CubeVisitResponse returned for shard 
{}", results.size(), shardIndex);
-                    }
-
-                    for (CubeVisitProtos.CubeVisitResponse result : results) {
-                        
totalScannedCount.addAndGet(result.getStats().getScannedRowCount());
-                        logger.info(getStatsString(result, shardIndex));
-                    }
+        final AtomicInteger totalScannedCount = new AtomicInteger(0);
+        final ExpectedSizeIterator epResultItr = new 
ExpectedSizeIterator(scanRequests.size() * shardNum);
+
+        for (final Pair<byte[], byte[]> epRange : 
getEPKeyRanges(cuboidBaseShard, shardNum, totalShards)) {
+            for (int i = 0; i < scanRequests.size(); ++i) {
+                final int scanIndex = i;
+                executorService.submit(new Runnable() {
+                    @Override
+                    public void run() {
+                        CubeVisitProtos.CubeVisitRequest.Builder builder = 
CubeVisitProtos.CubeVisitRequest.newBuilder();
+                        
builder.setGtScanRequest(scanRequestByteStrings.get(scanIndex)).setHbaseRawScan(rawScanByteStrings.get(scanIndex));
+                        for (IntList intList : hbaseColumnsToGTIntList) {
+                            builder.addHbaseColumnsToGT(intList);
+                        }
+                        
builder.setRowkeyPreambleSize(cubeSeg.getRowKeyPreambleSize());
+                        builder.setBehavior(toggle);
+
+                        Map<byte[], CubeVisitProtos.CubeVisitResponse> results;
+                        try {
+                            results = getResults(builder.build(), hbaseTable, 
epRange.getFirst(), epRange.getSecond());
+                        } catch (Throwable throwable) {
+                            throw new RuntimeException("Error when visiting 
cubes by endpoint:", throwable);
+                        }
 
-                    Collection<byte[]> part = Collections2.transform(results, 
new Function<CubeVisitProtos.CubeVisitResponse, byte[]>() {
-                        @Nullable
-                        @Override
-                        public byte[] apply(CubeVisitProtos.CubeVisitResponse 
input) {
+                        for (Map.Entry<byte[], 
CubeVisitProtos.CubeVisitResponse> result : results.entrySet()) {
+                            
totalScannedCount.addAndGet(result.getValue().getStats().getScannedRowCount());
+                            logger.info(getStatsString(result));
                             try {
-                                return 
CompressionUtils.decompress(HBaseZeroCopyByteString.zeroCopyGetBytes(input.getCompressedRows()));
+                                
epResultItr.append(CompressionUtils.decompress(HBaseZeroCopyByteString.zeroCopyGetBytes(result.getValue().getCompressedRows())));
                             } catch (IOException | DataFormatException e) {
-                                throw new RuntimeException(e);
+                                throw new RuntimeException("Error when 
decompressing", e);
                             }
                         }
-                    });
-                    rowBlocks.addAll(part);
-                }
-            });
-            futures.add(future);
-        }
-        try {
-            for (Future<?> future : futures) {
-                future.get(1, TimeUnit.HOURS);
+                    }
+                });
             }
-        } catch (InterruptedException e) {
-            throw new RuntimeException("Visiting cube by endpoint gets 
interrupted", e);
-        } catch (ExecutionException e) {
-            throw new RuntimeException("Visiting cube throw exception", e);
-        } catch (TimeoutException e) {
-            throw new RuntimeException("Visiting cube by endpoint timeout", e);
         }
 
-        return new EndpointResultsAsGTScanner(fullGTInfo, 
rowBlocks.iterator(), scanRequest.getColumns(), totalScannedCount.get());
+        return new EndpointResultsAsGTScanner(fullGTInfo, epResultItr, 
scanRequests.get(0).getColumns(), totalScannedCount.get());
     }
 
-    private String getStatsString(CubeVisitProtos.CubeVisitResponse result, 
int shardIndex) {
+    private String getStatsString(Map.Entry<byte[], 
CubeVisitProtos.CubeVisitResponse> result) {
         StringBuilder sb = new StringBuilder();
-        Stats stats = result.getStats();
-        sb.append("Shard " + shardIndex + " on host: " + stats.getHostname() + 
".");
+        Stats stats = result.getValue().getStats();
+        sb.append("Shard " + BytesUtil.toHex(result.getKey()) + " on host: " + 
stats.getHostname() + ".");
         sb.append("Total scanned row: " + stats.getScannedRowCount() + ". ");
         sb.append("Total filtered/aggred row: " + 
stats.getAggregatedRowCount() + ". ");
         sb.append("Time elapsed in EP: " + (stats.getServiceEndTime() - 
stats.getServiceStartTime()) + "(ms). ");
@@ -256,7 +297,7 @@ public class CubeHBaseEndpointRPC extends CubeHBaseRPC {
 
     }
 
-    private Collection<CubeVisitProtos.CubeVisitResponse> getResults(final 
CubeVisitProtos.CubeVisitRequest request, HTableInterface table, byte[] 
startKey, byte[] endKey) throws Throwable {
+    private Map<byte[], CubeVisitProtos.CubeVisitResponse> getResults(final 
CubeVisitProtos.CubeVisitRequest request, HTableInterface table, byte[] 
startKey, byte[] endKey) throws Throwable {
         Map<byte[], CubeVisitProtos.CubeVisitResponse> results = 
table.coprocessorService(CubeVisitProtos.CubeVisitService.class, startKey, 
endKey, new Batch.Call<CubeVisitProtos.CubeVisitService, 
CubeVisitProtos.CubeVisitResponse>() {
             public CubeVisitProtos.CubeVisitResponse 
call(CubeVisitProtos.CubeVisitService rowsService) throws IOException {
                 ServerRpcController controller = new ServerRpcController();
@@ -270,6 +311,6 @@ public class CubeHBaseEndpointRPC extends CubeHBaseRPC {
             }
         });
 
-        return results.values();
+        return results;
     }
 }

http://git-wip-us.apache.org/repos/asf/kylin/blob/eb1d276c/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseRPC.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseRPC.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseRPC.java
index c25197f..07143b1 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseRPC.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseRPC.java
@@ -5,8 +5,6 @@ import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
 
-import javax.annotation.Nullable;
-
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.filter.Filter;
 import org.apache.hadoop.hbase.filter.FilterList;
@@ -32,7 +30,6 @@ import org.apache.kylin.gridtable.IGTScanner;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.google.common.base.Function;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
 
@@ -55,7 +52,7 @@ public abstract class CubeHBaseRPC {
         this.fuzzyMaskEncoder = new FuzzyMaskEncoder(cubeSeg, cuboid);
     }
 
-    abstract IGTScanner getGTScanner(GTScanRequest scanRequest) throws 
IOException;
+    abstract IGTScanner getGTScanner(final List<GTScanRequest> scanRequests) 
throws IOException;
 
     public static Scan buildScan(RawScan rawScan) {
         Scan scan = new Scan();
@@ -80,34 +77,22 @@ public abstract class CubeHBaseRPC {
         return scan;
     }
 
-    protected List<RawScan> preparedHBaseScan(GTRecord pkStart, GTRecord 
pkEnd, List<GTRecord> fuzzyKeys, ImmutableBitSet selectedColBlocks) {
+    protected RawScan preparedHBaseScan(GTRecord pkStart, GTRecord pkEnd, 
List<GTRecord> fuzzyKeys, ImmutableBitSet selectedColBlocks) {
         final List<Pair<byte[], byte[]>> selectedColumns = 
makeHBaseColumns(selectedColBlocks);
-        List<RawScan> ret = Lists.newArrayList();
 
         LazyRowKeyEncoder encoder = new LazyRowKeyEncoder(cubeSeg, cuboid);
         byte[] start = encoder.createBuf();
         byte[] end = encoder.createBuf();
-        List<byte[]> startKeys;
-        List<byte[]> endKeys;
 
         encoder.setBlankByte(RowConstants.ROWKEY_LOWER_BYTE);
         encoder.encode(pkStart, pkStart.getInfo().getPrimaryKey(), start);
-        startKeys = encoder.getRowKeysDifferentShards(start);
 
         encoder.setBlankByte(RowConstants.ROWKEY_UPPER_BYTE);
         encoder.encode(pkEnd, pkEnd.getInfo().getPrimaryKey(), end);
-        endKeys = encoder.getRowKeysDifferentShards(end);
-        endKeys = Lists.transform(endKeys, new Function<byte[], byte[]>() {
-            @Nullable
-            @Override
-            public byte[] apply(byte[] input) {
-                byte[] shardEnd = new byte[input.length + 1];//append extra 0 
to the end key to make it inclusive while scanning
-                System.arraycopy(input, 0, shardEnd, 0, input.length);
-                return shardEnd;
-            }
-        });
+        byte[] temp = new byte[end.length + 1];//append extra 0 to the end key 
to make it inclusive while scanning
+        System.arraycopy(end, 0, temp, 0, end.length);
+        end = temp;
 
-        Preconditions.checkState(startKeys.size() == endKeys.size());
         List<Pair<byte[], byte[]>> hbaseFuzzyKeys = 
translateFuzzyKeys(fuzzyKeys);
 
         KylinConfig config = cubeSeg.getCubeDesc().getConfig();
@@ -116,11 +101,7 @@ public abstract class CubeHBaseRPC {
         if (isMemoryHungry(selectedColBlocks))
             hbaseCaching /= 10;
 
-        for (short i = 0; i < startKeys.size(); ++i) {
-            ret.add(new RawScan(startKeys.get(i), endKeys.get(i), 
selectedColumns, hbaseFuzzyKeys, hbaseCaching, hbaseMaxResultSize));
-        }
-        return ret;
-
+        return new RawScan(start, end, selectedColumns, hbaseFuzzyKeys, 
hbaseCaching, hbaseMaxResultSize);
     }
 
     private boolean isMemoryHungry(ImmutableBitSet selectedColBlocks) {

http://git-wip-us.apache.org/repos/asf/kylin/blob/eb1d276c/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseScanRPC.java
----------------------------------------------------------------------
diff --git 
a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseScanRPC.java
 
b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseScanRPC.java
index b623a65..90888a3 100644
--- 
a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseScanRPC.java
+++ 
b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseScanRPC.java
@@ -1,152 +1,152 @@
-package org.apache.kylin.storage.hbase.cube.v2;
-
-import java.io.IOException;
-import java.util.Iterator;
-import java.util.List;
-
-import org.apache.hadoop.hbase.Cell;
-import org.apache.hadoop.hbase.client.HConnection;
-import org.apache.hadoop.hbase.client.HTableInterface;
-import org.apache.hadoop.hbase.client.Result;
-import org.apache.hadoop.hbase.client.ResultScanner;
-import org.apache.hadoop.hbase.client.Scan;
-import org.apache.kylin.common.util.ImmutableBitSet;
-import org.apache.kylin.cube.CubeSegment;
-import org.apache.kylin.cube.cuboid.Cuboid;
-import org.apache.kylin.gridtable.GTInfo;
-import org.apache.kylin.gridtable.GTRecord;
-import org.apache.kylin.gridtable.GTScanRequest;
-import org.apache.kylin.gridtable.IGTScanner;
-import org.apache.kylin.gridtable.IGTStore;
-import org.apache.kylin.storage.hbase.HBaseConnection;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.common.collect.Iterators;
-import com.google.common.collect.Lists;
-
-/**
- * for test use only
- */
-public class CubeHBaseScanRPC extends CubeHBaseRPC {
-    public static final Logger logger = 
LoggerFactory.getLogger(CubeHBaseScanRPC.class);
-
-    static class TrimmedInfoGTRecordAdapter implements Iterable<GTRecord> {
-
-        private final GTInfo info;
-        private final Iterator<GTRecord> input;
-
-        public TrimmedInfoGTRecordAdapter(GTInfo info, Iterator<GTRecord> 
input) {
-            this.info = info;
-            this.input = input;
-        }
-
-        @Override
-        public Iterator<GTRecord> iterator() {
-            return new Iterator<GTRecord>() {
-                @Override
-                public boolean hasNext() {
-                    return input.hasNext();
-                }
-
-                @Override
-                public GTRecord next() {
-                    GTRecord x = input.next();
-                    return new GTRecord(info, x.getInternal());
-                }
-
-                @Override
-                public void remove() {
-
-                }
-            };
-        }
-    }
-
-    public CubeHBaseScanRPC(CubeSegment cubeSeg, Cuboid cuboid, GTInfo 
fullGTInfo) {
-        super(cubeSeg, cuboid, fullGTInfo);
-    }
-
-    @Override
-    public IGTScanner getGTScanner(final GTScanRequest scanRequest) throws 
IOException {
-
-        // primary key (also the 0th column block) is always selected
-        final ImmutableBitSet selectedColBlocks = 
scanRequest.getSelectedColBlocks().set(0);
-        // globally shared connection, does not require close
-        HConnection hbaseConn = 
HBaseConnection.get(cubeSeg.getCubeInstance().getConfig().getStorageUrl());
-        final HTableInterface hbaseTable = 
hbaseConn.getTable(cubeSeg.getStorageLocationIdentifier());
-
-        List<RawScan> rawScans = preparedHBaseScan(scanRequest.getPkStart(), 
scanRequest.getPkEnd(), scanRequest.getFuzzyKeys(), selectedColBlocks);
-        List<List<Integer>> hbaseColumnsToGT = 
getHBaseColumnsGTMapping(selectedColBlocks);
-
-        final List<ResultScanner> scanners = Lists.newArrayList();
-        final List<Iterator<Result>> resultIterators = Lists.newArrayList();
-
-        for (RawScan rawScan : rawScans) {
-
-            logScan(rawScan, cubeSeg.getStorageLocationIdentifier());
-            Scan hbaseScan = buildScan(rawScan);
-
-            final ResultScanner scanner = hbaseTable.getScanner(hbaseScan);
-            final Iterator<Result> iterator = scanner.iterator();
-
-            scanners.add(scanner);
-            resultIterators.add(iterator);
-        }
-
-        final Iterator<Result> allResultsIterator = 
Iterators.concat(resultIterators.iterator());
-
-        CellListIterator cellListIterator = new CellListIterator() {
-            @Override
-            public void close() throws IOException {
-                for (ResultScanner scanner : scanners) {
-                    scanner.close();
-                }
-                hbaseTable.close();
-            }
-
-            @Override
-            public boolean hasNext() {
-                return allResultsIterator.hasNext();
-            }
-
-            @Override
-            public List<Cell> next() {
-                return allResultsIterator.next().listCells();
-            }
-
-            @Override
-            public void remove() {
-                throw new UnsupportedOperationException();
-            }
-        };
-
-        IGTStore store = new HBaseReadonlyStore(cellListIterator, scanRequest, 
rawScans.get(0).hbaseColumns, hbaseColumnsToGT, 
cubeSeg.getRowKeyPreambleSize());
-        IGTScanner rawScanner = store.scan(scanRequest);
-
-        final IGTScanner decorateScanner = 
scanRequest.decorateScanner(rawScanner);
-        final TrimmedInfoGTRecordAdapter trimmedInfoGTRecordAdapter = new 
TrimmedInfoGTRecordAdapter(fullGTInfo, decorateScanner.iterator());
-
-        return new IGTScanner() {
-            @Override
-            public GTInfo getInfo() {
-                return fullGTInfo;
-            }
-
-            @Override
-            public int getScannedRowCount() {
-                return decorateScanner.getScannedRowCount();
-            }
-
-            @Override
-            public void close() throws IOException {
-                decorateScanner.close();
-            }
-
-            @Override
-            public Iterator<GTRecord> iterator() {
-                return trimmedInfoGTRecordAdapter.iterator();
-            }
-        };
-    }
-}
+//package org.apache.kylin.storage.hbase.cube.v2;
+//
+//import java.io.IOException;
+//import java.util.Iterator;
+//import java.util.List;
+//
+//import org.apache.hadoop.hbase.Cell;
+//import org.apache.hadoop.hbase.client.HConnection;
+//import org.apache.hadoop.hbase.client.HTableInterface;
+//import org.apache.hadoop.hbase.client.Result;
+//import org.apache.hadoop.hbase.client.ResultScanner;
+//import org.apache.hadoop.hbase.client.Scan;
+//import org.apache.kylin.common.util.ImmutableBitSet;
+//import org.apache.kylin.cube.CubeSegment;
+//import org.apache.kylin.cube.cuboid.Cuboid;
+//import org.apache.kylin.gridtable.GTInfo;
+//import org.apache.kylin.gridtable.GTRecord;
+//import org.apache.kylin.gridtable.GTScanRequest;
+//import org.apache.kylin.gridtable.IGTScanner;
+//import org.apache.kylin.gridtable.IGTStore;
+//import org.apache.kylin.storage.hbase.HBaseConnection;
+//import org.slf4j.Logger;
+//import org.slf4j.LoggerFactory;
+//
+//import com.google.common.collect.Iterators;
+//import com.google.common.collect.Lists;
+//
+///**
+// * for test use only
+// */
+//public class CubeHBaseScanRPC extends CubeHBaseRPC {
+//    public static final Logger logger = 
LoggerFactory.getLogger(CubeHBaseScanRPC.class);
+//
+//    static class TrimmedInfoGTRecordAdapter implements Iterable<GTRecord> {
+//
+//        private final GTInfo info;
+//        private final Iterator<GTRecord> input;
+//
+//        public TrimmedInfoGTRecordAdapter(GTInfo info, Iterator<GTRecord> 
input) {
+//            this.info = info;
+//            this.input = input;
+//        }
+//
+//        @Override
+//        public Iterator<GTRecord> iterator() {
+//            return new Iterator<GTRecord>() {
+//                @Override
+//                public boolean hasNext() {
+//                    return input.hasNext();
+//                }
+//
+//                @Override
+//                public GTRecord next() {
+//                    GTRecord x = input.next();
+//                    return new GTRecord(info, x.getInternal());
+//                }
+//
+//                @Override
+//                public void remove() {
+//
+//                }
+//            };
+//        }
+//    }
+//
+//    public CubeHBaseScanRPC(CubeSegment cubeSeg, Cuboid cuboid, GTInfo 
fullGTInfo) {
+//        super(cubeSeg, cuboid, fullGTInfo);
+//    }
+//
+//    @Override
+//    public IGTScanner getGTScanner(final GTScanRequest scanRequest) throws 
IOException {
+//
+//        // primary key (also the 0th column block) is always selected
+//        final ImmutableBitSet selectedColBlocks = 
scanRequest.getSelectedColBlocks().set(0);
+//        // globally shared connection, does not require close
+//        HConnection hbaseConn = 
HBaseConnection.get(cubeSeg.getCubeInstance().getConfig().getStorageUrl());
+//        final HTableInterface hbaseTable = 
hbaseConn.getTable(cubeSeg.getStorageLocationIdentifier());
+//
+//        List<RawScan> rawScans = preparedHBaseScan(scanRequest.getPkStart(), 
scanRequest.getPkEnd(), scanRequest.getFuzzyKeys(), selectedColBlocks);
+//        List<List<Integer>> hbaseColumnsToGT = 
getHBaseColumnsGTMapping(selectedColBlocks);
+//
+//        final List<ResultScanner> scanners = Lists.newArrayList();
+//        final List<Iterator<Result>> resultIterators = Lists.newArrayList();
+//
+//        for (RawScan rawScan : rawScans) {
+//
+//            logScan(rawScan, cubeSeg.getStorageLocationIdentifier());
+//            Scan hbaseScan = buildScan(rawScan);
+//
+//            final ResultScanner scanner = hbaseTable.getScanner(hbaseScan);
+//            final Iterator<Result> iterator = scanner.iterator();
+//
+//            scanners.add(scanner);
+//            resultIterators.add(iterator);
+//        }
+//
+//        final Iterator<Result> allResultsIterator = 
Iterators.concat(resultIterators.iterator());
+//
+//        CellListIterator cellListIterator = new CellListIterator() {
+//            @Override
+//            public void close() throws IOException {
+//                for (ResultScanner scanner : scanners) {
+//                    scanner.close();
+//                }
+//                hbaseTable.close();
+//            }
+//
+//            @Override
+//            public boolean hasNext() {
+//                return allResultsIterator.hasNext();
+//            }
+//
+//            @Override
+//            public List<Cell> next() {
+//                return allResultsIterator.next().listCells();
+//            }
+//
+//            @Override
+//            public void remove() {
+//                throw new UnsupportedOperationException();
+//            }
+//        };
+//
+//        IGTStore store = new HBaseReadonlyStore(cellListIterator, 
scanRequest, rawScans.get(0).hbaseColumns, hbaseColumnsToGT, 
cubeSeg.getRowKeyPreambleSize());
+//        IGTScanner rawScanner = store.scan(scanRequest);
+//
+//        final IGTScanner decorateScanner = 
scanRequest.decorateScanner(rawScanner);
+//        final TrimmedInfoGTRecordAdapter trimmedInfoGTRecordAdapter = new 
TrimmedInfoGTRecordAdapter(fullGTInfo, decorateScanner.iterator());
+//
+//        return new IGTScanner() {
+//            @Override
+//            public GTInfo getInfo() {
+//                return fullGTInfo;
+//            }
+//
+//            @Override
+//            public int getScannedRowCount() {
+//                return decorateScanner.getScannedRowCount();
+//            }
+//
+//            @Override
+//            public void close() throws IOException {
+//                decorateScanner.close();
+//            }
+//
+//            @Override
+//            public Iterator<GTRecord> iterator() {
+//                return trimmedInfoGTRecordAdapter.iterator();
+//            }
+//        };
+//    }
+//}

http://git-wip-us.apache.org/repos/asf/kylin/blob/eb1d276c/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeSegmentScanner.java
----------------------------------------------------------------------
diff --git 
a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeSegmentScanner.java
 
b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeSegmentScanner.java
index 246f1c4..5798f41 100644
--- 
a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeSegmentScanner.java
+++ 
b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeSegmentScanner.java
@@ -8,7 +8,6 @@ import java.util.Collections;
 import java.util.Comparator;
 import java.util.Iterator;
 import java.util.List;
-import java.util.NoSuchElementException;
 import java.util.Set;
 
 import org.apache.commons.lang3.StringUtils;
@@ -25,6 +24,7 @@ import 
org.apache.kylin.cube.gridtable.CuboidToGridTableMapping;
 import org.apache.kylin.cube.gridtable.NotEnoughGTInfoException;
 import org.apache.kylin.cube.model.CubeDesc;
 import org.apache.kylin.dict.TupleFilterDictionaryTranslater;
+import org.apache.kylin.gridtable.EmptyGTScanner;
 import org.apache.kylin.gridtable.GTInfo;
 import org.apache.kylin.gridtable.GTRecord;
 import org.apache.kylin.gridtable.GTScanRange;
@@ -83,9 +83,9 @@ public class CubeSegmentScanner implements IGTScanner {
             int index = mapping.getIndexOf(tblColRef);
             if (index >= 0) {
                 segmentStartAndEnd = getSegmentStartAndEnd(index);
-                partitionColOfGT =  info.colRef(index);
+                partitionColOfGT = info.colRef(index);
             }
-            scanRangePlanner = new GTScanRangePlanner(info, 
segmentStartAndEnd,partitionColOfGT);
+            scanRangePlanner = new GTScanRangePlanner(info, 
segmentStartAndEnd, partitionColOfGT);
         } else {
             scanRangePlanner = new GTScanRangePlanner(info, null, null);
         }
@@ -220,90 +220,39 @@ public class CubeSegmentScanner implements IGTScanner {
     }
 
     private class Scanner {
-        final IGTScanner[] inputScanners = new IGTScanner[scanRequests.size()];
-        int cur = 0;
-        Iterator<GTRecord> curIterator = null;
-        GTRecord next = null;
+        IGTScanner internal = null;
 
-        public Iterator<GTRecord> iterator() {
-            return new Iterator<GTRecord>() {
-
-                @Override
-                public boolean hasNext() {
-                    if (next != null)
-                        return true;
-
-                    if (curIterator == null) {
-                        if (cur >= scanRequests.size())
-                            return false;
-
-                        try {
-
-                            CubeHBaseRPC rpc;
-                            if 
("scan".equalsIgnoreCase(BackdoorToggles.getHbaseCubeQueryProtocol())) {
-                                rpc = new CubeHBaseScanRPC(cubeSeg, cuboid, 
info);
-                            } else {
-                                rpc = new CubeHBaseEndpointRPC(cubeSeg, 
cuboid, info);//default behavior
-                            }
-
-                            //change previous line to CubeHBaseRPC rpc = new 
CubeHBaseScanRPC(cubeSeg, cuboid, info);
-                            //to debug locally
-
-                            inputScanners[cur] = 
rpc.getGTScanner(scanRequests.get(cur));
-                            curIterator = inputScanners[cur].iterator();
-                        } catch (IOException e) {
-                            throw new RuntimeException(e);
-                        }
-                    }
-
-                    if (curIterator.hasNext() == false) {
-                        curIterator = null;
-                        cur++;
-                        return hasNext();
-                    }
-
-                    next = curIterator.next();
-                    return true;
-                }
-
-                @Override
-                public GTRecord next() {
-                    // fetch next record
-                    if (next == null) {
-                        hasNext();
-                        if (next == null)
-                            throw new NoSuchElementException();
-                    }
-
-                    GTRecord result = next;
-                    next = null;
-                    return result;
+        public Scanner() {
+            CubeHBaseRPC rpc;
+            if 
("scan".equalsIgnoreCase(BackdoorToggles.getHbaseCubeQueryProtocol())) {
+                rpc = null;
+            } else {
+                rpc = new CubeHBaseEndpointRPC(cubeSeg, cuboid, 
info);//default behavior
+            }
+            //change previous line to CubeHBaseRPC rpc = new 
CubeHBaseScanRPC(cubeSeg, cuboid, info);
+            //to debug locally
+
+            try {
+                if (scanRequests.size() == 0) {
+                    internal = new EmptyGTScanner();
+                } else {
+                    internal = rpc.getGTScanner(scanRequests);
                 }
+            } catch (IOException e) {
+                throw new RuntimeException("error", e);
+            }
+        }
 
-                @Override
-                public void remove() {
-                    throw new UnsupportedOperationException();
-                }
-            };
+        public Iterator<GTRecord> iterator() {
+            return internal.iterator();
         }
 
         public void close() throws IOException {
-            for (int i = 0; i < inputScanners.length; i++) {
-                if (inputScanners[i] != null) {
-                    inputScanners[i].close();
-                }
-            }
+            internal.close();
         }
 
         public int getScannedRowCount() {
-            int result = 0;
-            for (int i = 0; i < inputScanners.length; i++) {
-                if (inputScanners[i] == null)
-                    break;
-
-                result += inputScanners[i].getScannedRowCount();
-            }
-            return result;
+            return internal.getScannedRowCount();
         }
 
     }

http://git-wip-us.apache.org/repos/asf/kylin/blob/eb1d276c/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeStorageQuery.java
----------------------------------------------------------------------
diff --git 
a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeStorageQuery.java
 
b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeStorageQuery.java
index 78bd25b..9f0ff2b 100644
--- 
a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeStorageQuery.java
+++ 
b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeStorageQuery.java
@@ -99,6 +99,7 @@ public class CubeStorageQuery implements 
ICachableStorageQuery {
             try {
                 scanner = new CubeSegmentScanner(cubeSeg, cuboid, dimensionsD, 
groupsD, metrics, filterD, !isExactAggregation);
             } catch (NotEnoughGTInfoException e) {
+                //deal with empty cube segment
                 logger.info("Cannot construct Segment {}'s GTInfo, this may 
due to empty segment or broken metadata", cubeSeg);
                 continue;
             }

http://git-wip-us.apache.org/repos/asf/kylin/blob/eb1d276c/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/CubeVisitService.java
----------------------------------------------------------------------
diff --git 
a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/CubeVisitService.java
 
b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/CubeVisitService.java
index e396a76..7ac4be1 100644
--- 
a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/CubeVisitService.java
+++ 
b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/CubeVisitService.java
@@ -26,6 +26,7 @@ import java.nio.ByteBuffer;
 import java.util.List;
 
 import org.apache.commons.io.IOUtils;
+import org.apache.commons.lang.ArrayUtils;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.Coprocessor;
 import org.apache.hadoop.hbase.CoprocessorEnvironment;
@@ -36,6 +37,7 @@ import 
org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
 import org.apache.hadoop.hbase.protobuf.ResponseConverter;
 import org.apache.hadoop.hbase.regionserver.HRegion;
 import org.apache.hadoop.hbase.regionserver.RegionScanner;
+import org.apache.kylin.common.util.Bytes;
 import org.apache.kylin.common.util.CompressionUtils;
 import org.apache.kylin.cube.kv.RowConstants;
 import org.apache.kylin.gridtable.GTRecord;
@@ -123,6 +125,20 @@ public class CubeVisitService extends 
CubeVisitProtos.CubeVisitService implement
         }
     }
 
+    private void updateRawScanByCurrentRegion(RawScan rawScan, HRegion region, 
int shardLength) {
+        if (shardLength == 0) {
+            return;
+        }
+        byte[] regionStartKey = ArrayUtils.isEmpty(region.getStartKey()) ? new 
byte[shardLength] : region.getStartKey();
+        Bytes.putBytes(rawScan.startKey, 0, regionStartKey, 0, shardLength);
+        Bytes.putBytes(rawScan.endKey, 0, regionStartKey, 0, shardLength);
+    }
+
+    private void appendProfileInfo(StringBuilder sb) {
+        sb.append(System.currentTimeMillis() - this.serviceStartTime);
+        sb.append(",");
+    }
+
     @Override
     public void visitCube(RpcController controller, 
CubeVisitProtos.CubeVisitRequest request, 
RpcCallback<CubeVisitProtos.CubeVisitResponse> done) {
         RegionScanner innerScanner = null;
@@ -134,20 +150,25 @@ public class CubeVisitService extends 
CubeVisitProtos.CubeVisitService implement
         try {
             this.serviceStartTime = System.currentTimeMillis();
 
+            region = env.getRegion();
+            region.startRegionOperation();
+
             GTScanRequest scanReq = 
GTScanRequest.serializer.deserialize(ByteBuffer.wrap(HBaseZeroCopyByteString.zeroCopyGetBytes(request.getGtScanRequest())));
             RawScan hbaseRawScan = 
RawScan.serializer.deserialize(ByteBuffer.wrap(HBaseZeroCopyByteString.zeroCopyGetBytes(request.getHbaseRawScan())));
+
             List<List<Integer>> hbaseColumnsToGT = Lists.newArrayList();
             for (IntList intList : request.getHbaseColumnsToGTList()) {
                 hbaseColumnsToGT.add(intList.getIntsList());
             }
 
+            if (request.getRowkeyPreambleSize() - 
RowConstants.ROWKEY_CUBOIDID_LEN > 0) {
+                //if the rowkey has a shard prefix, copy the current region's shard bytes into the raw scan start/end keys
+                updateRawScanByCurrentRegion(hbaseRawScan, region, 
request.getRowkeyPreambleSize() - RowConstants.ROWKEY_CUBOIDID_LEN);
+            }
+            
             Scan scan = CubeHBaseRPC.buildScan(hbaseRawScan);
 
-            sb.append(System.currentTimeMillis() - this.serviceStartTime);
-            sb.append(",");
-
-            region = env.getRegion();
-            region.startRegionOperation();
+            appendProfileInfo(sb);
 
             innerScanner = region.getScanner(scan);
             CoprocessorBehavior behavior = 
CoprocessorBehavior.valueOf(request.getBehavior());
@@ -177,23 +198,20 @@ public class CubeVisitService extends 
CubeVisitProtos.CubeVisitService implement
                 finalRowCount++;
             }
 
-            sb.append(System.currentTimeMillis() - this.serviceStartTime);
-            sb.append(",");
+            appendProfileInfo(sb);
 
             //outputStream.close() is not necessary
             allRows = outputStream.toByteArray();
             byte[] compressedAllRows = CompressionUtils.compress(allRows);
 
-            sb.append(System.currentTimeMillis() - this.serviceStartTime);
-            sb.append(",");
+            appendProfileInfo(sb);
 
             OperatingSystemMXBean operatingSystemMXBean = 
(OperatingSystemMXBean) ManagementFactory.getOperatingSystemMXBean();
             double systemCpuLoad = operatingSystemMXBean.getSystemCpuLoad();
             double freePhysicalMemorySize = 
operatingSystemMXBean.getFreePhysicalMemorySize();
             double freeSwapSpaceSize = 
operatingSystemMXBean.getFreeSwapSpaceSize();
 
-            sb.append(System.currentTimeMillis() - this.serviceStartTime);
-            sb.append(",");
+            appendProfileInfo(sb);
 
             CubeVisitProtos.CubeVisitResponse.Builder responseBuilder = 
CubeVisitProtos.CubeVisitResponse.newBuilder();
             done.run(responseBuilder.//

Reply via email to