This is an automated email from the ASF dual-hosted git repository.

nju_yaho pushed a commit to tag ebay-3.1.0-release-20200701
in repository https://gitbox.apache.org/repos/asf/kylin.git

commit eb70b5d52b7a149c8e6cee9cd0a7fdaf881504d1
Author: Zhong, Yanghong <nju_y...@apache.org>
AuthorDate: Mon May 25 19:23:19 2020 +0800

    KYLIN-4513 Introduce query call trace
---
 core-common/pom.xml                                |  10 ++
 .../org/apache/kylin/common/KylinConfigBase.java   |   7 ++
 .../java/org/apache/kylin/common/QueryContext.java |  76 +++++++++++++-
 .../kylin/common/tracer/JaegerTracerWrapper.java   |  48 +++++++++
 .../kylin/common/tracer/TracerConstants.java       | 111 +++++++++++++++++++++
 .../apache/kylin/common/tracer/TracerManager.java  |  59 +++++++++++
 .../apache/kylin/common/tracer/TracerWrapper.java  |  58 +++++++++++
 .../kylin/common/tracer/TracerManagerTest.java     |  46 +++++++++
 .../storage/gtrecord/GTCubeStorageQueryBase.java   |  17 +++-
 pom.xml                                            |   6 ++
 .../query/relnode/OLAPToEnumerableConverter.java   |   9 +-
 .../apache/kylin/rest/service/QueryService.java    |  29 +++++-
 .../hbase/cube/v2/CubeHBaseEndpointRPC.java        |  56 +++++++++--
 .../stream/rpc/HttpStreamDataSearchClient.java     |  19 ++--
 14 files changed, 526 insertions(+), 25 deletions(-)

diff --git a/core-common/pom.xml b/core-common/pom.xml
index 0549898..ca5cefa 100644
--- a/core-common/pom.xml
+++ b/core-common/pom.xml
@@ -63,6 +63,16 @@
             <groupId>org.apache.commons</groupId>
             <artifactId>commons-compress</artifactId>
         </dependency>
+        <dependency>
+            <groupId>io.jaegertracing</groupId>
+            <artifactId>jaeger-client</artifactId>
+            <exclusions>
+                <exclusion>
+                    <groupId>org.mockito</groupId>
+                    <artifactId>mockito-core</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
         <!-- Provided -->
         <dependency>
             <groupId>com.google.code.findbugs</groupId>
diff --git a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
index a5af5e3..f2d423d 100644
--- a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
+++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
@@ -2670,4 +2670,15 @@ public abstract class KylinConfigBase implements Serializable {
     public int getDefaultTimeFilter() {
         return Integer.parseInt(getOptional("kylin.web.default-time-filter", "2"));
     }
+
+    // ============================================================================
+    // Trace
+    // ============================================================================
+    /**
+     * Reads {@code kylin.trace.collector-endpoint} through {@code getOptional},
+     * passing an empty string as the default value.
+     */
+    public String getTracerCollectorEndpoint() {
+        return getOptional("kylin.trace.collector-endpoint", "");
+    }
 }
diff --git a/core-common/src/main/java/org/apache/kylin/common/QueryContext.java b/core-common/src/main/java/org/apache/kylin/common/QueryContext.java
index 0f6534f..df2fa4d 100644
--- a/core-common/src/main/java/org/apache/kylin/common/QueryContext.java
+++ b/core-common/src/main/java/org/apache/kylin/common/QueryContext.java
@@ -29,12 +29,18 @@ import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
 
 import org.apache.kylin.common.exceptions.KylinTimeoutException;
+import org.apache.kylin.common.tracer.TracerConstants.OperationEum;
+import org.apache.kylin.common.tracer.TracerConstants.TagEnum;
+import org.apache.kylin.common.tracer.TracerManager;
+import org.apache.kylin.common.tracer.TracerWrapper;
 import org.apache.kylin.common.util.RandomUtil;
 import org.apache.kylin.shaded.com.google.common.collect.Lists;
 import org.apache.kylin.shaded.com.google.common.collect.Maps;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import io.opentracing.Span;
+
 /**
  * Holds per query information and statistics.
  */
