KYLIN-1578 Coprocessor thread voluntarily stops itself when it reaches timeout


Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/11be1e38
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/11be1e38
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/11be1e38

Branch: refs/heads/master
Commit: 11be1e3826cdea8db8df8975ebdff5cf1d93444f
Parents: b26b248
Author: Hongbin Ma <mahong...@apache.org>
Authored: Tue Apr 12 09:47:52 2016 +0800
Committer: Hongbin Ma <mahong...@apache.org>
Committed: Wed Apr 13 11:11:15 2016 +0800

----------------------------------------------------------------------
 .../apache/kylin/gridtable/GTScanRequest.java   |   2 +
 .../hbase/cube/v2/CubeHBaseEndpointRPC.java     |  16 +-
 .../coprocessor/endpoint/CubeVisitService.java  |  58 ++-
 .../endpoint/generated/CubeVisitProtos.java     | 436 +++++++++++++++++--
 .../endpoint/protobuf/CubeVisit.proto           |   3 +
 5 files changed, 463 insertions(+), 52 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kylin/blob/11be1e38/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanRequest.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanRequest.java b/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanRequest.java
index c4abb57..5681057 100644
--- a/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanRequest.java
+++ b/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanRequest.java
@@ -24,6 +24,7 @@ import java.util.Arrays;
 import java.util.List;
 import java.util.Set;
 
+import org.apache.commons.io.IOUtils;
 import org.apache.kylin.common.util.ByteArray;
 import org.apache.kylin.common.util.BytesSerializer;
 import org.apache.kylin.common.util.BytesUtil;
@@ -187,6 +188,7 @@ public class GTScanRequest {
             }
         }
         System.out.println("Meaningless byte is " + meaninglessByte);
+        IOUtils.closeQuietly(scanner);
         return scanned;
     }
 

http://git-wip-us.apache.org/repos/asf/kylin/blob/11be1e38/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java
index 1d3da28..38041b3 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java
@@ -162,6 +162,10 @@ public class CubeHBaseEndpointRPC extends CubeHBaseRPC {
                 throw new RuntimeException("error when waiting queue", e);
             }
         }
