refactor: make the default GTStore configurable (pass gtStorage explicitly)
Project: http://git-wip-us.apache.org/repos/asf/kylin/repo Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/4fd74fc6 Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/4fd74fc6 Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/4fd74fc6 Branch: refs/heads/stream_m1 Commit: 4fd74fc6a7d7c9bcccab38b57da76f2b983cf5ef Parents: 62ae3cb Author: Hongbin Ma <mahong...@apache.org> Authored: Thu Jun 23 16:37:09 2016 +0800 Committer: Hongbin Ma <mahong...@apache.org> Committed: Thu Jun 23 16:37:09 2016 +0800 ---------------------------------------------------------------------- .../apache/kylin/gridtable/ScannerWorker.java | 11 +- .../storage/gtrecord/CubeSegmentScanner.java | 100 +++++ .../storage/gtrecord/CubeTupleConverter.java | 270 +++++++++++++ .../gtrecord/GTCubeStorageQueryBase.java | 377 +++++++++++++++++++ .../gtrecord/SequentialCubeTupleIterator.java | 210 +++++++++++ .../hbase/cube/v2/CubeSegmentScanner.java | 100 ----- .../storage/hbase/cube/v2/CubeStorageQuery.java | 349 +---------------- .../hbase/cube/v2/CubeTupleConverter.java | 270 ------------- .../cube/v2/SequentialCubeTupleIterator.java | 210 ----------- 9 files changed, 968 insertions(+), 929 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kylin/blob/4fd74fc6/core-cube/src/main/java/org/apache/kylin/gridtable/ScannerWorker.java ---------------------------------------------------------------------- diff --git a/core-cube/src/main/java/org/apache/kylin/gridtable/ScannerWorker.java b/core-cube/src/main/java/org/apache/kylin/gridtable/ScannerWorker.java index 1ac3b02..586a584 100644 --- a/core-cube/src/main/java/org/apache/kylin/gridtable/ScannerWorker.java +++ b/core-cube/src/main/java/org/apache/kylin/gridtable/ScannerWorker.java @@ -24,8 +24,6 @@ import java.io.IOException; import java.lang.reflect.InvocationTargetException; import java.util.Iterator; -import org.apache.kylin.common.KylinConfig; -import 
org.apache.kylin.common.debug.BackdoorToggles; import org.apache.kylin.cube.CubeSegment; import org.apache.kylin.cube.cuboid.Cuboid; import org.slf4j.Logger; @@ -36,7 +34,7 @@ public class ScannerWorker { private static final Logger logger = LoggerFactory.getLogger(ScannerWorker.class); private IGTScanner internal = null; - public ScannerWorker(CubeSegment cubeSeg, Cuboid cuboid, GTScanRequest scanRequest) { + public ScannerWorker(CubeSegment cubeSeg, Cuboid cuboid, GTScanRequest scanRequest, String gtStorage) { if (scanRequest == null) { logger.info("Segment {} will be skipped", cubeSeg); internal = new EmptyGTScanner(0); @@ -46,12 +44,7 @@ public class ScannerWorker { final GTInfo info = scanRequest.getInfo(); try { - IGTStorage rpc; - if ("scan".equalsIgnoreCase(BackdoorToggles.getHbaseCubeQueryProtocol())) { - rpc = (IGTStorage) Class.forName("org.apache.kylin.storage.hbase.cube.v2.CubeHBaseScanRPC").getConstructor(CubeSegment.class, Cuboid.class, GTInfo.class).newInstance(cubeSeg, cuboid, info); // for local debug - } else { - rpc = (IGTStorage) Class.forName(KylinConfig.getInstanceFromEnv().getDefaultIGTStorage()).getConstructor(CubeSegment.class, Cuboid.class, GTInfo.class).newInstance(cubeSeg, cuboid, info); // default behavior - } + IGTStorage rpc = (IGTStorage) Class.forName(gtStorage).getConstructor(CubeSegment.class, Cuboid.class, GTInfo.class).newInstance(cubeSeg, cuboid, info); // default behavior internal = rpc.getGTScanner(scanRequest); } catch (IOException | InstantiationException | InvocationTargetException | IllegalAccessException | ClassNotFoundException | NoSuchMethodException e) { throw new RuntimeException("error", e); http://git-wip-us.apache.org/repos/asf/kylin/blob/4fd74fc6/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/CubeSegmentScanner.java ---------------------------------------------------------------------- diff --git a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/CubeSegmentScanner.java 
b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/CubeSegmentScanner.java new file mode 100644 index 0000000..c12159d --- /dev/null +++ b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/CubeSegmentScanner.java @@ -0,0 +1,100 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.kylin.storage.gtrecord; + +import java.io.IOException; +import java.util.Collection; +import java.util.Iterator; +import java.util.Set; + +import org.apache.kylin.common.KylinConfig; +import org.apache.kylin.cube.CubeSegment; +import org.apache.kylin.cube.cuboid.Cuboid; +import org.apache.kylin.dict.BuildInFunctionTransformer; +import org.apache.kylin.gridtable.GTInfo; +import org.apache.kylin.gridtable.GTRecord; +import org.apache.kylin.gridtable.GTScanRangePlanner; +import org.apache.kylin.gridtable.GTScanRequest; +import org.apache.kylin.gridtable.IGTScanner; +import org.apache.kylin.gridtable.ScannerWorker; +import org.apache.kylin.metadata.filter.ITupleFilterTransformer; +import org.apache.kylin.metadata.filter.TupleFilter; +import org.apache.kylin.metadata.model.FunctionDesc; +import org.apache.kylin.metadata.model.TblColRef; +import org.apache.kylin.storage.StorageContext; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class CubeSegmentScanner implements IGTScanner { + + private static final Logger logger = LoggerFactory.getLogger(CubeSegmentScanner.class); + + final CubeSegment cubeSeg; + final ScannerWorker scanner; + final Cuboid cuboid; + + final GTScanRequest scanRequest; + + public CubeSegmentScanner(CubeSegment cubeSeg, Cuboid cuboid, Set<TblColRef> dimensions, Set<TblColRef> groups, // + Collection<FunctionDesc> metrics, TupleFilter filter, StorageContext context,String gtStorage) { + this.cuboid = cuboid; + this.cubeSeg = cubeSeg; + + // translate FunctionTupleFilter to IN clause + ITupleFilterTransformer translator = new BuildInFunctionTransformer(cubeSeg.getDimensionEncodingMap()); + filter = translator.transform(filter); + + String plannerName = KylinConfig.getInstanceFromEnv().getQueryStorageVisitPlanner(); + GTScanRangePlanner scanRangePlanner; + try { + scanRangePlanner = (GTScanRangePlanner) Class.forName(plannerName).getConstructor(CubeSegment.class, Cuboid.class, TupleFilter.class, Set.class, 
Set.class, Collection.class).newInstance(cubeSeg, cuboid, filter, dimensions, groups, metrics); + } catch (Exception e) { + throw new RuntimeException(e); + } + scanRequest = scanRangePlanner.planScanRequest(); + if (scanRequest != null) { + scanRequest.setAllowPreAggregation(!context.isExactAggregation()); + scanRequest.setAggrCacheGB(cubeSeg.getCubeInstance().getConfig().getQueryCoprocessorMemGB()); + if (context.isLimitEnabled()) + scanRequest.setRowLimit(context.getLimit()); + } + scanner = new ScannerWorker(cubeSeg, cuboid, scanRequest,gtStorage); + } + + @Override + public Iterator<GTRecord> iterator() { + return scanner.iterator(); + } + + @Override + public void close() throws IOException { + scanner.close(); + } + + @Override + public GTInfo getInfo() { + return scanRequest == null ? null : scanRequest.getInfo(); + } + + @Override + public int getScannedRowCount() { + return scanner.getScannedRowCount(); + } + +} http://git-wip-us.apache.org/repos/asf/kylin/blob/4fd74fc6/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/CubeTupleConverter.java ---------------------------------------------------------------------- diff --git a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/CubeTupleConverter.java b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/CubeTupleConverter.java new file mode 100644 index 0000000..d6917e1 --- /dev/null +++ b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/CubeTupleConverter.java @@ -0,0 +1,270 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kylin.storage.gtrecord; + +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Set; + +import org.apache.kylin.common.util.Array; +import org.apache.kylin.common.util.Dictionary; +import org.apache.kylin.cube.CubeManager; +import org.apache.kylin.cube.CubeSegment; +import org.apache.kylin.cube.cuboid.Cuboid; +import org.apache.kylin.cube.gridtable.CuboidToGridTableMapping; +import org.apache.kylin.cube.model.CubeDesc.DeriveInfo; +import org.apache.kylin.dict.lookup.LookupStringTable; +import org.apache.kylin.gridtable.GTRecord; +import org.apache.kylin.measure.MeasureType; +import org.apache.kylin.measure.MeasureType.IAdvMeasureFiller; +import org.apache.kylin.metadata.model.FunctionDesc; +import org.apache.kylin.metadata.model.TblColRef; +import org.apache.kylin.metadata.tuple.Tuple; +import org.apache.kylin.metadata.tuple.TupleInfo; + +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; + +/** + * convert GTRecord to tuple + */ +public class CubeTupleConverter { + + final CubeSegment cubeSeg; + final Cuboid cuboid; + final TupleInfo tupleInfo; + final List<IDerivedColumnFiller> derivedColFillers; + + final int[] gtColIdx; + final int[] tupleIdx; + final Object[] gtValues; + final MeasureType<?>[] measureTypes; + + final List<IAdvMeasureFiller> advMeasureFillers; + final List<Integer> advMeasureIndexInGTValues; + + final int nSelectedDims; + + public CubeTupleConverter(CubeSegment cubeSeg, Cuboid cuboid, // + Set<TblColRef> selectedDimensions, 
Set<FunctionDesc> selectedMetrics, TupleInfo returnTupleInfo) { + this.cubeSeg = cubeSeg; + this.cuboid = cuboid; + this.tupleInfo = returnTupleInfo; + this.derivedColFillers = Lists.newArrayList(); + + List<TblColRef> cuboidDims = cuboid.getColumns(); + CuboidToGridTableMapping mapping = cuboid.getCuboidToGridTableMapping(); + + nSelectedDims = selectedDimensions.size(); + gtColIdx = new int[selectedDimensions.size() + selectedMetrics.size()]; + tupleIdx = new int[selectedDimensions.size() + selectedMetrics.size()]; + gtValues = new Object[selectedDimensions.size() + selectedMetrics.size()]; + + // measure types don't have this many, but aligned length make programming easier + measureTypes = new MeasureType[selectedDimensions.size() + selectedMetrics.size()]; + + advMeasureFillers = Lists.newArrayListWithCapacity(1); + advMeasureIndexInGTValues = Lists.newArrayListWithCapacity(1); + + int iii = 0; + + // pre-calculate dimension index mapping to tuple + for (TblColRef dim : selectedDimensions) { + int i = mapping.getIndexOf(dim); + gtColIdx[iii] = i; + tupleIdx[iii] = tupleInfo.hasColumn(dim) ? tupleInfo.getColumnIndex(dim) : -1; + iii++; + } + + for (FunctionDesc metric : selectedMetrics) { + int i = mapping.getIndexOf(metric); + gtColIdx[iii] = i; + + if (metric.needRewrite()) { + String rewriteFieldName = metric.getRewriteFieldName(); + tupleIdx[iii] = tupleInfo.hasField(rewriteFieldName) ? tupleInfo.getFieldIndex(rewriteFieldName) : -1; + } + // a non-rewrite metrics (like sum, or dimension playing as metrics) is like a dimension column + else { + TblColRef col = metric.getParameter().getColRefs().get(0); + tupleIdx[iii] = tupleInfo.hasColumn(col) ? 
tupleInfo.getColumnIndex(col) : -1; + } + + MeasureType<?> measureType = metric.getMeasureType(); + if (measureType.needAdvancedTupleFilling()) { + Map<TblColRef, Dictionary<String>> dictionaryMap = buildDictionaryMap(measureType.getColumnsNeedDictionary(metric)); + advMeasureFillers.add(measureType.getAdvancedTupleFiller(metric, returnTupleInfo, dictionaryMap)); + advMeasureIndexInGTValues.add(iii); + } else { + measureTypes[iii] = measureType; + } + + iii++; + } + + // prepare derived columns and filler + Map<Array<TblColRef>, List<DeriveInfo>> hostToDerivedInfo = cuboid.getCubeDesc().getHostToDerivedInfo(cuboidDims, null); + for (Entry<Array<TblColRef>, List<DeriveInfo>> entry : hostToDerivedInfo.entrySet()) { + TblColRef[] hostCols = entry.getKey().data; + for (DeriveInfo deriveInfo : entry.getValue()) { + IDerivedColumnFiller filler = newDerivedColumnFiller(hostCols, deriveInfo); + if (filler != null) { + derivedColFillers.add(filler); + } + } + } + } + + // load only needed dictionaries + private Map<TblColRef, Dictionary<String>> buildDictionaryMap(List<TblColRef> columnsNeedDictionary) { + Map<TblColRef, Dictionary<String>> result = Maps.newHashMap(); + for (TblColRef col : columnsNeedDictionary) { + result.put(col, cubeSeg.getDictionary(col)); + } + return result; + } + + public List<IAdvMeasureFiller> translateResult(GTRecord record, Tuple tuple) { + + record.getValues(gtColIdx, gtValues); + + // dimensions + for (int i = 0; i < nSelectedDims; i++) { + int ti = tupleIdx[i]; + if (ti >= 0) { + tuple.setDimensionValue(ti, toString(gtValues[i])); + } + } + + // measures + for (int i = nSelectedDims; i < gtColIdx.length; i++) { + int ti = tupleIdx[i]; + if (ti >= 0 && measureTypes[i] != null) { + measureTypes[i].fillTupleSimply(tuple, ti, gtValues[i]); + } + } + + // derived + for (IDerivedColumnFiller filler : derivedColFillers) { + filler.fillDerivedColumns(gtValues, tuple); + } + + // advanced measure filling, due to possible row split, will complete at 
caller side + if (advMeasureFillers.isEmpty()) { + return null; + } else { + for (int i = 0; i < advMeasureFillers.size(); i++) { + Object measureValue = gtValues[advMeasureIndexInGTValues.get(i)]; + advMeasureFillers.get(i).reload(measureValue); + } + return advMeasureFillers; + } + } + + private interface IDerivedColumnFiller { + public void fillDerivedColumns(Object[] gtValues, Tuple tuple); + } + + private IDerivedColumnFiller newDerivedColumnFiller(TblColRef[] hostCols, final DeriveInfo deriveInfo) { + boolean allHostsPresent = true; + final int[] hostTmpIdx = new int[hostCols.length]; + for (int i = 0; i < hostCols.length; i++) { + hostTmpIdx[i] = indexOnTheGTValues(hostCols[i]); + allHostsPresent = allHostsPresent && hostTmpIdx[i] >= 0; + } + + boolean needCopyDerived = false; + final int[] derivedTupleIdx = new int[deriveInfo.columns.length]; + for (int i = 0; i < deriveInfo.columns.length; i++) { + TblColRef col = deriveInfo.columns[i]; + derivedTupleIdx[i] = tupleInfo.hasColumn(col) ? 
tupleInfo.getColumnIndex(col) : -1; + needCopyDerived = needCopyDerived || derivedTupleIdx[i] >= 0; + } + + if ((allHostsPresent && needCopyDerived) == false) + return null; + + switch (deriveInfo.type) { + case LOOKUP: + return new IDerivedColumnFiller() { + CubeManager cubeMgr = CubeManager.getInstance(cubeSeg.getCubeInstance().getConfig()); + LookupStringTable lookupTable = cubeMgr.getLookupTable(cubeSeg, deriveInfo.dimension); + int[] derivedColIdx = initDerivedColIdx(); + Array<String> lookupKey = new Array<String>(new String[hostTmpIdx.length]); + + private int[] initDerivedColIdx() { + int[] idx = new int[deriveInfo.columns.length]; + for (int i = 0; i < idx.length; i++) { + idx[i] = deriveInfo.columns[i].getColumnDesc().getZeroBasedIndex(); + } + return idx; + } + + @Override + public void fillDerivedColumns(Object[] gtValues, Tuple tuple) { + for (int i = 0; i < hostTmpIdx.length; i++) { + lookupKey.data[i] = CubeTupleConverter.toString(gtValues[hostTmpIdx[i]]); + } + + String[] lookupRow = lookupTable.getRow(lookupKey); + + if (lookupRow != null) { + for (int i = 0; i < derivedTupleIdx.length; i++) { + if (derivedTupleIdx[i] >= 0) { + String value = lookupRow[derivedColIdx[i]]; + tuple.setDimensionValue(derivedTupleIdx[i], value); + } + } + } else { + for (int i = 0; i < derivedTupleIdx.length; i++) { + if (derivedTupleIdx[i] >= 0) { + tuple.setDimensionValue(derivedTupleIdx[i], null); + } + } + } + } + }; + case PK_FK: + return new IDerivedColumnFiller() { + @Override + public void fillDerivedColumns(Object[] gtValues, Tuple tuple) { + // composite keys are split, so only copy [0] is enough, see CubeDesc.initDimensionColumns() + tuple.setDimensionValue(derivedTupleIdx[0], CubeTupleConverter.toString(gtValues[hostTmpIdx[0]])); + } + }; + default: + throw new IllegalArgumentException(); + } + } + + private int indexOnTheGTValues(TblColRef col) { + List<TblColRef> cuboidDims = cuboid.getColumns(); + int cuboidIdx = cuboidDims.indexOf(col); + for (int i = 0; 
i < gtColIdx.length; i++) { + if (gtColIdx[i] == cuboidIdx) + return i; + } + return -1; + } + + private static String toString(Object o) { + return o == null ? null : o.toString(); + } +} http://git-wip-us.apache.org/repos/asf/kylin/blob/4fd74fc6/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/GTCubeStorageQueryBase.java ---------------------------------------------------------------------- diff --git a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/GTCubeStorageQueryBase.java b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/GTCubeStorageQueryBase.java new file mode 100644 index 0000000..e58e74a --- /dev/null +++ b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/GTCubeStorageQueryBase.java @@ -0,0 +1,377 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.kylin.storage.gtrecord; + +import java.util.Collection; +import java.util.Collections; +import java.util.LinkedHashSet; +import java.util.List; +import java.util.Set; + +import org.apache.kylin.common.util.Pair; +import org.apache.kylin.cube.CubeInstance; +import org.apache.kylin.cube.CubeManager; +import org.apache.kylin.cube.CubeSegment; +import org.apache.kylin.cube.cuboid.Cuboid; +import org.apache.kylin.cube.model.CubeDesc; +import org.apache.kylin.cube.model.CubeDesc.DeriveInfo; +import org.apache.kylin.dict.lookup.LookupStringTable; +import org.apache.kylin.measure.MeasureType; +import org.apache.kylin.metadata.filter.ColumnTupleFilter; +import org.apache.kylin.metadata.filter.CompareTupleFilter; +import org.apache.kylin.metadata.filter.LogicalTupleFilter; +import org.apache.kylin.metadata.filter.TupleFilter; +import org.apache.kylin.metadata.filter.TupleFilter.FilterOperatorEnum; +import org.apache.kylin.metadata.model.FunctionDesc; +import org.apache.kylin.metadata.model.MeasureDesc; +import org.apache.kylin.metadata.model.PartitionDesc; +import org.apache.kylin.metadata.model.SegmentStatusEnum; +import org.apache.kylin.metadata.model.TblColRef; +import org.apache.kylin.metadata.realization.SQLDigest; +import org.apache.kylin.metadata.tuple.ITupleIterator; +import org.apache.kylin.metadata.tuple.TupleInfo; +import org.apache.kylin.storage.IStorageQuery; +import org.apache.kylin.storage.StorageContext; +import org.apache.kylin.storage.translate.DerivedFilterTranslator; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.collect.Lists; +import com.google.common.collect.Sets; + +public abstract class GTCubeStorageQueryBase implements IStorageQuery { + + private static final Logger logger = LoggerFactory.getLogger(GTCubeStorageQueryBase.class); + + private final CubeInstance cubeInstance; + private final CubeDesc cubeDesc; + + public GTCubeStorageQueryBase(CubeInstance cube) { + this.cubeInstance = 
cube; + this.cubeDesc = cube.getDescriptor(); + } + + @Override + public ITupleIterator search(StorageContext context, SQLDigest sqlDigest, TupleInfo returnTupleInfo) { + // allow custom measures hack + notifyBeforeStorageQuery(sqlDigest); + + Collection<TblColRef> groups = sqlDigest.groupbyColumns; + TupleFilter filter = sqlDigest.filter; + + // build dimension & metrics + Set<TblColRef> dimensions = new LinkedHashSet<TblColRef>(); + Set<FunctionDesc> metrics = new LinkedHashSet<FunctionDesc>(); + buildDimensionsAndMetrics(sqlDigest, dimensions, metrics); + + // all dimensions = groups + other(like filter) dimensions + Set<TblColRef> otherDims = Sets.newHashSet(dimensions); + otherDims.removeAll(groups); + + // expand derived (xxxD means contains host columns only, derived columns were translated) + Set<TblColRef> derivedPostAggregation = Sets.newHashSet(); + Set<TblColRef> groupsD = expandDerived(groups, derivedPostAggregation); + Set<TblColRef> otherDimsD = expandDerived(otherDims, derivedPostAggregation); + otherDimsD.removeAll(groupsD); + + // identify cuboid + Set<TblColRef> dimensionsD = new LinkedHashSet<TblColRef>(); + dimensionsD.addAll(groupsD); + dimensionsD.addAll(otherDimsD); + Cuboid cuboid = Cuboid.identifyCuboid(cubeDesc, dimensionsD, metrics); + context.setCuboid(cuboid); + + // isExactAggregation? 
meaning: tuples returned from storage requires no further aggregation in query engine + Set<TblColRef> singleValuesD = findSingleValueColumns(filter); + boolean isExactAggregation = isExactAggregation(cuboid, groups, otherDimsD, singleValuesD, derivedPostAggregation); + context.setExactAggregation(isExactAggregation); + + // replace derived columns in filter with host columns; columns on loosened condition must be added to group by + TupleFilter filterD = translateDerived(filter, groupsD); + + setThreshold(dimensionsD, metrics, context); // set cautious threshold to prevent out of memory + setLimit(filter, context); + + List<CubeSegmentScanner> scanners = Lists.newArrayList(); + for (CubeSegment cubeSeg : cubeInstance.getSegments(SegmentStatusEnum.READY)) { + CubeSegmentScanner scanner; + if (cubeSeg.getInputRecords() == 0) { + logger.warn("cube segment {} input record is 0, " + "it may caused by kylin failed to the job counter " + "as the hadoop history server wasn't running", cubeSeg); + } + scanner = new CubeSegmentScanner(cubeSeg, cuboid, dimensionsD, groupsD, metrics, filterD, context, getGTStorage()); + scanners.add(scanner); + } + + if (scanners.isEmpty()) + return ITupleIterator.EMPTY_TUPLE_ITERATOR; + + return new SequentialCubeTupleIterator(scanners, cuboid, dimensionsD, metrics, returnTupleInfo, context); + } + + protected abstract String getGTStorage(); + + private void buildDimensionsAndMetrics(SQLDigest sqlDigest, Collection<TblColRef> dimensions, Collection<FunctionDesc> metrics) { + for (FunctionDesc func : sqlDigest.aggregations) { + if (!func.isDimensionAsMetric()) { + // use the FunctionDesc from cube desc as much as possible, that has more info such as HLLC precision + metrics.add(findAggrFuncFromCubeDesc(func)); + } + } + + for (TblColRef column : sqlDigest.allColumns) { + // skip measure columns + if (sqlDigest.metricColumns.contains(column) && !(sqlDigest.groupbyColumns.contains(column) || sqlDigest.filterColumns.contains(column))) { + 
continue; + } + + dimensions.add(column); + } + } + + private FunctionDesc findAggrFuncFromCubeDesc(FunctionDesc aggrFunc) { + for (MeasureDesc measure : cubeDesc.getMeasures()) { + if (measure.getFunction().equals(aggrFunc)) + return measure.getFunction(); + } + return aggrFunc; + } + + private Set<TblColRef> expandDerived(Collection<TblColRef> cols, Set<TblColRef> derivedPostAggregation) { + Set<TblColRef> expanded = Sets.newHashSet(); + for (TblColRef col : cols) { + if (cubeDesc.hasHostColumn(col)) { + DeriveInfo hostInfo = cubeDesc.getHostInfo(col); + for (TblColRef hostCol : hostInfo.columns) { + expanded.add(hostCol); + if (hostInfo.isOneToOne == false) + derivedPostAggregation.add(hostCol); + } + } else { + expanded.add(col); + } + } + return expanded; + } + + @SuppressWarnings("unchecked") + private Set<TblColRef> findSingleValueColumns(TupleFilter filter) { + Collection<? extends TupleFilter> toCheck; + if (filter instanceof CompareTupleFilter) { + toCheck = Collections.singleton(filter); + } else if (filter instanceof LogicalTupleFilter && filter.getOperator() == FilterOperatorEnum.AND) { + toCheck = filter.getChildren(); + } else { + return (Set<TblColRef>) Collections.EMPTY_SET; + } + + Set<TblColRef> result = Sets.newHashSet(); + for (TupleFilter f : toCheck) { + if (f instanceof CompareTupleFilter) { + CompareTupleFilter compFilter = (CompareTupleFilter) f; + // is COL=const ? 
+ if (compFilter.getOperator() == FilterOperatorEnum.EQ && compFilter.getValues().size() == 1 && compFilter.getColumn() != null) { + result.add(compFilter.getColumn()); + } + } + } + + // expand derived + Set<TblColRef> resultD = Sets.newHashSet(); + for (TblColRef col : result) { + if (cubeDesc.isExtendedColumn(col)) { + throw new CubeDesc.CannotFilterExtendedColumnException(col); + } + if (cubeDesc.isDerived(col)) { + DeriveInfo hostInfo = cubeDesc.getHostInfo(col); + if (hostInfo.isOneToOne) { + for (TblColRef hostCol : hostInfo.columns) { + resultD.add(hostCol); + } + } + //if not one2one, it will be pruned + } else { + resultD.add(col); + } + } + return resultD; + } + + private boolean isExactAggregation(Cuboid cuboid, Collection<TblColRef> groups, Set<TblColRef> othersD, Set<TblColRef> singleValuesD, Set<TblColRef> derivedPostAggregation) { + boolean exact = true; + + if (cuboid.requirePostAggregation()) { + exact = false; + logger.info("exactAggregation is false because cuboid " + cuboid.getInputID() + "=> " + cuboid.getId()); + } + + // derived aggregation is bad, unless expanded columns are already in group by + if (groups.containsAll(derivedPostAggregation) == false) { + exact = false; + logger.info("exactAggregation is false because derived column require post aggregation: " + derivedPostAggregation); + } + + // other columns (from filter) is bad, unless they are ensured to have single value + if (singleValuesD.containsAll(othersD) == false) { + exact = false; + logger.info("exactAggregation is false because some column not on group by: " + othersD // + + " (single value column: " + singleValuesD + ")"); + } + + // for partitioned cube, the partition column must belong to group by or has single value + PartitionDesc partDesc = cuboid.getCubeDesc().getModel().getPartitionDesc(); + if (partDesc.isPartitioned()) { + TblColRef col = partDesc.getPartitionDateColumnRef(); + if (!groups.contains(col) && !singleValuesD.contains(col)) { + exact = false; + 
logger.info("exactAggregation is false because cube is partitioned and " + col + " is not on group by"); + } + } + + if (exact) { + logger.info("exactAggregation is true"); + } + return exact; + } + + @SuppressWarnings("unchecked") + private TupleFilter translateDerived(TupleFilter filter, Set<TblColRef> collector) { + if (filter == null) + return filter; + + if (filter instanceof CompareTupleFilter) { + return translateDerivedInCompare((CompareTupleFilter) filter, collector); + } + + List<TupleFilter> children = (List<TupleFilter>) filter.getChildren(); + List<TupleFilter> newChildren = Lists.newArrayListWithCapacity(children.size()); + boolean modified = false; + for (TupleFilter child : children) { + TupleFilter translated = translateDerived(child, collector); + newChildren.add(translated); + if (child != translated) + modified = true; + } + if (modified) { + filter = replaceChildren(filter, newChildren); + } + return filter; + } + + private TupleFilter replaceChildren(TupleFilter filter, List<TupleFilter> newChildren) { + if (filter instanceof LogicalTupleFilter) { + LogicalTupleFilter r = new LogicalTupleFilter(filter.getOperator()); + r.addChildren(newChildren); + return r; + } else + throw new IllegalStateException("Cannot replaceChildren on " + filter); + } + + private TupleFilter translateDerivedInCompare(CompareTupleFilter compf, Set<TblColRef> collector) { + if (compf.getColumn() == null || compf.getValues().isEmpty()) + return compf; + + TblColRef derived = compf.getColumn(); + if (cubeDesc.isExtendedColumn(derived)) { + throw new CubeDesc.CannotFilterExtendedColumnException(derived); + } + if (cubeDesc.isDerived(derived) == false) + return compf; + + DeriveInfo hostInfo = cubeDesc.getHostInfo(derived); + CubeManager cubeMgr = CubeManager.getInstance(this.cubeInstance.getConfig()); + CubeSegment seg = cubeInstance.getLatestReadySegment(); + LookupStringTable lookup = cubeMgr.getLookupTable(seg, hostInfo.dimension); + Pair<TupleFilter, Boolean> 
translated = DerivedFilterTranslator.translate(lookup, hostInfo, compf); + TupleFilter translatedFilter = translated.getFirst(); + boolean loosened = translated.getSecond(); + if (loosened) { + collectColumnsRecursively(translatedFilter, collector); + } + return translatedFilter; + } + + private void collectColumnsRecursively(TupleFilter filter, Set<TblColRef> collector) { + if (filter == null) + return; + + if (filter instanceof ColumnTupleFilter) { + collectColumns(((ColumnTupleFilter) filter).getColumn(), collector); + } + for (TupleFilter child : filter.getChildren()) { + collectColumnsRecursively(child, collector); + } + } + + private void collectColumns(TblColRef col, Set<TblColRef> collector) { + if (cubeDesc.isExtendedColumn(col)) { + throw new CubeDesc.CannotFilterExtendedColumnException(col); + } + if (cubeDesc.isDerived(col)) { + DeriveInfo hostInfo = cubeDesc.getHostInfo(col); + for (TblColRef h : hostInfo.columns) + collector.add(h); + } else { + collector.add(col); + } + } + + private void setThreshold(Collection<TblColRef> dimensions, Collection<FunctionDesc> metrics, StorageContext context) { + boolean hasMemHungryMeasure = false; + for (FunctionDesc func : metrics) { + hasMemHungryMeasure |= func.getMeasureType().isMemoryHungry(); + } + + // need to limit the memory usage for memory hungry measures + if (hasMemHungryMeasure == false) { + return; + } + + int rowSizeEst = dimensions.size() * 3; + for (FunctionDesc func : metrics) { + // FIXME getStorageBytesEstimate() is not appropriate as here we want size in memory (not in storage) + rowSizeEst += func.getReturnDataType().getStorageBytesEstimate(); + } + + long rowEst = this.cubeInstance.getConfig().getQueryMemBudget() / rowSizeEst; + if (rowEst > 0) { + logger.info("Memory budget is set to " + rowEst + " rows"); + context.setThreshold((int) rowEst); + } else { + logger.info("Memory budget is not set."); + } + } + + private void setLimit(TupleFilter filter, StorageContext context) { + boolean 
goodAggr = context.isExactAggregation(); + boolean goodFilter = filter == null || (TupleFilter.isEvaluableRecursively(filter) && context.isCoprocessorEnabled()); + boolean goodSort = context.hasSort() == false; + if (goodAggr && goodFilter && goodSort) { + logger.info("Enable limit " + context.getLimit()); + context.enableLimit(); + } + } + + private void notifyBeforeStorageQuery(SQLDigest sqlDigest) { + for (MeasureDesc measure : cubeDesc.getMeasures()) { + MeasureType<?> measureType = measure.getFunction().getMeasureType(); + measureType.adjustSqlDigest(measure, sqlDigest); + } + } + +} http://git-wip-us.apache.org/repos/asf/kylin/blob/4fd74fc6/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/SequentialCubeTupleIterator.java ---------------------------------------------------------------------- diff --git a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/SequentialCubeTupleIterator.java b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/SequentialCubeTupleIterator.java new file mode 100644 index 0000000..3681e5e --- /dev/null +++ b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/SequentialCubeTupleIterator.java @@ -0,0 +1,210 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kylin.storage.gtrecord; + +import java.io.IOException; +import java.util.Iterator; +import java.util.List; +import java.util.NoSuchElementException; +import java.util.Set; + +import org.apache.kylin.cube.cuboid.Cuboid; +import org.apache.kylin.gridtable.GTRecord; +import org.apache.kylin.measure.MeasureType.IAdvMeasureFiller; +import org.apache.kylin.metadata.model.FunctionDesc; +import org.apache.kylin.metadata.model.TblColRef; +import org.apache.kylin.metadata.tuple.ITuple; +import org.apache.kylin.metadata.tuple.ITupleIterator; +import org.apache.kylin.metadata.tuple.Tuple; +import org.apache.kylin.metadata.tuple.TupleInfo; +import org.apache.kylin.storage.StorageContext; +import org.apache.kylin.storage.exception.ScanOutOfLimitException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class SequentialCubeTupleIterator implements ITupleIterator { + + private static final Logger logger = LoggerFactory.getLogger(SequentialCubeTupleIterator.class); + + protected final Cuboid cuboid; + protected final Set<TblColRef> selectedDimensions; + protected final Set<FunctionDesc> selectedMetrics; + protected final TupleInfo tupleInfo; + protected final Tuple tuple; + protected final Iterator<CubeSegmentScanner> scannerIterator; + protected final StorageContext context; + + protected CubeSegmentScanner curScanner; + protected Iterator<GTRecord> curRecordIterator; + protected CubeTupleConverter curTupleConverter; + protected Tuple next; + + private List<IAdvMeasureFiller> advMeasureFillers; + private int advMeasureRowsRemaining; + private int advMeasureRowIndex; + + private int scanCount; + private int scanCountDelta; + + public SequentialCubeTupleIterator(List<CubeSegmentScanner> scanners, Cuboid cuboid, Set<TblColRef> selectedDimensions, // + Set<FunctionDesc> selectedMetrics, TupleInfo returnTupleInfo, StorageContext 
context) { + this.cuboid = cuboid; + this.selectedDimensions = selectedDimensions; + this.selectedMetrics = selectedMetrics; + this.tupleInfo = returnTupleInfo; + this.tuple = new Tuple(returnTupleInfo); + this.scannerIterator = scanners.iterator(); + this.context = context; + } + + @Override + public boolean hasNext() { + if (next != null) + return true; + + if (hitLimitAndThreshold()) + return false; + + // consume any left rows from advanced measure filler + if (advMeasureRowsRemaining > 0) { + for (IAdvMeasureFiller filler : advMeasureFillers) { + filler.fillTuple(tuple, advMeasureRowIndex); + } + advMeasureRowIndex++; + advMeasureRowsRemaining--; + next = tuple; + return true; + } + + // get the next GTRecord + if (curScanner == null) { + if (scannerIterator.hasNext()) { + curScanner = scannerIterator.next(); + curRecordIterator = curScanner.iterator(); + if (curRecordIterator.hasNext()) { + //if the segment does not has any tuples, don't bother to create a converter + curTupleConverter = new CubeTupleConverter(curScanner.cubeSeg, cuboid, selectedDimensions, selectedMetrics, tupleInfo); + } + } else { + return false; + } + } + if (curRecordIterator.hasNext() == false) { + close(curScanner); + curScanner = null; + curRecordIterator = null; + curTupleConverter = null; + return hasNext(); + } + + // now we have a GTRecord + GTRecord curRecord = curRecordIterator.next(); + + // translate into tuple + advMeasureFillers = curTupleConverter.translateResult(curRecord, tuple); + + // the simple case + if (advMeasureFillers == null) { + next = tuple; + return true; + } + + // advanced measure filling, like TopN, will produce multiple tuples out of one record + advMeasureRowsRemaining = -1; + for (IAdvMeasureFiller filler : advMeasureFillers) { + if (advMeasureRowsRemaining < 0) + advMeasureRowsRemaining = filler.getNumOfRows(); + if (advMeasureRowsRemaining != filler.getNumOfRows()) + throw new IllegalStateException(); + } + if (advMeasureRowsRemaining < 0) + throw new 
IllegalStateException(); + + advMeasureRowIndex = 0; + return hasNext(); + } + + + private boolean hitLimitAndThreshold() { + // check limit + if (context.isLimitEnabled() && scanCount >= context.getLimit() + context.getOffset()) { + return true; + } + // check threshold + if (scanCount >= context.getThreshold()) { + throw new ScanOutOfLimitException("Scan row count exceeded threshold: " + context.getThreshold() + ", please add filter condition to narrow down backend scan range, like where clause."); + } + return false; + } + + @Override + public ITuple next() { + // fetch next record + if (next == null) { + hasNext(); + if (next == null) + throw new NoSuchElementException(); + } + + scanCount++; + if (++scanCountDelta >= 1000) + flushScanCountDelta(); + + ITuple result = next; + next = null; + return result; + } + + @Override + public void remove() { + throw new UnsupportedOperationException(); + } + + @Override + public void close() { + // hasNext() loop may exit because of limit, threshold, etc. 
+ // close all the remaining segmentIterator + flushScanCountDelta(); + + if (curScanner != null) + close(curScanner); + + while (scannerIterator.hasNext()) { + close(scannerIterator.next()); + } + } + + protected void close(CubeSegmentScanner scanner) { + try { + scanner.close(); + } catch (IOException e) { + logger.error("Exception when close CubeScanner", e); + } + } + + public int getScanCount() { + return scanCount; + } + + private void flushScanCountDelta() { + context.increaseTotalScanCount(scanCountDelta); + scanCountDelta = 0; + } + +} http://git-wip-us.apache.org/repos/asf/kylin/blob/4fd74fc6/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeSegmentScanner.java ---------------------------------------------------------------------- diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeSegmentScanner.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeSegmentScanner.java deleted file mode 100644 index 9890ae9..0000000 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeSegmentScanner.java +++ /dev/null @@ -1,100 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.kylin.storage.hbase.cube.v2; - -import java.io.IOException; -import java.util.Collection; -import java.util.Iterator; -import java.util.Set; - -import org.apache.kylin.common.KylinConfig; -import org.apache.kylin.cube.CubeSegment; -import org.apache.kylin.cube.cuboid.Cuboid; -import org.apache.kylin.dict.BuildInFunctionTransformer; -import org.apache.kylin.gridtable.GTInfo; -import org.apache.kylin.gridtable.GTRecord; -import org.apache.kylin.gridtable.GTScanRangePlanner; -import org.apache.kylin.gridtable.GTScanRequest; -import org.apache.kylin.gridtable.IGTScanner; -import org.apache.kylin.gridtable.ScannerWorker; -import org.apache.kylin.metadata.filter.ITupleFilterTransformer; -import org.apache.kylin.metadata.filter.TupleFilter; -import org.apache.kylin.metadata.model.FunctionDesc; -import org.apache.kylin.metadata.model.TblColRef; -import org.apache.kylin.storage.StorageContext; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public class CubeSegmentScanner implements IGTScanner { - - private static final Logger logger = LoggerFactory.getLogger(CubeSegmentScanner.class); - - final CubeSegment cubeSeg; - final ScannerWorker scanner; - final Cuboid cuboid; - - final GTScanRequest scanRequest; - - public CubeSegmentScanner(CubeSegment cubeSeg, Cuboid cuboid, Set<TblColRef> dimensions, Set<TblColRef> groups, // - Collection<FunctionDesc> metrics, TupleFilter filter, StorageContext context) { - this.cuboid = cuboid; - this.cubeSeg = cubeSeg; - - // translate FunctionTupleFilter to IN clause - ITupleFilterTransformer translator = new BuildInFunctionTransformer(cubeSeg.getDimensionEncodingMap()); - filter = translator.transform(filter); - - String plannerName = KylinConfig.getInstanceFromEnv().getQueryStorageVisitPlanner(); - GTScanRangePlanner scanRangePlanner; - try { - scanRangePlanner = (GTScanRangePlanner) Class.forName(plannerName).getConstructor(CubeSegment.class, Cuboid.class, TupleFilter.class, Set.class, Set.class, 
Collection.class).newInstance(cubeSeg, cuboid, filter, dimensions, groups, metrics); - } catch (Exception e) { - throw new RuntimeException(e); - } - scanRequest = scanRangePlanner.planScanRequest(); - if (scanRequest != null) { - scanRequest.setAllowPreAggregation(!context.isExactAggregation()); - scanRequest.setAggrCacheGB(cubeSeg.getCubeInstance().getConfig().getQueryCoprocessorMemGB()); - if (context.isLimitEnabled()) - scanRequest.setRowLimit(context.getLimit()); - } - scanner = new ScannerWorker(cubeSeg, cuboid, scanRequest); - } - - @Override - public Iterator<GTRecord> iterator() { - return scanner.iterator(); - } - - @Override - public void close() throws IOException { - scanner.close(); - } - - @Override - public GTInfo getInfo() { - return scanRequest == null ? null : scanRequest.getInfo(); - } - - @Override - public int getScannedRowCount() { - return scanner.getScannedRowCount(); - } - -} http://git-wip-us.apache.org/repos/asf/kylin/blob/4fd74fc6/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeStorageQuery.java ---------------------------------------------------------------------- diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeStorageQuery.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeStorageQuery.java index cec4e2f..f9c9a2b 100644 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeStorageQuery.java +++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeStorageQuery.java @@ -18,358 +18,27 @@ package org.apache.kylin.storage.hbase.cube.v2; -import java.util.Collection; -import java.util.Collections; -import java.util.LinkedHashSet; -import java.util.List; -import java.util.Set; - -import org.apache.kylin.common.util.Pair; +import org.apache.kylin.common.KylinConfig; +import org.apache.kylin.common.debug.BackdoorToggles; import org.apache.kylin.cube.CubeInstance; -import org.apache.kylin.cube.CubeManager; -import 
org.apache.kylin.cube.CubeSegment; -import org.apache.kylin.cube.cuboid.Cuboid; -import org.apache.kylin.cube.model.CubeDesc; -import org.apache.kylin.cube.model.CubeDesc.DeriveInfo; -import org.apache.kylin.dict.lookup.LookupStringTable; -import org.apache.kylin.measure.MeasureType; -import org.apache.kylin.metadata.filter.ColumnTupleFilter; -import org.apache.kylin.metadata.filter.CompareTupleFilter; -import org.apache.kylin.metadata.filter.LogicalTupleFilter; -import org.apache.kylin.metadata.filter.TupleFilter; -import org.apache.kylin.metadata.filter.TupleFilter.FilterOperatorEnum; -import org.apache.kylin.metadata.model.FunctionDesc; -import org.apache.kylin.metadata.model.MeasureDesc; -import org.apache.kylin.metadata.model.PartitionDesc; -import org.apache.kylin.metadata.model.SegmentStatusEnum; -import org.apache.kylin.metadata.model.TblColRef; -import org.apache.kylin.metadata.realization.SQLDigest; -import org.apache.kylin.metadata.tuple.ITupleIterator; -import org.apache.kylin.metadata.tuple.TupleInfo; -import org.apache.kylin.storage.IStorageQuery; -import org.apache.kylin.storage.StorageContext; -import org.apache.kylin.storage.translate.DerivedFilterTranslator; +import org.apache.kylin.storage.gtrecord.GTCubeStorageQueryBase; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.google.common.collect.Lists; -import com.google.common.collect.Sets; - -public class CubeStorageQuery implements IStorageQuery { +public class CubeStorageQuery extends GTCubeStorageQueryBase { private static final Logger logger = LoggerFactory.getLogger(CubeStorageQuery.class); - private final CubeInstance cubeInstance; - private final CubeDesc cubeDesc; - public CubeStorageQuery(CubeInstance cube) { - this.cubeInstance = cube; - this.cubeDesc = cube.getDescriptor(); + super(cube); } @Override - public ITupleIterator search(StorageContext context, SQLDigest sqlDigest, TupleInfo returnTupleInfo) { - // allow custom measures hack - 
notifyBeforeStorageQuery(sqlDigest); - - Collection<TblColRef> groups = sqlDigest.groupbyColumns; - TupleFilter filter = sqlDigest.filter; - - // build dimension & metrics - Set<TblColRef> dimensions = new LinkedHashSet<TblColRef>(); - Set<FunctionDesc> metrics = new LinkedHashSet<FunctionDesc>(); - buildDimensionsAndMetrics(sqlDigest, dimensions, metrics); - - // all dimensions = groups + other(like filter) dimensions - Set<TblColRef> otherDims = Sets.newHashSet(dimensions); - otherDims.removeAll(groups); - - // expand derived (xxxD means contains host columns only, derived columns were translated) - Set<TblColRef> derivedPostAggregation = Sets.newHashSet(); - Set<TblColRef> groupsD = expandDerived(groups, derivedPostAggregation); - Set<TblColRef> otherDimsD = expandDerived(otherDims, derivedPostAggregation); - otherDimsD.removeAll(groupsD); - - // identify cuboid - Set<TblColRef> dimensionsD = new LinkedHashSet<TblColRef>(); - dimensionsD.addAll(groupsD); - dimensionsD.addAll(otherDimsD); - Cuboid cuboid = Cuboid.identifyCuboid(cubeDesc, dimensionsD, metrics); - context.setCuboid(cuboid); - - // isExactAggregation? 
meaning: tuples returned from storage requires no further aggregation in query engine - Set<TblColRef> singleValuesD = findSingleValueColumns(filter); - boolean isExactAggregation = isExactAggregation(cuboid, groups, otherDimsD, singleValuesD, derivedPostAggregation); - context.setExactAggregation(isExactAggregation); - - // replace derived columns in filter with host columns; columns on loosened condition must be added to group by - TupleFilter filterD = translateDerived(filter, groupsD); - - setThreshold(dimensionsD, metrics, context); // set cautious threshold to prevent out of memory - setLimit(filter, context); - - List<CubeSegmentScanner> scanners = Lists.newArrayList(); - for (CubeSegment cubeSeg : cubeInstance.getSegments(SegmentStatusEnum.READY)) { - CubeSegmentScanner scanner; - if (cubeSeg.getInputRecords() == 0) { - logger.warn("cube segment {} input record is 0, " + "it may caused by kylin failed to the job counter " + "as the hadoop history server wasn't running", cubeSeg); - } - scanner = new CubeSegmentScanner(cubeSeg, cuboid, dimensionsD, groupsD, metrics, filterD, context); - scanners.add(scanner); - } - - if (scanners.isEmpty()) - return ITupleIterator.EMPTY_TUPLE_ITERATOR; - - return new SequentialCubeTupleIterator(scanners, cuboid, dimensionsD, metrics, returnTupleInfo, context); - } - - private void buildDimensionsAndMetrics(SQLDigest sqlDigest, Collection<TblColRef> dimensions, Collection<FunctionDesc> metrics) { - for (FunctionDesc func : sqlDigest.aggregations) { - if (!func.isDimensionAsMetric()) { - // use the FunctionDesc from cube desc as much as possible, that has more info such as HLLC precision - metrics.add(findAggrFuncFromCubeDesc(func)); - } - } - - for (TblColRef column : sqlDigest.allColumns) { - // skip measure columns - if (sqlDigest.metricColumns.contains(column) && !(sqlDigest.groupbyColumns.contains(column) || sqlDigest.filterColumns.contains(column))) { - continue; - } - - dimensions.add(column); - } - } - - private 
FunctionDesc findAggrFuncFromCubeDesc(FunctionDesc aggrFunc) { - for (MeasureDesc measure : cubeDesc.getMeasures()) { - if (measure.getFunction().equals(aggrFunc)) - return measure.getFunction(); - } - return aggrFunc; - } - - private Set<TblColRef> expandDerived(Collection<TblColRef> cols, Set<TblColRef> derivedPostAggregation) { - Set<TblColRef> expanded = Sets.newHashSet(); - for (TblColRef col : cols) { - if (cubeDesc.hasHostColumn(col)) { - DeriveInfo hostInfo = cubeDesc.getHostInfo(col); - for (TblColRef hostCol : hostInfo.columns) { - expanded.add(hostCol); - if (hostInfo.isOneToOne == false) - derivedPostAggregation.add(hostCol); - } - } else { - expanded.add(col); - } - } - return expanded; - } - - @SuppressWarnings("unchecked") - private Set<TblColRef> findSingleValueColumns(TupleFilter filter) { - Collection<? extends TupleFilter> toCheck; - if (filter instanceof CompareTupleFilter) { - toCheck = Collections.singleton(filter); - } else if (filter instanceof LogicalTupleFilter && filter.getOperator() == FilterOperatorEnum.AND) { - toCheck = filter.getChildren(); + protected String getGTStorage() { + if ("scan".equalsIgnoreCase(BackdoorToggles.getHbaseCubeQueryProtocol())) { + return "org.apache.kylin.storage.hbase.cube.v2.CubeHBaseScanRPC"; } else { - return (Set<TblColRef>) Collections.EMPTY_SET; - } - - Set<TblColRef> result = Sets.newHashSet(); - for (TupleFilter f : toCheck) { - if (f instanceof CompareTupleFilter) { - CompareTupleFilter compFilter = (CompareTupleFilter) f; - // is COL=const ? 
- if (compFilter.getOperator() == FilterOperatorEnum.EQ && compFilter.getValues().size() == 1 && compFilter.getColumn() != null) { - result.add(compFilter.getColumn()); - } - } - } - - // expand derived - Set<TblColRef> resultD = Sets.newHashSet(); - for (TblColRef col : result) { - if (cubeDesc.isExtendedColumn(col)) { - throw new CubeDesc.CannotFilterExtendedColumnException(col); - } - if (cubeDesc.isDerived(col)) { - DeriveInfo hostInfo = cubeDesc.getHostInfo(col); - if (hostInfo.isOneToOne) { - for (TblColRef hostCol : hostInfo.columns) { - resultD.add(hostCol); - } - } - //if not one2one, it will be pruned - } else { - resultD.add(col); - } + return KylinConfig.getInstanceFromEnv().getDefaultIGTStorage(); } - return resultD; } - - private boolean isExactAggregation(Cuboid cuboid, Collection<TblColRef> groups, Set<TblColRef> othersD, Set<TblColRef> singleValuesD, Set<TblColRef> derivedPostAggregation) { - boolean exact = true; - - if (cuboid.requirePostAggregation()) { - exact = false; - logger.info("exactAggregation is false because cuboid " + cuboid.getInputID() + "=> " + cuboid.getId()); - } - - // derived aggregation is bad, unless expanded columns are already in group by - if (groups.containsAll(derivedPostAggregation) == false) { - exact = false; - logger.info("exactAggregation is false because derived column require post aggregation: " + derivedPostAggregation); - } - - // other columns (from filter) is bad, unless they are ensured to have single value - if (singleValuesD.containsAll(othersD) == false) { - exact = false; - logger.info("exactAggregation is false because some column not on group by: " + othersD // - + " (single value column: " + singleValuesD + ")"); - } - - // for partitioned cube, the partition column must belong to group by or has single value - PartitionDesc partDesc = cuboid.getCubeDesc().getModel().getPartitionDesc(); - if (partDesc.isPartitioned()) { - TblColRef col = partDesc.getPartitionDateColumnRef(); - if (!groups.contains(col) 
&& !singleValuesD.contains(col)) { - exact = false; - logger.info("exactAggregation is false because cube is partitioned and " + col + " is not on group by"); - } - } - - if (exact) { - logger.info("exactAggregation is true"); - } - return exact; - } - - @SuppressWarnings("unchecked") - private TupleFilter translateDerived(TupleFilter filter, Set<TblColRef> collector) { - if (filter == null) - return filter; - - if (filter instanceof CompareTupleFilter) { - return translateDerivedInCompare((CompareTupleFilter) filter, collector); - } - - List<TupleFilter> children = (List<TupleFilter>) filter.getChildren(); - List<TupleFilter> newChildren = Lists.newArrayListWithCapacity(children.size()); - boolean modified = false; - for (TupleFilter child : children) { - TupleFilter translated = translateDerived(child, collector); - newChildren.add(translated); - if (child != translated) - modified = true; - } - if (modified) { - filter = replaceChildren(filter, newChildren); - } - return filter; - } - - private TupleFilter replaceChildren(TupleFilter filter, List<TupleFilter> newChildren) { - if (filter instanceof LogicalTupleFilter) { - LogicalTupleFilter r = new LogicalTupleFilter(filter.getOperator()); - r.addChildren(newChildren); - return r; - } else - throw new IllegalStateException("Cannot replaceChildren on " + filter); - } - - private TupleFilter translateDerivedInCompare(CompareTupleFilter compf, Set<TblColRef> collector) { - if (compf.getColumn() == null || compf.getValues().isEmpty()) - return compf; - - TblColRef derived = compf.getColumn(); - if (cubeDesc.isExtendedColumn(derived)) { - throw new CubeDesc.CannotFilterExtendedColumnException(derived); - } - if (cubeDesc.isDerived(derived) == false) - return compf; - - DeriveInfo hostInfo = cubeDesc.getHostInfo(derived); - CubeManager cubeMgr = CubeManager.getInstance(this.cubeInstance.getConfig()); - CubeSegment seg = cubeInstance.getLatestReadySegment(); - LookupStringTable lookup = cubeMgr.getLookupTable(seg, 
hostInfo.dimension); - Pair<TupleFilter, Boolean> translated = DerivedFilterTranslator.translate(lookup, hostInfo, compf); - TupleFilter translatedFilter = translated.getFirst(); - boolean loosened = translated.getSecond(); - if (loosened) { - collectColumnsRecursively(translatedFilter, collector); - } - return translatedFilter; - } - - private void collectColumnsRecursively(TupleFilter filter, Set<TblColRef> collector) { - if (filter == null) - return; - - if (filter instanceof ColumnTupleFilter) { - collectColumns(((ColumnTupleFilter) filter).getColumn(), collector); - } - for (TupleFilter child : filter.getChildren()) { - collectColumnsRecursively(child, collector); - } - } - - private void collectColumns(TblColRef col, Set<TblColRef> collector) { - if (cubeDesc.isExtendedColumn(col)) { - throw new CubeDesc.CannotFilterExtendedColumnException(col); - } - if (cubeDesc.isDerived(col)) { - DeriveInfo hostInfo = cubeDesc.getHostInfo(col); - for (TblColRef h : hostInfo.columns) - collector.add(h); - } else { - collector.add(col); - } - } - - private void setThreshold(Collection<TblColRef> dimensions, Collection<FunctionDesc> metrics, StorageContext context) { - boolean hasMemHungryMeasure = false; - for (FunctionDesc func : metrics) { - hasMemHungryMeasure |= func.getMeasureType().isMemoryHungry(); - } - - // need to limit the memory usage for memory hungry measures - if (hasMemHungryMeasure == false) { - return; - } - - int rowSizeEst = dimensions.size() * 3; - for (FunctionDesc func : metrics) { - // FIXME getStorageBytesEstimate() is not appropriate as here we want size in memory (not in storage) - rowSizeEst += func.getReturnDataType().getStorageBytesEstimate(); - } - - long rowEst = this.cubeInstance.getConfig().getQueryMemBudget() / rowSizeEst; - if (rowEst > 0) { - logger.info("Memory budget is set to " + rowEst + " rows"); - context.setThreshold((int) rowEst); - } else { - logger.info("Memory budget is not set."); - } - } - - private void setLimit(TupleFilter 
filter, StorageContext context) { - boolean goodAggr = context.isExactAggregation(); - boolean goodFilter = filter == null || (TupleFilter.isEvaluableRecursively(filter) && context.isCoprocessorEnabled()); - boolean goodSort = context.hasSort() == false; - if (goodAggr && goodFilter && goodSort) { - logger.info("Enable limit " + context.getLimit()); - context.enableLimit(); - } - } - - private void notifyBeforeStorageQuery(SQLDigest sqlDigest) { - for (MeasureDesc measure : cubeDesc.getMeasures()) { - MeasureType<?> measureType = measure.getFunction().getMeasureType(); - measureType.adjustSqlDigest(measure, sqlDigest); - } - } - } http://git-wip-us.apache.org/repos/asf/kylin/blob/4fd74fc6/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeTupleConverter.java ---------------------------------------------------------------------- diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeTupleConverter.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeTupleConverter.java deleted file mode 100644 index a7346af..0000000 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeTupleConverter.java +++ /dev/null @@ -1,270 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
- * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.kylin.storage.hbase.cube.v2; - -import java.util.List; -import java.util.Map; -import java.util.Map.Entry; -import java.util.Set; - -import org.apache.kylin.common.util.Array; -import org.apache.kylin.common.util.Dictionary; -import org.apache.kylin.cube.CubeManager; -import org.apache.kylin.cube.CubeSegment; -import org.apache.kylin.cube.cuboid.Cuboid; -import org.apache.kylin.cube.gridtable.CuboidToGridTableMapping; -import org.apache.kylin.cube.model.CubeDesc.DeriveInfo; -import org.apache.kylin.dict.lookup.LookupStringTable; -import org.apache.kylin.gridtable.GTRecord; -import org.apache.kylin.measure.MeasureType; -import org.apache.kylin.measure.MeasureType.IAdvMeasureFiller; -import org.apache.kylin.metadata.model.FunctionDesc; -import org.apache.kylin.metadata.model.TblColRef; -import org.apache.kylin.metadata.tuple.Tuple; -import org.apache.kylin.metadata.tuple.TupleInfo; - -import com.google.common.collect.Lists; -import com.google.common.collect.Maps; - -/** - * convert GTRecord to tuple - */ -public class CubeTupleConverter { - - final CubeSegment cubeSeg; - final Cuboid cuboid; - final TupleInfo tupleInfo; - final List<IDerivedColumnFiller> derivedColFillers; - - final int[] gtColIdx; - final int[] tupleIdx; - final Object[] gtValues; - final MeasureType<?>[] measureTypes; - - final List<IAdvMeasureFiller> advMeasureFillers; - final List<Integer> advMeasureIndexInGTValues; - - final int nSelectedDims; - - public CubeTupleConverter(CubeSegment cubeSeg, Cuboid cuboid, // - Set<TblColRef> selectedDimensions, Set<FunctionDesc> selectedMetrics, TupleInfo returnTupleInfo) { - this.cubeSeg = cubeSeg; - this.cuboid = cuboid; - this.tupleInfo = returnTupleInfo; - this.derivedColFillers = Lists.newArrayList(); - - List<TblColRef> cuboidDims = cuboid.getColumns(); - CuboidToGridTableMapping mapping = 
cuboid.getCuboidToGridTableMapping(); - - nSelectedDims = selectedDimensions.size(); - gtColIdx = new int[selectedDimensions.size() + selectedMetrics.size()]; - tupleIdx = new int[selectedDimensions.size() + selectedMetrics.size()]; - gtValues = new Object[selectedDimensions.size() + selectedMetrics.size()]; - - // measure types don't have this many, but aligned length make programming easier - measureTypes = new MeasureType[selectedDimensions.size() + selectedMetrics.size()]; - - advMeasureFillers = Lists.newArrayListWithCapacity(1); - advMeasureIndexInGTValues = Lists.newArrayListWithCapacity(1); - - int iii = 0; - - // pre-calculate dimension index mapping to tuple - for (TblColRef dim : selectedDimensions) { - int i = mapping.getIndexOf(dim); - gtColIdx[iii] = i; - tupleIdx[iii] = tupleInfo.hasColumn(dim) ? tupleInfo.getColumnIndex(dim) : -1; - iii++; - } - - for (FunctionDesc metric : selectedMetrics) { - int i = mapping.getIndexOf(metric); - gtColIdx[iii] = i; - - if (metric.needRewrite()) { - String rewriteFieldName = metric.getRewriteFieldName(); - tupleIdx[iii] = tupleInfo.hasField(rewriteFieldName) ? tupleInfo.getFieldIndex(rewriteFieldName) : -1; - } - // a non-rewrite metrics (like sum, or dimension playing as metrics) is like a dimension column - else { - TblColRef col = metric.getParameter().getColRefs().get(0); - tupleIdx[iii] = tupleInfo.hasColumn(col) ? 
tupleInfo.getColumnIndex(col) : -1; - } - - MeasureType<?> measureType = metric.getMeasureType(); - if (measureType.needAdvancedTupleFilling()) { - Map<TblColRef, Dictionary<String>> dictionaryMap = buildDictionaryMap(measureType.getColumnsNeedDictionary(metric)); - advMeasureFillers.add(measureType.getAdvancedTupleFiller(metric, returnTupleInfo, dictionaryMap)); - advMeasureIndexInGTValues.add(iii); - } else { - measureTypes[iii] = measureType; - } - - iii++; - } - - // prepare derived columns and filler - Map<Array<TblColRef>, List<DeriveInfo>> hostToDerivedInfo = cuboid.getCubeDesc().getHostToDerivedInfo(cuboidDims, null); - for (Entry<Array<TblColRef>, List<DeriveInfo>> entry : hostToDerivedInfo.entrySet()) { - TblColRef[] hostCols = entry.getKey().data; - for (DeriveInfo deriveInfo : entry.getValue()) { - IDerivedColumnFiller filler = newDerivedColumnFiller(hostCols, deriveInfo); - if (filler != null) { - derivedColFillers.add(filler); - } - } - } - } - - // load only needed dictionaries - private Map<TblColRef, Dictionary<String>> buildDictionaryMap(List<TblColRef> columnsNeedDictionary) { - Map<TblColRef, Dictionary<String>> result = Maps.newHashMap(); - for (TblColRef col : columnsNeedDictionary) { - result.put(col, cubeSeg.getDictionary(col)); - } - return result; - } - - public List<IAdvMeasureFiller> translateResult(GTRecord record, Tuple tuple) { - - record.getValues(gtColIdx, gtValues); - - // dimensions - for (int i = 0; i < nSelectedDims; i++) { - int ti = tupleIdx[i]; - if (ti >= 0) { - tuple.setDimensionValue(ti, toString(gtValues[i])); - } - } - - // measures - for (int i = nSelectedDims; i < gtColIdx.length; i++) { - int ti = tupleIdx[i]; - if (ti >= 0 && measureTypes[i] != null) { - measureTypes[i].fillTupleSimply(tuple, ti, gtValues[i]); - } - } - - // derived - for (IDerivedColumnFiller filler : derivedColFillers) { - filler.fillDerivedColumns(gtValues, tuple); - } - - // advanced measure filling, due to possible row split, will complete at 
caller side - if (advMeasureFillers.isEmpty()) { - return null; - } else { - for (int i = 0; i < advMeasureFillers.size(); i++) { - Object measureValue = gtValues[advMeasureIndexInGTValues.get(i)]; - advMeasureFillers.get(i).reload(measureValue); - } - return advMeasureFillers; - } - } - - private interface IDerivedColumnFiller { - public void fillDerivedColumns(Object[] gtValues, Tuple tuple); - } - - private IDerivedColumnFiller newDerivedColumnFiller(TblColRef[] hostCols, final DeriveInfo deriveInfo) { - boolean allHostsPresent = true; - final int[] hostTmpIdx = new int[hostCols.length]; - for (int i = 0; i < hostCols.length; i++) { - hostTmpIdx[i] = indexOnTheGTValues(hostCols[i]); - allHostsPresent = allHostsPresent && hostTmpIdx[i] >= 0; - } - - boolean needCopyDerived = false; - final int[] derivedTupleIdx = new int[deriveInfo.columns.length]; - for (int i = 0; i < deriveInfo.columns.length; i++) { - TblColRef col = deriveInfo.columns[i]; - derivedTupleIdx[i] = tupleInfo.hasColumn(col) ? 
tupleInfo.getColumnIndex(col) : -1; - needCopyDerived = needCopyDerived || derivedTupleIdx[i] >= 0; - } - - if ((allHostsPresent && needCopyDerived) == false) - return null; - - switch (deriveInfo.type) { - case LOOKUP: - return new IDerivedColumnFiller() { - CubeManager cubeMgr = CubeManager.getInstance(cubeSeg.getCubeInstance().getConfig()); - LookupStringTable lookupTable = cubeMgr.getLookupTable(cubeSeg, deriveInfo.dimension); - int[] derivedColIdx = initDerivedColIdx(); - Array<String> lookupKey = new Array<String>(new String[hostTmpIdx.length]); - - private int[] initDerivedColIdx() { - int[] idx = new int[deriveInfo.columns.length]; - for (int i = 0; i < idx.length; i++) { - idx[i] = deriveInfo.columns[i].getColumnDesc().getZeroBasedIndex(); - } - return idx; - } - - @Override - public void fillDerivedColumns(Object[] gtValues, Tuple tuple) { - for (int i = 0; i < hostTmpIdx.length; i++) { - lookupKey.data[i] = CubeTupleConverter.toString(gtValues[hostTmpIdx[i]]); - } - - String[] lookupRow = lookupTable.getRow(lookupKey); - - if (lookupRow != null) { - for (int i = 0; i < derivedTupleIdx.length; i++) { - if (derivedTupleIdx[i] >= 0) { - String value = lookupRow[derivedColIdx[i]]; - tuple.setDimensionValue(derivedTupleIdx[i], value); - } - } - } else { - for (int i = 0; i < derivedTupleIdx.length; i++) { - if (derivedTupleIdx[i] >= 0) { - tuple.setDimensionValue(derivedTupleIdx[i], null); - } - } - } - } - }; - case PK_FK: - return new IDerivedColumnFiller() { - @Override - public void fillDerivedColumns(Object[] gtValues, Tuple tuple) { - // composite keys are split, so only copy [0] is enough, see CubeDesc.initDimensionColumns() - tuple.setDimensionValue(derivedTupleIdx[0], CubeTupleConverter.toString(gtValues[hostTmpIdx[0]])); - } - }; - default: - throw new IllegalArgumentException(); - } - } - - private int indexOnTheGTValues(TblColRef col) { - List<TblColRef> cuboidDims = cuboid.getColumns(); - int cuboidIdx = cuboidDims.indexOf(col); - for (int i = 0; 
i < gtColIdx.length; i++) { - if (gtColIdx[i] == cuboidIdx) - return i; - } - return -1; - } - - private static String toString(Object o) { - return o == null ? null : o.toString(); - } -} http://git-wip-us.apache.org/repos/asf/kylin/blob/4fd74fc6/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/SequentialCubeTupleIterator.java ---------------------------------------------------------------------- diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/SequentialCubeTupleIterator.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/SequentialCubeTupleIterator.java deleted file mode 100644 index f8b055c..0000000 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/SequentialCubeTupleIterator.java +++ /dev/null @@ -1,210 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.kylin.storage.hbase.cube.v2; - -import java.io.IOException; -import java.util.Iterator; -import java.util.List; -import java.util.NoSuchElementException; -import java.util.Set; - -import org.apache.kylin.cube.cuboid.Cuboid; -import org.apache.kylin.gridtable.GTRecord; -import org.apache.kylin.measure.MeasureType.IAdvMeasureFiller; -import org.apache.kylin.metadata.model.FunctionDesc; -import org.apache.kylin.metadata.model.TblColRef; -import org.apache.kylin.metadata.tuple.ITuple; -import org.apache.kylin.metadata.tuple.ITupleIterator; -import org.apache.kylin.metadata.tuple.Tuple; -import org.apache.kylin.metadata.tuple.TupleInfo; -import org.apache.kylin.storage.StorageContext; -import org.apache.kylin.storage.exception.ScanOutOfLimitException; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public class SequentialCubeTupleIterator implements ITupleIterator { - - private static final Logger logger = LoggerFactory.getLogger(SequentialCubeTupleIterator.class); - - protected final Cuboid cuboid; - protected final Set<TblColRef> selectedDimensions; - protected final Set<FunctionDesc> selectedMetrics; - protected final TupleInfo tupleInfo; - protected final Tuple tuple; - protected final Iterator<CubeSegmentScanner> scannerIterator; - protected final StorageContext context; - - protected CubeSegmentScanner curScanner; - protected Iterator<GTRecord> curRecordIterator; - protected CubeTupleConverter curTupleConverter; - protected Tuple next; - - private List<IAdvMeasureFiller> advMeasureFillers; - private int advMeasureRowsRemaining; - private int advMeasureRowIndex; - - private int scanCount; - private int scanCountDelta; - - public SequentialCubeTupleIterator(List<CubeSegmentScanner> scanners, Cuboid cuboid, Set<TblColRef> selectedDimensions, // - Set<FunctionDesc> selectedMetrics, TupleInfo returnTupleInfo, StorageContext context) { - this.cuboid = cuboid; - this.selectedDimensions = selectedDimensions; - this.selectedMetrics 
= selectedMetrics; - this.tupleInfo = returnTupleInfo; - this.tuple = new Tuple(returnTupleInfo); - this.scannerIterator = scanners.iterator(); - this.context = context; - } - - @Override - public boolean hasNext() { - if (next != null) - return true; - - if (hitLimitAndThreshold()) - return false; - - // consume any left rows from advanced measure filler - if (advMeasureRowsRemaining > 0) { - for (IAdvMeasureFiller filler : advMeasureFillers) { - filler.fillTuple(tuple, advMeasureRowIndex); - } - advMeasureRowIndex++; - advMeasureRowsRemaining--; - next = tuple; - return true; - } - - // get the next GTRecord - if (curScanner == null) { - if (scannerIterator.hasNext()) { - curScanner = scannerIterator.next(); - curRecordIterator = curScanner.iterator(); - if (curRecordIterator.hasNext()) { - //if the segment does not has any tuples, don't bother to create a converter - curTupleConverter = new CubeTupleConverter(curScanner.cubeSeg, cuboid, selectedDimensions, selectedMetrics, tupleInfo); - } - } else { - return false; - } - } - if (curRecordIterator.hasNext() == false) { - close(curScanner); - curScanner = null; - curRecordIterator = null; - curTupleConverter = null; - return hasNext(); - } - - // now we have a GTRecord - GTRecord curRecord = curRecordIterator.next(); - - // translate into tuple - advMeasureFillers = curTupleConverter.translateResult(curRecord, tuple); - - // the simple case - if (advMeasureFillers == null) { - next = tuple; - return true; - } - - // advanced measure filling, like TopN, will produce multiple tuples out of one record - advMeasureRowsRemaining = -1; - for (IAdvMeasureFiller filler : advMeasureFillers) { - if (advMeasureRowsRemaining < 0) - advMeasureRowsRemaining = filler.getNumOfRows(); - if (advMeasureRowsRemaining != filler.getNumOfRows()) - throw new IllegalStateException(); - } - if (advMeasureRowsRemaining < 0) - throw new IllegalStateException(); - - advMeasureRowIndex = 0; - return hasNext(); - } - - - private boolean 
hitLimitAndThreshold() { - // check limit - if (context.isLimitEnabled() && scanCount >= context.getLimit() + context.getOffset()) { - return true; - } - // check threshold - if (scanCount >= context.getThreshold()) { - throw new ScanOutOfLimitException("Scan row count exceeded threshold: " + context.getThreshold() + ", please add filter condition to narrow down backend scan range, like where clause."); - } - return false; - } - - @Override - public ITuple next() { - // fetch next record - if (next == null) { - hasNext(); - if (next == null) - throw new NoSuchElementException(); - } - - scanCount++; - if (++scanCountDelta >= 1000) - flushScanCountDelta(); - - ITuple result = next; - next = null; - return result; - } - - @Override - public void remove() { - throw new UnsupportedOperationException(); - } - - @Override - public void close() { - // hasNext() loop may exit because of limit, threshold, etc. - // close all the remaining segmentIterator - flushScanCountDelta(); - - if (curScanner != null) - close(curScanner); - - while (scannerIterator.hasNext()) { - close(scannerIterator.next()); - } - } - - protected void close(CubeSegmentScanner scanner) { - try { - scanner.close(); - } catch (IOException e) { - logger.error("Exception when close CubeScanner", e); - } - } - - public int getScanCount() { - return scanCount; - } - - private void flushScanCountDelta() { - context.increaseTotalScanCount(scanCountDelta); - scanCountDelta = 0; - } - -}