[6/8] kylin git commit: KYLIN-1578 Coprocessor thread voluntarily stop itself when it reaches timeout

2016-04-12 Thread mahongbin
KYLIN-1578 Coprocessor thread voluntarily stop itself when it reaches timeout


Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/11be1e38
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/11be1e38
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/11be1e38

Branch: refs/heads/master
Commit: 11be1e3826cdea8db8df8975ebdff5cf1d93444f
Parents: b26b248
Author: Hongbin Ma 
Authored: Tue Apr 12 09:47:52 2016 +0800
Committer: Hongbin Ma 
Committed: Wed Apr 13 11:11:15 2016 +0800

--
 .../apache/kylin/gridtable/GTScanRequest.java   |   2 +
 .../hbase/cube/v2/CubeHBaseEndpointRPC.java |  16 +-
 .../coprocessor/endpoint/CubeVisitService.java  |  58 ++-
 .../endpoint/generated/CubeVisitProtos.java | 436 +--
 .../endpoint/protobuf/CubeVisit.proto   |   3 +
 5 files changed, 463 insertions(+), 52 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/kylin/blob/11be1e38/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanRequest.java
--
diff --git a/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanRequest.java b/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanRequest.java
index c4abb57..5681057 100644
--- a/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanRequest.java
+++ b/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanRequest.java
@@ -24,6 +24,7 @@ import java.util.Arrays;
 import java.util.List;
 import java.util.Set;
 
+import org.apache.commons.io.IOUtils;
 import org.apache.kylin.common.util.ByteArray;
 import org.apache.kylin.common.util.BytesSerializer;
 import org.apache.kylin.common.util.BytesUtil;
@@ -187,6 +188,7 @@ public class GTScanRequest {
 }
 }
 System.out.println("Meaningless byte is " + meaninglessByte);
+IOUtils.closeQuietly(scanner);
 return scanned;
 }
 

http://git-wip-us.apache.org/repos/asf/kylin/blob/11be1e38/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java
--
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java
index 1d3da28..38041b3 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java
@@ -162,6 +162,10 @@ public class CubeHBaseEndpointRPC extends CubeHBaseRPC {
 throw new RuntimeException("error when waiting queue", e);
 }
 }
+
+public long getTimeout() {
+return timeout;
+}
 }
 
 static class EndpointResultsAsGTScanner implements IGTScanner {
@@ -313,7 +317,6 @@ public class CubeHBaseEndpointRPC extends CubeHBaseRPC {
 
 logger.debug("Serialized scanRequestBytes {} bytes, rawScanBytesString {} bytes", scanRequestByteString.size(), rawScanByteString.size());
 
-
 logger.info("The scan {} for segment {} is as below, shard part of start/end key is set to 0", Integer.toHexString(System.identityHashCode(scanRequest)), cubeSeg);
 for (RawScan rs : rawScans) {
 logScan(rs, cubeSeg.getStorageLocationIdentifier());
@@ -323,7 +326,6 @@ public class CubeHBaseEndpointRPC extends CubeHBaseRPC {
 
 final AtomicInteger totalScannedCount = new AtomicInteger(0);
 final ExpectedSizeIterator epResultItr = new ExpectedSizeIterator(shardNum);
-final String currentThreadName = Thread.currentThread().getName();
 
 for (final Pair epRange : getEPKeyRanges(cuboidBaseShard, shardNum, totalShards)) {
 final ByteString finalScanRequestByteString = scanRequestByteString;
@@ -338,6 +340,8 @@ public class CubeHBaseEndpointRPC extends CubeHBaseRPC {
 }
 builder.setRowkeyPreambleSize(cubeSeg.getRowKeyPreambleSize());
 builder.setBehavior(toggle);
+builder.setStartTime(System.currentTimeMillis());
+builder.setTimeout(epResultItr.getTimeout());
 
 Map results;
 try {
@@ -348,7 +352,12 @@ public class CubeHBaseEndpointRPC extends CubeHBaseRPC {
 
 for (Map.Entry result : results.entrySet()) {
 totalScannedCount.addAndGet(result.getValue().getStats().getScannedRowCount());
-logger.info(" " + 

kylin git commit: KYLIN-1578 Coprocessor thread voluntarily stop itself when it reaches timeout

2016-04-11 Thread mahongbin
Repository: kylin
Updated Branches:
  refs/heads/yang-m1 3eeabeb51 -> bb1b6c3f9


 KYLIN-1578 Coprocessor thread voluntarily stop itself when it reaches timeout


Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/bb1b6c3f
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/bb1b6c3f
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/bb1b6c3f

Branch: refs/heads/yang-m1
Commit: bb1b6c3f97e36e1b473c0ba33c6544c1debcacee
Parents: 3eeabeb
Author: Hongbin Ma 
Authored: Tue Apr 12 10:32:32 2016 +0800
Committer: Hongbin Ma 
Committed: Tue Apr 12 10:32:32 2016 +0800

--
 .../hbase/cube/v2/coprocessor/endpoint/CubeVisitService.java | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/kylin/blob/bb1b6c3f/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/CubeVisitService.java
--
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/CubeVisitService.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/CubeVisitService.java
index 0879661..9dfc607 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/CubeVisitService.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/CubeVisitService.java
@@ -250,10 +250,10 @@ public class CubeVisitService extends CubeVisitProtos.CubeVisitService implement
 byte[] compressedAllRows;
 if (normalComplete.booleanValue()) {
 allRows = outputStream.toByteArray();
-compressedAllRows = CompressionUtils.compress(allRows);
 } else {
-compressedAllRows = new byte[0];
+allRows = new byte[0];
 }
+compressedAllRows = CompressionUtils.compress(allRows);
 
 
 appendProfileInfo(sb, "compress done");



kylin git commit: KYLIN-1578 Coprocessor thread voluntarily stop itself when it reaches timeout

2016-04-11 Thread mahongbin
Repository: kylin
Updated Branches:
  refs/heads/yang-m1 9513b8e47 -> 3eeabeb51


 KYLIN-1578 Coprocessor thread voluntarily stop itself when it reaches timeout


Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/3eeabeb5
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/3eeabeb5
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/3eeabeb5

Branch: refs/heads/yang-m1
Commit: 3eeabeb519d39fbfd5805b41faaf3d94013f1c29
Parents: 9513b8e
Author: Hongbin Ma 
Authored: Tue Apr 12 10:30:39 2016 +0800
Committer: Hongbin Ma 
Committed: Tue Apr 12 10:30:39 2016 +0800

--
 .../cube/v2/coprocessor/endpoint/CubeVisitService.java| 10 --
 1 file changed, 8 insertions(+), 2 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/kylin/blob/3eeabeb5/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/CubeVisitService.java
--
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/CubeVisitService.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/CubeVisitService.java
index 596814f..0879661 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/CubeVisitService.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/CubeVisitService.java
@@ -247,8 +247,14 @@ public class CubeVisitService extends CubeVisitProtos.CubeVisitService implement
 appendProfileInfo(sb, "agg done");
 
 //outputStream.close() is not necessary
-allRows = outputStream.toByteArray();
-byte[] compressedAllRows = CompressionUtils.compress(allRows);
+byte[] compressedAllRows;
+if (normalComplete.booleanValue()) {
+allRows = outputStream.toByteArray();
+compressedAllRows = CompressionUtils.compress(allRows);
+} else {
+compressedAllRows = new byte[0];
+}
+
 
 appendProfileInfo(sb, "compress done");