This is an automated email from the ASF dual-hosted git repository. nju_yaho pushed a commit to tag ebay-3.1.0-release-20200701 in repository https://gitbox.apache.org/repos/asf/kylin.git
commit eb70b5d52b7a149c8e6cee9cd0a7fdaf881504d1 Author: Zhong, Yanghong <nju_y...@apache.org> AuthorDate: Mon May 25 19:23:19 2020 +0800 KYLIN-4513 Introduce query call trace --- core-common/pom.xml | 10 ++ .../org/apache/kylin/common/KylinConfigBase.java | 7 ++ .../java/org/apache/kylin/common/QueryContext.java | 76 +++++++++++++- .../kylin/common/tracer/JaegerTracerWrapper.java | 48 +++++++++ .../kylin/common/tracer/TracerConstants.java | 111 +++++++++++++++++++++ .../apache/kylin/common/tracer/TracerManager.java | 59 +++++++++++ .../apache/kylin/common/tracer/TracerWrapper.java | 58 +++++++++++ .../kylin/common/tracer/TracerManagerTest.java | 46 +++++++++ .../storage/gtrecord/GTCubeStorageQueryBase.java | 17 +++- pom.xml | 6 ++ .../query/relnode/OLAPToEnumerableConverter.java | 9 +- .../apache/kylin/rest/service/QueryService.java | 29 +++++- .../hbase/cube/v2/CubeHBaseEndpointRPC.java | 56 +++++++++-- .../stream/rpc/HttpStreamDataSearchClient.java | 19 ++-- 14 files changed, 526 insertions(+), 25 deletions(-) diff --git a/core-common/pom.xml b/core-common/pom.xml index 0549898..ca5cefa 100644 --- a/core-common/pom.xml +++ b/core-common/pom.xml @@ -63,6 +63,16 @@ <groupId>org.apache.commons</groupId> <artifactId>commons-compress</artifactId> </dependency> + <dependency> + <groupId>io.jaegertracing</groupId> + <artifactId>jaeger-client</artifactId> + <exclusions> + <exclusion> + <groupId>org.mockito</groupId> + <artifactId>mockito-core</artifactId> + </exclusion> + </exclusions> + </dependency> <!-- Provided --> <dependency> <groupId>com.google.code.findbugs</groupId> diff --git a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java index a5af5e3..f2d423d 100644 --- a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java +++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java @@ -2670,4 +2670,11 @@ public abstract class KylinConfigBase implements Serializable { public int getDefaultTimeFilter() { return Integer.parseInt(getOptional("kylin.web.default-time-filter", "2")); } + + // ============================================================================ + // Trace + // ============================================================================ + public String getTracerCollectorEndpoint() { + return getOptional("kylin.trace.collector-endpoint", ""); + } } diff --git a/core-common/src/main/java/org/apache/kylin/common/QueryContext.java b/core-common/src/main/java/org/apache/kylin/common/QueryContext.java index 0f6534f..df2fa4d 100644 --- a/core-common/src/main/java/org/apache/kylin/common/QueryContext.java +++ b/core-common/src/main/java/org/apache/kylin/common/QueryContext.java @@ -29,12 +29,18 @@ import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; import org.apache.kylin.common.exceptions.KylinTimeoutException; +import org.apache.kylin.common.tracer.TracerConstants.OperationEum; +import org.apache.kylin.common.tracer.TracerConstants.TagEnum; +import org.apache.kylin.common.tracer.TracerManager; +import org.apache.kylin.common.tracer.TracerWrapper; import org.apache.kylin.common.util.RandomUtil; import org.apache.kylin.shaded.com.google.common.collect.Lists; import org.apache.kylin.shaded.com.google.common.collect.Maps; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import io.opentracing.Span; + /** * Holds per query information and statistics. */ @@ -48,14 +54,19 @@ public class QueryContext { public interface QueryStopListener { void stop(QueryContext query); } - + + private static final String SPAN_QUERY_SERVER = KylinConfig.getInstanceFromEnv().getDeployEnv() + "-Query"; + private final String queryId; private final String project; private final String sql; private final String username; private final int maxConnThreads; private final long queryStartMillis; - + + private final TracerWrapper tracer; + private final Span rootSpan; + private Set<String> groups; private AtomicLong scannedRows = new AtomicLong(); private AtomicLong returnedRows = new AtomicLong(); @@ -84,6 +95,13 @@ public class QueryContext { this.username = user; this.maxConnThreads = maxConnThreads; this.queryStartMillis = startMills; + + this.tracer = TracerManager.getTracerWrapper(SPAN_QUERY_SERVER); + this.rootSpan = tracer.startSpan(OperationEum.MAIN, null); + rootSpan.setTag(TagEnum.PROJECT.toString(), projectName); + rootSpan.setTag(TagEnum.SQL.toString(), sql); + rootSpan.setTag(TagEnum.QUERY_ID.toString(), queryId); + rootSpan.setTag(TagEnum.SUBMITTER.toString(), user); } public ExecutorService getConnectionPool(ExecutorService sharedConnPool) { @@ -210,6 +228,60 @@ public class QueryContext { } } + public Span activeSpan() { + return tracer.activeSpan(); + } + + public Span startFetchCache() { + return tracer.startSpan(OperationEum.FETCH_CACHE_STEP, rootSpan); + } + + public Span startSqlParse() { + return tracer.startSpan(OperationEum.SQL_PARSE_STEP, rootSpan); + } + + public Span startQueryPlan() { + return tracer.startSpan(OperationEum.QUERY_PLAN_STEP, rootSpan); + } + + public Span startFetchSegmentCache(String cubeName, String segmentName) { + Span span = tracer.startSpan(OperationEum.FETCH_SEGMENT_CACHE_STEP, rootSpan); + span.setTag(TagEnum.CUBE.toString(), cubeName); + span.setTag(TagEnum.SEGMENT.toString(), segmentName); + return span; + } + + public Span startEPRangeQuerySpan(String range, String cubeName, String segmentName, String table, long sourceId, + long targetId, String fuzzyKeySizeStr) { + Span span = tracer.startSpan(OperationEum.ENDPOINT_RANGE_REQUEST, rootSpan); + span.setTag(TagEnum.EPRANGE.toString(), range); + span.setTag(TagEnum.CUBE.toString(), cubeName); + span.setTag(TagEnum.SEGMENT.toString(), segmentName); + span.setTag(TagEnum.HTABLE.toString(), table); + + StringBuilder cuboIdInfo = new StringBuilder(); + cuboIdInfo.append("sourceId[" + sourceId).append("]"); + cuboIdInfo.append("targetId[" + targetId).append("]"); + span.setTag(TagEnum.CUBOID.toString(), cuboIdInfo.toString()); + + // fuzzyKeySizeStr for checking whether the fuzzy key is too large + span.setTag(TagEnum.FUZZY_KEY_SIZE.toString(), fuzzyKeySizeStr); + return span; + } + + public Span startRegionRPCSpan(String regionServerName, Span epRangeQuerySpan) { + Span span = tracer.startSpan(OperationEum.REGION_SERVER_RPC, epRangeQuerySpan); + span.setTag(TagEnum.REGION_SERVER.toString(), regionServerName); + return span; + } + + public Span startStreamingReceiverQuerySpan(String cubeName, int rsID) { + Span span = tracer.startSpan(OperationEum.STREAMING_RECEIVER_REQUEST.toString(), rootSpan); + span.setTag(TagEnum.CUBE.toString(), cubeName); + span.setTag(TagEnum.REPLICA_SET.toString(), String.valueOf(rsID)); + return span; + } + public Throwable getThrowable() { return throwable.get(); } diff --git a/core-common/src/main/java/org/apache/kylin/common/tracer/JaegerTracerWrapper.java b/core-common/src/main/java/org/apache/kylin/common/tracer/JaegerTracerWrapper.java new file mode 100644 index 0000000..0d2e9bf --- /dev/null +++ b/core-common/src/main/java/org/apache/kylin/common/tracer/JaegerTracerWrapper.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kylin.common.tracer; + +import io.jaegertracing.internal.JaegerSpan; +import io.jaegertracing.internal.JaegerTracer; +import io.opentracing.Span; + +public class JaegerTracerWrapper extends TracerWrapper { + + JaegerTracerWrapper(JaegerTracer tracer) { + super(tracer); + } + + @Override + public Object getTagValue(Span span, String tagName) { + return ((JaegerSpan) span).getTags().get(tagName); + } + + // by Millisecond + @Override + public long getStart(Span span) { + return ((JaegerSpan) span).getStart() / 1000; + } + + // by Millisecond + @Override + public long getDuration(Span span) { + return ((JaegerSpan) span).getDuration() / 1000; + } + +} diff --git a/core-common/src/main/java/org/apache/kylin/common/tracer/TracerConstants.java b/core-common/src/main/java/org/apache/kylin/common/tracer/TracerConstants.java new file mode 100644 index 0000000..b3234b9 --- /dev/null +++ b/core-common/src/main/java/org/apache/kylin/common/tracer/TracerConstants.java @@ -0,0 +1,111 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kylin.common.tracer; + +import java.util.Locale; + +import org.apache.kylin.common.util.StringUtil; + +public class TracerConstants { + + public enum OperationEum { + MAIN("main_query"), // + FETCH_CACHE_STEP("fetch_cache"), // + SQL_PARSE_STEP("parse_sql"), // + QUERY_PLAN_STEP("query_plan"), // + FETCH_SEGMENT_CACHE_STEP("fetch_segment_cache"), // + ENDPOINT_RANGE_REQUEST("endpoint_range_request"), // + REGION_SERVER_RPC("region_server_rpc"), // + STREAMING_RECEIVER_REQUEST("streaming_receiver_request") // + ; + + private final String name; + + OperationEum(String name) { + this.name = name; + } + + public static OperationEum getByName(String name) { + if (StringUtil.isEmpty(name)) { + return null; + } + for (OperationEum property : OperationEum.values()) { + if (property.name.equals(name.toUpperCase(Locale.ROOT))) { + return property; + } + } + + return null; + } + + @Override + public String toString() { + return name; + } + } + + public enum TagEnum { + PROJECT("project"), // + SQL("sql"), // + SUBMITTER("submitter"), // + QUERY_ID("query_id"), // + CUBE("cube"), // + SEGMENT("segment"), // + HTABLE("htable"), // + EPRANGE("ep_range"), // + CUBOID("cuboid"), // + FUZZY_KEY_SIZE("fuzzykey_size"), // + REGION_SERVER("region_server"), // + RPC_DURATION("rpc_duration"), // + RPC_SCAN_COUNT("rpc_scan_count"), // + RPC_FILTER_COUNT("rpc_filter_count"), // + RPC_AGG_COUNT("rpc_agg_count"), // + RPC_SERIALIZED_BYTES("rpc_serialized_bytes"), // + RPC_SYSTEMLOAD("rpc_sysload"), // + RPC_FREE_MEM("rpc_free_physical_mem"), // + RPC_FREE_SWAP("rpc_free_swap_size"), // + RPC_ETC_MSG("rpc_etc_message"), // + REPLICA_SET("replica_set")// + ; + + private final String name; + + TagEnum(String name) { + this.name = name; + } + + public static TagEnum getByName(String name) { + if (StringUtil.isEmpty(name)) { + return null; + } + for (TagEnum property : TagEnum.values()) { + if (property.name.equals(name.toUpperCase(Locale.ROOT))) { + return property; + } + } + + return null; + } + + @Override + public String toString() { + return name; + } + } +} diff --git a/core-common/src/main/java/org/apache/kylin/common/tracer/TracerManager.java b/core-common/src/main/java/org/apache/kylin/common/tracer/TracerManager.java new file mode 100644 index 0000000..57379f6 --- /dev/null +++ b/core-common/src/main/java/org/apache/kylin/common/tracer/TracerManager.java @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kylin.common.tracer; + +import java.util.concurrent.ConcurrentMap; + +import org.apache.kylin.common.KylinConfig; +import org.apache.kylin.shaded.com.google.common.collect.Maps; + +import io.jaegertracing.Configuration; +import io.jaegertracing.internal.samplers.ConstSampler; + +public class TracerManager { + + private static final ConcurrentMap<String, TracerWrapper> CACHE = Maps.newConcurrentMap(); + + public static TracerWrapper getTracerWrapper(String serviceName) { + String endpoint = KylinConfig.getInstanceFromEnv().getTracerCollectorEndpoint(); + + TracerWrapper tracerWrapper = CACHE.get(serviceName); + if (tracerWrapper != null) { + return tracerWrapper; + } + + synchronized (TracerManager.class) { + tracerWrapper = CACHE.get(serviceName); + if (tracerWrapper != null) { + return tracerWrapper; + } + Configuration.SamplerConfiguration samplerConfig = Configuration.SamplerConfiguration.fromEnv() + .withType(ConstSampler.TYPE).withParam(1); + Configuration.SenderConfiguration sender = Configuration.SenderConfiguration.fromEnv() + .withEndpoint(endpoint); + Configuration.ReporterConfiguration reporterConfig = Configuration.ReporterConfiguration.fromEnv() + .withLogSpans(false).withSender(sender); + Configuration config = new Configuration(serviceName).withSampler(samplerConfig) + .withReporter(reporterConfig); + tracerWrapper = new JaegerTracerWrapper(config.getTracer()); + CACHE.put(serviceName, tracerWrapper); + return tracerWrapper; + } + } +} diff --git a/core-common/src/main/java/org/apache/kylin/common/tracer/TracerWrapper.java b/core-common/src/main/java/org/apache/kylin/common/tracer/TracerWrapper.java new file mode 100644 index 0000000..dbcde5c --- /dev/null +++ b/core-common/src/main/java/org/apache/kylin/common/tracer/TracerWrapper.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kylin.common.tracer; + +import io.opentracing.Scope; +import io.opentracing.Span; +import io.opentracing.Tracer; + +public abstract class TracerWrapper { + + private final Tracer tracer; + + TracerWrapper(Tracer tracer) { + this.tracer = tracer; + } + + public Span activeSpan() { + return tracer.activeSpan(); + } + + public Span startSpan(TracerConstants.OperationEum operation, Span parentSpan) { + return startSpan(operation.toString(), parentSpan); + } + + public Span startSpan(String operationName, Span parentSpan) { + Scope scope; + if (parentSpan != null) { + scope = tracer.buildSpan(operationName).ignoreActiveSpan().asChildOf(parentSpan).startActive(false); + } else { + scope = tracer.buildSpan(operationName).ignoreActiveSpan().startActive(false); + } + return scope.span(); + } + + public abstract Object getTagValue(Span span, String tagName); + + // by Millisecond + public abstract long getStart(Span span); + + // by Millisecond + public abstract long getDuration(Span span); +} diff --git a/core-common/src/test/java/org/apache/kylin/common/tracer/TracerManagerTest.java b/core-common/src/test/java/org/apache/kylin/common/tracer/TracerManagerTest.java new file mode 100644 index 0000000..61ba30c --- /dev/null +++ b/core-common/src/test/java/org/apache/kylin/common/tracer/TracerManagerTest.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kylin.common.tracer; + +import org.apache.kylin.common.util.LocalFileMetadataTestCase; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +public class TracerManagerTest extends LocalFileMetadataTestCase { + + @Before + public void setUp() throws Exception { + this.createTestMetadata(); + } + + @After + public void after() throws Exception { + this.cleanupTestMetadata(); + } + + @Test + public void testGetJaegerTracerWrapper() { + TracerWrapper tracerWrapper = TracerManager.getTracerWrapper("Query"); + Assert.assertTrue(tracerWrapper instanceof JaegerTracerWrapper); + + Assert.assertSame(tracerWrapper, TracerManager.getTracerWrapper("Query")); + } +} diff --git a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/GTCubeStorageQueryBase.java b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/GTCubeStorageQueryBase.java index 804ce3f..65d5914 100644 --- a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/GTCubeStorageQueryBase.java +++ b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/GTCubeStorageQueryBase.java @@ -62,15 +62,16 @@ import org.apache.kylin.metadata.model.TblColRef; import org.apache.kylin.metadata.realization.SQLDigest; import org.apache.kylin.metadata.tuple.ITupleIterator; import org.apache.kylin.metadata.tuple.TupleInfo; +import org.apache.kylin.shaded.com.google.common.collect.Lists; +import org.apache.kylin.shaded.com.google.common.collect.Maps; +import org.apache.kylin.shaded.com.google.common.collect.Sets; import org.apache.kylin.storage.IStorageQuery; import org.apache.kylin.storage.StorageContext; import org.apache.kylin.storage.translate.DerivedFilterTranslator; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.apache.kylin.shaded.com.google.common.collect.Lists; -import org.apache.kylin.shaded.com.google.common.collect.Maps; -import org.apache.kylin.shaded.com.google.common.collect.Sets; +import io.opentracing.Span; public abstract class GTCubeStorageQueryBase implements IStorageQuery { @@ -86,6 +87,15 @@ public abstract class GTCubeStorageQueryBase implements IStorageQuery { @Override public ITupleIterator search(StorageContext context, SQLDigest sqlDigest, TupleInfo returnTupleInfo) { + Span queryPlanSpan = QueryContextFacade.current().startQueryPlan(); + try { + return searchInner(context, sqlDigest, returnTupleInfo); + } finally { + queryPlanSpan.finish(); + } + } + + public ITupleIterator searchInner(StorageContext context, SQLDigest sqlDigest, TupleInfo returnTupleInfo) { GTCubeStorageQueryRequest request = getStorageQueryRequest(context, sqlDigest, returnTupleInfo); List<CubeSegmentScanner> scanners = Lists.newArrayList(); @@ -101,7 +111,6 @@ public abstract class GTCubeStorageQueryBase implements IStorageQuery { if (scanners.isEmpty()) return ITupleIterator.EMPTY_TUPLE_ITERATOR; - return new SequentialCubeTupleIterator(scanners, request.getCuboid(), request.getDimensions(), request.getDynGroups(), request.getGroups(), request.getMetrics(), returnTupleInfo, request.getContext(), sqlDigest); diff --git a/pom.xml b/pom.xml index b44ea75..916ad18 100644 --- a/pom.xml +++ b/pom.xml @@ -152,6 +152,7 @@ <t-digest.version>3.1</t-digest.version> <freemarker.version>2.3.23</freemarker.version> <rocksdb.version>5.9.2</rocksdb.version> + <jaeger.version>0.30.2</jaeger.version> <lz4.version>1.3.0</lz4.version> <mssql-jdbc.version>6.2.2.jre8</mssql-jdbc.version> <!--metric--> @@ -829,6 +830,11 @@ <artifactId>rocksdbjni</artifactId> <version>${rocksdb.version}</version> </dependency> + <dependency> + <groupId>io.jaegertracing</groupId> + <artifactId>jaeger-client</artifactId> + <version>${jaeger.version}</version> + </dependency> <!-- Logging --> <dependency> diff --git a/query/src/main/java/org/apache/kylin/query/relnode/OLAPToEnumerableConverter.java b/query/src/main/java/org/apache/kylin/query/relnode/OLAPToEnumerableConverter.java index e3b2272..846da3b 100644 --- a/query/src/main/java/org/apache/kylin/query/relnode/OLAPToEnumerableConverter.java +++ b/query/src/main/java/org/apache/kylin/query/relnode/OLAPToEnumerableConverter.java @@ -21,6 +21,7 @@ package org.apache.kylin.query.relnode; import java.util.List; import java.util.stream.Collectors; +import io.opentracing.Span; import org.apache.calcite.adapter.enumerable.EnumerableRel; import org.apache.calcite.adapter.enumerable.EnumerableRelImplementor; import org.apache.calcite.plan.ConventionTraitDef; @@ -118,7 +119,13 @@ public class OLAPToEnumerableConverter extends ConverterImpl implements Enumerab QueryContextFacade.current().setCalcitePlan(this.copy(getTraitSet(), getInputs())); } - return impl.visitChild(this, 0, inputAsEnum, pref); + Result result = impl.visitChild(this, 0, inputAsEnum, pref); + //Finish sql parse span + Span sqlParseSpan = QueryContextFacade.current().activeSpan(); + if (sqlParseSpan != null) { + sqlParseSpan.finish(); + } + return result; } protected List<OLAPContext> listContextsHavingScan() { diff --git a/server-base/src/main/java/org/apache/kylin/rest/service/QueryService.java b/server-base/src/main/java/org/apache/kylin/rest/service/QueryService.java index 8a00066..e731852 100644 --- a/server-base/src/main/java/org/apache/kylin/rest/service/QueryService.java +++ b/server-base/src/main/java/org/apache/kylin/rest/service/QueryService.java @@ -136,6 +136,8 @@ import org.springframework.stereotype.Component; import com.fasterxml.jackson.annotation.JsonProperty; +import io.opentracing.Span; + /** * @author xduo */ @@ -464,7 +466,7 @@ public class QueryService extends BasicService { } if (sqlResponse == null && isQueryCacheEnabled) { - sqlResponse = searchQueryInCache(sqlRequest); + sqlResponse = searchQueryInCacheWithSpan(sqlRequest); } // real execution if required @@ -608,6 +610,15 @@ public class QueryService extends BasicService { return username; } + public SQLResponse searchQueryInCacheWithSpan(SQLRequest sqlRequest) { + Span cacheSpan = QueryContextFacade.current().startFetchCache(); + try { + return searchQueryInCache(sqlRequest); + } finally { + cacheSpan.finish(); + } + } + public SQLResponse searchQueryInCache(SQLRequest sqlRequest) { Cache cache = cacheManager.getCache(QUERY_CACHE); Cache.ValueWrapper wrapper = cache.get(sqlRequest.getCacheKey()); @@ -701,7 +712,7 @@ public class QueryService extends BasicService { columnMetas); } if (!isPrepareRequest) { - return executeRequest(correctedSql, sqlRequest, conn); + return executeRequestWithSpan(correctedSql, sqlRequest, conn); } else { long prjLastModifyTime = getProjectManager().getProject(sqlRequest.getProject()).getLastModified(); preparedContextKey = new PreparedContextKey(sqlRequest.getProject(), prjLastModifyTime, correctedSql); @@ -721,7 +732,7 @@ public class QueryService extends BasicService { } else { preparedContext = createPreparedContext(sqlRequest.getProject(), sqlRequest.getSql()); } - return executePrepareRequest(correctedSql, prepareSqlRequest, preparedContext); + return executePrepareRequestWithSpan(correctedSql, prepareSqlRequest, preparedContext); } } finally { @@ -990,6 +1001,12 @@ public class QueryService extends BasicService { } } + private SQLResponse executeRequestWithSpan(String correctedSql, SQLRequest sqlRequest, Connection conn) + throws Exception { + QueryContextFacade.current().startSqlParse(); + return executeRequest(correctedSql, sqlRequest, conn); + } + /** * @param correctedSql * @param sqlRequest @@ -1022,6 +1039,12 @@ public class QueryService extends BasicService { return buildSqlResponse(sqlRequest.getProject(), isPushDown, r.getFirst(), r.getSecond()); } + private SQLResponse executePrepareRequestWithSpan(String correctedSql, PrepareSqlRequest sqlRequest, + PreparedContext preparedContext) throws Exception { + QueryContextFacade.current().startSqlParse(); + return executePrepareRequest(correctedSql, sqlRequest, preparedContext); + } + private SQLResponse executePrepareRequest(String correctedSql, PrepareSqlRequest sqlRequest, PreparedContext preparedContext) throws Exception { ResultSet resultSet = null; diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java index 8cc4daa..04fd26c 100644 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java +++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java @@ -43,6 +43,7 @@ import org.apache.kylin.common.SubThreadPoolExecutor; import org.apache.kylin.common.debug.BackdoorToggles; import org.apache.kylin.common.exceptions.KylinTimeoutException; import org.apache.kylin.common.exceptions.ResourceLimitExceededException; +import org.apache.kylin.common.tracer.TracerConstants.TagEnum; import org.apache.kylin.common.util.ByteArray; import org.apache.kylin.common.util.Bytes; import org.apache.kylin.common.util.BytesSerializer; @@ -62,6 +63,10 @@ import org.apache.kylin.metadata.cachesync.Broadcaster; import org.apache.kylin.metadata.model.ISegment; import org.apache.kylin.metadata.project.ProjectInstance; import org.apache.kylin.metadata.project.ProjectManager; +import org.apache.kylin.shaded.com.google.common.cache.CacheBuilder; +import org.apache.kylin.shaded.com.google.common.cache.CacheLoader; +import org.apache.kylin.shaded.com.google.common.cache.LoadingCache; +import org.apache.kylin.shaded.com.google.common.collect.Lists; import org.apache.kylin.storage.StorageContext; import org.apache.kylin.storage.gtrecord.DummyPartitionStreamer; import org.apache.kylin.storage.gtrecord.StorageResponseGTScatter; @@ -75,13 +80,11 @@ import org.apache.kylin.storage.hbase.util.HBaseUnionUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.apache.kylin.shaded.com.google.common.cache.CacheBuilder; -import org.apache.kylin.shaded.com.google.common.cache.CacheLoader; -import org.apache.kylin.shaded.com.google.common.cache.LoadingCache; -import org.apache.kylin.shaded.com.google.common.collect.Lists; import com.google.protobuf.ByteString; import com.google.protobuf.HBaseZeroCopyByteString; +import io.opentracing.Span; + public class CubeHBaseEndpointRPC extends CubeHBaseRPC { private static final Logger logger = LoggerFactory.getLogger(CubeHBaseEndpointRPC.class); @@ -204,9 +207,12 @@ public class CubeHBaseEndpointRPC extends CubeHBaseRPC { logger.info( "The scan {} for segment {} is as below with {} separate raw scans, shard part of start/end key is set to 0", Integer.toHexString(System.identityHashCode(scanRequest)), cubeSeg, rawScans.size()); + StringBuilder fuzzyKeySizeInfo = new StringBuilder(); for (RawScan rs : rawScans) { + fuzzyKeySizeInfo.append("RawScanFuzzyKey:").append(rs.fuzzyKeys.size()).append(";"); logScan(rs, cubeSeg.getStorageLocationIdentifier()); } + final String fuzzyKeySizeStr = fuzzyKeySizeInfo.toString(); logger.debug("Submitting rpc to {} shards starting from shard {}, scan range count {}", shardNum, cuboidBaseShard, rawScans.size()); @@ -221,6 +227,8 @@ public class CubeHBaseEndpointRPC extends CubeHBaseRPC { cubeSeg.getConfig().getQuerySegmentCacheMaxSize() * 1024 * 1024); String calculatedSegmentQueryCacheKey = null; if (querySegmentCacheEnabled) { + Span segmentCacheSpan = queryContext.startFetchSegmentCache(cubeSeg.getCubeInstance().getName(), + cubeSeg.getName()); try { logger.info("Query-{}: try to get segment result from cache for segment:{}", queryContext.getQueryId(), cubeSeg); @@ -251,6 +259,8 @@ public class CubeHBaseEndpointRPC extends CubeHBaseRPC { } } catch (Exception e) { logger.error("Fail to handle cached segment result from cache", e); + } finally { + segmentCacheSpan.finish(); } } final String segmentQueryCacheKey = calculatedSegmentQueryCacheKey; @@ -279,8 +289,8 @@ public class CubeHBaseEndpointRPC extends CubeHBaseRPC { @Override public void run() { runEPRange(queryContext, logHeader, compressionResult, builder.build(), epRange.getFirst(), - epRange.getSecond(), epResultItr, querySegmentCacheEnabled, segmentQueryResultBuilder, - segmentQueryCacheKey); + epRange.getSecond(), epResultItr, fuzzyKeySizeStr, querySegmentCacheEnabled, + segmentQueryResultBuilder, segmentQueryCacheKey); } }); } @@ -290,8 +300,12 @@ public class CubeHBaseEndpointRPC extends CubeHBaseRPC { private void runEPRange(final QueryContext queryContext, final String logHeader, final boolean compressionResult, final CubeVisitProtos.CubeVisitRequest request, byte[] startKey, byte[] endKey, - final ExpectedSizeIterator epResultItr, final boolean querySegmentCacheEnabled, + final ExpectedSizeIterator epResultItr, String fuzzyKeySizeStr, final boolean querySegmentCacheEnabled, final SegmentQueryResult.Builder segmentQueryResultBuilder, final String segmentQueryCacheKey) { + String range = BytesUtil.toHex(startKey) + "-" + BytesUtil.toHex(endKey); + final Span epRangeSpan = queryContext.startEPRangeQuerySpan(range, cubeSeg.getCubeInstance().getName(), + cubeSeg.getName(), cubeSeg.getStorageLocationIdentifier(), cuboid.getInputID(), cuboid.getId(), + fuzzyKeySizeStr); final String queryId = queryContext.getQueryId(); @@ -333,9 +347,12 @@ public class CubeHBaseEndpointRPC extends CubeHBaseRPC { ServerRpcController controller = new ServerRpcController(); BlockingRpcCallback<CubeVisitResponse> rpcCallback = new BlockingRpcCallback<>(); + + final Span regionRPCSpan = queryContext.startRegionRPCSpan(regionServerName, epRangeSpan); + CubeVisitResponse response = null; try { rowsService.visitCube(controller, request, rpcCallback); - CubeVisitResponse response = rpcCallback.get(); + response = rpcCallback.get(); if (controller.failedOnException()) { throw controller.getFailedOn(); } @@ -343,6 +360,27 @@ public class CubeHBaseEndpointRPC extends CubeHBaseRPC { } catch (Exception e) { throw e; } finally { + if (response != null) { + Stats stats = response.getStats(); + regionRPCSpan.setTag(TagEnum.RPC_DURATION.toString(), + String.valueOf(stats.getServiceEndTime() - stats.getServiceStartTime())); + regionRPCSpan.setTag(TagEnum.RPC_SCAN_COUNT.toString(), + String.valueOf(stats.getScannedRowCount())); + regionRPCSpan.setTag(TagEnum.RPC_FILTER_COUNT.toString(), + String.valueOf(stats.getFilteredRowCount())); + regionRPCSpan.setTag(TagEnum.RPC_AGG_COUNT.toString(), + String.valueOf(stats.getAggregatedRowCount())); + regionRPCSpan.setTag(TagEnum.RPC_SERIALIZED_BYTES.toString(), + String.valueOf(stats.getSerializedSize())); + regionRPCSpan.setTag(TagEnum.RPC_SYSTEMLOAD.toString(), + String.valueOf(stats.getSystemCpuLoad())); + regionRPCSpan.setTag(TagEnum.RPC_FREE_MEM.toString(), + String.valueOf(stats.getFreePhysicalMemorySize())); + regionRPCSpan.setTag(TagEnum.RPC_FREE_SWAP.toString(), + String.valueOf(stats.getFreeSwapSpaceSize())); + regionRPCSpan.setTag(TagEnum.RPC_ETC_MSG.toString(), stats.getEtcMsg()); + } + regionRPCSpan.finish(); // Reset the interrupted state Thread.interrupted(); } @@ -461,6 +499,8 @@ public class CubeHBaseEndpointRPC extends CubeHBaseRPC { } catch (Throwable ex) { queryContext.stop(ex); + } finally { + epRangeSpan.finish(); } if (queryContext.isStopped()) { diff --git a/storage-stream/src/main/java/org/apache/kylin/storage/stream/rpc/HttpStreamDataSearchClient.java b/storage-stream/src/main/java/org/apache/kylin/storage/stream/rpc/HttpStreamDataSearchClient.java index d1c0bd5..6d529be 100644 --- a/storage-stream/src/main/java/org/apache/kylin/storage/stream/rpc/HttpStreamDataSearchClient.java +++ b/storage-stream/src/main/java/org/apache/kylin/storage/stream/rpc/HttpStreamDataSearchClient.java @@ -18,6 +18,8 @@ package org.apache.kylin.storage.stream.rpc; +import static java.util.concurrent.TimeUnit.MILLISECONDS; + import java.io.IOException; import java.util.Collections; import java.util.Iterator; @@ -47,11 +49,15 @@ import org.apache.kylin.metadata.model.TblColRef; import org.apache.kylin.metadata.tuple.ITuple; import org.apache.kylin.metadata.tuple.ITupleIterator; import org.apache.kylin.metadata.tuple.TupleInfo; +import org.apache.kylin.shaded.com.google.common.base.Stopwatch; +import org.apache.kylin.shaded.com.google.common.collect.Lists; +import org.apache.kylin.shaded.com.google.common.collect.Maps; +import org.apache.kylin.shaded.com.google.common.collect.Sets; import org.apache.kylin.stream.coordinator.assign.AssignmentsCache; -import org.apache.kylin.stream.core.model.ReplicaSet; import org.apache.kylin.stream.core.model.DataRequest; import org.apache.kylin.stream.core.model.DataResponse; import org.apache.kylin.stream.core.model.Node; +import org.apache.kylin.stream.core.model.ReplicaSet; import org.apache.kylin.stream.core.query.ResponseResultSchema; import org.apache.kylin.stream.core.query.StreamingTupleConverter; import org.apache.kylin.stream.core.query.StreamingTupleIterator; @@ -62,12 +68,7 @@ import org.apache.kylin.stream.core.util.RestService; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.apache.kylin.shaded.com.google.common.base.Stopwatch; -import org.apache.kylin.shaded.com.google.common.collect.Lists; -import org.apache.kylin.shaded.com.google.common.collect.Maps; -import org.apache.kylin.shaded.com.google.common.collect.Sets; - -import static java.util.concurrent.TimeUnit.MILLISECONDS; +import io.opentracing.Span; /** * TODO use long connection rather than short connection @@ -114,12 +115,16 @@ public class HttpStreamDataSearchClient implements IStreamDataSearchClient { executorService.submit(new Runnable() { @Override public void run() { + final Span streamingQuerySpan = query.startStreamingReceiverQuerySpan(cubeDesc.getName(), + rs.getReplicaSetID()); try { Iterator<ITuple> tuplesBlock = search(dataRequest, cube, tupleConverter, recordsSerializer, rs, tupleInfo); result.addBlock(tuplesBlock); } catch (Exception e) { result.setEndpointException(e); + } finally { + streamingQuerySpan.finish(); } } });