APACHE-KYLIN-2723: add trigger for query & job metrics reporter
Project: http://git-wip-us.apache.org/repos/asf/kylin/repo Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/4ef79623 Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/4ef79623 Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/4ef79623 Branch: refs/heads/yaho-cube-planner Commit: 4ef79623aa18f01c32b8969caed81b3904d87651 Parents: 74e167f Author: Zhong <nju_y...@apache.org> Authored: Mon Aug 14 19:43:57 2017 +0800 Committer: Zhong <nju_y...@apache.org> Committed: Mon Aug 14 19:43:57 2017 +0800 ---------------------------------------------------------------------- .../apache/kylin/common/KylinConfigBase.java | 8 +++ .../org/apache/kylin/common/QueryContext.java | 37 +++++++++++ .../kylin/job/metrics/JobMetricsFacade.java | 3 + .../apache/kylin/metrics/MetricsManager.java | 40 +++++++----- .../kylin/rest/metrics/QueryMetricsFacade.java | 16 ++++- .../apache/kylin/rest/response/SQLResponse.java | 12 +++- .../apache/kylin/rest/service/QueryService.java | 1 + server/src/main/resources/kylinMetrics.xml | 69 +++++++++----------- .../kylin/rest/metrics/QueryMetricsTest.java | 42 ++++++++++++ 9 files changed, 169 insertions(+), 59 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kylin/blob/4ef79623/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java ---------------------------------------------------------------------- diff --git a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java index 8df97ad..7041e9a 100644 --- a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java +++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java @@ -1128,6 +1128,14 @@ abstract public class KylinConfigBase implements Serializable { return Boolean.parseBoolean(getOptional("kylin.core.metrics.monitor-enabled", "false")); } + public boolean isKylinMetricsReporterForQueryEnabled() { + return Boolean.parseBoolean(getOptional("kylin.core.metrics.reporter-query-enabled", "false")); + } + + public boolean isKylinMetricsReporterForJobEnabled() { + return Boolean.parseBoolean(getOptional("kylin.core.metrics.reporter-job-enabled", "true")); + } + public String getKylinMetricsActiveReservoirDefaultClass() { return getOptional("kylin.core.metrics.active-reservoir-default-class", "org.apache.kylin.metrics.lib.impl.StubReservoir"); http://git-wip-us.apache.org/repos/asf/kylin/blob/4ef79623/core-common/src/main/java/org/apache/kylin/common/QueryContext.java ---------------------------------------------------------------------- diff --git a/core-common/src/main/java/org/apache/kylin/common/QueryContext.java b/core-common/src/main/java/org/apache/kylin/common/QueryContext.java index 6ee3448..09fbd13 100644 --- a/core-common/src/main/java/org/apache/kylin/common/QueryContext.java +++ b/core-common/src/main/java/org/apache/kylin/common/QueryContext.java @@ -166,6 +166,8 @@ public class QueryContext { } public static class RPCStatistics implements Serializable { + protected static final long serialVersionUID = 1L; + private String realizationName; private String rpcServer; @@ -234,9 +236,16 @@ public class QueryContext { public long getScannedBytes() { return scannedBytes; } + + @Override + public String toString() { + return "RPCStatistics [rpcServer=" + rpcServer + ",realizationName=" + realizationName + "]"; + } } public static class CubeSegmentStatistics implements Serializable { + protected static final long serialVersionUID = 1L; + private String cubeName; private String segmentName; private long sourceCuboidId; @@ -335,9 +344,17 @@ public class QueryContext { public String getSegmentName() { return segmentName; } + + @Override + public String toString() { + return "CubeSegmentStatistics [cubeName=" + cubeName + ",segmentName=" + segmentName + ",sourceCuboidId=" + + sourceCuboidId + ",targetCuboidId=" + targetCuboidId + ",filterMask=" + filterMask + "]"; + } } public static class CubeSegmentStatisticsResult implements Serializable { + protected static final long serialVersionUID = 1L; + private final String queryType; private final Map<String, Map<String, CubeSegmentStatistics>> cubeSegmentStatisticsMap; private String realization; @@ -372,12 +389,26 @@ public class QueryContext { public Map<String, Map<String, CubeSegmentStatistics>> getCubeSegmentStatisticsMap() { return cubeSegmentStatisticsMap; } + + @Override + public String toString() { + return "CubeSegmentStatisticsResult [queryType=" + queryType + ",realization=" + realization + + ",realizationType=" + realizationType + ",cubeSegmentStatisticsMap=" + cubeSegmentStatisticsMap + + "]"; + } } public static class QueryStatisticsResult implements Serializable { + protected static final long serialVersionUID = 1L; + private final List<RPCStatistics> rpcStatisticsList; private final List<CubeSegmentStatisticsResult> cubeSegmentStatisticsResultList; + public QueryStatisticsResult() { + rpcStatisticsList = Lists.newArrayList(); + cubeSegmentStatisticsResultList = Lists.newArrayList(); + } + public QueryStatisticsResult(List<RPCStatistics> rpcStatisticsList, List<CubeSegmentStatisticsResult> cubeSegmentStatisticsResultList) { this.rpcStatisticsList = rpcStatisticsList; @@ -391,5 +422,11 @@ public class QueryContext { public List<CubeSegmentStatisticsResult> getCubeSegmentStatisticsResultList() { return cubeSegmentStatisticsResultList; } + + @Override + public String toString() { + return "QueryStatisticsResult [rpcStatisticsList=" + rpcStatisticsList + ",cubeSegmentStatisticsResultList" + + cubeSegmentStatisticsResultList + "]"; + } } } http://git-wip-us.apache.org/repos/asf/kylin/blob/4ef79623/core-job/src/main/java/org/apache/kylin/job/metrics/JobMetricsFacade.java ---------------------------------------------------------------------- diff --git a/core-job/src/main/java/org/apache/kylin/job/metrics/JobMetricsFacade.java b/core-job/src/main/java/org/apache/kylin/job/metrics/JobMetricsFacade.java index 07fcc49..904c4bd 100644 --- a/core-job/src/main/java/org/apache/kylin/job/metrics/JobMetricsFacade.java +++ b/core-job/src/main/java/org/apache/kylin/job/metrics/JobMetricsFacade.java @@ -30,6 +30,9 @@ public class JobMetricsFacade { private static final Logger logger = LoggerFactory.getLogger(JobMetricsFacade.class); public static void updateMetrics(JobStatisticsResult jobStats) { + if (!KylinConfig.getInstanceFromEnv().isKylinMetricsReporterForJobEnabled()) { + return; + } /** * report job related metrics */ http://git-wip-us.apache.org/repos/asf/kylin/blob/4ef79623/core-metrics/src/main/java/org/apache/kylin/metrics/MetricsManager.java ---------------------------------------------------------------------- diff --git a/core-metrics/src/main/java/org/apache/kylin/metrics/MetricsManager.java b/core-metrics/src/main/java/org/apache/kylin/metrics/MetricsManager.java index 2616c38..ce28bf6 100644 --- a/core-metrics/src/main/java/org/apache/kylin/metrics/MetricsManager.java +++ b/core-metrics/src/main/java/org/apache/kylin/metrics/MetricsManager.java @@ -62,11 +62,30 @@ public class MetricsManager { return instance; } - public static void setSystemCubeSink(Sink systemCubeSink) { + public static void initMetricsManager(Sink systemCubeSink, + Map<ActiveReservoir, List<Pair<String, Properties>>> sourceReporterBindProperties) { + setSystemCubeSink(systemCubeSink); + setSourceReporterBindProps(sourceReporterBindProperties); + instance.init(); + } + + private static void setSystemCubeSink(Sink systemCubeSink) { + if (systemCubeSink == null) { + logger.warn("SystemCubeSink is not set and the default one will be chosen"); + try { + Class clz = Class.forName(KylinConfig.getInstanceFromEnv().getKylinSystemCubeSinkDefaultClass()); + systemCubeSink = (Sink) clz.getConstructor().newInstance(); + } catch (Exception e) { + logger.warn("Failed to initialize the " + + KylinConfig.getInstanceFromEnv().getKylinSystemCubeSinkDefaultClass() + + ". The StubSink will be used"); + systemCubeSink = new StubSink(); + } + } scSink = systemCubeSink; } - public static void setSourceReporterBindProps( + private static void setSourceReporterBindProps( Map<ActiveReservoir, List<Pair<String, Properties>>> sourceReporterBindProperties) { sourceReporterBindProps = Maps.newHashMapWithExpectedSize(sourceReporterBindProperties.size()); for (ActiveReservoir activeReservoir : sourceReporterBindProperties.keySet()) { @@ -88,20 +107,7 @@ public class MetricsManager { } } - public void init() { - if (scSink == null) { - logger.warn("SystemCubeSink is not set and the default one will be chosen"); - try { - Class clz = Class.forName(KylinConfig.getInstanceFromEnv().getKylinSystemCubeSinkDefaultClass()); - scSink = (Sink) clz.getConstructor().newInstance(); - } catch (Exception e) { - logger.warn( - "Failed to initialize the " + KylinConfig.getInstanceFromEnv().getKylinSystemCubeSinkDefaultClass() - + ". The StubSink will be used"); - scSink = new StubSink(); - } - } - + private void init() { if (KylinConfig.getInstanceFromEnv().isKylinMetricsMonitorEnabled()) { logger.info("Kylin metrics monitor is enabled."); int nameIdx = 0; @@ -136,7 +142,7 @@ public class MetricsManager { } } - public String getSystemTableFromSubject(String subject) { + public static String getSystemTableFromSubject(String subject) { return scSink.getTableFromSubject(subject); } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/kylin/blob/4ef79623/server-base/src/main/java/org/apache/kylin/rest/metrics/QueryMetricsFacade.java ---------------------------------------------------------------------- diff --git a/server-base/src/main/java/org/apache/kylin/rest/metrics/QueryMetricsFacade.java b/server-base/src/main/java/org/apache/kylin/rest/metrics/QueryMetricsFacade.java index 3a5c664..4e59f10 100644 --- a/server-base/src/main/java/org/apache/kylin/rest/metrics/QueryMetricsFacade.java +++ b/server-base/src/main/java/org/apache/kylin/rest/metrics/QueryMetricsFacade.java @@ -68,6 +68,11 @@ public class QueryMetricsFacade { } public static void updateMetrics(SQLRequest sqlRequest, SQLResponse sqlResponse) { + updateMetricsToLocal(sqlRequest, sqlResponse); + updateMetricsToReservoir(sqlRequest, sqlResponse); + } + + private static void updateMetricsToLocal(SQLRequest sqlRequest, SQLResponse sqlResponse) { if (!enabled) return; @@ -80,10 +85,15 @@ public class QueryMetricsFacade { String cubeMetricName = projectName + ",sub=" + cubeName; update(getQueryMetrics(cubeMetricName), sqlResponse); + } - /** - * report query related metrics - */ + /** + * report query related metrics + */ + private static void updateMetricsToReservoir(SQLRequest sqlRequest, SQLResponse sqlResponse) { + if (!KylinConfig.getInstanceFromEnv().isKylinMetricsReporterForQueryEnabled()) { + return; + } String user = SecurityContextHolder.getContext().getAuthentication().getName(); if (user == null) { user = "unknown"; http://git-wip-us.apache.org/repos/asf/kylin/blob/4ef79623/server-base/src/main/java/org/apache/kylin/rest/response/SQLResponse.java ---------------------------------------------------------------------- diff --git a/server-base/src/main/java/org/apache/kylin/rest/response/SQLResponse.java b/server-base/src/main/java/org/apache/kylin/rest/response/SQLResponse.java index bed4764..bca52bc 100644 --- a/server-base/src/main/java/org/apache/kylin/rest/response/SQLResponse.java +++ b/server-base/src/main/java/org/apache/kylin/rest/response/SQLResponse.java @@ -206,10 +206,18 @@ public class SQLResponse implements Serializable { @JsonIgnore public QueryContext.QueryStatisticsResult getQueryStatistics() { - return (QueryContext.QueryStatisticsResult) SerializationUtils.deserialize(queryStatistics); + if (queryStatistics != null) { + try { + return (QueryContext.QueryStatisticsResult) SerializationUtils.deserialize(queryStatistics); + } catch (Exception e) { // Exception may happen due to + System.out.println("Error while deserialize queryStatistics due to " + e); + } + } + return new QueryContext.QueryStatisticsResult(); } public void setQueryStatistics(QueryContext.QueryStatisticsResult queryStatisticsResult) { - this.queryStatistics = SerializationUtils.serialize(queryStatisticsResult); + this.queryStatistics = queryStatisticsResult == null ? null + : SerializationUtils.serialize(queryStatisticsResult); } } http://git-wip-us.apache.org/repos/asf/kylin/blob/4ef79623/server-base/src/main/java/org/apache/kylin/rest/service/QueryService.java ---------------------------------------------------------------------- diff --git a/server-base/src/main/java/org/apache/kylin/rest/service/QueryService.java b/server-base/src/main/java/org/apache/kylin/rest/service/QueryService.java index a01997e..d59df2e 100644 --- a/server-base/src/main/java/org/apache/kylin/rest/service/QueryService.java +++ b/server-base/src/main/java/org/apache/kylin/rest/service/QueryService.java @@ -425,6 +425,7 @@ public class QueryService extends BasicService { sqlResponse.setThrowable(e.getCause() == null ? e : ExceptionUtils.getRootCause(e)); sqlResponse.setTotalScanCount(queryContext.getScannedRows()); sqlResponse.setTotalScanBytes(queryContext.getScannedBytes()); + sqlResponse.setQueryStatistics(queryContext.getQueryStatisticsResult()); if (queryCacheEnabled && e.getCause() != null && ExceptionUtils.getRootCause(e) instanceof ResourceLimitExceededException) { http://git-wip-us.apache.org/repos/asf/kylin/blob/4ef79623/server/src/main/resources/kylinMetrics.xml ---------------------------------------------------------------------- diff --git a/server/src/main/resources/kylinMetrics.xml b/server/src/main/resources/kylinMetrics.xml index 92e391f..354b1a0 100644 --- a/server/src/main/resources/kylinMetrics.xml +++ b/server/src/main/resources/kylinMetrics.xml @@ -26,7 +26,7 @@ <value>10</value> </constructor-arg> <constructor-arg index="1"> - <value>100</value> + <value>10</value> </constructor-arg> <constructor-arg index="2"> <value>10</value> @@ -37,48 +37,43 @@ <bean id="kafkaSink" class="org.apache.kylin.metrics.lib.impl.kafka.KafkaSink"/> - <bean id="systemCubeSink" class="org.springframework.beans.factory.config.MethodInvokingFactoryBean"> + <bean id="initMetricsManager" class="org.springframework.beans.factory.config.MethodInvokingFactoryBean"> <property name="targetClass" value="org.apache.kylin.metrics.MetricsManager"/> - <property name="targetMethod" value="setSystemCubeSink"/> + <property name="targetMethod" value="initMetricsManager"/> <property name="arguments"> <list> <ref bean="hiveSink"/> + <map key-type="org.apache.kylin.metrics.lib.ActiveReservoir" value-type="java.util.List"> + <!-- + <entry key-ref="instantReservoir"> + <list> + <bean class="org.apache.kylin.common.util.Pair"> + <property name="first" + value="org.apache.kylin.metrics.lib.impl.kafka.KafkaReservoirReporter"/> + <property name="second"> + <props> + <prop key="bootstrap.servers">sandbox:9092</prop> + </props> + </property> + </bean> + </list> + </entry> + --> + <entry key-ref="blockingReservoir"> + <list> + <bean class="org.apache.kylin.common.util.Pair"> + <property name="first" + value="org.apache.kylin.metrics.lib.impl.hive.HiveReservoirReporter"/> + <property name="second"> + <props> + </props> + </property> + </bean> + </list> + </entry> + </map> </list> </property> </bean> - <bean id="sourceReporterBindProperties" class="org.springframework.beans.factory.config.MethodInvokingFactoryBean"> - <property name="targetClass" value="org.apache.kylin.metrics.MetricsManager"/> - <property name="targetMethod" value="setSourceReporterBindProps"/> - <property name="arguments"> - <map key-type="org.apache.kylin.metrics.lib.ActiveReservoir" value-type="java.util.List"> - <entry key-ref="instantReservoir"> - <list> - <bean class="org.apache.kylin.common.util.Pair"> - <property name="first" - value="org.apache.kylin.metrics.lib.impl.kafka.KafkaReservoirReporter"/> - <property name="second"> - <props> - <prop key="bootstrap.servers">sandbox:9092</prop> - </props> - </property> - </bean> - </list> - </entry> - <entry key-ref="blockingReservoir"> - <list> - <bean class="org.apache.kylin.common.util.Pair"> - <property name="first" - value="org.apache.kylin.metrics.lib.impl.hive.HiveReservoirReporter"/> - <property name="second"> - <props> - </props> - </property> - </bean> - </list> - </entry> - </map> - </property> - </bean> - </beans> \ No newline at end of file http://git-wip-us.apache.org/repos/asf/kylin/blob/4ef79623/server/src/test/java/org/apache/kylin/rest/metrics/QueryMetricsTest.java ---------------------------------------------------------------------- diff --git a/server/src/test/java/org/apache/kylin/rest/metrics/QueryMetricsTest.java b/server/src/test/java/org/apache/kylin/rest/metrics/QueryMetricsTest.java index bd1da59..c16c350 100644 --- a/server/src/test/java/org/apache/kylin/rest/metrics/QueryMetricsTest.java +++ b/server/src/test/java/org/apache/kylin/rest/metrics/QueryMetricsTest.java @@ -25,6 +25,7 @@ import java.util.List; import javax.management.MBeanServer; import javax.management.ObjectName; +import org.apache.kylin.common.QueryContext; import org.apache.kylin.rest.request.SQLRequest; import org.apache.kylin.rest.response.SQLResponse; import org.apache.kylin.rest.service.ServiceTestBase; @@ -111,4 +112,45 @@ public class QueryMetricsTest extends ServiceTestBase { System.clearProperty("kylin.server.query-metrics-enabled"); } + @Test + public void testQueryStatisticsResult() throws Exception { + System.setProperty("kylin.core.metrics.reporter-query-enabled", "true"); + QueryMetricsFacade.init(); + + SQLRequest sqlRequest = new SQLRequest(); + sqlRequest.setSql("select * from TEST_KYLIN_FACT"); + sqlRequest.setProject("default"); + + SQLResponse sqlResponse = new SQLResponse(); + sqlResponse.setDuration(10); + sqlResponse.setCube("test_cube"); + sqlResponse.setIsException(false); + sqlResponse.setTotalScanCount(100); + List<String> list1 = new ArrayList<>(); + list1.add("111"); + list1.add("112"); + List<String> list2 = new ArrayList<>(); + list2.add("111"); + list2.add("112"); + List<List<String>> results = new ArrayList<>(); + results.add(list1); + results.add(list2); + sqlResponse.setResults(results); + sqlResponse.setStorageCacheUsed(true); + + QueryContext context = QueryContext.current(); + int ctxId = 0; + context.addContext(ctxId, "OLAP", true); + context.addRPCStatistics(ctxId, "sandbox", "test_cube", "20100101000000_20150101000000", 3L, 3L, 3L, null, 80L, + 0L, 2L, 2L, 0L, 30L); + + sqlResponse.setQueryStatistics(context.getQueryStatisticsResult()); + + QueryMetricsFacade.updateMetrics(sqlRequest, sqlResponse); + + Thread.sleep(2000); + + System.clearProperty("kylin.server.query-metrics-enabled"); + System.out.println("------------testQueryStatisticsResult done------------"); + } }