KYLIN-1308 maintain CubeHBaseScanRPC
Project: http://git-wip-us.apache.org/repos/asf/kylin/repo Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/bc8ea781 Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/bc8ea781 Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/bc8ea781 Branch: refs/heads/2.x-staging Commit: bc8ea78171856e6c438b1a5da5ed9e2b1a49a862 Parents: fe19e3d Author: honma <ho...@ebay.com> Authored: Wed Jan 13 17:42:54 2016 +0800 Committer: honma <ho...@ebay.com> Committed: Wed Jan 13 17:54:04 2016 +0800 ---------------------------------------------------------------------- .../apache/kylin/cube/kv/LazyRowKeyEncoder.java | 26 ++ .../storage/hbase/cube/v2/CubeHBaseRPC.java | 43 +++ .../storage/hbase/cube/v2/CubeHBaseScanRPC.java | 348 +++++++++++-------- .../hbase/cube/v2/CubeSegmentScanner.java | 2 +- 4 files changed, 266 insertions(+), 153 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kylin/blob/bc8ea781/core-cube/src/main/java/org/apache/kylin/cube/kv/LazyRowKeyEncoder.java ---------------------------------------------------------------------- diff --git a/core-cube/src/main/java/org/apache/kylin/cube/kv/LazyRowKeyEncoder.java b/core-cube/src/main/java/org/apache/kylin/cube/kv/LazyRowKeyEncoder.java index 0fca726..c93f65e 100644 --- a/core-cube/src/main/java/org/apache/kylin/cube/kv/LazyRowKeyEncoder.java +++ b/core-cube/src/main/java/org/apache/kylin/cube/kv/LazyRowKeyEncoder.java @@ -18,9 +18,15 @@ package org.apache.kylin.cube.kv; +import com.google.common.collect.Lists; +import org.apache.kylin.common.util.BytesUtil; +import org.apache.kylin.common.util.ShardingHash; import org.apache.kylin.cube.CubeSegment; import org.apache.kylin.cube.cuboid.Cuboid; +import java.util.Arrays; +import java.util.List; + /** * A LazyRowKeyEncoder will not try to calculate shard * It works for both enableSharding or non-enableSharding scenario @@ -38,4 +44,24 @@ public class LazyRowKeyEncoder extends RowKeyEncoder { throw new RuntimeException("If enableSharding false, you should never calculate shard"); } } + + + //for non-sharding cases it will only return one byte[] with not shard at beginning + public List<byte[]> getRowKeysDifferentShards(byte[] halfCookedKey) { + final short cuboidShardNum = cubeSeg.getCuboidShardNum(cuboid.getId()); + + if (!enableSharding) { + return Lists.newArrayList(halfCookedKey);//not shard to append at head, so it is already well cooked + } else { + List<byte[]> ret = Lists.newArrayList(); + for (short i = 0; i < cuboidShardNum; ++i) { + short shard = ShardingHash.normalize(cubeSeg.getCuboidBaseShard(cuboid.getId()), i, cubeSeg.getTotalShards()); + byte[] cookedKey = Arrays.copyOf(halfCookedKey, halfCookedKey.length); + BytesUtil.writeShort(shard, cookedKey, 0, RowConstants.ROWKEY_SHARDID_LEN); + ret.add(cookedKey); + } + return ret; + } + + } } http://git-wip-us.apache.org/repos/asf/kylin/blob/bc8ea781/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseRPC.java ---------------------------------------------------------------------- diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseRPC.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseRPC.java index 07143b1..db39455 100644 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseRPC.java +++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseRPC.java @@ -30,6 +30,7 @@ import org.apache.kylin.gridtable.IGTScanner; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.google.common.base.Function; import com.google.common.base.Preconditions; import com.google.common.collect.Lists; @@ -104,6 +105,48 @@ public abstract class CubeHBaseRPC { return new RawScan(start, end, selectedColumns, hbaseFuzzyKeys, hbaseCaching, hbaseMaxResultSize); } + protected List<RawScan> preparedHBaseScans(GTRecord pkStart, GTRecord pkEnd, List<GTRecord> fuzzyKeys, ImmutableBitSet selectedColBlocks) { + final List<Pair<byte[], byte[]>> selectedColumns = makeHBaseColumns(selectedColBlocks); + List<RawScan> ret = Lists.newArrayList(); + + LazyRowKeyEncoder encoder = new LazyRowKeyEncoder(cubeSeg, cuboid); + byte[] start = encoder.createBuf(); + byte[] end = encoder.createBuf(); + List<byte[]> startKeys; + List<byte[]> endKeys; + + encoder.setBlankByte(RowConstants.ROWKEY_LOWER_BYTE); + encoder.encode(pkStart, pkStart.getInfo().getPrimaryKey(), start); + startKeys = encoder.getRowKeysDifferentShards(start); + + encoder.setBlankByte(RowConstants.ROWKEY_UPPER_BYTE); + encoder.encode(pkEnd, pkEnd.getInfo().getPrimaryKey(), end); + endKeys = encoder.getRowKeysDifferentShards(end); + endKeys = Lists.transform(endKeys, new Function<byte[], byte[]>() { + @Override + public byte[] apply(byte[] input) { + byte[] shardEnd = new byte[input.length + 1];//append extra 0 to the end key to make it inclusive while scanning + System.arraycopy(input, 0, shardEnd, 0, input.length); + return shardEnd; + } + }); + + Preconditions.checkState(startKeys.size() == endKeys.size()); + List<Pair<byte[], byte[]>> hbaseFuzzyKeys = translateFuzzyKeys(fuzzyKeys); + + KylinConfig config = cubeSeg.getCubeDesc().getConfig(); + int hbaseCaching = config.getHBaseScanCacheRows(); + int hbaseMaxResultSize = config.getHBaseScanMaxResultSize(); + if (isMemoryHungry(selectedColBlocks)) + hbaseCaching /= 10; + + for (short i = 0; i < startKeys.size(); ++i) { + ret.add(new RawScan(startKeys.get(i), endKeys.get(i), selectedColumns, hbaseFuzzyKeys, hbaseCaching, hbaseMaxResultSize)); + } + return ret; + + } + private boolean isMemoryHungry(ImmutableBitSet selectedColBlocks) { ImmutableBitSet selectColumns = fullGTInfo.selectColumns(selectedColBlocks); return fullGTInfo.getMaxColumnLength(selectColumns) > 1024; http://git-wip-us.apache.org/repos/asf/kylin/blob/bc8ea781/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseScanRPC.java ---------------------------------------------------------------------- diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseScanRPC.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseScanRPC.java index 90888a3..fa32769 100644 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseScanRPC.java +++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseScanRPC.java @@ -1,152 +1,196 @@ -//package org.apache.kylin.storage.hbase.cube.v2; -// -//import java.io.IOException; -//import java.util.Iterator; -//import java.util.List; -// -//import org.apache.hadoop.hbase.Cell; -//import org.apache.hadoop.hbase.client.HConnection; -//import org.apache.hadoop.hbase.client.HTableInterface; -//import org.apache.hadoop.hbase.client.Result; -//import org.apache.hadoop.hbase.client.ResultScanner; -//import org.apache.hadoop.hbase.client.Scan; -//import org.apache.kylin.common.util.ImmutableBitSet; -//import org.apache.kylin.cube.CubeSegment; -//import org.apache.kylin.cube.cuboid.Cuboid; -//import org.apache.kylin.gridtable.GTInfo; -//import org.apache.kylin.gridtable.GTRecord; -//import org.apache.kylin.gridtable.GTScanRequest; -//import org.apache.kylin.gridtable.IGTScanner; -//import org.apache.kylin.gridtable.IGTStore; -//import org.apache.kylin.storage.hbase.HBaseConnection; -//import org.slf4j.Logger; -//import org.slf4j.LoggerFactory; -// -//import com.google.common.collect.Iterators; -//import com.google.common.collect.Lists; -// -///** -// * for test use only -// */ -//public class CubeHBaseScanRPC extends CubeHBaseRPC { -// public static final Logger logger = LoggerFactory.getLogger(CubeHBaseScanRPC.class); -// -// static class TrimmedInfoGTRecordAdapter implements Iterable<GTRecord> { -// -// private final GTInfo info; -// private final Iterator<GTRecord> input; -// -// public TrimmedInfoGTRecordAdapter(GTInfo info, Iterator<GTRecord> input) { -// this.info = info; -// this.input = input; -// } -// -// @Override -// public Iterator<GTRecord> iterator() { -// return new Iterator<GTRecord>() { -// @Override -// public boolean hasNext() { -// return input.hasNext(); -// } -// -// @Override -// public GTRecord next() { -// GTRecord x = input.next(); -// return new GTRecord(info, x.getInternal()); -// } -// -// @Override -// public void remove() { -// -// } -// }; -// } -// } -// -// public CubeHBaseScanRPC(CubeSegment cubeSeg, Cuboid cuboid, GTInfo fullGTInfo) { -// super(cubeSeg, cuboid, fullGTInfo); -// } -// -// @Override -// public IGTScanner getGTScanner(final GTScanRequest scanRequest) throws IOException { -// -// // primary key (also the 0th column block) is always selected -// final ImmutableBitSet selectedColBlocks = scanRequest.getSelectedColBlocks().set(0); -// // globally shared connection, does not require close -// HConnection hbaseConn = HBaseConnection.get(cubeSeg.getCubeInstance().getConfig().getStorageUrl()); -// final HTableInterface hbaseTable = hbaseConn.getTable(cubeSeg.getStorageLocationIdentifier()); -// -// List<RawScan> rawScans = preparedHBaseScan(scanRequest.getPkStart(), scanRequest.getPkEnd(), scanRequest.getFuzzyKeys(), selectedColBlocks); -// List<List<Integer>> hbaseColumnsToGT = getHBaseColumnsGTMapping(selectedColBlocks); -// -// final List<ResultScanner> scanners = Lists.newArrayList(); -// final List<Iterator<Result>> resultIterators = Lists.newArrayList(); -// -// for (RawScan rawScan : rawScans) { -// -// logScan(rawScan, cubeSeg.getStorageLocationIdentifier()); -// Scan hbaseScan = buildScan(rawScan); -// -// final ResultScanner scanner = hbaseTable.getScanner(hbaseScan); -// final Iterator<Result> iterator = scanner.iterator(); -// -// scanners.add(scanner); -// resultIterators.add(iterator); -// } -// -// final Iterator<Result> allResultsIterator = Iterators.concat(resultIterators.iterator()); -// -// CellListIterator cellListIterator = new CellListIterator() { -// @Override -// public void close() throws IOException { -// for (ResultScanner scanner : scanners) { -// scanner.close(); -// } -// hbaseTable.close(); -// } -// -// @Override -// public boolean hasNext() { -// return allResultsIterator.hasNext(); -// } -// -// @Override -// public List<Cell> next() { -// return allResultsIterator.next().listCells(); -// } -// -// @Override -// public void remove() { -// throw new UnsupportedOperationException(); -// } -// }; -// -// IGTStore store = new HBaseReadonlyStore(cellListIterator, scanRequest, rawScans.get(0).hbaseColumns, hbaseColumnsToGT, cubeSeg.getRowKeyPreambleSize()); -// IGTScanner rawScanner = store.scan(scanRequest); -// -// final IGTScanner decorateScanner = scanRequest.decorateScanner(rawScanner); -// final TrimmedInfoGTRecordAdapter trimmedInfoGTRecordAdapter = new TrimmedInfoGTRecordAdapter(fullGTInfo, decorateScanner.iterator()); -// -// return new IGTScanner() { -// @Override -// public GTInfo getInfo() { -// return fullGTInfo; -// } -// -// @Override -// public int getScannedRowCount() { -// return decorateScanner.getScannedRowCount(); -// } -// -// @Override -// public void close() throws IOException { -// decorateScanner.close(); -// } -// -// @Override -// public Iterator<GTRecord> iterator() { -// return trimmedInfoGTRecordAdapter.iterator(); -// } -// }; -// } -//} +package org.apache.kylin.storage.hbase.cube.v2; + +import java.io.IOException; +import java.util.Iterator; +import java.util.List; + +import javax.annotation.Nullable; + +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.client.HConnection; +import org.apache.hadoop.hbase.client.HTableInterface; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.ResultScanner; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.kylin.common.util.ImmutableBitSet; +import org.apache.kylin.cube.CubeSegment; +import org.apache.kylin.cube.cuboid.Cuboid; +import org.apache.kylin.gridtable.GTInfo; +import org.apache.kylin.gridtable.GTRecord; +import org.apache.kylin.gridtable.GTScanRequest; +import org.apache.kylin.gridtable.IGTScanner; +import org.apache.kylin.gridtable.IGTStore; +import org.apache.kylin.storage.hbase.HBaseConnection; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.base.Function; +import com.google.common.collect.Iterators; +import com.google.common.collect.Lists; + +/** + * for test use only + */ +public class CubeHBaseScanRPC extends CubeHBaseRPC { + public static final Logger logger = LoggerFactory.getLogger(CubeHBaseScanRPC.class); + + static class TrimmedInfoGTRecordAdapter implements Iterable<GTRecord> { + + private final GTInfo info; + private final Iterator<GTRecord> input; + + public TrimmedInfoGTRecordAdapter(GTInfo info, Iterator<GTRecord> input) { + this.info = info; + this.input = input; + } + + @Override + public Iterator<GTRecord> iterator() { + return new Iterator<GTRecord>() { + @Override + public boolean hasNext() { + return input.hasNext(); + } + + @Override + public GTRecord next() { + GTRecord x = input.next(); + return new GTRecord(info, x.getInternal()); + } + + @Override + public void remove() { + + } + }; + } + } + + public CubeHBaseScanRPC(CubeSegment cubeSeg, Cuboid cuboid, GTInfo fullGTInfo) { + super(cubeSeg, cuboid, fullGTInfo); + } + + @Override + public IGTScanner getGTScanner(final List<GTScanRequest> scanRequests) throws IOException { + final List<IGTScanner> scanners = Lists.newArrayList(); + for (GTScanRequest request : scanRequests) { + scanners.add(getGTScanner(request)); + } + + return new IGTScanner() { + @Override + public GTInfo getInfo() { + return scanners.get(0).getInfo(); + } + + @Override + public int getScannedRowCount() { + int sum = 0; + for (IGTScanner s : scanners) { + sum += s.getScannedRowCount(); + } + return sum; + } + + @Override + public void close() throws IOException { + for (IGTScanner s : scanners) { + s.close(); + } + } + + @Override + public Iterator<GTRecord> iterator() { + return Iterators.concat(Iterators.transform(scanners.iterator(), new Function<IGTScanner, Iterator<GTRecord>>() { + @Nullable + @Override + public Iterator<GTRecord> apply(IGTScanner input) { + return input.iterator(); + } + })); + } + }; + } + + private IGTScanner getGTScanner(final GTScanRequest scanRequest) throws IOException { + + // primary key (also the 0th column block) is always selected + final ImmutableBitSet selectedColBlocks = scanRequest.getSelectedColBlocks().set(0); + // globally shared connection, does not require close + HConnection hbaseConn = HBaseConnection.get(cubeSeg.getCubeInstance().getConfig().getStorageUrl()); + final HTableInterface hbaseTable = hbaseConn.getTable(cubeSeg.getStorageLocationIdentifier()); + + List<RawScan> rawScans = preparedHBaseScans(scanRequest.getPkStart(), scanRequest.getPkEnd(), scanRequest.getFuzzyKeys(), selectedColBlocks); + List<List<Integer>> hbaseColumnsToGT = getHBaseColumnsGTMapping(selectedColBlocks); + + final List<ResultScanner> scanners = Lists.newArrayList(); + final List<Iterator<Result>> resultIterators = Lists.newArrayList(); + + for (RawScan rawScan : rawScans) { + + logScan(rawScan, cubeSeg.getStorageLocationIdentifier()); + Scan hbaseScan = buildScan(rawScan); + + final ResultScanner scanner = hbaseTable.getScanner(hbaseScan); + final Iterator<Result> iterator = scanner.iterator(); + + scanners.add(scanner); + resultIterators.add(iterator); + } + + final Iterator<Result> allResultsIterator = Iterators.concat(resultIterators.iterator()); + + CellListIterator cellListIterator = new CellListIterator() { + @Override + public void close() throws IOException { + for (ResultScanner scanner : scanners) { + scanner.close(); + } + hbaseTable.close(); + } + + @Override + public boolean hasNext() { + return allResultsIterator.hasNext(); + } + + @Override + public List<Cell> next() { + return allResultsIterator.next().listCells(); + } + + @Override + public void remove() { + throw new UnsupportedOperationException(); + } + }; + + IGTStore store = new HBaseReadonlyStore(cellListIterator, scanRequest, rawScans.get(0).hbaseColumns, hbaseColumnsToGT, cubeSeg.getRowKeyPreambleSize()); + IGTScanner rawScanner = store.scan(scanRequest); + + final IGTScanner decorateScanner = scanRequest.decorateScanner(rawScanner); + final TrimmedInfoGTRecordAdapter trimmedInfoGTRecordAdapter = new TrimmedInfoGTRecordAdapter(fullGTInfo, decorateScanner.iterator()); + + return new IGTScanner() { + @Override + public GTInfo getInfo() { + return fullGTInfo; + } + + @Override + public int getScannedRowCount() { + return decorateScanner.getScannedRowCount(); + } + + @Override + public void close() throws IOException { + decorateScanner.close(); + } + + @Override + public Iterator<GTRecord> iterator() { + return trimmedInfoGTRecordAdapter.iterator(); + } + }; + } +} http://git-wip-us.apache.org/repos/asf/kylin/blob/bc8ea781/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeSegmentScanner.java ---------------------------------------------------------------------- diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeSegmentScanner.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeSegmentScanner.java index 5798f41..ad0cb32 100644 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeSegmentScanner.java +++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeSegmentScanner.java @@ -225,7 +225,7 @@ public class CubeSegmentScanner implements IGTScanner { public Scanner() { CubeHBaseRPC rpc; if ("scan".equalsIgnoreCase(BackdoorToggles.getHbaseCubeQueryProtocol())) { - rpc = null; + rpc = new CubeHBaseScanRPC(cubeSeg, cuboid, info); } else { rpc = new CubeHBaseEndpointRPC(cubeSeg, cuboid, info);//default behavior }