KYLIN-1308 maintain CubeHBaseScanRPC

Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/bc8ea781
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/bc8ea781
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/bc8ea781

Branch: refs/heads/2.x-staging
Commit: bc8ea78171856e6c438b1a5da5ed9e2b1a49a862
Parents: fe19e3d
Author: honma <ho...@ebay.com>
Authored: Wed Jan 13 17:42:54 2016 +0800
Committer: honma <ho...@ebay.com>
Committed: Wed Jan 13 17:54:04 2016 +0800

----------------------------------------------------------------------
 .../apache/kylin/cube/kv/LazyRowKeyEncoder.java |  26 ++
 .../storage/hbase/cube/v2/CubeHBaseRPC.java     |  43 +++
 .../storage/hbase/cube/v2/CubeHBaseScanRPC.java | 348 +++++++++++--------
 .../hbase/cube/v2/CubeSegmentScanner.java       |   2 +-
 4 files changed, 266 insertions(+), 153 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kylin/blob/bc8ea781/core-cube/src/main/java/org/apache/kylin/cube/kv/LazyRowKeyEncoder.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/kv/LazyRowKeyEncoder.java b/core-cube/src/main/java/org/apache/kylin/cube/kv/LazyRowKeyEncoder.java
index 0fca726..c93f65e 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/kv/LazyRowKeyEncoder.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/kv/LazyRowKeyEncoder.java
@@ -18,9 +18,15 @@
 
 package org.apache.kylin.cube.kv;
 
+import com.google.common.collect.Lists;
+import org.apache.kylin.common.util.BytesUtil;
+import org.apache.kylin.common.util.ShardingHash;
 import org.apache.kylin.cube.CubeSegment;
 import org.apache.kylin.cube.cuboid.Cuboid;
 
