This is an automated email from the ASF dual-hosted git repository.

xxyu pushed a commit to branch kylin5 in repository https://gitbox.apache.org/repos/asf/kylin.git
The following commit(s) were added to refs/heads/kylin5 by this push: new cd339658bd KYLIN-5381 Avoid timeout when cleaning query history by limiting the number of data deleted each time (#29355) cd339658bd is described below commit cd339658bd37822b720ced6526c3356d3938b068 Author: Wang Hui <wanda1...@users.noreply.github.com> AuthorDate: Mon Oct 31 12:03:03 2022 +0800 KYLIN-5381 Avoid timeout when cleaning query history by limiting the number of data deleted each time (#29355) Co-authored-by: hui.wang <hui.w...@kyligence.io> --- .../org/apache/kylin/common/KylinConfigBase.java | 4 + .../metadata/query/JdbcQueryHistoryStore.java | 101 +++++++++++++++------ .../kylin/metadata/query/QueryHistoryDAO.java | 8 +- .../kylin/metadata/query/QueryHistoryMapper.java | 6 ++ .../metadata/query/QueryHistoryProjectInfo.java | 20 ++++ .../kylin/metadata/query/RDBMSQueryHistoryDAO.java | 78 +++++++++++----- .../metadata/query/util/QueryHisStoreUtil.java | 39 +++++--- .../metadata/query/RDBMSQueryHistoryDaoTest.java | 72 +++++++++++++-- 8 files changed, 253 insertions(+), 75 deletions(-) diff --git a/src/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java b/src/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java index 9c4601705f..c917c96e7c 100644 --- a/src/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java +++ b/src/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java @@ -2801,6 +2801,10 @@ public abstract class KylinConfigBase implements Serializable { return Integer.parseInt(getOptional("kylin.query.queryhistory.project-max-size", "1000000")); } + public int getQueryHistorySingleDeletionSize() { + return Integer.parseInt(getOptional("kylin.query.queryhistory.single-deletion-size", "2000")); + } + public long getQueryHistorySurvivalThreshold() { return TimeUtil.timeStringAs(getOptional("kylin.query.queryhistory.survival-time-threshold", "30d"), TimeUnit.MILLISECONDS); diff --git 
a/src/core-metadata/src/main/java/org/apache/kylin/metadata/query/JdbcQueryHistoryStore.java b/src/core-metadata/src/main/java/org/apache/kylin/metadata/query/JdbcQueryHistoryStore.java index a4b3194f34..250c704991 100644 --- a/src/core-metadata/src/main/java/org/apache/kylin/metadata/query/JdbcQueryHistoryStore.java +++ b/src/core-metadata/src/main/java/org/apache/kylin/metadata/query/JdbcQueryHistoryStore.java @@ -25,6 +25,7 @@ import static org.mybatis.dynamic.sql.SqlBuilder.isGreaterThan; import static org.mybatis.dynamic.sql.SqlBuilder.isGreaterThanOrEqualTo; import static org.mybatis.dynamic.sql.SqlBuilder.isIn; import static org.mybatis.dynamic.sql.SqlBuilder.isLessThan; +import static org.mybatis.dynamic.sql.SqlBuilder.isLessThanOrEqualTo; import static org.mybatis.dynamic.sql.SqlBuilder.isLike; import static org.mybatis.dynamic.sql.SqlBuilder.isLikeCaseInsensitive; import static org.mybatis.dynamic.sql.SqlBuilder.isNotEqualTo; @@ -40,8 +41,10 @@ import java.io.PrintWriter; import java.nio.charset.Charset; import java.sql.Connection; import java.sql.SQLException; +import java.util.HashMap; import java.util.List; import java.util.Locale; +import java.util.Map; import java.util.Properties; import java.util.stream.Collectors; @@ -67,6 +70,7 @@ import org.mybatis.dynamic.sql.render.RenderingStrategies; import org.mybatis.dynamic.sql.select.QueryExpressionDSL; import org.mybatis.dynamic.sql.select.SelectModel; import org.mybatis.dynamic.sql.select.join.EqualTo; +import org.mybatis.dynamic.sql.select.aggregate.Count; import org.mybatis.dynamic.sql.select.render.SelectStatementProvider; import org.mybatis.dynamic.sql.update.render.UpdateStatementProvider; @@ -198,24 +202,28 @@ public class JdbcQueryHistoryStore { try (SqlSession session = sqlSessionFactory.openSession()) { QueryHistoryMapper mapper = session.getMapper(QueryHistoryMapper.class); SelectStatementProvider statementProvider = selectDistinct(queryHistoryRealizationTable.queryId) - 
.from(queryHistoryRealizationTable).where(queryHistoryRealizationTable.model, isIn(modelIds)) + .from(queryHistoryRealizationTable) // + .where(queryHistoryRealizationTable.model, isIn(modelIds)) // .build().render(RenderingStrategies.MYBATIS3); - return mapper.selectMany(statementProvider).stream().map(QueryHistory::getQueryId).collect(Collectors.toList()); + return mapper.selectMany(statementProvider).stream().map(QueryHistory::getQueryId) + .collect(Collectors.toList()); } } public List<QueryStatistics> queryQueryHistoriesModelIds(QueryHistoryRequest request, int size) { try (SqlSession session = sqlSessionFactory.openSession()) { QueryStatisticsMapper mapper = session.getMapper(QueryStatisticsMapper.class); - SelectStatementProvider statementProvider1 = selectDistinct(queryHistoryTable.engineType).from(queryHistoryTable) - .where(queryHistoryTable.engineType, isNotEqualTo("NATIVE")) - .and(queryHistoryTable.projectName, isEqualTo(request.getProject())) + SelectStatementProvider statementProvider1 = selectDistinct(queryHistoryTable.engineType) + .from(queryHistoryTable) // + .where(queryHistoryTable.engineType, isNotEqualTo("NATIVE")) // + .and(queryHistoryTable.projectName, isEqualTo(request.getProject())) // .build().render(RenderingStrategies.MYBATIS3); List<QueryStatistics> engineTypes = mapper.selectMany(statementProvider1); - SelectStatementProvider statementProvider2 = selectDistinct(queryHistoryRealizationTable.model).from(queryHistoryRealizationTable) - .where(queryHistoryRealizationTable.projectName, isEqualTo(request.getProject())) - .limit(size) + SelectStatementProvider statementProvider2 = selectDistinct(queryHistoryRealizationTable.model) + .from(queryHistoryRealizationTable) // + .where(queryHistoryRealizationTable.projectName, isEqualTo(request.getProject())) // + .limit(size) // .build().render(RenderingStrategies.MYBATIS3); List<QueryStatistics> modelIds = mapper.selectMany(statementProvider2); engineTypes.addAll(modelIds); @@ -223,33 +231,70 
@@ public class JdbcQueryHistoryStore { } } - public QueryHistory queryOldestQueryHistory(long maxSize) { + public QueryHistory getOldestQueryHistory(long index) { try (SqlSession session = sqlSessionFactory.openSession()) { QueryHistoryMapper mapper = session.getMapper(QueryHistoryMapper.class); SelectStatementProvider statementProvider = select(getSelectFields(queryHistoryTable)) .from(queryHistoryTable) // - .orderBy(queryHistoryTable.id.descending()) // + .orderBy(queryHistoryTable.id) // .limit(1) // - .offset(maxSize - 1) // + .offset(index - 1L) // .build().render(RenderingStrategies.MYBATIS3); return mapper.selectOne(statementProvider); } } - public QueryHistory queryOldestQueryHistory(long maxSize, String project) { + public QueryHistory getOldestQueryHistory(String project, long index) { try (SqlSession session = sqlSessionFactory.openSession()) { QueryHistoryMapper mapper = session.getMapper(QueryHistoryMapper.class); - SelectStatementProvider statementProvider = select(getSelectFields(queryHistoryTable)) // + SelectStatementProvider statementProvider = select(getSelectFields(queryHistoryTable)) .from(queryHistoryTable) // .where(queryHistoryTable.projectName, isEqualTo(project)) // - .orderBy(queryHistoryTable.id.descending()) // + .orderBy(queryHistoryTable.id) // .limit(1) // - .offset(maxSize - 1) // + .offset(index - 1L) // .build().render(RenderingStrategies.MYBATIS3); return mapper.selectOne(statementProvider); } } + public Long getCountOnQueryHistory() { + try (SqlSession session = sqlSessionFactory.openSession()) { + QueryHistoryMapper mapper = session.getMapper(QueryHistoryMapper.class); + SelectStatementProvider statementProvider = select(Count.of(queryHistoryTable.id)) // + .from(queryHistoryTable) // + .build().render(RenderingStrategies.MYBATIS3); + return mapper.selectAsLong(statementProvider); + } + } + + public Long getCountOnQueryHistory(long retainTime) { + try (SqlSession session = sqlSessionFactory.openSession()) { + 
QueryHistoryMapper mapper = session.getMapper(QueryHistoryMapper.class); + SelectStatementProvider statementProvider = select(Count.of(queryHistoryTable.id).as(COUNT)) // + .from(queryHistoryTable) // + .where(queryHistoryTable.queryTime, isLessThan(retainTime)) // + .build().render(RenderingStrategies.MYBATIS3); + return mapper.selectAsLong(statementProvider); + } + } + + public Map<String, Long> getCountGroupByProject() { + Map<String, Long> projectCounts = new HashMap<>(); + List<QueryHistoryProjectInfo> projectInfos; + try (SqlSession session = sqlSessionFactory.openSession()) { + QueryHistoryMapper mapper = session.getMapper(QueryHistoryMapper.class); + SelectStatementProvider statementProvider = select(queryHistoryTable.projectName, + count(queryHistoryTable.id).as(COUNT)) // + .from(queryHistoryTable) // + .groupBy(queryHistoryTable.projectName) // + .build().render(RenderingStrategies.MYBATIS3); + projectInfos = mapper.selectByProject(statementProvider); + } + projectInfos.forEach(projectInfo -> projectCounts.put(projectInfo.getProjectName(), projectInfo.getCount())); + return projectCounts; + } + public QueryHistory queryByQueryId(String queryId) { try (SqlSession session = sqlSessionFactory.openSession()) { QueryHistoryMapper mapper = session.getMapper(QueryHistoryMapper.class); @@ -398,27 +443,28 @@ public class JdbcQueryHistoryStore { } } - public void deleteQueryHistory(long queryTime) { + public int deleteQueryHistory(long id) { long startTime = System.currentTimeMillis(); try (SqlSession session = sqlSessionFactory.openSession()) { QueryHistoryMapper mapper = session.getMapper(QueryHistoryMapper.class); DeleteStatementProvider deleteStatement = SqlBuilder.deleteFrom(queryHistoryTable) // - .where(queryHistoryTable.queryTime, isLessThan(queryTime)) // + .where(queryHistoryTable.id, isLessThanOrEqualTo(id)) // .build().render(RenderingStrategies.MYBATIS3); int deleteRows = mapper.delete(deleteStatement); session.commit(); if (deleteRows > 0) { 
log.info("Delete {} row query history takes {} ms", deleteRows, System.currentTimeMillis() - startTime); } + return deleteRows; } } - public void deleteQueryHistory(long queryTime, String project) { + public int deleteQueryHistory(String project, long id) { long startTime = System.currentTimeMillis(); try (SqlSession session = sqlSessionFactory.openSession()) { QueryHistoryMapper mapper = session.getMapper(QueryHistoryMapper.class); DeleteStatementProvider deleteStatement = SqlBuilder.deleteFrom(queryHistoryTable) // - .where(queryHistoryTable.queryTime, isLessThan(queryTime)) // + .where(queryHistoryTable.id, isLessThanOrEqualTo(id)) // .and(queryHistoryTable.projectName, isEqualTo(project)) // .build().render(RenderingStrategies.MYBATIS3); int deleteRows = mapper.delete(deleteStatement); @@ -427,6 +473,7 @@ public class JdbcQueryHistoryStore { log.info("Delete {} row query history for project [{}] takes {} ms", deleteRows, project, System.currentTimeMillis() - startTime); } + return deleteRows; } } @@ -463,7 +510,7 @@ public class JdbcQueryHistoryStore { } } - public void deleteQueryHistoryRealization(long queryTime, String project) { + public void deleteQueryHistoryRealization(String project, long queryTime) { long startTime = System.currentTimeMillis(); try (SqlSession session = sqlSessionFactory.openSession()) { QueryHistoryMapper mapper = session.getMapper(QueryHistoryMapper.class); @@ -631,7 +678,8 @@ public class JdbcQueryHistoryStore { if (request.isSubmitterExactlyMatch()) { filterSql = filterSql.and(queryHistoryTable.querySubmitter, isIn(request.getFilterSubmitter())); } else if (request.getFilterSubmitter().size() == 1) { - filterSql = filterSql.and(queryHistoryTable.querySubmitter, isLikeCaseInsensitive("%" + request.getFilterSubmitter().get(0) + "%")); + filterSql = filterSql.and(queryHistoryTable.querySubmitter, + isLikeCaseInsensitive("%" + request.getFilterSubmitter().get(0) + "%")); } } @@ -655,12 +703,14 @@ public class JdbcQueryHistoryStore { } 
} else if (selectAllModels) { // Process CONSTANTS, HIVE, RDBMS and all model - filterSql = filterSql.and(queryHistoryTable.engineType, isIn(realizations), or(queryHistoryTable.indexHit, isEqualTo(true))); + filterSql = filterSql.and(queryHistoryTable.engineType, isIn(realizations), + or(queryHistoryTable.indexHit, isEqualTo(true))); } else if (request.getFilterModelIds() != null && !request.getFilterModelIds().isEmpty()) { // Process CONSTANTS, HIVE, RDBMS and model1, model2, model3... - filterSql = filterSql.and(queryHistoryTable.engineType, isIn(realizations), or(queryHistoryTable.queryId, - isIn(selectDistinct(queryHistoryRealizationTable.queryId).from(queryHistoryRealizationTable) - .where(queryHistoryRealizationTable.model, isIn(request.getFilterModelIds()))))); + filterSql = filterSql.and(queryHistoryTable.engineType, isIn(realizations), + or(queryHistoryTable.queryId, + isIn(selectDistinct(queryHistoryRealizationTable.queryId).from(queryHistoryRealizationTable) + .where(queryHistoryRealizationTable.model, isIn(request.getFilterModelIds()))))); } else { // Process CONSTANTS, HIVE, RDBMS filterSql = filterSql.and(queryHistoryTable.engineType, isIn(realizations)); @@ -762,5 +812,4 @@ public class JdbcQueryHistoryStore { queryHistoryTable.queryTime, queryHistoryTable.resultRowCount, queryHistoryTable.sql, queryHistoryTable.sqlPattern, queryHistoryTable.totalScanBytes, queryHistoryTable.totalScanCount); } - } diff --git a/src/core-metadata/src/main/java/org/apache/kylin/metadata/query/QueryHistoryDAO.java b/src/core-metadata/src/main/java/org/apache/kylin/metadata/query/QueryHistoryDAO.java index cd91f42287..d954368f35 100644 --- a/src/core-metadata/src/main/java/org/apache/kylin/metadata/query/QueryHistoryDAO.java +++ b/src/core-metadata/src/main/java/org/apache/kylin/metadata/query/QueryHistoryDAO.java @@ -19,6 +19,7 @@ package org.apache.kylin.metadata.query; import java.util.List; +import java.util.Map; public interface QueryHistoryDAO { @@ -44,10 +45,10 @@ 
public interface QueryHistoryDAO { void deleteQueryHistoriesIfMaxSizeReached(); - void deleteQueryHistoriesIfProjectMaxSizeReached(String project); - void deleteQueryHistoriesIfRetainTimeReached(); + void deleteOldestQueryHistoriesByProject(String project, int deleteCount); + long getQueryHistoriesSize(QueryHistoryRequest request, String project); QueryHistory getByQueryId(String queryId); @@ -59,4 +60,7 @@ public interface QueryHistoryDAO { String getRealizationMetricMeasurement(); List<QueryDailyStatistic> getQueryDailyStatistic(long startTime, long endTime); + + Map<String, Long> getQueryCountByProject(); + } diff --git a/src/core-metadata/src/main/java/org/apache/kylin/metadata/query/QueryHistoryMapper.java b/src/core-metadata/src/main/java/org/apache/kylin/metadata/query/QueryHistoryMapper.java index ed39f0a9c6..e5a24f4678 100644 --- a/src/core-metadata/src/main/java/org/apache/kylin/metadata/query/QueryHistoryMapper.java +++ b/src/core-metadata/src/main/java/org/apache/kylin/metadata/query/QueryHistoryMapper.java @@ -69,6 +69,12 @@ public interface QueryHistoryMapper { @Result(column = "reserved_field_3", property = "queryHistoryInfo", jdbcType = JdbcType.BLOB, typeHandler = QueryHistoryTable.QueryHistoryInfoHandler.class) }) List<QueryHistory> selectMany(SelectStatementProvider selectStatement); + @SelectProvider(type = SqlProviderAdapter.class, method = "select") + @Results(id = "QueryHistoryProjectInfoResult", value = { + @Result(column = "project_name", property = "projectName", jdbcType = JdbcType.VARCHAR), + @Result(column = "count", property = "count", jdbcType = JdbcType.BIGINT) }) + List<QueryHistoryProjectInfo> selectByProject(SelectStatementProvider selectStatement); + @SelectProvider(type = SqlProviderAdapter.class, method = "select") @ResultMap("QueryHistoryResult") QueryHistory selectOne(SelectStatementProvider selectStatement); diff --git a/src/core-metadata/src/main/java/org/apache/kylin/metadata/query/QueryHistoryProjectInfo.java 
b/src/core-metadata/src/main/java/org/apache/kylin/metadata/query/QueryHistoryProjectInfo.java new file mode 100644 index 0000000000..2324979f71 --- /dev/null +++ b/src/core-metadata/src/main/java/org/apache/kylin/metadata/query/QueryHistoryProjectInfo.java @@ -0,0 +1,20 @@ +package org.apache.kylin.metadata.query; + +import com.fasterxml.jackson.annotation.JsonProperty; +import lombok.Getter; +import lombok.Setter; +import lombok.extern.slf4j.Slf4j; + +@Getter +@Setter +@Slf4j +public class QueryHistoryProjectInfo { + + public static final String PROJECT_NAME = "project_name"; + + @JsonProperty(PROJECT_NAME) + private String projectName; + + private long count; + +} diff --git a/src/core-metadata/src/main/java/org/apache/kylin/metadata/query/RDBMSQueryHistoryDAO.java b/src/core-metadata/src/main/java/org/apache/kylin/metadata/query/RDBMSQueryHistoryDAO.java index 73124496de..14fdf0c4a1 100644 --- a/src/core-metadata/src/main/java/org/apache/kylin/metadata/query/RDBMSQueryHistoryDAO.java +++ b/src/core-metadata/src/main/java/org/apache/kylin/metadata/query/RDBMSQueryHistoryDAO.java @@ -23,18 +23,19 @@ import java.time.Duration; import java.time.Instant; import java.util.Date; import java.util.List; -import java.util.Objects; +import java.util.Map; import java.util.Set; import java.util.TimeZone; +import java.util.function.IntFunction; import java.util.stream.Collectors; import org.apache.commons.collections.CollectionUtils; import org.apache.kylin.common.KylinConfig; import org.apache.kylin.common.Singletons; import org.apache.kylin.common.StorageURL; +import org.apache.kylin.common.persistence.transaction.UnitOfWork; import org.apache.kylin.common.util.Pair; import org.apache.kylin.common.util.TimeUtil; -import org.apache.kylin.common.persistence.transaction.UnitOfWork; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -45,8 +46,8 @@ public class RDBMSQueryHistoryDAO implements QueryHistoryDAO { private static final Logger logger = 
LoggerFactory.getLogger(RDBMSQueryHistoryDAO.class); @Setter private String queryMetricMeasurement; - private String realizationMetricMeasurement; - private JdbcQueryHistoryStore jdbcQueryHisStore; + private final String realizationMetricMeasurement; + private final JdbcQueryHistoryStore jdbcQueryHisStore; public static final String WEEK = "week"; public static final String DAY = "day"; @@ -104,34 +105,43 @@ public class RDBMSQueryHistoryDAO implements QueryHistoryDAO { jdbcQueryHisStore.deleteQueryHistoryRealization(project); } - public void deleteQueryHistoriesIfMaxSizeReached() { - QueryHistory queryHistory = jdbcQueryHisStore - .queryOldestQueryHistory(KylinConfig.getInstanceFromEnv().getQueryHistoryMaxSize()); - if (Objects.nonNull(queryHistory)) { - long time = queryHistory.getQueryTime(); - jdbcQueryHisStore.deleteQueryHistory(time); - jdbcQueryHisStore.deleteQueryHistoryRealization(time); - } - } - public QueryHistory getByQueryId(String queryId) { return jdbcQueryHisStore.queryByQueryId(queryId); } - public void deleteQueryHistoriesIfProjectMaxSizeReached(String project) { - QueryHistory queryHistory = jdbcQueryHisStore - .queryOldestQueryHistory(KylinConfig.getInstanceFromEnv().getQueryHistoryProjectMaxSize(), project); - if (Objects.nonNull(queryHistory)) { - long time = queryHistory.getQueryTime(); - jdbcQueryHisStore.deleteQueryHistory(time, project); - jdbcQueryHisStore.deleteQueryHistoryRealization(time, project); + public void deleteQueryHistoriesIfMaxSizeReached() { + long maxSize = KylinConfig.getInstanceFromEnv().getQueryHistoryMaxSize(); + long totalCount = jdbcQueryHisStore.getCountOnQueryHistory(); + if (totalCount > maxSize) { + deleteQueryHistoryAndRealization((int) (totalCount - maxSize)); } } public void deleteQueryHistoriesIfRetainTimeReached() { - long retainTime = getRetainTime(); - jdbcQueryHisStore.deleteQueryHistory(retainTime); - jdbcQueryHisStore.deleteQueryHistoryRealization(retainTime); + long rangeOutCount = 
jdbcQueryHisStore.getCountOnQueryHistory(getRetainTime()); + if (rangeOutCount > 0) { + deleteQueryHistoryAndRealization((int) rangeOutCount); + } + } + + public void deleteQueryHistoryAndRealization(int deleteCount) { + int singleLimit = KylinConfig.getInstanceFromEnv().getQueryHistorySingleDeletionSize(); + largeSplitToSmallTask(deleteCount, singleLimit, currentCount -> { + QueryHistory queryHistory = jdbcQueryHisStore.getOldestQueryHistory(currentCount); + int deletedRows = jdbcQueryHisStore.deleteQueryHistory(queryHistory.getId()); + jdbcQueryHisStore.deleteQueryHistoryRealization(queryHistory.getQueryTime()); + return deletedRows; + }, "Cleanup all query history"); + } + + public void deleteOldestQueryHistoriesByProject(String project, int deleteCount) { + int singleLimit = KylinConfig.getInstanceFromEnv().getQueryHistorySingleDeletionSize(); + largeSplitToSmallTask(deleteCount, singleLimit, currentCount -> { + QueryHistory queryHistory = jdbcQueryHisStore.getOldestQueryHistory(project, currentCount); + int deletedRows = jdbcQueryHisStore.deleteQueryHistory(project, queryHistory.getId()); + jdbcQueryHisStore.deleteQueryHistoryRealization(project, queryHistory.getQueryTime()); + return deletedRows; + }, "Cleanup project<" + project + "> query history"); } public void batchUpdateQueryHistoriesInfo(List<Pair<Long, QueryHistoryInfo>> idToQHInfoList) { @@ -215,6 +225,11 @@ public class RDBMSQueryHistoryDAO implements QueryHistoryDAO { return jdbcQueryHisStore.queryAvgDurationByTime(startTime, endTime, timeDimension, project); } + @Override + public Map<String, Long> getQueryCountByProject() { + return jdbcQueryHisStore.getCountGroupByProject(); + } + public static void fillZeroForQueryStatistics(List<QueryStatistics> queryStatistics, long startTime, long endTime, String dimension) { if (!dimension.equalsIgnoreCase(DAY) && !dimension.equalsIgnoreCase(WEEK)) { @@ -245,4 +260,19 @@ public class RDBMSQueryHistoryDAO implements QueryHistoryDAO { } } } + + public static 
void largeSplitToSmallTask(int totalCount, int singleSize, IntFunction<Integer> function, + String description) { + int retainCount = totalCount; + while (retainCount > 0) { + int currentCount = Math.min(retainCount, singleSize); + int actualCount = function.apply(currentCount); + if (currentCount != actualCount && logger.isWarnEnabled()) { + logger.warn("The task {} was not performed as expected, expect:{}, actual:{}", description, + currentCount, actualCount); + } + retainCount -= currentCount; + } + } + } diff --git a/src/core-metadata/src/main/java/org/apache/kylin/metadata/query/util/QueryHisStoreUtil.java b/src/core-metadata/src/main/java/org/apache/kylin/metadata/query/util/QueryHisStoreUtil.java index 1c0b8c779e..f6589ed5d0 100644 --- a/src/core-metadata/src/main/java/org/apache/kylin/metadata/query/util/QueryHisStoreUtil.java +++ b/src/core-metadata/src/main/java/org/apache/kylin/metadata/query/util/QueryHisStoreUtil.java @@ -27,11 +27,13 @@ import java.nio.charset.Charset; import java.sql.Connection; import java.sql.SQLException; import java.util.Locale; +import java.util.Map; import java.util.Properties; import javax.sql.DataSource; import org.apache.commons.dbcp2.BasicDataSource; +import org.apache.commons.lang3.time.StopWatch; import org.apache.ibatis.jdbc.ScriptRunner; import org.apache.ibatis.mapping.Environment; import org.apache.ibatis.session.Configuration; @@ -45,7 +47,6 @@ import org.apache.kylin.common.Singletons; import org.apache.kylin.common.logging.LogOutputStream; import org.apache.kylin.common.persistence.metadata.jdbc.JdbcUtil; import org.apache.kylin.common.util.SetThreadName; -import org.apache.kylin.metadata.epoch.EpochManager; import org.apache.kylin.metadata.project.NProjectManager; import org.apache.kylin.metadata.project.ProjectInstance; import org.apache.kylin.metadata.query.QueryHistoryDAO; @@ -165,28 +166,40 @@ public class QueryHisStoreUtil { try (SetThreadName ignored = new SetThreadName("QueryHistoryCleanWorker")) { val 
config = KylinConfig.getInstanceFromEnv(); val projectManager = NProjectManager.getInstance(config); + getQueryHistoryDao().deleteQueryHistoriesIfMaxSizeReached(); getQueryHistoryDao().deleteQueryHistoriesIfRetainTimeReached(); + + Map<String, Long> projectCounts = getQueryHistoryDao().getQueryCountByProject(); for (ProjectInstance project : projectManager.listAllProjects()) { if (Thread.currentThread().isInterrupted()) { throw new InterruptedException("Thread is interrupted: " + Thread.currentThread().getName()); } - if (!EpochManager.getInstance().checkEpochOwner(project.getName())) - continue; - try { - long startTime = System.currentTimeMillis(); - log.info("Start to delete query histories that are beyond max size for project<{}>", - project.getName()); - getQueryHistoryDao().deleteQueryHistoriesIfProjectMaxSizeReached(project.getName()); - log.info("Query histories cleanup for project<{}> finished, it took {}ms", project.getName(), - System.currentTimeMillis() - startTime); - } catch (Exception e) { - log.error("clean query histories<" + project.getName() + "> failed", e); - } + long projectCount = projectCounts.getOrDefault(project.getName(), 0L); + cleanQueryHistory(project.getName(), projectCount); } } } + public static void cleanQueryHistory(String projectName, long historyCount) { + long projectMaxSize = KylinConfig.getInstanceFromEnv().getQueryHistoryProjectMaxSize(); + if (historyCount <= projectMaxSize) { + log.info("Query histories of project<{}> is less than the maximum limit, so skip it.", projectName); + return; + } + try { + StopWatch watch = StopWatch.createStarted(); + log.info("Start to delete query histories that are beyond max size for project<{}>, records:{}", + projectName, historyCount); + getQueryHistoryDao().deleteOldestQueryHistoriesByProject(projectName, + (int) (historyCount - projectMaxSize)); + watch.stop(); + log.info("Query histories cleanup for project<{}> finished, it took {}ms", projectName, watch.getTime()); + } catch 
(Exception e) { + log.error("Clean query histories for project<{}> failed", projectName, e); + } + } + private static QueryHistoryDAO getQueryHistoryDao() { return RDBMSQueryHistoryDAO.getInstance(); } diff --git a/src/core-metadata/src/test/java/org/apache/kylin/metadata/query/RDBMSQueryHistoryDaoTest.java b/src/core-metadata/src/test/java/org/apache/kylin/metadata/query/RDBMSQueryHistoryDaoTest.java index 18443017dc..e6ea6c176c 100644 --- a/src/core-metadata/src/test/java/org/apache/kylin/metadata/query/RDBMSQueryHistoryDaoTest.java +++ b/src/core-metadata/src/test/java/org/apache/kylin/metadata/query/RDBMSQueryHistoryDaoTest.java @@ -19,13 +19,16 @@ package org.apache.kylin.metadata.query; import static org.apache.kylin.metadata.query.RDBMSQueryHistoryDAO.fillZeroForQueryStatistics; +import static org.apache.kylin.metadata.query.RDBMSQueryHistoryDAO.largeSplitToSmallTask; import java.util.List; +import java.util.concurrent.atomic.AtomicInteger; import org.apache.kylin.common.util.NLocalFileMetadataTestCase; import org.apache.kylin.common.util.Pair; import org.apache.kylin.common.util.TimeUtil; import org.apache.kylin.junit.TimeZoneTestRunner; +import org.apache.kylin.metadata.query.util.QueryHisStoreUtil; import org.junit.After; import org.junit.Assert; import org.junit.Before; @@ -71,7 +74,7 @@ public class RDBMSQueryHistoryDaoTest extends NLocalFileMetadataTestCase { } @Test - public void testGetQueryHistoriesfilterByIsIndexHit() throws Exception { + public void testGetQueryHistoriesFilterByIsIndexHit() throws Exception { queryHistoryDAO.insert(createQueryMetrics(1580311512000L, 1L, true, PROJECT, true)); queryHistoryDAO.insert(createQueryMetrics(1580311512000L, 1L, false, PROJECT, true)); queryHistoryDAO.insert(createQueryMetrics(1580311512000L, 1L, false, PROJECT, true)); @@ -102,7 +105,7 @@ public class RDBMSQueryHistoryDaoTest extends NLocalFileMetadataTestCase { } @Test - public void testGetQueryHistoriesfilterByQueryTime() throws Exception { + public 
void testGetQueryHistoriesFilterByQueryTime() throws Exception { // 2020-01-29 23:25:12 queryHistoryDAO.insert(createQueryMetrics(1580311512000L, 1L, true, PROJECT, true)); // 2020-01-30 23:25:12 @@ -123,7 +126,7 @@ public class RDBMSQueryHistoryDaoTest extends NLocalFileMetadataTestCase { } @Test - public void testGetQueryHistoriesfilterByDuration() throws Exception { + public void testGetQueryHistoriesFilterByDuration() throws Exception { queryHistoryDAO.insert(createQueryMetrics(1580311512000L, 1000L, true, PROJECT, true)); queryHistoryDAO.insert(createQueryMetrics(1580311512000L, 2000L, false, PROJECT, true)); queryHistoryDAO.insert(createQueryMetrics(1580311512000L, 3000L, false, PROJECT, true)); @@ -145,7 +148,7 @@ public class RDBMSQueryHistoryDaoTest extends NLocalFileMetadataTestCase { } @Test - public void testGetQueryHistoriesfilterBySql() throws Exception { + public void testGetQueryHistoriesFilterBySql() throws Exception { QueryMetrics queryMetrics1 = createQueryMetrics(1580311512000L, 1L, true, PROJECT, true); queryMetrics1.setSql("select 2 LIMIT 500\n"); queryHistoryDAO.insert(queryMetrics1); @@ -333,6 +336,38 @@ public class RDBMSQueryHistoryDaoTest extends NLocalFileMetadataTestCase { Assert.assertEquals(2, monthQueryStatistics.get(0).getMeanDuration(), 0.1); } + @Test + public void testDeleteQueryHistories() throws Exception { + overwriteSystemProp("kylin.query.queryhistory.max-size", "2"); + overwriteSystemProp("kylin.query.queryhistory.project-max-size", "5"); + + String PROJECT_V1 = PROJECT + "_v1"; + + // 2020-01-29 23:25:12 + queryHistoryDAO.insert(createQueryMetrics(1580311512000L, 1L, true, PROJECT, true)); + // 2020-01-30 23:25:12 + queryHistoryDAO.insert(createQueryMetrics(1580397912000L, 2L, false, PROJECT, true)); + // 2030-01-28 23:25:12 + queryHistoryDAO.insert(createQueryMetrics(1895844312000L, 3L, false, PROJECT_V1, true)); + // 2030-01-29 23:25:12 + queryHistoryDAO.insert(createQueryMetrics(1895930712000L, 1L, false, PROJECT, 
true)); + + // before delete + List<QueryHistory> queryHistoryList = queryHistoryDAO.queryQueryHistoriesByIdOffset(0, 100, PROJECT); + Assert.assertEquals(3, queryHistoryList.size()); + + // after delete + QueryHisStoreUtil.cleanQueryHistory(); + + queryHistoryList = queryHistoryDAO.queryQueryHistoriesByIdOffset(0, 100, PROJECT_V1); + Assert.assertEquals(1, queryHistoryList.size()); + Assert.assertEquals(1895844312000L, queryHistoryList.get(0).getQueryTime()); + + queryHistoryList = queryHistoryDAO.queryQueryHistoriesByIdOffset(0, 100, PROJECT); + Assert.assertEquals(1, queryHistoryList.size()); + Assert.assertEquals(1895930712000L, queryHistoryList.get(0).getQueryTime()); + } + @Test public void testDeleteQueryHistoriesIfRetainTimeReached() throws Exception { // 2020-01-29 23:25:12 @@ -401,12 +436,12 @@ public class RDBMSQueryHistoryDaoTest extends NLocalFileMetadataTestCase { Assert.assertEquals(4, queryHistoryList.size()); // after delete - queryHistoryDAO.deleteQueryHistoriesIfProjectMaxSizeReached(PROJECT); + QueryHisStoreUtil.cleanQueryHistory(PROJECT, 4); queryHistoryList = queryHistoryDAO.getAllQueryHistories(); Assert.assertEquals(2, queryHistoryList.size()); // test delete empty - queryHistoryDAO.deleteQueryHistoriesIfProjectMaxSizeReached(PROJECT); + QueryHisStoreUtil.cleanQueryHistory(PROJECT, 2); queryHistoryList = queryHistoryDAO.getAllQueryHistories(); Assert.assertEquals(2, queryHistoryList.size()); } @@ -670,12 +705,12 @@ public class RDBMSQueryHistoryDaoTest extends NLocalFileMetadataTestCase { Assert.assertEquals(2, queryHistoryList.size()); - Assert.assertEquals(false, queryHistoryList.get(0).getQueryHistoryInfo().isExactlyMatch()); + Assert.assertFalse(queryHistoryList.get(0).getQueryHistoryInfo().isExactlyMatch()); Assert.assertEquals(5, queryHistoryList.get(0).getQueryHistoryInfo().getScanSegmentNum()); Assert.assertEquals("PENDING", queryHistoryList.get(0).getQueryHistoryInfo().getState().toString()); - Assert.assertEquals(false, 
queryHistoryList.get(0).getQueryHistoryInfo().isExecutionError()); + Assert.assertFalse(queryHistoryList.get(0).getQueryHistoryInfo().isExecutionError()); - Assert.assertEquals(true, queryHistoryList.get(1).getQueryHistoryInfo().isExactlyMatch()); + Assert.assertTrue(queryHistoryList.get(1).getQueryHistoryInfo().isExactlyMatch()); Assert.assertEquals(3, queryHistoryList.get(1).getQueryHistoryInfo().getScanSegmentNum()); Assert.assertEquals("PENDING", queryHistoryList.get(1).getQueryHistoryInfo().getState().toString()); Assert.assertTrue(queryHistoryList.get(1).getQueryHistoryInfo().isExecutionError()); @@ -721,6 +756,23 @@ public class RDBMSQueryHistoryDaoTest extends NLocalFileMetadataTestCase { Assert.assertEquals(2L, queryDailyStatistic.get(0).getLt3sNum()); } + @Test + public void testLargeSplitToSmallTask() { + AtomicInteger executions = new AtomicInteger(0); + AtomicInteger actualSize = new AtomicInteger(0); + largeSplitToSmallTask(105, 10, currentCount -> { + executions.incrementAndGet(); + actualSize.addAndGet(currentCount); + if (currentCount < 10) { + return currentCount - 1; + } else { + return currentCount; + } + }, "Test LargeSplitToSmall Task"); + Assert.assertEquals(105, actualSize.get()); + Assert.assertEquals(11, executions.get()); + } + public static QueryMetrics createQueryMetrics(long queryTime, long duration, boolean indexHit, String project, boolean hitModel) { QueryMetrics queryMetrics = new QueryMetrics("6a9a151f-f992-4d52-a8ec-8ff3fd3de6b1", "192.168.1.6:7070"); @@ -753,7 +805,7 @@ public class RDBMSQueryHistoryDaoTest extends NLocalFileMetadataTestCase { realizationMetrics.setModelId("82fa7671-a935-45f5-8779-85703601f49a.json"); realizationMetrics.setSnapshots( - Lists.newArrayList(new String[] { "DEFAULT.TEST_KYLIN_ACCOUNT", "DEFAULT.TEST_COUNTRY" })); + Lists.newArrayList("DEFAULT.TEST_KYLIN_ACCOUNT", "DEFAULT.TEST_COUNTRY")); List<QueryMetrics.RealizationMetrics> realizationMetricsList = Lists.newArrayList(); 
realizationMetricsList.add(realizationMetrics);