APACHE-KYLIN-2723: add trigger for query & job metrics reporter

Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/4ef79623
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/4ef79623
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/4ef79623

Branch: refs/heads/yaho-cube-planner
Commit: 4ef79623aa18f01c32b8969caed81b3904d87651
Parents: 74e167f
Author: Zhong <nju_y...@apache.org>
Authored: Mon Aug 14 19:43:57 2017 +0800
Committer: Zhong <nju_y...@apache.org>
Committed: Mon Aug 14 19:43:57 2017 +0800

----------------------------------------------------------------------
 .../apache/kylin/common/KylinConfigBase.java    |  8 +++
 .../org/apache/kylin/common/QueryContext.java   | 37 +++++++++++
 .../kylin/job/metrics/JobMetricsFacade.java     |  3 +
 .../apache/kylin/metrics/MetricsManager.java    | 40 +++++++-----
 .../kylin/rest/metrics/QueryMetricsFacade.java  | 16 ++++-
 .../apache/kylin/rest/response/SQLResponse.java | 12 +++-
 .../apache/kylin/rest/service/QueryService.java |  1 +
 server/src/main/resources/kylinMetrics.xml      | 69 +++++++++-----------
 .../kylin/rest/metrics/QueryMetricsTest.java    | 42 ++++++++++++
 9 files changed, 169 insertions(+), 59 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kylin/blob/4ef79623/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
----------------------------------------------------------------------
diff --git 
a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java 
b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
index 8df97ad..7041e9a 100644
--- a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
+++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
@@ -1128,6 +1128,14 @@ abstract public class KylinConfigBase implements 
Serializable {
         return 
Boolean.parseBoolean(getOptional("kylin.core.metrics.monitor-enabled", 
"false"));
     }
 
