APACHE-KYLIN-2723: collect query related metrics
Project: http://git-wip-us.apache.org/repos/asf/kylin/repo Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/594087a3 Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/594087a3 Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/594087a3 Branch: refs/heads/yaho-cube-planner Commit: 594087a385783fb4ce78cc478dff7c2130a285d2 Parents: 38d7892 Author: Zhong <nju_y...@apache.org> Authored: Thu Aug 10 14:44:29 2017 +0800 Committer: Zhong <nju_y...@apache.org> Committed: Thu Aug 10 14:44:29 2017 +0800 ---------------------------------------------------------------------- .../apache/kylin/common/KylinConfigBase.java | 28 +- .../org/apache/kylin/common/QueryContext.java | 318 +++++++++++++++++++ .../apache/kylin/metrics/MetricsManager.java | 6 +- .../kylin/metrics/lib/impl/MetricsSystem.java | 2 +- .../query/CubeSegmentRecordEventWrapper.java | 52 +-- .../metrics/query/QueryRecordEventWrapper.java | 44 +-- .../metrics/query/RPCRecordEventWrapper.java | 11 +- .../apache/kylin/storage/StorageContext.java | 20 ++ .../gtrecord/GTCubeStorageQueryBase.java | 18 ++ .../kylin/query/enumerator/OLAPQuery.java | 3 + .../apache/kylin/query/relnode/OLAPContext.java | 2 +- .../kylin/rest/metrics/QueryMetricsFacade.java | 72 ++++- .../apache/kylin/rest/response/SQLResponse.java | 27 ++ .../apache/kylin/rest/service/QueryService.java | 10 +- .../hbase/cube/v2/CubeHBaseEndpointRPC.java | 16 +- 15 files changed, 540 insertions(+), 89 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kylin/blob/594087a3/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java ---------------------------------------------------------------------- diff --git a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java index 8d4ee20..61bef17 100644 --- a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java +++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java @@ -1124,17 +1124,39 @@ abstract public class KylinConfigBase implements Serializable { // ============================================================================ // Metrics // ============================================================================ - public boolean isMetricsMonitorEnabled() { + public boolean isKylinMetricsMonitorEnabled() { return Boolean.parseBoolean(getOptional("kylin.core.metrics.monitor-enabled", "false")); } - public String getMetricsActiveReservoirDefaultClass() { + public String getKylinMetricsActiveReservoirDefaultClass() { return getOptional("kylin.core.metrics.active-reservoir-default-class", "org.apache.kylin.metrics.lib.impl.StubReservoir"); } - public String getSystemCubeSinkDefaultClass() { + public String getKylinSystemCubeSinkDefaultClass() { return getOptional("kylin.core.metrics.system-cube-sink-default-class", "org.apache.kylin.metrics.lib.impl.hive.HiveSink"); } + + public String getKylinMetricsSubjectSuffix() { + String suffix = getOptional("kylin.core.metric.subject-suffix", null); + if (suffix != null) { + return suffix; + } + return getDeployEnv(); + } + + public String getKylinMetricsSubjectQuery() { + return getOptional("kylin.core.metrics.subject-query", "METRICS_QUERY") + "_" + getKylinMetricsSubjectSuffix(); + } + + public String getKylinMetricsSubjectQueryCube() { + return getOptional("kylin.core.metrics.subject-query-cube", "METRICS_QUERY_CUBE") + "_" + + getKylinMetricsSubjectSuffix(); + } + + public String getKylinMetricsSubjectQueryRpcCall() { + return getOptional("kylin.core.metrics.subject-query-rpc", "METRICS_QUERY_RPC") + "_" + + getKylinMetricsSubjectSuffix(); + } } http://git-wip-us.apache.org/repos/asf/kylin/blob/594087a3/core-common/src/main/java/org/apache/kylin/common/QueryContext.java ---------------------------------------------------------------------- diff --git a/core-common/src/main/java/org/apache/kylin/common/QueryContext.java b/core-common/src/main/java/org/apache/kylin/common/QueryContext.java index 0b8d519..6ee3448 100644 --- a/core-common/src/main/java/org/apache/kylin/common/QueryContext.java +++ b/core-common/src/main/java/org/apache/kylin/common/QueryContext.java @@ -18,14 +18,25 @@ package org.apache.kylin.common; +import java.io.Serializable; +import java.util.List; +import java.util.Map; import java.util.UUID; import java.util.concurrent.atomic.AtomicLong; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; + /** * Holds per query information and statistics. */ public class QueryContext { + private static final Logger logger = LoggerFactory.getLogger(QueryContext.class); + private static final ThreadLocal<QueryContext> contexts = new ThreadLocal<QueryContext>() { @Override protected QueryContext initialValue() { @@ -37,6 +48,9 @@ public class QueryContext { private AtomicLong scannedRows = new AtomicLong(); private AtomicLong scannedBytes = new AtomicLong(); + private List<RPCStatistics> rpcStatisticsList = Lists.newCopyOnWriteArrayList(); + private Map<Integer, CubeSegmentStatisticsResult> cubeSegmentStatisticsResultMap = Maps.newConcurrentMap(); + private QueryContext() { // use QueryContext.current() instead @@ -74,4 +88,308 @@ public class QueryContext { public long addAndGetScannedBytes(long deltaBytes) { return scannedBytes.addAndGet(deltaBytes); } + + public void addContext(int ctxId, String type, boolean ifCube) { + Map<String, Map<String, CubeSegmentStatistics>> cubeSegmentStatisticsMap = null; + if (ifCube) { + cubeSegmentStatisticsMap = Maps.newConcurrentMap(); + } + cubeSegmentStatisticsResultMap.put(ctxId, new CubeSegmentStatisticsResult(type, cubeSegmentStatisticsMap)); + } + + public void setContextRealization(int ctxId, String realizationName, int realizationType) { + CubeSegmentStatisticsResult cubeSegmentStatisticsResult = cubeSegmentStatisticsResultMap.get(ctxId); + if (cubeSegmentStatisticsResult == null) { + logger.warn("Cannot find CubeSegmentStatisticsResult for context " + ctxId); + return; + } + cubeSegmentStatisticsResult.setRealization(realizationName); + cubeSegmentStatisticsResult.setRealizationType(realizationType); + } + + public QueryStatisticsResult getQueryStatisticsResult() { + return new QueryStatisticsResult(rpcStatisticsList, + Lists.newArrayList(cubeSegmentStatisticsResultMap.values())); + } + + public void addRPCStatistics(int ctxId, String rpcServer, String cubeName, String segmentName, long sourceCuboidId, + long targetCuboidId, long filterMask, Exception e, long rpcCallTimeMs, long skippedRows, long scannedRows, + long returnedRows, long aggregatedRows, long scannedBytes) { + RPCStatistics rpcStatistics = new RPCStatistics(); + rpcStatistics.setWrapper(cubeName, rpcServer); + rpcStatistics.setStats(rpcCallTimeMs, skippedRows, scannedRows, returnedRows, aggregatedRows, scannedBytes); + rpcStatistics.setException(e); + rpcStatisticsList.add(rpcStatistics); + + CubeSegmentStatisticsResult cubeSegmentStatisticsResult = cubeSegmentStatisticsResultMap.get(ctxId); + if (cubeSegmentStatisticsResult == null) { + logger.warn("CubeSegmentStatisticsResult should be initialized for context " + ctxId); + return; + } + Map<String, Map<String, CubeSegmentStatistics>> cubeSegmentStatisticsMap = cubeSegmentStatisticsResult.cubeSegmentStatisticsMap; + if (cubeSegmentStatisticsMap == null) { + logger.warn( + "cubeSegmentStatisticsMap should be initialized for CubeSegmentStatisticsResult with query type " + + cubeSegmentStatisticsResult.queryType); + return; + } + Map<String, CubeSegmentStatistics> segmentStatisticsMap = cubeSegmentStatisticsMap.get(cubeName); + if (segmentStatisticsMap == null) { + segmentStatisticsMap = Maps.newConcurrentMap(); + cubeSegmentStatisticsMap.put(cubeName, segmentStatisticsMap); + } + CubeSegmentStatistics segmentStatistics = segmentStatisticsMap.get(segmentName); + if (segmentStatistics == null) { + segmentStatistics = new CubeSegmentStatistics(); + segmentStatisticsMap.put(segmentName, segmentStatistics); + segmentStatistics.setWrapper(cubeName, segmentName, sourceCuboidId, targetCuboidId, filterMask); + } + if (segmentStatistics.sourceCuboidId != sourceCuboidId || segmentStatistics.targetCuboidId != targetCuboidId + || segmentStatistics.filterMask != filterMask) { + StringBuilder inconsistency = new StringBuilder(); + if (segmentStatistics.sourceCuboidId != sourceCuboidId) { + inconsistency.append( + "sourceCuboidId exist " + segmentStatistics.sourceCuboidId + " input " + sourceCuboidId); + } + if (segmentStatistics.targetCuboidId != targetCuboidId) { + inconsistency.append( + "targetCuboidId exist " + segmentStatistics.targetCuboidId + " input " + targetCuboidId); + } + if (segmentStatistics.filterMask != filterMask) { + inconsistency.append("filterMask exist " + segmentStatistics.filterMask + " input " + filterMask); + } + logger.error("cube segment statistics wrapper is not consistent due to " + inconsistency.toString()); + return; + } + segmentStatistics.addRPCStats(rpcCallTimeMs, skippedRows, scannedRows, returnedRows, aggregatedRows, + scannedBytes, e == null); + } + + public static class RPCStatistics implements Serializable { + private String realizationName; + private String rpcServer; + + private Exception exception; + + private long callTimeMs; + private long skippedRows; + private long scannedRows; + private long returnedRows; + private long aggregatedRows; + + private long scannedBytes; + + public void setWrapper(String realizationName, String rpcServer) { + this.realizationName = realizationName; + this.rpcServer = rpcServer; + } + + public void setStats(long callTimeMs, long skipCount, long scanCount, long returnCount, long aggrCount, + long scanBytes) { + this.callTimeMs = callTimeMs; + this.skippedRows = skipCount; + this.scannedRows = scanCount; + this.returnedRows = returnCount; + this.aggregatedRows = aggrCount; + + this.scannedBytes = scanBytes; + } + + public void setException(Exception e) { + exception = e; + } + + public String getRealizationName() { + return realizationName; + } + + public String getRpcServer() { + return rpcServer; + } + + public Exception getException() { + return exception; + } + + public long getCallTimeMs() { + return callTimeMs; + } + + public long getSkippedRows() { + return skippedRows; + } + + public long getScannedRows() { + return scannedRows; + } + + public long getReturnedRows() { + return returnedRows; + } + + public long getAggregatedRows() { + return aggregatedRows; + } + + public long getScannedBytes() { + return scannedBytes; + } + } + + public static class CubeSegmentStatistics implements Serializable { + private String cubeName; + private String segmentName; + private long sourceCuboidId; + private long targetCuboidId; + private long filterMask; + + private boolean ifSuccess = true; + + private long callCount = 0L; + private long callTimeSum = 0L; + private long callTimeMax = 0L; + private long storageSkippedRows = 0L; + private long storageScannedRows = 0L; + private long storageReturnedRows = 0L; + private long storageAggregatedRows = 0L; + + private long storageScannedBytes = 0L; + + public void setWrapper(String cubeName, String segmentName, long sourceCuboidId, long targetCuboidId, + long filterMask) { + this.cubeName = cubeName; + this.segmentName = segmentName; + this.sourceCuboidId = sourceCuboidId; + this.targetCuboidId = targetCuboidId; + this.filterMask = filterMask; + } + + public void addRPCStats(long callTimeMs, long skipCount, long scanCount, long returnCount, long aggrCount, + long scanBytes, boolean ifSuccess) { + this.callCount++; + this.callTimeSum += callTimeMs; + if (this.callTimeMax < callTimeMs) { + this.callTimeMax = callTimeMs; + } + this.storageSkippedRows += skipCount; + this.storageScannedRows += scanCount; + this.storageReturnedRows += returnCount; + this.storageAggregatedRows += aggrCount; + this.ifSuccess = this.ifSuccess && ifSuccess; + + this.storageScannedBytes += scanBytes; + } + + public String getCubeName() { + return cubeName; + } + + public long getStorageScannedBytes() { + return storageScannedBytes; + } + + public long getStorageAggregatedRows() { + return storageAggregatedRows; + } + + public long getStorageReturnedRows() { + return storageReturnedRows; + } + + public long getStorageSkippedRows() { + return storageSkippedRows; + } + + public long getStorageScannedRows() { + return storageScannedRows; + } + + public long getCallTimeMax() { + return callTimeMax; + } + + public long getCallTimeSum() { + return callTimeSum; + } + + public long getCallCount() { + return callCount; + } + + public boolean isIfSuccess() { + return ifSuccess; + } + + public long getFilterMask() { + return filterMask; + } + + public long getTargetCuboidId() { + return targetCuboidId; + } + + public long getSourceCuboidId() { + return sourceCuboidId; + } + + public String getSegmentName() { + return segmentName; + } + } + + public static class CubeSegmentStatisticsResult implements Serializable { + private final String queryType; + private final Map<String, Map<String, CubeSegmentStatistics>> cubeSegmentStatisticsMap; + private String realization; + private int realizationType; + + public CubeSegmentStatisticsResult(String queryType, + Map<String, Map<String, CubeSegmentStatistics>> cubeSegmentStatisticsMap) { + this.queryType = queryType; + this.cubeSegmentStatisticsMap = cubeSegmentStatisticsMap; + } + + public void setRealization(String realization) { + this.realization = realization; + } + + public String getRealization() { + return realization; + } + + public int getRealizationType() { + return realizationType; + } + + public void setRealizationType(int realizationType) { + this.realizationType = realizationType; + } + + public String getQueryType() { + return queryType; + } + + public Map<String, Map<String, CubeSegmentStatistics>> getCubeSegmentStatisticsMap() { + return cubeSegmentStatisticsMap; + } + } + + public static class QueryStatisticsResult implements Serializable { + private final List<RPCStatistics> rpcStatisticsList; + private final List<CubeSegmentStatisticsResult> cubeSegmentStatisticsResultList; + + public QueryStatisticsResult(List<RPCStatistics> rpcStatisticsList, + List<CubeSegmentStatisticsResult> cubeSegmentStatisticsResultList) { + this.rpcStatisticsList = rpcStatisticsList; + this.cubeSegmentStatisticsResultList = cubeSegmentStatisticsResultList; + } + + public List<RPCStatistics> getRpcStatisticsList() { + return rpcStatisticsList; + } + + public List<CubeSegmentStatisticsResult> getCubeSegmentStatisticsResultList() { + return cubeSegmentStatisticsResultList; + } + } } http://git-wip-us.apache.org/repos/asf/kylin/blob/594087a3/core-metrics/src/main/java/org/apache/kylin/metrics/MetricsManager.java ---------------------------------------------------------------------- diff --git a/core-metrics/src/main/java/org/apache/kylin/metrics/MetricsManager.java b/core-metrics/src/main/java/org/apache/kylin/metrics/MetricsManager.java index 8899f07..2616c38 100644 --- a/core-metrics/src/main/java/org/apache/kylin/metrics/MetricsManager.java +++ b/core-metrics/src/main/java/org/apache/kylin/metrics/MetricsManager.java @@ -92,17 +92,17 @@ public class MetricsManager { if (scSink == null) { logger.warn("SystemCubeSink is not set and the default one will be chosen"); try { - Class clz = Class.forName(KylinConfig.getInstanceFromEnv().getSystemCubeSinkDefaultClass()); + Class clz = Class.forName(KylinConfig.getInstanceFromEnv().getKylinSystemCubeSinkDefaultClass()); scSink = (Sink) clz.getConstructor().newInstance(); } catch (Exception e) { logger.warn( - "Failed to initialize the " + KylinConfig.getInstanceFromEnv().getSystemCubeSinkDefaultClass() + "Failed to initialize the " + KylinConfig.getInstanceFromEnv().getKylinSystemCubeSinkDefaultClass() + ". The StubSink will be used"); scSink = new StubSink(); } } - if (KylinConfig.getInstanceFromEnv().isMetricsMonitorEnabled()) { + if (KylinConfig.getInstanceFromEnv().isKylinMetricsMonitorEnabled()) { logger.info("Kylin metrics monitor is enabled."); int nameIdx = 0; for (ActiveReservoir activeReservoir : sourceReporterBindProps.keySet()) { http://git-wip-us.apache.org/repos/asf/kylin/blob/594087a3/core-metrics/src/main/java/org/apache/kylin/metrics/lib/impl/MetricsSystem.java ---------------------------------------------------------------------- diff --git a/core-metrics/src/main/java/org/apache/kylin/metrics/lib/impl/MetricsSystem.java b/core-metrics/src/main/java/org/apache/kylin/metrics/lib/impl/MetricsSystem.java index d87a648..d48a860 100644 --- a/core-metrics/src/main/java/org/apache/kylin/metrics/lib/impl/MetricsSystem.java +++ b/core-metrics/src/main/java/org/apache/kylin/metrics/lib/impl/MetricsSystem.java @@ -147,7 +147,7 @@ public class MetricsSystem extends MetricRegistry { if (activeReservoir != null) { return activeReservoir; } else { - String defaultActiveReservoirClass = KylinConfig.getInstanceFromEnv().getMetricsActiveReservoirDefaultClass(); + String defaultActiveReservoirClass = KylinConfig.getInstanceFromEnv().getKylinMetricsActiveReservoirDefaultClass(); try { activeReservoir = (ActiveReservoir) Class.forName(defaultActiveReservoirClass).getConstructor().newInstance(); } catch (Exception e) { http://git-wip-us.apache.org/repos/asf/kylin/blob/594087a3/core-metrics/src/main/java/org/apache/kylin/metrics/query/CubeSegmentRecordEventWrapper.java ---------------------------------------------------------------------- diff --git a/core-metrics/src/main/java/org/apache/kylin/metrics/query/CubeSegmentRecordEventWrapper.java b/core-metrics/src/main/java/org/apache/kylin/metrics/query/CubeSegmentRecordEventWrapper.java index 5460848..b5fa218 100644 --- a/core-metrics/src/main/java/org/apache/kylin/metrics/query/CubeSegmentRecordEventWrapper.java +++ b/core-metrics/src/main/java/org/apache/kylin/metrics/query/CubeSegmentRecordEventWrapper.java @@ -27,20 +27,6 @@ public class CubeSegmentRecordEventWrapper extends RecordEventWrapper { public CubeSegmentRecordEventWrapper(RecordEvent metricsEvent) { super(metricsEvent); - - initStats(); - } - - private void initStats() { - this.metricsEvent.put(PropertyEnum.CALL_COUNT.toString(), 0L); - this.metricsEvent.put(PropertyEnum.TIME_SUM.toString(), 0L); - this.metricsEvent.put(PropertyEnum.TIME_MAX.toString(), 0L); - this.metricsEvent.put(PropertyEnum.SKIP_COUNT.toString(), 0L); - this.metricsEvent.put(PropertyEnum.SCAN_COUNT.toString(), 0L); - this.metricsEvent.put(PropertyEnum.RETURN_COUNT.toString(), 0L); - this.metricsEvent.put(PropertyEnum.AGGR_FILTER_COUNT.toString(), 0L); - this.metricsEvent.put(PropertyEnum.AGGR_COUNT.toString(), 0L); - this.metricsEvent.put(PropertyEnum.IF_SUCCESS.toString(), true); } public void setWrapper(String projectName, String cubeName, String segmentName, long sourceCuboidId, @@ -54,36 +40,20 @@ public class CubeSegmentRecordEventWrapper extends RecordEventWrapper { this.metricsEvent.put(PropertyEnum.FILTER_MASK.toString(), filterMask); } - public void setWeightPerHit(double weightPerHit) { + public void setStats(long callCount, long callTimeSum, long callTimeMax, long skipCount, long scanCount, + long returnCount, long aggrCount, boolean ifSuccess, double weightPerHit) { + this.metricsEvent.put(PropertyEnum.CALL_COUNT.toString(), callCount); + this.metricsEvent.put(PropertyEnum.TIME_SUM.toString(), callTimeSum); + this.metricsEvent.put(PropertyEnum.TIME_MAX.toString(), callTimeMax); + this.metricsEvent.put(PropertyEnum.SKIP_COUNT.toString(), skipCount); + this.metricsEvent.put(PropertyEnum.SCAN_COUNT.toString(), scanCount); + this.metricsEvent.put(PropertyEnum.RETURN_COUNT.toString(), returnCount); + this.metricsEvent.put(PropertyEnum.AGGR_FILTER_COUNT.toString(), scanCount - returnCount); + this.metricsEvent.put(PropertyEnum.AGGR_COUNT.toString(), aggrCount); + this.metricsEvent.put(PropertyEnum.IF_SUCCESS.toString(), ifSuccess); this.metricsEvent.put(PropertyEnum.WEIGHT_PER_HIT.toString(), weightPerHit); } - public void addRPCStats(long callTimeMs, long skipCount, long scanCount, long returnCount, long aggrCount, - boolean ifSuccess) { - Long curCallCount = (Long) this.metricsEvent.get(PropertyEnum.CALL_COUNT.toString()); - Long curTimeSum = (Long) this.metricsEvent.get(PropertyEnum.TIME_SUM.toString()); - Long curTimeMax = (Long) this.metricsEvent.get(PropertyEnum.TIME_MAX.toString()); - Long curSkipCount = (Long) this.metricsEvent.get(PropertyEnum.SKIP_COUNT.toString()); - Long curScanCount = (Long) this.metricsEvent.get(PropertyEnum.SCAN_COUNT.toString()); - Long curReturnCount = (Long) this.metricsEvent.get(PropertyEnum.RETURN_COUNT.toString()); - Long curAggrAndFilterCount = (Long) this.metricsEvent.get(PropertyEnum.AGGR_FILTER_COUNT.toString()); - Long curAggrCount = (Long) this.metricsEvent.get(PropertyEnum.AGGR_COUNT.toString()); - Boolean curIfSuccess = (Boolean) this.metricsEvent.get(PropertyEnum.IF_SUCCESS.toString()); - - this.metricsEvent.put(PropertyEnum.CALL_COUNT.toString(), curCallCount + 1); - this.metricsEvent.put(PropertyEnum.TIME_SUM.toString(), curTimeSum + callTimeMs); - if (curTimeMax < callTimeMs) { - this.metricsEvent.put(PropertyEnum.TIME_MAX.toString(), callTimeMs); - } - this.metricsEvent.put(PropertyEnum.SKIP_COUNT.toString(), curSkipCount + skipCount); - this.metricsEvent.put(PropertyEnum.SCAN_COUNT.toString(), curScanCount + scanCount); - this.metricsEvent.put(PropertyEnum.RETURN_COUNT.toString(), curReturnCount + returnCount); - this.metricsEvent.put(PropertyEnum.AGGR_FILTER_COUNT.toString(), - curAggrAndFilterCount + scanCount - returnCount); - this.metricsEvent.put(PropertyEnum.AGGR_COUNT.toString(), curAggrCount + aggrCount); - this.metricsEvent.put(PropertyEnum.IF_SUCCESS.toString(), curIfSuccess && ifSuccess); - } - public Object getProperty(String key) { return this.metricsEvent.get(key); } http://git-wip-us.apache.org/repos/asf/kylin/blob/594087a3/core-metrics/src/main/java/org/apache/kylin/metrics/query/QueryRecordEventWrapper.java ---------------------------------------------------------------------- diff --git a/core-metrics/src/main/java/org/apache/kylin/metrics/query/QueryRecordEventWrapper.java b/core-metrics/src/main/java/org/apache/kylin/metrics/query/QueryRecordEventWrapper.java index 937a83d..8ea0222 100644 --- a/core-metrics/src/main/java/org/apache/kylin/metrics/query/QueryRecordEventWrapper.java +++ b/core-metrics/src/main/java/org/apache/kylin/metrics/query/QueryRecordEventWrapper.java @@ -20,53 +20,41 @@ package org.apache.kylin.metrics.query; import org.apache.kylin.metrics.lib.impl.RecordEvent; import org.apache.kylin.metrics.lib.impl.RecordEventWrapper; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import com.google.common.base.Strings; public class QueryRecordEventWrapper extends RecordEventWrapper { + private static final Logger logger = LoggerFactory.getLogger(QueryRecordEventWrapper.class); + public QueryRecordEventWrapper(RecordEvent metricsEvent) { super(metricsEvent); - initStats(); - } - - private void initStats() { - this.metricsEvent.put(PropertyEnum.EXCEPTION.toString(), "NULL"); - this.metricsEvent.put(PropertyEnum.TIME_COST.toString(), 0L); - this.metricsEvent.put(PropertyEnum.CALCITE_RETURN_COUNT.toString(), 0L); - this.metricsEvent.put(PropertyEnum.STORAGE_RETURN_COUNT.toString(), 0L); - setDependentStats(); } public void setWrapper(long queryHashCode, String queryType, String projectName, String realizationName, - int realizationType) { + int realizationType, Throwable throwable) { this.metricsEvent.put(PropertyEnum.ID_CODE.toString(), queryHashCode); this.metricsEvent.put(PropertyEnum.TYPE.toString(), queryType); this.metricsEvent.put(PropertyEnum.PROJECT.toString(), projectName); this.metricsEvent.put(PropertyEnum.REALIZATION.toString(), realizationName); this.metricsEvent.put(PropertyEnum.REALIZATION_TYPE.toString(), realizationType); + this.metricsEvent.put(PropertyEnum.EXCEPTION.toString(), + throwable == null ? "NULL" : throwable.getClass().getName()); } - public void addStorageStats(long addReturnCountByStorage) { - Long curReturnCountByStorage = (Long) this.metricsEvent.get(PropertyEnum.STORAGE_RETURN_COUNT.toString()); - this.metricsEvent.put(PropertyEnum.STORAGE_RETURN_COUNT.toString(), - curReturnCountByStorage + addReturnCountByStorage); - } - - public void setStats(long callTimeMs, long returnCountByCalcite) { + public void setStats(long callTimeMs, long returnCountByCalcite, long returnCountByStorage) { this.metricsEvent.put(PropertyEnum.TIME_COST.toString(), callTimeMs); this.metricsEvent.put(PropertyEnum.CALCITE_RETURN_COUNT.toString(), returnCountByCalcite); - setDependentStats(); - } - - private void setDependentStats() { - this.metricsEvent.put(PropertyEnum.AGGR_FILTER_COUNT.toString(), - Math.max(0L, (Long) this.metricsEvent.get(PropertyEnum.STORAGE_RETURN_COUNT.toString()) - - (Long) this.metricsEvent.get(PropertyEnum.CALCITE_RETURN_COUNT.toString()))); - } - - public <T extends Throwable> void setStats(Class<T> exceptionClassName) { - this.metricsEvent.put(PropertyEnum.EXCEPTION.toString(), exceptionClassName.getName()); + this.metricsEvent.put(PropertyEnum.STORAGE_RETURN_COUNT.toString(), returnCountByStorage); + long countAggrAndFilter = returnCountByStorage - returnCountByCalcite; + if (countAggrAndFilter < 0) { + countAggrAndFilter = 0; + logger.warn(returnCountByStorage + " rows returned by storage less than " + returnCountByCalcite + + " rows returned by calcite"); + } + this.metricsEvent.put(PropertyEnum.AGGR_FILTER_COUNT.toString(), countAggrAndFilter); } public enum PropertyEnum { http://git-wip-us.apache.org/repos/asf/kylin/blob/594087a3/core-metrics/src/main/java/org/apache/kylin/metrics/query/RPCRecordEventWrapper.java ---------------------------------------------------------------------- diff --git a/core-metrics/src/main/java/org/apache/kylin/metrics/query/RPCRecordEventWrapper.java b/core-metrics/src/main/java/org/apache/kylin/metrics/query/RPCRecordEventWrapper.java index e8774a4..abcbb61 100644 --- a/core-metrics/src/main/java/org/apache/kylin/metrics/query/RPCRecordEventWrapper.java +++ b/core-metrics/src/main/java/org/apache/kylin/metrics/query/RPCRecordEventWrapper.java @@ -29,14 +29,15 @@ public class RPCRecordEventWrapper extends RecordEventWrapper { super(metricsEvent); } - public void setRPCCallWrapper(String projectName, String realizationName, String rpcServer) { - this.metricsEvent.put(PropertyEnum.EXCEPTION.toString(), "NULL"); + public void setWrapper(String projectName, String realizationName, String rpcServer, Throwable throwable) { this.metricsEvent.put(PropertyEnum.PROJECT.toString(), projectName); this.metricsEvent.put(PropertyEnum.REALIZATION.toString(), realizationName); this.metricsEvent.put(PropertyEnum.RPC_SERVER.toString(), rpcServer); + this.metricsEvent.put(PropertyEnum.EXCEPTION.toString(), + throwable == null ? "NULL" : throwable.getClass().getName()); } - public void setRPCCallStats(long callTimeMs, long skipCount, long scanCount, long returnCount, long aggrCount) { + public void setStats(long callTimeMs, long skipCount, long scanCount, long returnCount, long aggrCount) { this.metricsEvent.put(PropertyEnum.CALL_TIME.toString(), callTimeMs); this.metricsEvent.put(PropertyEnum.SKIP_COUNT.toString(), skipCount); //Number of skips on region servers based on region meta or fuzzy filter this.metricsEvent.put(PropertyEnum.SCAN_COUNT.toString(), scanCount); //Count scanned by region server @@ -45,10 +46,6 @@ public class RPCRecordEventWrapper extends RecordEventWrapper { this.metricsEvent.put(PropertyEnum.AGGR_COUNT.toString(), aggrCount); //Count aggregated by coprocessor } - public <T extends Throwable> void setStats(Class<T> exceptionClassName) { - this.metricsEvent.put(PropertyEnum.EXCEPTION.toString(), exceptionClassName.getName()); - } - public enum PropertyEnum { PROJECT("PROJECT"), REALIZATION("REALIZATION"), RPC_SERVER("RPC_SERVER"), EXCEPTION("EXCEPTION"), // CALL_TIME("CALL_TIME"), SKIP_COUNT("COUNT_SKIP"), SCAN_COUNT("COUNT_SCAN"), RETURN_COUNT( http://git-wip-us.apache.org/repos/asf/kylin/blob/594087a3/core-storage/src/main/java/org/apache/kylin/storage/StorageContext.java ---------------------------------------------------------------------- diff --git a/core-storage/src/main/java/org/apache/kylin/storage/StorageContext.java b/core-storage/src/main/java/org/apache/kylin/storage/StorageContext.java index 78cf97c..6f8815b 100644 --- a/core-storage/src/main/java/org/apache/kylin/storage/StorageContext.java +++ b/core-storage/src/main/java/org/apache/kylin/storage/StorageContext.java @@ -36,6 +36,8 @@ import com.google.common.collect.Range; public class StorageContext { private static final Logger logger = LoggerFactory.getLogger(StorageContext.class); + public final int ctxId; + private StorageURL connUrl; private int limit = Integer.MAX_VALUE; private boolean overlookOuterLimit = false; @@ -57,6 +59,24 @@ public class StorageContext { private Range<Long> reusedPeriod; + private long filterMask; + + public StorageContext() { + this(0); + } + + public StorageContext(int ctxId) { + this.ctxId = ctxId; + } + + public long getFilterMask() { + return filterMask; + } + + public void setFilterMask(long filterMask) { + this.filterMask = filterMask; + } + public StorageURL getConnUrl() { return connUrl; } http://git-wip-us.apache.org/repos/asf/kylin/blob/594087a3/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/GTCubeStorageQueryBase.java ---------------------------------------------------------------------- diff --git a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/GTCubeStorageQueryBase.java b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/GTCubeStorageQueryBase.java index 22f5fc9..ff3cc05 100644 --- a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/GTCubeStorageQueryBase.java +++ b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/GTCubeStorageQueryBase.java @@ -34,6 +34,7 @@ import org.apache.kylin.cube.RawQueryLastHacker; import org.apache.kylin.cube.cuboid.Cuboid; import org.apache.kylin.cube.model.CubeDesc; import org.apache.kylin.cube.model.CubeDesc.DeriveInfo; +import org.apache.kylin.cube.model.RowKeyColDesc; import org.apache.kylin.dict.lookup.LookupStringTable; import org.apache.kylin.measure.MeasureType; import org.apache.kylin.metadata.filter.CaseTupleFilter; @@ -141,6 +142,7 @@ public abstract class GTCubeStorageQueryBase implements IStorageQuery { TupleFilter filterD = translateDerived(filter, loosenedColumnD); groupsD.addAll(loosenedColumnD); TupleFilter.collectColumns(filter, filterColumnD); + context.setFilterMask(getQueryFilterMask(filterColumnD)); // set limit push down enableStorageLimitIfPossible(cuboid, groups, derivedPostAggregation, groupsD, filter, loosenedColumnD, sqlDigest.aggregations, context); @@ -253,6 +255,22 @@ public abstract class GTCubeStorageQueryBase implements IStorageQuery { return resultD; } + private long getQueryFilterMask(Set<TblColRef> filterColumnD) { + long filterMask = 0; + + logger.info("Filter column set for query: " + filterColumnD.toString()); + if (filterColumnD.size() != 0) { + RowKeyColDesc[] allColumns = cubeDesc.getRowkey().getRowKeyColumns(); + for (int i = 0; i < allColumns.length; i++) { + if (filterColumnD.contains(allColumns[i].getColRef())) { + filterMask |= 1L << allColumns[i].getBitIndex(); + } + } + } + logger.info("Filter mask is: " + filterMask); + return filterMask; + } + public boolean isNeedStorageAggregation(Cuboid cuboid, Collection<TblColRef> groupD, Collection<TblColRef> singleValueD) { HashSet<TblColRef> temp = Sets.newHashSet(); temp.addAll(groupD); http://git-wip-us.apache.org/repos/asf/kylin/blob/594087a3/query/src/main/java/org/apache/kylin/query/enumerator/OLAPQuery.java ---------------------------------------------------------------------- diff --git a/query/src/main/java/org/apache/kylin/query/enumerator/OLAPQuery.java b/query/src/main/java/org/apache/kylin/query/enumerator/OLAPQuery.java index debc125..b562cd5 100644 --- a/query/src/main/java/org/apache/kylin/query/enumerator/OLAPQuery.java +++ b/query/src/main/java/org/apache/kylin/query/enumerator/OLAPQuery.java @@ -22,6 +22,7 @@ import org.apache.calcite.DataContext; import org.apache.calcite.linq4j.AbstractEnumerable; import org.apache.calcite.linq4j.Enumerable; import org.apache.calcite.linq4j.Enumerator; +import org.apache.kylin.common.QueryContext; import org.apache.kylin.common.debug.BackdoorToggles; import org.apache.kylin.query.relnode.OLAPContext; import org.slf4j.Logger; @@ -47,6 +48,8 @@ public class OLAPQuery extends AbstractEnumerable<Object[]> implements Enumerabl this.optiqContext = optiqContext; this.type = type; this.contextId = ctxId; + + QueryContext.current().addContext(ctxId, type.toString(), type == EnumeratorTypeEnum.OLAP); } public OLAPQuery(EnumeratorTypeEnum type, int ctxSeq) { http://git-wip-us.apache.org/repos/asf/kylin/blob/594087a3/query/src/main/java/org/apache/kylin/query/relnode/OLAPContext.java ---------------------------------------------------------------------- diff --git a/query/src/main/java/org/apache/kylin/query/relnode/OLAPContext.java b/query/src/main/java/org/apache/kylin/query/relnode/OLAPContext.java index 31ed075..476f045 100644 --- a/query/src/main/java/org/apache/kylin/query/relnode/OLAPContext.java +++ b/query/src/main/java/org/apache/kylin/query/relnode/OLAPContext.java @@ -88,7 +88,7 @@ public class OLAPContext { public OLAPContext(int seq) { this.id = seq; - this.storageContext = new StorageContext(); + this.storageContext = new StorageContext(seq); this.sortColumns = Lists.newArrayList(); this.sortOrders = Lists.newArrayList(); Map<String, String> parameters = _localPrarameters.get(); http://git-wip-us.apache.org/repos/asf/kylin/blob/594087a3/server-base/src/main/java/org/apache/kylin/rest/metrics/QueryMetricsFacade.java ---------------------------------------------------------------------- diff --git a/server-base/src/main/java/org/apache/kylin/rest/metrics/QueryMetricsFacade.java b/server-base/src/main/java/org/apache/kylin/rest/metrics/QueryMetricsFacade.java index 48a8e58..8b58382 100644 --- a/server-base/src/main/java/org/apache/kylin/rest/metrics/QueryMetricsFacade.java +++ b/server-base/src/main/java/org/apache/kylin/rest/metrics/QueryMetricsFacade.java @@ -18,16 +18,28 @@ package org.apache.kylin.rest.metrics; +import java.nio.charset.Charset; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +import javax.annotation.concurrent.ThreadSafe; + import org.apache.hadoop.metrics2.MetricsException; import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem; import org.apache.kylin.common.KylinConfig; +import org.apache.kylin.common.QueryContext; +import org.apache.kylin.metrics.MetricsManager; +import org.apache.kylin.metrics.lib.impl.RecordEvent; +import org.apache.kylin.metrics.query.CubeSegmentRecordEventWrapper; +import org.apache.kylin.metrics.query.QueryRecordEventWrapper; +import org.apache.kylin.metrics.query.RPCRecordEventWrapper; import org.apache.kylin.rest.request.SQLRequest; import org.apache.kylin.rest.response.SQLResponse; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import javax.annotation.concurrent.ThreadSafe; -import java.util.concurrent.ConcurrentHashMap; +import com.google.common.hash.HashFunction; +import com.google.common.hash.Hashing; /** * The entrance of metrics features. @@ -36,6 +48,7 @@ import java.util.concurrent.ConcurrentHashMap; public class QueryMetricsFacade { private static final Logger logger = LoggerFactory.getLogger(QueryMetricsFacade.class); + private static final HashFunction hashFunc = Hashing.murmur3_128(); private static boolean enabled = false; private static ConcurrentHashMap<String, QueryMetrics> metricsMap = new ConcurrentHashMap<String, QueryMetrics>(); @@ -48,6 +61,10 @@ public class QueryMetricsFacade { DefaultMetricsSystem.initialize("Kylin"); } + public static long getSqlHashCode(String sql) { + return hashFunc.hashString(sql, Charset.forName("UTF-8")).asLong(); + } + public static void updateMetrics(SQLRequest sqlRequest, SQLResponse sqlResponse) { if (!enabled) return; @@ -61,6 +78,57 @@ public class QueryMetricsFacade { String cubeMetricName = projectName + ",sub=" + cubeName; update(getQueryMetrics(cubeMetricName), sqlResponse); + + /** + * report query related metrics + */ + final QueryContext.QueryStatisticsResult queryStatisticsResult = sqlResponse.getQueryStatistics(); + for (QueryContext.RPCStatistics entry : queryStatisticsResult.getRpcStatisticsList()) { + RPCRecordEventWrapper rpcMetricsEventWrapper = new RPCRecordEventWrapper( + new RecordEvent(KylinConfig.getInstanceFromEnv().getKylinMetricsSubjectQueryRpcCall())); + rpcMetricsEventWrapper.setWrapper(sqlRequest.getProject(), entry.getRealizationName(), entry.getRpcServer(), + entry.getException()); + rpcMetricsEventWrapper.setStats(entry.getCallTimeMs(), entry.getSkippedRows(), entry.getScannedRows(), + entry.getReturnedRows(), entry.getAggregatedRows()); + //For update rpc level related metrics + MetricsManager.getInstance().update(rpcMetricsEventWrapper.getMetricsRecord()); + } + long sqlHashCode = getSqlHashCode(sqlRequest.getSql()); + for (QueryContext.CubeSegmentStatisticsResult contextEntry : queryStatisticsResult + .getCubeSegmentStatisticsResultList()) { + QueryRecordEventWrapper queryMetricsEventWrapper = new QueryRecordEventWrapper( + new RecordEvent(KylinConfig.getInstanceFromEnv().getKylinMetricsSubjectQuery())); + queryMetricsEventWrapper.setWrapper(sqlHashCode, + sqlResponse.isStorageCacheUsed() ? "CACHE" : contextEntry.getQueryType(), sqlRequest.getProject(), + contextEntry.getRealization(), contextEntry.getRealizationType(), sqlResponse.getThrowable()); + + long totalStorageReturnCount = 0L; + for (Map<String, QueryContext.CubeSegmentStatistics> cubeEntry : contextEntry.getCubeSegmentStatisticsMap() + .values()) { + for (QueryContext.CubeSegmentStatistics segmentEntry : cubeEntry.values()) { + CubeSegmentRecordEventWrapper cubeSegmentMetricsEventWrapper = new CubeSegmentRecordEventWrapper( + new RecordEvent(KylinConfig.getInstanceFromEnv().getKylinMetricsSubjectQueryCube())); + + cubeSegmentMetricsEventWrapper.setWrapper(sqlRequest.getProject(), segmentEntry.getCubeName(), + segmentEntry.getSegmentName(), segmentEntry.getSourceCuboidId(), + segmentEntry.getTargetCuboidId(), segmentEntry.getFilterMask()); + + cubeSegmentMetricsEventWrapper.setStats(segmentEntry.getCallCount(), segmentEntry.getCallTimeSum(), + segmentEntry.getCallTimeMax(), segmentEntry.getStorageSkippedRows(), + segmentEntry.getStorageScannedRows(), segmentEntry.getStorageReturnedRows(), + segmentEntry.getStorageAggregatedRows(), segmentEntry.isIfSuccess(), + 1.0 / cubeEntry.size()); + + totalStorageReturnCount += segmentEntry.getStorageReturnedRows(); + //For update cube segment level related query metrics + MetricsManager.getInstance().update(cubeSegmentMetricsEventWrapper.getMetricsRecord()); + } + } + queryMetricsEventWrapper.setStats(sqlResponse.getDuration(), sqlResponse.getResults().size(), + totalStorageReturnCount); + //For update query level metrics + MetricsManager.getInstance().update(queryMetricsEventWrapper.getMetricsRecord()); + } } private static void update(QueryMetrics queryMetrics, SQLResponse sqlResponse) { http://git-wip-us.apache.org/repos/asf/kylin/blob/594087a3/server-base/src/main/java/org/apache/kylin/rest/response/SQLResponse.java ---------------------------------------------------------------------- diff --git a/server-base/src/main/java/org/apache/kylin/rest/response/SQLResponse.java b/server-base/src/main/java/org/apache/kylin/rest/response/SQLResponse.java index 79a2c05..bed4764 100644 --- a/server-base/src/main/java/org/apache/kylin/rest/response/SQLResponse.java +++ b/server-base/src/main/java/org/apache/kylin/rest/response/SQLResponse.java @@ -21,8 +21,12 @@ package org.apache.kylin.rest.response; import java.io.Serializable; import java.util.List; +import org.apache.commons.lang.SerializationUtils; +import org.apache.kylin.common.QueryContext; import org.apache.kylin.metadata.querymeta.SelectedColumnMeta; +import com.fasterxml.jackson.annotation.JsonIgnore; + public class SQLResponse implements Serializable { protected static final long serialVersionUID = 1L; @@ -49,6 +53,9 @@ public class SQLResponse implements Serializable { // if isException, the detailed exception message protected String exceptionMessage; + // if isException, the related Exception + protected Throwable throwable; + protected long duration; protected boolean isPartial = false; @@ -63,6 +70,8 @@ public class SQLResponse implements Serializable { protected boolean queryPushDown = false; + protected byte[] queryStatistics; + public SQLResponse() { } @@ -137,6 +146,15 @@ public class SQLResponse implements Serializable { exceptionMessage = msg; } + @JsonIgnore + public Throwable getThrowable() { + return throwable; + } + + public void setThrowable(Throwable throwable) { + this.throwable = throwable; + } + public long getDuration() { return duration; } @@ -185,4 +203,13 @@ public class SQLResponse implements Serializable { public void setStorageCacheUsed(boolean storageCacheUsed) { this.storageCacheUsed = storageCacheUsed; } + + @JsonIgnore + public QueryContext.QueryStatisticsResult getQueryStatistics() { + return (QueryContext.QueryStatisticsResult) SerializationUtils.deserialize(queryStatistics); + } + + public void setQueryStatistics(QueryContext.QueryStatisticsResult queryStatisticsResult) { + this.queryStatistics = SerializationUtils.serialize(queryStatisticsResult); + } } http://git-wip-us.apache.org/repos/asf/kylin/blob/594087a3/server-base/src/main/java/org/apache/kylin/rest/service/QueryService.java ---------------------------------------------------------------------- diff --git a/server-base/src/main/java/org/apache/kylin/rest/service/QueryService.java b/server-base/src/main/java/org/apache/kylin/rest/service/QueryService.java index ab47caa..a01997e 100644 --- a/server-base/src/main/java/org/apache/kylin/rest/service/QueryService.java +++ b/server-base/src/main/java/org/apache/kylin/rest/service/QueryService.java @@ -82,6 +82,7 @@ import org.apache.kylin.metadata.querymeta.TableMetaWithType; import org.apache.kylin.metadata.realization.RealizationType; import org.apache.kylin.query.QueryConnection; import org.apache.kylin.query.relnode.OLAPContext; +import org.apache.kylin.query.util.PushDownUtil; import org.apache.kylin.query.util.QueryUtil; import org.apache.kylin.rest.constant.Constant; import org.apache.kylin.rest.exception.BadRequestException; @@ -94,7 +95,6 @@ import org.apache.kylin.rest.request.PrepareSqlRequest; import org.apache.kylin.rest.request.SQLRequest; import org.apache.kylin.rest.response.SQLResponse; import org.apache.kylin.rest.util.AclUtil; -import org.apache.kylin.query.util.PushDownUtil; import org.apache.kylin.rest.util.TableauInterceptor; import org.apache.kylin.storage.hybrid.HybridInstance; import org.slf4j.Logger; @@ -422,6 +422,7 @@ public class QueryService extends BasicService { String errMsg = QueryUtil.makeErrorMsgUserFriendly(e); sqlResponse = new SQLResponse(null, null, 0, true, errMsg); + sqlResponse.setThrowable(e.getCause() == null ? e : ExceptionUtils.getRootCause(e)); sqlResponse.setTotalScanCount(queryContext.getScannedRows()); sqlResponse.setTotalScanBytes(queryContext.getScannedBytes()); @@ -855,6 +856,8 @@ public class QueryService extends BasicService { StringBuilder logSb = new StringBuilder("Processed rows for each storageContext: "); if (OLAPContext.getThreadLocalContexts() != null) { // contexts can be null in case of 'explain plan for' for (OLAPContext ctx : OLAPContext.getThreadLocalContexts()) { + String realizationName = "NULL"; + int realizationType = -1; if (ctx.realization != null) { isPartialResult |= ctx.storageContext.isPartialResultReturned(); if (cubeSb.length() > 0) { @@ -862,7 +865,11 @@ public class QueryService extends BasicService { } cubeSb.append(ctx.realization.getCanonicalName()); logSb.append(ctx.storageContext.getProcessedRowCount()).append(" "); + + realizationName = ctx.realization.getName(); + realizationType = ctx.realization.getStorageType(); } + QueryContext.current().setContextRealization(ctx.id, realizationName, realizationType); } } logger.info(logSb.toString()); @@ -871,6 +878,7 @@ public class QueryService extends BasicService { isPushDown); response.setTotalScanCount(QueryContext.current().getScannedRows()); response.setTotalScanBytes(QueryContext.current().getScannedBytes()); + response.setQueryStatistics(QueryContext.current().getQueryStatisticsResult()); return response; } http://git-wip-us.apache.org/repos/asf/kylin/blob/594087a3/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java ---------------------------------------------------------------------- diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java index e822ada..2301df3 100644 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java +++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java @@ -204,14 +204,26 @@ public class CubeHBaseEndpointRPC extends CubeHBaseRPC { queryContext.addAndGetScannedRows(stats.getScannedRowCount()); queryContext.addAndGetScannedBytes(stats.getScannedBytes()); + RuntimeException rpcException = null; + if (result.getStats().getNormalComplete() != 1) { + rpcException = getCoprocessorException(result); + } + queryContext.addRPCStatistics(storageContext.ctxId, stats.getHostname(), + cubeSeg.getCubeDesc().getName(), cubeSeg.getName(), cuboid.getInputID(), + cuboid.getId(), storageContext.getFilterMask(), rpcException, + stats.getServiceEndTime() - stats.getServiceStartTime(), 0, + stats.getScannedRowCount(), + stats.getScannedRowCount() - stats.getAggregatedRowCount(), + stats.getAggregatedRowCount(), stats.getScannedBytes()); + // if any other region has responded with error, skip further processing if (regionErrorHolder.get() != null) { return; } // record coprocessor error if happened - if (result.getStats().getNormalComplete() != 1) { - regionErrorHolder.compareAndSet(null, getCoprocessorException(result)); + if (rpcException != null) { + regionErrorHolder.compareAndSet(null, rpcException); return; }