Repository: kylin Updated Branches: refs/heads/yang-m1 1baa91473 -> 638f7b3c8
refine coprocessor suicide Project: http://git-wip-us.apache.org/repos/asf/kylin/repo Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/638f7b3c Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/638f7b3c Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/638f7b3c Branch: refs/heads/yang-m1 Commit: 638f7b3c8b59b9a70a281352fee243dc424daa12 Parents: 1baa914 Author: Hongbin Ma <mahong...@apache.org> Authored: Thu Apr 21 16:05:31 2016 +0800 Committer: Hongbin Ma <mahong...@apache.org> Committed: Thu Apr 21 16:05:31 2016 +0800 ---------------------------------------------------------------------- .../hbase/cube/v2/CubeHBaseEndpointRPC.java | 2 ++ .../coprocessor/endpoint/CubeVisitService.java | 35 ++++++++++++++++---- 2 files changed, 30 insertions(+), 7 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kylin/blob/638f7b3c/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java ---------------------------------------------------------------------- diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java index cf39641..053d99e 100644 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java +++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java @@ -122,6 +122,8 @@ public class CubeHBaseEndpointRPC extends CubeHBaseRPC { this.queue = new ArrayBlockingQueue<byte[]>(expectedSize); this.timeout = HadoopUtil.getCurrentConfiguration().getInt(HConstants.HBASE_RPC_TIMEOUT_KEY, HConstants.DEFAULT_HBASE_RPC_TIMEOUT); this.timeout *= KylinConfig.getInstanceFromEnv().getCubeVisitTimeoutTimes(); + + this.timeout *= 1.1;//allow for some delay logger.info("Timeout for ExpectedSizeIterator is " + this.timeout); } http://git-wip-us.apache.org/repos/asf/kylin/blob/638f7b3c/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/CubeVisitService.java ---------------------------------------------------------------------- diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/CubeVisitService.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/CubeVisitService.java index f6476f8..6058768 100644 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/CubeVisitService.java +++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/CubeVisitService.java @@ -106,13 +106,20 @@ public class CubeVisitService extends CubeVisitProtos.CubeVisitService implement @Override public boolean hasNext() { - if (counter++ % 1000 == 1) { + if (counter % 1000 == 1) { if (System.currentTimeMillis() - startTime > timeout) { normalComplete.setValue(false); + logger.error("scanner aborted because timeout"); return false; } } + if (counter % 100000 == 1) { + logger.info("Scanned " + counter + " rows."); + } + + counter++; + return !nextOne.isEmpty(); } @@ -218,10 +225,10 @@ public class CubeVisitService extends CubeVisitProtos.CubeVisitService implement appendProfileInfo(sb, "scanned " + counter); } - final MutableBoolean normalComplete = new MutableBoolean(true); - final long startTime = this.serviceStartTime;//request.getStartTime(); - final long timeout = (long) (request.getTimeout() * 0.95); - InnerScannerAsIterator cellListIterator = new InnerScannerAsIterator(innerScanner, normalComplete, startTime, timeout); + final MutableBoolean scanNormalComplete = new MutableBoolean(true); + final long startTime = this.serviceStartTime; + final long timeout = request.getTimeout(); + InnerScannerAsIterator cellListIterator = new InnerScannerAsIterator(innerScanner, scanNormalComplete, startTime, timeout); if (behavior.ordinal() < CoprocessorBehavior.SCAN_FILTER_AGGR_CHECKMEM.ordinal()) { scanReq.setAggrCacheGB(0); // disable mem check if so told @@ -239,6 +246,20 @@ public class CubeVisitService extends CubeVisitProtos.CubeVisitService implement ByteArrayOutputStream outputStream = new ByteArrayOutputStream(RowConstants.ROWVALUE_BUFFER_SIZE);//ByteArrayOutputStream will auto grow int finalRowCount = 0; for (GTRecord oneRecord : finalScanner) { + + if (!scanNormalComplete.booleanValue()) { + logger.error("aggregate iterator aborted because input iterator aborts"); + break; + } + + if (finalRowCount % 1000 == 1) { + if (System.currentTimeMillis() - startTime > timeout) { + logger.error("aggregate iterator aborted because timeout"); + break; + } + } + + buffer.clear(); oneRecord.exportColumns(scanReq.getColumns(), buffer); buffer.flip(); @@ -251,7 +272,7 @@ public class CubeVisitService extends CubeVisitProtos.CubeVisitService implement //outputStream.close() is not necessary byte[] compressedAllRows; - if (normalComplete.booleanValue()) { + if (scanNormalComplete.booleanValue()) { allRows = outputStream.toByteArray(); } else { allRows = new byte[0]; @@ -282,7 +303,7 @@ public class CubeVisitService extends CubeVisitProtos.CubeVisitService implement setFreeSwapSpaceSize(freeSwapSpaceSize).// setHostname(InetAddress.getLocalHost().getHostName()).// setEtcMsg(sb.toString()).// - setNormalComplete(normalComplete.booleanValue() ? 1 : 0).build()) + setNormalComplete(scanNormalComplete.booleanValue() ? 1 : 0).build()) .// build());