[6/8] kylin git commit: KYLIN-1578 Coprocessor thread voluntarily stop itself when it reaches timeout
KYLIN-1578 Coprocessor thread voluntarily stop itself when it reaches timeout Project: http://git-wip-us.apache.org/repos/asf/kylin/repo Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/11be1e38 Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/11be1e38 Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/11be1e38 Branch: refs/heads/master Commit: 11be1e3826cdea8db8df8975ebdff5cf1d93444f Parents: b26b248 Author: Hongbin Ma Authored: Tue Apr 12 09:47:52 2016 +0800 Committer: Hongbin Ma Committed: Wed Apr 13 11:11:15 2016 +0800 -- .../apache/kylin/gridtable/GTScanRequest.java | 2 + .../hbase/cube/v2/CubeHBaseEndpointRPC.java | 16 +- .../coprocessor/endpoint/CubeVisitService.java | 58 ++- .../endpoint/generated/CubeVisitProtos.java | 436 +-- .../endpoint/protobuf/CubeVisit.proto | 3 + 5 files changed, 463 insertions(+), 52 deletions(-) -- http://git-wip-us.apache.org/repos/asf/kylin/blob/11be1e38/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanRequest.java -- diff --git a/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanRequest.java b/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanRequest.java index c4abb57..5681057 100644 --- a/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanRequest.java +++ b/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanRequest.java @@ -24,6 +24,7 @@ import java.util.Arrays; import java.util.List; import java.util.Set; +import org.apache.commons.io.IOUtils; import org.apache.kylin.common.util.ByteArray; import org.apache.kylin.common.util.BytesSerializer; import org.apache.kylin.common.util.BytesUtil; @@ -187,6 +188,7 @@ public class GTScanRequest { } } System.out.println("Meaningless byte is " + meaninglessByte); +IOUtils.closeQuietly(scanner); return scanned; } http://git-wip-us.apache.org/repos/asf/kylin/blob/11be1e38/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java -- diff --git 
a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java index 1d3da28..38041b3 100644 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java +++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java @@ -162,6 +162,10 @@ public class CubeHBaseEndpointRPC extends CubeHBaseRPC { throw new RuntimeException("error when waiting queue", e); } } + +public long getTimeout() { +return timeout; +} } static class EndpointResultsAsGTScanner implements IGTScanner { @@ -313,7 +317,6 @@ public class CubeHBaseEndpointRPC extends CubeHBaseRPC { logger.debug("Serialized scanRequestBytes {} bytes, rawScanBytesString {} bytes", scanRequestByteString.size(), rawScanByteString.size()); - logger.info("The scan {} for segment {} is as below, shard part of start/end key is set to 0", Integer.toHexString(System.identityHashCode(scanRequest)), cubeSeg); for (RawScan rs : rawScans) { logScan(rs, cubeSeg.getStorageLocationIdentifier()); @@ -323,7 +326,6 @@ public class CubeHBaseEndpointRPC extends CubeHBaseRPC { final AtomicInteger totalScannedCount = new AtomicInteger(0); final ExpectedSizeIterator epResultItr = new ExpectedSizeIterator(shardNum); -final String currentThreadName = Thread.currentThread().getName(); for (final Pair epRange : getEPKeyRanges(cuboidBaseShard, shardNum, totalShards)) { final ByteString finalScanRequestByteString = scanRequestByteString; @@ -338,6 +340,8 @@ public class CubeHBaseEndpointRPC extends CubeHBaseRPC { } builder.setRowkeyPreambleSize(cubeSeg.getRowKeyPreambleSize()); builder.setBehavior(toggle); +builder.setStartTime(System.currentTimeMillis()); +builder.setTimeout(epResultItr.getTimeout()); Map results; try { @@ -348,7 +352,12 @@ public class CubeHBaseEndpointRPC extends CubeHBaseRPC { for (Map.Entry result : results.entrySet()) { 
totalScannedCount.addAndGet(result.getValue().getStats().getScannedRowCount()); -logger.info(" " +
kylin git commit: KYLIN-1578 Coprocessor thread voluntarily stop itself when it reaches timeout
Repository: kylin Updated Branches: refs/heads/yang-m1 3eeabeb51 -> bb1b6c3f9 KYLIN-1578 Coprocessor thread voluntarily stop itself when it reaches timeout Project: http://git-wip-us.apache.org/repos/asf/kylin/repo Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/bb1b6c3f Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/bb1b6c3f Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/bb1b6c3f Branch: refs/heads/yang-m1 Commit: bb1b6c3f97e36e1b473c0ba33c6544c1debcacee Parents: 3eeabeb Author: Hongbin Ma Authored: Tue Apr 12 10:32:32 2016 +0800 Committer: Hongbin Ma Committed: Tue Apr 12 10:32:32 2016 +0800 -- .../hbase/cube/v2/coprocessor/endpoint/CubeVisitService.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) -- http://git-wip-us.apache.org/repos/asf/kylin/blob/bb1b6c3f/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/CubeVisitService.java -- diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/CubeVisitService.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/CubeVisitService.java index 0879661..9dfc607 100644 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/CubeVisitService.java +++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/CubeVisitService.java @@ -250,10 +250,10 @@ public class CubeVisitService extends CubeVisitProtos.CubeVisitService implement byte[] compressedAllRows; if (normalComplete.booleanValue()) { allRows = outputStream.toByteArray(); -compressedAllRows = CompressionUtils.compress(allRows); } else { -compressedAllRows = new byte[0]; +allRows = new byte[0]; } +compressedAllRows = CompressionUtils.compress(allRows); appendProfileInfo(sb, "compress done");
kylin git commit: KYLIN-1578 Coprocessor thread voluntarily stop itself when it reaches timeout
Repository: kylin Updated Branches: refs/heads/yang-m1 9513b8e47 -> 3eeabeb51 KYLIN-1578 Coprocessor thread voluntarily stop itself when it reaches timeout Project: http://git-wip-us.apache.org/repos/asf/kylin/repo Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/3eeabeb5 Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/3eeabeb5 Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/3eeabeb5 Branch: refs/heads/yang-m1 Commit: 3eeabeb519d39fbfd5805b41faaf3d94013f1c29 Parents: 9513b8e Author: Hongbin Ma Authored: Tue Apr 12 10:30:39 2016 +0800 Committer: Hongbin Ma Committed: Tue Apr 12 10:30:39 2016 +0800 -- .../cube/v2/coprocessor/endpoint/CubeVisitService.java | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) -- http://git-wip-us.apache.org/repos/asf/kylin/blob/3eeabeb5/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/CubeVisitService.java -- diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/CubeVisitService.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/CubeVisitService.java index 596814f..0879661 100644 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/CubeVisitService.java +++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/CubeVisitService.java @@ -247,8 +247,14 @@ public class CubeVisitService extends CubeVisitProtos.CubeVisitService implement appendProfileInfo(sb, "agg done"); //outputStream.close() is not necessary -allRows = outputStream.toByteArray(); -byte[] compressedAllRows = CompressionUtils.compress(allRows); +byte[] compressedAllRows; +if (normalComplete.booleanValue()) { +allRows = outputStream.toByteArray(); +compressedAllRows = CompressionUtils.compress(allRows); +} else { +compressedAllRows = new byte[0]; +} + appendProfileInfo(sb, "compress done");