@@ -48,14 +54,19 @@ public class QueryContext {
     public interface QueryStopListener {
         void stop(QueryContext query);
     }
-    
+
+    private static final String SPAN_QUERY_SERVER = 
KylinConfig.getInstanceFromEnv().getDeployEnv() + "-Query";
+
     private final String queryId;
     private final String project;
     private final String sql;
     private final String username;
     private final int maxConnThreads;
     private final long queryStartMillis;
-    
+
+    private final TracerWrapper tracer;
+    private final Span rootSpan;
+
     private Set<String> groups;
     private AtomicLong scannedRows = new AtomicLong();
     private AtomicLong returnedRows = new AtomicLong();
@@ -84,6 +95,13 @@ public class QueryContext {
         this.username = user;
         this.maxConnThreads = maxConnThreads;
         this.queryStartMillis = startMills;
+
+        this.tracer = TracerManager.getTracerWrapper(SPAN_QUERY_SERVER);
+        this.rootSpan = tracer.startSpan(OperationEum.MAIN, null);
+        rootSpan.setTag(TagEnum.PROJECT.toString(), projectName);
+        rootSpan.setTag(TagEnum.SQL.toString(), sql);
+        rootSpan.setTag(TagEnum.QUERY_ID.toString(), queryId);
+        rootSpan.setTag(TagEnum.SUBMITTER.toString(), user);
     }
 
     public ExecutorService getConnectionPool(ExecutorService sharedConnPool) {
@@ -210,6 +228,81 @@ public class QueryContext {
         }
     }
 
+    /**
+     * Returns the span the tracer currently reports as active.
+     */
+    public Span activeSpan() {
+        return tracer.activeSpan();
+    }
+
+    /** Starts the fetch-cache span as a child of this query's root span. */
+    public Span startFetchCache() {
+        return tracer.startSpan(OperationEum.FETCH_CACHE_STEP, rootSpan);
+    }
+
+    /** Starts the SQL parse span as a child of this query's root span. */
+    public Span startSqlParse() {
+        return tracer.startSpan(OperationEum.SQL_PARSE_STEP, rootSpan);
+    }
+
+    /** Starts the query plan span as a child of this query's root span. */
+    public Span startQueryPlan() {
+        return tracer.startSpan(OperationEum.QUERY_PLAN_STEP, rootSpan);
+    }
+
+    /**
+     * Starts the fetch-segment-cache span as a child of the root span and tags
+     * it with the cube and segment names.
+     */
+    public Span startFetchSegmentCache(String cubeName, String segmentName) {
+        Span span = tracer.startSpan(OperationEum.FETCH_SEGMENT_CACHE_STEP, rootSpan);
+        span.setTag(TagEnum.CUBE.toString(), cubeName);
+        span.setTag(TagEnum.SEGMENT.toString(), segmentName);
+        return span;
+    }
+
+    /**
+     * Starts an endpoint-range request span as a child of the root span, tagged
+     * with the range, cube, segment, HTable, cuboid ids and fuzzy key size info.
+     */
+    public Span startEPRangeQuerySpan(String range, String cubeName, String segmentName, String table, long sourceId,
+            long targetId, String fuzzyKeySizeStr) {
+        Span span = tracer.startSpan(OperationEum.ENDPOINT_RANGE_REQUEST, rootSpan);
+        span.setTag(TagEnum.EPRANGE.toString(), range);
+        span.setTag(TagEnum.CUBE.toString(), cubeName);
+        span.setTag(TagEnum.SEGMENT.toString(), segmentName);
+        span.setTag(TagEnum.HTABLE.toString(), table);
+
+        // Rendered as "sourceId[<id>]targetId[<id>]".
+        String cuboidInfo = "sourceId[" + sourceId + "]targetId[" + targetId + "]";
+        span.setTag(TagEnum.CUBOID.toString(), cuboidInfo);
+
+        // fuzzyKeySizeStr for checking whether the fuzzy key is too large
+        span.setTag(TagEnum.FUZZY_KEY_SIZE.toString(), fuzzyKeySizeStr);
+        return span;
+    }
+
+    /**
+     * Starts a region server RPC span as a child of the given endpoint-range
+     * span and tags it with the region server name.
+     */
+    public Span startRegionRPCSpan(String regionServerName, Span epRangeQuerySpan) {
+        Span span = tracer.startSpan(OperationEum.REGION_SERVER_RPC, epRangeQuerySpan);
+        span.setTag(TagEnum.REGION_SERVER.toString(), regionServerName);
+        return span;
+    }
+
+    /**
+     * Starts a streaming receiver request span as a child of the root span and
+     * tags it with the cube name and the replica set id.
+     */
+    public Span startStreamingReceiverQuerySpan(String cubeName, int rsID) {
+        Span span = tracer.startSpan(OperationEum.STREAMING_RECEIVER_REQUEST, rootSpan);
+        span.setTag(TagEnum.CUBE.toString(), cubeName);
+        span.setTag(TagEnum.REPLICA_SET.toString(), String.valueOf(rsID));
+        return span;
+    }
+
     public Throwable getThrowable() {
         return throwable.get();
     }
diff --git a/core-common/src/main/java/org/apache/kylin/common/tracer/JaegerTracerWrapper.java b/core-common/src/main/java/org/apache/kylin/common/tracer/JaegerTracerWrapper.java
new file mode 100644
index 0000000..0d2e9bf
--- /dev/null
+++ b/core-common/src/main/java/org/apache/kylin/common/tracer/JaegerTracerWrapper.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.common.tracer;
+
+import io.jaegertracing.internal.JaegerSpan;
+import io.jaegertracing.internal.JaegerTracer;
+import io.opentracing.Span;
+
+/**
+ * {@link TracerWrapper} backed by a Jaeger tracer; every span handed to the
+ * accessors below is cast to {@link JaegerSpan}.
+ */
+public class JaegerTracerWrapper extends TracerWrapper {
+
+    JaegerTracerWrapper(JaegerTracer tracer) {
+        super(tracer);
+    }
+
+    /** Looks up {@code tagName} in the tag map of the given Jaeger span. */
+    @Override
+    public Object getTagValue(Span span, String tagName) {
+        return asJaegerSpan(span).getTags().get(tagName);
+    }
+
+    /** Span start by millisecond: the Jaeger start value divided by 1000. */
+    @Override
+    public long getStart(Span span) {
+        return asJaegerSpan(span).getStart() / 1000;
+    }
+
+    /** Span duration by millisecond: the Jaeger duration value divided by 1000. */
+    @Override
+    public long getDuration(Span span) {
+        return asJaegerSpan(span).getDuration() / 1000;
+    }
+
+    // Single place for the downcast shared by the accessors above.
+    private static JaegerSpan asJaegerSpan(Span span) {
+        return (JaegerSpan) span;
+    }
+
+}
diff --git a/core-common/src/main/java/org/apache/kylin/common/tracer/TracerConstants.java b/core-common/src/main/java/org/apache/kylin/common/tracer/TracerConstants.java
new file mode 100644
index 0000000..b3234b9
--- /dev/null
+++ b/core-common/src/main/java/org/apache/kylin/common/tracer/TracerConstants.java
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.common.tracer;
+
+import java.util.Locale;
+
+import org.apache.kylin.common.util.StringUtil;
+
+/**
+ * Operation names and tag keys used when reporting query call traces.
+ */
+public class TracerConstants {
+
+    /** Span operation names; {@link #toString()} yields the reported name. */
+    public enum OperationEum {
+        MAIN("main_query"), //
+        FETCH_CACHE_STEP("fetch_cache"), //
+        SQL_PARSE_STEP("parse_sql"), //
+        QUERY_PLAN_STEP("query_plan"), //
+        FETCH_SEGMENT_CACHE_STEP("fetch_segment_cache"), //
+        ENDPOINT_RANGE_REQUEST("endpoint_range_request"), //
+        REGION_SERVER_RPC("region_server_rpc"), //
+        STREAMING_RECEIVER_REQUEST("streaming_receiver_request") //
+        ;
+
+        private final String name;
+
+        OperationEum(String name) {
+            this.name = name;
+        }
+
+        /**
+         * Finds the operation whose reported name equals {@code name},
+         * ignoring case.
+         *
+         * @param name reported operation name, e.g. {@code "main_query"}
+         * @return the matching constant, or {@code null} when
+         *         {@code StringUtil.isEmpty(name)} holds or nothing matches
+         */
+        public static OperationEum getByName(String name) {
+            if (StringUtil.isEmpty(name)) {
+                return null;
+            }
+            // Reported names are lower-case, so normalize the input the same way.
+            String normalized = name.toLowerCase(Locale.ROOT);
+            for (OperationEum property : OperationEum.values()) {
+                if (property.name.equals(normalized)) {
+                    return property;
+                }
+            }
+
+            return null;
+        }
+
+        @Override
+        public String toString() {
+            return name;
+        }
+    }
+
+    /** Span tag keys; {@link #toString()} yields the reported key. */
+    public enum TagEnum {
+        PROJECT("project"), //
+        SQL("sql"), //
+        SUBMITTER("submitter"), //
+        QUERY_ID("query_id"), //
+        CUBE("cube"), //
+        SEGMENT("segment"), //
+        HTABLE("htable"), //
+        EPRANGE("ep_range"), //
+        CUBOID("cuboid"), //
+        FUZZY_KEY_SIZE("fuzzykey_size"), //
+        REGION_SERVER("region_server"), //
+        RPC_DURATION("rpc_duration"), //
+        RPC_SCAN_COUNT("rpc_scan_count"), //
+        RPC_FILTER_COUNT("rpc_filter_count"), //
+        RPC_AGG_COUNT("rpc_agg_count"), //
+        RPC_SERIALIZED_BYTES("rpc_serialized_bytes"), //
+        RPC_SYSTEMLOAD("rpc_sysload"), //
+        RPC_FREE_MEM("rpc_free_physical_mem"), //
+        RPC_FREE_SWAP("rpc_free_swap_size"), //
+        RPC_ETC_MSG("rpc_etc_message"), //
+        REPLICA_SET("replica_set")//
+        ;
+
+        private final String name;
+
+        TagEnum(String name) {
+            this.name = name;
+        }
+
+        /**
+         * Finds the tag whose reported key equals {@code name}, ignoring
+         * case.
+         *
+         * @param name reported tag key, e.g. {@code "cube"}
+         * @return the matching constant, or {@code null} when
+         *         {@code StringUtil.isEmpty(name)} holds or nothing matches
+         */
+        public static TagEnum getByName(String name) {
+            if (StringUtil.isEmpty(name)) {
+                return null;
+            }
+            // Reported keys are lower-case, so normalize the input the same way.
+            String normalized = name.toLowerCase(Locale.ROOT);
+            for (TagEnum property : TagEnum.values()) {
+                if (property.name.equals(normalized)) {
+                    return property;
+                }
+            }
+
+            return null;
+        }
+
+        @Override
+        public String toString() {
+            return name;
+        }
+    }
+}
diff --git a/core-common/src/main/java/org/apache/kylin/common/tracer/TracerManager.java b/core-common/src/main/java/org/apache/kylin/common/tracer/TracerManager.java
new file mode 100644
index 0000000..57379f6
--- /dev/null
+++ b/core-common/src/main/java/org/apache/kylin/common/tracer/TracerManager.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.common.tracer;
+
+import java.util.concurrent.ConcurrentMap;
+
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.shaded.com.google.common.collect.Maps;
+
+import io.jaegertracing.Configuration;
+import io.jaegertracing.internal.samplers.ConstSampler;
+
+/**
+ * Creates Jaeger-backed {@link TracerWrapper}s and caches one per service name.
+ */
+public class TracerManager {
+
+    private static final ConcurrentMap<String, TracerWrapper> CACHE = Maps.newConcurrentMap();
+
+    /**
+     * Returns the tracer wrapper for {@code serviceName}, building it on first
+     * request and returning the same instance afterwards.
+     *
+     * @param serviceName service name given to the Jaeger configuration; also the cache key
+     * @return the cached or newly created wrapper
+     */
+    public static TracerWrapper getTracerWrapper(String serviceName) {
+        // Cache hit: no lock and no config lookup needed.
+        TracerWrapper tracerWrapper = CACHE.get(serviceName);
+        if (tracerWrapper != null) {
+            return tracerWrapper;
+        }
+
+        synchronized (TracerManager.class) {
+            // Re-check under the lock so a service name gets exactly one tracer.
+            tracerWrapper = CACHE.get(serviceName);
+            if (tracerWrapper != null) {
+                return tracerWrapper;
+            }
+            // The collector endpoint only matters when a tracer is actually built.
+            String endpoint = KylinConfig.getInstanceFromEnv().getTracerCollectorEndpoint();
+            Configuration.SamplerConfiguration samplerConfig = Configuration.SamplerConfiguration.fromEnv()
+                    .withType(ConstSampler.TYPE).withParam(1);
+            Configuration.SenderConfiguration sender = Configuration.SenderConfiguration.fromEnv()
+                    .withEndpoint(endpoint);
+            Configuration.ReporterConfiguration reporterConfig = Configuration.ReporterConfiguration.fromEnv()
+                    .withLogSpans(false).withSender(sender);
+            Configuration config = new Configuration(serviceName).withSampler(samplerConfig)
+                    .withReporter(reporterConfig);
+            tracerWrapper = new JaegerTracerWrapper(config.getTracer());
+            CACHE.put(serviceName, tracerWrapper);
+            return tracerWrapper;
+        }
+    }
+}
diff --git a/core-common/src/main/java/org/apache/kylin/common/tracer/TracerWrapper.java b/core-common/src/main/java/org/apache/kylin/common/tracer/TracerWrapper.java
new file mode 100644
index 0000000..dbcde5c
--- /dev/null
+++ b/core-common/src/main/java/org/apache/kylin/common/tracer/TracerWrapper.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.common.tracer;
+
+import io.opentracing.Scope;
+import io.opentracing.Span;
+import io.opentracing.Tracer;
+
+/**
+ * Thin layer over an OpenTracing {@link Tracer} that starts spans for named
+ * operations; subclasses supply implementation-specific span accessors.
+ */
+public abstract class TracerWrapper {
+
+    private final Tracer tracer;
+
+    TracerWrapper(Tracer tracer) {
+        this.tracer = tracer;
+    }
+
+    /** Delegates to {@link Tracer#activeSpan()} of the wrapped tracer. */
+    public Span activeSpan() {
+        return tracer.activeSpan();
+    }
+
+    /** Starts a span named after {@code operation.toString()}; see the String overload. */
+    public Span startSpan(TracerConstants.OperationEum operation, Span parentSpan) {
+        return startSpan(operation.toString(), parentSpan);
+    }
+
+    /**
+     * Starts and activates a span that ignores the currently active span.
+     *
+     * @param operationName name of the new span
+     * @param parentSpan parent to attach to via child-of, or {@code null} for no parent
+     * @return the span of the newly activated scope; the scope is not closed here
+     */
+    public Span startSpan(String operationName, Span parentSpan) {
+        Tracer.SpanBuilder builder = tracer.buildSpan(operationName).ignoreActiveSpan();
+        if (parentSpan != null) {
+            builder = builder.asChildOf(parentSpan);
+        }
+        // NOTE(review): the Scope from startActive(false) is dropped after reading
+        // its span; confirm that leaving it unclosed is intended.
+        Scope scope = builder.startActive(false);
+        return scope.span();
+    }
+
+    /** Returns the value of tag {@code tagName} on the given span. */
+    public abstract Object getTagValue(Span span, String tagName);
+
+    /** Start of the span, by millisecond. */
+    public abstract long getStart(Span span);
+
+    /** Duration of the span, by millisecond. */
+    public abstract long getDuration(Span span);
+}
diff --git a/core-common/src/test/java/org/apache/kylin/common/tracer/TracerManagerTest.java b/core-common/src/test/java/org/apache/kylin/common/tracer/TracerManagerTest.java
new file mode 100644
index 0000000..61ba30c
--- /dev/null
+++ b/core-common/src/test/java/org/apache/kylin/common/tracer/TracerManagerTest.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.common.tracer;
+
+import org.apache.kylin.common.util.LocalFileMetadataTestCase;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Checks that {@link TracerManager} hands out a cached Jaeger-backed wrapper.
+ */
+public class TracerManagerTest extends LocalFileMetadataTestCase {
+
+    private static final String SERVICE_NAME = "Query";
+
+    @Before
+    public void setUp() throws Exception {
+        createTestMetadata();
+    }
+
+    @After
+    public void after() throws Exception {
+        cleanupTestMetadata();
+    }
+
+    @Test
+    public void testGetJaegerTracerWrapper() {
+        TracerWrapper first = TracerManager.getTracerWrapper(SERVICE_NAME);
+        Assert.assertTrue(first instanceof JaegerTracerWrapper);
+
+        // A second lookup with the same name must return the very same instance.
+        TracerWrapper second = TracerManager.getTracerWrapper(SERVICE_NAME);
+        Assert.assertSame(first, second);
+    }
+}
diff --git a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/GTCubeStorageQueryBase.java b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/GTCubeStorageQueryBase.java
index 804ce3f..65d5914 100644
--- a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/GTCubeStorageQueryBase.java
+++ b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/GTCubeStorageQueryBase.java
@@ -62,15 +62,16 @@ import org.apache.kylin.metadata.model.TblColRef;
 import org.apache.kylin.metadata.realization.SQLDigest;
 import org.apache.kylin.metadata.tuple.ITupleIterator;
 import org.apache.kylin.metadata.tuple.TupleInfo;
+import org.apache.kylin.shaded.com.google.common.collect.Lists;
+import org.apache.kylin.shaded.com.google.common.collect.Maps;
+import org.apache.kylin.shaded.com.google.common.collect.Sets;
 import org.apache.kylin.storage.IStorageQuery;
 import org.apache.kylin.storage.StorageContext;
 import org.apache.kylin.storage.translate.DerivedFilterTranslator;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import org.apache.kylin.shaded.com.google.common.collect.Lists;
-import org.apache.kylin.shaded.com.google.common.collect.Maps;
-import org.apache.kylin.shaded.com.google.common.collect.Sets;
+import io.opentracing.Span;
 
 public abstract class GTCubeStorageQueryBase implements IStorageQuery {
 
@@ -86,6 +87,15 @@ public abstract class GTCubeStorageQueryBase implements 
IStorageQuery {
 
     @Override
     public ITupleIterator search(StorageContext context, SQLDigest sqlDigest, 
TupleInfo returnTupleInfo) {
+        Span queryPlanSpan = QueryContextFacade.current().startQueryPlan();
+        try {
+            return searchInner(context, sqlDigest, returnTupleInfo);
+        } finally {
+            queryPlanSpan.finish();
+        }
+    }
+
+    public ITupleIterator searchInner(StorageContext context, SQLDigest 
sqlDigest, TupleInfo returnTupleInfo) {
         GTCubeStorageQueryRequest request = getStorageQueryRequest(context, 
sqlDigest, returnTupleInfo);
 
         List<CubeSegmentScanner> scanners = Lists.newArrayList();
@@ -101,7 +111,6 @@ public abstract class GTCubeStorageQueryBase implements 
IStorageQuery {
 
         if (scanners.isEmpty())
             return ITupleIterator.EMPTY_TUPLE_ITERATOR;
-
         return new SequentialCubeTupleIterator(scanners, request.getCuboid(), 
request.getDimensions(),
                 request.getDynGroups(), request.getGroups(), 
request.getMetrics(), returnTupleInfo,
                 request.getContext(), sqlDigest);
diff --git a/pom.xml b/pom.xml
index b44ea75..916ad18 100644
--- a/pom.xml
+++ b/pom.xml
@@ -152,6 +152,7 @@
     <t-digest.version>3.1</t-digest.version>
     <freemarker.version>2.3.23</freemarker.version>
     <rocksdb.version>5.9.2</rocksdb.version>
+    <jaeger.version>0.30.2</jaeger.version>
     <lz4.version>1.3.0</lz4.version>
     <mssql-jdbc.version>6.2.2.jre8</mssql-jdbc.version>
     <!--metric-->
@@ -829,6 +830,11 @@
         <artifactId>rocksdbjni</artifactId>
         <version>${rocksdb.version}</version>
       </dependency>
+      <dependency>
+        <groupId>io.jaegertracing</groupId>
+        <artifactId>jaeger-client</artifactId>
+        <version>${jaeger.version}</version>
+      </dependency>
 
       <!-- Logging -->
       <dependency>
diff --git a/query/src/main/java/org/apache/kylin/query/relnode/OLAPToEnumerableConverter.java b/query/src/main/java/org/apache/kylin/query/relnode/OLAPToEnumerableConverter.java
index e3b2272..846da3b 100644
--- a/query/src/main/java/org/apache/kylin/query/relnode/OLAPToEnumerableConverter.java
+++ b/query/src/main/java/org/apache/kylin/query/relnode/OLAPToEnumerableConverter.java
@@ -21,6 +21,7 @@ package org.apache.kylin.query.relnode;
 import java.util.List;
 import java.util.stream.Collectors;
 
+import io.opentracing.Span;
 import org.apache.calcite.adapter.enumerable.EnumerableRel;
 import org.apache.calcite.adapter.enumerable.EnumerableRelImplementor;
 import org.apache.calcite.plan.ConventionTraitDef;
@@ -118,7 +119,13 @@ public class OLAPToEnumerableConverter extends 
ConverterImpl implements Enumerab
             
QueryContextFacade.current().setCalcitePlan(this.copy(getTraitSet(), 
getInputs()));
         }
 
-        return impl.visitChild(this, 0, inputAsEnum, pref);
+        Result result = impl.visitChild(this, 0, inputAsEnum, pref);
+        //Finish sql parse span
+        Span sqlParseSpan = QueryContextFacade.current().activeSpan();
+        if (sqlParseSpan != null) {
+            sqlParseSpan.finish();
+        }
+        return result;
     }
 
      protected List<OLAPContext> listContextsHavingScan() {
diff --git a/server-base/src/main/java/org/apache/kylin/rest/service/QueryService.java b/server-base/src/main/java/org/apache/kylin/rest/service/QueryService.java
index 8a00066..e731852 100644
--- a/server-base/src/main/java/org/apache/kylin/rest/service/QueryService.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/service/QueryService.java
@@ -136,6 +136,8 @@ import org.springframework.stereotype.Component;
 
 import com.fasterxml.jackson.annotation.JsonProperty;
 
+import io.opentracing.Span;
+
 /**
  * @author xduo
  */
@@ -464,7 +466,7 @@ public class QueryService extends BasicService {
                 }
 
                 if (sqlResponse == null && isQueryCacheEnabled) {
-                    sqlResponse = searchQueryInCache(sqlRequest);
+                    sqlResponse = searchQueryInCacheWithSpan(sqlRequest);
                 }
 
                 // real execution if required
@@ -608,6 +610,15 @@ public class QueryService extends BasicService {
         return username;
     }
 
+    public SQLResponse searchQueryInCacheWithSpan(SQLRequest sqlRequest) {
+        Span cacheSpan = QueryContextFacade.current().startFetchCache();
+        try {
+            return searchQueryInCache(sqlRequest);
+        } finally {
+            cacheSpan.finish();
+        }
+    }
+
     public SQLResponse searchQueryInCache(SQLRequest sqlRequest) {
         Cache cache = cacheManager.getCache(QUERY_CACHE);
         Cache.ValueWrapper wrapper = cache.get(sqlRequest.getCacheKey());
@@ -701,7 +712,7 @@ public class QueryService extends BasicService {
                         columnMetas);
             }
             if (!isPrepareRequest) {
-                return executeRequest(correctedSql, sqlRequest, conn);
+                return executeRequestWithSpan(correctedSql, sqlRequest, conn);
             } else {
                 long prjLastModifyTime = 
getProjectManager().getProject(sqlRequest.getProject()).getLastModified();
                 preparedContextKey = new 
PreparedContextKey(sqlRequest.getProject(), prjLastModifyTime, correctedSql);
@@ -721,7 +732,7 @@ public class QueryService extends BasicService {
                 } else {
                     preparedContext = 
createPreparedContext(sqlRequest.getProject(), sqlRequest.getSql());
                 }
-                return executePrepareRequest(correctedSql, prepareSqlRequest, 
preparedContext);
+                return executePrepareRequestWithSpan(correctedSql, 
prepareSqlRequest, preparedContext);
             }
 
         } finally {
@@ -990,6 +1001,12 @@ public class QueryService extends BasicService {
         }
     }
 
+    private SQLResponse executeRequestWithSpan(String correctedSql, SQLRequest 
sqlRequest, Connection conn)
+            throws Exception {
+        QueryContextFacade.current().startSqlParse();
+        return executeRequest(correctedSql, sqlRequest, conn);
+    }
+
     /**
      * @param correctedSql
      * @param sqlRequest
@@ -1022,6 +1039,12 @@ public class QueryService extends BasicService {
         return buildSqlResponse(sqlRequest.getProject(), isPushDown, 
r.getFirst(), r.getSecond());
     }
 
+    private SQLResponse executePrepareRequestWithSpan(String correctedSql, 
PrepareSqlRequest sqlRequest,
+            PreparedContext preparedContext) throws Exception {
+        QueryContextFacade.current().startSqlParse();
+        return executePrepareRequest(correctedSql, sqlRequest, 
preparedContext);
+    }
+    
     private SQLResponse executePrepareRequest(String correctedSql, 
PrepareSqlRequest sqlRequest,
             PreparedContext preparedContext) throws Exception {
         ResultSet resultSet = null;
diff --git 
a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java
 
b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java
index 8cc4daa..04fd26c 100644
--- 
a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java
+++ 
b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java
@@ -43,6 +43,7 @@ import org.apache.kylin.common.SubThreadPoolExecutor;
 import org.apache.kylin.common.debug.BackdoorToggles;
 import org.apache.kylin.common.exceptions.KylinTimeoutException;
 import org.apache.kylin.common.exceptions.ResourceLimitExceededException;
+import org.apache.kylin.common.tracer.TracerConstants.TagEnum;
 import org.apache.kylin.common.util.ByteArray;
 import org.apache.kylin.common.util.Bytes;
 import org.apache.kylin.common.util.BytesSerializer;
@@ -62,6 +63,10 @@ import org.apache.kylin.metadata.cachesync.Broadcaster;
 import org.apache.kylin.metadata.model.ISegment;
 import org.apache.kylin.metadata.project.ProjectInstance;
 import org.apache.kylin.metadata.project.ProjectManager;
+import org.apache.kylin.shaded.com.google.common.cache.CacheBuilder;
+import org.apache.kylin.shaded.com.google.common.cache.CacheLoader;
+import org.apache.kylin.shaded.com.google.common.cache.LoadingCache;
+import org.apache.kylin.shaded.com.google.common.collect.Lists;
 import org.apache.kylin.storage.StorageContext;
 import org.apache.kylin.storage.gtrecord.DummyPartitionStreamer;
 import org.apache.kylin.storage.gtrecord.StorageResponseGTScatter;
@@ -75,13 +80,11 @@ import org.apache.kylin.storage.hbase.util.HBaseUnionUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import org.apache.kylin.shaded.com.google.common.cache.CacheBuilder;
-import org.apache.kylin.shaded.com.google.common.cache.CacheLoader;
-import org.apache.kylin.shaded.com.google.common.cache.LoadingCache;
-import org.apache.kylin.shaded.com.google.common.collect.Lists;
 import com.google.protobuf.ByteString;
 import com.google.protobuf.HBaseZeroCopyByteString;
 
+import io.opentracing.Span;
+
 public class CubeHBaseEndpointRPC extends CubeHBaseRPC {
 
     private static final Logger logger = 
LoggerFactory.getLogger(CubeHBaseEndpointRPC.class);
@@ -204,9 +207,12 @@ public class CubeHBaseEndpointRPC extends CubeHBaseRPC {
         logger.info(
                 "The scan {} for segment {} is as below with {} separate raw 
scans, shard part of start/end key is set to 0",
                 Integer.toHexString(System.identityHashCode(scanRequest)), 
cubeSeg, rawScans.size());
+        StringBuilder fuzzyKeySizeInfo = new StringBuilder();
         for (RawScan rs : rawScans) {
+            
fuzzyKeySizeInfo.append("RawScanFuzzyKey:").append(rs.fuzzyKeys.size()).append(";");
             logScan(rs, cubeSeg.getStorageLocationIdentifier());
         }
+        final String fuzzyKeySizeStr = fuzzyKeySizeInfo.toString();
 
         logger.debug("Submitting rpc to {} shards starting from shard {}, scan 
range count {}", shardNum,
                 cuboidBaseShard, rawScans.size());
@@ -221,6 +227,8 @@ public class CubeHBaseEndpointRPC extends CubeHBaseRPC {
                 cubeSeg.getConfig().getQuerySegmentCacheMaxSize() * 1024 * 
1024);
         String calculatedSegmentQueryCacheKey = null;
         if (querySegmentCacheEnabled) {
+            Span segmentCacheSpan = 
queryContext.startFetchSegmentCache(cubeSeg.getCubeInstance().getName(),
+                    cubeSeg.getName());
             try {
                 logger.info("Query-{}: try to get segment result from cache 
for segment:{}", queryContext.getQueryId(),
                         cubeSeg);
@@ -251,6 +259,8 @@ public class CubeHBaseEndpointRPC extends CubeHBaseRPC {
                 }
             } catch (Exception e) {
                 logger.error("Fail to handle cached segment result from 
cache", e);
+            } finally {
+                segmentCacheSpan.finish();
             }
         }
         final String segmentQueryCacheKey = calculatedSegmentQueryCacheKey;
@@ -279,8 +289,8 @@ public class CubeHBaseEndpointRPC extends CubeHBaseRPC {
                 @Override
                 public void run() {
                     runEPRange(queryContext, logHeader, compressionResult, 
builder.build(), epRange.getFirst(),
-                            epRange.getSecond(), epResultItr, 
querySegmentCacheEnabled, segmentQueryResultBuilder,
-                            segmentQueryCacheKey);
+                            epRange.getSecond(), epResultItr, fuzzyKeySizeStr, 
querySegmentCacheEnabled,
+                            segmentQueryResultBuilder, segmentQueryCacheKey);
                 }
             });
         }
@@ -290,8 +300,12 @@ public class CubeHBaseEndpointRPC extends CubeHBaseRPC {
 
     private void runEPRange(final QueryContext queryContext, final String 
logHeader, final boolean compressionResult,
             final CubeVisitProtos.CubeVisitRequest request, byte[] startKey, 
byte[] endKey,
-            final ExpectedSizeIterator epResultItr, final boolean 
querySegmentCacheEnabled,
+            final ExpectedSizeIterator epResultItr, String fuzzyKeySizeStr, 
final boolean querySegmentCacheEnabled,
             final SegmentQueryResult.Builder segmentQueryResultBuilder, final 
String segmentQueryCacheKey) {
+        String range = BytesUtil.toHex(startKey) + "-" + 
BytesUtil.toHex(endKey);
+        final Span epRangeSpan = queryContext.startEPRangeQuerySpan(range, 
cubeSeg.getCubeInstance().getName(),
+                cubeSeg.getName(), cubeSeg.getStorageLocationIdentifier(), 
cuboid.getInputID(), cuboid.getId(),
+                fuzzyKeySizeStr);
 
         final String queryId = queryContext.getQueryId();
 
@@ -333,9 +347,12 @@ public class CubeHBaseEndpointRPC extends CubeHBaseRPC {
 
                             ServerRpcController controller = new 
ServerRpcController();
                             BlockingRpcCallback<CubeVisitResponse> rpcCallback 
= new BlockingRpcCallback<>();
+
+                            final Span regionRPCSpan = 
queryContext.startRegionRPCSpan(regionServerName, epRangeSpan);
+                            CubeVisitResponse response = null;
                             try {
                                 rowsService.visitCube(controller, request, 
rpcCallback);
-                                CubeVisitResponse response = rpcCallback.get();
+                                response = rpcCallback.get();
                                 if (controller.failedOnException()) {
                                     throw controller.getFailedOn();
                                 }
@@ -343,6 +360,27 @@ public class CubeHBaseEndpointRPC extends CubeHBaseRPC {
                             } catch (Exception e) {
                                 throw e;
                             } finally {
+                                if (response != null) {
+                                    Stats stats = response.getStats();
+                                    
regionRPCSpan.setTag(TagEnum.RPC_DURATION.toString(),
+                                            
String.valueOf(stats.getServiceEndTime() - stats.getServiceStartTime()));
+                                    
regionRPCSpan.setTag(TagEnum.RPC_SCAN_COUNT.toString(),
+                                            
String.valueOf(stats.getScannedRowCount()));
+                                    
regionRPCSpan.setTag(TagEnum.RPC_FILTER_COUNT.toString(),
+                                            
String.valueOf(stats.getFilteredRowCount()));
+                                    
regionRPCSpan.setTag(TagEnum.RPC_AGG_COUNT.toString(),
+                                            
String.valueOf(stats.getAggregatedRowCount()));
+                                    
regionRPCSpan.setTag(TagEnum.RPC_SERIALIZED_BYTES.toString(),
+                                            
String.valueOf(stats.getSerializedSize()));
+                                    
regionRPCSpan.setTag(TagEnum.RPC_SYSTEMLOAD.toString(),
+                                            
String.valueOf(stats.getSystemCpuLoad()));
+                                    
regionRPCSpan.setTag(TagEnum.RPC_FREE_MEM.toString(),
+                                            
String.valueOf(stats.getFreePhysicalMemorySize()));
+                                    
regionRPCSpan.setTag(TagEnum.RPC_FREE_SWAP.toString(),
+                                            
String.valueOf(stats.getFreeSwapSpaceSize()));
+                                    
regionRPCSpan.setTag(TagEnum.RPC_ETC_MSG.toString(), stats.getEtcMsg());
+                                }
+                                regionRPCSpan.finish();
                                 // Reset the interrupted state
                                 Thread.interrupted();
                             }
@@ -461,6 +499,8 @@ public class CubeHBaseEndpointRPC extends CubeHBaseRPC {
 
         } catch (Throwable ex) {
             queryContext.stop(ex);
+        } finally {
+            epRangeSpan.finish();
         }
 
         if (queryContext.isStopped()) {
diff --git a/storage-stream/src/main/java/org/apache/kylin/storage/stream/rpc/HttpStreamDataSearchClient.java b/storage-stream/src/main/java/org/apache/kylin/storage/stream/rpc/HttpStreamDataSearchClient.java
index d1c0bd5..6d529be 100644
--- a/storage-stream/src/main/java/org/apache/kylin/storage/stream/rpc/HttpStreamDataSearchClient.java
+++ b/storage-stream/src/main/java/org/apache/kylin/storage/stream/rpc/HttpStreamDataSearchClient.java
@@ -18,6 +18,8 @@
 
 package org.apache.kylin.storage.stream.rpc;
 
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
+
 import java.io.IOException;
 import java.util.Collections;
 import java.util.Iterator;
@@ -47,11 +49,15 @@ import org.apache.kylin.metadata.model.TblColRef;
 import org.apache.kylin.metadata.tuple.ITuple;
 import org.apache.kylin.metadata.tuple.ITupleIterator;
 import org.apache.kylin.metadata.tuple.TupleInfo;
+import org.apache.kylin.shaded.com.google.common.base.Stopwatch;
+import org.apache.kylin.shaded.com.google.common.collect.Lists;
+import org.apache.kylin.shaded.com.google.common.collect.Maps;
+import org.apache.kylin.shaded.com.google.common.collect.Sets;
 import org.apache.kylin.stream.coordinator.assign.AssignmentsCache;
-import org.apache.kylin.stream.core.model.ReplicaSet;
 import org.apache.kylin.stream.core.model.DataRequest;
 import org.apache.kylin.stream.core.model.DataResponse;
 import org.apache.kylin.stream.core.model.Node;
+import org.apache.kylin.stream.core.model.ReplicaSet;
 import org.apache.kylin.stream.core.query.ResponseResultSchema;
 import org.apache.kylin.stream.core.query.StreamingTupleConverter;
 import org.apache.kylin.stream.core.query.StreamingTupleIterator;
@@ -62,12 +68,7 @@ import org.apache.kylin.stream.core.util.RestService;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import org.apache.kylin.shaded.com.google.common.base.Stopwatch;
-import org.apache.kylin.shaded.com.google.common.collect.Lists;
-import org.apache.kylin.shaded.com.google.common.collect.Maps;
-import org.apache.kylin.shaded.com.google.common.collect.Sets;
-
-import static java.util.concurrent.TimeUnit.MILLISECONDS;
+import io.opentracing.Span;
 
 /**
  * TODO use long connection rather than short connection
@@ -114,12 +115,16 @@ public class HttpStreamDataSearchClient implements IStreamDataSearchClient {
             executorService.submit(new Runnable() {
                 @Override
                 public void run() {
+                    final Span streamingQuerySpan = 
query.startStreamingReceiverQuerySpan(cubeDesc.getName(),
+                            rs.getReplicaSetID());
                     try {
                         Iterator<ITuple> tuplesBlock = search(dataRequest, 
cube, tupleConverter, recordsSerializer, rs,
                                 tupleInfo);
                         result.addBlock(tuplesBlock);
                     } catch (Exception e) {
                         result.setEndpointException(e);
+                    } finally {
+                        streamingQuerySpan.finish();
                     }
                 }
             });

Reply via email to