http://git-wip-us.apache.org/repos/asf/kylin/blob/b26b2489/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseScanRPC.java ---------------------------------------------------------------------- diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseScanRPC.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseScanRPC.java index ef53cb7..938145b 100644 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseScanRPC.java +++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseScanRPC.java @@ -19,30 +19,35 @@ package org.apache.kylin.storage.hbase.cube.v2; import java.io.IOException; +import java.util.Arrays; import java.util.Iterator; import java.util.List; -import javax.annotation.Nullable; - import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.client.HConnection; import org.apache.hadoop.hbase.client.HTableInterface; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.ResultScanner; import org.apache.hadoop.hbase.client.Scan; +import org.apache.kylin.common.util.BytesUtil; import org.apache.kylin.common.util.ImmutableBitSet; +import org.apache.kylin.common.util.ShardingHash; import org.apache.kylin.cube.CubeSegment; import org.apache.kylin.cube.cuboid.Cuboid; +import org.apache.kylin.cube.kv.RowConstants; +import org.apache.kylin.dimension.DimensionEncoding; import org.apache.kylin.gridtable.GTInfo; import org.apache.kylin.gridtable.GTRecord; import org.apache.kylin.gridtable.GTScanRequest; import org.apache.kylin.gridtable.IGTScanner; import org.apache.kylin.gridtable.IGTStore; +import org.apache.kylin.metadata.filter.UDF.MassInTupleFilter; +import org.apache.kylin.metadata.model.TblColRef; import org.apache.kylin.storage.hbase.HBaseConnection; +import org.apache.kylin.storage.hbase.cube.v2.filter.MassInValueProviderFactoryImpl; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.google.common.base.Function; import com.google.common.collect.Iterators; import com.google.common.collect.Lists; @@ -84,53 +89,77 @@ public class CubeHBaseScanRPC extends CubeHBaseRPC { } } - public CubeHBaseScanRPC(CubeSegment cubeSeg, Cuboid cuboid, GTInfo fullGTInfo) { + public CubeHBaseScanRPC(CubeSegment cubeSeg, Cuboid cuboid, final GTInfo fullGTInfo) { super(cubeSeg, cuboid, fullGTInfo); + MassInTupleFilter.VALUE_PROVIDER_FACTORY = new MassInValueProviderFactoryImpl(new MassInValueProviderFactoryImpl.DimEncAware() { + @Override + public DimensionEncoding getDimEnc(TblColRef col) { + return fullGTInfo.getCodeSystem().getDimEnc(col.getColumnDesc().getZeroBasedIndex()); + } + }); } @Override - public IGTScanner getGTScanner(final List<GTScanRequest> scanRequests) throws IOException { - final List<IGTScanner> scanners = Lists.newArrayList(); - for (GTScanRequest request : scanRequests) { - scanners.add(getGTScanner(request)); - } + public IGTScanner getGTScanner(final GTScanRequest scanRequest) throws IOException { + final IGTScanner scanner = getGTScannerInternal(scanRequest); return new IGTScanner() { @Override public GTInfo getInfo() { - return scanners.get(0).getInfo(); + return scanner.getInfo(); } @Override public int getScannedRowCount() { int sum = 0; - for (IGTScanner s : scanners) { - sum += s.getScannedRowCount(); - } + sum += scanner.getScannedRowCount(); return sum; } @Override public void close() throws IOException { - for (IGTScanner s : scanners) { - s.close(); - } + scanner.close(); } @Override public Iterator<GTRecord> iterator() { - return Iterators.concat(Iterators.transform(scanners.iterator(), new Function<IGTScanner, Iterator<GTRecord>>() { - @Nullable - @Override - public Iterator<GTRecord> apply(IGTScanner input) { - return input.iterator(); - } - })); + return scanner.iterator(); } }; } - private IGTScanner getGTScanner(final GTScanRequest scanRequest) throws IOException { + //for non-sharding cases it will only return one byte[] with not shard at beginning + private List<byte[]> getRowKeysDifferentShards(byte[] halfCookedKey) { + final short cuboidShardNum = cubeSeg.getCuboidShardNum(cuboid.getId()); + + if (!cubeSeg.isEnableSharding()) { + return Lists.newArrayList(halfCookedKey);//not shard to append at head, so it is already well cooked + } else { + List<byte[]> ret = Lists.newArrayList(); + for (short i = 0; i < cuboidShardNum; ++i) { + short shard = ShardingHash.normalize(cubeSeg.getCuboidBaseShard(cuboid.getId()), i, cubeSeg.getTotalShards()); + byte[] cookedKey = Arrays.copyOf(halfCookedKey, halfCookedKey.length); + BytesUtil.writeShort(shard, cookedKey, 0, RowConstants.ROWKEY_SHARDID_LEN); + ret.add(cookedKey); + } + return ret; + } + } + + private List<RawScan> spawnRawScansForAllShards(RawScan rawScan) { + List<RawScan> ret = Lists.newArrayList(); + List<byte[]> startKeys = getRowKeysDifferentShards(rawScan.startKey); + List<byte[]> endKeys = getRowKeysDifferentShards(rawScan.endKey); + for (int i = 0; i < startKeys.size(); i++) { + RawScan temp = new RawScan(rawScan); + temp.startKey = startKeys.get(i); + temp.endKey = endKeys.get(i); + ret.add(temp); + } + return ret; + } + + private IGTScanner getGTScannerInternal(final GTScanRequest scanRequest) throws IOException { // primary key (also the 0th column block) is always selected final ImmutableBitSet selectedColBlocks = scanRequest.getSelectedColBlocks().set(0); @@ -138,22 +167,23 @@ public class CubeHBaseScanRPC extends CubeHBaseRPC { HConnection hbaseConn = HBaseConnection.get(cubeSeg.getCubeInstance().getConfig().getStorageUrl()); final HTableInterface hbaseTable = hbaseConn.getTable(cubeSeg.getStorageLocationIdentifier()); - List<RawScan> rawScans = preparedHBaseScans(scanRequest.getPkStart(), scanRequest.getPkEnd(), scanRequest.getFuzzyKeys(), selectedColBlocks); + List<RawScan> rawScans = preparedHBaseScans(scanRequest.getGTScanRanges(), selectedColBlocks); List<List<Integer>> hbaseColumnsToGT = getHBaseColumnsGTMapping(selectedColBlocks); final List<ResultScanner> scanners = Lists.newArrayList(); final List<Iterator<Result>> resultIterators = Lists.newArrayList(); for (RawScan rawScan : rawScans) { + for (RawScan rawScanWithShard : spawnRawScansForAllShards(rawScan)) { + logScan(rawScanWithShard, cubeSeg.getStorageLocationIdentifier()); + Scan hbaseScan = buildScan(rawScanWithShard); - logScan(rawScan, cubeSeg.getStorageLocationIdentifier()); - Scan hbaseScan = buildScan(rawScan); - - final ResultScanner scanner = hbaseTable.getScanner(hbaseScan); - final Iterator<Result> iterator = scanner.iterator(); + final ResultScanner scanner = hbaseTable.getScanner(hbaseScan); + final Iterator<Result> iterator = scanner.iterator(); - scanners.add(scanner); - resultIterators.add(iterator); + scanners.add(scanner); + resultIterators.add(iterator); + } } final Iterator<Result> allResultsIterator = Iterators.concat(resultIterators.iterator());
http://git-wip-us.apache.org/repos/asf/kylin/blob/b26b2489/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeSegmentScanner.java ---------------------------------------------------------------------- diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeSegmentScanner.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeSegmentScanner.java index 6bbb9c0..9ed914a 100644 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeSegmentScanner.java +++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeSegmentScanner.java @@ -19,204 +19,48 @@ package org.apache.kylin.storage.hbase.cube.v2; import java.io.IOException; -import java.nio.ByteBuffer; -import java.util.BitSet; import java.util.Collection; -import java.util.Collections; -import java.util.Comparator; import java.util.Iterator; -import java.util.List; import java.util.Set; -import org.apache.commons.lang3.StringUtils; -import org.apache.kylin.common.KylinConfig; -import org.apache.kylin.common.debug.BackdoorToggles; -import org.apache.kylin.common.util.ByteArray; -import org.apache.kylin.common.util.DateFormat; -import org.apache.kylin.common.util.ImmutableBitSet; -import org.apache.kylin.common.util.Pair; import org.apache.kylin.cube.CubeSegment; import org.apache.kylin.cube.cuboid.Cuboid; -import org.apache.kylin.cube.gridtable.CubeGridTable; -import org.apache.kylin.cube.gridtable.CuboidToGridTableMapping; -import org.apache.kylin.cube.model.CubeDesc; import org.apache.kylin.dict.BuildInFunctionTransformer; -import org.apache.kylin.dimension.DimensionEncoding; -import org.apache.kylin.gridtable.EmptyGTScanner; import org.apache.kylin.gridtable.GTInfo; import org.apache.kylin.gridtable.GTRecord; -import org.apache.kylin.gridtable.GTScanRange; import org.apache.kylin.gridtable.GTScanRangePlanner; import org.apache.kylin.gridtable.GTScanRequest; -import org.apache.kylin.gridtable.GTUtil; import org.apache.kylin.gridtable.IGTScanner; -import org.apache.kylin.metadata.datatype.DataType; +import org.apache.kylin.gridtable.ScannerWorker; import org.apache.kylin.metadata.filter.ITupleFilterTransformer; import org.apache.kylin.metadata.filter.TupleFilter; -import org.apache.kylin.metadata.filter.UDF.MassInTupleFilter; import org.apache.kylin.metadata.model.FunctionDesc; import org.apache.kylin.metadata.model.TblColRef; -import org.apache.kylin.storage.hbase.cube.v2.filter.MassInValueProviderFactoryImpl; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.google.common.collect.Lists; -import com.google.common.collect.Sets; - public class CubeSegmentScanner implements IGTScanner { private static final Logger logger = LoggerFactory.getLogger(CubeSegmentScanner.class); - private static final int MAX_SCAN_RANGES = 200; - final CubeSegment cubeSeg; - final GTInfo info; - final List<GTScanRequest> scanRequests; - final Scanner scanner; + final ScannerWorker scanner; final Cuboid cuboid; + final GTScanRequest scanRequest; + public CubeSegmentScanner(CubeSegment cubeSeg, Cuboid cuboid, Set<TblColRef> dimensions, Set<TblColRef> groups, // Collection<FunctionDesc> metrics, TupleFilter filter, boolean allowPreAggregate) { this.cuboid = cuboid; this.cubeSeg = cubeSeg; - this.info = CubeGridTable.newGTInfo(cubeSeg, cuboid.getId()); - - CuboidToGridTableMapping mapping = cuboid.getCuboidToGridTableMapping(); // translate FunctionTupleFilter to IN clause ITupleFilterTransformer translator = new BuildInFunctionTransformer(cubeSeg.getDimensionEncodingMap()); filter = translator.transform(filter); - //replace the constant values in filter to dictionary codes - TupleFilter gtFilter = GTUtil.convertFilterColumnsAndConstants(filter, info, mapping.getCuboidDimensionsInGTOrder(), groups); - - ImmutableBitSet gtDimensions = makeGridTableColumns(mapping, dimensions); - ImmutableBitSet gtAggrGroups = makeGridTableColumns(mapping, replaceDerivedColumns(groups, cubeSeg.getCubeDesc())); - ImmutableBitSet gtAggrMetrics = makeGridTableColumns(mapping, metrics); - String[] gtAggrFuncs = makeAggrFuncs(mapping, metrics); - - GTScanRangePlanner scanRangePlanner; - if (cubeSeg.getCubeDesc().getModel().getPartitionDesc().isPartitioned()) { - TblColRef tblColRef = cubeSeg.getCubeDesc().getModel().getPartitionDesc().getPartitionDateColumnRef(); - TblColRef partitionColOfGT = null; - Pair<ByteArray, ByteArray> segmentStartAndEnd = null; - int index = mapping.getIndexOf(tblColRef); - if (index >= 0) { - segmentStartAndEnd = getSegmentStartAndEnd(index); - partitionColOfGT = info.colRef(index); - } - scanRangePlanner = new GTScanRangePlanner(info, segmentStartAndEnd, partitionColOfGT); - } else { - scanRangePlanner = new GTScanRangePlanner(info, null, null); - } - List<GTScanRange> scanRanges = scanRangePlanner.planScanRanges(gtFilter, MAX_SCAN_RANGES); - - scanRequests = Lists.newArrayListWithCapacity(scanRanges.size()); - - KylinConfig config = cubeSeg.getCubeInstance().getConfig(); - for (GTScanRange range : scanRanges) { - GTScanRequest req = new GTScanRequest(info, range, gtDimensions, gtAggrGroups, gtAggrMetrics, gtAggrFuncs, gtFilter, allowPreAggregate, config.getQueryCoprocessorMemGB()); - scanRequests.add(req); - } - - scanner = new Scanner(); - } - - private Pair<ByteArray, ByteArray> getSegmentStartAndEnd(int index) { - ByteArray start; - if (cubeSeg.getDateRangeStart() != Long.MIN_VALUE) { - start = encodeTime(cubeSeg.getDateRangeStart(), index, 1); - } else { - start = new ByteArray(); - } - - ByteArray end; - if (cubeSeg.getDateRangeEnd() != Long.MAX_VALUE) { - end = encodeTime(cubeSeg.getDateRangeEnd(), index, -1); - } else { - end = new ByteArray(); - } - return Pair.newPair(start, end); - - } - - private ByteArray encodeTime(long ts, int index, int roundingFlag) { - String value; - DataType partitionColType = info.getColumnType(index); - if (partitionColType.isDate()) { - value = DateFormat.formatToDateStr(ts); - } else if (partitionColType.isDatetime() || partitionColType.isTimestamp()) { - value = DateFormat.formatToTimeWithoutMilliStr(ts); - } else if (partitionColType.isStringFamily()) { - String partitionDateFormat = cubeSeg.getCubeDesc().getModel().getPartitionDesc().getPartitionDateFormat(); - if (StringUtils.isEmpty(partitionDateFormat)) - partitionDateFormat = DateFormat.DEFAULT_DATE_PATTERN; - value = DateFormat.formatToDateStr(ts, partitionDateFormat); - } else { - throw new RuntimeException("Type " + partitionColType + " is not valid partition column type"); - } - - ByteBuffer buffer = ByteBuffer.allocate(info.getMaxColumnLength()); - info.getCodeSystem().encodeColumnValue(index, value, roundingFlag, buffer); - - return ByteArray.copyOf(buffer.array(), 0, buffer.position()); - } - - private Set<TblColRef> replaceDerivedColumns(Set<TblColRef> input, CubeDesc cubeDesc) { - Set<TblColRef> ret = Sets.newHashSet(); - for (TblColRef col : input) { - if (cubeDesc.hasHostColumn(col)) { - for (TblColRef host : cubeDesc.getHostInfo(col).columns) { - ret.add(host); - } - } else { - ret.add(col); - } - } - return ret; - } - - private ImmutableBitSet makeGridTableColumns(CuboidToGridTableMapping mapping, Set<TblColRef> dimensions) { - BitSet result = new BitSet(); - for (TblColRef dim : dimensions) { - int idx = mapping.getIndexOf(dim); - if (idx >= 0) - result.set(idx); - } - return new ImmutableBitSet(result); - } - - private ImmutableBitSet makeGridTableColumns(CuboidToGridTableMapping mapping, Collection<FunctionDesc> metrics) { - BitSet result = new BitSet(); - for (FunctionDesc metric : metrics) { - int idx = mapping.getIndexOf(metric); - if (idx < 0) - throw new IllegalStateException(metric + " not found in " + mapping); - result.set(idx); - } - return new ImmutableBitSet(result); - } - - private String[] makeAggrFuncs(final CuboidToGridTableMapping mapping, Collection<FunctionDesc> metrics) { - - //metrics are represented in ImmutableBitSet, which loses order information - //sort the aggrFuns to align with metrics natural order - List<FunctionDesc> metricList = Lists.newArrayList(metrics); - Collections.sort(metricList, new Comparator<FunctionDesc>() { - @Override - public int compare(FunctionDesc o1, FunctionDesc o2) { - int a = mapping.getIndexOf(o1); - int b = mapping.getIndexOf(o2); - return a - b; - } - }); - - String[] result = new String[metricList.size()]; - int i = 0; - for (FunctionDesc metric : metricList) { - result[i++] = metric.getExpression(); - } - return result; + GTScanRangePlanner scanRangePlanner = new GTScanRangePlanner(cubeSeg, cuboid, filter, dimensions, groups, metrics); + scanRequest = scanRangePlanner.planScanRequest(allowPreAggregate); + scanner = new ScannerWorker(cubeSeg, cuboid, scanRequest); } @Override @@ -231,7 +75,7 @@ public class CubeSegmentScanner implements IGTScanner { @Override public GTInfo getInfo() { - return info; + return scanRequest == null ? null : scanRequest.getInfo(); } @Override @@ -239,47 +83,4 @@ public class CubeSegmentScanner implements IGTScanner { return scanner.getScannedRowCount(); } - private class Scanner { - IGTScanner internal = null; - - public Scanner() { - CubeHBaseRPC rpc; - if ("scan".equalsIgnoreCase(BackdoorToggles.getHbaseCubeQueryProtocol())) { - MassInTupleFilter.VALUE_PROVIDER_FACTORY = new MassInValueProviderFactoryImpl(new MassInValueProviderFactoryImpl.DimEncAware() { - @Override - public DimensionEncoding getDimEnc(TblColRef col) { - return info.getCodeSystem().getDimEnc(col.getColumnDesc().getZeroBasedIndex()); - } - }); - rpc = new CubeHBaseScanRPC(cubeSeg, cuboid, info); // for local debug - } else { - rpc = new CubeHBaseEndpointRPC(cubeSeg, cuboid, info); // default behavior - } - - try { - if (scanRequests.size() == 0) { - logger.info("Segment {} will be skipped", cubeSeg); - internal = new EmptyGTScanner(0); - } else { - internal = rpc.getGTScanner(scanRequests); - } - } catch (IOException e) { - throw new RuntimeException("error", e); - } - } - - public Iterator<GTRecord> iterator() { - return internal.iterator(); - } - - public void close() throws IOException { - internal.close(); - } - - public int getScannedRowCount() { - return internal.getScannedRowCount(); - } - - } - } http://git-wip-us.apache.org/repos/asf/kylin/blob/b26b2489/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeStorageQuery.java ---------------------------------------------------------------------- diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeStorageQuery.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeStorageQuery.java index 863dd67..e0e6d83 100644 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeStorageQuery.java +++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeStorageQuery.java @@ -79,26 +79,26 @@ public class CubeStorageQuery implements IStorageQuery { Set<FunctionDesc> metrics = new LinkedHashSet<FunctionDesc>(); buildDimensionsAndMetrics(sqlDigest, dimensions, metrics); - // all dimensions = groups + filter dimensions - Set<TblColRef> filterDims = Sets.newHashSet(dimensions); - filterDims.removeAll(groups); + // all dimensions = groups + other(like filter) dimensions + Set<TblColRef> otherDims = Sets.newHashSet(dimensions); + otherDims.removeAll(groups); // expand derived (xxxD means contains host columns only, derived columns were translated) Set<TblColRef> derivedPostAggregation = Sets.newHashSet(); Set<TblColRef> groupsD = expandDerived(groups, derivedPostAggregation); - Set<TblColRef> filterDimsD = expandDerived(filterDims, derivedPostAggregation); - filterDimsD.removeAll(groupsD); + Set<TblColRef> otherDimsD = expandDerived(otherDims, derivedPostAggregation); + otherDimsD.removeAll(groupsD); // identify cuboid Set<TblColRef> dimensionsD = new LinkedHashSet<TblColRef>(); dimensionsD.addAll(groupsD); - dimensionsD.addAll(filterDimsD); - Cuboid cuboid = identifyCuboid(dimensionsD, metrics); + dimensionsD.addAll(otherDimsD); + Cuboid cuboid = Cuboid.identifyCuboid(cubeDesc,dimensionsD, metrics); context.setCuboid(cuboid); // isExactAggregation? meaning: tuples returned from storage requires no further aggregation in query engine Set<TblColRef> singleValuesD = findSingleValueColumns(filter); - boolean isExactAggregation = isExactAggregation(cuboid, groups, filterDimsD, singleValuesD, derivedPostAggregation); + boolean isExactAggregation = isExactAggregation(cuboid, groups, otherDimsD, singleValuesD, derivedPostAggregation); context.setExactAggregation(isExactAggregation); // replace derived columns in filter with host columns; columns on loosened condition must be added to group by @@ -169,19 +169,7 @@ public class CubeStorageQuery implements IStorageQuery { return expanded; } - private Cuboid identifyCuboid(Set<TblColRef> dimensions, Collection<FunctionDesc> metrics) { - for (FunctionDesc metric : metrics) { - if (metric.getMeasureType().onlyAggrInBaseCuboid()) - return Cuboid.getBaseCuboid(cubeDesc); - } - - long cuboidID = 0; - for (TblColRef column : dimensions) { - int index = cubeDesc.getRowkey().getColumnBitIndex(column); - cuboidID |= 1L << index; - } - return Cuboid.findById(cubeDesc, cuboidID); - } + @SuppressWarnings("unchecked") private Set<TblColRef> findSingleValueColumns(TupleFilter filter) { http://git-wip-us.apache.org/repos/asf/kylin/blob/b26b2489/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/RawScan.java ---------------------------------------------------------------------- diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/RawScan.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/RawScan.java index c2ffdba..c2eacc1 100644 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/RawScan.java +++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/RawScan.java @@ -47,6 +47,16 @@ public class RawScan { this.hbaseMaxResultSize = hbaseMaxResultSize; } + public RawScan(RawScan other) { + + this.startKey = other.startKey; + this.endKey = other.endKey; + this.hbaseColumns = other.hbaseColumns; + this.fuzzyKeys = other.fuzzyKeys; + this.hbaseCaching = other.hbaseCaching; + this.hbaseMaxResultSize = other.hbaseMaxResultSize; + } + public String getStartKeyAsString() { return BytesUtil.toHex(this.startKey); } http://git-wip-us.apache.org/repos/asf/kylin/blob/b26b2489/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/CubeVisitService.java ---------------------------------------------------------------------- diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/CubeVisitService.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/CubeVisitService.java index 1e7b1b5..9e8e251 100644 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/CubeVisitService.java +++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/CubeVisitService.java @@ -23,6 +23,7 @@ import java.io.IOException; import java.lang.management.ManagementFactory; import java.net.InetAddress; import java.nio.ByteBuffer; +import java.util.Iterator; import java.util.List; import org.apache.commons.io.IOUtils; @@ -38,6 +39,7 @@ import org.apache.hadoop.hbase.protobuf.ResponseConverter; import org.apache.hadoop.hbase.regionserver.HRegion; import org.apache.hadoop.hbase.regionserver.RegionScanner; import org.apache.kylin.common.util.Bytes; +import org.apache.kylin.common.util.BytesUtil; import org.apache.kylin.common.util.CompressionUtils; import org.apache.kylin.cube.kv.RowConstants; import org.apache.kylin.dimension.DimensionEncoding; @@ -58,6 +60,7 @@ import org.apache.kylin.storage.hbase.cube.v2.filter.MassInValueProviderFactoryI import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.google.common.collect.Iterators; import com.google.common.collect.Lists; import com.google.protobuf.HBaseZeroCopyByteString; import com.google.protobuf.RpcCallback; @@ -138,8 +141,21 @@ public class CubeVisitService extends CubeVisitProtos.CubeVisitService implement Bytes.putBytes(rawScan.endKey, 0, regionStartKey, 0, shardLength); } - private void appendProfileInfo(StringBuilder sb) { + private List<RawScan> deserializeRawScans(ByteBuffer in) { + int rawScanCount = BytesUtil.readVInt(in); + List<RawScan> ret = Lists.newArrayList(); + for (int i = 0; i < rawScanCount; i++) { + RawScan temp = RawScan.serializer.deserialize(in); + ret.add(temp); + } + return ret; + } + + private void appendProfileInfo(StringBuilder sb, String info) { sb.append(System.currentTimeMillis() - this.serviceStartTime); + if (info != null) { + sb.append(":").append(info); + } sb.append(","); } @@ -159,7 +175,12 @@ public class CubeVisitService extends CubeVisitProtos.CubeVisitService implement region.startRegionOperation(); final GTScanRequest scanReq = GTScanRequest.serializer.deserialize(ByteBuffer.wrap(HBaseZeroCopyByteString.zeroCopyGetBytes(request.getGtScanRequest()))); - final RawScan hbaseRawScan = RawScan.serializer.deserialize(ByteBuffer.wrap(HBaseZeroCopyByteString.zeroCopyGetBytes(request.getHbaseRawScan()))); + List<List<Integer>> hbaseColumnsToGT = Lists.newArrayList(); + for (IntList intList : request.getHbaseColumnsToGTList()) { + hbaseColumnsToGT.add(intList.getIntsList()); + } + CoprocessorBehavior behavior = CoprocessorBehavior.valueOf(request.getBehavior()); + final List<RawScan> hbaseRawScans = deserializeRawScans(ByteBuffer.wrap(HBaseZeroCopyByteString.zeroCopyGetBytes(request.getHbaseRawScan()))); MassInTupleFilter.VALUE_PROVIDER_FACTORY = new MassInValueProviderFactoryImpl(new MassInValueProviderFactoryImpl.DimEncAware() { @Override @@ -168,40 +189,61 @@ public class CubeVisitService extends CubeVisitProtos.CubeVisitService implement } }); - List<List<Integer>> hbaseColumnsToGT = Lists.newArrayList(); - for (IntList intList : request.getHbaseColumnsToGTList()) { - hbaseColumnsToGT.add(intList.getIntsList()); - } - - if (request.getRowkeyPreambleSize() - RowConstants.ROWKEY_CUBOIDID_LEN > 0) { - //if has shard, fill region shard to raw scan start/end - updateRawScanByCurrentRegion(hbaseRawScan, region, request.getRowkeyPreambleSize() - RowConstants.ROWKEY_CUBOIDID_LEN); - } - - Scan scan = CubeHBaseRPC.buildScan(hbaseRawScan); + final List<InnerScannerAsIterator> cellListsForeachRawScan = Lists.newArrayList(); + for (RawScan hbaseRawScan : hbaseRawScans) { + if (request.getRowkeyPreambleSize() - RowConstants.ROWKEY_CUBOIDID_LEN > 0) { + //if has shard, fill region shard to raw scan start/end + updateRawScanByCurrentRegion(hbaseRawScan, region, request.getRowkeyPreambleSize() - RowConstants.ROWKEY_CUBOIDID_LEN); + } - appendProfileInfo(sb); + Scan scan = CubeHBaseRPC.buildScan(hbaseRawScan); + innerScanner = region.getScanner(scan); - innerScanner = region.getScanner(scan); - CoprocessorBehavior behavior = CoprocessorBehavior.valueOf(request.getBehavior()); + InnerScannerAsIterator cellListIterator = new InnerScannerAsIterator(innerScanner); + cellListsForeachRawScan.add(cellListIterator); + } + + final Iterator<List<Cell>> allCellLists = Iterators.concat(cellListsForeachRawScan.iterator()); if (behavior.ordinal() < CoprocessorBehavior.SCAN.ordinal()) { + //this is only for CoprocessorBehavior.RAW_SCAN case to profile hbase scan speed List<Cell> temp = Lists.newArrayList(); int counter = 0; while (innerScanner.nextRaw(temp)) { counter++; } - sb.append("Scanned " + counter + " rows in " + (System.currentTimeMillis() - serviceStartTime) + ","); + appendProfileInfo(sb, "scanned " + counter); } - InnerScannerAsIterator cellListIterator = new InnerScannerAsIterator(innerScanner); if (behavior.ordinal() < CoprocessorBehavior.SCAN_FILTER_AGGR_CHECKMEM.ordinal()) { scanReq.setAggrCacheGB(0); // disable mem check if so told } - IGTStore store = new HBaseReadonlyStore(cellListIterator, scanReq, hbaseRawScan.hbaseColumns, hbaseColumnsToGT, request.getRowkeyPreambleSize()); - IGTScanner rawScanner = store.scan(scanReq); + IGTStore store = new HBaseReadonlyStore(new CellListIterator() { + @Override + public void close() throws IOException { + for (CellListIterator closeable : cellListsForeachRawScan) { + closeable.close(); + } + } + + @Override + public boolean hasNext() { + return allCellLists.hasNext(); + } + + @Override + public List<Cell> next() { + return allCellLists.next(); + } + @Override + public void remove() { + throw new UnsupportedOperationException(); + } + }, scanReq, hbaseRawScans.get(0).hbaseColumns, hbaseColumnsToGT, request.getRowkeyPreambleSize()); + + IGTScanner rawScanner = store.scan(scanReq); IGTScanner finalScanner = scanReq.decorateScanner(rawScanner, // behavior.ordinal() >= CoprocessorBehavior.SCAN_FILTER.ordinal(), // behavior.ordinal() >= CoprocessorBehavior.SCAN_FILTER_AGGR.ordinal()); @@ -219,20 +261,20 @@ public class CubeVisitService extends CubeVisitProtos.CubeVisitService implement finalRowCount++; } - appendProfileInfo(sb); + appendProfileInfo(sb, "agg done"); //outputStream.close() is not necessary allRows = outputStream.toByteArray(); byte[] compressedAllRows = CompressionUtils.compress(allRows); - appendProfileInfo(sb); + appendProfileInfo(sb, "compress done"); OperatingSystemMXBean operatingSystemMXBean = (OperatingSystemMXBean) ManagementFactory.getOperatingSystemMXBean(); double systemCpuLoad = operatingSystemMXBean.getSystemCpuLoad(); double freePhysicalMemorySize = operatingSystemMXBean.getFreePhysicalMemorySize(); double freeSwapSpaceSize = operatingSystemMXBean.getFreeSwapSpaceSize(); - appendProfileInfo(sb); + appendProfileInfo(sb, "server stats done"); CubeVisitProtos.CubeVisitResponse.Builder responseBuilder = CubeVisitProtos.CubeVisitResponse.newBuilder(); done.run(responseBuilder.// http://git-wip-us.apache.org/repos/asf/kylin/blob/b26b2489/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/steps/SandboxMetastoreCLI.java ---------------------------------------------------------------------- diff --git a/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/steps/SandboxMetastoreCLI.java b/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/steps/SandboxMetastoreCLI.java index b12451a..fa1687b 100644 --- a/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/steps/SandboxMetastoreCLI.java +++ b/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/steps/SandboxMetastoreCLI.java @@ -42,7 +42,7 @@ public class SandboxMetastoreCLI { public static void main(String[] args) throws Exception { logger.info("Adding to classpath: " + new File(HBaseMetadataTestCase.SANDBOX_TEST_DATA).getAbsolutePath()); ClassUtil.addClasspath(new File(HBaseMetadataTestCase.SANDBOX_TEST_DATA).getAbsolutePath()); - System.setProperty(KylinConfig.KYLIN_CONF, "../examples/test_case_data/sandbox"); + System.setProperty(KylinConfig.KYLIN_CONF, HBaseMetadataTestCase.SANDBOX_TEST_DATA); if (StringUtils.isEmpty(System.getProperty("hdp.version"))) { throw new RuntimeException("No hdp.version set; Please set hdp.version in your jvm option, for example: -Dhdp.version=2.2.4.2-2"); }