This is an automated email from the ASF dual-hosted git repository. hui pushed a commit to branch QueryMetrics0.13 in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 1f87529758b2a02f88a89eeeb5cbe16774a7315d Author: Minghui Liu <[email protected]> AuthorDate: Mon Nov 7 13:48:35 2022 +0800 add QueryStatistics --- server/src/assembly/resources/conf/logback.xml | 20 +++ .../threadpool/ScheduledExecutorUtil.java | 196 +++++++++++++++++++++ .../org/apache/iotdb/db/conf/IoTDBConstant.java | 1 + .../iotdb/db/query/control/QueryStatistics.java | 130 ++++++++++++++ 4 files changed, 347 insertions(+) diff --git a/server/src/assembly/resources/conf/logback.xml b/server/src/assembly/resources/conf/logback.xml index 7349c2b0b2..310053054d 100644 --- a/server/src/assembly/resources/conf/logback.xml +++ b/server/src/assembly/resources/conf/logback.xml @@ -265,6 +265,23 @@ <level>INFO</level> </filter> </appender> + <appender class="ch.qos.logback.core.rolling.RollingFileAppender" name="QUERY_STATISTICS"> + <file>${IOTDB_LOG_DIR}/log_query_statistics.log</file> + <rollingPolicy class="ch.qos.logback.core.rolling.SizeAndTimeBasedRollingPolicy"> + <fileNamePattern>${IOTDB_LOG_DIR}/log-query-statistics-%d{yyyyMMdd}.%i.log.gz</fileNamePattern> + <maxFileSize>10MB</maxFileSize> + <maxHistory>168</maxHistory> + <totalSizeCap>512MB</totalSizeCap> + </rollingPolicy> + <append>true</append> + <encoder class="ch.qos.logback.classic.encoder.PatternLayoutEncoder"> + <pattern>%d [%t] %-5p %C{25}:%L - %m %n</pattern> + <charset>utf-8</charset> + </encoder> + <filter class="ch.qos.logback.classic.filter.ThresholdFilter"> + <level>INFO</level> + </filter> + </appender> <root level="info"> <appender-ref ref="FILETRACE"/> <appender-ref ref="FILEDEBUG"/> @@ -299,4 +316,7 @@ <logger level="info" name="COMPACTION"> <appender-ref ref="COMPACTION"/> </logger> + <logger level="info" name="QUERY_STATISTICS"> + <appender-ref ref="QUERY_STATISTICS"/> + </logger> </configuration> diff --git a/server/src/main/java/org/apache/iotdb/db/concurrent/threadpool/ScheduledExecutorUtil.java b/server/src/main/java/org/apache/iotdb/db/concurrent/threadpool/ScheduledExecutorUtil.java new file mode 100644 index 0000000000..950947331d --- /dev/null +++ b/server/src/main/java/org/apache/iotdb/db/concurrent/threadpool/ScheduledExecutorUtil.java @@ -0,0 +1,196 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.concurrent.threadpool; + +import com.google.common.base.Throwables; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; + +public class ScheduledExecutorUtil { + + private static final Logger logger = LoggerFactory.getLogger(ScheduledExecutorUtil.class); + + /** + * A safe wrapper method to make sure the exception thrown by the previous running will not affect + * the next one. Please reference the javadoc of {@link + * ScheduledExecutorService#scheduleAtFixedRate(Runnable, long, long, TimeUnit)} for more details. + * + * @param executor the ScheduledExecutorService instance. + * @param command same parameter in {@link ScheduledExecutorService#scheduleAtFixedRate(Runnable, + * long, long, TimeUnit)}. + * @param initialDelay same parameter in {@link + * ScheduledExecutorService#scheduleAtFixedRate(Runnable, long, long, TimeUnit)}. + * @param period same parameter in {@link ScheduledExecutorService#scheduleAtFixedRate(Runnable, + * long, long, TimeUnit)}. + * @param unit same parameter in {@link ScheduledExecutorService#scheduleAtFixedRate(Runnable, + * long, long, TimeUnit)}. + * @return the same return value of {@link ScheduledExecutorService#scheduleAtFixedRate(Runnable, + * long, long, TimeUnit)}. + */ + @SuppressWarnings("unsafeThreadSchedule") + public static ScheduledFuture<?> safelyScheduleAtFixedRate( + ScheduledExecutorService executor, + Runnable command, + long initialDelay, + long period, + TimeUnit unit) { + return scheduleAtFixedRate(executor, command, initialDelay, period, unit, false); + } + + /** + * A safe wrapper method to make sure the exception thrown by the previous running will not affect + * the next one. Please reference the javadoc of {@link + * ScheduledExecutorService#scheduleWithFixedDelay(Runnable, long, long, TimeUnit)} for more + * details. + * + * @param executor the ScheduledExecutorService instance. + * @param command same parameter in {@link + * ScheduledExecutorService#scheduleWithFixedDelay(Runnable, long, long, TimeUnit)}. + * @param initialDelay same parameter in {@link + * ScheduledExecutorService#scheduleWithFixedDelay(Runnable, long, long, TimeUnit)}. + * @param delay same parameter in {@link ScheduledExecutorService#scheduleWithFixedDelay(Runnable, + * long, long, TimeUnit)}. + * @param unit same parameter in {@link ScheduledExecutorService#scheduleWithFixedDelay(Runnable, + * long, long, TimeUnit)}. + * @return the same return value of {@link + * ScheduledExecutorService#scheduleWithFixedDelay(Runnable, long, long, TimeUnit)}. + */ + @SuppressWarnings("unsafeThreadSchedule") + public static ScheduledFuture<?> safelyScheduleWithFixedDelay( + ScheduledExecutorService executor, + Runnable command, + long initialDelay, + long delay, + TimeUnit unit) { + return scheduleWithFixedDelay(executor, command, initialDelay, delay, unit, false); + } + + /** + * A wrapper method to have the same semantic with {@link + * ScheduledExecutorService#scheduleAtFixedRate(Runnable, long, long, TimeUnit)}, except for + * logging an error log when any uncaught exception happens. + * + * @param executor the ScheduledExecutorService instance. + * @param command same parameter in {@link ScheduledExecutorService#scheduleAtFixedRate(Runnable, + * long, long, TimeUnit)}. + * @param initialDelay same parameter in {@link + * ScheduledExecutorService#scheduleAtFixedRate(Runnable, long, long, TimeUnit)}. + * @param period same parameter in {@link ScheduledExecutorService#scheduleAtFixedRate(Runnable, + * long, long, TimeUnit)}. + * @param unit same parameter in {@link ScheduledExecutorService#scheduleAtFixedRate(Runnable, + * long, long, TimeUnit)}. + * @return the same return value of {@link ScheduledExecutorService#scheduleAtFixedRate(Runnable, + * long, long, TimeUnit)}. + */ + @SuppressWarnings("unsafeThreadSchedule") + public static ScheduledFuture<?> unsafelyScheduleAtFixedRate( + ScheduledExecutorService executor, + Runnable command, + long initialDelay, + long period, + TimeUnit unit) { + return scheduleAtFixedRate(executor, command, initialDelay, period, unit, true); + } + + /** + * A wrapper method to have the same semantic with {@link + * ScheduledExecutorService#scheduleWithFixedDelay(Runnable, long, long, TimeUnit)}, except for + * logging an error log when any uncaught exception happens. + * + * @param executor the ScheduledExecutorService instance. + * @param command same parameter in {@link + * ScheduledExecutorService#scheduleWithFixedDelay(Runnable, long, long, TimeUnit)}. + * @param initialDelay same parameter in {@link + * ScheduledExecutorService#scheduleWithFixedDelay(Runnable, long, long, TimeUnit)}. + * @param delay same parameter in {@link ScheduledExecutorService#scheduleWithFixedDelay(Runnable, + * long, long, TimeUnit)}. + * @param unit same parameter in {@link ScheduledExecutorService#scheduleWithFixedDelay(Runnable, + * long, long, TimeUnit)}. + * @return the same return value of {@link + * ScheduledExecutorService#scheduleWithFixedDelay(Runnable, long, long, TimeUnit)}. + */ + @SuppressWarnings("unsafeThreadSchedule") + public static ScheduledFuture<?> unsafelyScheduleWithFixedDelay( + ScheduledExecutorService executor, + Runnable command, + long initialDelay, + long delay, + TimeUnit unit) { + return scheduleWithFixedDelay(executor, command, initialDelay, delay, unit, true); + } + + @SuppressWarnings("unsafeThreadSchedule") + private static ScheduledFuture<?> scheduleAtFixedRate( + ScheduledExecutorService executor, + Runnable command, + long initialDelay, + long period, + TimeUnit unit, + boolean unsafe) { + return executor.scheduleAtFixedRate( + () -> { + try { + command.run(); + } catch (Throwable t) { + logger.error("Schedule task failed", t); + if (unsafe) { + throw t; + } + } + }, + initialDelay, + period, + unit); + } + + @SuppressWarnings("unsafeThreadSchedule") + private static ScheduledFuture<?> scheduleWithFixedDelay( + ScheduledExecutorService executor, + Runnable command, + long initialDelay, + long delay, + TimeUnit unit, + boolean unsafe) { + return executor.scheduleWithFixedDelay( + () -> { + try { + command.run(); + } catch (Throwable t) { + logger.error("Schedule task failed", t); + if (unsafe) { + throw t; + } + } + }, + initialDelay, + delay, + unit); + } + + public static RuntimeException propagate(Throwable throwable) { + logger.error("Run thread failed", throwable); + Throwables.throwIfUnchecked(throwable); + throw new RuntimeException(throwable); + } +} diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConstant.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConstant.java index 6d09cb4ac6..49059934f3 100644 --- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConstant.java +++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConstant.java @@ -40,6 +40,7 @@ public class IoTDBConstant { public static final String SLOW_SQL_LOGGER_NAME = "SLOW_SQL"; public static final String COMPACTION_LOGGER_NAME = "COMPACTION"; public static final String DOUBLE_LIVE_LOGGER_NAME = "DOUBLE_LIVE"; + public static final String QUERY_STATISTICS_LOGGER_NAME = "QUERY_STATISTICS"; public static final String IOTDB_JMX_PORT = "iotdb.jmx.port"; diff --git a/server/src/main/java/org/apache/iotdb/db/query/control/QueryStatistics.java b/server/src/main/java/org/apache/iotdb/db/query/control/QueryStatistics.java new file mode 100644 index 0000000000..3ce09646f0 --- /dev/null +++ b/server/src/main/java/org/apache/iotdb/db/query/control/QueryStatistics.java @@ -0,0 +1,130 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.query.control; + +import org.apache.iotdb.db.concurrent.IoTDBThreadPoolFactory; +import org.apache.iotdb.db.concurrent.threadpool.ScheduledExecutorUtil; +import org.apache.iotdb.db.conf.IoTDBConstant; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; + +public class QueryStatistics { + + private static final long QUERY_STATISTICS_PRINT_INTERVAL_IN_MS = 10_000; + + private static final Logger QUERY_STATISTICS_LOGGER = + LoggerFactory.getLogger(IoTDBConstant.QUERY_STATISTICS_LOGGER_NAME); + + private final AtomicBoolean tracing = new AtomicBoolean(false); + + private final Map<String, OperationStatistic> operationStatistics = new ConcurrentHashMap<>(); + + public static final String PARSER = "Parser"; + public static final String PLANNER = "Planner"; + + private QueryStatistics() { + ScheduledExecutorService scheduledExecutor = + IoTDBThreadPoolFactory.newScheduledThreadPool(1, "Query-Statistics-Print"); + ScheduledExecutorUtil.safelyScheduleAtFixedRate( + scheduledExecutor, + this::printQueryStatistics, + 0, + QUERY_STATISTICS_PRINT_INTERVAL_IN_MS, + TimeUnit.MILLISECONDS); + } + + private void printQueryStatistics() { + if (tracing.get()) { + operationStatistics.forEach( + (k, v) -> { + QUERY_STATISTICS_LOGGER.info("Operation: {}, Statistics: {}", k, v); + }); + } + } + + public static QueryStatistics getInstance() { + return QueryStatisticsHolder.INSTANCE; + } + + public void addCost(String key, long costTimeInNanos) { + if (tracing.get()) { + operationStatistics + .computeIfAbsent(key, k -> new OperationStatistic()) + .addTimeCost(costTimeInNanos); + } + } + + public void disableTracing() { + tracing.set(false); + operationStatistics.clear(); + } + + public void enableTracing() { + tracing.set(true); + operationStatistics.clear(); + } + + private static class OperationStatistic { + // accumulated operation time in ns + private final AtomicLong totalTime; + private final AtomicLong totalCount; + + public OperationStatistic() { + this.totalTime = new AtomicLong(0); + this.totalCount = new AtomicLong(0); + } + + public void addTimeCost(long costTimeInNanos) { + totalTime.addAndGet(costTimeInNanos); + totalCount.incrementAndGet(); + } + + @Override + public String toString() { + long time = totalTime.get() / 1_000; + long count = totalCount.get(); + return "{" + + "totalTime=" + + time + + "us" + + ", totalCount=" + + count + + ", avgOperationTime=" + + (time / count) + + "us" + + '}'; + } + } + + private static class QueryStatisticsHolder { + + private static final QueryStatistics INSTANCE = new QueryStatistics(); + + private QueryStatisticsHolder() {} + } +}
