This is an automated email from the ASF dual-hosted git repository.

nju_yaho pushed a commit to tag ebay-3.1.0-release-20200701
in repository https://gitbox.apache.org/repos/asf/kylin.git

commit 8ba2aa76eda7dc7f60f17ffe85cfed84e96c6411
Author: Zhong, Yanghong <nju_y...@apache.org>
AuthorDate: Fri Jun 19 15:51:18 2020 +0800

    KYLIN-4586 Add a configuration of maximum partition returned bytes
---
 .../org/apache/kylin/common/KylinConfigBase.java   |   6 +
 .../hbase/cube/v2/CubeHBaseEndpointRPC.java        |   1 +
 .../v2/coprocessor/endpoint/CubeVisitService.java  |   6 +
 .../endpoint/generated/CubeVisitProtos.java        | 212 ++++++++++++++++-----
 .../coprocessor/endpoint/protobuf/CubeVisit.proto  |   4 +-
 5 files changed, 185 insertions(+), 44 deletions(-)

diff --git a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
index 416800b..2ac703d 100644
--- a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
+++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
@@ -1416,6 +1416,12 @@ public abstract class KylinConfigBase implements Serializable {
         return value > 0 ? value : Long.MAX_VALUE;
     }
 
+    public long getPartitionMaxReturnBytes() {
+        long value = Long.parseLong(
+                this.getOptional("kylin.storage.partition.max-return-bytes", String.valueOf(3L * 1024 * 1024 * 1024)));
+        return value > 0 ? value : Long.MAX_VALUE;
+    }
+    
     public int getQueryCoprocessorTimeoutSeconds() {
         return Integer.parseInt(this.getOptional("kylin.storage.hbase.coprocessor-timeout-seconds", "0"));
     }
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java
index 175bc37..c694c0d 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java
@@ -284,6 +284,7 @@ public class CubeHBaseEndpointRPC extends CubeHBaseRPC {
         }
         builder.setSpillEnabled(cubeSeg.getConfig().getQueryCoprocessorSpillEnabled());
         builder.setMaxScanBytes(cubeSeg.getConfig().getPartitionMaxScanBytes());
