KYLIN-2438 replace scan threshold with max scan bytes
Project: http://git-wip-us.apache.org/repos/asf/kylin/repo Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/09a08668 Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/09a08668 Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/09a08668 Branch: refs/heads/KYLIN-2428 Commit: 09a086688a664585c57b715046a9869b75351a52 Parents: edf6cef Author: gaodayue <gaoda...@meituan.com> Authored: Thu Feb 9 20:18:54 2017 +0800 Committer: gaodayue <gaoda...@meituan.com> Committed: Fri Feb 10 18:07:58 2017 +0800 ---------------------------------------------------------------------- build/conf/kylin.properties | 16 ++- .../apache/kylin/common/KylinConfigBase.java | 15 ++- .../org/apache/kylin/common/QueryContext.java | 54 ++++---- .../kylin-backward-compatibility.properties | 8 +- .../apache/kylin/storage/StorageContext.java | 14 --- .../exception/ScanOutOfLimitException.java | 31 ----- .../storage/gtrecord/CubeScanRangePlanner.java | 4 +- .../gtrecord/GTCubeStorageQueryBase.java | 28 ----- .../gtrecord/SequentialCubeTupleIterator.java | 15 +-- .../org/apache/kylin/query/KylinTestBase.java | 4 +- .../kylin/storage/hbase/ITStorageTest.java | 12 -- .../kylin/query/enumerator/OLAPEnumerator.java | 18 --- .../kylin/query/enumerator/OLAPQuery.java | 2 - .../apache/kylin/rest/service/CacheService.java | 6 - .../apache/kylin/rest/service/QueryService.java | 9 +- .../hbase/cube/v2/CubeHBaseEndpointRPC.java | 12 +- .../coprocessor/endpoint/CubeVisitService.java | 4 +- .../endpoint/generated/CubeVisitProtos.java | 124 ++++++++++++++++--- .../endpoint/protobuf/CubeVisit.proto | 1 + 19 files changed, 186 insertions(+), 191 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kylin/blob/09a08668/build/conf/kylin.properties ---------------------------------------------------------------------- diff --git a/build/conf/kylin.properties b/build/conf/kylin.properties index 1232c47..095a53f 100644 --- a/build/conf/kylin.properties +++ b/build/conf/kylin.properties @@ -86,6 +86,14 @@ kylin.storage.hbase.owner-tag=who...@kylin.apache.org kylin.storage.hbase.coprocessor-mem-gb=3 +# By default kylin can spill query's intermediate results to disks when it's consuming too much memory. +# Set it to false if you want query to abort immediately in such condition. +kylin.storage.hbase.coprocessor-spill-enabled=true + +# The maximum number of bytes each coprocessor is allowed to scan. +# To allow arbitrary large scan, you can set it to 0. +kylin.storage.hbase.coprocessor-max-scan-bytes=3221225472 + # The default coprocessor timeout is (hbase.rpc.timeout * 0.9) / 1000 seconds, # You can set it to a smaller value. 0 means use default. # kylin.storage.hbase.coprocessor-timeout-seconds=0 @@ -148,13 +156,13 @@ kylin.snapshot.max-mb=300 ### QUERY ### -kylin.query.scan-threshold=10000000 +# Controls the maximum number of bytes a query is allowed to scan storage. +# The default value 0 means no limit. +# The counterpart kylin.storage.hbase.coprocessor-max-scan-bytes sets the maximum per coprocessor. +kylin.query.max-scan-bytes=0 kylin.query.udf.version=org.apache.kylin.query.udf.VersionUDF -# 3G -kylin.query.memory-budget-bytes=3221225472 - kylin.query.cache-enabled=true http://git-wip-us.apache.org/repos/asf/kylin/blob/09a08668/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java ---------------------------------------------------------------------- diff --git a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java index 894e28a..c77788b 100644 --- a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java +++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java @@ -613,6 +613,11 @@ abstract public class KylinConfigBase implements Serializable { return Boolean.parseBoolean(this.getOptional("kylin.storage.hbase.coprocessor-spill-enabled", "true")); } + public long getQueryCoprocessorMaxScanBytes() { + long value = Long.parseLong(this.getOptional("kylin.storage.hbase.coprocessor-max-scan-bytes", String.valueOf(3L * 1024 * 1024 * 1024))); + return value > 0 ? value : Long.MAX_VALUE; + } + public int getQueryCoprocessorTimeoutSeconds() { return Integer.parseInt(this.getOptional("kylin.storage.hbase.coprocessor-timeout-seconds", "0")); } @@ -807,10 +812,16 @@ abstract public class KylinConfigBase implements Serializable { return Integer.parseInt(getOptional("kylin.query.max-limit-pushdown", "10000")); } + @Deprecated public int getScanThreshold() { return Integer.parseInt(getOptional("kylin.query.scan-threshold", "10000000")); } + public long getQueryMaxScanBytes() { + long value = Long.parseLong(getOptional("kylin.query.max-scan-bytes", "0")); + return value > 0 ? value : Long.MAX_VALUE; + } + public int getLargeQueryThreshold() { return Integer.parseInt(getOptional("kylin.query.large-query-threshold", String.valueOf((int) (getScanThreshold() * 0.1)))); } @@ -851,10 +862,6 @@ abstract public class KylinConfigBase implements Serializable { return Long.parseLong(this.getOptional("kylin.query.cache-threshold-scan-count", String.valueOf(10 * 1024))); } - public long getQueryMemBudget() { - return Long.parseLong(this.getOptional("kylin.query.memory-budget-bytes", String.valueOf(3L * 1024 * 1024 * 1024))); - } - public boolean isQuerySecureEnabled() { return Boolean.parseBoolean(this.getOptional("kylin.query.security-enabled", "true")); } http://git-wip-us.apache.org/repos/asf/kylin/blob/09a08668/core-common/src/main/java/org/apache/kylin/common/QueryContext.java ---------------------------------------------------------------------- diff --git a/core-common/src/main/java/org/apache/kylin/common/QueryContext.java b/core-common/src/main/java/org/apache/kylin/common/QueryContext.java index 93b8556..3a73993 100644 --- a/core-common/src/main/java/org/apache/kylin/common/QueryContext.java +++ b/core-common/src/main/java/org/apache/kylin/common/QueryContext.java @@ -18,44 +18,48 @@ package org.apache.kylin.common; -import java.util.Map; - -import com.google.common.collect.Maps; +import java.util.concurrent.atomic.AtomicLong; /** * checkout {@link org.apache.kylin.common.debug.BackdoorToggles} for comparison */ public class QueryContext { - private static final ThreadLocal<Map<String, String>> _queryContext = new ThreadLocal<Map<String, String>>(); - public final static String KEY_QUERY_ID = "QUERY_ID"; + private static final ThreadLocal<QueryContext> contexts = new ThreadLocal<QueryContext>() { + @Override + protected QueryContext initialValue() { + return new QueryContext(); + } + }; + + private String queryId; + private AtomicLong scanBytes = new AtomicLong(); - public static String getQueryId() { - return getString(KEY_QUERY_ID); + private QueryContext() { + // use QueryContext.current() instead } - public static void setQueryId(String uuid) { - setString(KEY_QUERY_ID, uuid); + public static QueryContext current() { + return contexts.get(); } - private static void setString(String key, String value) { - Map<String, String> context = _queryContext.get(); - if (context == null) { - Map<String, String> newMap = Maps.newHashMap(); - newMap.put(key, value); - _queryContext.set(newMap); - } else { - context.put(key, value); - } + public static void reset() { + contexts.remove(); } - private static String getString(String key) { - Map<String, String> context = _queryContext.get(); - if (context == null) { - return null; - } else { - return context.get(key); - } + public String getQueryId() { + return queryId; } + public void setQueryId(String queryId) { + this.queryId = queryId; + } + + public long getScanBytes() { + return scanBytes.get(); + } + + public long addAndGetScanBytes(long delta) { + return scanBytes.addAndGet(delta); + } } http://git-wip-us.apache.org/repos/asf/kylin/blob/09a08668/core-common/src/main/resources/kylin-backward-compatibility.properties ---------------------------------------------------------------------- diff --git a/core-common/src/main/resources/kylin-backward-compatibility.properties b/core-common/src/main/resources/kylin-backward-compatibility.properties index 16871d8..ab9bcb1 100644 --- a/core-common/src/main/resources/kylin-backward-compatibility.properties +++ b/core-common/src/main/resources/kylin-backward-compatibility.properties @@ -142,8 +142,6 @@ kylin.query.hbase.hconnection.threads.alive.seconds=kylin.storage.hbase.hconnect ### QUERY ### -kylin.query.pushdown.limit.max=kylin.query.max-limit-pushdown -kylin.query.scan.threshold=kylin.query.scan-threshold kylin.query.filter.derived_in.max=kylin.query.derived-filter-translation-threshold kylin.query.badquery.stacktrace.depth=kylin.query.badquery-stacktrace-depth kylin.query.badquery.history.num=kylin.query.badquery-history-number @@ -154,13 +152,17 @@ kylin.query.transformers=kylin.query.transformers kylin.query.cache.enabled=kylin.query.cache-enabled kylin.query.cache.threshold.duration=kylin.query.cache-threshold-duration kylin.query.cache.threshold.scancount=kylin.query.cache-threshold-scan-count -kylin.query.mem.budget=kylin.query.memory-budget-bytes +kylin.query.mem.budget=kylin.storage.hbase.coprocessor-max-scan-bytes kylin.query.ignore_unknown_function=kylin.query.ignore-unknown-function kylin.query.dim.distinct.max=kylin.query.max-dimension-count-distinct kylin.query.security.enabled=kylin.query.security-enabled kylin.query.access.controller=kylin.query.access-controller kylin.query.udf.=kylin.query.udf. +#deprecated +kylin.query.pushdown.limit.max=kylin.query.max-limit-pushdown +kylin.query.scan.threshold=kylin.query.scan-threshold + ### SERVER ### http://git-wip-us.apache.org/repos/asf/kylin/blob/09a08668/core-storage/src/main/java/org/apache/kylin/storage/StorageContext.java ---------------------------------------------------------------------- diff --git a/core-storage/src/main/java/org/apache/kylin/storage/StorageContext.java b/core-storage/src/main/java/org/apache/kylin/storage/StorageContext.java index 708dfde..0f52c53 100644 --- a/core-storage/src/main/java/org/apache/kylin/storage/StorageContext.java +++ b/core-storage/src/main/java/org/apache/kylin/storage/StorageContext.java @@ -20,7 +20,6 @@ package org.apache.kylin.storage; import java.util.concurrent.atomic.AtomicLong; -import org.apache.kylin.common.KylinConfig; import org.apache.kylin.cube.cuboid.Cuboid; import org.apache.kylin.metadata.realization.IRealization; import org.slf4j.Logger; @@ -35,7 +34,6 @@ public class StorageContext { private static final Logger logger = LoggerFactory.getLogger(StorageContext.class); private String connUrl; - private int threshold; private int limit = Integer.MAX_VALUE; private int offset = 0; private int finalPushDownLimit = Integer.MAX_VALUE; @@ -54,10 +52,6 @@ public class StorageContext { private Cuboid cuboid; private boolean partialResultReturned = false; - public StorageContext() { - this.threshold = KylinConfig.getInstanceFromEnv().getScanThreshold(); - } - private Range<Long> reusedPeriod; public String getConnUrl() { @@ -68,14 +62,6 @@ public class StorageContext { this.connUrl = connUrl; } - public int getThreshold() { - return threshold; - } - - public void setThreshold(int t) { - threshold = t; - } - public int getLimit() { return limit; } http://git-wip-us.apache.org/repos/asf/kylin/blob/09a08668/core-storage/src/main/java/org/apache/kylin/storage/exception/ScanOutOfLimitException.java ---------------------------------------------------------------------- diff --git a/core-storage/src/main/java/org/apache/kylin/storage/exception/ScanOutOfLimitException.java b/core-storage/src/main/java/org/apache/kylin/storage/exception/ScanOutOfLimitException.java deleted file mode 100644 index f77cc35..0000000 --- a/core-storage/src/main/java/org/apache/kylin/storage/exception/ScanOutOfLimitException.java +++ /dev/null @@ -1,31 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.kylin.storage.exception; - -/** - * @author ysong1 - * - */ -public class ScanOutOfLimitException extends RuntimeException { - private static final long serialVersionUID = 2045169570038227895L; - - public ScanOutOfLimitException(String message) { - super(message); - } -} http://git-wip-us.apache.org/repos/asf/kylin/blob/09a08668/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/CubeScanRangePlanner.java ---------------------------------------------------------------------- diff --git a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/CubeScanRangePlanner.java b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/CubeScanRangePlanner.java index b05a629..6911827 100644 --- a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/CubeScanRangePlanner.java +++ b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/CubeScanRangePlanner.java @@ -150,8 +150,8 @@ public class CubeScanRangePlanner extends ScanRangePlannerBase { if (scanRanges != null && scanRanges.size() != 0) { scanRequest = new GTScanRequestBuilder().setInfo(gtInfo).setRanges(scanRanges).setDimensions(gtDimensions).// setAggrGroupBy(gtAggrGroups).setAggrMetrics(gtAggrMetrics).setAggrMetricsFuncs(gtAggrFuncs).setFilterPushDown(gtFilter).// - setAllowStorageAggregation(context.isNeedStorageAggregation()).setAggCacheMemThreshold(cubeSegment.getCubeInstance().getConfig().getQueryCoprocessorMemGB()).// - setStoragePushDownLimit(context.getFinalPushDownLimit()).setStorageScanRowNumThreshold(context.getThreshold()).createGTScanRequest(); + setAllowStorageAggregation(context.isNeedStorageAggregation()).setAggCacheMemThreshold(cubeSegment.getConfig().getQueryCoprocessorMemGB()).// + setStoragePushDownLimit(context.getFinalPushDownLimit()).createGTScanRequest(); } else { scanRequest = null; } http://git-wip-us.apache.org/repos/asf/kylin/blob/09a08668/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/GTCubeStorageQueryBase.java ---------------------------------------------------------------------- diff --git a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/GTCubeStorageQueryBase.java b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/GTCubeStorageQueryBase.java index 4dbdf94..a72460c 100644 --- a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/GTCubeStorageQueryBase.java +++ b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/GTCubeStorageQueryBase.java @@ -122,8 +122,6 @@ public abstract class GTCubeStorageQueryBase implements IStorageQuery { enableStorageLimitIfPossible(cuboid, groups, derivedPostAggregation, groupsD, filter, loosenedColumnD, sqlDigest.aggregations, context); // set query deadline context.setDeadline(cubeInstance); - // set cautious threshold to prevent out of memory - setThresholdIfNecessary(dimensionsD, metrics, context); List<CubeSegmentScanner> scanners = Lists.newArrayList(); for (CubeSegment cubeSeg : cubeInstance.getSegments(SegmentStatusEnum.READY)) { @@ -323,32 +321,6 @@ public abstract class GTCubeStorageQueryBase implements IStorageQuery { } } - private void setThresholdIfNecessary(Collection<TblColRef> dimensions, Collection<FunctionDesc> metrics, StorageContext context) { - boolean hasMemHungryMeasure = false; - for (FunctionDesc func : metrics) { - hasMemHungryMeasure |= func.getMeasureType().isMemoryHungry(); - } - - // need to limit the memory usage for memory hungry measures - if (hasMemHungryMeasure == false) { - return; - } - - int rowSizeEst = dimensions.size() * 3; - for (FunctionDesc func : metrics) { - // FIXME getStorageBytesEstimate() is not appropriate as here we want size in memory (not in storage) - rowSizeEst += func.getReturnDataType().getStorageBytesEstimate(); - } - - long rowEst = this.cubeInstance.getConfig().getQueryMemBudget() / rowSizeEst; - if (rowEst > 0) { - logger.info("Memory budget is set to " + rowEst + " rows"); - context.setThreshold((int) rowEst); - } else { - logger.info("Memory budget is not set."); - } - } - private void enableStorageLimitIfPossible(Cuboid cuboid, Collection<TblColRef> groups, Set<TblColRef> derivedPostAggregation, Collection<TblColRef> groupsD, TupleFilter filter, Set<TblColRef> loosenedColumnD, Collection<FunctionDesc> functionDescs, StorageContext context) { boolean possible = true; http://git-wip-us.apache.org/repos/asf/kylin/blob/09a08668/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/SequentialCubeTupleIterator.java ---------------------------------------------------------------------- diff --git a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/SequentialCubeTupleIterator.java b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/SequentialCubeTupleIterator.java index c6b2c6c..bb2d7f9 100644 --- a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/SequentialCubeTupleIterator.java +++ b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/SequentialCubeTupleIterator.java @@ -26,7 +26,6 @@ import java.util.Set; import javax.annotation.Nullable; -import org.apache.kylin.common.KylinConfig; import org.apache.kylin.cube.cuboid.Cuboid; import org.apache.kylin.gridtable.GTScanTimeoutException; import org.apache.kylin.metadata.model.FunctionDesc; @@ -35,7 +34,6 @@ import org.apache.kylin.metadata.tuple.ITuple; import org.apache.kylin.metadata.tuple.ITupleIterator; import org.apache.kylin.metadata.tuple.TupleInfo; import org.apache.kylin.storage.StorageContext; -import org.apache.kylin.storage.exception.ScanOutOfLimitException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -48,8 +46,6 @@ public class SequentialCubeTupleIterator implements ITupleIterator { private static final Logger logger = LoggerFactory.getLogger(SequentialCubeTupleIterator.class); - private final int SCAN_THRESHOLD = KylinConfig.getInstanceFromEnv().getScanThreshold(); - protected List<CubeSegmentScanner> scanners; protected List<SegmentCubeTupleIterator> segmentCubeTupleIterators; protected Iterator<ITuple> tupleIterator; @@ -142,15 +138,10 @@ public class SequentialCubeTupleIterator implements ITupleIterator { @Override public ITuple next() { - if (scanCount % 100 == 1 && System.currentTimeMillis() > context.getDeadline()) { + if (scanCount++ % 100 == 1 && System.currentTimeMillis() > context.getDeadline()) { throw new GTScanTimeoutException("Query Timeout!"); } - // prevent the big query to make the Query Server OOM - if (scanCount++ > SCAN_THRESHOLD) { - throw new ScanOutOfLimitException("Scan count exceed the scan threshold: " + SCAN_THRESHOLD); - } - if (++scanCountDelta >= 1000) flushScanCountDelta(); @@ -181,10 +172,6 @@ public class SequentialCubeTupleIterator implements ITupleIterator { } } - public int getScanCount() { - return scanCount; - } - private void flushScanCountDelta() { context.increaseTotalScanCount(scanCountDelta); scanCountDelta = 0; http://git-wip-us.apache.org/repos/asf/kylin/blob/09a08668/kylin-it/src/test/java/org/apache/kylin/query/KylinTestBase.java ---------------------------------------------------------------------- diff --git a/kylin-it/src/test/java/org/apache/kylin/query/KylinTestBase.java b/kylin-it/src/test/java/org/apache/kylin/query/KylinTestBase.java index 6b9397d..d83ad75 100644 --- a/kylin-it/src/test/java/org/apache/kylin/query/KylinTestBase.java +++ b/kylin-it/src/test/java/org/apache/kylin/query/KylinTestBase.java @@ -39,7 +39,6 @@ import java.util.Arrays; import java.util.Comparator; import java.util.HashSet; import java.util.List; -import java.util.Properties; import java.util.Set; import java.util.TreeSet; import java.util.logging.LogManager; @@ -642,8 +641,7 @@ public class KylinTestBase { //setup cube conn File olapTmp = OLAPSchemaFactory.createTempOLAPJson(ProjectInstance.DEFAULT_PROJECT_NAME, config); - Properties props = new Properties(); - cubeConnection = DriverManager.getConnection("jdbc:calcite:model=" + olapTmp.getAbsolutePath(), props); + cubeConnection = DriverManager.getConnection("jdbc:calcite:model=" + olapTmp.getAbsolutePath()); //setup h2 h2Connection = DriverManager.getConnection("jdbc:h2:mem:db" + (h2InstanceCount++) + ";CACHE_SIZE=32072", "sa", ""); http://git-wip-us.apache.org/repos/asf/kylin/blob/09a08668/kylin-it/src/test/java/org/apache/kylin/storage/hbase/ITStorageTest.java ---------------------------------------------------------------------- diff --git a/kylin-it/src/test/java/org/apache/kylin/storage/hbase/ITStorageTest.java b/kylin-it/src/test/java/org/apache/kylin/storage/hbase/ITStorageTest.java index aea8bef..733ca06 100644 --- a/kylin-it/src/test/java/org/apache/kylin/storage/hbase/ITStorageTest.java +++ b/kylin-it/src/test/java/org/apache/kylin/storage/hbase/ITStorageTest.java @@ -39,13 +39,11 @@ import org.apache.kylin.storage.IStorageQuery; import org.apache.kylin.storage.StorageContext; import org.apache.kylin.storage.StorageFactory; import org.apache.kylin.storage.StorageMockUtils; -import org.apache.kylin.storage.exception.ScanOutOfLimitException; import org.junit.After; import org.junit.AfterClass; import org.junit.Assert; import org.junit.Before; import org.junit.BeforeClass; -import org.junit.Ignore; import org.junit.Test; import com.google.common.collect.Sets; @@ -84,16 +82,6 @@ public class ITStorageTest extends HBaseMetadataTestCase { this.cleanupTestMetadata(); } - @Test(expected = ScanOutOfLimitException.class) - @Ignore - public void testScanOutOfLimit() { - context.setThreshold(1); - List<TblColRef> groups = mockup.buildGroups(); - List<FunctionDesc> aggregations = mockup.buildAggregations(); - - search(groups, aggregations, null, context); - } - @Test public void test01() { List<TblColRef> groups = mockup.buildGroups(); http://git-wip-us.apache.org/repos/asf/kylin/blob/09a08668/query/src/main/java/org/apache/kylin/query/enumerator/OLAPEnumerator.java ---------------------------------------------------------------------- diff --git a/query/src/main/java/org/apache/kylin/query/enumerator/OLAPEnumerator.java b/query/src/main/java/org/apache/kylin/query/enumerator/OLAPEnumerator.java index f012638..56b82b9 100644 --- a/query/src/main/java/org/apache/kylin/query/enumerator/OLAPEnumerator.java +++ b/query/src/main/java/org/apache/kylin/query/enumerator/OLAPEnumerator.java @@ -20,12 +20,9 @@ package org.apache.kylin.query.enumerator; import java.util.Arrays; import java.util.Map; -import java.util.Properties; import org.apache.calcite.DataContext; -import org.apache.calcite.jdbc.CalciteConnection; import org.apache.calcite.linq4j.Enumerator; -import org.apache.commons.lang3.StringUtils; import org.apache.kylin.common.util.DateFormat; import org.apache.kylin.metadata.filter.CompareTupleFilter; import org.apache.kylin.metadata.filter.TupleFilter; @@ -111,9 +108,6 @@ public class OLAPEnumerator implements Enumerator<Object[]> { private ITupleIterator queryStorage() { logger.debug("query storage..."); - // set connection properties - setConnectionProperties(); - // bind dynamic variables bindVariable(olapContext.filter); @@ -156,16 +150,4 @@ public class OLAPEnumerator implements Enumerator<Object[]> { } } } - - private void setConnectionProperties() { - CalciteConnection conn = (CalciteConnection) optiqContext.getQueryProvider(); - Properties connProps = conn.getProperties(); - - String propThreshold = connProps.getProperty(OLAPQuery.PROP_SCAN_THRESHOLD); - if (!StringUtils.isBlank(propThreshold)) { - int threshold = Integer.valueOf(propThreshold); - olapContext.storageContext.setThreshold(threshold); - } - } - } http://git-wip-us.apache.org/repos/asf/kylin/blob/09a08668/query/src/main/java/org/apache/kylin/query/enumerator/OLAPQuery.java ---------------------------------------------------------------------- diff --git a/query/src/main/java/org/apache/kylin/query/enumerator/OLAPQuery.java b/query/src/main/java/org/apache/kylin/query/enumerator/OLAPQuery.java index 27d8c94..8318a07 100644 --- a/query/src/main/java/org/apache/kylin/query/enumerator/OLAPQuery.java +++ b/query/src/main/java/org/apache/kylin/query/enumerator/OLAPQuery.java @@ -28,8 +28,6 @@ import org.apache.kylin.query.relnode.OLAPContext; */ public class OLAPQuery extends AbstractEnumerable<Object[]> implements Enumerable<Object[]> { - public static final String PROP_SCAN_THRESHOLD = "scan_threshold"; - public enum EnumeratorTypeEnum { OLAP, //finish query with Cube or II, or a combination of both LOOKUP_TABLE, //using a snapshot of lookup table http://git-wip-us.apache.org/repos/asf/kylin/blob/09a08668/server-base/src/main/java/org/apache/kylin/rest/service/CacheService.java ---------------------------------------------------------------------- diff --git a/server-base/src/main/java/org/apache/kylin/rest/service/CacheService.java b/server-base/src/main/java/org/apache/kylin/rest/service/CacheService.java index 0938e95..af680a5 100644 --- a/server-base/src/main/java/org/apache/kylin/rest/service/CacheService.java +++ b/server-base/src/main/java/org/apache/kylin/rest/service/CacheService.java @@ -21,7 +21,6 @@ package org.apache.kylin.rest.service; import java.io.File; import java.io.IOException; import java.nio.charset.Charset; -import java.util.Properties; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; @@ -30,11 +29,9 @@ import javax.sql.DataSource; import org.apache.calcite.jdbc.Driver; import org.apache.commons.io.FileUtils; import org.apache.commons.lang3.StringUtils; -import org.apache.kylin.common.KylinConfig; import org.apache.kylin.metadata.cachesync.Broadcaster; import org.apache.kylin.metadata.cachesync.Broadcaster.Event; import org.apache.kylin.metadata.project.ProjectInstance; -import org.apache.kylin.query.enumerator.OLAPQuery; import org.apache.kylin.query.schema.OLAPSchemaFactory; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -166,9 +163,6 @@ public class CacheService extends BasicService { } DriverManagerDataSource ds = new DriverManagerDataSource(); - Properties props = new Properties(); - props.setProperty(OLAPQuery.PROP_SCAN_THRESHOLD, String.valueOf(KylinConfig.getInstanceFromEnv().getScanThreshold())); - ds.setConnectionProperties(props); ds.setDriverClassName(Driver.class.getName()); ds.setUrl("jdbc:calcite:model=" + modelJson.getAbsolutePath()); http://git-wip-us.apache.org/repos/asf/kylin/blob/09a08668/server-base/src/main/java/org/apache/kylin/rest/service/QueryService.java ---------------------------------------------------------------------- diff --git a/server-base/src/main/java/org/apache/kylin/rest/service/QueryService.java b/server-base/src/main/java/org/apache/kylin/rest/service/QueryService.java index 9ccda03..7d9e24d 100644 --- a/server-base/src/main/java/org/apache/kylin/rest/service/QueryService.java +++ b/server-base/src/main/java/org/apache/kylin/rest/service/QueryService.java @@ -63,6 +63,7 @@ import org.apache.kylin.common.util.SetThreadName; import org.apache.kylin.cube.CubeInstance; import org.apache.kylin.cube.CubeManager; import org.apache.kylin.cube.cuboid.Cuboid; +import org.apache.kylin.gridtable.GTScanExceedThresholdException; import org.apache.kylin.metadata.project.RealizationEntry; import org.apache.kylin.metadata.realization.RealizationType; import org.apache.kylin.query.relnode.OLAPContext; @@ -79,7 +80,6 @@ import org.apache.kylin.rest.response.SQLResponse; import org.apache.kylin.rest.util.QueryUtil; import org.apache.kylin.rest.util.Serializer; import org.apache.kylin.rest.util.TableauInterceptor; -import org.apache.kylin.storage.exception.ScanOutOfLimitException; import org.apache.kylin.storage.hbase.HBaseConnection; import org.apache.kylin.storage.hybrid.HybridInstance; import org.slf4j.Logger; @@ -267,7 +267,7 @@ public class QueryService extends BasicService { StringBuilder stringBuilder = new StringBuilder(); stringBuilder.append(newLine); stringBuilder.append("==========================[QUERY]===============================").append(newLine); - stringBuilder.append("Query Id: ").append(QueryContext.getQueryId()).append(newLine); + stringBuilder.append("Query Id: ").append(QueryContext.current().getQueryId()).append(newLine); stringBuilder.append("SQL: ").append(request.getSql()).append(newLine); stringBuilder.append("User: ").append(user).append(newLine); stringBuilder.append("Success: ").append((null == response.getExceptionMessage())).append(newLine); @@ -331,7 +331,7 @@ public class QueryService extends BasicService { final String queryId = UUID.randomUUID().toString(); if (sqlRequest.getBackdoorToggles() != null) BackdoorToggles.addToggles(sqlRequest.getBackdoorToggles()); - QueryContext.setQueryId(queryId); + QueryContext.current().setQueryId(queryId); try (SetThreadName ignored = new SetThreadName("Query %s", queryId)) { String sql = sqlRequest.getSql(); @@ -383,7 +383,7 @@ public class QueryService extends BasicService { sqlResponse = new SQLResponse(null, null, 0, true, errMsg); // for exception queries, only cache ScanOutOfLimitException - if (queryCacheEnabled && e instanceof ScanOutOfLimitException) { + if (queryCacheEnabled && e instanceof GTScanExceedThresholdException) { Cache exceptionCache = cacheManager.getCache(EXCEPTION_QUERY_CACHE); exceptionCache.put(new Element(sqlRequest, sqlResponse)); } @@ -400,6 +400,7 @@ public class QueryService extends BasicService { } finally { BackdoorToggles.cleanToggles(); + QueryContext.reset(); } } http://git-wip-us.apache.org/repos/asf/kylin/blob/09a08668/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java ---------------------------------------------------------------------- diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java index a2b2611..68a84c1 100644 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java +++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java @@ -43,6 +43,7 @@ import org.apache.kylin.common.util.LoggableCachedThreadPool; import org.apache.kylin.common.util.Pair; import org.apache.kylin.cube.cuboid.Cuboid; import org.apache.kylin.gridtable.GTInfo; +import org.apache.kylin.gridtable.GTScanExceedThresholdException; import org.apache.kylin.gridtable.GTScanRequest; import org.apache.kylin.gridtable.GTScanSelfTerminatedException; import org.apache.kylin.gridtable.IGTScanner; @@ -106,6 +107,7 @@ public class CubeHBaseEndpointRPC extends CubeHBaseRPC { @SuppressWarnings("checkstyle:methodlength") @Override public IGTScanner getGTScanner(final GTScanRequest scanRequest) throws IOException { + final QueryContext queryContext = QueryContext.current(); Pair<Short, Short> shardNumAndBaseShard = getShardNumAndBaseShard(); short shardNum = shardNumAndBaseShard.getFirst(); @@ -160,11 +162,12 @@ public class CubeHBaseEndpointRPC extends CubeHBaseRPC { } builder.setRowkeyPreambleSize(cubeSeg.getRowKeyPreambleSize()); builder.setKylinProperties(kylinConfig.getConfigAsString()); - final String queryId = QueryContext.getQueryId(); + final String queryId = queryContext.getQueryId(); if (queryId != null) { builder.setQueryId(queryId); } builder.setSpillEnabled(cubeSeg.getConfig().getQueryCoprocessorSpillEnabled()); + builder.setMaxScanBytes(cubeSeg.getConfig().getQueryCoprocessorMaxScanBytes()); for (final Pair<byte[], byte[]> epRange : getEPKeyRanges(cuboidBaseShard, shardNum, totalShards)) { executorService.submit(new Runnable() { @@ -199,10 +202,15 @@ public class CubeHBaseEndpointRPC extends CubeHBaseRPC { if (region == null) return; - context.increaseTotalScanBytes(result.getStats().getScannedBytes()); + final long scanBytes = result.getStats().getScannedBytes(); + context.increaseTotalScanBytes(scanBytes); totalScannedCount.addAndGet(result.getStats().getScannedRowCount()); logger.info(logHeader + getStatsString(region, result)); + if (queryContext.addAndGetScanBytes(scanBytes) > cubeSeg.getConfig().getQueryMaxScanBytes()) { + throw new GTScanExceedThresholdException("Query scanned " + queryContext.getScanBytes() + " bytes exceeds threshold " + cubeSeg.getConfig().getQueryMaxScanBytes()); + } + if (result.getStats().getNormalComplete() != 1) { abnormalFinish[0] = true; return; http://git-wip-us.apache.org/repos/asf/kylin/blob/09a08668/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/CubeVisitService.java ---------------------------------------------------------------------- diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/CubeVisitService.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/CubeVisitService.java index 1f6425f..5fd9740 100644 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/CubeVisitService.java +++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/CubeVisitService.java @@ -297,8 +297,8 @@ public class CubeVisitService extends CubeVisitProtos.CubeVisitService implement ResourceTrackingCellListIterator cellListIterator = new ResourceTrackingCellListIterator( allCellLists, - scanReq.getStorageScanRowNumThreshold(), - Long.MAX_VALUE, + scanReq.getStorageScanRowNumThreshold(), // for old client (scan threshold) + request.getMaxScanBytes() == 0 ? Long.MAX_VALUE : request.getMaxScanBytes(), // for new client scanReq.getTimeout()); IGTStore store = new HBaseReadonlyStore(cellListIterator, scanReq, hbaseRawScans.get(0).hbaseColumns, hbaseColumnsToGT, request.getRowkeyPreambleSize(), behavior.delayToggledOn()); http://git-wip-us.apache.org/repos/asf/kylin/blob/09a08668/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/generated/CubeVisitProtos.java ---------------------------------------------------------------------- diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/generated/CubeVisitProtos.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/generated/CubeVisitProtos.java index 5a3aa5a..4b6fc95 100644 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/generated/CubeVisitProtos.java +++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/generated/CubeVisitProtos.java @@ -117,6 +117,16 @@ public final class CubeVisitProtos { * <code>optional bool spillEnabled = 7 [default = true];</code> */ boolean getSpillEnabled(); + + // optional int64 maxScanBytes = 8; + /** + * <code>optional int64 maxScanBytes = 8;</code> + */ + boolean hasMaxScanBytes(); + /** + * <code>optional int64 maxScanBytes = 8;</code> + */ + long getMaxScanBytes(); } /** * Protobuf type {@code CubeVisitRequest} @@ -207,6 +217,11 @@ public final class CubeVisitProtos { spillEnabled_ = input.readBool(); break; } + case 64: { + bitField0_ |= 0x00000040; + maxScanBytes_ = input.readInt64(); + break; + } } } } catch (com.google.protobuf.InvalidProtocolBufferException e) { @@ -952,6 +967,22 @@ public final class CubeVisitProtos { return spillEnabled_; } + // optional int64 maxScanBytes = 8; + public static final int MAXSCANBYTES_FIELD_NUMBER = 8; + private long maxScanBytes_; + /** + * <code>optional int64 maxScanBytes = 8;</code> + */ + public boolean hasMaxScanBytes() { + return ((bitField0_ & 0x00000040) == 0x00000040); + } + /** + * <code>optional int64 maxScanBytes = 8;</code> + */ + public long getMaxScanBytes() { + return maxScanBytes_; + } + private void initFields() { gtScanRequest_ = com.google.protobuf.ByteString.EMPTY; hbaseRawScan_ = com.google.protobuf.ByteString.EMPTY; @@ -960,6 +991,7 @@ public final class CubeVisitProtos { kylinProperties_ = ""; queryId_ = ""; spillEnabled_ = true; + maxScanBytes_ = 0L; } private byte memoizedIsInitialized = -1; public final boolean isInitialized() { @@ -1010,6 +1042,9 @@ public final class CubeVisitProtos { if (((bitField0_ & 0x00000020) == 0x00000020)) { output.writeBool(7, spillEnabled_); } + if (((bitField0_ & 0x00000040) == 0x00000040)) { + output.writeInt64(8, maxScanBytes_); + } getUnknownFields().writeTo(output); } @@ -1047,6 +1082,10 @@ public final class CubeVisitProtos { size += com.google.protobuf.CodedOutputStream .computeBoolSize(7, spillEnabled_); } + if (((bitField0_ & 0x00000040) == 0x00000040)) { + size += com.google.protobuf.CodedOutputStream + .computeInt64Size(8, maxScanBytes_); + } size += getUnknownFields().getSerializedSize(); memoizedSerializedSize = size; return size; @@ -1102,6 +1141,11 @@ public final class CubeVisitProtos { result = result && (getSpillEnabled() == other.getSpillEnabled()); } + result = result && (hasMaxScanBytes() == other.hasMaxScanBytes()); + if (hasMaxScanBytes()) { + result = result && (getMaxScanBytes() + == other.getMaxScanBytes()); + } result = result && getUnknownFields().equals(other.getUnknownFields()); return result; @@ -1143,6 +1187,10 @@ public final class CubeVisitProtos { hash = (37 * hash) + SPILLENABLED_FIELD_NUMBER; hash = (53 * hash) + hashBoolean(getSpillEnabled()); } + if (hasMaxScanBytes()) { + hash = (37 * hash) + MAXSCANBYTES_FIELD_NUMBER; + hash = (53 * hash) + hashLong(getMaxScanBytes()); + } hash = (29 * hash) + getUnknownFields().hashCode(); memoizedHashCode = hash; return hash; @@ -1271,6 +1319,8 @@ public final class CubeVisitProtos { bitField0_ = (bitField0_ & ~0x00000020); spillEnabled_ = true; bitField0_ = (bitField0_ & ~0x00000040); + maxScanBytes_ = 0L; + bitField0_ = (bitField0_ & ~0x00000080); return this; } @@ -1332,6 +1382,10 @@ public final class CubeVisitProtos { to_bitField0_ |= 0x00000020; } result.spillEnabled_ = spillEnabled_; + if (((from_bitField0_ & 0x00000080) == 0x00000080)) { + to_bitField0_ |= 0x00000040; + } + result.maxScanBytes_ = maxScanBytes_; result.bitField0_ = to_bitField0_; onBuilt(); return result; @@ -1396,6 +1450,9 @@ public final class CubeVisitProtos { if (other.hasSpillEnabled()) { setSpillEnabled(other.getSpillEnabled()); } + if (other.hasMaxScanBytes()) { + setMaxScanBytes(other.getMaxScanBytes()); + } this.mergeUnknownFields(other.getUnknownFields()); return this; } @@ -1989,6 +2046,39 @@ public final class CubeVisitProtos { return this; } + // optional int64 maxScanBytes = 8; + private long maxScanBytes_ ; + /** + * <code>optional int64 maxScanBytes = 8;</code> + */ + public boolean hasMaxScanBytes() { + return ((bitField0_ & 0x00000080) == 0x00000080); + } + /** + * <code>optional int64 maxScanBytes = 8;</code> + */ + public long getMaxScanBytes() { + return maxScanBytes_; + } + /** + * <code>optional int64 maxScanBytes = 8;</code> + */ + public Builder setMaxScanBytes(long value) { + bitField0_ |= 0x00000080; + maxScanBytes_ = value; + onChanged(); + return this; + } + /** + * <code>optional int64 maxScanBytes = 8;</code> + */ + public Builder clearMaxScanBytes() { + bitField0_ = (bitField0_ & ~0x00000080); + maxScanBytes_ = 0L; + onChanged(); + return this; + } + // @@protoc_insertion_point(builder_scope:CubeVisitRequest) } @@ -4433,27 +4523,27 @@ public final class CubeVisitProtos { java.lang.String[] descriptorData = { "\npstorage-hbase/src/main/java/org/apache" + "/kylin/storage/hbase/cube/v2/coprocessor" + - "/endpoint/protobuf/CubeVisit.proto\"\357\001\n\020C" + + "/endpoint/protobuf/CubeVisit.proto\"\205\002\n\020C" + "ubeVisitRequest\022\025\n\rgtScanRequest\030\001 \002(\014\022\024" + "\n\014hbaseRawScan\030\002 \002(\014\022\032\n\022rowkeyPreambleSi" + "ze\030\003 \002(\005\0223\n\020hbaseColumnsToGT\030\004 \003(\0132\031.Cub" + "eVisitRequest.IntList\022\027\n\017kylinProperties" + "\030\005 \002(\t\022\017\n\007queryId\030\006 \001(\t\022\032\n\014spillEnabled\030" + - "\007 \001(\010:\004true\032\027\n\007IntList\022\014\n\004ints\030\001 \003(\005\"\347\002\n" + - "\021CubeVisitResponse\022\026\n\016compressedRows\030\001 \002", - "(\014\022\'\n\005stats\030\002 \002(\0132\030.CubeVisitResponse.St" + - "ats\032\220\002\n\005Stats\022\030\n\020serviceStartTime\030\001 \001(\003\022" + - "\026\n\016serviceEndTime\030\002 \001(\003\022\027\n\017scannedRowCou" + - "nt\030\003 \001(\003\022\032\n\022aggregatedRowCount\030\004 \001(\003\022\025\n\r" + - "systemCpuLoad\030\005 \001(\001\022\036\n\026freePhysicalMemor" + - "ySize\030\006 \001(\001\022\031\n\021freeSwapSpaceSize\030\007 \001(\001\022\020" + - "\n\010hostname\030\010 \001(\t\022\016\n\006etcMsg\030\t \001(\t\022\026\n\016norm" + - "alComplete\030\n \001(\005\022\024\n\014scannedBytes\030\013 \001(\0032F" + - "\n\020CubeVisitService\0222\n\tvisitCube\022\021.CubeVi" + - "sitRequest\032\022.CubeVisitResponseB`\nEorg.ap", - "ache.kylin.storage.hbase.cube.v2.coproce" + - "ssor.endpoint.generatedB\017CubeVisitProtos" + - "H\001\210\001\001\240\001\001" + "\007 \001(\010:\004true\022\024\n\014maxScanBytes\030\010 \001(\003\032\027\n\007Int" + + "List\022\014\n\004ints\030\001 \003(\005\"\347\002\n\021CubeVisitResponse", + "\022\026\n\016compressedRows\030\001 \002(\014\022\'\n\005stats\030\002 \002(\0132" + + "\030.CubeVisitResponse.Stats\032\220\002\n\005Stats\022\030\n\020s" + + "erviceStartTime\030\001 \001(\003\022\026\n\016serviceEndTime\030" + + "\002 \001(\003\022\027\n\017scannedRowCount\030\003 \001(\003\022\032\n\022aggreg" + + "atedRowCount\030\004 \001(\003\022\025\n\rsystemCpuLoad\030\005 \001(" + + "\001\022\036\n\026freePhysicalMemorySize\030\006 \001(\001\022\031\n\021fre" + + "eSwapSpaceSize\030\007 \001(\001\022\020\n\010hostname\030\010 \001(\t\022\016" + + "\n\006etcMsg\030\t \001(\t\022\026\n\016normalComplete\030\n \001(\005\022\024" + + "\n\014scannedBytes\030\013 \001(\0032F\n\020CubeVisitService" + + "\0222\n\tvisitCube\022\021.CubeVisitRequest\032\022.CubeV", + "isitResponseB`\nEorg.apache.kylin.storage" + + ".hbase.cube.v2.coprocessor.endpoint.gene" + + "ratedB\017CubeVisitProtosH\001\210\001\001\240\001\001" }; com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner = new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() { @@ -4465,7 +4555,7 @@ public final class CubeVisitProtos { internal_static_CubeVisitRequest_fieldAccessorTable = new com.google.protobuf.GeneratedMessage.FieldAccessorTable( internal_static_CubeVisitRequest_descriptor, - new java.lang.String[] { "GtScanRequest", "HbaseRawScan", "RowkeyPreambleSize", "HbaseColumnsToGT", "KylinProperties", "QueryId", "SpillEnabled", }); + new java.lang.String[] { "GtScanRequest", "HbaseRawScan", "RowkeyPreambleSize", "HbaseColumnsToGT", "KylinProperties", "QueryId", "SpillEnabled", "MaxScanBytes", }); internal_static_CubeVisitRequest_IntList_descriptor = internal_static_CubeVisitRequest_descriptor.getNestedTypes().get(0); internal_static_CubeVisitRequest_IntList_fieldAccessorTable = new http://git-wip-us.apache.org/repos/asf/kylin/blob/09a08668/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/protobuf/CubeVisit.proto ---------------------------------------------------------------------- diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/protobuf/CubeVisit.proto b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/protobuf/CubeVisit.proto index f416669..00015fc 100644 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/protobuf/CubeVisit.proto +++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/protobuf/CubeVisit.proto @@ -37,6 +37,7 @@ message CubeVisitRequest { required string kylinProperties = 5; // kylin properties optional string queryId = 6; optional bool spillEnabled = 7 [default = true]; + optional int64 maxScanBytes = 8; // 0 means no limit message IntList { repeated int32 ints = 1; }