KYLIN-1308 query storage v2 enable parallel cube visiting
Project: http://git-wip-us.apache.org/repos/asf/kylin/repo Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/eb1d276c Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/eb1d276c Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/eb1d276c Branch: refs/heads/KYLIN-1122 Commit: eb1d276c7721bd3056328e4256b2b54ffa95346d Parents: 06f79e6 Author: honma <ho...@ebay.com> Authored: Wed Jan 13 17:53:40 2016 +0800 Committer: Xiaoyu Wang <wangxia...@apache.org> Committed: Mon Jan 18 13:38:30 2016 +0800 ---------------------------------------------------------------------- .../apache/kylin/cube/kv/LazyRowKeyEncoder.java | 26 -- .../apache/kylin/gridtable/GTScanRequest.java | 139 +++++---- .../hbase/cube/v2/CubeHBaseEndpointRPC.java | 223 ++++++++------ .../storage/hbase/cube/v2/CubeHBaseRPC.java | 31 +- .../storage/hbase/cube/v2/CubeHBaseScanRPC.java | 304 +++++++++---------- .../hbase/cube/v2/CubeSegmentScanner.java | 105 ++----- .../storage/hbase/cube/v2/CubeStorageQuery.java | 1 + .../coprocessor/endpoint/CubeVisitService.java | 40 ++- 8 files changed, 416 insertions(+), 453 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kylin/blob/eb1d276c/core-cube/src/main/java/org/apache/kylin/cube/kv/LazyRowKeyEncoder.java ---------------------------------------------------------------------- diff --git a/core-cube/src/main/java/org/apache/kylin/cube/kv/LazyRowKeyEncoder.java b/core-cube/src/main/java/org/apache/kylin/cube/kv/LazyRowKeyEncoder.java index 7c70fff..0fca726 100644 --- a/core-cube/src/main/java/org/apache/kylin/cube/kv/LazyRowKeyEncoder.java +++ b/core-cube/src/main/java/org/apache/kylin/cube/kv/LazyRowKeyEncoder.java @@ -18,16 +18,9 @@ package org.apache.kylin.cube.kv; -import java.util.Arrays; -import java.util.List; - -import org.apache.kylin.common.util.BytesUtil; -import org.apache.kylin.common.util.ShardingHash; import org.apache.kylin.cube.CubeSegment; import org.apache.kylin.cube.cuboid.Cuboid; -import com.google.common.collect.Lists; - /** * A LazyRowKeyEncoder will not try to calculate shard * It works for both enableSharding or non-enableSharding scenario @@ -45,23 +38,4 @@ public class LazyRowKeyEncoder extends RowKeyEncoder { throw new RuntimeException("If enableSharding false, you should never calculate shard"); } } - - //for non-sharding cases it will only return one byte[] with not shard at beginning - public List<byte[]> getRowKeysDifferentShards(byte[] halfCookedKey) { - final short cuboidShardNum = cubeSeg.getCuboidShardNum(cuboid.getId()); - - if (!enableSharding) { - return Lists.newArrayList(halfCookedKey);//not shard to append at head, so it is already well cooked - } else { - List<byte[]> ret = Lists.newArrayList(); - for (short i = 0; i < cuboidShardNum; ++i) { - short shard = ShardingHash.normalize(cubeSeg.getCuboidBaseShard(cuboid.getId()), i, cubeSeg.getTotalShards()); - byte[] cookedKey = Arrays.copyOf(halfCookedKey, halfCookedKey.length); - BytesUtil.writeShort(shard, cookedKey, 0, RowConstants.ROWKEY_SHARDID_LEN); - ret.add(cookedKey); - } - return ret; - } - - } } http://git-wip-us.apache.org/repos/asf/kylin/blob/eb1d276c/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanRequest.java ---------------------------------------------------------------------- diff --git a/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanRequest.java b/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanRequest.java index abaec85..8b0779d 100644 --- a/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanRequest.java +++ b/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanRequest.java @@ -35,73 +35,6 @@ public class GTScanRequest { private boolean allowPreAggregation = true; private double aggrCacheGB = 0; // no limit - public static final BytesSerializer<GTScanRequest> serializer = new BytesSerializer<GTScanRequest>() { - @Override - public void serialize(GTScanRequest value, ByteBuffer out) { - GTInfo.serializer.serialize(value.info, out); - - serializeGTRecord(value.range.pkStart, out); - serializeGTRecord(value.range.pkEnd, out); - BytesUtil.writeVInt(value.range.fuzzyKeys.size(), out); - for (GTRecord f : value.range.fuzzyKeys) { - serializeGTRecord(f, out); - } - - ImmutableBitSet.serializer.serialize(value.columns, out); - BytesUtil.writeByteArray(GTUtil.serializeGTFilter(value.filterPushDown, value.info), out); - - ImmutableBitSet.serializer.serialize(value.aggrGroupBy, out); - ImmutableBitSet.serializer.serialize(value.aggrMetrics, out); - BytesUtil.writeAsciiStringArray(value.aggrMetricsFuncs, out); - BytesUtil.writeVInt(value.allowPreAggregation ? 1 : 0, out); - out.putDouble(value.aggrCacheGB); - } - - @Override - public GTScanRequest deserialize(ByteBuffer in) { - GTInfo sInfo = GTInfo.serializer.deserialize(in); - - GTRecord sPkStart = deserializeGTRecord(in, sInfo); - GTRecord sPkEnd = deserializeGTRecord(in, sInfo); - List<GTRecord> sFuzzyKeys = Lists.newArrayList(); - int sFuzzyKeySize = BytesUtil.readVInt(in); - for (int i = 0; i < sFuzzyKeySize; i++) { - sFuzzyKeys.add(deserializeGTRecord(in, sInfo)); - } - GTScanRange sRange = new GTScanRange(sPkStart, sPkEnd, sFuzzyKeys); - - ImmutableBitSet sColumns = ImmutableBitSet.serializer.deserialize(in); - TupleFilter sGTFilter = GTUtil.deserializeGTFilter(BytesUtil.readByteArray(in), sInfo); - - ImmutableBitSet sAggGroupBy = ImmutableBitSet.serializer.deserialize(in); - ImmutableBitSet sAggrMetrics = ImmutableBitSet.serializer.deserialize(in); - String[] sAggrMetricFuncs = BytesUtil.readAsciiStringArray(in); - boolean sAllowPreAggr = (BytesUtil.readVInt(in) == 1); - double sAggrCacheGB = in.getDouble(); - - return new GTScanRequest(sInfo, sRange, sColumns, sAggGroupBy, sAggrMetrics, sAggrMetricFuncs, sGTFilter, sAllowPreAggr, sAggrCacheGB); - } - - private void serializeGTRecord(GTRecord gtRecord, ByteBuffer out) { - BytesUtil.writeVInt(gtRecord.cols.length, out); - for (ByteArray col : gtRecord.cols) { - col.exportData(out); - } - ImmutableBitSet.serializer.serialize(gtRecord.maskForEqualHashComp, out); - } - - private GTRecord deserializeGTRecord(ByteBuffer in, GTInfo sInfo) { - int colLength = BytesUtil.readVInt(in); - ByteArray[] sCols = new ByteArray[colLength]; - for (int i = 0; i < colLength; i++) { - sCols[i] = ByteArray.importData(in); - } - ImmutableBitSet sMaskForEqualHashComp = ImmutableBitSet.serializer.deserialize(in); - return new GTRecord(sInfo, sMaskForEqualHashComp, sCols); - } - - }; - public GTScanRequest(GTInfo info, GTScanRange range, ImmutableBitSet columns, TupleFilter filterPushDown) { this.info = info; this.range = range == null ? new GTScanRange(new GTRecord(info), new GTRecord(info)) : range; @@ -128,9 +61,6 @@ public class GTScanRequest { } private void validate(GTInfo info) { - if (range == null) - range = new GTScanRange(null, null); - if (hasAggregation()) { if (aggrGroupBy.intersects(aggrMetrics)) throw new IllegalStateException(); @@ -292,4 +222,73 @@ public class GTScanRequest { return "GTScanRequest [range=" + range + ", columns=" + columns + ", filterPushDown=" + filterPushDown + ", aggrGroupBy=" + aggrGroupBy + ", aggrMetrics=" + aggrMetrics + ", aggrMetricsFuncs=" + Arrays.toString(aggrMetricsFuncs) + "]"; } + + public static final BytesSerializer<GTScanRequest> serializer = new BytesSerializer<GTScanRequest>() { + @Override + public void serialize(GTScanRequest value, ByteBuffer out) { + GTInfo.serializer.serialize(value.info, out); + + serializeGTRecord(value.range.pkStart, out); + serializeGTRecord(value.range.pkEnd, out); + BytesUtil.writeVInt(value.range.fuzzyKeys.size(), out); + for (GTRecord f : value.range.fuzzyKeys) { + serializeGTRecord(f, out); + } + + ImmutableBitSet.serializer.serialize(value.columns, out); + BytesUtil.writeByteArray(GTUtil.serializeGTFilter(value.filterPushDown, value.info), out); + + ImmutableBitSet.serializer.serialize(value.aggrGroupBy, out); + ImmutableBitSet.serializer.serialize(value.aggrMetrics, out); + BytesUtil.writeAsciiStringArray(value.aggrMetricsFuncs, out); + BytesUtil.writeVInt(value.allowPreAggregation ? 1 : 0, out); + out.putDouble(value.aggrCacheGB); + } + + @Override + public GTScanRequest deserialize(ByteBuffer in) { + GTInfo sInfo = GTInfo.serializer.deserialize(in); + + GTRecord sPkStart = deserializeGTRecord(in, sInfo); + GTRecord sPkEnd = deserializeGTRecord(in, sInfo); + List<GTRecord> sFuzzyKeys = Lists.newArrayList(); + int sFuzzyKeySize = BytesUtil.readVInt(in); + for (int i = 0; i < sFuzzyKeySize; i++) { + sFuzzyKeys.add(deserializeGTRecord(in, sInfo)); + } + GTScanRange sRange = new GTScanRange(sPkStart, sPkEnd, sFuzzyKeys); + + ImmutableBitSet sColumns = ImmutableBitSet.serializer.deserialize(in); + TupleFilter sGTFilter = GTUtil.deserializeGTFilter(BytesUtil.readByteArray(in), sInfo); + + ImmutableBitSet sAggGroupBy = ImmutableBitSet.serializer.deserialize(in); + ImmutableBitSet sAggrMetrics = ImmutableBitSet.serializer.deserialize(in); + String[] sAggrMetricFuncs = BytesUtil.readAsciiStringArray(in); + boolean sAllowPreAggr = (BytesUtil.readVInt(in) == 1); + double sAggrCacheGB = in.getDouble(); + + return new GTScanRequest(sInfo, sRange, sColumns, sAggGroupBy, sAggrMetrics, sAggrMetricFuncs, sGTFilter, sAllowPreAggr, sAggrCacheGB); + } + + private void serializeGTRecord(GTRecord gtRecord, ByteBuffer out) { + BytesUtil.writeVInt(gtRecord.cols.length, out); + for (ByteArray col : gtRecord.cols) { + col.exportData(out); + } + ImmutableBitSet.serializer.serialize(gtRecord.maskForEqualHashComp, out); + } + + private GTRecord deserializeGTRecord(ByteBuffer in, GTInfo sInfo) { + int colLength = BytesUtil.readVInt(in); + ByteArray[] sCols = new ByteArray[colLength]; + for (int i = 0; i < colLength; i++) { + sCols[i] = ByteArray.importData(in); + } + ImmutableBitSet sMaskForEqualHashComp = ImmutableBitSet.serializer.deserialize(in); + return new GTRecord(sInfo, sMaskForEqualHashComp, sCols); + } + + }; + + } http://git-wip-us.apache.org/repos/asf/kylin/blob/eb1d276c/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java ---------------------------------------------------------------------- diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java index a193d10..05c9ffe 100644 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java +++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java @@ -20,31 +20,30 @@ package org.apache.kylin.storage.hbase.cube.v2; import java.io.IOException; import java.nio.ByteBuffer; -import java.util.Collection; -import java.util.Collections; import java.util.Iterator; import java.util.List; import java.util.Map; -import java.util.concurrent.ExecutionException; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.BlockingQueue; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; -import java.util.concurrent.Future; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicInteger; import java.util.zip.DataFormatException; import javax.annotation.Nullable; -import org.apache.hadoop.hbase.client.HConnection; +import org.apache.commons.lang.NotImplementedException; import org.apache.hadoop.hbase.client.HTableInterface; import org.apache.hadoop.hbase.client.coprocessor.Batch; import org.apache.hadoop.hbase.ipc.BlockingRpcCallback; import org.apache.hadoop.hbase.ipc.ServerRpcController; import org.apache.kylin.common.debug.BackdoorToggles; +import org.apache.kylin.common.util.Bytes; import org.apache.kylin.common.util.BytesSerializer; +import org.apache.kylin.common.util.BytesUtil; import org.apache.kylin.common.util.CompressionUtils; import org.apache.kylin.common.util.ImmutableBitSet; +import org.apache.kylin.common.util.Pair; import org.apache.kylin.cube.CubeSegment; import org.apache.kylin.cube.cuboid.Cuboid; import org.apache.kylin.gridtable.GTInfo; @@ -56,15 +55,14 @@ import org.apache.kylin.storage.hbase.common.coprocessor.CoprocessorBehavior; import org.apache.kylin.storage.hbase.cube.v2.coprocessor.endpoint.generated.CubeVisitProtos; import org.apache.kylin.storage.hbase.cube.v2.coprocessor.endpoint.generated.CubeVisitProtos.CubeVisitRequest.IntList; import org.apache.kylin.storage.hbase.cube.v2.coprocessor.endpoint.generated.CubeVisitProtos.CubeVisitResponse.Stats; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import com.google.common.base.Function; -import com.google.common.collect.Collections2; import com.google.common.collect.Iterators; import com.google.common.collect.Lists; import com.google.protobuf.ByteString; import com.google.protobuf.HBaseZeroCopyByteString; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; public class CubeHBaseEndpointRPC extends CubeHBaseRPC { @@ -72,6 +70,49 @@ public class CubeHBaseEndpointRPC extends CubeHBaseRPC { private static ExecutorService executorService = Executors.newCachedThreadPool(); + static class ExpectedSizeIterator implements Iterator<byte[]> { + + int expectedSize; + int current = 0; + BlockingQueue<byte[]> queue; + + public ExpectedSizeIterator(int expectedSize) { + this.expectedSize = expectedSize; + this.queue = new ArrayBlockingQueue<byte[]>(expectedSize); + } + + @Override + public boolean hasNext() { + return (current < expectedSize); + } + + @Override + public byte[] next() { + if (current >= expectedSize) { + throw new IllegalStateException("Won't have more data"); + } + try { + current++; + return queue.take(); + } catch (InterruptedException e) { + throw new RuntimeException("error when waiting queue", e); + } + } + + @Override + public void remove() { + throw new NotImplementedException(); + } + + public void append(byte[] data) { + try { + queue.put(data); + } catch (InterruptedException e) { + throw new RuntimeException("error when waiting queue", e); + } + } + } + static class EndpointResultsAsGTScanner implements IGTScanner { private GTInfo info; private Iterator<byte[]> blocks; @@ -141,112 +182,112 @@ public class CubeHBaseEndpointRPC extends CubeHBaseRPC { super(cubeSeg, cuboid, fullGTInfo); } + private byte[] getByteArrayForShort(short v) { + byte[] split = new byte[Bytes.SIZEOF_SHORT]; + BytesUtil.writeUnsigned(v, split, 0, Bytes.SIZEOF_SHORT); + return split; + } + + private List<Pair<byte[], byte[]>> getEPKeyRanges(short baseShard, short shardNum, int totalShards) { + if (baseShard + shardNum <= totalShards) { + //endpoint end key is inclusive, so no need to append 0 or anything + return Lists.newArrayList(Pair.newPair(getByteArrayForShort(baseShard), getByteArrayForShort((short) (baseShard + shardNum - 1)))); + } else { + //0,1,2,3,4 wants 4,0 + return Lists.newArrayList(Pair.newPair(getByteArrayForShort(baseShard), getByteArrayForShort((short) (totalShards - 1))),// + Pair.newPair(getByteArrayForShort((short) 0), getByteArrayForShort((short) (baseShard + shardNum - totalShards - 1)))); + } + } + @Override - public IGTScanner getGTScanner(final GTScanRequest scanRequest) throws IOException { + public IGTScanner getGTScanner(final List<GTScanRequest> scanRequests) throws IOException { + + final String toggle = BackdoorToggles.getCoprocessorBehavior() == null ? CoprocessorBehavior.SCAN_FILTER_AGGR_CHECKMEM.toString() : BackdoorToggles.getCoprocessorBehavior(); + logger.debug("The execution of this query will use " + toggle + " as endpoint's behavior"); + + short cuboidBaseShard = cubeSeg.getCuboidBaseShard(this.cuboid.getId()); + short shardNum = cubeSeg.getCuboidShardNum(this.cuboid.getId()); + int totalShards = cubeSeg.getTotalShards(); + + final List<ByteString> scanRequestByteStrings = Lists.newArrayList(); + final List<ByteString> rawScanByteStrings = Lists.newArrayList(); // primary key (also the 0th column block) is always selected - final ImmutableBitSet selectedColBlocks = scanRequest.getSelectedColBlocks().set(0); + final ImmutableBitSet selectedColBlocks = scanRequests.get(0).getSelectedColBlocks().set(0); + // globally shared connection, does not require close - HConnection hbaseConn = HBaseConnection.get(cubeSeg.getCubeInstance().getConfig().getStorageUrl()); - final HTableInterface hbaseTable = hbaseConn.getTable(cubeSeg.getStorageLocationIdentifier()); + final HTableInterface hbaseTable = HBaseConnection.get(cubeSeg.getCubeInstance().getConfig().getStorageUrl()).getTable(cubeSeg.getStorageLocationIdentifier()); - List<RawScan> rawScans = preparedHBaseScan(scanRequest.getPkStart(), scanRequest.getPkEnd(), scanRequest.getFuzzyKeys(), selectedColBlocks); - List<List<Integer>> hbaseColumnsToGT = getHBaseColumnsGTMapping(selectedColBlocks); final List<IntList> hbaseColumnsToGTIntList = Lists.newArrayList(); + List<List<Integer>> hbaseColumnsToGT = getHBaseColumnsGTMapping(selectedColBlocks); for (List<Integer> list : hbaseColumnsToGT) { hbaseColumnsToGTIntList.add(IntList.newBuilder().addAllInts(list).build()); } - ByteBuffer buffer = ByteBuffer.allocate(BytesSerializer.SERIALIZE_BUFFER_SIZE); - GTScanRequest.serializer.serialize(scanRequest, buffer); - buffer.flip(); - final ByteString scanRequestBytesString = HBaseZeroCopyByteString.wrap(buffer.array(), buffer.position(), buffer.limit()); - logger.debug("Serialized scanRequestBytes's size is " + (buffer.limit() - buffer.position())); + for (GTScanRequest req : scanRequests) { + ByteBuffer buffer = ByteBuffer.allocate(BytesSerializer.SERIALIZE_BUFFER_SIZE); + GTScanRequest.serializer.serialize(req, buffer); + buffer.flip(); + scanRequestByteStrings.add(HBaseZeroCopyByteString.wrap(buffer.array(), buffer.position(), buffer.limit())); - final List<byte[]> rowBlocks = Collections.synchronizedList(Lists.<byte[]> newArrayList()); + RawScan rawScan = preparedHBaseScan(req.getPkStart(), req.getPkEnd(), req.getFuzzyKeys(), selectedColBlocks); - logger.debug("Total RawScan range count: " + rawScans.size()); - for (RawScan rawScan : rawScans) { + ByteBuffer rawScanBuffer = ByteBuffer.allocate(BytesSerializer.SERIALIZE_BUFFER_SIZE); + RawScan.serializer.serialize(rawScan, rawScanBuffer); + rawScanBuffer.flip(); + rawScanByteStrings.add(HBaseZeroCopyByteString.wrap(rawScanBuffer.array(), rawScanBuffer.position(), rawScanBuffer.limit())); + + logger.debug("Serialized scanRequestBytes {} bytes, rawScanBytesString {} bytes", buffer.limit() - buffer.position(), rawScanBuffer.limit() - rawScanBuffer.position()); logScan(rawScan, cubeSeg.getStorageLocationIdentifier()); } - final AtomicInteger totalScannedCount = new AtomicInteger(0); - final String toggle = BackdoorToggles.getCoprocessorBehavior() == null ? CoprocessorBehavior.SCAN_FILTER_AGGR_CHECKMEM.toString() : BackdoorToggles.getCoprocessorBehavior(); - logger.debug("The execution of this query will use " + toggle + " as endpoint's behavior"); - List<Future<?>> futures = Lists.newArrayList(); - - for (int i = 0; i < rawScans.size(); ++i) { - final int shardIndex = i; - final RawScan rawScan = rawScans.get(i); - - Future<?> future = executorService.submit(new Runnable() { - @Override - public void run() { - - ByteBuffer rawScanBuffer = ByteBuffer.allocate(BytesSerializer.SERIALIZE_BUFFER_SIZE); - RawScan.serializer.serialize(rawScan, rawScanBuffer); - rawScanBuffer.flip(); - - CubeVisitProtos.CubeVisitRequest.Builder builder = CubeVisitProtos.CubeVisitRequest.newBuilder(); - builder.setGtScanRequest(scanRequestBytesString).setHbaseRawScan(HBaseZeroCopyByteString.wrap(rawScanBuffer.array(), rawScanBuffer.position(), rawScanBuffer.limit())); - for (IntList intList : hbaseColumnsToGTIntList) { - builder.addHbaseColumnsToGT(intList); - } - builder.setRowkeyPreambleSize(cubeSeg.getRowKeyPreambleSize()); - builder.setBehavior(toggle); - - Collection<CubeVisitProtos.CubeVisitResponse> results; - try { - results = getResults(builder.build(), hbaseTable, rawScan.startKey, rawScan.endKey); - } catch (Throwable throwable) { - throw new RuntimeException("Error when visiting cubes by endpoint:", throwable); - } + logger.debug("Start to executing: {} shards starting from {}", shardNum, cuboidBaseShard); - //results.size() supposed to be 1; - if (results.size() != 1) { - logger.info("{} CubeVisitResponse returned for shard {}", results.size(), shardIndex); - } - - for (CubeVisitProtos.CubeVisitResponse result : results) { - totalScannedCount.addAndGet(result.getStats().getScannedRowCount()); - logger.info(getStatsString(result, shardIndex)); - } + final AtomicInteger totalScannedCount = new AtomicInteger(0); + final ExpectedSizeIterator epResultItr = new ExpectedSizeIterator(scanRequests.size() * shardNum); + + for (final Pair<byte[], byte[]> epRange : getEPKeyRanges(cuboidBaseShard, shardNum, totalShards)) { + for (int i = 0; i < scanRequests.size(); ++i) { + final int scanIndex = i; + executorService.submit(new Runnable() { + @Override + public void run() { + CubeVisitProtos.CubeVisitRequest.Builder builder = CubeVisitProtos.CubeVisitRequest.newBuilder(); + builder.setGtScanRequest(scanRequestByteStrings.get(scanIndex)).setHbaseRawScan(rawScanByteStrings.get(scanIndex)); + for (IntList intList : hbaseColumnsToGTIntList) { + builder.addHbaseColumnsToGT(intList); + } + builder.setRowkeyPreambleSize(cubeSeg.getRowKeyPreambleSize()); + builder.setBehavior(toggle); + + Map<byte[], CubeVisitProtos.CubeVisitResponse> results; + try { + results = getResults(builder.build(), hbaseTable, epRange.getFirst(), epRange.getSecond()); + } catch (Throwable throwable) { + throw new RuntimeException("Error when visiting cubes by endpoint:", throwable); + } - Collection<byte[]> part = Collections2.transform(results, new Function<CubeVisitProtos.CubeVisitResponse, byte[]>() { - @Nullable - @Override - public byte[] apply(CubeVisitProtos.CubeVisitResponse input) { + for (Map.Entry<byte[], CubeVisitProtos.CubeVisitResponse> result : results.entrySet()) { + totalScannedCount.addAndGet(result.getValue().getStats().getScannedRowCount()); + logger.info(getStatsString(result)); try { - return CompressionUtils.decompress(HBaseZeroCopyByteString.zeroCopyGetBytes(input.getCompressedRows())); + epResultItr.append(CompressionUtils.decompress(HBaseZeroCopyByteString.zeroCopyGetBytes(result.getValue().getCompressedRows()))); } catch (IOException | DataFormatException e) { - throw new RuntimeException(e); + throw new RuntimeException("Error when decompressing", e); } } - }); - rowBlocks.addAll(part); - } - }); - futures.add(future); - } - try { - for (Future<?> future : futures) { - future.get(1, TimeUnit.HOURS); + } + }); } - } catch (InterruptedException e) { - throw new RuntimeException("Visiting cube by endpoint gets interrupted", e); - } catch (ExecutionException e) { - throw new RuntimeException("Visiting cube throw exception", e); - } catch (TimeoutException e) { - throw new RuntimeException("Visiting cube by endpoint timeout", e); } - return new EndpointResultsAsGTScanner(fullGTInfo, rowBlocks.iterator(), scanRequest.getColumns(), totalScannedCount.get()); + return new EndpointResultsAsGTScanner(fullGTInfo, epResultItr, scanRequests.get(0).getColumns(), totalScannedCount.get()); } - private String getStatsString(CubeVisitProtos.CubeVisitResponse result, int shardIndex) { + private String getStatsString(Map.Entry<byte[], CubeVisitProtos.CubeVisitResponse> result) { StringBuilder sb = new StringBuilder(); - Stats stats = result.getStats(); - sb.append("Shard " + shardIndex + " on host: " + stats.getHostname() + "."); + Stats stats = result.getValue().getStats(); + sb.append("Shard " + BytesUtil.toHex(result.getKey()) + " on host: " + stats.getHostname() + "."); sb.append("Total scanned row: " + stats.getScannedRowCount() + ". "); sb.append("Total filtered/aggred row: " + stats.getAggregatedRowCount() + ". "); sb.append("Time elapsed in EP: " + (stats.getServiceEndTime() - stats.getServiceStartTime()) + "(ms). "); @@ -256,7 +297,7 @@ public class CubeHBaseEndpointRPC extends CubeHBaseRPC { } - private Collection<CubeVisitProtos.CubeVisitResponse> getResults(final CubeVisitProtos.CubeVisitRequest request, HTableInterface table, byte[] startKey, byte[] endKey) throws Throwable { + private Map<byte[], CubeVisitProtos.CubeVisitResponse> getResults(final CubeVisitProtos.CubeVisitRequest request, HTableInterface table, byte[] startKey, byte[] endKey) throws Throwable { Map<byte[], CubeVisitProtos.CubeVisitResponse> results = table.coprocessorService(CubeVisitProtos.CubeVisitService.class, startKey, endKey, new Batch.Call<CubeVisitProtos.CubeVisitService, CubeVisitProtos.CubeVisitResponse>() { public CubeVisitProtos.CubeVisitResponse call(CubeVisitProtos.CubeVisitService rowsService) throws IOException { ServerRpcController controller = new ServerRpcController(); @@ -270,6 +311,6 @@ public class CubeHBaseEndpointRPC extends CubeHBaseRPC { } }); - return results.values(); + return results; } } http://git-wip-us.apache.org/repos/asf/kylin/blob/eb1d276c/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseRPC.java ---------------------------------------------------------------------- diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseRPC.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseRPC.java index c25197f..07143b1 100644 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseRPC.java +++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseRPC.java @@ -5,8 +5,6 @@ import java.util.Arrays; import java.util.Collections; import java.util.List; -import javax.annotation.Nullable; - import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.filter.Filter; import org.apache.hadoop.hbase.filter.FilterList; @@ -32,7 +30,6 @@ import org.apache.kylin.gridtable.IGTScanner; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.google.common.base.Function; import com.google.common.base.Preconditions; import com.google.common.collect.Lists; @@ -55,7 +52,7 @@ public abstract class CubeHBaseRPC { this.fuzzyMaskEncoder = new FuzzyMaskEncoder(cubeSeg, cuboid); } - abstract IGTScanner getGTScanner(GTScanRequest scanRequest) throws IOException; + abstract IGTScanner getGTScanner(final List<GTScanRequest> scanRequests) throws IOException; public static Scan buildScan(RawScan rawScan) { Scan scan = new Scan(); @@ -80,34 +77,22 @@ public abstract class CubeHBaseRPC { return scan; } - protected List<RawScan> preparedHBaseScan(GTRecord pkStart, GTRecord pkEnd, List<GTRecord> fuzzyKeys, ImmutableBitSet selectedColBlocks) { + protected RawScan preparedHBaseScan(GTRecord pkStart, GTRecord pkEnd, List<GTRecord> fuzzyKeys, ImmutableBitSet selectedColBlocks) { final List<Pair<byte[], byte[]>> selectedColumns = makeHBaseColumns(selectedColBlocks); - List<RawScan> ret = Lists.newArrayList(); LazyRowKeyEncoder encoder = new LazyRowKeyEncoder(cubeSeg, cuboid); byte[] start = encoder.createBuf(); byte[] end = encoder.createBuf(); - List<byte[]> startKeys; - List<byte[]> endKeys; encoder.setBlankByte(RowConstants.ROWKEY_LOWER_BYTE); encoder.encode(pkStart, pkStart.getInfo().getPrimaryKey(), start); - startKeys = encoder.getRowKeysDifferentShards(start); encoder.setBlankByte(RowConstants.ROWKEY_UPPER_BYTE); encoder.encode(pkEnd, pkEnd.getInfo().getPrimaryKey(), end); - endKeys = encoder.getRowKeysDifferentShards(end); - endKeys = Lists.transform(endKeys, new Function<byte[], byte[]>() { - @Nullable - @Override - public byte[] apply(byte[] input) { - byte[] shardEnd = new byte[input.length + 1];//append extra 0 to the end key to make it inclusive while scanning - System.arraycopy(input, 0, shardEnd, 0, input.length); - return shardEnd; - } - }); + byte[] temp = new byte[end.length + 1];//append extra 0 to the end key to make it inclusive while scanning + System.arraycopy(end, 0, temp, 0, end.length); + end = temp; - Preconditions.checkState(startKeys.size() == endKeys.size()); List<Pair<byte[], byte[]>> hbaseFuzzyKeys = translateFuzzyKeys(fuzzyKeys); KylinConfig config = cubeSeg.getCubeDesc().getConfig(); @@ -116,11 +101,7 @@ public abstract class CubeHBaseRPC { if (isMemoryHungry(selectedColBlocks)) hbaseCaching /= 10; - for (short i = 0; i < startKeys.size(); ++i) { - ret.add(new RawScan(startKeys.get(i), endKeys.get(i), selectedColumns, hbaseFuzzyKeys, hbaseCaching, hbaseMaxResultSize)); - } - return ret; - + return new RawScan(start, end, selectedColumns, hbaseFuzzyKeys, hbaseCaching, hbaseMaxResultSize); } private boolean isMemoryHungry(ImmutableBitSet selectedColBlocks) { http://git-wip-us.apache.org/repos/asf/kylin/blob/eb1d276c/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseScanRPC.java ---------------------------------------------------------------------- diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseScanRPC.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseScanRPC.java index b623a65..90888a3 100644 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseScanRPC.java +++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseScanRPC.java @@ -1,152 +1,152 @@ -package org.apache.kylin.storage.hbase.cube.v2; - -import java.io.IOException; -import java.util.Iterator; -import java.util.List; - -import org.apache.hadoop.hbase.Cell; -import org.apache.hadoop.hbase.client.HConnection; -import org.apache.hadoop.hbase.client.HTableInterface; -import org.apache.hadoop.hbase.client.Result; -import org.apache.hadoop.hbase.client.ResultScanner; -import org.apache.hadoop.hbase.client.Scan; -import org.apache.kylin.common.util.ImmutableBitSet; -import org.apache.kylin.cube.CubeSegment; -import org.apache.kylin.cube.cuboid.Cuboid; -import org.apache.kylin.gridtable.GTInfo; -import org.apache.kylin.gridtable.GTRecord; -import org.apache.kylin.gridtable.GTScanRequest; -import org.apache.kylin.gridtable.IGTScanner; -import org.apache.kylin.gridtable.IGTStore; -import org.apache.kylin.storage.hbase.HBaseConnection; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.google.common.collect.Iterators; -import com.google.common.collect.Lists; - -/** - * for test use only - */ -public class CubeHBaseScanRPC extends CubeHBaseRPC { - public static final Logger logger = LoggerFactory.getLogger(CubeHBaseScanRPC.class); - - static class TrimmedInfoGTRecordAdapter implements Iterable<GTRecord> { - - private final GTInfo info; - private final Iterator<GTRecord> input; - - public TrimmedInfoGTRecordAdapter(GTInfo info, Iterator<GTRecord> input) { - this.info = info; - this.input = input; - } - - @Override - public Iterator<GTRecord> iterator() { - return new Iterator<GTRecord>() { - @Override - public boolean hasNext() { - return input.hasNext(); - } - - @Override - public GTRecord next() { - GTRecord x = input.next(); - return new GTRecord(info, x.getInternal()); - } - - @Override - public void remove() { - - } - }; - } - } - - public CubeHBaseScanRPC(CubeSegment cubeSeg, Cuboid cuboid, GTInfo fullGTInfo) { - super(cubeSeg, cuboid, fullGTInfo); - } - - @Override - public IGTScanner getGTScanner(final GTScanRequest scanRequest) throws IOException { - - // primary key (also the 0th column block) is always selected - final ImmutableBitSet selectedColBlocks = scanRequest.getSelectedColBlocks().set(0); - // globally shared connection, does not require close - HConnection hbaseConn = HBaseConnection.get(cubeSeg.getCubeInstance().getConfig().getStorageUrl()); - final HTableInterface hbaseTable = hbaseConn.getTable(cubeSeg.getStorageLocationIdentifier()); - - List<RawScan> rawScans = preparedHBaseScan(scanRequest.getPkStart(), scanRequest.getPkEnd(), scanRequest.getFuzzyKeys(), selectedColBlocks); - List<List<Integer>> hbaseColumnsToGT = getHBaseColumnsGTMapping(selectedColBlocks); - - final List<ResultScanner> scanners = Lists.newArrayList(); - final List<Iterator<Result>> resultIterators = Lists.newArrayList(); - - for (RawScan rawScan : rawScans) { - - logScan(rawScan, cubeSeg.getStorageLocationIdentifier()); - Scan hbaseScan = buildScan(rawScan); - - final ResultScanner scanner = hbaseTable.getScanner(hbaseScan); - final Iterator<Result> iterator = scanner.iterator(); - - scanners.add(scanner); - resultIterators.add(iterator); - } - - final Iterator<Result> allResultsIterator = Iterators.concat(resultIterators.iterator()); - - CellListIterator cellListIterator = new CellListIterator() { - @Override - public void close() throws IOException { - for (ResultScanner scanner : scanners) { - scanner.close(); - } - hbaseTable.close(); - } - - @Override - public boolean hasNext() { - return allResultsIterator.hasNext(); - } - - @Override - public List<Cell> next() { - return allResultsIterator.next().listCells(); - } - - @Override - public void remove() { - throw new UnsupportedOperationException(); - } - }; - - IGTStore store = new HBaseReadonlyStore(cellListIterator, scanRequest, rawScans.get(0).hbaseColumns, hbaseColumnsToGT, cubeSeg.getRowKeyPreambleSize()); - IGTScanner rawScanner = store.scan(scanRequest); - - final IGTScanner decorateScanner = scanRequest.decorateScanner(rawScanner); - final TrimmedInfoGTRecordAdapter trimmedInfoGTRecordAdapter = new TrimmedInfoGTRecordAdapter(fullGTInfo, decorateScanner.iterator()); - - return new IGTScanner() { - @Override - public GTInfo getInfo() { - return fullGTInfo; - } - - @Override - public int getScannedRowCount() { - return decorateScanner.getScannedRowCount(); - } - - @Override - public void close() throws IOException { - decorateScanner.close(); - } - - @Override - public Iterator<GTRecord> iterator() { - return trimmedInfoGTRecordAdapter.iterator(); - } - }; - } -} +//package org.apache.kylin.storage.hbase.cube.v2; +// +//import java.io.IOException; +//import java.util.Iterator; +//import java.util.List; +// +//import org.apache.hadoop.hbase.Cell; +//import org.apache.hadoop.hbase.client.HConnection; +//import org.apache.hadoop.hbase.client.HTableInterface; +//import org.apache.hadoop.hbase.client.Result; +//import org.apache.hadoop.hbase.client.ResultScanner; +//import org.apache.hadoop.hbase.client.Scan; +//import org.apache.kylin.common.util.ImmutableBitSet; +//import org.apache.kylin.cube.CubeSegment; +//import org.apache.kylin.cube.cuboid.Cuboid; +//import org.apache.kylin.gridtable.GTInfo; +//import org.apache.kylin.gridtable.GTRecord; +//import org.apache.kylin.gridtable.GTScanRequest; +//import org.apache.kylin.gridtable.IGTScanner; +//import org.apache.kylin.gridtable.IGTStore; +//import org.apache.kylin.storage.hbase.HBaseConnection; +//import org.slf4j.Logger; +//import org.slf4j.LoggerFactory; +// +//import com.google.common.collect.Iterators; +//import com.google.common.collect.Lists; +// +///** +// * for test use only +// */ +//public class CubeHBaseScanRPC extends CubeHBaseRPC { +// public static final Logger logger = LoggerFactory.getLogger(CubeHBaseScanRPC.class); +// +// static class TrimmedInfoGTRecordAdapter implements Iterable<GTRecord> { +// +// private final GTInfo info; +// private final Iterator<GTRecord> input; +// +// public TrimmedInfoGTRecordAdapter(GTInfo info, Iterator<GTRecord> input) { +// this.info = info; +// this.input = input; +// } +// +// @Override +// public Iterator<GTRecord> iterator() { +// return new Iterator<GTRecord>() { +// @Override +// public boolean hasNext() { +// return input.hasNext(); +// } +// +// @Override +// public GTRecord next() { +// GTRecord x = input.next(); +// return new GTRecord(info, x.getInternal()); +// } +// +// @Override +// public void remove() { +// +// } +// }; +// } +// } +// +// public CubeHBaseScanRPC(CubeSegment cubeSeg, Cuboid cuboid, GTInfo fullGTInfo) { +// super(cubeSeg, cuboid, fullGTInfo); +// } +// +// @Override +// public IGTScanner getGTScanner(final GTScanRequest scanRequest) throws IOException { +// +// // primary key (also the 0th column block) is always selected +// final ImmutableBitSet selectedColBlocks = scanRequest.getSelectedColBlocks().set(0); +// // globally shared connection, does not require close +// HConnection hbaseConn = HBaseConnection.get(cubeSeg.getCubeInstance().getConfig().getStorageUrl()); +// final HTableInterface hbaseTable = hbaseConn.getTable(cubeSeg.getStorageLocationIdentifier()); +// +// List<RawScan> rawScans = preparedHBaseScan(scanRequest.getPkStart(), scanRequest.getPkEnd(), scanRequest.getFuzzyKeys(), selectedColBlocks); +// List<List<Integer>> hbaseColumnsToGT = getHBaseColumnsGTMapping(selectedColBlocks); +// +// final List<ResultScanner> scanners = Lists.newArrayList(); +// final List<Iterator<Result>> resultIterators = Lists.newArrayList(); +// +// for (RawScan rawScan : rawScans) { +// +// logScan(rawScan, cubeSeg.getStorageLocationIdentifier()); +// Scan hbaseScan = buildScan(rawScan); +// +// final ResultScanner scanner = hbaseTable.getScanner(hbaseScan); +// final Iterator<Result> iterator = scanner.iterator(); +// +// scanners.add(scanner); +// resultIterators.add(iterator); +// } +// +// final Iterator<Result> allResultsIterator = Iterators.concat(resultIterators.iterator()); +// +// CellListIterator cellListIterator = new CellListIterator() { +// @Override +// public void close() throws IOException { +// for (ResultScanner scanner : scanners) { +// scanner.close(); +// } +// hbaseTable.close(); +// } +// +// @Override +// public boolean hasNext() { +// return allResultsIterator.hasNext(); +// } +// +// @Override +// public List<Cell> next() { +// return allResultsIterator.next().listCells(); +// } +// +// @Override +// public void remove() { +// throw new UnsupportedOperationException(); +// } +// }; +// +// IGTStore store = new HBaseReadonlyStore(cellListIterator, scanRequest, rawScans.get(0).hbaseColumns, hbaseColumnsToGT, cubeSeg.getRowKeyPreambleSize()); +// IGTScanner rawScanner = store.scan(scanRequest); +// +// final IGTScanner decorateScanner = scanRequest.decorateScanner(rawScanner); +// final TrimmedInfoGTRecordAdapter trimmedInfoGTRecordAdapter = new TrimmedInfoGTRecordAdapter(fullGTInfo, decorateScanner.iterator()); +// +// return new IGTScanner() { +// @Override +// public GTInfo getInfo() { +// return fullGTInfo; +// } +// +// @Override +// public int getScannedRowCount() { +// return decorateScanner.getScannedRowCount(); +// } +// +// @Override +// public void close() throws IOException { +// decorateScanner.close(); +// } +// +// @Override +// public Iterator<GTRecord> iterator() { +// return trimmedInfoGTRecordAdapter.iterator(); +// } +// }; +// } +//} http://git-wip-us.apache.org/repos/asf/kylin/blob/eb1d276c/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeSegmentScanner.java ---------------------------------------------------------------------- diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeSegmentScanner.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeSegmentScanner.java index 246f1c4..5798f41 100644 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeSegmentScanner.java +++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeSegmentScanner.java @@ -8,7 +8,6 @@ import java.util.Collections; import java.util.Comparator; import java.util.Iterator; import java.util.List; -import java.util.NoSuchElementException; import java.util.Set; import org.apache.commons.lang3.StringUtils; @@ -25,6 +24,7 @@ import org.apache.kylin.cube.gridtable.CuboidToGridTableMapping; import org.apache.kylin.cube.gridtable.NotEnoughGTInfoException; import org.apache.kylin.cube.model.CubeDesc; import org.apache.kylin.dict.TupleFilterDictionaryTranslater; +import org.apache.kylin.gridtable.EmptyGTScanner; import org.apache.kylin.gridtable.GTInfo; import org.apache.kylin.gridtable.GTRecord; import org.apache.kylin.gridtable.GTScanRange; @@ -83,9 +83,9 @@ public class CubeSegmentScanner implements IGTScanner { int index = mapping.getIndexOf(tblColRef); if (index >= 0) { segmentStartAndEnd = getSegmentStartAndEnd(index); - partitionColOfGT = info.colRef(index); + partitionColOfGT = info.colRef(index); } - scanRangePlanner = new GTScanRangePlanner(info, segmentStartAndEnd,partitionColOfGT); + scanRangePlanner = new GTScanRangePlanner(info, segmentStartAndEnd, partitionColOfGT); } else { scanRangePlanner = new GTScanRangePlanner(info, null, null); } @@ -220,90 +220,39 @@ public class CubeSegmentScanner implements IGTScanner { } private class Scanner { - final IGTScanner[] inputScanners = new IGTScanner[scanRequests.size()]; - int cur = 0; - Iterator<GTRecord> curIterator = null; - GTRecord next = null; + IGTScanner internal = null; - public Iterator<GTRecord> iterator() { - return new Iterator<GTRecord>() { - - @Override - public boolean hasNext() { - if (next != null) - return true; - - if (curIterator == null) { - if (cur >= scanRequests.size()) - return false; - - try { - - CubeHBaseRPC rpc; - if ("scan".equalsIgnoreCase(BackdoorToggles.getHbaseCubeQueryProtocol())) { - rpc = new CubeHBaseScanRPC(cubeSeg, cuboid, info); - } else { - rpc = new CubeHBaseEndpointRPC(cubeSeg, cuboid, info);//default behavior - } - - //change previous line to CubeHBaseRPC rpc = new CubeHBaseScanRPC(cubeSeg, cuboid, info); - //to debug locally - - inputScanners[cur] = rpc.getGTScanner(scanRequests.get(cur)); - curIterator = inputScanners[cur].iterator(); - } catch (IOException e) { - throw new RuntimeException(e); - } - } - - if (curIterator.hasNext() == false) { - curIterator = null; - cur++; - return hasNext(); - } - - next = curIterator.next(); - return true; - } - - @Override - public GTRecord next() { - // fetch next record - if (next == null) { - hasNext(); - if (next == null) - throw new NoSuchElementException(); - } - - GTRecord result = next; - next = null; - return result; + public Scanner() { + CubeHBaseRPC rpc; + if ("scan".equalsIgnoreCase(BackdoorToggles.getHbaseCubeQueryProtocol())) { + rpc = null; + } else { + rpc = new CubeHBaseEndpointRPC(cubeSeg, cuboid, info);//default behavior + } + //change previous line to CubeHBaseRPC rpc = new CubeHBaseScanRPC(cubeSeg, cuboid, info); + //to debug locally + + try { + if (scanRequests.size() == 0) { + internal = new EmptyGTScanner(); + } else { + internal = rpc.getGTScanner(scanRequests); } + } catch (IOException e) { + throw new RuntimeException("error", e); + } + } - @Override - public void remove() { - throw new UnsupportedOperationException(); - } - }; + public Iterator<GTRecord> iterator() { + return internal.iterator(); } public void close() throws IOException { - for (int i = 0; i < inputScanners.length; i++) { - if (inputScanners[i] != null) { - inputScanners[i].close(); - } - } + internal.close(); } public int getScannedRowCount() { - int result = 0; - for (int i = 0; i < inputScanners.length; i++) { - if (inputScanners[i] == null) - break; - - result += inputScanners[i].getScannedRowCount(); - } - return result; + return internal.getScannedRowCount(); } } http://git-wip-us.apache.org/repos/asf/kylin/blob/eb1d276c/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeStorageQuery.java ---------------------------------------------------------------------- diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeStorageQuery.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeStorageQuery.java index 78bd25b..9f0ff2b 100644 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeStorageQuery.java +++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeStorageQuery.java @@ -99,6 +99,7 @@ public class CubeStorageQuery implements ICachableStorageQuery { try { scanner = new CubeSegmentScanner(cubeSeg, cuboid, dimensionsD, groupsD, metrics, filterD, !isExactAggregation); } catch (NotEnoughGTInfoException e) { + //deal with empty cube segment logger.info("Cannot construct Segment {}'s GTInfo, this may due to empty segment or broken metadata", cubeSeg); continue; } http://git-wip-us.apache.org/repos/asf/kylin/blob/eb1d276c/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/CubeVisitService.java ---------------------------------------------------------------------- diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/CubeVisitService.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/CubeVisitService.java index e396a76..7ac4be1 100644 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/CubeVisitService.java +++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/CubeVisitService.java @@ -26,6 +26,7 @@ import java.nio.ByteBuffer; import java.util.List; import org.apache.commons.io.IOUtils; +import org.apache.commons.lang.ArrayUtils; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.Coprocessor; import org.apache.hadoop.hbase.CoprocessorEnvironment; @@ -36,6 +37,7 @@ import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; import org.apache.hadoop.hbase.protobuf.ResponseConverter; import org.apache.hadoop.hbase.regionserver.HRegion; import org.apache.hadoop.hbase.regionserver.RegionScanner; +import org.apache.kylin.common.util.Bytes; import org.apache.kylin.common.util.CompressionUtils; import org.apache.kylin.cube.kv.RowConstants; import org.apache.kylin.gridtable.GTRecord; @@ -123,6 +125,20 @@ public class CubeVisitService extends CubeVisitProtos.CubeVisitService implement } } + private void updateRawScanByCurrentRegion(RawScan rawScan, HRegion region, int shardLength) { + if (shardLength == 0) { + return; + } + byte[] regionStartKey = ArrayUtils.isEmpty(region.getStartKey()) ? new byte[shardLength] : region.getStartKey(); + Bytes.putBytes(rawScan.startKey, 0, regionStartKey, 0, shardLength); + Bytes.putBytes(rawScan.endKey, 0, regionStartKey, 0, shardLength); + } + + private void appendProfileInfo(StringBuilder sb) { + sb.append(System.currentTimeMillis() - this.serviceStartTime); + sb.append(","); + } + @Override public void visitCube(RpcController controller, CubeVisitProtos.CubeVisitRequest request, RpcCallback<CubeVisitProtos.CubeVisitResponse> done) { RegionScanner innerScanner = null; @@ -134,20 +150,25 @@ public class CubeVisitService extends CubeVisitProtos.CubeVisitService implement try { this.serviceStartTime = System.currentTimeMillis(); + region = env.getRegion(); + region.startRegionOperation(); + GTScanRequest scanReq = GTScanRequest.serializer.deserialize(ByteBuffer.wrap(HBaseZeroCopyByteString.zeroCopyGetBytes(request.getGtScanRequest()))); RawScan hbaseRawScan = RawScan.serializer.deserialize(ByteBuffer.wrap(HBaseZeroCopyByteString.zeroCopyGetBytes(request.getHbaseRawScan()))); + List<List<Integer>> hbaseColumnsToGT = Lists.newArrayList(); for (IntList intList : request.getHbaseColumnsToGTList()) { hbaseColumnsToGT.add(intList.getIntsList()); } + if (request.getRowkeyPreambleSize() - RowConstants.ROWKEY_CUBOIDID_LEN > 0) { + //if has shard, fill region shard to raw scan start/end + updateRawScanByCurrentRegion(hbaseRawScan, region, request.getRowkeyPreambleSize() - RowConstants.ROWKEY_CUBOIDID_LEN); + } + Scan scan = CubeHBaseRPC.buildScan(hbaseRawScan); - sb.append(System.currentTimeMillis() - this.serviceStartTime); - sb.append(","); - - region = env.getRegion(); - region.startRegionOperation(); + appendProfileInfo(sb); innerScanner = region.getScanner(scan); CoprocessorBehavior behavior = CoprocessorBehavior.valueOf(request.getBehavior()); @@ -177,23 +198,20 @@ public class CubeVisitService extends CubeVisitProtos.CubeVisitService implement finalRowCount++; } - sb.append(System.currentTimeMillis() - this.serviceStartTime); - sb.append(","); + appendProfileInfo(sb); //outputStream.close() is not necessary allRows = outputStream.toByteArray(); byte[] compressedAllRows = CompressionUtils.compress(allRows); - sb.append(System.currentTimeMillis() - this.serviceStartTime); - sb.append(","); + appendProfileInfo(sb); OperatingSystemMXBean operatingSystemMXBean = (OperatingSystemMXBean) ManagementFactory.getOperatingSystemMXBean(); double systemCpuLoad = operatingSystemMXBean.getSystemCpuLoad(); double freePhysicalMemorySize = operatingSystemMXBean.getFreePhysicalMemorySize(); double freeSwapSpaceSize = operatingSystemMXBean.getFreeSwapSpaceSize(); - sb.append(System.currentTimeMillis() - this.serviceStartTime); - sb.append(","); + appendProfileInfo(sb); CubeVisitProtos.CubeVisitResponse.Builder responseBuilder = CubeVisitProtos.CubeVisitResponse.newBuilder(); done.run(responseBuilder.//