This is an automated email from the ASF dual-hosted git repository. nju_yaho pushed a commit to tag ebay-3.1.0-release-20200701 in repository https://gitbox.apache.org/repos/asf/kylin.git
commit 8ba2aa76eda7dc7f60f17ffe85cfed84e96c6411 Author: Zhong, Yanghong <nju_y...@apache.org> AuthorDate: Fri Jun 19 15:51:18 2020 +0800 KYLIN-4586 Add a configuration of maximum partition returned bytes --- .../org/apache/kylin/common/KylinConfigBase.java | 6 + .../hbase/cube/v2/CubeHBaseEndpointRPC.java | 1 + .../v2/coprocessor/endpoint/CubeVisitService.java | 6 + .../endpoint/generated/CubeVisitProtos.java | 212 ++++++++++++++++----- .../coprocessor/endpoint/protobuf/CubeVisit.proto | 4 +- 5 files changed, 185 insertions(+), 44 deletions(-) diff --git a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java index 416800b..2ac703d 100644 --- a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java +++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java @@ -1416,6 +1416,12 @@ public abstract class KylinConfigBase implements Serializable { return value > 0 ? value : Long.MAX_VALUE; } + public long getPartitionMaxReturnBytes() { + long value = Long.parseLong( + this.getOptional("kylin.storage.partition.max-return-bytes", String.valueOf(3L * 1024 * 1024 * 1024))); + return value > 0 ? value : Long.MAX_VALUE; + } + public int getQueryCoprocessorTimeoutSeconds() { return Integer.parseInt(this.getOptional("kylin.storage.hbase.coprocessor-timeout-seconds", "0")); } diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java index 175bc37..c694c0d 100644 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java +++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java @@ -284,6 +284,7 @@ public class CubeHBaseEndpointRPC extends CubeHBaseRPC { } builder.setSpillEnabled(cubeSeg.getConfig().getQueryCoprocessorSpillEnabled()); builder.setMaxScanBytes(cubeSeg.getConfig().getPartitionMaxScanBytes()); + builder.setMaxReturnBytes(cubeSeg.getConfig().getPartitionMaxReturnBytes()); builder.setIsExactAggregate(storageContext.isExactAggregation()); final String logHeader = String.format(Locale.ROOT, "<sub-thread for Query %s GTScanRequest %s>", diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/CubeVisitService.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/CubeVisitService.java index e39e352..450f6a9 100644 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/CubeVisitService.java +++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/CubeVisitService.java @@ -334,7 +334,13 @@ public class CubeVisitService extends CubeVisitProtos.CubeVisitService implement long finalRowCount = 0L; try { + long maxReturnBytes = request.hasMaxReturnBytes() ? request.getMaxReturnBytes() : Long.MAX_VALUE; for (GTRecord oneRecord : finalScanner) { + int outputSize = outputStream.size(); + if (outputSize > maxReturnBytes) { + throw new ResourceLimitExceededException("return row size exceeds threshold " + outputSize); + } + buffer.clear(); try { oneRecord.exportColumns(scanReq.getColumns(), buffer); diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/generated/CubeVisitProtos.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/generated/CubeVisitProtos.java index 2da6020..1b96a75 100644 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/generated/CubeVisitProtos.java +++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/generated/CubeVisitProtos.java @@ -136,13 +136,31 @@ public final class CubeVisitProtos { */ long getMaxScanBytes(); - // optional bool isExactAggregate = 9 [default = false]; + // optional int64 maxReturnBytes = 9; /** - * <code>optional bool isExactAggregate = 9 [default = false];</code> + * <code>optional int64 maxReturnBytes = 9;</code> + * + * <pre> + * max return bytes + * </pre> + */ + boolean hasMaxReturnBytes(); + /** + * <code>optional int64 maxReturnBytes = 9;</code> + * + * <pre> + * max return bytes + * </pre> + */ + long getMaxReturnBytes(); + + // optional bool isExactAggregate = 10 [default = false]; + /** + * <code>optional bool isExactAggregate = 10 [default = false];</code> */ boolean hasIsExactAggregate(); /** - * <code>optional bool isExactAggregate = 9 [default = false];</code> + * <code>optional bool isExactAggregate = 10 [default = false];</code> */ boolean getIsExactAggregate(); } @@ -242,6 +260,11 @@ public final class CubeVisitProtos { } case 72: { bitField0_ |= 0x00000080; + maxReturnBytes_ = input.readInt64(); + break; + } + case 80: { + bitField0_ |= 0x00000100; isExactAggregate_ = input.readBool(); break; } @@ -1012,17 +1035,41 @@ public final class CubeVisitProtos { return maxScanBytes_; } - // optional bool isExactAggregate = 9 [default = false]; - public static final int ISEXACTAGGREGATE_FIELD_NUMBER = 9; + // optional int64 maxReturnBytes = 9; + public static final int MAXRETURNBYTES_FIELD_NUMBER = 9; + private long maxReturnBytes_; + /** + * <code>optional int64 maxReturnBytes = 9;</code> + * + * <pre> + * max return bytes + * </pre> + */ + public boolean hasMaxReturnBytes() { + return ((bitField0_ & 0x00000080) == 0x00000080); + } + /** + * <code>optional int64 maxReturnBytes = 9;</code> + * + * <pre> + * max return bytes + * </pre> + */ + public long getMaxReturnBytes() { + return maxReturnBytes_; + } + + // optional bool isExactAggregate = 10 [default = false]; + public static final int ISEXACTAGGREGATE_FIELD_NUMBER = 10; private boolean isExactAggregate_; /** - * <code>optional bool isExactAggregate = 9 [default = false];</code> + * <code>optional bool isExactAggregate = 10 [default = false];</code> */ public boolean hasIsExactAggregate() { - return ((bitField0_ & 0x00000080) == 0x00000080); + return ((bitField0_ & 0x00000100) == 0x00000100); } /** - * <code>optional bool isExactAggregate = 9 [default = false];</code> + * <code>optional bool isExactAggregate = 10 [default = false];</code> */ public boolean getIsExactAggregate() { return isExactAggregate_; @@ -1037,6 +1084,7 @@ public final class CubeVisitProtos { queryId_ = ""; spillEnabled_ = true; maxScanBytes_ = 0L; + maxReturnBytes_ = 0L; isExactAggregate_ = false; } private byte memoizedIsInitialized = -1; @@ -1092,7 +1140,10 @@ public final class CubeVisitProtos { output.writeInt64(8, maxScanBytes_); } if (((bitField0_ & 0x00000080) == 0x00000080)) { - output.writeBool(9, isExactAggregate_); + output.writeInt64(9, maxReturnBytes_); + } + if (((bitField0_ & 0x00000100) == 0x00000100)) { + output.writeBool(10, isExactAggregate_); } getUnknownFields().writeTo(output); } @@ -1137,7 +1188,11 @@ public final class CubeVisitProtos { } if (((bitField0_ & 0x00000080) == 0x00000080)) { size += com.google.protobuf.CodedOutputStream - .computeBoolSize(9, isExactAggregate_); + .computeInt64Size(9, maxReturnBytes_); + } + if (((bitField0_ & 0x00000100) == 0x00000100)) { + size += com.google.protobuf.CodedOutputStream + .computeBoolSize(10, isExactAggregate_); } size += getUnknownFields().getSerializedSize(); memoizedSerializedSize = size; @@ -1199,6 +1254,11 @@ public final class CubeVisitProtos { result = result && (getMaxScanBytes() == other.getMaxScanBytes()); } + result = result && (hasMaxReturnBytes() == other.hasMaxReturnBytes()); + if (hasMaxReturnBytes()) { + result = result && (getMaxReturnBytes() + == other.getMaxReturnBytes()); + } result = result && (hasIsExactAggregate() == other.hasIsExactAggregate()); if (hasIsExactAggregate()) { result = result && (getIsExactAggregate() @@ -1249,6 +1309,10 @@ public final class CubeVisitProtos { hash = (37 * hash) + MAXSCANBYTES_FIELD_NUMBER; hash = (53 * hash) + hashLong(getMaxScanBytes()); } + if (hasMaxReturnBytes()) { + hash = (37 * hash) + MAXRETURNBYTES_FIELD_NUMBER; + hash = (53 * hash) + hashLong(getMaxReturnBytes()); + } if (hasIsExactAggregate()) { hash = (37 * hash) + ISEXACTAGGREGATE_FIELD_NUMBER; hash = (53 * hash) + hashBoolean(getIsExactAggregate()); @@ -1383,8 +1447,10 @@ public final class CubeVisitProtos { bitField0_ = (bitField0_ & ~0x00000040); maxScanBytes_ = 0L; bitField0_ = (bitField0_ & ~0x00000080); - isExactAggregate_ = false; + maxReturnBytes_ = 0L; bitField0_ = (bitField0_ & ~0x00000100); + isExactAggregate_ = false; + bitField0_ = (bitField0_ & ~0x00000200); return this; } @@ -1453,6 +1519,10 @@ public final class CubeVisitProtos { if (((from_bitField0_ & 0x00000100) == 0x00000100)) { to_bitField0_ |= 0x00000080; } + result.maxReturnBytes_ = maxReturnBytes_; + if (((from_bitField0_ & 0x00000200) == 0x00000200)) { + to_bitField0_ |= 0x00000100; + } result.isExactAggregate_ = isExactAggregate_; result.bitField0_ = to_bitField0_; onBuilt(); @@ -1521,6 +1591,9 @@ public final class CubeVisitProtos { if (other.hasMaxScanBytes()) { setMaxScanBytes(other.getMaxScanBytes()); } + if (other.hasMaxReturnBytes()) { + setMaxReturnBytes(other.getMaxReturnBytes()); + } if (other.hasIsExactAggregate()) { setIsExactAggregate(other.getIsExactAggregate()); } @@ -2166,34 +2239,83 @@ public final class CubeVisitProtos { return this; } - // optional bool isExactAggregate = 9 [default = false]; + // optional int64 maxReturnBytes = 9; + private long maxReturnBytes_ ; + /** + * <code>optional int64 maxReturnBytes = 9;</code> + * + * <pre> + * max return bytes + * </pre> + */ + public boolean hasMaxReturnBytes() { + return ((bitField0_ & 0x00000100) == 0x00000100); + } + /** + * <code>optional int64 maxReturnBytes = 9;</code> + * + * <pre> + * max return bytes + * </pre> + */ + public long getMaxReturnBytes() { + return maxReturnBytes_; + } + /** + * <code>optional int64 maxReturnBytes = 9;</code> + * + * <pre> + * max return bytes + * </pre> + */ + public Builder setMaxReturnBytes(long value) { + bitField0_ |= 0x00000100; + maxReturnBytes_ = value; + onChanged(); + return this; + } + /** + * <code>optional int64 maxReturnBytes = 9;</code> + * + * <pre> + * max return bytes + * </pre> + */ + public Builder clearMaxReturnBytes() { + bitField0_ = (bitField0_ & ~0x00000100); + maxReturnBytes_ = 0L; + onChanged(); + return this; + } + + // optional bool isExactAggregate = 10 [default = false]; private boolean isExactAggregate_ ; /** - * <code>optional bool isExactAggregate = 9 [default = false];</code> + * <code>optional bool isExactAggregate = 10 [default = false];</code> */ public boolean hasIsExactAggregate() { - return ((bitField0_ & 0x00000100) == 0x00000100); + return ((bitField0_ & 0x00000200) == 0x00000200); } /** - * <code>optional bool isExactAggregate = 9 [default = false];</code> + * <code>optional bool isExactAggregate = 10 [default = false];</code> */ public boolean getIsExactAggregate() { return isExactAggregate_; } /** - * <code>optional bool isExactAggregate = 9 [default = false];</code> + * <code>optional bool isExactAggregate = 10 [default = false];</code> */ public Builder setIsExactAggregate(boolean value) { - bitField0_ |= 0x00000100; + bitField0_ |= 0x00000200; isExactAggregate_ = value; onChanged(); return this; } /** - * <code>optional bool isExactAggregate = 9 [default = false];</code> + * <code>optional bool isExactAggregate = 10 [default = false];</code> */ public Builder clearIsExactAggregate() { - bitField0_ = (bitField0_ & ~0x00000100); + bitField0_ = (bitField0_ & ~0x00000200); isExactAggregate_ = false; onChanged(); return this; @@ -3368,6 +3490,8 @@ public final class CubeVisitProtos { maybeForceBuilderInitialization(); } private void maybeForceBuilderInitialization() { + if (com.google.protobuf.GeneratedMessage.alwaysUseFieldBuilders) { + } } private static Builder create() { return new Builder(); @@ -4442,6 +4566,8 @@ public final class CubeVisitProtos { maybeForceBuilderInitialization(); } private void maybeForceBuilderInitialization() { + if (com.google.protobuf.GeneratedMessage.alwaysUseFieldBuilders) { + } } private static Builder create() { return new Builder(); @@ -5690,34 +5816,34 @@ public final class CubeVisitProtos { java.lang.String[] descriptorData = { "\npstorage-hbase/src/main/java/org/apache" + "/kylin/storage/hbase/cube/v2/coprocessor" + - "/endpoint/protobuf/CubeVisit.proto\"\246\002\n\020C" + + "/endpoint/protobuf/CubeVisit.proto\"\276\002\n\020C" + "ubeVisitRequest\022\025\n\rgtScanRequest\030\001 \002(\014\022\024" + "\n\014hbaseRawScan\030\002 \002(\014\022\032\n\022rowkeyPreambleSi" + "ze\030\003 \002(\005\0223\n\020hbaseColumnsToGT\030\004 \003(\0132\031.Cub" + "eVisitRequest.IntList\022\027\n\017kylinProperties" + "\030\005 \002(\t\022\017\n\007queryId\030\006 \001(\t\022\032\n\014spillEnabled\030" + - "\007 \001(\010:\004true\022\024\n\014maxScanBytes\030\010 \001(\003\022\037\n\020isE" + - "xactAggregate\030\t \001(\010:\005false\032\027\n\007IntList\022\014\n", - "\004ints\030\001 \003(\005\"\305\004\n\021CubeVisitResponse\022\026\n\016com" + - "pressedRows\030\001 \002(\014\022\'\n\005stats\030\002 \002(\0132\030.CubeV" + - "isitResponse.Stats\022/\n\terrorInfo\030\003 \001(\0132\034." + - "CubeVisitResponse.ErrorInfo\032\252\002\n\005Stats\022\030\n" + - "\020serviceStartTime\030\001 \001(\003\022\026\n\016serviceEndTim" + - "e\030\002 \001(\003\022\027\n\017scannedRowCount\030\003 \001(\003\022\032\n\022aggr" + - "egatedRowCount\030\004 \001(\003\022\025\n\rsystemCpuLoad\030\005 " + - "\001(\001\022\036\n\026freePhysicalMemorySize\030\006 \001(\001\022\031\n\021f" + - "reeSwapSpaceSize\030\007 \001(\001\022\020\n\010hostname\030\010 \001(\t" + - "\022\016\n\006etcMsg\030\t \001(\t\022\026\n\016normalComplete\030\n \001(\005", - "\022\024\n\014scannedBytes\030\013 \001(\003\022\030\n\020filteredRowCou" + - "nt\030\014 \001(\003\032H\n\tErrorInfo\022*\n\004type\030\001 \002(\0162\034.Cu" + - "beVisitResponse.ErrorType\022\017\n\007message\030\002 \002" + - "(\t\"G\n\tErrorType\022\020\n\014UNKNOWN_TYPE\020\000\022\013\n\007TIM" + - "EOUT\020\001\022\033\n\027RESOURCE_LIMIT_EXCEEDED\020\0022F\n\020C" + - "ubeVisitService\0222\n\tvisitCube\022\021.CubeVisit" + - "Request\032\022.CubeVisitResponseB`\nEorg.apach" + - "e.kylin.storage.hbase.cube.v2.coprocesso" + - "r.endpoint.generatedB\017CubeVisitProtosH\001\210" + - "\001\001\240\001\001" + "\007 \001(\010:\004true\022\024\n\014maxScanBytes\030\010 \001(\003\022\026\n\016max" + + "ReturnBytes\030\t \001(\003\022\037\n\020isExactAggregate\030\n ", + "\001(\010:\005false\032\027\n\007IntList\022\014\n\004ints\030\001 \003(\005\"\305\004\n\021" + + "CubeVisitResponse\022\026\n\016compressedRows\030\001 \002(" + + "\014\022\'\n\005stats\030\002 \002(\0132\030.CubeVisitResponse.Sta" + + "ts\022/\n\terrorInfo\030\003 \001(\0132\034.CubeVisitRespons" + + "e.ErrorInfo\032\252\002\n\005Stats\022\030\n\020serviceStartTim" + + "e\030\001 \001(\003\022\026\n\016serviceEndTime\030\002 \001(\003\022\027\n\017scann" + + "edRowCount\030\003 \001(\003\022\032\n\022aggregatedRowCount\030\004" + + " \001(\003\022\025\n\rsystemCpuLoad\030\005 \001(\001\022\036\n\026freePhysi" + + "calMemorySize\030\006 \001(\001\022\031\n\021freeSwapSpaceSize" + + "\030\007 \001(\001\022\020\n\010hostname\030\010 \001(\t\022\016\n\006etcMsg\030\t \001(\t", + "\022\026\n\016normalComplete\030\n \001(\005\022\024\n\014scannedBytes" + + "\030\013 \001(\003\022\030\n\020filteredRowCount\030\014 \001(\003\032H\n\tErro" + + "rInfo\022*\n\004type\030\001 \002(\0162\034.CubeVisitResponse." + + "ErrorType\022\017\n\007message\030\002 \002(\t\"G\n\tErrorType\022" + + "\020\n\014UNKNOWN_TYPE\020\000\022\013\n\007TIMEOUT\020\001\022\033\n\027RESOUR" + + "CE_LIMIT_EXCEEDED\020\0022F\n\020CubeVisitService\022" + + "2\n\tvisitCube\022\021.CubeVisitRequest\032\022.CubeVi" + + "sitResponseB`\nEorg.apache.kylin.storage." + + "hbase.cube.v2.coprocessor.endpoint.gener" + + "atedB\017CubeVisitProtosH\001\210\001\001\240\001\001" }; com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner = new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() { @@ -5729,7 +5855,7 @@ public final class CubeVisitProtos { internal_static_CubeVisitRequest_fieldAccessorTable = new com.google.protobuf.GeneratedMessage.FieldAccessorTable( internal_static_CubeVisitRequest_descriptor, - new java.lang.String[] { "GtScanRequest", "HbaseRawScan", "RowkeyPreambleSize", "HbaseColumnsToGT", "KylinProperties", "QueryId", "SpillEnabled", "MaxScanBytes", "IsExactAggregate", }); + new java.lang.String[] { "GtScanRequest", "HbaseRawScan", "RowkeyPreambleSize", "HbaseColumnsToGT", "KylinProperties", "QueryId", "SpillEnabled", "MaxScanBytes", "MaxReturnBytes", "IsExactAggregate", }); internal_static_CubeVisitRequest_IntList_descriptor = internal_static_CubeVisitRequest_descriptor.getNestedTypes().get(0); internal_static_CubeVisitRequest_IntList_fieldAccessorTable = new diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/protobuf/CubeVisit.proto b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/protobuf/CubeVisit.proto index 40dbc68..1248000 100644 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/protobuf/CubeVisit.proto +++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/protobuf/CubeVisit.proto @@ -16,6 +16,7 @@ // limitations under the License. // +// use 2.x protobuf rather than 3.x // usage: // protoc --java_out=./storage-hbase/src/main/java ./storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/protobuf/CubeVisit.proto @@ -38,7 +39,8 @@ message CubeVisitRequest { optional string queryId = 6; optional bool spillEnabled = 7 [default = true]; optional int64 maxScanBytes = 8; // must be positive - optional bool isExactAggregate = 9 [default = false]; + optional int64 maxReturnBytes = 9; // max return bytes + optional bool isExactAggregate = 10 [default = false]; message IntList { repeated int32 ints = 1; }