+
+        public long getTimeout() {
+            return timeout;
+        }
     }
 
     static class EndpointResultsAsGTScanner implements IGTScanner {
@@ -313,7 +317,6 @@ public class CubeHBaseEndpointRPC extends CubeHBaseRPC {
 
         logger.debug("Serialized scanRequestBytes {} bytes, rawScanBytesString 
{} bytes", scanRequestByteString.size(), rawScanByteString.size());
 
-
         logger.info("The scan {} for segment {} is as below, shard part of 
start/end key is set to 0", 
Integer.toHexString(System.identityHashCode(scanRequest)), cubeSeg);
         for (RawScan rs : rawScans) {
             logScan(rs, cubeSeg.getStorageLocationIdentifier());
@@ -323,7 +326,6 @@ public class CubeHBaseEndpointRPC extends CubeHBaseRPC {
 
         final AtomicInteger totalScannedCount = new AtomicInteger(0);
         final ExpectedSizeIterator epResultItr = new 
ExpectedSizeIterator(shardNum);
-        final String currentThreadName = Thread.currentThread().getName();
 
         for (final Pair<byte[], byte[]> epRange : 
getEPKeyRanges(cuboidBaseShard, shardNum, totalShards)) {
             final ByteString finalScanRequestByteString = 
scanRequestByteString;
@@ -338,6 +340,8 @@ public class CubeHBaseEndpointRPC extends CubeHBaseRPC {
                     }
                     
builder.setRowkeyPreambleSize(cubeSeg.getRowKeyPreambleSize());
                     builder.setBehavior(toggle);
+                    builder.setStartTime(System.currentTimeMillis());
+                    builder.setTimeout(epResultItr.getTimeout());
 
                     Map<byte[], CubeVisitProtos.CubeVisitResponse> results;
                     try {
@@ -348,7 +352,12 @@ public class CubeHBaseEndpointRPC extends CubeHBaseRPC {
 
                     for (Map.Entry<byte[], CubeVisitProtos.CubeVisitResponse> 
result : results.entrySet()) {
                         
totalScannedCount.addAndGet(result.getValue().getStats().getScannedRowCount());
-                        logger.info("<sub-thread for GTScanRequest " +  
Integer.toHexString(System.identityHashCode(scanRequest)) + "> " + 
getStatsString(result));
+                        logger.info("<sub-thread for GTScanRequest " + 
Integer.toHexString(System.identityHashCode(scanRequest)) + "> " + 
getStatsString(result));
+
+                        if (result.getValue().getStats().getNormalComplete() 
!= 1) {
+                            throw new RuntimeException("The coprocessor thread 
stopped itself due to scan timeout.");
+                        }
+
                         try {
                             
epResultItr.append(CompressionUtils.decompress(HBaseZeroCopyByteString.zeroCopyGetBytes(result.getValue().getCompressedRows())));
                         } catch (IOException | DataFormatException e) {
@@ -371,6 +380,7 @@ public class CubeHBaseEndpointRPC extends CubeHBaseRPC {
         sb.append("Time elapsed in EP: ").append(stats.getServiceEndTime() - 
stats.getServiceStartTime()).append("(ms). ");
         sb.append("Server CPU usage: 
").append(stats.getSystemCpuLoad()).append(", server physical mem left: 
").append(stats.getFreePhysicalMemorySize()).append(", server swap mem 
left:").append(stats.getFreeSwapSpaceSize()).append(".");
         sb.append("Etc message: ").append(stats.getEtcMsg()).append(".");
+        sb.append("Normal Complete: ").append(stats.getNormalComplete() == 
1).append(".");
         return sb.toString();
 
     }

http://git-wip-us.apache.org/repos/asf/kylin/blob/11be1e38/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/CubeVisitService.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/CubeVisitService.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/CubeVisitService.java
index 9e8e251..5158b33 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/CubeVisitService.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/CubeVisitService.java
@@ -28,6 +28,7 @@ import java.util.List;
 
 import org.apache.commons.io.IOUtils;
 import org.apache.commons.lang.ArrayUtils;
+import org.apache.commons.lang.mutable.MutableBoolean;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.Coprocessor;
 import org.apache.hadoop.hbase.CoprocessorEnvironment;
@@ -152,17 +153,17 @@ public class CubeVisitService extends 
CubeVisitProtos.CubeVisitService implement
     }
 
     private void appendProfileInfo(StringBuilder sb, String info) {
-        sb.append(System.currentTimeMillis() - this.serviceStartTime);
         if (info != null) {
-            sb.append(":").append(info);
+            sb.append(info);
         }
+        sb.append("@" + (System.currentTimeMillis() - this.serviceStartTime));
         sb.append(",");
     }
 
     @Override
     public void visitCube(RpcController controller, 
CubeVisitProtos.CubeVisitRequest request, 
RpcCallback<CubeVisitProtos.CubeVisitResponse> done) {
 
-        RegionScanner innerScanner = null;
+        List<RegionScanner> regionScanners = Lists.newArrayList();
         HRegion region = null;
 
         StringBuilder sb = new StringBuilder();
@@ -182,6 +183,8 @@ public class CubeVisitService extends 
CubeVisitProtos.CubeVisitService implement
             CoprocessorBehavior behavior = 
CoprocessorBehavior.valueOf(request.getBehavior());
             final List<RawScan> hbaseRawScans = 
deserializeRawScans(ByteBuffer.wrap(HBaseZeroCopyByteString.zeroCopyGetBytes(request.getHbaseRawScan())));
 
+            appendProfileInfo(sb, "start latency: " + (this.serviceStartTime - 
request.getStartTime()));
+
             MassInTupleFilter.VALUE_PROVIDER_FACTORY = new 
MassInValueProviderFactoryImpl(new MassInValueProviderFactoryImpl.DimEncAware() 
{
                 @Override
                 public DimensionEncoding getDimEnc(TblColRef col) {
@@ -190,6 +193,7 @@ public class CubeVisitService extends 
CubeVisitProtos.CubeVisitService implement
             });
 
             final List<InnerScannerAsIterator> cellListsForeachRawScan = 
Lists.newArrayList();
+
             for (RawScan hbaseRawScan : hbaseRawScans) {
                 if (request.getRowkeyPreambleSize() - 
RowConstants.ROWKEY_CUBOIDID_LEN > 0) {
                     //if has shard, fill region shard to raw scan start/end
@@ -197,20 +201,23 @@ public class CubeVisitService extends 
CubeVisitProtos.CubeVisitService implement
                 }
 
                 Scan scan = CubeHBaseRPC.buildScan(hbaseRawScan);
-                innerScanner = region.getScanner(scan);
+                RegionScanner innerScanner = region.getScanner(scan);
+                regionScanners.add(innerScanner);
 
                 InnerScannerAsIterator cellListIterator = new 
InnerScannerAsIterator(innerScanner);
                 cellListsForeachRawScan.add(cellListIterator);
             }
-            
+
             final Iterator<List<Cell>> allCellLists = 
Iterators.concat(cellListsForeachRawScan.iterator());
 
             if (behavior.ordinal() < CoprocessorBehavior.SCAN.ordinal()) {
                 //this is only for CoprocessorBehavior.RAW_SCAN case to 
profile hbase scan speed
                 List<Cell> temp = Lists.newArrayList();
                 int counter = 0;
-                while (innerScanner.nextRaw(temp)) {
-                    counter++;
+                for (RegionScanner innerScanner : regionScanners) {
+                    while (innerScanner.nextRaw(temp)) {
+                        counter++;
+                    }
                 }
                 appendProfileInfo(sb, "scanned " + counter);
             }
@@ -219,7 +226,14 @@ public class CubeVisitService extends 
CubeVisitProtos.CubeVisitService implement
                 scanReq.setAggrCacheGB(0); // disable mem check if so told
             }
 
-            IGTStore store = new HBaseReadonlyStore(new CellListIterator() {
+            final MutableBoolean normalComplete = new MutableBoolean(true);
+            final long startTime = request.getStartTime();
+            final long timeout = (long) (request.getTimeout() * 0.95);
+
+            final CellListIterator cellListIterator = new CellListIterator() {
+
+                int counter = 0;
+
                 @Override
                 public void close() throws IOException {
                     for (CellListIterator closeable : cellListsForeachRawScan) 
{
@@ -229,6 +243,12 @@ public class CubeVisitService extends 
CubeVisitProtos.CubeVisitService implement
 
                 @Override
                 public boolean hasNext() {
+                    if (counter++ % 1000 == 1) {
+                        if (System.currentTimeMillis() - startTime > timeout) {
+                            normalComplete.setValue(false);
+                            return false;
+                        }
+                    }
                     return allCellLists.hasNext();
                 }
 
@@ -241,7 +261,9 @@ public class CubeVisitService extends 
CubeVisitProtos.CubeVisitService implement
                 public void remove() {
                     throw new UnsupportedOperationException();
                 }
-            }, scanReq, hbaseRawScans.get(0).hbaseColumns, hbaseColumnsToGT, 
request.getRowkeyPreambleSize());
+            };
+
+            IGTStore store = new HBaseReadonlyStore(cellListIterator, scanReq, 
hbaseRawScans.get(0).hbaseColumns, hbaseColumnsToGT, 
request.getRowkeyPreambleSize());
 
             IGTScanner rawScanner = store.scan(scanReq);
             IGTScanner finalScanner = scanReq.decorateScanner(rawScanner, //
@@ -260,13 +282,19 @@ public class CubeVisitService extends 
CubeVisitProtos.CubeVisitService implement
                 outputStream.write(buffer.array(), buffer.arrayOffset() - 
buffer.position(), buffer.remaining());
                 finalRowCount++;
             }
+            finalScanner.close();
 
             appendProfileInfo(sb, "agg done");
 
             //outputStream.close() is not necessary
-            allRows = outputStream.toByteArray();
-            byte[] compressedAllRows = CompressionUtils.compress(allRows);
-
+            byte[] compressedAllRows;
+            if (normalComplete.booleanValue()) {
+                allRows = outputStream.toByteArray();
+            } else {
+                allRows = new byte[0];
+            }
+            compressedAllRows = CompressionUtils.compress(allRows);
+            
             appendProfileInfo(sb, "compress done");
 
             OperatingSystemMXBean operatingSystemMXBean = 
(OperatingSystemMXBean) ManagementFactory.getOperatingSystemMXBean();
@@ -289,7 +317,7 @@ public class CubeVisitService extends 
CubeVisitProtos.CubeVisitService implement
                             setFreeSwapSpaceSize(freeSwapSpaceSize).//
                             
setHostname(InetAddress.getLocalHost().getHostName()).// 
                             setEtcMsg(sb.toString()).//
-                            build())
+                            setNormalComplete(normalComplete.booleanValue() ? 
1 : 0).build())
                     .//
                     build());
 
@@ -297,7 +325,9 @@ public class CubeVisitService extends 
CubeVisitProtos.CubeVisitService implement
             logger.error(ioe.toString());
             ResponseConverter.setControllerException(controller, ioe);
         } finally {
-            IOUtils.closeQuietly(innerScanner);
+            for (RegionScanner innerScanner : regionScanners) {
+                IOUtils.closeQuietly(innerScanner);
+            }
             if (region != null) {
                 try {
                     region.closeRegionOperation();

http://git-wip-us.apache.org/repos/asf/kylin/blob/11be1e38/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/generated/CubeVisitProtos.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/generated/CubeVisitProtos.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/generated/CubeVisitProtos.java
index 6e3e2bb..53393e8 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/generated/CubeVisitProtos.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/generated/CubeVisitProtos.java
@@ -1,21 +1,3 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
 // Generated by the protocol buffer compiler.  DO NOT EDIT!
 // source: 
storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/protobuf/CubeVisit.proto
 
@@ -98,6 +80,42 @@ public final class CubeVisitProtos {
      */
     
org.apache.kylin.storage.hbase.cube.v2.coprocessor.endpoint.generated.CubeVisitProtos.CubeVisitRequest.IntListOrBuilder
 getHbaseColumnsToGTOrBuilder(
         int index);
+
+    // required int64 startTime = 6;
+    /**
+     * <code>required int64 startTime = 6;</code>
+     *
+     * <pre>
+     *when client start the request
+     * </pre>
+     */
+    boolean hasStartTime();
+    /**
+     * <code>required int64 startTime = 6;</code>
+     *
+     * <pre>
+     *when client start the request
+     * </pre>
+     */
+    long getStartTime();
+
+    // required int64 timeout = 7;
+    /**
+     * <code>required int64 timeout = 7;</code>
+     *
+     * <pre>
+     *how long client will wait
+     * </pre>
+     */
+    boolean hasTimeout();
+    /**
+     * <code>required int64 timeout = 7;</code>
+     *
+     * <pre>
+     *how long client will wait
+     * </pre>
+     */
+    long getTimeout();
   }
   /**
    * Protobuf type {@code CubeVisitRequest}
@@ -178,6 +196,16 @@ public final class CubeVisitProtos {
               
hbaseColumnsToGT_.add(input.readMessage(org.apache.kylin.storage.hbase.cube.v2.coprocessor.endpoint.generated.CubeVisitProtos.CubeVisitRequest.IntList.PARSER,
 extensionRegistry));
               break;
             }
+            case 48: {
+              bitField0_ |= 0x00000010;
+              startTime_ = input.readInt64();
+              break;
+            }
+            case 56: {
+              bitField0_ |= 0x00000020;
+              timeout_ = input.readInt64();
+              break;
+            }
           }
         }
       } catch (com.google.protobuf.InvalidProtocolBufferException e) {
@@ -852,12 +880,62 @@ public final class CubeVisitProtos {
       return hbaseColumnsToGT_.get(index);
     }
 
+    // required int64 startTime = 6;
+    public static final int STARTTIME_FIELD_NUMBER = 6;
+    private long startTime_;
+    /**
+     * <code>required int64 startTime = 6;</code>
+     *
+     * <pre>
+     *when client start the request
+     * </pre>
+     */
+    public boolean hasStartTime() {
+      return ((bitField0_ & 0x00000010) == 0x00000010);
+    }
+    /**
+     * <code>required int64 startTime = 6;</code>
+     *
+     * <pre>
+     *when client start the request
+     * </pre>
+     */
+    public long getStartTime() {
+      return startTime_;
+    }
+
+    // required int64 timeout = 7;
+    public static final int TIMEOUT_FIELD_NUMBER = 7;
+    private long timeout_;
+    /**
+     * <code>required int64 timeout = 7;</code>
+     *
+     * <pre>
+     *how long client will wait
+     * </pre>
+     */
+    public boolean hasTimeout() {
+      return ((bitField0_ & 0x00000020) == 0x00000020);
+    }
+    /**
+     * <code>required int64 timeout = 7;</code>
+     *
+     * <pre>
+     *how long client will wait
+     * </pre>
+     */
+    public long getTimeout() {
+      return timeout_;
+    }
+
     private void initFields() {
       behavior_ = "";
       gtScanRequest_ = com.google.protobuf.ByteString.EMPTY;
       hbaseRawScan_ = com.google.protobuf.ByteString.EMPTY;
       rowkeyPreambleSize_ = 0;
       hbaseColumnsToGT_ = java.util.Collections.emptyList();
+      startTime_ = 0L;
+      timeout_ = 0L;
     }
     private byte memoizedIsInitialized = -1;
     public final boolean isInitialized() {
@@ -880,6 +958,14 @@ public final class CubeVisitProtos {
         memoizedIsInitialized = 0;
         return false;
       }
+      if (!hasStartTime()) {
+        memoizedIsInitialized = 0;
+        return false;
+      }
+      if (!hasTimeout()) {
+        memoizedIsInitialized = 0;
+        return false;
+      }
       memoizedIsInitialized = 1;
       return true;
     }
@@ -902,6 +988,12 @@ public final class CubeVisitProtos {
       for (int i = 0; i < hbaseColumnsToGT_.size(); i++) {
         output.writeMessage(5, hbaseColumnsToGT_.get(i));
       }
+      if (((bitField0_ & 0x00000010) == 0x00000010)) {
+        output.writeInt64(6, startTime_);
+      }
+      if (((bitField0_ & 0x00000020) == 0x00000020)) {
+        output.writeInt64(7, timeout_);
+      }
       getUnknownFields().writeTo(output);
     }
 
@@ -931,6 +1023,14 @@ public final class CubeVisitProtos {
         size += com.google.protobuf.CodedOutputStream
           .computeMessageSize(5, hbaseColumnsToGT_.get(i));
       }
+      if (((bitField0_ & 0x00000010) == 0x00000010)) {
+        size += com.google.protobuf.CodedOutputStream
+          .computeInt64Size(6, startTime_);
+      }
+      if (((bitField0_ & 0x00000020) == 0x00000020)) {
+        size += com.google.protobuf.CodedOutputStream
+          .computeInt64Size(7, timeout_);
+      }
       size += getUnknownFields().getSerializedSize();
       memoizedSerializedSize = size;
       return size;
@@ -976,6 +1076,16 @@ public final class CubeVisitProtos {
       }
       result = result && getHbaseColumnsToGTList()
           .equals(other.getHbaseColumnsToGTList());
+      result = result && (hasStartTime() == other.hasStartTime());
+      if (hasStartTime()) {
+        result = result && (getStartTime()
+            == other.getStartTime());
+      }
+      result = result && (hasTimeout() == other.hasTimeout());
+      if (hasTimeout()) {
+        result = result && (getTimeout()
+            == other.getTimeout());
+      }
       result = result &&
           getUnknownFields().equals(other.getUnknownFields());
       return result;
@@ -1009,6 +1119,14 @@ public final class CubeVisitProtos {
         hash = (37 * hash) + HBASECOLUMNSTOGT_FIELD_NUMBER;
         hash = (53 * hash) + getHbaseColumnsToGTList().hashCode();
       }
+      if (hasStartTime()) {
+        hash = (37 * hash) + STARTTIME_FIELD_NUMBER;
+        hash = (53 * hash) + hashLong(getStartTime());
+      }
+      if (hasTimeout()) {
+        hash = (37 * hash) + TIMEOUT_FIELD_NUMBER;
+        hash = (53 * hash) + hashLong(getTimeout());
+      }
       hash = (29 * hash) + getUnknownFields().hashCode();
       memoizedHashCode = hash;
       return hash;
@@ -1133,6 +1251,10 @@ public final class CubeVisitProtos {
         } else {
           hbaseColumnsToGTBuilder_.clear();
         }
+        startTime_ = 0L;
+        bitField0_ = (bitField0_ & ~0x00000020);
+        timeout_ = 0L;
+        bitField0_ = (bitField0_ & ~0x00000040);
         return this;
       }
 
@@ -1186,6 +1308,14 @@ public final class CubeVisitProtos {
         } else {
           result.hbaseColumnsToGT_ = hbaseColumnsToGTBuilder_.build();
         }
+        if (((from_bitField0_ & 0x00000020) == 0x00000020)) {
+          to_bitField0_ |= 0x00000010;
+        }
+        result.startTime_ = startTime_;
+        if (((from_bitField0_ & 0x00000040) == 0x00000040)) {
+          to_bitField0_ |= 0x00000020;
+        }
+        result.timeout_ = timeout_;
         result.bitField0_ = to_bitField0_;
         onBuilt();
         return result;
@@ -1242,6 +1372,12 @@ public final class CubeVisitProtos {
             }
           }
         }
+        if (other.hasStartTime()) {
+          setStartTime(other.getStartTime());
+        }
+        if (other.hasTimeout()) {
+          setTimeout(other.getTimeout());
+        }
         this.mergeUnknownFields(other.getUnknownFields());
         return this;
       }
@@ -1263,6 +1399,14 @@ public final class CubeVisitProtos {
           
           return false;
         }
+        if (!hasStartTime()) {
+          
+          return false;
+        }
+        if (!hasTimeout()) {
+          
+          return false;
+        }
         return true;
       }
 
@@ -1704,6 +1848,104 @@ public final class CubeVisitProtos {
         return hbaseColumnsToGTBuilder_;
       }
 
+      // required int64 startTime = 6;
+      private long startTime_ ;
+      /**
+       * <code>required int64 startTime = 6;</code>
+       *
+       * <pre>
+       *when client start the request
+       * </pre>
+       */
+      public boolean hasStartTime() {
+        return ((bitField0_ & 0x00000020) == 0x00000020);
+      }
+      /**
+       * <code>required int64 startTime = 6;</code>
+       *
+       * <pre>
+       *when client start the request
+       * </pre>
+       */
+      public long getStartTime() {
+        return startTime_;
+      }
+      /**
+       * <code>required int64 startTime = 6;</code>
+       *
+       * <pre>
+       *when client start the request
+       * </pre>
+       */
+      public Builder setStartTime(long value) {
+        bitField0_ |= 0x00000020;
+        startTime_ = value;
+        onChanged();
+        return this;
+      }
+      /**
+       * <code>required int64 startTime = 6;</code>
+       *
+       * <pre>
+       *when client start the request
+       * </pre>
+       */
+      public Builder clearStartTime() {
+        bitField0_ = (bitField0_ & ~0x00000020);
+        startTime_ = 0L;
+        onChanged();
+        return this;
+      }
+
+      // required int64 timeout = 7;
+      private long timeout_ ;
+      /**
+       * <code>required int64 timeout = 7;</code>
+       *
+       * <pre>
+       *how long client will wait
+       * </pre>
+       */
+      public boolean hasTimeout() {
+        return ((bitField0_ & 0x00000040) == 0x00000040);
+      }
+      /**
+       * <code>required int64 timeout = 7;</code>
+       *
+       * <pre>
+       *how long client will wait
+       * </pre>
+       */
+      public long getTimeout() {
+        return timeout_;
+      }
+      /**
+       * <code>required int64 timeout = 7;</code>
+       *
+       * <pre>
+       *how long client will wait
+       * </pre>
+       */
+      public Builder setTimeout(long value) {
+        bitField0_ |= 0x00000040;
+        timeout_ = value;
+        onChanged();
+        return this;
+      }
+      /**
+       * <code>required int64 timeout = 7;</code>
+       *
+       * <pre>
+       *how long client will wait
+       * </pre>
+       */
+      public Builder clearTimeout() {
+        bitField0_ = (bitField0_ & ~0x00000040);
+        timeout_ = 0L;
+        onChanged();
+        return this;
+      }
+
       // @@protoc_insertion_point(builder_scope:CubeVisitRequest)
     }
 
@@ -1952,6 +2194,24 @@ public final class CubeVisitProtos {
        */
       com.google.protobuf.ByteString
           getEtcMsgBytes();
+
+      // optional int32 normalComplete = 10;
+      /**
+       * <code>optional int32 normalComplete = 10;</code>
+       *
+       * <pre>
+       *when time outs, normalComplete will be false
+       * </pre>
+       */
+      boolean hasNormalComplete();
+      /**
+       * <code>optional int32 normalComplete = 10;</code>
+       *
+       * <pre>
+       *when time outs, normalComplete will be false
+       * </pre>
+       */
+      int getNormalComplete();
     }
     /**
      * Protobuf type {@code CubeVisitResponse.Stats}
@@ -2049,6 +2309,11 @@ public final class CubeVisitProtos {
                 etcMsg_ = input.readBytes();
                 break;
               }
+              case 80: {
+                bitField0_ |= 0x00000200;
+                normalComplete_ = input.readInt32();
+                break;
+              }
             }
           }
         } catch (com.google.protobuf.InvalidProtocolBufferException e) {
@@ -2287,6 +2552,30 @@ public final class CubeVisitProtos {
         }
       }
 
+      // optional int32 normalComplete = 10;
+      public static final int NORMALCOMPLETE_FIELD_NUMBER = 10;
+      private int normalComplete_;
+      /**
+       * <code>optional int32 normalComplete = 10;</code>
+       *
+       * <pre>
+       *when time outs, normalComplete will be false
+       * </pre>
+       */
+      public boolean hasNormalComplete() {
+        return ((bitField0_ & 0x00000200) == 0x00000200);
+      }
+      /**
+       * <code>optional int32 normalComplete = 10;</code>
+       *
+       * <pre>
+       *when time outs, normalComplete will be false
+       * </pre>
+       */
+      public int getNormalComplete() {
+        return normalComplete_;
+      }
+
       private void initFields() {
         serviceStartTime_ = 0L;
         serviceEndTime_ = 0L;
@@ -2297,6 +2586,7 @@ public final class CubeVisitProtos {
         freeSwapSpaceSize_ = 0D;
         hostname_ = "";
         etcMsg_ = "";
+        normalComplete_ = 0;
       }
       private byte memoizedIsInitialized = -1;
       public final boolean isInitialized() {
@@ -2337,6 +2627,9 @@ public final class CubeVisitProtos {
         if (((bitField0_ & 0x00000100) == 0x00000100)) {
           output.writeBytes(9, getEtcMsgBytes());
         }
+        if (((bitField0_ & 0x00000200) == 0x00000200)) {
+          output.writeInt32(10, normalComplete_);
+        }
         getUnknownFields().writeTo(output);
       }
 
@@ -2382,6 +2675,10 @@ public final class CubeVisitProtos {
           size += com.google.protobuf.CodedOutputStream
             .computeBytesSize(9, getEtcMsgBytes());
         }
+        if (((bitField0_ & 0x00000200) == 0x00000200)) {
+          size += com.google.protobuf.CodedOutputStream
+            .computeInt32Size(10, normalComplete_);
+        }
         size += getUnknownFields().getSerializedSize();
         memoizedSerializedSize = size;
         return size;
@@ -2447,6 +2744,11 @@ public final class CubeVisitProtos {
           result = result && getEtcMsg()
               .equals(other.getEtcMsg());
         }
+        result = result && (hasNormalComplete() == other.hasNormalComplete());
+        if (hasNormalComplete()) {
+          result = result && (getNormalComplete()
+              == other.getNormalComplete());
+        }
         result = result &&
             getUnknownFields().equals(other.getUnknownFields());
         return result;
@@ -2499,6 +2801,10 @@ public final class CubeVisitProtos {
           hash = (37 * hash) + ETCMSG_FIELD_NUMBER;
           hash = (53 * hash) + getEtcMsg().hashCode();
         }
+        if (hasNormalComplete()) {
+          hash = (37 * hash) + NORMALCOMPLETE_FIELD_NUMBER;
+          hash = (53 * hash) + getNormalComplete();
+        }
         hash = (29 * hash) + getUnknownFields().hashCode();
         memoizedHashCode = hash;
         return hash;
@@ -2626,6 +2932,8 @@ public final class CubeVisitProtos {
           bitField0_ = (bitField0_ & ~0x00000080);
           etcMsg_ = "";
           bitField0_ = (bitField0_ & ~0x00000100);
+          normalComplete_ = 0;
+          bitField0_ = (bitField0_ & ~0x00000200);
           return this;
         }
 
@@ -2690,6 +2998,10 @@ public final class CubeVisitProtos {
             to_bitField0_ |= 0x00000100;
           }
           result.etcMsg_ = etcMsg_;
+          if (((from_bitField0_ & 0x00000200) == 0x00000200)) {
+            to_bitField0_ |= 0x00000200;
+          }
+          result.normalComplete_ = normalComplete_;
           result.bitField0_ = to_bitField0_;
           onBuilt();
           return result;
@@ -2737,6 +3049,9 @@ public final class CubeVisitProtos {
             etcMsg_ = other.etcMsg_;
             onChanged();
           }
+          if (other.hasNormalComplete()) {
+            setNormalComplete(other.getNormalComplete());
+          }
           this.mergeUnknownFields(other.getUnknownFields());
           return this;
         }
@@ -3143,6 +3458,55 @@ public final class CubeVisitProtos {
           return this;
         }
 
+        // optional int32 normalComplete = 10;
+        private int normalComplete_ ;
+        /**
+         * <code>optional int32 normalComplete = 10;</code>
+         *
+         * <pre>
+         *when a timeout occurs, normalComplete will be false
+         * </pre>
+         */
+        public boolean hasNormalComplete() {
+          return ((bitField0_ & 0x00000200) == 0x00000200);
+        }
+        /**
+         * <code>optional int32 normalComplete = 10;</code>
+         *
+         * <pre>
+         *when a timeout occurs, normalComplete will be false
+         * </pre>
+         */
+        public int getNormalComplete() {
+          return normalComplete_;
+        }
+        /**
+         * <code>optional int32 normalComplete = 10;</code>
+         *
+         * <pre>
+         *when a timeout occurs, normalComplete will be false
+         * </pre>
+         */
+        public Builder setNormalComplete(int value) {
+          bitField0_ |= 0x00000200;
+          normalComplete_ = value;
+          onChanged();
+          return this;
+        }
+        /**
+         * <code>optional int32 normalComplete = 10;</code>
+         *
+         * <pre>
+         *when a timeout occurs, normalComplete will be false
+         * </pre>
+         */
+        public Builder clearNormalComplete() {
+          bitField0_ = (bitField0_ & ~0x00000200);
+          normalComplete_ = 0;
+          onChanged();
+          return this;
+        }
+
         // @@protoc_insertion_point(builder_scope:CubeVisitResponse.Stats)
       }
 
@@ -3936,24 +4300,26 @@ public final class CubeVisitProtos {
     java.lang.String[] descriptorData = {
       "\npstorage-hbase/src/main/java/org/apache" +
       "/kylin/storage/hbase/cube/v2/coprocessor" +
-      "/endpoint/protobuf/CubeVisit.proto\"\273\001\n\020C" +
+      "/endpoint/protobuf/CubeVisit.proto\"\337\001\n\020C" +
       "ubeVisitRequest\022\020\n\010behavior\030\001 \002(\t\022\025\n\rgtS" +
       "canRequest\030\002 \002(\014\022\024\n\014hbaseRawScan\030\003 \002(\014\022\032" +
       "\n\022rowkeyPreambleSize\030\004 \002(\005\0223\n\020hbaseColum" +
-      "nsToGT\030\005 \003(\0132\031.CubeVisitRequest.IntList\032" +
-      "\027\n\007IntList\022\014\n\004ints\030\001 \003(\005\"\271\002\n\021CubeVisitRe" +
-      "sponse\022\026\n\016compressedRows\030\001 \002(\014\022\'\n\005stats\030" +
-      "\002 \002(\0132\030.CubeVisitResponse.Stats\032\342\001\n\005Stat",
-      "s\022\030\n\020serviceStartTime\030\001 \001(\003\022\026\n\016serviceEn" +
-      "dTime\030\002 \001(\003\022\027\n\017scannedRowCount\030\003 \001(\005\022\032\n\022" +
-      "aggregatedRowCount\030\004 \001(\005\022\025\n\rsystemCpuLoa" +
-      "d\030\005 \001(\001\022\036\n\026freePhysicalMemorySize\030\006 \001(\001\022" +
-      "\031\n\021freeSwapSpaceSize\030\007 \001(\001\022\020\n\010hostname\030\010" +
-      " \001(\t\022\016\n\006etcMsg\030\t \001(\t2F\n\020CubeVisitService" +
-      "\0222\n\tvisitCube\022\021.CubeVisitRequest\032\022.CubeV" +
-      "isitResponseB`\nEorg.apache.kylin.storage" +
-      ".hbase.cube.v2.coprocessor.endpoint.gene" +
-      "ratedB\017CubeVisitProtosH\001\210\001\001\240\001\001"
+      "nsToGT\030\005 \003(\0132\031.CubeVisitRequest.IntList\022" +
+      "\021\n\tstartTime\030\006 \002(\003\022\017\n\007timeout\030\007 \002(\003\032\027\n\007I" +
+      "ntList\022\014\n\004ints\030\001 \003(\005\"\321\002\n\021CubeVisitRespon" +
+      "se\022\026\n\016compressedRows\030\001 \002(\014\022\'\n\005stats\030\002 \002(",
+      "\0132\030.CubeVisitResponse.Stats\032\372\001\n\005Stats\022\030\n" +
+      "\020serviceStartTime\030\001 \001(\003\022\026\n\016serviceEndTim" +
+      "e\030\002 \001(\003\022\027\n\017scannedRowCount\030\003 \001(\005\022\032\n\022aggr" +
+      "egatedRowCount\030\004 \001(\005\022\025\n\rsystemCpuLoad\030\005 " +
+      "\001(\001\022\036\n\026freePhysicalMemorySize\030\006 \001(\001\022\031\n\021f" +
+      "reeSwapSpaceSize\030\007 \001(\001\022\020\n\010hostname\030\010 \001(\t" +
+      "\022\016\n\006etcMsg\030\t \001(\t\022\026\n\016normalComplete\030\n \001(\005" +
+      "2F\n\020CubeVisitService\0222\n\tvisitCube\022\021.Cube" +
+      "VisitRequest\032\022.CubeVisitResponseB`\nEorg." +
+      "apache.kylin.storage.hbase.cube.v2.copro",
+      "cessor.endpoint.generatedB\017CubeVisitProt" +
+      "osH\001\210\001\001\240\001\001"
     };
     com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner =
       new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() {
@@ -3965,7 +4331,7 @@ public final class CubeVisitProtos {
           internal_static_CubeVisitRequest_fieldAccessorTable = new
             com.google.protobuf.GeneratedMessage.FieldAccessorTable(
               internal_static_CubeVisitRequest_descriptor,
-              new java.lang.String[] { "Behavior", "GtScanRequest", "HbaseRawScan", "RowkeyPreambleSize", "HbaseColumnsToGT", });
+              new java.lang.String[] { "Behavior", "GtScanRequest", "HbaseRawScan", "RowkeyPreambleSize", "HbaseColumnsToGT", "StartTime", "Timeout", });
           internal_static_CubeVisitRequest_IntList_descriptor =
             internal_static_CubeVisitRequest_descriptor.getNestedTypes().get(0);
           internal_static_CubeVisitRequest_IntList_fieldAccessorTable = new
@@ -3983,7 +4349,7 @@ public final class CubeVisitProtos {
           internal_static_CubeVisitResponse_Stats_fieldAccessorTable = new
             com.google.protobuf.GeneratedMessage.FieldAccessorTable(
               internal_static_CubeVisitResponse_Stats_descriptor,
-              new java.lang.String[] { "ServiceStartTime", "ServiceEndTime", "ScannedRowCount", "AggregatedRowCount", "SystemCpuLoad", "FreePhysicalMemorySize", "FreeSwapSpaceSize", "Hostname", "EtcMsg", });
+              new java.lang.String[] { "ServiceStartTime", "ServiceEndTime", "ScannedRowCount", "AggregatedRowCount", "SystemCpuLoad", "FreePhysicalMemorySize", "FreeSwapSpaceSize", "Hostname", "EtcMsg", "NormalComplete", });
           return null;
         }
       };

http://git-wip-us.apache.org/repos/asf/kylin/blob/11be1e38/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/protobuf/CubeVisit.proto
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/protobuf/CubeVisit.proto b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/protobuf/CubeVisit.proto
index 5b66a56..ecaad35 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/protobuf/CubeVisit.proto
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/protobuf/CubeVisit.proto
@@ -35,6 +35,8 @@ message CubeVisitRequest {
     required bytes hbaseRawScan = 3;
     required int32 rowkeyPreambleSize = 4;
     repeated IntList hbaseColumnsToGT = 5;
+    required int64 startTime = 6;//when client start the request
+    required int64 timeout = 7;//how long client will wait
     message IntList {
         repeated int32 ints = 1;
     }
@@ -51,6 +53,7 @@ message CubeVisitResponse {
         optional double freeSwapSpaceSize = 7;
         optional string hostname = 8;
         optional string etcMsg = 9;
+        optional int32 normalComplete =10;//when a timeout occurs, normalComplete will be false
     }
     required bytes compressedRows = 1;
     required Stats stats = 2;

Reply via email to