+    public boolean isKylinMetricsReporterForQueryEnabled() {
+        return 
Boolean.parseBoolean(getOptional("kylin.core.metrics.reporter-query-enabled", 
"false"));
+    }
+
+    public boolean isKylinMetricsReporterForJobEnabled() {
+        return 
Boolean.parseBoolean(getOptional("kylin.core.metrics.reporter-job-enabled", 
"true"));
+    }
+
     public String getKylinMetricsActiveReservoirDefaultClass() {
         return getOptional("kylin.core.metrics.active-reservoir-default-class",
                 "org.apache.kylin.metrics.lib.impl.StubReservoir");

http://git-wip-us.apache.org/repos/asf/kylin/blob/4ef79623/core-common/src/main/java/org/apache/kylin/common/QueryContext.java
----------------------------------------------------------------------
diff --git 
a/core-common/src/main/java/org/apache/kylin/common/QueryContext.java 
b/core-common/src/main/java/org/apache/kylin/common/QueryContext.java
index 6ee3448..09fbd13 100644
--- a/core-common/src/main/java/org/apache/kylin/common/QueryContext.java
+++ b/core-common/src/main/java/org/apache/kylin/common/QueryContext.java
@@ -166,6 +166,8 @@ public class QueryContext {
     }
 
     public static class RPCStatistics implements Serializable {
+        protected static final long serialVersionUID = 1L;
+
         private String realizationName;
         private String rpcServer;
 
@@ -234,9 +236,16 @@ public class QueryContext {
         public long getScannedBytes() {
             return scannedBytes;
         }
+
+        @Override
+        public String toString() {
+            return "RPCStatistics [rpcServer=" + rpcServer + 
",realizationName=" + realizationName + "]";
+        }
     }
 
     public static class CubeSegmentStatistics implements Serializable {
+        protected static final long serialVersionUID = 1L;
+
         private String cubeName;
         private String segmentName;
         private long sourceCuboidId;
@@ -335,9 +344,17 @@ public class QueryContext {
         public String getSegmentName() {
             return segmentName;
         }
+
+        @Override
+        public String toString() {
+            return "CubeSegmentStatistics [cubeName=" + cubeName + 
",segmentName=" + segmentName + ",sourceCuboidId="
+                    + sourceCuboidId + ",targetCuboidId=" + targetCuboidId + 
",filterMask=" + filterMask + "]";
+        }
     }
 
     public static class CubeSegmentStatisticsResult implements Serializable {
+        protected static final long serialVersionUID = 1L;
+
         private final String queryType;
         private final Map<String, Map<String, CubeSegmentStatistics>> 
cubeSegmentStatisticsMap;
         private String realization;
@@ -372,12 +389,26 @@ public class QueryContext {
         public Map<String, Map<String, CubeSegmentStatistics>> 
getCubeSegmentStatisticsMap() {
             return cubeSegmentStatisticsMap;
         }
+
+        @Override
+        public String toString() {
+            return "CubeSegmentStatisticsResult [queryType=" + queryType + 
",realization=" + realization
+                    + ",realizationType=" + realizationType + 
",cubeSegmentStatisticsMap=" + cubeSegmentStatisticsMap
+                    + "]";
+        }
     }
 
     public static class QueryStatisticsResult implements Serializable {
+        protected static final long serialVersionUID = 1L;
+
         private final List<RPCStatistics> rpcStatisticsList;
         private final List<CubeSegmentStatisticsResult> 
cubeSegmentStatisticsResultList;
 
+        public QueryStatisticsResult() {
+            rpcStatisticsList = Lists.newArrayList();
+            cubeSegmentStatisticsResultList = Lists.newArrayList();
+        }
+
         public QueryStatisticsResult(List<RPCStatistics> rpcStatisticsList,
                 List<CubeSegmentStatisticsResult> 
cubeSegmentStatisticsResultList) {
             this.rpcStatisticsList = rpcStatisticsList;
@@ -391,5 +422,11 @@ public class QueryContext {
         public List<CubeSegmentStatisticsResult> 
getCubeSegmentStatisticsResultList() {
             return cubeSegmentStatisticsResultList;
         }
+
+        @Override
+        public String toString() {
+            return "QueryStatisticsResult [rpcStatisticsList=" + 
rpcStatisticsList + ",cubeSegmentStatisticsResultList="
+                    + cubeSegmentStatisticsResultList + "]";
+        }
     }
 }

http://git-wip-us.apache.org/repos/asf/kylin/blob/4ef79623/core-job/src/main/java/org/apache/kylin/job/metrics/JobMetricsFacade.java
----------------------------------------------------------------------
diff --git 
a/core-job/src/main/java/org/apache/kylin/job/metrics/JobMetricsFacade.java 
b/core-job/src/main/java/org/apache/kylin/job/metrics/JobMetricsFacade.java
index 07fcc49..904c4bd 100644
--- a/core-job/src/main/java/org/apache/kylin/job/metrics/JobMetricsFacade.java
+++ b/core-job/src/main/java/org/apache/kylin/job/metrics/JobMetricsFacade.java
@@ -30,6 +30,9 @@ public class JobMetricsFacade {
     private static final Logger logger = 
LoggerFactory.getLogger(JobMetricsFacade.class);
 
     public static void updateMetrics(JobStatisticsResult jobStats) {
+        if 
(!KylinConfig.getInstanceFromEnv().isKylinMetricsReporterForJobEnabled()) {
+            return;
+        }
         /**
          * report job related metrics
          */

http://git-wip-us.apache.org/repos/asf/kylin/blob/4ef79623/core-metrics/src/main/java/org/apache/kylin/metrics/MetricsManager.java
----------------------------------------------------------------------
diff --git 
a/core-metrics/src/main/java/org/apache/kylin/metrics/MetricsManager.java 
b/core-metrics/src/main/java/org/apache/kylin/metrics/MetricsManager.java
index 2616c38..ce28bf6 100644
--- a/core-metrics/src/main/java/org/apache/kylin/metrics/MetricsManager.java
+++ b/core-metrics/src/main/java/org/apache/kylin/metrics/MetricsManager.java
@@ -62,11 +62,30 @@ public class MetricsManager {
         return instance;
     }
 
-    public static void setSystemCubeSink(Sink systemCubeSink) {
+    public static void initMetricsManager(Sink systemCubeSink,
+            Map<ActiveReservoir, List<Pair<String, Properties>>> 
sourceReporterBindProperties) {
+        setSystemCubeSink(systemCubeSink);
+        setSourceReporterBindProps(sourceReporterBindProperties);
+        instance.init();
+    }
+
+    private static void setSystemCubeSink(Sink systemCubeSink) {
+        if (systemCubeSink == null) {
+            logger.warn("SystemCubeSink is not set and the default one will be 
chosen");
+            try {
+                Class clz = 
Class.forName(KylinConfig.getInstanceFromEnv().getKylinSystemCubeSinkDefaultClass());
+                systemCubeSink = (Sink) clz.getConstructor().newInstance();
+            } catch (Exception e) {
+                logger.warn("Failed to initialize the "
+                        + 
KylinConfig.getInstanceFromEnv().getKylinSystemCubeSinkDefaultClass()
+                        + ". The StubSink will be used");
+                systemCubeSink = new StubSink();
+            }
+        }
         scSink = systemCubeSink;
     }
 
-    public static void setSourceReporterBindProps(
+    private static void setSourceReporterBindProps(
             Map<ActiveReservoir, List<Pair<String, Properties>>> 
sourceReporterBindProperties) {
         sourceReporterBindProps = 
Maps.newHashMapWithExpectedSize(sourceReporterBindProperties.size());
         for (ActiveReservoir activeReservoir : 
sourceReporterBindProperties.keySet()) {
@@ -88,20 +107,7 @@ public class MetricsManager {
         }
     }
 
-    public void init() {
-        if (scSink == null) {
-            logger.warn("SystemCubeSink is not set and the default one will be 
chosen");
-            try {
-                Class clz = 
Class.forName(KylinConfig.getInstanceFromEnv().getKylinSystemCubeSinkDefaultClass());
-                scSink = (Sink) clz.getConstructor().newInstance();
-            } catch (Exception e) {
-                logger.warn(
-                        "Failed to initialize the " + 
KylinConfig.getInstanceFromEnv().getKylinSystemCubeSinkDefaultClass()
-                                + ". The StubSink will be used");
-                scSink = new StubSink();
-            }
-        }
-
+    private void init() {
         if (KylinConfig.getInstanceFromEnv().isKylinMetricsMonitorEnabled()) {
             logger.info("Kylin metrics monitor is enabled.");
             int nameIdx = 0;
@@ -136,7 +142,7 @@ public class MetricsManager {
         }
     }
 
-    public String getSystemTableFromSubject(String subject) {
+    public static String getSystemTableFromSubject(String subject) {
         return scSink.getTableFromSubject(subject);
     }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kylin/blob/4ef79623/server-base/src/main/java/org/apache/kylin/rest/metrics/QueryMetricsFacade.java
----------------------------------------------------------------------
diff --git 
a/server-base/src/main/java/org/apache/kylin/rest/metrics/QueryMetricsFacade.java
 
b/server-base/src/main/java/org/apache/kylin/rest/metrics/QueryMetricsFacade.java
index 3a5c664..4e59f10 100644
--- 
a/server-base/src/main/java/org/apache/kylin/rest/metrics/QueryMetricsFacade.java
+++ 
b/server-base/src/main/java/org/apache/kylin/rest/metrics/QueryMetricsFacade.java
@@ -68,6 +68,11 @@ public class QueryMetricsFacade {
     }
 
     public static void updateMetrics(SQLRequest sqlRequest, SQLResponse 
sqlResponse) {
+        updateMetricsToLocal(sqlRequest, sqlResponse);
+        updateMetricsToReservoir(sqlRequest, sqlResponse);
+    }
+
+    private static void updateMetricsToLocal(SQLRequest sqlRequest, 
SQLResponse sqlResponse) {
         if (!enabled)
             return;
 
@@ -80,10 +85,15 @@ public class QueryMetricsFacade {
 
         String cubeMetricName = projectName + ",sub=" + cubeName;
         update(getQueryMetrics(cubeMetricName), sqlResponse);
+    }
 
-        /**
-         * report query related metrics
-         */
+    /**
+     * report query related metrics
+     */
+    private static void updateMetricsToReservoir(SQLRequest sqlRequest, 
SQLResponse sqlResponse) {
+        if 
(!KylinConfig.getInstanceFromEnv().isKylinMetricsReporterForQueryEnabled()) {
+            return;
+        }
         String user = 
SecurityContextHolder.getContext().getAuthentication().getName();
         if (user == null) {
             user = "unknown";

http://git-wip-us.apache.org/repos/asf/kylin/blob/4ef79623/server-base/src/main/java/org/apache/kylin/rest/response/SQLResponse.java
----------------------------------------------------------------------
diff --git 
a/server-base/src/main/java/org/apache/kylin/rest/response/SQLResponse.java 
b/server-base/src/main/java/org/apache/kylin/rest/response/SQLResponse.java
index bed4764..bca52bc 100644
--- a/server-base/src/main/java/org/apache/kylin/rest/response/SQLResponse.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/response/SQLResponse.java
@@ -206,10 +206,18 @@ public class SQLResponse implements Serializable {
 
     @JsonIgnore
     public QueryContext.QueryStatisticsResult getQueryStatistics() {
-        return (QueryContext.QueryStatisticsResult) 
SerializationUtils.deserialize(queryStatistics);
+        if (queryStatistics != null) {
+            try {
+                return (QueryContext.QueryStatisticsResult) 
SerializationUtils.deserialize(queryStatistics);
+            } catch (Exception e) { // Exception may happen due to corrupt or incompatible serialized bytes
+                System.out.println("Error while deserialize queryStatistics 
due to " + e);
+            }
+        }
+        return new QueryContext.QueryStatisticsResult();
     }
 
     public void setQueryStatistics(QueryContext.QueryStatisticsResult 
queryStatisticsResult) {
-        this.queryStatistics = 
SerializationUtils.serialize(queryStatisticsResult);
+        this.queryStatistics = queryStatisticsResult == null ? null
+                : SerializationUtils.serialize(queryStatisticsResult);
     }
 }

http://git-wip-us.apache.org/repos/asf/kylin/blob/4ef79623/server-base/src/main/java/org/apache/kylin/rest/service/QueryService.java
----------------------------------------------------------------------
diff --git 
a/server-base/src/main/java/org/apache/kylin/rest/service/QueryService.java 
b/server-base/src/main/java/org/apache/kylin/rest/service/QueryService.java
index a01997e..d59df2e 100644
--- a/server-base/src/main/java/org/apache/kylin/rest/service/QueryService.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/service/QueryService.java
@@ -425,6 +425,7 @@ public class QueryService extends BasicService {
                 sqlResponse.setThrowable(e.getCause() == null ? e : 
ExceptionUtils.getRootCause(e));
                 sqlResponse.setTotalScanCount(queryContext.getScannedRows());
                 sqlResponse.setTotalScanBytes(queryContext.getScannedBytes());
+                
sqlResponse.setQueryStatistics(queryContext.getQueryStatisticsResult());
 
                 if (queryCacheEnabled && e.getCause() != null
                         && ExceptionUtils.getRootCause(e) instanceof 
ResourceLimitExceededException) {

http://git-wip-us.apache.org/repos/asf/kylin/blob/4ef79623/server/src/main/resources/kylinMetrics.xml
----------------------------------------------------------------------
diff --git a/server/src/main/resources/kylinMetrics.xml 
b/server/src/main/resources/kylinMetrics.xml
index 92e391f..354b1a0 100644
--- a/server/src/main/resources/kylinMetrics.xml
+++ b/server/src/main/resources/kylinMetrics.xml
@@ -26,7 +26,7 @@
             <value>10</value>
         </constructor-arg>
         <constructor-arg index="1">
-            <value>100</value>
+            <value>10</value>
         </constructor-arg>
         <constructor-arg index="2">
             <value>10</value>
@@ -37,48 +37,43 @@
 
     <bean id="kafkaSink" 
class="org.apache.kylin.metrics.lib.impl.kafka.KafkaSink"/>
 
-    <bean id="systemCubeSink" 
class="org.springframework.beans.factory.config.MethodInvokingFactoryBean">
+    <bean id="initMetricsManager" 
class="org.springframework.beans.factory.config.MethodInvokingFactoryBean">
         <property name="targetClass" 
value="org.apache.kylin.metrics.MetricsManager"/>
-        <property name="targetMethod" value="setSystemCubeSink"/>
+        <property name="targetMethod" value="initMetricsManager"/>
         <property name="arguments">
             <list>
                 <ref bean="hiveSink"/>
+                <map key-type="org.apache.kylin.metrics.lib.ActiveReservoir" 
value-type="java.util.List">
+                    <!--
+                    <entry key-ref="instantReservoir">
+                        <list>
+                            <bean class="org.apache.kylin.common.util.Pair">
+                                <property name="first"
+                                          
value="org.apache.kylin.metrics.lib.impl.kafka.KafkaReservoirReporter"/>
+                                <property name="second">
+                                    <props>
+                                        <prop 
key="bootstrap.servers">sandbox:9092</prop>
+                                    </props>
+                                </property>
+                            </bean>
+                        </list>
+                    </entry>
+                    -->
+                    <entry key-ref="blockingReservoir">
+                        <list>
+                            <bean class="org.apache.kylin.common.util.Pair">
+                                <property name="first"
+                                          
value="org.apache.kylin.metrics.lib.impl.hive.HiveReservoirReporter"/>
+                                <property name="second">
+                                    <props>
+                                    </props>
+                                </property>
+                            </bean>
+                        </list>
+                    </entry>
+                </map>
             </list>
         </property>
     </bean>
 
-    <bean id="sourceReporterBindProperties" 
class="org.springframework.beans.factory.config.MethodInvokingFactoryBean">
-        <property name="targetClass" 
value="org.apache.kylin.metrics.MetricsManager"/>
-        <property name="targetMethod" value="setSourceReporterBindProps"/>
-        <property name="arguments">
-            <map key-type="org.apache.kylin.metrics.lib.ActiveReservoir" 
value-type="java.util.List">
-                <entry key-ref="instantReservoir">
-                    <list>
-                        <bean class="org.apache.kylin.common.util.Pair">
-                            <property name="first"
-                                      
value="org.apache.kylin.metrics.lib.impl.kafka.KafkaReservoirReporter"/>
-                            <property name="second">
-                                <props>
-                                    <prop 
key="bootstrap.servers">sandbox:9092</prop>
-                                </props>
-                            </property>
-                        </bean>
-                    </list>
-                </entry>
-                <entry key-ref="blockingReservoir">
-                    <list>
-                        <bean class="org.apache.kylin.common.util.Pair">
-                            <property name="first"
-                                      
value="org.apache.kylin.metrics.lib.impl.hive.HiveReservoirReporter"/>
-                            <property name="second">
-                                <props>
-                                </props>
-                            </property>
-                        </bean>
-                    </list>
-                </entry>
-            </map>
-        </property>
-    </bean>
-
 </beans>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kylin/blob/4ef79623/server/src/test/java/org/apache/kylin/rest/metrics/QueryMetricsTest.java
----------------------------------------------------------------------
diff --git 
a/server/src/test/java/org/apache/kylin/rest/metrics/QueryMetricsTest.java 
b/server/src/test/java/org/apache/kylin/rest/metrics/QueryMetricsTest.java
index bd1da59..c16c350 100644
--- a/server/src/test/java/org/apache/kylin/rest/metrics/QueryMetricsTest.java
+++ b/server/src/test/java/org/apache/kylin/rest/metrics/QueryMetricsTest.java
@@ -25,6 +25,7 @@ import java.util.List;
 import javax.management.MBeanServer;
 import javax.management.ObjectName;
 
+import org.apache.kylin.common.QueryContext;
 import org.apache.kylin.rest.request.SQLRequest;
 import org.apache.kylin.rest.response.SQLResponse;
 import org.apache.kylin.rest.service.ServiceTestBase;
@@ -111,4 +112,45 @@ public class QueryMetricsTest extends ServiceTestBase {
         System.clearProperty("kylin.server.query-metrics-enabled");
     }
 
+    @Test
+    public void testQueryStatisticsResult() throws Exception {
+        System.setProperty("kylin.core.metrics.reporter-query-enabled", 
"true");
+        QueryMetricsFacade.init();
+
+        SQLRequest sqlRequest = new SQLRequest();
+        sqlRequest.setSql("select * from TEST_KYLIN_FACT");
+        sqlRequest.setProject("default");
+
+        SQLResponse sqlResponse = new SQLResponse();
+        sqlResponse.setDuration(10);
+        sqlResponse.setCube("test_cube");
+        sqlResponse.setIsException(false);
+        sqlResponse.setTotalScanCount(100);
+        List<String> list1 = new ArrayList<>();
+        list1.add("111");
+        list1.add("112");
+        List<String> list2 = new ArrayList<>();
+        list2.add("111");
+        list2.add("112");
+        List<List<String>> results = new ArrayList<>();
+        results.add(list1);
+        results.add(list2);
+        sqlResponse.setResults(results);
+        sqlResponse.setStorageCacheUsed(true);
+
+        QueryContext context = QueryContext.current();
+        int ctxId = 0;
+        context.addContext(ctxId, "OLAP", true);
+        context.addRPCStatistics(ctxId, "sandbox", "test_cube", 
"20100101000000_20150101000000", 3L, 3L, 3L, null, 80L,
+                0L, 2L, 2L, 0L, 30L);
+
+        sqlResponse.setQueryStatistics(context.getQueryStatisticsResult());
+
+        QueryMetricsFacade.updateMetrics(sqlRequest, sqlResponse);
+
+        Thread.sleep(2000);
+
+        System.clearProperty("kylin.core.metrics.reporter-query-enabled");
+        System.out.println("------------testQueryStatisticsResult 
done------------");
+    }
 }

Reply via email to