+import java.util.Arrays;
+import java.util.List;
+
 /**
  * A LazyRowKeyEncoder will not try to calculate shard
  * It works for both enableSharding or non-enableSharding scenario
@@ -38,4 +44,24 @@ public class LazyRowKeyEncoder extends RowKeyEncoder {
             throw new RuntimeException("If enableSharding false, you should never calculate shard");
         }
     }
+
+
+    //for non-sharding cases it will only return one byte[] with not shard at beginning
+    public List<byte[]> getRowKeysDifferentShards(byte[] halfCookedKey) {
+        final short cuboidShardNum = cubeSeg.getCuboidShardNum(cuboid.getId());
+
+        if (!enableSharding) {
+            return Lists.newArrayList(halfCookedKey);//not shard to append at head, so it is already well cooked
+        } else {
+            List<byte[]> ret = Lists.newArrayList();
+            for (short i = 0; i < cuboidShardNum; ++i) {
+                short shard = ShardingHash.normalize(cubeSeg.getCuboidBaseShard(cuboid.getId()), i, cubeSeg.getTotalShards());
+                byte[] cookedKey = Arrays.copyOf(halfCookedKey, halfCookedKey.length);
+                BytesUtil.writeShort(shard, cookedKey, 0, RowConstants.ROWKEY_SHARDID_LEN);
+                ret.add(cookedKey);
+            }
+            return ret;
+        }
+
+    }
 }

http://git-wip-us.apache.org/repos/asf/kylin/blob/bc8ea781/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseRPC.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseRPC.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseRPC.java
index 07143b1..db39455 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseRPC.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseRPC.java
@@ -30,6 +30,7 @@ import org.apache.kylin.gridtable.IGTScanner;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.google.common.base.Function;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
 
@@ -104,6 +105,48 @@ public abstract class CubeHBaseRPC {
         return new RawScan(start, end, selectedColumns, hbaseFuzzyKeys, hbaseCaching, hbaseMaxResultSize);
     }
 
+    protected List<RawScan> preparedHBaseScans(GTRecord pkStart, GTRecord pkEnd, List<GTRecord> fuzzyKeys, ImmutableBitSet selectedColBlocks) {
+        final List<Pair<byte[], byte[]>> selectedColumns = makeHBaseColumns(selectedColBlocks);
+        List<RawScan> ret = Lists.newArrayList();
+
+        LazyRowKeyEncoder encoder = new LazyRowKeyEncoder(cubeSeg, cuboid);
+        byte[] start = encoder.createBuf();
+        byte[] end = encoder.createBuf();
+        List<byte[]> startKeys;
+        List<byte[]> endKeys;
+
+        encoder.setBlankByte(RowConstants.ROWKEY_LOWER_BYTE);
+        encoder.encode(pkStart, pkStart.getInfo().getPrimaryKey(), start);
+        startKeys = encoder.getRowKeysDifferentShards(start);
+
+        encoder.setBlankByte(RowConstants.ROWKEY_UPPER_BYTE);
+        encoder.encode(pkEnd, pkEnd.getInfo().getPrimaryKey(), end);
+        endKeys = encoder.getRowKeysDifferentShards(end);
+        endKeys = Lists.transform(endKeys, new Function<byte[], byte[]>() {
+            @Override
+            public byte[] apply(byte[] input) {
+                byte[] shardEnd = new byte[input.length + 1];//append extra 0 to the end key to make it inclusive while scanning
+                System.arraycopy(input, 0, shardEnd, 0, input.length);
+                return shardEnd;
+            }
+        });
+
+        Preconditions.checkState(startKeys.size() == endKeys.size());
+        List<Pair<byte[], byte[]>> hbaseFuzzyKeys = translateFuzzyKeys(fuzzyKeys);
+
+        KylinConfig config = cubeSeg.getCubeDesc().getConfig();
+        int hbaseCaching = config.getHBaseScanCacheRows();
+        int hbaseMaxResultSize = config.getHBaseScanMaxResultSize();
+        if (isMemoryHungry(selectedColBlocks))
+            hbaseCaching /= 10;
+
+        for (short i = 0; i < startKeys.size(); ++i) {
+            ret.add(new RawScan(startKeys.get(i), endKeys.get(i), selectedColumns, hbaseFuzzyKeys, hbaseCaching, hbaseMaxResultSize));
+        }
+        return ret;
+
+    }
+
     private boolean isMemoryHungry(ImmutableBitSet selectedColBlocks) {
         ImmutableBitSet selectColumns = fullGTInfo.selectColumns(selectedColBlocks);
         return fullGTInfo.getMaxColumnLength(selectColumns) > 1024;

http://git-wip-us.apache.org/repos/asf/kylin/blob/bc8ea781/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseScanRPC.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseScanRPC.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseScanRPC.java
index 90888a3..fa32769 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseScanRPC.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseScanRPC.java
@@ -1,152 +1,196 @@
-//package org.apache.kylin.storage.hbase.cube.v2;
-//
-//import java.io.IOException;
-//import java.util.Iterator;
-//import java.util.List;
-//
-//import org.apache.hadoop.hbase.Cell;
-//import org.apache.hadoop.hbase.client.HConnection;
-//import org.apache.hadoop.hbase.client.HTableInterface;
-//import org.apache.hadoop.hbase.client.Result;
-//import org.apache.hadoop.hbase.client.ResultScanner;
-//import org.apache.hadoop.hbase.client.Scan;
-//import org.apache.kylin.common.util.ImmutableBitSet;
-//import org.apache.kylin.cube.CubeSegment;
-//import org.apache.kylin.cube.cuboid.Cuboid;
-//import org.apache.kylin.gridtable.GTInfo;
-//import org.apache.kylin.gridtable.GTRecord;
-//import org.apache.kylin.gridtable.GTScanRequest;
-//import org.apache.kylin.gridtable.IGTScanner;
-//import org.apache.kylin.gridtable.IGTStore;
-//import org.apache.kylin.storage.hbase.HBaseConnection;
-//import org.slf4j.Logger;
-//import org.slf4j.LoggerFactory;
-//
-//import com.google.common.collect.Iterators;
-//import com.google.common.collect.Lists;
-//
-///**
-// * for test use only
-// */
-//public class CubeHBaseScanRPC extends CubeHBaseRPC {
-//    public static final Logger logger = LoggerFactory.getLogger(CubeHBaseScanRPC.class);
-//
-//    static class TrimmedInfoGTRecordAdapter implements Iterable<GTRecord> {
-//
-//        private final GTInfo info;
-//        private final Iterator<GTRecord> input;
-//
-//        public TrimmedInfoGTRecordAdapter(GTInfo info, Iterator<GTRecord> input) {
-//            this.info = info;
-//            this.input = input;
-//        }
-//
-//        @Override
-//        public Iterator<GTRecord> iterator() {
-//            return new Iterator<GTRecord>() {
-//                @Override
-//                public boolean hasNext() {
-//                    return input.hasNext();
-//                }
-//
-//                @Override
-//                public GTRecord next() {
-//                    GTRecord x = input.next();
-//                    return new GTRecord(info, x.getInternal());
-//                }
-//
-//                @Override
-//                public void remove() {
-//
-//                }
-//            };
-//        }
-//    }
-//
-//    public CubeHBaseScanRPC(CubeSegment cubeSeg, Cuboid cuboid, GTInfo fullGTInfo) {
-//        super(cubeSeg, cuboid, fullGTInfo);
-//    }
-//
-//    @Override
-//    public IGTScanner getGTScanner(final GTScanRequest scanRequest) throws IOException {
-//
-//        // primary key (also the 0th column block) is always selected
-//        final ImmutableBitSet selectedColBlocks = scanRequest.getSelectedColBlocks().set(0);
-//        // globally shared connection, does not require close
-//        HConnection hbaseConn = HBaseConnection.get(cubeSeg.getCubeInstance().getConfig().getStorageUrl());
-//        final HTableInterface hbaseTable = hbaseConn.getTable(cubeSeg.getStorageLocationIdentifier());
-//
-//        List<RawScan> rawScans = preparedHBaseScan(scanRequest.getPkStart(), scanRequest.getPkEnd(), scanRequest.getFuzzyKeys(), selectedColBlocks);
-//        List<List<Integer>> hbaseColumnsToGT = getHBaseColumnsGTMapping(selectedColBlocks);
-//
-//        final List<ResultScanner> scanners = Lists.newArrayList();
-//        final List<Iterator<Result>> resultIterators = Lists.newArrayList();
-//
-//        for (RawScan rawScan : rawScans) {
-//
-//            logScan(rawScan, cubeSeg.getStorageLocationIdentifier());
-//            Scan hbaseScan = buildScan(rawScan);
-//
-//            final ResultScanner scanner = hbaseTable.getScanner(hbaseScan);
-//            final Iterator<Result> iterator = scanner.iterator();
-//
-//            scanners.add(scanner);
-//            resultIterators.add(iterator);
-//        }
-//
-//        final Iterator<Result> allResultsIterator = Iterators.concat(resultIterators.iterator());
-//
-//        CellListIterator cellListIterator = new CellListIterator() {
-//            @Override
-//            public void close() throws IOException {
-//                for (ResultScanner scanner : scanners) {
-//                    scanner.close();
-//                }
-//                hbaseTable.close();
-//            }
-//
-//            @Override
-//            public boolean hasNext() {
-//                return allResultsIterator.hasNext();
-//            }
-//
-//            @Override
-//            public List<Cell> next() {
-//                return allResultsIterator.next().listCells();
-//            }
-//
-//            @Override
-//            public void remove() {
-//                throw new UnsupportedOperationException();
-//            }
-//        };
-//
-//        IGTStore store = new HBaseReadonlyStore(cellListIterator, scanRequest, rawScans.get(0).hbaseColumns, hbaseColumnsToGT, cubeSeg.getRowKeyPreambleSize());
-//        IGTScanner rawScanner = store.scan(scanRequest);
-//
-//        final IGTScanner decorateScanner = scanRequest.decorateScanner(rawScanner);
-//        final TrimmedInfoGTRecordAdapter trimmedInfoGTRecordAdapter = new TrimmedInfoGTRecordAdapter(fullGTInfo, decorateScanner.iterator());
-//
-//        return new IGTScanner() {
-//            @Override
-//            public GTInfo getInfo() {
-//                return fullGTInfo;
-//            }
-//
-//            @Override
-//            public int getScannedRowCount() {
-//                return decorateScanner.getScannedRowCount();
-//            }
-//
-//            @Override
-//            public void close() throws IOException {
-//                decorateScanner.close();
-//            }
-//
-//            @Override
-//            public Iterator<GTRecord> iterator() {
-//                return trimmedInfoGTRecordAdapter.iterator();
-//            }
-//        };
-//    }
-//}
+package org.apache.kylin.storage.hbase.cube.v2;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.List;
+
+import javax.annotation.Nullable;
+
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.client.HConnection;
+import org.apache.hadoop.hbase.client.HTableInterface;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.kylin.common.util.ImmutableBitSet;
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.cube.cuboid.Cuboid;
+import org.apache.kylin.gridtable.GTInfo;
+import org.apache.kylin.gridtable.GTRecord;
+import org.apache.kylin.gridtable.GTScanRequest;
+import org.apache.kylin.gridtable.IGTScanner;
+import org.apache.kylin.gridtable.IGTStore;
+import org.apache.kylin.storage.hbase.HBaseConnection;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Function;
+import com.google.common.collect.Iterators;
+import com.google.common.collect.Lists;
+
+/**
+ * for test use only
+ */
+public class CubeHBaseScanRPC extends CubeHBaseRPC {
+    public static final Logger logger = LoggerFactory.getLogger(CubeHBaseScanRPC.class);
+
+    static class TrimmedInfoGTRecordAdapter implements Iterable<GTRecord> {
+
+        private final GTInfo info;
+        private final Iterator<GTRecord> input;
+
+        public TrimmedInfoGTRecordAdapter(GTInfo info, Iterator<GTRecord> input) {
+            this.info = info;
+            this.input = input;
+        }
+
+        @Override
+        public Iterator<GTRecord> iterator() {
+            return new Iterator<GTRecord>() {
+                @Override
+                public boolean hasNext() {
+                    return input.hasNext();
+                }
+
+                @Override
+                public GTRecord next() {
+                    GTRecord x = input.next();
+                    return new GTRecord(info, x.getInternal());
+                }
+
+                @Override
+                public void remove() {
+
+                }
+            };
+        }
+    }
+
+    public CubeHBaseScanRPC(CubeSegment cubeSeg, Cuboid cuboid, GTInfo fullGTInfo) {
+        super(cubeSeg, cuboid, fullGTInfo);
+    }
+
+    @Override
+    public IGTScanner getGTScanner(final List<GTScanRequest> scanRequests) throws IOException {
+        final List<IGTScanner> scanners = Lists.newArrayList();
+        for (GTScanRequest request : scanRequests) {
+            scanners.add(getGTScanner(request));
+        }
+
+        return new IGTScanner() {
+            @Override
+            public GTInfo getInfo() {
+                return scanners.get(0).getInfo();
+            }
+
+            @Override
+            public int getScannedRowCount() {
+                int sum = 0;
+                for (IGTScanner s : scanners) {
+                    sum += s.getScannedRowCount();
+                }
+                return sum;
+            }
+
+            @Override
+            public void close() throws IOException {
+                for (IGTScanner s : scanners) {
+                    s.close();
+                }
+            }
+
+            @Override
+            public Iterator<GTRecord> iterator() {
+                return Iterators.concat(Iterators.transform(scanners.iterator(), new Function<IGTScanner, Iterator<GTRecord>>() {
+                    @Nullable
+                    @Override
+                    public Iterator<GTRecord> apply(IGTScanner input) {
+                        return input.iterator();
+                    }
+                }));
+            }
+        };
+    }
+
+    private IGTScanner getGTScanner(final GTScanRequest scanRequest) throws IOException {
+
+        // primary key (also the 0th column block) is always selected
+        final ImmutableBitSet selectedColBlocks = scanRequest.getSelectedColBlocks().set(0);
+        // globally shared connection, does not require close
+        HConnection hbaseConn = HBaseConnection.get(cubeSeg.getCubeInstance().getConfig().getStorageUrl());
+        final HTableInterface hbaseTable = hbaseConn.getTable(cubeSeg.getStorageLocationIdentifier());
+
+        List<RawScan> rawScans = preparedHBaseScans(scanRequest.getPkStart(), scanRequest.getPkEnd(), scanRequest.getFuzzyKeys(), selectedColBlocks);
+        List<List<Integer>> hbaseColumnsToGT = getHBaseColumnsGTMapping(selectedColBlocks);
+
+        final List<ResultScanner> scanners = Lists.newArrayList();
+        final List<Iterator<Result>> resultIterators = Lists.newArrayList();
+
+        for (RawScan rawScan : rawScans) {
+
+            logScan(rawScan, cubeSeg.getStorageLocationIdentifier());
+            Scan hbaseScan = buildScan(rawScan);
+
+            final ResultScanner scanner = hbaseTable.getScanner(hbaseScan);
+            final Iterator<Result> iterator = scanner.iterator();
+
+            scanners.add(scanner);
+            resultIterators.add(iterator);
+        }
+
+        final Iterator<Result> allResultsIterator = Iterators.concat(resultIterators.iterator());
+
+        CellListIterator cellListIterator = new CellListIterator() {
+            @Override
+            public void close() throws IOException {
+                for (ResultScanner scanner : scanners) {
+                    scanner.close();
+                }
+                hbaseTable.close();
+            }
+
+            @Override
+            public boolean hasNext() {
+                return allResultsIterator.hasNext();
+            }
+
+            @Override
+            public List<Cell> next() {
+                return allResultsIterator.next().listCells();
+            }
+
+            @Override
+            public void remove() {
+                throw new UnsupportedOperationException();
+            }
+        };
+
+        IGTStore store = new HBaseReadonlyStore(cellListIterator, scanRequest, rawScans.get(0).hbaseColumns, hbaseColumnsToGT, cubeSeg.getRowKeyPreambleSize());
+        IGTScanner rawScanner = store.scan(scanRequest);
+
+        final IGTScanner decorateScanner = scanRequest.decorateScanner(rawScanner);
+        final TrimmedInfoGTRecordAdapter trimmedInfoGTRecordAdapter = new TrimmedInfoGTRecordAdapter(fullGTInfo, decorateScanner.iterator());
+
+        return new IGTScanner() {
+            @Override
+            public GTInfo getInfo() {
+                return fullGTInfo;
+            }
+
+            @Override
+            public int getScannedRowCount() {
+                return decorateScanner.getScannedRowCount();
+            }
+
+            @Override
+            public void close() throws IOException {
+                decorateScanner.close();
+            }
+
+            @Override
+            public Iterator<GTRecord> iterator() {
+                return trimmedInfoGTRecordAdapter.iterator();
+            }
+        };
+    }
+}

http://git-wip-us.apache.org/repos/asf/kylin/blob/bc8ea781/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeSegmentScanner.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeSegmentScanner.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeSegmentScanner.java
index 5798f41..ad0cb32 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeSegmentScanner.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeSegmentScanner.java
@@ -225,7 +225,7 @@ public class CubeSegmentScanner implements IGTScanner {
         public Scanner() {
             CubeHBaseRPC rpc;
             if ("scan".equalsIgnoreCase(BackdoorToggles.getHbaseCubeQueryProtocol())) {
-                rpc = null;
+                rpc = new CubeHBaseScanRPC(cubeSeg, cuboid, info);
             } else {
                 rpc = new CubeHBaseEndpointRPC(cubeSeg, cuboid, info);//default behavior
             }

Reply via email to