+        builder.setMaxReturnBytes(cubeSeg.getConfig().getPartitionMaxReturnBytes());
         builder.setIsExactAggregate(storageContext.isExactAggregation());
 
         final String logHeader = String.format(Locale.ROOT, "<sub-thread for Query %s GTScanRequest %s>",
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/CubeVisitService.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/CubeVisitService.java
index e39e352..450f6a9 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/CubeVisitService.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/CubeVisitService.java
@@ -334,7 +334,13 @@ public class CubeVisitService extends CubeVisitProtos.CubeVisitService implement
             long finalRowCount = 0L;
 
             try {
+                long maxReturnBytes = request.hasMaxReturnBytes() ? request.getMaxReturnBytes() : Long.MAX_VALUE;
                 for (GTRecord oneRecord : finalScanner) {
+                    int outputSize = outputStream.size();
+                    if (outputSize > maxReturnBytes) {
+                        throw new ResourceLimitExceededException("return row size exceeds threshold " + outputSize);
+                    }
+
                     buffer.clear();
                     try {
                         oneRecord.exportColumns(scanReq.getColumns(), buffer);
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/generated/CubeVisitProtos.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/generated/CubeVisitProtos.java
index 2da6020..1b96a75 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/generated/CubeVisitProtos.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/generated/CubeVisitProtos.java
@@ -136,13 +136,31 @@ public final class CubeVisitProtos {
      */
     long getMaxScanBytes();
 
-    // optional bool isExactAggregate = 9 [default = false];
+    // optional int64 maxReturnBytes = 9;
     /**
-     * <code>optional bool isExactAggregate = 9 [default = false];</code>
+     * <code>optional int64 maxReturnBytes = 9;</code>
+     *
+     * <pre>
+     * max return bytes
+     * </pre>
+     */
+    boolean hasMaxReturnBytes();
+    /**
+     * <code>optional int64 maxReturnBytes = 9;</code>
+     *
+     * <pre>
+     * max return bytes
+     * </pre>
+     */
+    long getMaxReturnBytes();
+
+    // optional bool isExactAggregate = 10 [default = false];
+    /**
+     * <code>optional bool isExactAggregate = 10 [default = false];</code>
      */
     boolean hasIsExactAggregate();
     /**
-     * <code>optional bool isExactAggregate = 9 [default = false];</code>
+     * <code>optional bool isExactAggregate = 10 [default = false];</code>
      */
     boolean getIsExactAggregate();
   }
@@ -242,6 +260,11 @@ public final class CubeVisitProtos {
             }
             case 72: {
               bitField0_ |= 0x00000080;
+              maxReturnBytes_ = input.readInt64();
+              break;
+            }
+            case 80: {
+              bitField0_ |= 0x00000100;
               isExactAggregate_ = input.readBool();
               break;
             }
@@ -1012,17 +1035,41 @@ public final class CubeVisitProtos {
       return maxScanBytes_;
     }
 
-    // optional bool isExactAggregate = 9 [default = false];
-    public static final int ISEXACTAGGREGATE_FIELD_NUMBER = 9;
+    // optional int64 maxReturnBytes = 9;
+    public static final int MAXRETURNBYTES_FIELD_NUMBER = 9;
+    private long maxReturnBytes_;
+    /**
+     * <code>optional int64 maxReturnBytes = 9;</code>
+     *
+     * <pre>
+     * max return bytes
+     * </pre>
+     */
+    public boolean hasMaxReturnBytes() {
+      return ((bitField0_ & 0x00000080) == 0x00000080);
+    }
+    /**
+     * <code>optional int64 maxReturnBytes = 9;</code>
+     *
+     * <pre>
+     * max return bytes
+     * </pre>
+     */
+    public long getMaxReturnBytes() {
+      return maxReturnBytes_;
+    }
+
+    // optional bool isExactAggregate = 10 [default = false];
+    public static final int ISEXACTAGGREGATE_FIELD_NUMBER = 10;
     private boolean isExactAggregate_;
     /**
-     * <code>optional bool isExactAggregate = 9 [default = false];</code>
+     * <code>optional bool isExactAggregate = 10 [default = false];</code>
      */
     public boolean hasIsExactAggregate() {
-      return ((bitField0_ & 0x00000080) == 0x00000080);
+      return ((bitField0_ & 0x00000100) == 0x00000100);
     }
     /**
-     * <code>optional bool isExactAggregate = 9 [default = false];</code>
+     * <code>optional bool isExactAggregate = 10 [default = false];</code>
      */
     public boolean getIsExactAggregate() {
       return isExactAggregate_;
@@ -1037,6 +1084,7 @@ public final class CubeVisitProtos {
       queryId_ = "";
       spillEnabled_ = true;
       maxScanBytes_ = 0L;
+      maxReturnBytes_ = 0L;
       isExactAggregate_ = false;
     }
     private byte memoizedIsInitialized = -1;
@@ -1092,7 +1140,10 @@ public final class CubeVisitProtos {
         output.writeInt64(8, maxScanBytes_);
       }
       if (((bitField0_ & 0x00000080) == 0x00000080)) {
-        output.writeBool(9, isExactAggregate_);
+        output.writeInt64(9, maxReturnBytes_);
+      }
+      if (((bitField0_ & 0x00000100) == 0x00000100)) {
+        output.writeBool(10, isExactAggregate_);
       }
       getUnknownFields().writeTo(output);
     }
@@ -1137,7 +1188,11 @@ public final class CubeVisitProtos {
       }
       if (((bitField0_ & 0x00000080) == 0x00000080)) {
         size += com.google.protobuf.CodedOutputStream
-          .computeBoolSize(9, isExactAggregate_);
+          .computeInt64Size(9, maxReturnBytes_);
+      }
+      if (((bitField0_ & 0x00000100) == 0x00000100)) {
+        size += com.google.protobuf.CodedOutputStream
+          .computeBoolSize(10, isExactAggregate_);
       }
       size += getUnknownFields().getSerializedSize();
       memoizedSerializedSize = size;
@@ -1199,6 +1254,11 @@ public final class CubeVisitProtos {
         result = result && (getMaxScanBytes()
             == other.getMaxScanBytes());
       }
+      result = result && (hasMaxReturnBytes() == other.hasMaxReturnBytes());
+      if (hasMaxReturnBytes()) {
+        result = result && (getMaxReturnBytes()
+            == other.getMaxReturnBytes());
+      }
       result = result && (hasIsExactAggregate() == other.hasIsExactAggregate());
       if (hasIsExactAggregate()) {
         result = result && (getIsExactAggregate()
@@ -1249,6 +1309,10 @@ public final class CubeVisitProtos {
         hash = (37 * hash) + MAXSCANBYTES_FIELD_NUMBER;
         hash = (53 * hash) + hashLong(getMaxScanBytes());
       }
+      if (hasMaxReturnBytes()) {
+        hash = (37 * hash) + MAXRETURNBYTES_FIELD_NUMBER;
+        hash = (53 * hash) + hashLong(getMaxReturnBytes());
+      }
       if (hasIsExactAggregate()) {
         hash = (37 * hash) + ISEXACTAGGREGATE_FIELD_NUMBER;
         hash = (53 * hash) + hashBoolean(getIsExactAggregate());
@@ -1383,8 +1447,10 @@ public final class CubeVisitProtos {
         bitField0_ = (bitField0_ & ~0x00000040);
         maxScanBytes_ = 0L;
         bitField0_ = (bitField0_ & ~0x00000080);
-        isExactAggregate_ = false;
+        maxReturnBytes_ = 0L;
         bitField0_ = (bitField0_ & ~0x00000100);
+        isExactAggregate_ = false;
+        bitField0_ = (bitField0_ & ~0x00000200);
         return this;
       }
 
@@ -1453,6 +1519,10 @@ public final class CubeVisitProtos {
         if (((from_bitField0_ & 0x00000100) == 0x00000100)) {
           to_bitField0_ |= 0x00000080;
         }
+        result.maxReturnBytes_ = maxReturnBytes_;
+        if (((from_bitField0_ & 0x00000200) == 0x00000200)) {
+          to_bitField0_ |= 0x00000100;
+        }
         result.isExactAggregate_ = isExactAggregate_;
         result.bitField0_ = to_bitField0_;
         onBuilt();
@@ -1521,6 +1591,9 @@ public final class CubeVisitProtos {
         if (other.hasMaxScanBytes()) {
           setMaxScanBytes(other.getMaxScanBytes());
         }
+        if (other.hasMaxReturnBytes()) {
+          setMaxReturnBytes(other.getMaxReturnBytes());
+        }
         if (other.hasIsExactAggregate()) {
           setIsExactAggregate(other.getIsExactAggregate());
         }
@@ -2166,34 +2239,83 @@ public final class CubeVisitProtos {
         return this;
       }
 
-      // optional bool isExactAggregate = 9 [default = false];
+      // optional int64 maxReturnBytes = 9;
+      private long maxReturnBytes_ ;
+      /**
+       * <code>optional int64 maxReturnBytes = 9;</code>
+       *
+       * <pre>
+       * max return bytes
+       * </pre>
+       */
+      public boolean hasMaxReturnBytes() {
+        return ((bitField0_ & 0x00000100) == 0x00000100);
+      }
+      /**
+       * <code>optional int64 maxReturnBytes = 9;</code>
+       *
+       * <pre>
+       * max return bytes
+       * </pre>
+       */
+      public long getMaxReturnBytes() {
+        return maxReturnBytes_;
+      }
+      /**
+       * <code>optional int64 maxReturnBytes = 9;</code>
+       *
+       * <pre>
+       * max return bytes
+       * </pre>
+       */
+      public Builder setMaxReturnBytes(long value) {
+        bitField0_ |= 0x00000100;
+        maxReturnBytes_ = value;
+        onChanged();
+        return this;
+      }
+      /**
+       * <code>optional int64 maxReturnBytes = 9;</code>
+       *
+       * <pre>
+       * max return bytes
+       * </pre>
+       */
+      public Builder clearMaxReturnBytes() {
+        bitField0_ = (bitField0_ & ~0x00000100);
+        maxReturnBytes_ = 0L;
+        onChanged();
+        return this;
+      }
+
+      // optional bool isExactAggregate = 10 [default = false];
       private boolean isExactAggregate_ ;
       /**
-       * <code>optional bool isExactAggregate = 9 [default = false];</code>
+       * <code>optional bool isExactAggregate = 10 [default = false];</code>
        */
       public boolean hasIsExactAggregate() {
-        return ((bitField0_ & 0x00000100) == 0x00000100);
+        return ((bitField0_ & 0x00000200) == 0x00000200);
       }
       /**
-       * <code>optional bool isExactAggregate = 9 [default = false];</code>
+       * <code>optional bool isExactAggregate = 10 [default = false];</code>
        */
       public boolean getIsExactAggregate() {
         return isExactAggregate_;
       }
       /**
-       * <code>optional bool isExactAggregate = 9 [default = false];</code>
+       * <code>optional bool isExactAggregate = 10 [default = false];</code>
        */
       public Builder setIsExactAggregate(boolean value) {
-        bitField0_ |= 0x00000100;
+        bitField0_ |= 0x00000200;
         isExactAggregate_ = value;
         onChanged();
         return this;
       }
       /**
-       * <code>optional bool isExactAggregate = 9 [default = false];</code>
+       * <code>optional bool isExactAggregate = 10 [default = false];</code>
        */
       public Builder clearIsExactAggregate() {
-        bitField0_ = (bitField0_ & ~0x00000100);
+        bitField0_ = (bitField0_ & ~0x00000200);
         isExactAggregate_ = false;
         onChanged();
         return this;
@@ -3368,6 +3490,8 @@ public final class CubeVisitProtos {
           maybeForceBuilderInitialization();
         }
         private void maybeForceBuilderInitialization() {
+          if (com.google.protobuf.GeneratedMessage.alwaysUseFieldBuilders) {
+          }
         }
         private static Builder create() {
           return new Builder();
@@ -4442,6 +4566,8 @@ public final class CubeVisitProtos {
           maybeForceBuilderInitialization();
         }
         private void maybeForceBuilderInitialization() {
+          if (com.google.protobuf.GeneratedMessage.alwaysUseFieldBuilders) {
+          }
         }
         private static Builder create() {
           return new Builder();
@@ -5690,34 +5816,34 @@ public final class CubeVisitProtos {
     java.lang.String[] descriptorData = {
       "\npstorage-hbase/src/main/java/org/apache" +
       "/kylin/storage/hbase/cube/v2/coprocessor" +
-      "/endpoint/protobuf/CubeVisit.proto\"\246\002\n\020C" +
+      "/endpoint/protobuf/CubeVisit.proto\"\276\002\n\020C" +
       "ubeVisitRequest\022\025\n\rgtScanRequest\030\001 \002(\014\022\024" +
       "\n\014hbaseRawScan\030\002 \002(\014\022\032\n\022rowkeyPreambleSi" +
       "ze\030\003 \002(\005\0223\n\020hbaseColumnsToGT\030\004 
\003(\0132\031.Cub" +
       "eVisitRequest.IntList\022\027\n\017kylinProperties" +
       "\030\005 \002(\t\022\017\n\007queryId\030\006 
\001(\t\022\032\n\014spillEnabled\030" +
-      "\007 \001(\010:\004true\022\024\n\014maxScanBytes\030\010 
\001(\003\022\037\n\020isE" +
-      "xactAggregate\030\t \001(\010:\005false\032\027\n\007IntList\022\014\n",
-      "\004ints\030\001 
\003(\005\"\305\004\n\021CubeVisitResponse\022\026\n\016com" +
-      "pressedRows\030\001 \002(\014\022\'\n\005stats\030\002 
\002(\0132\030.CubeV" +
-      "isitResponse.Stats\022/\n\terrorInfo\030\003 \001(\0132\034." +
-      "CubeVisitResponse.ErrorInfo\032\252\002\n\005Stats\022\030\n" +
-      "\020serviceStartTime\030\001 \001(\003\022\026\n\016serviceEndTim" +
-      "e\030\002 \001(\003\022\027\n\017scannedRowCount\030\003 
\001(\003\022\032\n\022aggr" +
-      "egatedRowCount\030\004 \001(\003\022\025\n\rsystemCpuLoad\030\005 " +
-      "\001(\001\022\036\n\026freePhysicalMemorySize\030\006 
\001(\001\022\031\n\021f" +
-      "reeSwapSpaceSize\030\007 \001(\001\022\020\n\010hostname\030\010 
\001(\t" +
-      "\022\016\n\006etcMsg\030\t \001(\t\022\026\n\016normalComplete\030\n 
\001(\005",
-      "\022\024\n\014scannedBytes\030\013 
\001(\003\022\030\n\020filteredRowCou" +
-      "nt\030\014 \001(\003\032H\n\tErrorInfo\022*\n\004type\030\001 
\002(\0162\034.Cu" +
-      "beVisitResponse.ErrorType\022\017\n\007message\030\002 \002" +
-      "(\t\"G\n\tErrorType\022\020\n\014UNKNOWN_TYPE\020\000\022\013\n\007TIM" 
+
-      "EOUT\020\001\022\033\n\027RESOURCE_LIMIT_EXCEEDED\020\0022F\n\020C" +
-      "ubeVisitService\0222\n\tvisitCube\022\021.CubeVisit" +
-      "Request\032\022.CubeVisitResponseB`\nEorg.apach" +
-      "e.kylin.storage.hbase.cube.v2.coprocesso" +
-      "r.endpoint.generatedB\017CubeVisitProtosH\001\210" +
-      "\001\001\240\001\001"
+      "\007 \001(\010:\004true\022\024\n\014maxScanBytes\030\010 
\001(\003\022\026\n\016max" +
+      "ReturnBytes\030\t \001(\003\022\037\n\020isExactAggregate\030\n ",
+      "\001(\010:\005false\032\027\n\007IntList\022\014\n\004ints\030\001 
\003(\005\"\305\004\n\021" +
+      "CubeVisitResponse\022\026\n\016compressedRows\030\001 \002(" +
+      "\014\022\'\n\005stats\030\002 \002(\0132\030.CubeVisitResponse.Sta" +
+      "ts\022/\n\terrorInfo\030\003 \001(\0132\034.CubeVisitRespons" +
+      "e.ErrorInfo\032\252\002\n\005Stats\022\030\n\020serviceStartTim" +
+      "e\030\001 \001(\003\022\026\n\016serviceEndTime\030\002 
\001(\003\022\027\n\017scann" +
+      "edRowCount\030\003 \001(\003\022\032\n\022aggregatedRowCount\030\004" +
+      " \001(\003\022\025\n\rsystemCpuLoad\030\005 
\001(\001\022\036\n\026freePhysi" +
+      "calMemorySize\030\006 \001(\001\022\031\n\021freeSwapSpaceSize" +
+      "\030\007 \001(\001\022\020\n\010hostname\030\010 
\001(\t\022\016\n\006etcMsg\030\t \001(\t",
+      "\022\026\n\016normalComplete\030\n \001(\005\022\024\n\014scannedBytes" 
+
+      "\030\013 \001(\003\022\030\n\020filteredRowCount\030\014 
\001(\003\032H\n\tErro" +
+      "rInfo\022*\n\004type\030\001 \002(\0162\034.CubeVisitResponse." +
+      "ErrorType\022\017\n\007message\030\002 \002(\t\"G\n\tErrorType\022" +
+      
"\020\n\014UNKNOWN_TYPE\020\000\022\013\n\007TIMEOUT\020\001\022\033\n\027RESOUR"
 +
+      "CE_LIMIT_EXCEEDED\020\0022F\n\020CubeVisitService\022" +
+      "2\n\tvisitCube\022\021.CubeVisitRequest\032\022.CubeVi" +
+      "sitResponseB`\nEorg.apache.kylin.storage." +
+      "hbase.cube.v2.coprocessor.endpoint.gener" +
+      "atedB\017CubeVisitProtosH\001\210\001\001\240\001\001"
     };
     com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner =
       new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() {
@@ -5729,7 +5855,7 @@ public final class CubeVisitProtos {
           internal_static_CubeVisitRequest_fieldAccessorTable = new
             com.google.protobuf.GeneratedMessage.FieldAccessorTable(
               internal_static_CubeVisitRequest_descriptor,
-              new java.lang.String[] { "GtScanRequest", "HbaseRawScan", "RowkeyPreambleSize", "HbaseColumnsToGT", "KylinProperties", "QueryId", "SpillEnabled", "MaxScanBytes", "IsExactAggregate", });
+              new java.lang.String[] { "GtScanRequest", "HbaseRawScan", "RowkeyPreambleSize", "HbaseColumnsToGT", "KylinProperties", "QueryId", "SpillEnabled", "MaxScanBytes", "MaxReturnBytes", "IsExactAggregate", });
           internal_static_CubeVisitRequest_IntList_descriptor =
             internal_static_CubeVisitRequest_descriptor.getNestedTypes().get(0);
           internal_static_CubeVisitRequest_IntList_fieldAccessorTable = new
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/protobuf/CubeVisit.proto b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/protobuf/CubeVisit.proto
index 40dbc68..1248000 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/protobuf/CubeVisit.proto
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/protobuf/CubeVisit.proto
@@ -16,6 +16,7 @@
 // limitations under the License.
 //
 
+// use 2.x protobuf rather than 3.x
 // usage:
 // protoc  --java_out=./storage-hbase/src/main/java  ./storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/protobuf/CubeVisit.proto
 
@@ -38,7 +39,8 @@ message CubeVisitRequest {
     optional string queryId = 6;
     optional bool spillEnabled = 7 [default = true];
     optional int64 maxScanBytes = 8; // must be positive
-    optional bool isExactAggregate = 9 [default = false];
+    optional int64 maxReturnBytes = 9; // max return bytes
+    optional bool isExactAggregate = 10 [default = false];
     message IntList {
         repeated int32 ints = 1;
     }

Reply via email to