http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d8372747/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/CubeStorageQuery.java ---------------------------------------------------------------------- diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/CubeStorageQuery.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/CubeStorageQuery.java index 836f142..e61f5f6 100644 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/CubeStorageQuery.java +++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/CubeStorageQuery.java @@ -19,6 +19,7 @@ package org.apache.kylin.storage.hbase.cube.v1; import java.util.ArrayList; +import java.util.Arrays; import java.util.BitSet; import java.util.Collection; import java.util.Collections; @@ -34,11 +35,13 @@ import java.util.TreeSet; import org.apache.hadoop.hbase.client.HConnection; import org.apache.kylin.common.util.Bytes; +import org.apache.kylin.common.util.BytesUtil; import org.apache.kylin.common.util.Pair; import org.apache.kylin.cube.CubeInstance; import org.apache.kylin.cube.CubeManager; import org.apache.kylin.cube.CubeSegment; import org.apache.kylin.cube.cuboid.Cuboid; +import org.apache.kylin.cube.kv.RowConstants; import org.apache.kylin.cube.model.CubeDesc; import org.apache.kylin.cube.model.CubeDesc.DeriveInfo; import org.apache.kylin.cube.model.HBaseColumnDesc; @@ -73,7 +76,6 @@ import com.google.common.collect.Maps; import com.google.common.collect.Range; import com.google.common.collect.Sets; -//v1 @SuppressWarnings("unused") public class CubeStorageQuery implements ICachableStorageQuery { @@ -133,11 +135,8 @@ public class CubeStorageQuery implements ICachableStorageQuery { collectNonEvaluable(filter, groupsCopD); TupleFilter filterD = translateDerived(filter, groupsCopD); - // flatten to OR-AND filter, (A AND B AND ..) OR (C AND D AND ..) OR .. - TupleFilter flatFilter = flattenToOrAndFilter(filterD); - // translate filter into segment scan ranges - List<HBaseKeyRange> scans = buildScanRanges(flatFilter, dimensionsD); + List<HBaseKeyRange> scans = buildScanRanges(flattenToOrAndFilter(filterD), dimensionsD); // check involved measures, build value decoder for each each family:column List<RowValueDecoder> valueDecoders = translateAggregation(cubeDesc.getHBaseMapping(), metrics, context); @@ -148,6 +147,8 @@ public class CubeStorageQuery implements ICachableStorageQuery { setLimit(filter, context); HConnection conn = HBaseConnection.get(context.getConnUrl()); + + //Notice we're passing filterD down to storage instead of flatFilter return new SerializedHBaseTupleIterator(conn, scans, cubeInstance, dimensionsD, filterD, groupsCopD, valueDecoders, context, returnTupleInfo); } @@ -392,10 +393,12 @@ public class CubeStorageQuery implements ICachableStorageQuery { return new ArrayList<RowValueDecoder>(codecMap.values()); } + //check TupleFilter.flatFilter's comment private TupleFilter flattenToOrAndFilter(TupleFilter filter) { if (filter == null) return null; + // core TupleFilter flatFilter = filter.flatFilter(); // normalize to OR-AND filter @@ -437,27 +440,30 @@ public class CubeStorageQuery implements ICachableStorageQuery { } //log - sb.append(scanRanges.size() + "=>"); + sb.append(scanRanges.size() + "=(mergeoverlap)>"); List<HBaseKeyRange> mergedRanges = mergeOverlapRanges(scanRanges); //log - sb.append(mergedRanges.size() + "=>"); + sb.append(mergedRanges.size() + "=(mergetoomany)>"); mergedRanges = mergeTooManyRanges(mergedRanges); //log - sb.append(mergedRanges.size() + ", "); + sb.append(mergedRanges.size() + ","); result.addAll(mergedRanges); } - logger.info(sb.toString()); logger.info("hbasekeyrange count: " + result.size()); + dropUnhitSegments(result); logger.info("hbasekeyrange count after dropping unhit :" + result.size()); + result = duplicateRangeByShard(result); + logger.info("hbasekeyrange count after dropping duplicatebyshard :" + result.size()); + return result; } @@ -667,6 +673,42 @@ public class CubeStorageQuery implements ICachableStorageQuery { } } + private List<HBaseKeyRange> duplicateRangeByShard(List<HBaseKeyRange> scans) { + List<HBaseKeyRange> ret = Lists.newArrayList(); + + for (HBaseKeyRange scan : scans) { + CubeSegment segment = scan.getCubeSegment(); + + byte[] startKey = scan.getStartKey(); + byte[] stopKey = scan.getStopKey(); + + short cuboidShardNum = segment.getCuboidShardNum(scan.getCuboid().getId()); + short cuboidShardBase = segment.getCuboidBaseShard(scan.getCuboid().getId()); + for (short i = 0; i < cuboidShardNum; ++i) { + byte[] newStartKey = duplicateKeyAndChangeShard(i, startKey); + byte[] newStopKey = duplicateKeyAndChangeShard(i, stopKey); + HBaseKeyRange newRange = new HBaseKeyRange(segment, scan.getCuboid(), newStartKey, newStopKey, // + scan.getFuzzyKeys(), scan.getFlatOrAndFilter(), scan.getPartitionColumnStartDate(), scan.getPartitionColumnEndDate()); + ret.add(newRange); + } + } + + Collections.sort(ret, new Comparator<HBaseKeyRange>() { + @Override + public int compare(HBaseKeyRange o1, HBaseKeyRange o2) { + return Bytes.compareTo(o1.getStartKey(), o2.getStartKey()); + } + }); + + return ret; + } + + private byte[] duplicateKeyAndChangeShard(short newShard, byte[] bytes) { + byte[] ret = Arrays.copyOf(bytes, bytes.length); + BytesUtil.writeShort(newShard, ret, 0, RowConstants.ROWKEY_SHARDID_LEN); + return ret; + } + private void setThreshold(Collection<TblColRef> dimensions, List<RowValueDecoder> valueDecoders, StorageContext context) { if (RowValueDecoder.hasMemHungryCountDistinct(valueDecoders) == false) { return;
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d8372747/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java ---------------------------------------------------------------------- diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java index 7d1d833..86bc42d 100644 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java +++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java @@ -21,9 +21,13 @@ package org.apache.kylin.storage.hbase.cube.v2; import java.io.IOException; import java.nio.ByteBuffer; import java.util.Collection; +import java.util.Collections; import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; import java.util.zip.DataFormatException; import javax.annotation.Nullable; @@ -35,7 +39,6 @@ import org.apache.hadoop.hbase.ipc.BlockingRpcCallback; import org.apache.hadoop.hbase.ipc.ServerRpcController; import org.apache.kylin.common.util.CompressionUtils; import org.apache.kylin.common.util.ImmutableBitSet; -import org.apache.kylin.common.util.Pair; import org.apache.kylin.cube.CubeSegment; import org.apache.kylin.cube.cuboid.Cuboid; import org.apache.kylin.cube.util.KryoUtils; @@ -43,23 +46,29 @@ import org.apache.kylin.gridtable.GTInfo; import org.apache.kylin.gridtable.GTRecord; import org.apache.kylin.gridtable.GTScanRequest; import org.apache.kylin.gridtable.IGTScanner; +import org.apache.kylin.storage.hbase.cube.v2.coprocessor.endpoint.generated.CubeVisitProtos; +import org.apache.kylin.storage.hbase.cube.v2.coprocessor.endpoint.generated.CubeVisitProtos.CubeVisitRequest.IntList; +import org.apache.kylin.storage.hbase.cube.v2.coprocessor.endpoint.generated.CubeVisitProtos.CubeVisitResponse.Stats; import org.apache.kylin.storage.hbase.steps.HBaseConnection; import com.google.common.base.Function; import com.google.common.collect.Collections2; import com.google.common.collect.Iterators; +import com.google.common.collect.Lists; import com.google.protobuf.ByteString; -import org.apache.kylin.storage.hbase.cube.v2.coprocessor.endpoint.generated.CubeVisitProtos; +import com.google.protobuf.HBaseZeroCopyByteString; public class CubeHBaseEndpointRPC extends CubeHBaseRPC { - static class EndpintResultsAsGTScanner implements IGTScanner { + static class EndpointResultsAsGTScanner implements IGTScanner { private GTInfo info; private Iterator<byte[]> blocks; + private ImmutableBitSet columns; - public EndpintResultsAsGTScanner(GTInfo info, Iterator<byte[]> blocks) { + public EndpointResultsAsGTScanner(GTInfo info, Iterator<byte[]> blocks, ImmutableBitSet columns) { this.info = info; this.blocks = blocks; + this.columns = columns; } @Override @@ -84,7 +93,6 @@ public class CubeHBaseEndpointRPC extends CubeHBaseRPC { @Override public Iterator<GTRecord> apply(@Nullable final byte[] input) { - logger.info("Reassembling a raw block returned from Endpoint with byte length: " + input.length); return new Iterator<GTRecord>() { private ByteBuffer inputBuffer = null; private GTRecord oneRecord = null; @@ -101,7 +109,7 @@ public class CubeHBaseEndpointRPC extends CubeHBaseRPC { @Override public GTRecord next() { - oneRecord.loadAllColumns(inputBuffer); + oneRecord.loadColumns(columns, inputBuffer); return oneRecord; } @@ -122,43 +130,98 @@ public class CubeHBaseEndpointRPC extends CubeHBaseRPC { @Override public IGTScanner getGTScanner(final GTScanRequest scanRequest) throws IOException { - try { - // primary key (also the 0th column block) is always selected - final ImmutableBitSet selectedColBlocks = scanRequest.getSelectedColBlocks().set(0); - // globally shared connection, does not require close - HConnection hbaseConn = HBaseConnection.get(cubeSeg.getCubeInstance().getConfig().getStorageUrl()); - final HTableInterface hbaseTable = hbaseConn.getTable(cubeSeg.getStorageLocationIdentifier()); - final List<Pair<byte[], byte[]>> hbaseColumns = makeHBaseColumns(selectedColBlocks); - - RawScan rawScan = prepareRawScan(scanRequest.getPkStart(), scanRequest.getPkEnd(), hbaseColumns); - - byte[] scanRequestBytes = KryoUtils.serialize(scanRequest); - byte[] rawScanBytes = KryoUtils.serialize(rawScan); - CubeVisitProtos.CubeVisitRequest.Builder builder = CubeVisitProtos.CubeVisitRequest.newBuilder(); - builder.setGtScanRequest(ByteString.copyFrom(scanRequestBytes)).setHbaseRawScan(ByteString.copyFrom(rawScanBytes)); - - Collection<CubeVisitProtos.CubeVisitResponse> results = getResults(builder.build(), hbaseTable, rawScan.startKey, rawScan.endKey); - final Collection<byte[]> rowBlocks = Collections2.transform(results, new Function<CubeVisitProtos.CubeVisitResponse, byte[]>() { - @Nullable + // primary key (also the 0th column block) is always selected + final ImmutableBitSet selectedColBlocks = scanRequest.getSelectedColBlocks().set(0); + // globally shared connection, does not require close + HConnection hbaseConn = HBaseConnection.get(cubeSeg.getCubeInstance().getConfig().getStorageUrl()); + final HTableInterface hbaseTable = hbaseConn.getTable(cubeSeg.getStorageLocationIdentifier()); + + List<RawScan> rawScans = preparedHBaseScan(scanRequest.getPkStart(), scanRequest.getPkEnd(), scanRequest.getFuzzyKeys(), selectedColBlocks); + List<List<Integer>> hbaseColumnsToGT = getHBaseColumnsGTMapping(selectedColBlocks); + final List<IntList> hbaseColumnsToGTIntList = Lists.newArrayList(); + for (List<Integer> list : hbaseColumnsToGT) { + hbaseColumnsToGTIntList.add(IntList.newBuilder().addAllInts(list).build()); + } + + byte[] scanRequestBytes = KryoUtils.serialize(scanRequest); + final ByteString scanRequestBytesString = HBaseZeroCopyByteString.wrap(scanRequestBytes); + + ExecutorService executorService = Executors.newFixedThreadPool(rawScans.size()); + final List<byte[]> rowBlocks = Collections.synchronizedList(Lists.<byte[]> newArrayList()); + + logger.info("Total RawScan range count: " + rawScans.size()); + for (RawScan rawScan : rawScans) { + logScan(rawScan, cubeSeg.getStorageLocationIdentifier()); + } + + for (int i = 0; i < rawScans.size(); ++i) { + final int shardIndex = i; + final RawScan rawScan = rawScans.get(i); + + executorService.submit(new Runnable() { @Override - public byte[] apply(CubeVisitProtos.CubeVisitResponse input) { + public void run() { + final byte[] rawScanBytes = KryoUtils.serialize(rawScan); + CubeVisitProtos.CubeVisitRequest.Builder builder = CubeVisitProtos.CubeVisitRequest.newBuilder(); + builder.setGtScanRequest(scanRequestBytesString).setHbaseRawScan(HBaseZeroCopyByteString.wrap(rawScanBytes)); + for (IntList intList : hbaseColumnsToGTIntList) { + builder.addHbaseColumnsToGT(intList); + } + + Collection<CubeVisitProtos.CubeVisitResponse> results; try { - return CompressionUtils.decompress(input.getCompressedRows().toByteArray()); - } catch (IOException | DataFormatException e) { - throw new RuntimeException(e); + results = getResults(builder.build(), hbaseTable, rawScan.startKey, rawScan.endKey); + } catch (Throwable throwable) { + throw new RuntimeException("Error when visiting cubes by endpoint:", throwable); + } + + //results.size() supposed to be 1; + if (results.size() != 1) { + logger.warn("{} CubeVisitResponse returned for shard {}", results.size(), shardIndex); } + + for (CubeVisitProtos.CubeVisitResponse result : results) { + logger.info(getStatsString(result, shardIndex)); + } + + Collection<byte[]> part = Collections2.transform(results, new Function<CubeVisitProtos.CubeVisitResponse, byte[]>() { + @Nullable + @Override + public byte[] apply(CubeVisitProtos.CubeVisitResponse input) { + try { + return CompressionUtils.decompress(HBaseZeroCopyByteString.zeroCopyGetBytes(input.getCompressedRows())); + } catch (IOException | DataFormatException e) { + throw new RuntimeException(e); + } + } + }); + rowBlocks.addAll(part); } }); + } + executorService.shutdown(); + try { + if (!executorService.awaitTermination(1, TimeUnit.HOURS)) { + throw new RuntimeException("Visiting cube by endpoint timeout"); + } + } catch (InterruptedException e) { + throw new RuntimeException("Visiting cube by endpoint gets interrupted"); + } - return new EndpintResultsAsGTScanner(fullGTInfo, rowBlocks.iterator()); + return new EndpointResultsAsGTScanner(fullGTInfo, rowBlocks.iterator(), scanRequest.getColumns()); + } + + private String getStatsString(CubeVisitProtos.CubeVisitResponse result, int shardIndex) { + StringBuilder sb = new StringBuilder(); + Stats stats = result.getStats(); + sb.append("Shard " + shardIndex + ": "); + sb.append("Total scanned row: " + stats.getScannedRowCount() + ". "); + sb.append("Total filtered/aggred row: " + stats.getAggregatedRowCount() + ". "); + sb.append("Time elapsed in EP: " + (stats.getServiceEndTime() - stats.getServiceStartTime()) + "(ms). "); + return sb.toString(); - } catch (Throwable throwable) { - throwable.printStackTrace(); - } - return null; } - //TODO : async callback private Collection<CubeVisitProtos.CubeVisitResponse> getResults(final CubeVisitProtos.CubeVisitRequest request, HTableInterface table, byte[] startKey, byte[] endKey) throws Throwable { Map<byte[], CubeVisitProtos.CubeVisitResponse> results = table.coprocessorService(CubeVisitProtos.CubeVisitService.class, startKey, endKey, new Batch.Call<CubeVisitProtos.CubeVisitService, CubeVisitProtos.CubeVisitResponse>() { public CubeVisitProtos.CubeVisitResponse call(CubeVisitProtos.CubeVisitService rowsService) throws IOException { @@ -173,8 +236,6 @@ public class CubeHBaseEndpointRPC extends CubeHBaseRPC { } }); - logger.info("{} regions returned results ", results.values().size()); - return results.values(); } } http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d8372747/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseRPC.java ---------------------------------------------------------------------- diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseRPC.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseRPC.java index 09bef0f..1d217ac 100644 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseRPC.java +++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseRPC.java @@ -2,6 +2,7 @@ package org.apache.kylin.storage.hbase.cube.v2; import java.io.IOException; import java.util.Arrays; +import java.util.Collections; import java.util.List; import org.apache.hadoop.hbase.Cell; @@ -14,6 +15,7 @@ import org.apache.kylin.common.util.Bytes; import org.apache.kylin.common.util.BytesUtil; import org.apache.kylin.common.util.ImmutableBitSet; import org.apache.kylin.common.util.Pair; +import org.apache.kylin.common.util.ShardingHash; import org.apache.kylin.cube.CubeSegment; import org.apache.kylin.cube.cuboid.Cuboid; import org.apache.kylin.cube.kv.RowConstants; @@ -27,6 +29,7 @@ import org.apache.kylin.gridtable.IGTScanner; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.google.common.base.Preconditions; import com.google.common.collect.Lists; public abstract class CubeHBaseRPC { @@ -69,59 +72,142 @@ public abstract class CubeHBaseRPC { return scan; } - protected RawScan prepareRawScan(GTRecord pkStart, GTRecord pkEnd, List<Pair<byte[], byte[]>> selectedColumns) { - byte[] start = makeRowKeyToScan(pkStart, (byte) 0x00); - byte[] end = makeRowKeyToScan(pkEnd, (byte) 0xff); + protected List<RawScan> preparedHBaseScan(GTRecord pkStart, GTRecord pkEnd, List<GTRecord> fuzzyKeys, ImmutableBitSet selectedColBlocks) { + final List<Pair<byte[], byte[]>> selectedColumns = makeHBaseColumns(selectedColBlocks); + List<RawScan> ret = Lists.newArrayList(); - //TODO fuzzy match + byte[] start = makeRowKeyToScan(pkStart, RowConstants.ROWKEY_LOWER_BYTE); + byte[] end = makeRowKeyToScan(pkEnd, RowConstants.ROWKEY_UPPER_BYTE); + List<Pair<byte[], byte[]>> hbaseFuzzyKeys = translateFuzzyKeys(fuzzyKeys); - return new RawScan(start, end, selectedColumns, null); + short cuboidShardNum = cubeSeg.getCuboidShardNum(cuboid.getId()); + + for (short i = 0; i < cuboidShardNum; ++i) { + short shard = ShardingHash.normalize(cubeSeg.getCuboidBaseShard(cuboid.getId()), i, cubeSeg.getTotalShards()); + + byte[] shardStart = Arrays.copyOf(start, start.length); + byte[] shardEnd = new byte[end.length + 1];//append extra 0 to the end key to make it inclusive while scanning + System.arraycopy(end, 0, shardEnd, 0, end.length); + + BytesUtil.writeShort(shard, shardStart, 0, RowConstants.ROWKEY_SHARDID_LEN); + BytesUtil.writeShort(shard, shardEnd, 0, RowConstants.ROWKEY_SHARDID_LEN); + + ret.add(new RawScan(shardStart, shardEnd, selectedColumns, hbaseFuzzyKeys)); + } + return ret; + + } + + /** + * translate GTRecord format fuzzy keys to hbase expected format + * @return + */ + private List<Pair<byte[], byte[]>> translateFuzzyKeys(List<GTRecord> fuzzyKeys) { + if (fuzzyKeys == null || fuzzyKeys.isEmpty()) { + return Collections.emptyList(); + } + + List<Pair<byte[], byte[]>> ret = Lists.newArrayList(); + int coreLength = fullGTInfo.getMaxColumnLength(fullGTInfo.getPrimaryKey()); + for (GTRecord gtRecordFuzzyKey : fuzzyKeys) { + byte[] hbaseFuzzyKey = new byte[coreLength + RowConstants.ROWKEY_HEADER_LEN]; + byte[] hbaseFuzzyMask = new byte[coreLength + RowConstants.ROWKEY_HEADER_LEN]; + + int pos = 0; + //shard part + Arrays.fill(hbaseFuzzyMask, pos, pos + RowConstants.ROWKEY_SHARDID_LEN, RowConstants.BYTE_ONE);//shard part should better be FIXED, for simplicity we make it non-fixed + pos += RowConstants.ROWKEY_SHARDID_LEN; + + //cuboid part + Arrays.fill(hbaseFuzzyMask, pos, pos + RowConstants.ROWKEY_CUBOIDID_LEN, RowConstants.BYTE_ZERO); + System.arraycopy(cuboid.getBytes(), 0, hbaseFuzzyKey, pos, RowConstants.ROWKEY_CUBOIDID_LEN); + pos += RowConstants.ROWKEY_CUBOIDID_LEN; + + //row key core part + ByteArray coreKey = HBaseScan.exportScanKey(gtRecordFuzzyKey, RowConstants.BYTE_ZERO); + System.arraycopy(coreKey.array(), coreKey.offset(), hbaseFuzzyKey, pos, coreKey.length()); + ByteArray coreMask = HBaseScan.exportScanMask(gtRecordFuzzyKey); + System.arraycopy(coreMask.array(), coreMask.offset(), hbaseFuzzyMask, pos, coreMask.length()); + + Preconditions.checkState(coreKey.length() == coreMask.length(), "corekey length not equal coremask length"); + pos += coreKey.length(); + Preconditions.checkState(hbaseFuzzyKey.length == pos, "HBase fuzzy key not completely populated"); + + ret.add(new Pair<byte[], byte[]>(hbaseFuzzyKey, hbaseFuzzyMask)); + } + + return ret; } private byte[] makeRowKeyToScan(GTRecord pkRec, byte fill) { - ByteArray pk = GTRecord.exportScanKey(pkRec); - int pkMaxLen = pkRec.getInfo().getMaxColumnLength(pkRec.getInfo().getPrimaryKey()); + ByteArray pk = HBaseScan.exportScanKey(pkRec, fill); - byte[] buf = new byte[pkMaxLen + RowConstants.ROWKEY_CUBOIDID_LEN]; + byte[] buf = new byte[pk.length() + RowConstants.ROWKEY_HEADER_LEN]; Arrays.fill(buf, fill); - System.arraycopy(cuboid.getBytes(), 0, buf, 0, RowConstants.ROWKEY_CUBOIDID_LEN); + //for scanning/reading, later all possible shard will be applied + + System.arraycopy(cuboid.getBytes(), 0, buf, RowConstants.ROWKEY_SHARDID_LEN, RowConstants.ROWKEY_CUBOIDID_LEN); if (pk != null && pk.array() != null) { - System.arraycopy(pk.array(), pk.offset(), buf, RowConstants.ROWKEY_CUBOIDID_LEN, pk.length()); + System.arraycopy(pk.array(), pk.offset(), buf, RowConstants.ROWKEY_HEADER_LEN, pk.length()); } return buf; } + /** + * prune untouched hbase columns + */ protected List<Pair<byte[], byte[]>> makeHBaseColumns(ImmutableBitSet selectedColBlocks) { List<Pair<byte[], byte[]>> result = Lists.newArrayList(); - int colBlockIdx = 1; // start from 1; the 0th column block is primary key which maps to rowkey + int colBlkIndex = 1; HBaseMappingDesc hbaseMapping = cubeSeg.getCubeDesc().getHbaseMapping(); for (HBaseColumnFamilyDesc familyDesc : hbaseMapping.getColumnFamily()) { byte[] byteFamily = Bytes.toBytes(familyDesc.getName()); for (HBaseColumnDesc hbaseColDesc : familyDesc.getColumns()) { - if (selectedColBlocks.get(colBlockIdx)) { + if (selectedColBlocks.get(colBlkIndex)) { byte[] byteQualifier = Bytes.toBytes(hbaseColDesc.getQualifier()); result.add(new Pair<byte[], byte[]>(byteFamily, byteQualifier)); } - colBlockIdx++; + colBlkIndex++; } } return result; } - //possible to use binary search as cells might be sorted - public static Cell findCell(List<Cell> cells, byte[] familyName, byte[] columnName) { - for (Cell c : cells) { - if (BytesUtil.compareBytes(familyName, 0, c.getFamilyArray(), c.getFamilyOffset(), familyName.length) == 0 && // - BytesUtil.compareBytes(columnName, 0, c.getQualifierArray(), c.getQualifierOffset(), columnName.length) == 0) { - return c; + /** + * for each selected hbase column, it might contain values of multiple GT columns. + * The mapping should be passed down to storage + */ + protected List<List<Integer>> getHBaseColumnsGTMapping(ImmutableBitSet selectedColBlocks) { + + List<List<Integer>> ret = Lists.newArrayList(); + + int colBlkIndex = 1; + int metricOffset = fullGTInfo.getPrimaryKey().trueBitCount(); + + HBaseMappingDesc hbaseMapping = cubeSeg.getCubeDesc().getHbaseMapping(); + for (HBaseColumnFamilyDesc familyDesc : hbaseMapping.getColumnFamily()) { + for (HBaseColumnDesc hbaseColDesc : familyDesc.getColumns()) { + if (selectedColBlocks.get(colBlkIndex)) { + int[] metricIndexes = hbaseColDesc.getMeasureIndex(); + Integer[] gtIndexes = new Integer[metricIndexes.length]; + for (int i = 0; i < gtIndexes.length; i++) { + gtIndexes[i] = metricIndexes[i] + metricOffset; + } + ret.add(Arrays.asList(gtIndexes)); + } + colBlkIndex++; } } - return null; + + Preconditions.checkState(selectedColBlocks.trueBitCount() == ret.size() + 1); + return ret; } + + public static void applyHBaseColums(Scan scan, List<Pair<byte[], byte[]>> hbaseColumns) { for (Pair<byte[], byte[]> hbaseColumn : hbaseColumns) { byte[] byteFamily = hbaseColumn.getFirst(); @@ -157,4 +243,33 @@ public abstract class CubeHBaseRPC { return result; } + protected void logScan(RawScan rawScan, String tableName) { + StringBuilder info = new StringBuilder(); + info.append("\nVisiting hbase table ").append(tableName).append(": "); + if (cuboid.requirePostAggregation()) { + info.append("cuboid require post aggregation, from "); + } else { + info.append("cuboid exact match, from "); + } + info.append(cuboid.getInputID()); + info.append(" to "); + info.append(cuboid.getId()); + info.append("\nStart: "); + info.append(rawScan.getStartKeyAsString()); + info.append(" - "); + info.append(Bytes.toStringBinary(rawScan.startKey)); + info.append("\nStop: "); + info.append(rawScan.getEndKeyAsString()); + info.append(" - "); + info.append(Bytes.toStringBinary(rawScan.endKey)); + if (rawScan.fuzzyKey != null) { + info.append("\nFuzzy key counts: " + rawScan.fuzzyKey.size()); + info.append("\nFuzzy: "); + info.append(rawScan.getFuzzyKeyAsString()); + } else { + info.append("\nNo Fuzzy Key"); + } + logger.info(info.toString()); + } + } http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d8372747/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseScanRPC.java ---------------------------------------------------------------------- diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseScanRPC.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseScanRPC.java index e673f32..56e6c5c 100644 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseScanRPC.java +++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseScanRPC.java @@ -11,20 +11,55 @@ import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.ResultScanner; import org.apache.hadoop.hbase.client.Scan; import org.apache.kylin.common.util.ImmutableBitSet; -import org.apache.kylin.common.util.Pair; import org.apache.kylin.cube.CubeSegment; import org.apache.kylin.cube.cuboid.Cuboid; import org.apache.kylin.gridtable.GTInfo; +import org.apache.kylin.gridtable.GTRecord; import org.apache.kylin.gridtable.GTScanRequest; import org.apache.kylin.gridtable.IGTScanner; import org.apache.kylin.gridtable.IGTStore; import org.apache.kylin.storage.hbase.steps.HBaseConnection; +import com.google.common.collect.Iterators; +import com.google.common.collect.Lists; + /** * for test use only */ public class CubeHBaseScanRPC extends CubeHBaseRPC { + static class TrimmedInfoGTRecordAdapter implements Iterable<GTRecord> { + + private final GTInfo info; + private final Iterator<GTRecord> input; + + public TrimmedInfoGTRecordAdapter(GTInfo info, Iterator<GTRecord> input) { + this.info = info; + this.input = input; + } + + @Override + public Iterator<GTRecord> iterator() { + return new Iterator<GTRecord>() { + @Override + public boolean hasNext() { + return input.hasNext(); + } + + @Override + public GTRecord next() { + GTRecord x = input.next(); + return new GTRecord(info, x.getInternal()); + } + + @Override + public void remove() { + + } + }; + } + } + public CubeHBaseScanRPC(CubeSegment cubeSeg, Cuboid cuboid, GTInfo fullGTInfo) { super(cubeSeg, cuboid, fullGTInfo); } @@ -34,34 +69,47 @@ public class CubeHBaseScanRPC extends CubeHBaseRPC { // primary key (also the 0th column block) is always selected final ImmutableBitSet selectedColBlocks = scanRequest.getSelectedColBlocks().set(0); - // globally shared connection, does not require close HConnection hbaseConn = HBaseConnection.get(cubeSeg.getCubeInstance().getConfig().getStorageUrl()); - final HTableInterface hbaseTable = hbaseConn.getTable(cubeSeg.getStorageLocationIdentifier()); - final List<Pair<byte[], byte[]>> hbaseColumns = makeHBaseColumns(selectedColBlocks); - RawScan rawScan = prepareRawScan(scanRequest.getPkStart(), scanRequest.getPkEnd(), hbaseColumns); - Scan hbaseScan = buildScan(rawScan); + List<RawScan> rawScans = preparedHBaseScan(scanRequest.getPkStart(), scanRequest.getPkEnd(), scanRequest.getFuzzyKeys(), selectedColBlocks); + List<List<Integer>> hbaseColumnsToGT = getHBaseColumnsGTMapping(selectedColBlocks); + + final List<ResultScanner> scanners = Lists.newArrayList(); + final List<Iterator<Result>> resultIterators = Lists.newArrayList(); + + for (RawScan rawScan : rawScans) { + + logScan(rawScan, cubeSeg.getStorageLocationIdentifier()); + Scan hbaseScan = buildScan(rawScan); + + final ResultScanner scanner = hbaseTable.getScanner(hbaseScan); + final Iterator<Result> iterator = scanner.iterator(); + + scanners.add(scanner); + resultIterators.add(iterator); + } - final ResultScanner scanner = hbaseTable.getScanner(hbaseScan); - final Iterator<Result> iterator = scanner.iterator(); + final Iterator<Result> allResultsIterator = Iterators.concat(resultIterators.iterator()); CellListIterator cellListIterator = new CellListIterator() { @Override public void close() throws IOException { - scanner.close(); + for (ResultScanner scanner : scanners) { + scanner.close(); + } hbaseTable.close(); } @Override public boolean hasNext() { - return iterator.hasNext(); + return allResultsIterator.hasNext(); } @Override public List<Cell> next() { - return iterator.next().listCells(); + return allResultsIterator.next().listCells(); } @Override @@ -70,8 +118,32 @@ public class CubeHBaseScanRPC extends CubeHBaseRPC { } }; - IGTStore store = new HBaseReadonlyStore(cellListIterator, scanRequest, hbaseColumns); + IGTStore store = new HBaseReadonlyStore(cellListIterator, scanRequest, rawScans.get(0).hbaseColumns, hbaseColumnsToGT); IGTScanner rawScanner = store.scan(scanRequest); - return scanRequest.decorateScanner(rawScanner); + + final IGTScanner decorateScanner = scanRequest.decorateScanner(rawScanner); + final TrimmedInfoGTRecordAdapter trimmedInfoGTRecordAdapter = new TrimmedInfoGTRecordAdapter(fullGTInfo, decorateScanner.iterator()); + + return new IGTScanner() { + @Override + public GTInfo getInfo() { + return fullGTInfo; + } + + @Override + public int getScannedRowCount() { + return decorateScanner.getScannedRowCount(); + } + + @Override + public void close() throws IOException { + decorateScanner.close(); + } + + @Override + public Iterator<GTRecord> iterator() { + return trimmedInfoGTRecordAdapter.iterator(); + } + }; } } http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d8372747/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeScanner.java ---------------------------------------------------------------------- diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeScanner.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeScanner.java deleted file mode 100644 index 9359934..0000000 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeScanner.java +++ /dev/null @@ -1,265 +0,0 @@ -package org.apache.kylin.storage.hbase.cube.v2; - -import java.io.IOException; -import java.util.BitSet; -import java.util.Collection; -import java.util.Collections; -import java.util.Comparator; -import java.util.Iterator; -import java.util.List; -import java.util.NoSuchElementException; -import java.util.Set; - -import org.apache.kylin.common.util.ImmutableBitSet; -import org.apache.kylin.cube.CubeSegment; -import org.apache.kylin.cube.cuboid.Cuboid; -import org.apache.kylin.cube.gridtable.CubeGridTable; -import org.apache.kylin.cube.gridtable.CuboidToGridTableMapping; -import org.apache.kylin.cube.model.CubeDesc; -import org.apache.kylin.gridtable.GTInfo; -import org.apache.kylin.gridtable.GTRecord; -import org.apache.kylin.gridtable.GTScanRange; -import org.apache.kylin.gridtable.GTScanRangePlanner; -import org.apache.kylin.gridtable.GTScanRequest; -import org.apache.kylin.gridtable.GTUtil; -import org.apache.kylin.gridtable.IGTScanner; -import org.apache.kylin.metadata.filter.TupleFilter; -import org.apache.kylin.metadata.model.FunctionDesc; -import org.apache.kylin.metadata.model.TblColRef; - -import com.google.common.collect.Lists; -import com.google.common.collect.Sets; - -public class CubeScanner implements IGTScanner { - - private static final int MAX_SCAN_RANGES = 200; - - final CubeSegment cubeSeg; - final GTInfo info; - final byte[] trimmedInfoBytes; - final List<GTScanRequest> scanRequests; - final Scanner scanner; - final Cuboid cuboid; - - public CubeScanner(CubeSegment cubeSeg, Cuboid cuboid, Set<TblColRef> dimensions, Set<TblColRef> groups, // - Collection<FunctionDesc> metrics, TupleFilter filter, boolean allowPreAggregate) { - this.cuboid = cuboid; - this.cubeSeg = cubeSeg; - this.info = CubeGridTable.newGTInfo(cubeSeg, cuboid.getId()); - - CuboidToGridTableMapping mapping = cuboid.getCuboidToGridTableMapping(); - - //replace the constant values in filter to dictionary codes - TupleFilter gtFilter = GTUtil.convertFilterColumnsAndConstants(filter, info, mapping.getCuboidDimensionsInGTOrder(), groups); - - ImmutableBitSet gtDimensions = makeGridTableColumns(mapping, dimensions); - ImmutableBitSet gtAggrGroups = makeGridTableColumns(mapping, replaceDerivedColumns(groups, cubeSeg.getCubeDesc())); - ImmutableBitSet gtAggrMetrics = makeGridTableColumns(mapping, metrics); - String[] gtAggrFuncs = makeAggrFuncs(mapping, metrics); - - //TODO: should remove this in endpoint scenario - GTScanRangePlanner scanRangePlanner = new GTScanRangePlanner(info); - List<GTScanRange> scanRanges = scanRangePlanner.planScanRanges(gtFilter, MAX_SCAN_RANGES); - - scanRequests = Lists.newArrayListWithCapacity(scanRanges.size()); - - trimmedInfoBytes = GTInfo.serialize(info); - GTInfo trimmedInfo = GTInfo.deserialize(trimmedInfoBytes); - - for (GTScanRange range : scanRanges) { - scanRequests.add(new GTScanRequest(trimmedInfo, range, gtDimensions, gtAggrGroups, gtAggrMetrics, gtAggrFuncs, gtFilter, allowPreAggregate)); - } - - scanner = new Scanner(); - } - - private Set<TblColRef> replaceDerivedColumns(Set<TblColRef> input, CubeDesc cubeDesc) { - Set<TblColRef> ret = Sets.newHashSet(); - for (TblColRef col : input) { - if (cubeDesc.isDerived(col)) { - for (TblColRef host : cubeDesc.getHostInfo(col).columns) { - ret.add(host); - } - } else { - ret.add(col); - } - } - return ret; - } - - private ImmutableBitSet makeGridTableColumns(CuboidToGridTableMapping mapping, Set<TblColRef> dimensions) { - BitSet result = new BitSet(); - for (TblColRef dim : dimensions) { - int idx = mapping.getIndexOf(dim); - if (idx >= 0) - result.set(idx); - } - return new ImmutableBitSet(result); - } - - private ImmutableBitSet makeGridTableColumns(CuboidToGridTableMapping mapping, Collection<FunctionDesc> metrics) { - BitSet result = new BitSet(); - for (FunctionDesc metric : metrics) { - int idx = mapping.getIndexOf(metric); - if (idx < 0) - throw new IllegalStateException(metric + " not found in " + mapping); - result.set(idx); - } - return new ImmutableBitSet(result); - } - - private String[] makeAggrFuncs(final CuboidToGridTableMapping mapping, Collection<FunctionDesc> metrics) { - - //metrics are represented in ImmutableBitSet, which loses order information - //sort the aggrFuns to align with metrics natural order - List<FunctionDesc> metricList = Lists.newArrayList(metrics); - Collections.sort(metricList, new Comparator<FunctionDesc>() { - @Override - public int compare(FunctionDesc o1, FunctionDesc o2) { - int a = mapping.getIndexOf(o1); - int b = mapping.getIndexOf(o2); - return a - b; - } - }); - - String[] result = new String[metricList.size()]; - int i = 0; - for (FunctionDesc metric : metricList) { - result[i++] = metric.getExpression(); - } - return result; - } - - @Override - public Iterator<GTRecord> iterator() { - return scanner.iterator(); - } - - @Override - public void close() throws IOException { - scanner.close(); - } - - @Override - public GTInfo getInfo() { - return info; - } - - @Override - public int getScannedRowCount() { - return scanner.getScannedRowCount(); - } - - static class RemoteGTRecordAdapter implements Iterable<GTRecord> { - - private final GTInfo info; - private final Iterator<GTRecord> input; - - public RemoteGTRecordAdapter(GTInfo info, Iterator<GTRecord> input) { - this.info = info; - this.input = input; - } - - @Override - public Iterator<GTRecord> iterator() { - return new Iterator<GTRecord>() { - @Override - public boolean hasNext() { - return input.hasNext(); - } - - @Override - public GTRecord next() { - GTRecord x = input.next(); - return new GTRecord(info, x.getInternal()); - } - - @Override - public void remove() { - - } - }; - } - } - - private class Scanner { - final IGTScanner[] inputScanners = new IGTScanner[scanRequests.size()]; - int cur = 0; - Iterator<GTRecord> curIterator = null; - GTRecord next = null; - - public Iterator<GTRecord> iterator() { - return new Iterator<GTRecord>() { - - @Override - public boolean hasNext() { - if (next != null) - return true; - - if (curIterator == null) { - if (cur >= scanRequests.size()) - return false; - - try { - CubeHBaseRPC rpc = new CubeHBaseEndpointRPC(cubeSeg, cuboid, info); - inputScanners[cur] = rpc.getGTScanner(scanRequests.get(cur)); - curIterator = inputScanners[cur].iterator(); - //curIterator = new RemoteGTRecordAdapter(info, inputScanners[cur].iterator()).iterator(); - } catch (IOException e) { - throw new RuntimeException(e); - } - } - - if (curIterator.hasNext() == false) { - curIterator = null; - cur++; - return hasNext(); - } - - next = curIterator.next(); - return true; - } - - @Override - public GTRecord next() { - // fetch next record - if (next == null) { - hasNext(); - if (next == null) - throw new NoSuchElementException(); - } - - GTRecord result = next; - next = null; - return result; - } - - @Override - public void remove() { - throw new UnsupportedOperationException(); - } - }; - } - - public void close() throws IOException { - for (int i = 0; i < inputScanners.length; i++) { - if (inputScanners[i] != null) { - inputScanners[i].close(); - } - } - } - - public int getScannedRowCount() { - int result = 0; - for (int i = 0; i < inputScanners.length; i++) { - if (inputScanners[i] == null) - break; - - result += inputScanners[i].getScannedRowCount(); - } - return result; - } - - } - -} http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d8372747/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeSegmentScanner.java ---------------------------------------------------------------------- diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeSegmentScanner.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeSegmentScanner.java new file mode 100644 index 0000000..286da55 --- /dev/null +++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeSegmentScanner.java @@ -0,0 +1,290 @@ +package org.apache.kylin.storage.hbase.cube.v2; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.BitSet; +import java.util.Collection; +import java.util.Collections; +import java.util.Comparator; +import java.util.Iterator; +import java.util.List; +import java.util.NoSuchElementException; +import java.util.Set; + +import org.apache.kylin.common.util.ByteArray; +import org.apache.kylin.common.util.DateFormat; +import org.apache.kylin.common.util.ImmutableBitSet; +import org.apache.kylin.common.util.Pair; +import org.apache.kylin.cube.CubeSegment; +import org.apache.kylin.cube.cuboid.Cuboid; +import org.apache.kylin.cube.gridtable.CubeGridTable; +import org.apache.kylin.cube.gridtable.CuboidToGridTableMapping; +import org.apache.kylin.cube.model.CubeDesc; +import org.apache.kylin.gridtable.GTInfo; +import org.apache.kylin.gridtable.GTRecord; +import org.apache.kylin.gridtable.GTScanRange; +import org.apache.kylin.gridtable.GTScanRangePlanner; +import org.apache.kylin.gridtable.GTScanRequest; +import org.apache.kylin.gridtable.GTUtil; +import org.apache.kylin.gridtable.IGTScanner; +import org.apache.kylin.metadata.filter.TupleFilter; +import org.apache.kylin.metadata.model.FunctionDesc; +import org.apache.kylin.metadata.model.TblColRef; + +import com.google.common.collect.Lists; +import com.google.common.collect.Sets; + +public class CubeSegmentScanner implements IGTScanner { + + private static final int MAX_SCAN_RANGES = 200; + + final CubeSegment cubeSeg; + final GTInfo info; + final byte[] trimmedInfoBytes; + final List<GTScanRequest> scanRequests; + final Scanner scanner; + final Cuboid cuboid; + + public CubeSegmentScanner(CubeSegment cubeSeg, Cuboid cuboid, Set<TblColRef> dimensions, Set<TblColRef> groups, // + Collection<FunctionDesc> metrics, TupleFilter filter, boolean allowPreAggregate) { + this.cuboid = cuboid; + this.cubeSeg = cubeSeg; + this.info = CubeGridTable.newGTInfo(cubeSeg, cuboid.getId()); + + CuboidToGridTableMapping mapping = cuboid.getCuboidToGridTableMapping(); + + //replace the constant values in filter to dictionary codes + TupleFilter gtFilter = GTUtil.convertFilterColumnsAndConstants(filter, info, mapping.getCuboidDimensionsInGTOrder(), groups); + + ImmutableBitSet gtDimensions = makeGridTableColumns(mapping, dimensions); + ImmutableBitSet gtAggrGroups = makeGridTableColumns(mapping, replaceDerivedColumns(groups, cubeSeg.getCubeDesc())); + ImmutableBitSet gtAggrMetrics = makeGridTableColumns(mapping, metrics); + String[] gtAggrFuncs = makeAggrFuncs(mapping, metrics); + + GTScanRangePlanner scanRangePlanner; + if (cubeSeg.getCubeDesc().getModel().getPartitionDesc().isPartitioned()) { + TblColRef tblColRef = cubeSeg.getCubeDesc().getModel().getPartitionDesc().getPartitionDateColumnRef(); + Pair<ByteArray, ByteArray> segmentStartAndEnd = null; + int index = mapping.getIndexOf(tblColRef); + if (index >= 0) { + segmentStartAndEnd = getSegmentStartAndEnd(tblColRef, index); + } + scanRangePlanner = new GTScanRangePlanner(info, segmentStartAndEnd, tblColRef); + } else { + scanRangePlanner = new GTScanRangePlanner(info, null, null); + } + List<GTScanRange> scanRanges = scanRangePlanner.planScanRanges(gtFilter, MAX_SCAN_RANGES); + + scanRequests = Lists.newArrayListWithCapacity(scanRanges.size()); + + trimmedInfoBytes = GTInfo.serialize(info); + GTInfo trimmedInfo = GTInfo.deserialize(trimmedInfoBytes); + + for (GTScanRange range : scanRanges) { + scanRequests.add(new GTScanRequest(trimmedInfo, range, gtDimensions, gtAggrGroups, gtAggrMetrics, gtAggrFuncs, gtFilter, allowPreAggregate)); + } + + scanner = new Scanner(); + } + + private Pair<ByteArray, ByteArray> getSegmentStartAndEnd(TblColRef tblColRef, int index) { + + String partitionColType = tblColRef.getColumnDesc().getDatatype(); + + ByteArray start; + if (cubeSeg.getDateRangeStart() != Long.MIN_VALUE) { + start = translateTsToString(cubeSeg.getDateRangeStart(), partitionColType, index); + } else { + start = new ByteArray(); + } + + ByteArray end; + if (cubeSeg.getDateRangeEnd() != Long.MAX_VALUE) { + end = translateTsToString(cubeSeg.getDateRangeEnd(), partitionColType, index); + } else { + end = new ByteArray(); + } + return Pair.newPair(start, end); + + } + + private ByteArray translateTsToString(long ts, String partitionColType, int index) { + String value; + if ("date".equalsIgnoreCase(partitionColType)) { + value = DateFormat.formatToDateStr(ts); + } else if ("timestamp".equalsIgnoreCase(partitionColType)) { + //TODO: if partition col is not dict encoded, value's format may differ from expected. Though by default it is not the case + value = DateFormat.formatToTimeWithoutMilliStr(ts); + } else { + throw new RuntimeException("Type " + partitionColType + " is not valid partition column type"); + } + + ByteBuffer buffer = ByteBuffer.allocate(info.getMaxColumnLength()); + info.getCodeSystem().encodeColumnValue(index, value, buffer); + + return ByteArray.copyOf(buffer.array(), 0, buffer.position()); + } + + private Set<TblColRef> replaceDerivedColumns(Set<TblColRef> input, CubeDesc cubeDesc) { + Set<TblColRef> ret = Sets.newHashSet(); + for (TblColRef col : input) { + if (cubeDesc.isDerived(col)) { + for (TblColRef host : cubeDesc.getHostInfo(col).columns) { + ret.add(host); + } + } else { + ret.add(col); + } + } + return ret; + } + + private ImmutableBitSet makeGridTableColumns(CuboidToGridTableMapping mapping, Set<TblColRef> dimensions) { + BitSet result = new BitSet(); + for (TblColRef dim : dimensions) { + int idx = mapping.getIndexOf(dim); + if (idx >= 0) + result.set(idx); + } + return new ImmutableBitSet(result); + } + + private ImmutableBitSet makeGridTableColumns(CuboidToGridTableMapping mapping, Collection<FunctionDesc> metrics) { + BitSet result = new BitSet(); + for (FunctionDesc metric : metrics) { + int idx = mapping.getIndexOf(metric); + if (idx < 0) + throw new IllegalStateException(metric + " not found in " + mapping); + result.set(idx); + } + return new ImmutableBitSet(result); + } + + private String[] makeAggrFuncs(final CuboidToGridTableMapping mapping, Collection<FunctionDesc> metrics) { + + //metrics are represented in ImmutableBitSet, which loses order information + //sort the aggrFuns to align with metrics natural order + List<FunctionDesc> metricList = Lists.newArrayList(metrics); + Collections.sort(metricList, new Comparator<FunctionDesc>() { + @Override + public int compare(FunctionDesc o1, FunctionDesc o2) { + int a = mapping.getIndexOf(o1); + int b = mapping.getIndexOf(o2); + return a - b; + } + }); + + String[] result = new String[metricList.size()]; + int i = 0; + for (FunctionDesc metric : metricList) { + result[i++] = metric.getExpression(); + } + return result; + } + + @Override + public Iterator<GTRecord> iterator() { + return scanner.iterator(); + } + + @Override + public void close() throws IOException { + scanner.close(); + } + + @Override + public GTInfo getInfo() { + return info; + } + + @Override + public int getScannedRowCount() { + return scanner.getScannedRowCount(); + } + + private class Scanner { + final IGTScanner[] inputScanners = new IGTScanner[scanRequests.size()]; + int cur = 0; + Iterator<GTRecord> curIterator = null; + GTRecord next = null; + + public Iterator<GTRecord> iterator() { + return new Iterator<GTRecord>() { + + @Override + public boolean hasNext() { + if (next != null) + return true; + + if (curIterator == null) { + if (cur >= scanRequests.size()) + return false; + + try { + + CubeHBaseRPC rpc = new CubeHBaseEndpointRPC(cubeSeg, cuboid, info); + //CubeHBaseRPC rpc = new CubeHBaseScanRPC(cubeSeg, cuboid, info); + + //change previous line to CubeHBaseRPC rpc = new CubeHBaseScanRPC(cubeSeg, cuboid, info); + //to debug locally + + inputScanners[cur] = rpc.getGTScanner(scanRequests.get(cur)); + curIterator = inputScanners[cur].iterator(); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + if (curIterator.hasNext() == false) { + curIterator = null; + cur++; + return hasNext(); + } + + next = curIterator.next(); + return true; + } + + @Override + public GTRecord next() { + // fetch next record + if (next == null) { + hasNext(); + if (next == null) + throw new NoSuchElementException(); + } + + GTRecord result = next; + next = null; + return result; + } + + @Override + public void remove() { + throw new UnsupportedOperationException(); + } + }; + } + + public void close() throws IOException { + for (int i = 0; i < inputScanners.length; i++) { + if (inputScanners[i] != null) { + inputScanners[i].close(); + } + } + } + + public int getScannedRowCount() { + int result = 0; + for (int i = 0; i < inputScanners.length; i++) { + if (inputScanners[i] == null) + break; + + result += inputScanners[i].getScannedRowCount(); + } + return result; + } + + } + +} http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d8372747/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeStorageQuery.java ---------------------------------------------------------------------- diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeStorageQuery.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeStorageQuery.java index 9bd73f5..eba0620 100644 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeStorageQuery.java +++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeStorageQuery.java @@ -36,6 +36,7 @@ import com.google.common.collect.Lists; import com.google.common.collect.Range; import com.google.common.collect.Sets; +@SuppressWarnings("unused") public class CubeStorageQuery implements ICachableStorageQuery { private static final Logger logger = LoggerFactory.getLogger(CubeStorageQuery.class); @@ -91,13 +92,11 @@ public class CubeStorageQuery implements ICachableStorageQuery { TupleFilter filterD = translateDerived(filter, groupsD); setThreshold(dimensionsD, metrics, context); // set cautious threshold to prevent out of memory - // TODO enable coprocessor - // setCoprocessor(groupsCopD, valueDecoders, context); // enable coprocessor if beneficial setLimit(filter, context); - List<CubeScanner> scanners = Lists.newArrayList(); + List<CubeSegmentScanner> scanners = Lists.newArrayList(); for (CubeSegment cubeSeg : cubeInstance.getSegments(SegmentStatusEnum.READY)) { - scanners.add(new CubeScanner(cubeSeg, cuboid, dimensionsD, groupsD, metrics, filterD, !isExactAggregation)); + scanners.add(new CubeSegmentScanner(cubeSeg, cuboid, dimensionsD, groupsD, metrics, filterD, !isExactAggregation)); } if (scanners.isEmpty()) http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d8372747/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/HBaseReadonlyStore.java ---------------------------------------------------------------------- diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/HBaseReadonlyStore.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/HBaseReadonlyStore.java index a6c6a23..7731f19 100644 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/HBaseReadonlyStore.java +++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/HBaseReadonlyStore.java @@ -24,7 +24,7 @@ import java.util.Iterator; import java.util.List; import org.apache.hadoop.hbase.Cell; -import org.apache.kylin.common.util.ImmutableBitSet; +import org.apache.kylin.common.util.BytesUtil; import org.apache.kylin.common.util.Pair; import org.apache.kylin.cube.kv.RowConstants; import org.apache.kylin.gridtable.GTInfo; @@ -34,20 +34,22 @@ import org.apache.kylin.gridtable.IGTScanner; import org.apache.kylin.gridtable.IGTStore; import org.apache.kylin.gridtable.IGTWriter; +import com.google.common.base.Preconditions; + public class HBaseReadonlyStore implements IGTStore { private CellListIterator cellListIterator; private GTInfo info; private List<Pair<byte[], byte[]>> hbaseColumns; - private ImmutableBitSet selectedColBlocks; + private List<List<Integer>> hbaseColumnsToGT; - public HBaseReadonlyStore(CellListIterator cellListIterator, GTScanRequest gtScanRequest, List<Pair<byte[], byte[]>> hbaseColumns) { + public HBaseReadonlyStore(CellListIterator cellListIterator, GTScanRequest gtScanRequest, List<Pair<byte[], byte[]>> hbaseColumns, List<List<Integer>> hbaseColumnsToGT) { this.cellListIterator = cellListIterator; this.info = gtScanRequest.getInfo(); this.hbaseColumns = hbaseColumns; - this.selectedColBlocks = gtScanRequest.getSelectedColBlocks().set(0); + this.hbaseColumnsToGT = hbaseColumnsToGT; } @Override @@ -56,20 +58,31 @@ public class HBaseReadonlyStore implements IGTStore { } @Override - public IGTWriter rebuild(int shard) throws IOException { + public IGTWriter rebuild() throws IOException { throw new UnsupportedOperationException(); } @Override - public IGTWriter append(int shard) throws IOException { + public IGTWriter append() throws IOException { throw new UnsupportedOperationException(); } + //TODO: possible to use binary search as cells might be sorted? + public static Cell findCell(List<Cell> cells, byte[] familyName, byte[] columnName) { + for (Cell c : cells) { + if (BytesUtil.compareBytes(familyName, 0, c.getFamilyArray(), c.getFamilyOffset(), familyName.length) == 0 && // + BytesUtil.compareBytes(columnName, 0, c.getQualifierArray(), c.getQualifierOffset(), columnName.length) == 0) { + return c; + } + } + return null; + } + @Override public IGTScanner scan(GTScanRequest scanRequest) throws IOException { return new IGTScanner() { int count; - + @Override public void close() throws IOException { cellListIterator.close(); @@ -79,7 +92,7 @@ public class HBaseReadonlyStore implements IGTStore { public Iterator<GTRecord> iterator() { return new Iterator<GTRecord>() { GTRecord oneRecord = new GTRecord(info); // avoid object creation - + @Override public boolean hasNext() { return cellListIterator.hasNext(); @@ -87,26 +100,24 @@ public class HBaseReadonlyStore implements IGTStore { @Override public GTRecord next() { + count++; List<Cell> oneRow = cellListIterator.next(); if (oneRow.size() < 1) { throw new IllegalStateException("cell list's size less than 1"); } - ByteBuffer buf; - // dimensions, set to primary key, also the 0th column block Cell firstCell = oneRow.get(0); - buf = byteBuffer(firstCell.getRowArray(), RowConstants.ROWKEY_CUBOIDID_LEN + firstCell.getRowOffset(), firstCell.getRowLength() - RowConstants.ROWKEY_CUBOIDID_LEN); + ByteBuffer buf = byteBuffer(firstCell.getRowArray(), RowConstants.ROWKEY_HEADER_LEN + firstCell.getRowOffset(), firstCell.getRowLength() - RowConstants.ROWKEY_HEADER_LEN); oneRecord.loadCellBlock(0, buf); // metrics - int hbaseColIdx = 0; - for (int i = 1; i < selectedColBlocks.trueBitCount(); i++) { - int colBlockIdx = selectedColBlocks.trueBitAt(i); - Pair<byte[], byte[]> hbaseColumn = hbaseColumns.get(hbaseColIdx++); - Cell cell = CubeHBaseRPC.findCell(oneRow, hbaseColumn.getFirst(), hbaseColumn.getSecond()); + for (int i = 0; i < hbaseColumns.size(); i++) { + Pair<byte[], byte[]> hbaseColumn = hbaseColumns.get(i); + Cell cell = HBaseReadonlyStore.findCell(oneRow, hbaseColumn.getFirst(), hbaseColumn.getSecond()); + Preconditions.checkNotNull(cell); buf = byteBuffer(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength()); - oneRecord.loadCellBlock(colBlockIdx, buf); + oneRecord.loadColumns(hbaseColumnsToGT.get(i), buf); } return oneRecord; @@ -116,7 +127,7 @@ public class HBaseReadonlyStore implements IGTStore { public void remove() { throw new UnsupportedOperationException(); } - + private ByteBuffer byteBuffer(byte[] array, int offset, int length) { return ByteBuffer.wrap(array, offset, length); } http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d8372747/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/HBaseScan.java ---------------------------------------------------------------------- diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/HBaseScan.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/HBaseScan.java new file mode 100644 index 0000000..7667830 --- /dev/null +++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/HBaseScan.java @@ -0,0 +1,88 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kylin.storage.hbase.cube.v2; + +import java.util.Arrays; + +import org.apache.kylin.common.util.ByteArray; +import org.apache.kylin.cube.kv.RowConstants; +import org.apache.kylin.gridtable.GTInfo; +import org.apache.kylin.gridtable.GTRecord; + +import com.google.common.base.Preconditions; + +public class HBaseScan { + + /** + * every column in scan key is fixed length. for empty values, 0 zero will be populated + */ + public static ByteArray exportScanKey(GTRecord rec, byte fill) { + + Preconditions.checkNotNull(rec); + + GTInfo info = rec.getInfo(); + int len = info.getMaxColumnLength(info.getPrimaryKey()); + ByteArray buf = ByteArray.allocate(len); + int pos = 0; + for (int i = 0; i < info.getPrimaryKey().trueBitCount(); i++) { + int c = info.getPrimaryKey().trueBitAt(i); + int colLength = info.getCodeSystem().maxCodeLength(c); + + if (rec.get(c).array() != null) { + Preconditions.checkArgument(colLength == rec.get(c).length(), "ColLength :" + colLength + " not equals cols[c] length: " + rec.get(c).length() + " c is " + c); + System.arraycopy(rec.get(c).array(), rec.get(c).offset(), buf.array(), buf.offset() + pos, rec.get(c).length()); + } else { + Arrays.fill(buf.array(), buf.offset() + pos, buf.offset() + pos + colLength, fill); + } + pos += colLength; + } + buf.setLength(pos); + + return buf; + } + + /** + * every column in scan key is fixed length. for fixed columns, 0 will be populated, for non-fixed columns, 1 will be populated + */ + public static ByteArray exportScanMask(GTRecord rec) { + Preconditions.checkNotNull(rec); + + GTInfo info = rec.getInfo(); + int len = info.getMaxColumnLength(info.getPrimaryKey()); + ByteArray buf = ByteArray.allocate(len); + byte fill; + + int pos = 0; + for (int i = 0; i < info.getPrimaryKey().trueBitCount(); i++) { + int c = info.getPrimaryKey().trueBitAt(i); + int colLength = info.getCodeSystem().maxCodeLength(c); + + if (rec.get(c).array() != null) { + fill = RowConstants.BYTE_ZERO; + } else { + fill = RowConstants.BYTE_ONE; + } + Arrays.fill(buf.array(), buf.offset() + pos, buf.offset() + pos + colLength, fill); + pos += colLength; + } + buf.setLength(pos); + + return buf; + } +} http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d8372747/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/RawScan.java ---------------------------------------------------------------------- diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/RawScan.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/RawScan.java index aa73927..ad4263f 100644 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/RawScan.java +++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/RawScan.java @@ -20,13 +20,14 @@ package org.apache.kylin.storage.hbase.cube.v2; import java.util.List; +import org.apache.kylin.common.util.BytesUtil; import org.apache.kylin.common.util.Pair; public class RawScan { public byte[] startKey; public byte[] endKey; - public List<Pair<byte[], byte[]>> hbaseColumns; + public List<Pair<byte[], byte[]>> hbaseColumns;//only contain interested columns public List<Pair<byte[], byte[]>> fuzzyKey; public RawScan(byte[] startKey, byte[] endKey, List<Pair<byte[], byte[]>> hbaseColumns, List<Pair<byte[], byte[]>> fuzzyKey) { @@ -37,4 +38,23 @@ public class RawScan { this.fuzzyKey = fuzzyKey; } + public String getStartKeyAsString() { + return BytesUtil.toHex(this.startKey); + } + + public String getEndKeyAsString() { + return BytesUtil.toHex(this.endKey); + } + + public String getFuzzyKeyAsString() { + StringBuilder buf = new StringBuilder(); + for (Pair<byte[], byte[]> fuzzyKey : this.fuzzyKey) { + buf.append(BytesUtil.toHex(fuzzyKey.getFirst())); + buf.append(" "); + buf.append(BytesUtil.toHex(fuzzyKey.getSecond())); + buf.append(System.lineSeparator()); + } + return buf.toString(); + } + } http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d8372747/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/SequentialCubeTupleIterator.java ---------------------------------------------------------------------- diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/SequentialCubeTupleIterator.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/SequentialCubeTupleIterator.java index 4686da2..85aa54a 100644 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/SequentialCubeTupleIterator.java +++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/SequentialCubeTupleIterator.java @@ -27,10 +27,10 @@ public class SequentialCubeTupleIterator implements ITupleIterator { private final Set<FunctionDesc> selectedMetrics; private final TupleInfo tupleInfo; private final Tuple tuple; - private final Iterator<CubeScanner> scannerIterator; + private final Iterator<CubeSegmentScanner> scannerIterator; private final StorageContext context; - private CubeScanner curScanner; + private CubeSegmentScanner curScanner; private Iterator<GTRecord> curRecordIterator; private CubeTupleConverter curTupleConverter; private Tuple next; @@ -38,7 +38,7 @@ public class SequentialCubeTupleIterator implements ITupleIterator { private int scanCount; private int scanCountDelta; - public SequentialCubeTupleIterator(List<CubeScanner> scanners, Cuboid cuboid, Set<TblColRef> selectedDimensions, // + public SequentialCubeTupleIterator(List<CubeSegmentScanner> scanners, Cuboid cuboid, Set<TblColRef> selectedDimensions, // Set<FunctionDesc> selectedMetrics, TupleInfo returnTupleInfo, StorageContext context) { this.cuboid = cuboid; this.selectedDimensions = selectedDimensions; @@ -112,7 +112,7 @@ public class SequentialCubeTupleIterator implements ITupleIterator { } } - private void close(CubeScanner scanner) { + private void close(CubeSegmentScanner scanner) { try { scanner.close(); } catch (IOException e) { http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d8372747/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/CubeVisitService.java ---------------------------------------------------------------------- diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/CubeVisitService.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/CubeVisitService.java index f0b8c6f..ba766bd 100644 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/CubeVisitService.java +++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/CubeVisitService.java @@ -46,11 +46,12 @@ import org.apache.kylin.storage.hbase.cube.v2.CubeHBaseRPC; import org.apache.kylin.storage.hbase.cube.v2.HBaseReadonlyStore; import org.apache.kylin.storage.hbase.cube.v2.RawScan; import org.apache.kylin.storage.hbase.cube.v2.coprocessor.endpoint.generated.CubeVisitProtos; +import org.apache.kylin.storage.hbase.cube.v2.coprocessor.endpoint.generated.CubeVisitProtos.CubeVisitRequest.IntList; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import com.google.common.collect.Lists; -import com.google.protobuf.ByteString; +import com.google.protobuf.HBaseZeroCopyByteString; import com.google.protobuf.RpcCallback; import com.google.protobuf.RpcController; import com.google.protobuf.Service; @@ -125,9 +126,13 @@ public class CubeVisitService extends CubeVisitProtos.CubeVisitService implement try { this.serviceStartTime = System.currentTimeMillis(); - GTScanRequest scanReq = KryoUtils.deserialize(request.getGtScanRequest().toByteArray(), GTScanRequest.class); - RawScan hbaseRawScan = KryoUtils.deserialize(request.getHbaseRawScan().toByteArray(), RawScan.class); - //TODO: rewrite own start/end + GTScanRequest scanReq = KryoUtils.deserialize(HBaseZeroCopyByteString.zeroCopyGetBytes(request.getGtScanRequest()), GTScanRequest.class); + RawScan hbaseRawScan = KryoUtils.deserialize(HBaseZeroCopyByteString.zeroCopyGetBytes(request.getHbaseRawScan()), RawScan.class); + List<List<Integer>> hbaseColumnsToGT = Lists.newArrayList(); + for (IntList intList : request.getHbaseColumnsToGTList()) { + hbaseColumnsToGT.add(intList.getIntsList()); + } + Scan scan = CubeHBaseRPC.buildScan(hbaseRawScan); region = env.getRegion(); @@ -136,26 +141,30 @@ public class CubeVisitService extends CubeVisitProtos.CubeVisitService implement innerScanner = region.getScanner(scan); InnerScannerAsIterator cellListIterator = new InnerScannerAsIterator(innerScanner); - IGTStore store = new HBaseReadonlyStore(cellListIterator, scanReq, hbaseRawScan.hbaseColumns); + IGTStore store = new HBaseReadonlyStore(cellListIterator, scanReq, hbaseRawScan.hbaseColumns, hbaseColumnsToGT); IGTScanner rawScanner = store.scan(scanReq); IGTScanner finalScanner = scanReq.decorateScanner(rawScanner); ByteBuffer buffer = ByteBuffer.allocate(RowConstants.ROWVALUE_BUFFER_SIZE); - ByteArrayOutputStream outputStream = new ByteArrayOutputStream(RowConstants.ROWVALUE_BUFFER_SIZE); + + ByteArrayOutputStream outputStream = new ByteArrayOutputStream(RowConstants.ROWVALUE_BUFFER_SIZE);//ByteArrayOutputStream will auto grow + int finalRowCount = 0; for (GTRecord oneRecord : finalScanner) { buffer.clear(); - oneRecord.exportAllColumns(buffer); + oneRecord.exportColumns(scanReq.getColumns(), buffer); buffer.flip(); + outputStream.write(buffer.array(), buffer.arrayOffset() - buffer.position(), buffer.remaining()); + finalRowCount++; } //outputStream.close() is not necessary byte[] allRows = outputStream.toByteArray(); CubeVisitProtos.CubeVisitResponse.Builder responseBuilder = CubeVisitProtos.CubeVisitResponse.newBuilder(); done.run(responseBuilder.// - setCompressedRows(ByteString.copyFrom(CompressionUtils.compress(allRows))).//too many array copies + setCompressedRows(HBaseZeroCopyByteString.wrap(CompressionUtils.compress(allRows))).//too many array copies setStats(CubeVisitProtos.CubeVisitResponse.Stats.newBuilder().// - setAggregatedRowCount(0).// - setScannedRowCount(0).// + setAggregatedRowCount(finalScanner.getScannedRowCount() - finalRowCount).// + setScannedRowCount(finalScanner.getScannedRowCount()).// setServiceStartTime(serviceStartTime).// setServiceEndTime(System.currentTimeMillis()).build()).// build());