This is an automated email from the ASF dual-hosted git repository. liyang pushed a commit to branch kylin5 in repository https://gitbox.apache.org/repos/asf/kylin.git
commit ca370d2dad5855941ebb699c1ff3d3eb343414f6 Author: sibingzhang <74443791+sibingzh...@users.noreply.github.com> AuthorDate: Sun Aug 13 19:57:05 2023 +0800 KYLIN-5789 Clean sparder history and spark history automatically --- .../src/main/resources/kylinSecurity.xml | 1 + .../kylin/rest/controller/NSystemController.java | 18 +- .../src/main/resources/kylinSecurity.xml | 1 + .../apache/kylin/rest/service/ProjectService.java | 6 +- .../apache/kylin/rest/service/SystemService.java | 9 + .../src/test/resources/kylinSecurity.xml | 1 + .../metadata/query/JdbcQueryHistoryStore.java | 19 ++- .../kylin/metadata/query/QueryHistoryDAO.java | 2 + .../kylin/metadata/query/RDBMSQueryHistoryDAO.java | 5 + .../metadata/query/util/QueryHisStoreUtil.java | 5 + .../metadata/query/RDBMSQueryHistoryDaoTest.java | 17 +- .../src/main/resources/kylinSecurity.xml | 1 + .../apache/kylin/rest/service/ScheduleService.java | 60 +++++-- .../kylin/rest/service/ScheduleServiceTest.java | 25 ++- .../src/main/resources/kylinSecurity.xml | 1 + .../org/apache/kylin/rest/QueryNodeFilter.java | 2 + .../apache/kylin/helper/MetadataToolHelper.java | 7 +- .../org/apache/kylin/helper/RoutineToolHelper.java | 41 ++++- .../java/org/apache/kylin/tool/AuditLogTool.java | 18 +- .../org/apache/kylin/tool/MaintainModeTool.java | 21 +-- .../constant/StringConstant.java} | 16 +- .../tool/garbage/CleanTaskExecutorService.java | 14 +- .../kylin/tool/garbage/ExecutableCleaner.java | 6 +- .../apache/kylin/tool/garbage/GarbageCleaner.java | 49 +----- .../apache/kylin/tool/garbage/IndexCleaner.java | 6 +- .../apache/kylin/tool/garbage/MetadataCleaner.java | 47 ++++- .../apache/kylin/tool/garbage/SnapshotCleaner.java | 6 +- .../kylin/tool/garbage/SourceUsageCleaner.java | 7 +- .../apache/kylin/tool/garbage/StorageCleaner.java | 190 ++++++++++++++++++--- .../apache/kylin/tool/routine/FastRoutineTool.java | 1 + .../org/apache/kylin/tool/routine/RoutineTool.java | 31 +++- .../kylin/tool/security/AdminUserInitCLI.java | 18 +- .../kylin/tool/security/KylinPasswordResetCLI.java | 18 +- .../org/apache/kylin/tool/StorageCleanerTest.java | 126 +++++++++++++- .../garbage/CleanTaskExecutorServiceTests.java | 2 +- .../kylin/tool/garbage/ExecutableCleanerTest.java | 8 +- .../kylin/tool/garbage/SnapshotCleanerTest.java | 2 +- .../kylin/tool/garbage/SourceUsageCleanerTest.java | 8 +- .../kylin/tool/security/AdminUserInitCLITest.java | 8 +- .../tool/security/KylinPasswordResetCLITest.java | 10 +- .../appstatus_application_1677899901295_8490 | 0 ..._1_application_1677899901295_8490_1690953331329 | 0 ..._2_application_1677899901295_8490_1690953331329 | 0 ..._1_application_1677899901295_5936_1690235831577 | 0 .../spark-history/application_1677899901295_0989 | 0 .../spark-history/application_1677899901295_8243 | 0 46 files changed, 619 insertions(+), 214 deletions(-) diff --git a/src/common-booter/src/main/resources/kylinSecurity.xml b/src/common-booter/src/main/resources/kylinSecurity.xml index 37dcfc2214..bf3f7aae8e 100644 --- a/src/common-booter/src/main/resources/kylinSecurity.xml +++ b/src/common-booter/src/main/resources/kylinSecurity.xml @@ -269,6 +269,7 @@ <scr:intercept-url pattern="/api/system/diag/progress" access="permitAll"/> <scr:intercept-url pattern="/api/system/roll_event_log" access="permitAll"/> <scr:intercept-url pattern="/api/system/broadcast_metadata_backup" access="permitAll"/> + <scr:intercept-url pattern="/api/system/clean_sparder_event_log" access="permitAll"/> <scr:intercept-url pattern="/api/system/metadata_backup_tmp_file" access="permitAll"/> <scr:intercept-url pattern="/api/user/authentication*/**" access="permitAll"/> <scr:intercept-url pattern="/api/query/history_queries/table_names" access="permitAll"/> diff --git a/src/common-server/src/main/java/org/apache/kylin/rest/controller/NSystemController.java b/src/common-server/src/main/java/org/apache/kylin/rest/controller/NSystemController.java index a8cdcda4c2..b29d4a4eaf 100644 --- a/src/common-server/src/main/java/org/apache/kylin/rest/controller/NSystemController.java +++ b/src/common-server/src/main/java/org/apache/kylin/rest/controller/NSystemController.java @@ -43,6 +43,7 @@ import org.apache.kylin.common.persistence.transaction.UnitOfWork; import org.apache.kylin.common.persistence.transaction.UnitOfWorkParams; import org.apache.kylin.common.scheduler.EventBusFactory; import org.apache.kylin.common.util.AddressUtil; +import org.apache.kylin.guava30.shaded.common.annotations.VisibleForTesting; import org.apache.kylin.helper.MetadataToolHelper; import org.apache.kylin.metadata.project.EnhancedUnitOfWork; import org.apache.kylin.metadata.project.ProjectInstance; @@ -79,8 +80,6 @@ import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RequestParam; import org.springframework.web.bind.annotation.ResponseBody; -import org.apache.kylin.guava30.shaded.common.annotations.VisibleForTesting; - import io.swagger.annotations.ApiOperation; import lombok.val; import lombok.extern.slf4j.Slf4j; @@ -148,7 +147,8 @@ public class NSystemController extends NBasicController { diagPackageRequest.setStart(""); diagPackageRequest.setEnd(""); } else { - if (StringUtils.isBlank(diagPackageRequest.getStart()) || StringUtils.isBlank(diagPackageRequest.getEnd())) { + if (StringUtils.isBlank(diagPackageRequest.getStart()) + || StringUtils.isBlank(diagPackageRequest.getEnd())) { throw new KylinException(TIME_INVALID_RANGE_NOT_CONSISTENT); } } @@ -210,7 +210,7 @@ public class NSystemController extends NBasicController { return systemService.getExtractorStatus(id, project); } else { String url = host + "/kylin/api/system/diag/status?id=" + id; - if(StringUtils.isNotEmpty(project)){ + if (StringUtils.isNotEmpty(project)) { url = url + "&project=" + project; } return generateTaskForRemoteHost(request, url); @@ -229,7 +229,7 @@ public class NSystemController extends NBasicController { response); } else { String url = host + "/kylin/api/system/diag?id=" + id; - if(StringUtils.isNotEmpty(project)){ + if (StringUtils.isNotEmpty(project)) { url = url + "&project=" + project; } downloadFromRemoteHost(request, url, response); @@ -284,11 +284,9 @@ public class NSystemController extends NBasicController { val servers = clusterManager.getServers(); response.setStatus(maintenanceModeService.getMaintenanceMode()); if (ext) { - response.setServers( - servers.stream().map(server -> - new ServerExtInfoResponse() - .setServer(server) - .setSecretName(encodeHost(server.getHost()))).collect(Collectors.toList())); + response.setServers(servers.stream().map( + server -> new ServerExtInfoResponse().setServer(server).setSecretName(encodeHost(server.getHost()))) + .collect(Collectors.toList())); } else { response.setServers(servers.stream().map(ServerInfoResponse::getHost).collect(Collectors.toList())); } diff --git a/src/common-server/src/main/resources/kylinSecurity.xml b/src/common-server/src/main/resources/kylinSecurity.xml index aa6e2154a8..313a14afe0 100644 --- a/src/common-server/src/main/resources/kylinSecurity.xml +++ b/src/common-server/src/main/resources/kylinSecurity.xml @@ -269,6 +269,7 @@ <scr:intercept-url pattern="/api/system/diag/progress" access="permitAll"/> <scr:intercept-url pattern="/api/system/roll_event_log" access="permitAll"/> <scr:intercept-url pattern="/api/system/broadcast_metadata_backup" access="permitAll"/> + <scr:intercept-url pattern="/api/system/clean_sparder_event_log" access="permitAll"/> <scr:intercept-url pattern="/api/system/metadata_backup_tmp_file" access="permitAll"/> <scr:intercept-url pattern="/api/user/authentication*/**" access="permitAll"/> <scr:intercept-url pattern="/api/query/history_queries/table_names" access="permitAll"/> diff --git a/src/common-service/src/main/java/org/apache/kylin/rest/service/ProjectService.java b/src/common-service/src/main/java/org/apache/kylin/rest/service/ProjectService.java index aafde56676..4299f0d7e3 100644 --- a/src/common-service/src/main/java/org/apache/kylin/rest/service/ProjectService.java +++ b/src/common-service/src/main/java/org/apache/kylin/rest/service/ProjectService.java @@ -128,7 +128,7 @@ import org.apache.kylin.rest.security.AclPermissionEnum; import org.apache.kylin.rest.security.KerberosLoginManager; import org.apache.kylin.rest.util.AclEvaluate; import org.apache.kylin.streaming.manager.StreamingJobManager; -import org.apache.kylin.tool.garbage.GarbageCleaner; +import org.apache.kylin.tool.garbage.MetadataCleaner; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; @@ -361,7 +361,7 @@ public class ProjectService extends BasicService { boolean needAggressiveOpt = Arrays.stream(config.getProjectsAggressiveOptimizationIndex()) .map(StringUtils::lowerCase).collect(Collectors.toList()) .contains(StringUtils.toRootLowerCase(project.getName())); - GarbageCleaner.cleanMetadata(project.getName(), needAggressiveOpt); + MetadataCleaner.clean(project.getName(), needAggressiveOpt); EventBusFactory.getInstance().callService(new ProjectCleanOldQueryResultEvent(project.getName())); } catch (Exception e) { logger.warn("clean project<" + project.getName() + "> failed", e); @@ -412,7 +412,7 @@ public class ProjectService extends BasicService { @PreAuthorize(Constant.ACCESS_HAS_ROLE_ADMIN + " or hasPermission(#project, 'ADMINISTRATION')") public void cleanupGarbage(String project, boolean needAggressiveOpt) throws Exception { projectSmartService.cleanupGarbage(project, 0); - GarbageCleaner.cleanMetadata(project, needAggressiveOpt); + MetadataCleaner.clean(project, needAggressiveOpt); asyncTaskService.cleanupStorage(); } diff --git a/src/common-service/src/main/java/org/apache/kylin/rest/service/SystemService.java b/src/common-service/src/main/java/org/apache/kylin/rest/service/SystemService.java index 24505237e9..e983bcb46c 100644 --- a/src/common-service/src/main/java/org/apache/kylin/rest/service/SystemService.java +++ b/src/common-service/src/main/java/org/apache/kylin/rest/service/SystemService.java @@ -55,6 +55,7 @@ import org.apache.kylin.common.util.BufferedLogger; import org.apache.kylin.common.util.CliCommandExecutor; import org.apache.kylin.common.util.StringHelper; import org.apache.kylin.helper.MetadataToolHelper; +import org.apache.kylin.helper.RoutineToolHelper; import org.apache.kylin.job.execution.AbstractExecutable; import org.apache.kylin.job.execution.NExecutableManager; import org.apache.kylin.metadata.cube.model.NIndexPlanManager; @@ -386,4 +387,12 @@ public class SystemService extends BasicService { } return result; } + + public void cleanSparderEventLog() { + val config = KylinConfig.getInstanceFromEnv(); + if (config.isQueryNodeOnly()) { + logger.info("Clean current sparder event log for RPC"); + RoutineToolHelper.cleanEventLog(true, true, false); + } + } } diff --git a/src/common-service/src/test/resources/kylinSecurity.xml b/src/common-service/src/test/resources/kylinSecurity.xml index 1db0da1bc3..c8bdfc9937 100644 --- a/src/common-service/src/test/resources/kylinSecurity.xml +++ b/src/common-service/src/test/resources/kylinSecurity.xml @@ -261,6 +261,7 @@ <scr:intercept-url pattern="/api/system/diag/progress" access="permitAll"/> <scr:intercept-url pattern="/api/system/roll_event_log" access="permitAll"/> <scr:intercept-url pattern="/api/system/broadcast_metadata_backup" access="permitAll"/> + <scr:intercept-url pattern="/api/system/clean_sparder_event_log" access="permitAll"/> <scr:intercept-url pattern="/api/system/metadata_backup_tmp_file" access="permitAll"/> <scr:intercept-url pattern="/api/user/authentication*/**" access="permitAll"/> <scr:intercept-url pattern="/api/query/history_queries/table_names" access="permitAll"/> diff --git a/src/core-metadata/src/main/java/org/apache/kylin/metadata/query/JdbcQueryHistoryStore.java b/src/core-metadata/src/main/java/org/apache/kylin/metadata/query/JdbcQueryHistoryStore.java index 6ffbd9eac0..e35aae28f5 100644 --- a/src/core-metadata/src/main/java/org/apache/kylin/metadata/query/JdbcQueryHistoryStore.java +++ b/src/core-metadata/src/main/java/org/apache/kylin/metadata/query/JdbcQueryHistoryStore.java @@ -31,6 +31,7 @@ import static org.mybatis.dynamic.sql.SqlBuilder.isLikeCaseInsensitive; import static org.mybatis.dynamic.sql.SqlBuilder.isNotEqualTo; import static org.mybatis.dynamic.sql.SqlBuilder.isNotIn; import static org.mybatis.dynamic.sql.SqlBuilder.max; +import static org.mybatis.dynamic.sql.SqlBuilder.min; import static org.mybatis.dynamic.sql.SqlBuilder.or; import static org.mybatis.dynamic.sql.SqlBuilder.select; import static org.mybatis.dynamic.sql.SqlBuilder.selectDistinct; @@ -63,6 +64,8 @@ import org.apache.kylin.common.logging.LogOutputStream; import org.apache.kylin.common.persistence.metadata.JdbcDataSource; import org.apache.kylin.common.persistence.metadata.jdbc.JdbcUtil; import org.apache.kylin.common.util.Pair; +import org.apache.kylin.guava30.shaded.common.annotations.VisibleForTesting; +import org.apache.kylin.guava30.shaded.common.collect.Lists; import org.apache.kylin.metadata.query.util.QueryHisStoreUtil; import org.mybatis.dynamic.sql.BasicColumn; import org.mybatis.dynamic.sql.SqlBuilder; @@ -71,14 +74,11 @@ import org.mybatis.dynamic.sql.insert.render.InsertStatementProvider; import org.mybatis.dynamic.sql.render.RenderingStrategies; import org.mybatis.dynamic.sql.select.QueryExpressionDSL; import org.mybatis.dynamic.sql.select.SelectModel; -import org.mybatis.dynamic.sql.select.join.EqualTo; import org.mybatis.dynamic.sql.select.aggregate.Count; +import org.mybatis.dynamic.sql.select.join.EqualTo; import org.mybatis.dynamic.sql.select.render.SelectStatementProvider; import org.mybatis.dynamic.sql.update.render.UpdateStatementProvider; -import org.apache.kylin.guava30.shaded.common.annotations.VisibleForTesting; -import org.apache.kylin.guava30.shaded.common.collect.Lists; - import lombok.Getter; import lombok.extern.slf4j.Slf4j; @@ -395,6 +395,17 @@ public class JdbcQueryHistoryStore { } } + protected Long queryQueryHistoryMinQueryTime() { + try (SqlSession session = sqlSessionFactory.openSession()) { + QueryHistoryMapper mapper = session.getMapper(QueryHistoryMapper.class); + SelectStatementProvider statementProvider = select(queryHistoryTable.queryTime) // + .from(queryHistoryTable) // + .where(queryHistoryTable.id, isEqualTo(select(min(queryHistoryTable.id)).from(queryHistoryTable))) + .build().render(RenderingStrategies.MYBATIS3); + return mapper.selectAsLong(statementProvider); + } + } + public QueryStatistics queryRecentQueryCount(long startTime, long endTime, String project) { try (SqlSession session = sqlSessionFactory.openSession()) { QueryStatisticsMapper mapper = session.getMapper(QueryStatisticsMapper.class); diff --git a/src/core-metadata/src/main/java/org/apache/kylin/metadata/query/QueryHistoryDAO.java b/src/core-metadata/src/main/java/org/apache/kylin/metadata/query/QueryHistoryDAO.java index 2921098c70..6754cc5ae2 100644 --- a/src/core-metadata/src/main/java/org/apache/kylin/metadata/query/QueryHistoryDAO.java +++ b/src/core-metadata/src/main/java/org/apache/kylin/metadata/query/QueryHistoryDAO.java @@ -61,6 +61,8 @@ public interface QueryHistoryDAO { QueryHistory getByQueryId(String queryId); + Long getQueryHistoryMinQueryTime(); + List<QueryHistory> getQueryHistoriesSubmitters(QueryHistoryRequest request, int size); List<QueryStatistics> getQueryHistoriesModelIds(QueryHistoryRequest request); diff --git a/src/core-metadata/src/main/java/org/apache/kylin/metadata/query/RDBMSQueryHistoryDAO.java b/src/core-metadata/src/main/java/org/apache/kylin/metadata/query/RDBMSQueryHistoryDAO.java index a6ed94675f..f70ff1ab7c 100644 --- a/src/core-metadata/src/main/java/org/apache/kylin/metadata/query/RDBMSQueryHistoryDAO.java +++ b/src/core-metadata/src/main/java/org/apache/kylin/metadata/query/RDBMSQueryHistoryDAO.java @@ -109,6 +109,11 @@ public class RDBMSQueryHistoryDAO implements QueryHistoryDAO { return jdbcQueryHisStore.queryByQueryId(queryId); } + @Override + public Long getQueryHistoryMinQueryTime() { + return jdbcQueryHisStore.queryQueryHistoryMinQueryTime(); + } + public void deleteQueryHistoriesIfMaxSizeReached() throws InterruptedException { long maxSize = KylinConfig.getInstanceFromEnv().getQueryHistoryMaxSize(); long totalCount = jdbcQueryHisStore.getCountOnQueryHistory(); diff --git a/src/core-metadata/src/main/java/org/apache/kylin/metadata/query/util/QueryHisStoreUtil.java b/src/core-metadata/src/main/java/org/apache/kylin/metadata/query/util/QueryHisStoreUtil.java index d8b2c05613..82c631c6ca 100644 --- a/src/core-metadata/src/main/java/org/apache/kylin/metadata/query/util/QueryHisStoreUtil.java +++ b/src/core-metadata/src/main/java/org/apache/kylin/metadata/query/util/QueryHisStoreUtil.java @@ -201,6 +201,11 @@ public class QueryHisStoreUtil { } } + @SneakyThrows + public static Long getQueryHistoryMinQueryTime() { + return getQueryHistoryDao().getQueryHistoryMinQueryTime(); + } + public static void cleanQueryHistory(String projectName, long historyCount) { long projectMaxSize = KylinConfig.getInstanceFromEnv().getQueryHistoryProjectMaxSize(); if (historyCount <= projectMaxSize) { diff --git a/src/core-metadata/src/test/java/org/apache/kylin/metadata/query/RDBMSQueryHistoryDaoTest.java b/src/core-metadata/src/test/java/org/apache/kylin/metadata/query/RDBMSQueryHistoryDaoTest.java index 2842fd111c..8e72346048 100644 --- a/src/core-metadata/src/test/java/org/apache/kylin/metadata/query/RDBMSQueryHistoryDaoTest.java +++ b/src/core-metadata/src/test/java/org/apache/kylin/metadata/query/RDBMSQueryHistoryDaoTest.java @@ -27,6 +27,7 @@ import java.util.concurrent.atomic.AtomicInteger; import org.apache.kylin.common.util.NLocalFileMetadataTestCase; import org.apache.kylin.common.util.Pair; import org.apache.kylin.common.util.TimeUtil; +import org.apache.kylin.guava30.shaded.common.collect.Lists; import org.apache.kylin.junit.TimeZoneTestRunner; import org.apache.kylin.metadata.query.util.QueryHisStoreUtil; import org.junit.After; @@ -35,8 +36,6 @@ import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; -import org.apache.kylin.guava30.shaded.common.collect.Lists; - @RunWith(TimeZoneTestRunner.class) public class RDBMSQueryHistoryDaoTest extends NLocalFileMetadataTestCase { @@ -575,6 +574,17 @@ public class RDBMSQueryHistoryDaoTest extends NLocalFileMetadataTestCase { Assert.assertEquals(queryMetrics.id, queryHistory.getId()); } + @Test + public void testGetQueryHistoryMinQueryTime() { + QueryMetrics queryMetrics1 = createQueryMetrics(1580311512000L, 1L, true, PROJECT, true); + QueryMetrics queryMetrics2 = createQueryMetrics(1580397912000L, 2L, false, PROJECT, true); + queryHistoryDAO.insert(queryMetrics1); + queryHistoryDAO.insert(queryMetrics2); + + Long queryHistoryMinQueryTime = QueryHisStoreUtil.getQueryHistoryMinQueryTime(); + Assert.assertEquals(1580311512000L, queryHistoryMinQueryTime.longValue()); + } + @Test public void testGetRetainTime() throws Exception { long retainTime = RDBMSQueryHistoryDAO.getRetainTime(); @@ -883,8 +893,7 @@ public class RDBMSQueryHistoryDaoTest extends NLocalFileMetadataTestCase { realizationMetrics.setProjectName(project); realizationMetrics.setModelId("82fa7671-a935-45f5-8779-85703601f49a.json"); - realizationMetrics.setSnapshots( - Lists.newArrayList("DEFAULT.TEST_KYLIN_ACCOUNT", "DEFAULT.TEST_COUNTRY")); + realizationMetrics.setSnapshots(Lists.newArrayList("DEFAULT.TEST_KYLIN_ACCOUNT", "DEFAULT.TEST_COUNTRY")); List<QueryMetrics.RealizationMetrics> realizationMetricsList = Lists.newArrayList(); realizationMetricsList.add(realizationMetrics); diff --git a/src/data-loading-booter/src/main/resources/kylinSecurity.xml b/src/data-loading-booter/src/main/resources/kylinSecurity.xml index 153eb4d978..047be47d8f 100644 --- a/src/data-loading-booter/src/main/resources/kylinSecurity.xml +++ b/src/data-loading-booter/src/main/resources/kylinSecurity.xml @@ -269,6 +269,7 @@ <scr:intercept-url pattern="/api/system/diag/progress" access="permitAll"/> <scr:intercept-url pattern="/api/system/roll_event_log" access="permitAll"/> <scr:intercept-url pattern="/api/system/broadcast_metadata_backup" access="permitAll"/> + <scr:intercept-url pattern="/api/system/clean_sparder_event_log" access="permitAll"/> <scr:intercept-url pattern="/api/system/metadata_backup_tmp_file" access="permitAll"/> <scr:intercept-url pattern="/api/user/authentication*/**" access="permitAll"/> <scr:intercept-url pattern="/api/query/history_queries/table_names" access="permitAll"/> diff --git a/src/job-service/src/main/java/org/apache/kylin/rest/service/ScheduleService.java b/src/job-service/src/main/java/org/apache/kylin/rest/service/ScheduleService.java index af489e9933..71d7da9e6c 100644 --- a/src/job-service/src/main/java/org/apache/kylin/rest/service/ScheduleService.java +++ b/src/job-service/src/main/java/org/apache/kylin/rest/service/ScheduleService.java @@ -47,6 +47,7 @@ import org.apache.kylin.common.metrics.MetricsGroup; import org.apache.kylin.common.metrics.MetricsName; import org.apache.kylin.common.response.RestResponse; import org.apache.kylin.common.util.AddressUtil; +import org.apache.kylin.common.util.ClusterConstant; import org.apache.kylin.common.util.JsonUtil; import org.apache.kylin.common.util.NamedThreadFactory; import org.apache.kylin.common.util.Pair; @@ -59,11 +60,14 @@ import org.apache.kylin.metadata.resourcegroup.KylinInstance; import org.apache.kylin.metadata.resourcegroup.RequestTypeEnum; import org.apache.kylin.metadata.resourcegroup.ResourceGroupManager; import org.apache.kylin.metadata.resourcegroup.ResourceGroupMappingInfo; +import org.apache.kylin.rest.cluster.ClusterManager; +import org.apache.kylin.rest.response.ServerInfoResponse; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.http.HttpEntity; import org.springframework.http.HttpHeaders; import org.springframework.http.HttpMethod; +import org.springframework.http.ResponseEntity; import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Service; import org.springframework.web.client.RestTemplate; @@ -80,6 +84,8 @@ public class ScheduleService { private static final String GLOBAL = "global"; + private static final String CLEAN_SPARDER_EVENT_LOG = "http://%s/kylin/api/system/clean_sparder_event_log"; + @Autowired @Qualifier("normalRestTemplate") RestTemplate restTemplate; @@ -96,6 +102,9 @@ public class ScheduleService { @Autowired(required = false) ProjectSmartSupporter projectSmartSupporter; + @Autowired + private ClusterManager clusterManager; + private final ExecutorService executors = Executors .newSingleThreadExecutor(new NamedThreadFactory("RoutineTaskScheduler")); private final ExecutorService asyncExecutors = new ThreadPoolExecutor(20, 20, 30, TimeUnit.MINUTES, @@ -122,6 +131,7 @@ public class ScheduleService { try (SetThreadName ignored = new SetThreadName("RoutineOpsWorker")) { if (epochManager.checkEpochOwner(EpochManager.GLOBAL)) { AtomicReference<Pair<String, String>> backupFolder = new AtomicReference<>(null); + broadcastCleanSparderEventLogToQueryNodes(); executeTask(() -> backupFolder.set(backupService.backupAll()), "MetadataBackup", startTime); executeMetadataBackupInTenantMode(kylinConfig, startTime, backupFolder); executeTask(() -> RoutineToolHelper.cleanQueryHistoriesAsync(getRemainingTime(startTime), @@ -134,6 +144,7 @@ public class ScheduleService { executeTask(() -> projectService.garbageCleanup(getRemainingTime(startTime)), "ProjectGarbageCleanup", startTime); executeTask(RoutineToolHelper::cleanStorageForRoutine, "HdfsCleanup", startTime); + executeTask(() -> RoutineToolHelper.cleanEventLog(true, false, false), "EventLogCleanup", startTime); log.info("Finish to work, cost {}ms", System.currentTimeMillis() - startTime); } } catch (InterruptedException e) { @@ -223,6 +234,43 @@ public class ScheduleService { } } + private void broadcastCleanSparderEventLogToQueryNodes() { + List<ServerInfoResponse> queryNodes = clusterManager.getQueryServers(); + + try { + for (ServerInfoResponse node : queryNodes) { + if (ClusterConstant.ALL.equals(node.getMode())) { + continue; + } + + val url = String.format(Locale.ROOT, CLEAN_SPARDER_EVENT_LOG, node.getHost()); + log.info("Start broadcasting to clean the sparder event log of {}", url); + + val httpHeaders = new HttpHeaders(); + httpHeaders.add(HttpHeaders.CONTENT_TYPE, HTTP_VND_APACHE_KYLIN_V4_PUBLIC_JSON); + val response = restTemplate.exchange(url, HttpMethod.DELETE, new HttpEntity<>(httpHeaders), + String.class); + receive(response, "noticeToQueryNode"); + } + } catch (Exception e) { + log.error("Broadcast cleaning sparder event log failed!", e); + } + } + + private void receive(ResponseEntity<String> response, String msg) throws IOException { + val responseStatus = response.getStatusCodeValue(); + if (responseStatus != HttpStatus.SC_OK) { + log.error("{} failed, HttpStatus is {}", msg, responseStatus); + } + + val responseBody = Optional.ofNullable(response.getBody()).orElse(""); + val responseJson = JsonUtil.readValue(responseBody, new TypeReference<RestResponse<Boolean>>() { + }); + if (!StringUtils.equals(responseJson.getCode(), KylinException.CODE_SUCCESS)) { + log.error("{} failed, response code is {}", msg, responseJson.getCode()); + } + } + public void broadcastToTenantNode(String resourceGroupId, String backupDir, String tmpFilePath, long tmpFileLength, String host) { try { @@ -236,17 +284,7 @@ public class ScheduleService { httpHeaders.add(HttpHeaders.CONTENT_TYPE, HTTP_VND_APACHE_KYLIN_V4_PUBLIC_JSON); val exchange = restTemplate.exchange(url, HttpMethod.POST, new HttpEntity<>(JsonUtil.writeValueAsBytes(req), httpHeaders), String.class); - val responseStatus = exchange.getStatusCodeValue(); - if (responseStatus != HttpStatus.SC_OK) { - log.error("noticeToTenantNode failed, HttpStatus is {}", responseStatus); - return; - } - val responseBody = Optional.ofNullable(exchange.getBody()).orElse(""); - val response = JsonUtil.readValue(responseBody, new TypeReference<RestResponse<Boolean>>() { - }); - if (!StringUtils.equals(response.getCode(), KylinException.CODE_SUCCESS)) { - log.error("noticeToTenantNode failed, response code is {}", response.getCode()); - } + receive(exchange, "noticeToTenantNode"); } catch (IOException e) { log.error(e.getMessage(), e); } diff --git a/src/job-service/src/test/java/org/apache/kylin/rest/service/ScheduleServiceTest.java b/src/job-service/src/test/java/org/apache/kylin/rest/service/ScheduleServiceTest.java index 1eb020345f..b217b57cae 100644 --- a/src/job-service/src/test/java/org/apache/kylin/rest/service/ScheduleServiceTest.java +++ b/src/job-service/src/test/java/org/apache/kylin/rest/service/ScheduleServiceTest.java @@ -27,23 +27,36 @@ import static org.mockito.Mockito.doThrow; import java.io.IOException; import java.util.concurrent.TimeoutException; +import org.apache.kylin.common.response.RestResponse; +import org.apache.kylin.common.util.JsonUtil; import org.apache.kylin.common.util.NLocalFileMetadataTestCase; import org.apache.kylin.common.util.Pair; import org.apache.kylin.junit.annotation.OverwriteProp; +import org.apache.kylin.rest.cluster.MockClusterManager; import org.apache.kylin.rest.constant.Constant; import org.junit.After; import org.junit.Before; import org.junit.Rule; import org.junit.Test; import org.junit.rules.ExpectedException; +import org.mockito.ArgumentMatchers; import org.mockito.Mock; import org.mockito.Mockito; +import org.springframework.http.HttpEntity; +import org.springframework.http.HttpMethod; +import org.springframework.http.HttpStatus; +import org.springframework.http.ResponseEntity; import org.springframework.security.authentication.TestingAuthenticationToken; import org.springframework.security.core.context.SecurityContextHolder; import org.springframework.test.util.ReflectionTestUtils; +import org.springframework.web.client.RestTemplate; + +import com.fasterxml.jackson.core.JsonProcessingException; import org.apache.kylin.metadata.epoch.EpochManager; import lombok.SneakyThrows; +import lombok.val; +import lombok.var; import lombok.extern.slf4j.Slf4j; @Slf4j @@ -58,17 +71,27 @@ public class ScheduleServiceTest extends NLocalFileMetadataTestCase { @Mock private ScheduleService scheduleService = Mockito.spy(ScheduleService.class); + @Mock + private RestTemplate restTemplate = Mockito.mock(RestTemplate.class); + @Rule public ExpectedException thrown = ExpectedException.none(); @Before - public void setup() { + public void setup() throws JsonProcessingException { overwriteSystemProp("HADOOP_USER_NAME", "root"); createTestMetadata(); SecurityContextHolder.getContext() .setAuthentication(new TestingAuthenticationToken("ADMIN", "ADMIN", Constant.ROLE_ADMIN)); ReflectionTestUtils.setField(scheduleService, "projectService", projectService); ReflectionTestUtils.setField(scheduleService, "backupService", backupService); + ReflectionTestUtils.setField(scheduleService, "clusterManager", new MockClusterManager()); + ReflectionTestUtils.setField(scheduleService, "restTemplate", restTemplate); + + val restResult = JsonUtil.writeValueAsString(RestResponse.ok()); + var resp = new ResponseEntity<>(restResult, HttpStatus.OK); + Mockito.doReturn(resp).when(restTemplate).exchange(anyString(), ArgumentMatchers.any(HttpMethod.class), + ArgumentMatchers.any(HttpEntity.class), ArgumentMatchers.<Class<String>> any()); } @After diff --git a/src/query-booter/src/main/resources/kylinSecurity.xml b/src/query-booter/src/main/resources/kylinSecurity.xml index 37dcfc2214..bf3f7aae8e 100644 --- a/src/query-booter/src/main/resources/kylinSecurity.xml +++ b/src/query-booter/src/main/resources/kylinSecurity.xml @@ -269,6 +269,7 @@ <scr:intercept-url pattern="/api/system/diag/progress" access="permitAll"/> <scr:intercept-url pattern="/api/system/roll_event_log" access="permitAll"/> <scr:intercept-url pattern="/api/system/broadcast_metadata_backup" access="permitAll"/> + <scr:intercept-url pattern="/api/system/clean_sparder_event_log" access="permitAll"/> <scr:intercept-url pattern="/api/system/metadata_backup_tmp_file" access="permitAll"/> <scr:intercept-url pattern="/api/user/authentication*/**" access="permitAll"/> <scr:intercept-url pattern="/api/query/history_queries/table_names" access="permitAll"/> diff --git a/src/server/src/main/java/org/apache/kylin/rest/QueryNodeFilter.java b/src/server/src/main/java/org/apache/kylin/rest/QueryNodeFilter.java index 68f0b5c9ae..3e41fa541d 100644 --- a/src/server/src/main/java/org/apache/kylin/rest/QueryNodeFilter.java +++ b/src/server/src/main/java/org/apache/kylin/rest/QueryNodeFilter.java @@ -144,6 +144,8 @@ public class QueryNodeFilter extends BaseFilter { notRoutePostApiSet.add("/kylin/api/metastore/cleanup_storage/tenant_node"); notRoutePostApiSet.add("/kylin/api/metastore/cleanup_storage"); + notRouteDeleteApiSet.add("/kylin/api/system/clean_sparder_event_log"); + notRouteDeleteApiSet.add("/kylin/api/async_query/tenant_node"); routeMultiTenantModeFilterApiSet.add("/kylin/api/jobs/{jobId}/resume"); diff --git a/src/tool/src/main/java/org/apache/kylin/helper/MetadataToolHelper.java b/src/tool/src/main/java/org/apache/kylin/helper/MetadataToolHelper.java index ad72f1e6b9..f000e766a6 100644 --- a/src/tool/src/main/java/org/apache/kylin/helper/MetadataToolHelper.java +++ b/src/tool/src/main/java/org/apache/kylin/helper/MetadataToolHelper.java @@ -62,6 +62,7 @@ import org.apache.kylin.guava30.shaded.common.collect.Sets; import org.apache.kylin.guava30.shaded.common.io.ByteSource; import org.apache.kylin.metadata.project.ProjectInstance; import org.apache.kylin.tool.HDFSMetadataTool; +import org.apache.kylin.tool.constant.StringConstant; import org.apache.kylin.tool.garbage.StorageCleaner; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -74,7 +75,7 @@ import lombok.var; */ public class MetadataToolHelper { - public static final DateTimeFormatter DATE_TIME_FORMATTER = HelperConstants.DATE_TIME_FORMATTER; + public static final DateTimeFormatter DATE_TIME_FORMATTER = StringConstant.DATE_TIME_FORMATTER; private static final String GLOBAL = "global"; private static final String HDFS_METADATA_URL_FORMATTER = "kylin_metadata@hdfs,path=%s"; @@ -380,9 +381,9 @@ public class MetadataToolHelper { System.out.println("cleanup HDFS finished"); } catch (Exception e) { logger.error("cleanup HDFS failed", e); - System.out.println(StorageCleaner.ANSI_RED + System.out.println(StringConstant.ANSI_RED + "cleanup HDFS failed. Detailed Message is at ${KYLIN_HOME}/logs/shell.stderr" - + StorageCleaner.ANSI_RESET); + + StringConstant.ANSI_RESET); } } diff --git a/src/tool/src/main/java/org/apache/kylin/helper/RoutineToolHelper.java b/src/tool/src/main/java/org/apache/kylin/helper/RoutineToolHelper.java index 9bdbd5a3ff..7492e22e29 100644 --- a/src/tool/src/main/java/org/apache/kylin/helper/RoutineToolHelper.java +++ b/src/tool/src/main/java/org/apache/kylin/helper/RoutineToolHelper.java @@ -36,9 +36,10 @@ import org.apache.kylin.metadata.query.util.QueryHisStoreUtil; import org.apache.kylin.metadata.recommendation.candidate.JdbcRawRecStore; import org.apache.kylin.metadata.streaming.util.StreamingJobRecordStoreUtil; import org.apache.kylin.metadata.streaming.util.StreamingJobStatsStoreUtil; +import org.apache.kylin.tool.constant.StringConstant; import org.apache.kylin.tool.garbage.AbstractComparableCleanTask; import org.apache.kylin.tool.garbage.CleanTaskExecutorService; -import org.apache.kylin.tool.garbage.GarbageCleaner; +import org.apache.kylin.tool.garbage.MetadataCleaner; import org.apache.kylin.tool.garbage.PriorityExecutor; import org.apache.kylin.tool.garbage.SourceUsageCleaner; import org.apache.kylin.tool.garbage.StorageCleaner; @@ -80,6 +81,35 @@ public class RoutineToolHelper { TimeUnit.MILLISECONDS); } + public static CompletableFuture<Void> cleanEventLog(boolean cleanUp, boolean cleanCurrentSparder, + boolean cleanAll) { + tryInitCleanTaskExecutorService(); + return CleanTaskExecutorService.getInstance().submit(new AbstractComparableCleanTask() { + @Override + public String getName() { + return "cleanEventLog"; + } + + @Override + protected void doRun() { + StorageCleaner.EventLogCleaner eventLogCleaner = new StorageCleaner.EventLogCleaner(cleanUp); + if (cleanAll) { + eventLogCleaner.cleanAllEventLog(); + } else if (cleanCurrentSparder) { + eventLogCleaner.cleanCurrentSparderEventLog(); + } else { + // clean current sparder event log and spark event log + eventLogCleaner.execute(); + } + } + + @Override + public StorageCleaner.CleanerTag getCleanerTag() { + return StorageCleaner.CleanerTag.ROUTINE; + } + }, KylinConfig.getInstanceFromEnv().getStorageCleanTaskTimeout(), TimeUnit.MILLISECONDS); + } + public static void cleanStorageForRoutine() { tryInitCleanTaskExecutorService(); CleanTaskExecutorService.getInstance().cleanStorageForRoutine(true, Collections.emptyList(), 0, 0); @@ -117,7 +147,7 @@ public class RoutineToolHelper { log.info("Start to clean up global meta"); try { EnhancedUnitOfWork.doInTransactionWithCheckAndRetry(() -> { - new SourceUsageCleaner().cleanup(); + new SourceUsageCleaner().execute(); return null; }, UnitOfWork.GLOBAL_UNIT); } catch (Exception e) { @@ -134,7 +164,7 @@ public class RoutineToolHelper { .stream(KylinConfig.getInstanceFromEnv().getProjectsAggressiveOptimizationIndex()) .map(StringUtils::lowerCase).collect(Collectors.toList()) .contains(StringUtils.toRootLowerCase(projectName)); - GarbageCleaner.cleanMetadata(projectName, needAggressiveOpt); + MetadataCleaner.clean(projectName, needAggressiveOpt); } catch (Exception e) { log.error("Project[{}] cleanup Metadata failed", projectName, e); } @@ -153,11 +183,10 @@ public class RoutineToolHelper { System.out.println("Metadata cleanup finished"); } catch (Exception e) { log.error("Metadata cleanup failed", e); - System.out.println(StorageCleaner.ANSI_RED + System.out.println(StringConstant.ANSI_RED + "Metadata cleanup failed. Detailed Message is at ${KYLIN_HOME}/logs/shell.stderr" - + StorageCleaner.ANSI_RESET); + + StringConstant.ANSI_RESET); } } - } diff --git a/src/tool/src/main/java/org/apache/kylin/tool/AuditLogTool.java b/src/tool/src/main/java/org/apache/kylin/tool/AuditLogTool.java index b25e8ae603..ba13cd5b82 100644 --- a/src/tool/src/main/java/org/apache/kylin/tool/AuditLogTool.java +++ b/src/tool/src/main/java/org/apache/kylin/tool/AuditLogTool.java @@ -23,8 +23,6 @@ import static org.apache.kylin.common.exception.code.ErrorCodeTool.PARAMETER_NOT import static org.apache.kylin.common.exception.code.ErrorCodeTool.PARAMETER_TIMESTAMP_NOT_SPECIFY; import static org.apache.kylin.common.exception.code.ErrorCodeTool.PATH_NOT_EXISTS; import static org.apache.kylin.common.persistence.metadata.jdbc.JdbcUtil.datasourceParameters; -import static org.apache.kylin.tool.garbage.StorageCleaner.ANSI_RED; -import static org.apache.kylin.tool.garbage.StorageCleaner.ANSI_RESET; import java.io.BufferedReader; import java.io.BufferedWriter; @@ -47,24 +45,24 @@ import org.apache.commons.collections.CollectionUtils; import org.apache.commons.dbcp2.BasicDataSourceFactory; import org.apache.commons.lang3.StringUtils; import org.apache.kylin.common.KylinConfig; +import org.apache.kylin.common.constant.Constant; import org.apache.kylin.common.exception.KylinException; +import org.apache.kylin.common.persistence.AuditLog; +import org.apache.kylin.common.persistence.metadata.JdbcAuditLogStore; import org.apache.kylin.common.util.ExecutableApplication; import org.apache.kylin.common.util.JsonUtil; +import org.apache.kylin.common.util.OptionBuilder; import org.apache.kylin.common.util.OptionsHelper; +import org.apache.kylin.common.util.Unsafe; +import org.apache.kylin.guava30.shaded.common.collect.Lists; import org.apache.kylin.job.execution.AbstractExecutable; import org.apache.kylin.job.execution.NExecutableManager; -import org.apache.kylin.common.constant.Constant; -import org.apache.kylin.common.persistence.AuditLog; -import org.apache.kylin.common.persistence.metadata.JdbcAuditLogStore; -import org.apache.kylin.common.util.OptionBuilder; -import org.apache.kylin.common.util.Unsafe; +import org.apache.kylin.tool.constant.StringConstant; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.jdbc.core.JdbcTemplate; import org.springframework.jdbc.datasource.DataSourceTransactionManager; -import org.apache.kylin.guava30.shaded.common.collect.Lists; - import lombok.val; public class AuditLogTool extends ExecutableApplication { @@ -116,7 +114,7 @@ public class AuditLogTool extends ExecutableApplication { val tool = new AuditLogTool(); tool.execute(args); } catch (Exception e) { - System.out.println(ANSI_RED + "Audit log task failed." + ANSI_RESET); + System.out.println(StringConstant.ANSI_RED + "Audit log task failed." + StringConstant.ANSI_RESET); logger.error("fail execute audit log tool: ", e); Unsafe.systemExit(1); } diff --git a/src/tool/src/main/java/org/apache/kylin/tool/MaintainModeTool.java b/src/tool/src/main/java/org/apache/kylin/tool/MaintainModeTool.java index d6fc66c92f..29ac7bfe78 100644 --- a/src/tool/src/main/java/org/apache/kylin/tool/MaintainModeTool.java +++ b/src/tool/src/main/java/org/apache/kylin/tool/MaintainModeTool.java @@ -26,21 +26,20 @@ import org.apache.commons.cli.Option; import org.apache.commons.cli.Options; import org.apache.kylin.common.KylinConfig; import org.apache.kylin.common.persistence.ResourceStore; -import org.apache.kylin.common.util.ExecutableApplication; -import org.apache.kylin.common.util.OptionsHelper; -import org.apache.kylin.metadata.project.ProjectInstance; import org.apache.kylin.common.persistence.transaction.UnitOfWork; import org.apache.kylin.common.util.AddressUtil; +import org.apache.kylin.common.util.ExecutableApplication; +import org.apache.kylin.common.util.OptionsHelper; import org.apache.kylin.common.util.Unsafe; +import org.apache.kylin.guava30.shaded.common.collect.Lists; import org.apache.kylin.metadata.epoch.EpochManager; import org.apache.kylin.metadata.project.NProjectManager; -import org.apache.kylin.tool.garbage.StorageCleaner; +import org.apache.kylin.metadata.project.ProjectInstance; +import org.apache.kylin.tool.constant.StringConstant; import org.apache.kylin.tool.util.ToolMainWrapper; import org.springframework.util.CollectionUtils; import org.springframework.util.StringUtils; -import org.apache.kylin.guava30.shaded.common.collect.Lists; - import lombok.Setter; import lombok.extern.slf4j.Slf4j; @@ -67,8 +66,10 @@ public class MaintainModeTool extends ExecutableApplication { private EpochManager epochManager; private boolean hiddenOutput; private boolean forceToTurnOff; + public MaintainModeTool() { } + public MaintainModeTool(String reason) { this.reason = reason; this.hiddenOutput = true; @@ -153,9 +154,9 @@ public class MaintainModeTool extends ExecutableApplication { enterMaintenanceModeWithRetry(config.getTurnMaintainModeRetryTimes(), reason, projects); } catch (Exception e) { log.error("Mark epoch failed", e); - System.out.println(StorageCleaner.ANSI_RED + System.out.println(StringConstant.ANSI_RED + "Turn on maintain mode failed. Detailed Message is at ${KYLIN_HOME}/logs/shell.stderr" - + StorageCleaner.ANSI_RESET); + + StringConstant.ANSI_RESET); Unsafe.systemExit(1); } finally { Unsafe.clearProperty(LEADER_RACE_KEY); @@ -219,9 +220,9 @@ public class MaintainModeTool extends ExecutableApplication { exitMaintenanceModeWithRetry(config.getTurnMaintainModeRetryTimes(), null, projects, forceToTurnOff); } catch (Exception e) { log.error("Release epoch failed, try to turn off maintain mode manually.", e); - System.out.println(StorageCleaner.ANSI_RED + System.out.println(StringConstant.ANSI_RED + "Turn off maintain mode failed. Detailed Message is at ${KYLIN_HOME}/logs/shell.stderr" - + StorageCleaner.ANSI_RESET); + + StringConstant.ANSI_RESET); throw new IllegalStateException("Turn off maintain mode failed."); } finally { Unsafe.clearProperty(LEADER_RACE_KEY); diff --git a/src/tool/src/main/java/org/apache/kylin/helper/HelperConstants.java b/src/tool/src/main/java/org/apache/kylin/tool/constant/StringConstant.java similarity index 67% rename from src/tool/src/main/java/org/apache/kylin/helper/HelperConstants.java rename to src/tool/src/main/java/org/apache/kylin/tool/constant/StringConstant.java index 9133d37013..0b5036ca3e 100644 --- a/src/tool/src/main/java/org/apache/kylin/helper/HelperConstants.java +++ b/src/tool/src/main/java/org/apache/kylin/tool/constant/StringConstant.java @@ -16,7 +16,7 @@ * limitations under the License. */ -package org.apache.kylin.helper; +package org.apache.kylin.tool.constant; import java.time.format.DateTimeFormatter; import java.util.Locale; @@ -24,10 +24,18 @@ import java.util.Locale; /* * this class is only for removing dependency of kylin-tool module, and should be refactor later */ -class HelperConstants { +public class StringConstant { - private HelperConstants() {} + private StringConstant() { + } - static final DateTimeFormatter DATE_TIME_FORMATTER = DateTimeFormatter.ofPattern("yyyy-MM-dd-HH-mm-ss", + public static final String ANSI_RED = "\u001B[31m"; + public static final String ANSI_GREEN = "\u001B[32m"; + public static final String ANSI_YELLOW = "\u001B[33m"; + public static final String ANSI_BLUE = "\u001B[34m"; + + public static final String ANSI_RESET = "\u001B[0m"; + + public static final DateTimeFormatter DATE_TIME_FORMATTER = DateTimeFormatter.ofPattern("yyyy-MM-dd-HH-mm-ss", Locale.getDefault(Locale.Category.FORMAT)); } diff --git a/src/tool/src/main/java/org/apache/kylin/tool/garbage/CleanTaskExecutorService.java b/src/tool/src/main/java/org/apache/kylin/tool/garbage/CleanTaskExecutorService.java index da12dd15bb..a75b904c91 100644 --- a/src/tool/src/main/java/org/apache/kylin/tool/garbage/CleanTaskExecutorService.java +++ b/src/tool/src/main/java/org/apache/kylin/tool/garbage/CleanTaskExecutorService.java @@ -37,10 +37,10 @@ import java.util.function.Supplier; import javax.annotation.concurrent.ThreadSafe; import org.apache.kylin.common.KylinConfig; -import org.apache.kylin.common.exception.KylinRuntimeException; import org.apache.kylin.common.util.DaemonThreadFactory; import org.apache.kylin.common.util.RandomUtil; import org.apache.kylin.guava30.shaded.common.base.Preconditions; +import org.apache.kylin.tool.constant.StringConstant; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -125,9 +125,7 @@ public class CleanTaskExecutorService implements Closeable { storageCleaner.withTag(tag); storageCleaner.withTraceId(traceId); } catch (Exception e) { - LOGGER.error( - "Failed to create storage cleaner for projects: {}. TraceId: {}", projects, traceId, - e); + LOGGER.error("Failed to create storage cleaner for projects: {}. TraceId: {}", projects, traceId, e); } return storageCleaner; } @@ -141,9 +139,9 @@ public class CleanTaskExecutorService implements Closeable { LOGGER.info("HDFS files cleaning task has successfully completed. TraceId: {}", cleaner.getTraceId()); return; } - LOGGER.error(StorageCleaner.ANSI_RED + LOGGER.error(StringConstant.ANSI_RED + "cleanup HDFS failed. Detailed Message is at ${KYLIN_HOME}/logs/shell.stderr" - + StorageCleaner.ANSI_RESET + ". TraceId: " + cleaner.getTraceId(), t); + + StringConstant.ANSI_RESET + ". TraceId: " + cleaner.getTraceId(), t); }); return f; } @@ -170,8 +168,8 @@ public class CleanTaskExecutorService implements Closeable { protected void doRun() { try { cleaner.execute(); - } catch (Exception e) { - throw new KylinRuntimeException(e); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); } } }, timeout, timeUnit); diff --git a/src/tool/src/main/java/org/apache/kylin/tool/garbage/ExecutableCleaner.java b/src/tool/src/main/java/org/apache/kylin/tool/garbage/ExecutableCleaner.java index 04313ee7a5..663fffc6c4 100644 --- a/src/tool/src/main/java/org/apache/kylin/tool/garbage/ExecutableCleaner.java +++ b/src/tool/src/main/java/org/apache/kylin/tool/garbage/ExecutableCleaner.java @@ -37,12 +37,12 @@ public class ExecutableCleaner extends MetadataCleaner { } @Override - public void beforeCleanup() { + public void beforeExecute() { // do nothing } @Override - public void cleanup() { + public void execute() { logger.info("Start to clean executable in project {}", project); @@ -68,7 +68,7 @@ public class ExecutableCleaner extends MetadataCleaner { } @Override - public void afterCleanup() { + public void afterExecute() { // do nothing } diff --git a/src/tool/src/main/java/org/apache/kylin/tool/garbage/GarbageCleaner.java b/src/tool/src/main/java/org/apache/kylin/tool/garbage/GarbageCleaner.java index 527f25527a..a30661bc37 100644 --- a/src/tool/src/main/java/org/apache/kylin/tool/garbage/GarbageCleaner.java +++ b/src/tool/src/main/java/org/apache/kylin/tool/garbage/GarbageCleaner.java @@ -18,51 +18,6 @@ package org.apache.kylin.tool.garbage; -import java.util.Arrays; -import java.util.List; - -import org.apache.kylin.common.KylinConfig; -import org.apache.kylin.common.metrics.MetricsCategory; -import org.apache.kylin.common.metrics.MetricsGroup; -import org.apache.kylin.common.metrics.MetricsName; -import org.apache.kylin.common.scheduler.EventBusFactory; -import org.apache.kylin.common.scheduler.SourceUsageUpdateNotifier; -import org.apache.kylin.metadata.project.EnhancedUnitOfWork; -import org.apache.kylin.metadata.project.NProjectManager; - -import lombok.val; - -public class GarbageCleaner { - - private GarbageCleaner() { - } - - /** - * Clean up metadata - * @param project - */ - public static void cleanMetadata(String project, boolean needAggressiveOpt) { - val projectInstance = NProjectManager.getInstance(KylinConfig.getInstanceFromEnv()).getProject(project); - if (projectInstance == null) { - return; - } - - List<MetadataCleaner> cleaners = initCleaners(project, needAggressiveOpt); - cleaners.forEach(MetadataCleaner::prepare); - EnhancedUnitOfWork.doInTransactionWithCheckAndRetry(() -> { - cleaners.forEach(MetadataCleaner::beforeCleanup); - cleaners.forEach(MetadataCleaner::cleanup); - cleaners.forEach(MetadataCleaner::afterCleanup); - return 0; - }, project); - - EventBusFactory.getInstance().postAsync(new SourceUsageUpdateNotifier()); - MetricsGroup.hostTagCounterInc(MetricsName.METADATA_CLEAN, MetricsCategory.PROJECT, project); - } - - private static List<MetadataCleaner> initCleaners(String project, boolean needAggressiveOpt) { - return Arrays.asList(new SnapshotCleaner(project), new IndexCleaner(project, needAggressiveOpt), - new ExecutableCleaner(project)); - } - +public interface GarbageCleaner { + void execute() throws InterruptedException; } diff --git a/src/tool/src/main/java/org/apache/kylin/tool/garbage/IndexCleaner.java b/src/tool/src/main/java/org/apache/kylin/tool/garbage/IndexCleaner.java index 7036648383..a1f1d7a69d 100644 --- a/src/tool/src/main/java/org/apache/kylin/tool/garbage/IndexCleaner.java +++ b/src/tool/src/main/java/org/apache/kylin/tool/garbage/IndexCleaner.java @@ -103,7 +103,7 @@ public class IndexCleaner extends MetadataCleaner { } @Override - public void beforeCleanup() { + public void beforeExecute() { if (MapUtils.isEmpty(needOptAggressivelyModels)) { return; } @@ -113,7 +113,7 @@ public class IndexCleaner extends MetadataCleaner { } @Override - public void cleanup() { + public void execute() { if (MapUtils.isNotEmpty(needOptAggressivelyModels)) { cleanUpIndexAggressively(); } @@ -135,7 +135,7 @@ public class IndexCleaner extends MetadataCleaner { } @Override - public void afterCleanup() { + public void afterExecute() { if (MapUtils.isEmpty(needOptAggressivelyModels)) { return; } diff --git a/src/tool/src/main/java/org/apache/kylin/tool/garbage/MetadataCleaner.java b/src/tool/src/main/java/org/apache/kylin/tool/garbage/MetadataCleaner.java index 2f96bfa281..4a7f30f7fb 100644 --- a/src/tool/src/main/java/org/apache/kylin/tool/garbage/MetadataCleaner.java +++ b/src/tool/src/main/java/org/apache/kylin/tool/garbage/MetadataCleaner.java @@ -18,7 +18,21 @@ package org.apache.kylin.tool.garbage; -public abstract class MetadataCleaner { +import java.util.Arrays; +import java.util.List; + +import org.apache.kylin.common.KylinConfig; +import org.apache.kylin.common.metrics.MetricsCategory; +import org.apache.kylin.common.metrics.MetricsGroup; +import org.apache.kylin.common.metrics.MetricsName; +import org.apache.kylin.common.scheduler.EventBusFactory; +import org.apache.kylin.common.scheduler.SourceUsageUpdateNotifier; +import org.apache.kylin.metadata.project.EnhancedUnitOfWork; +import org.apache.kylin.metadata.project.NProjectManager; + +import lombok.val; + +public abstract class MetadataCleaner implements GarbageCleaner { protected final String project; protected MetadataCleaner(String project) { @@ -26,15 +40,40 @@ public abstract class MetadataCleaner { } // do in transaction - public abstract void beforeCleanup(); + public abstract void beforeExecute(); // do in transaction - public abstract void cleanup(); + @Override + public abstract void execute(); // do in transaction - public abstract void afterCleanup(); + public abstract void afterExecute(); public void prepare() { // default do nothing } + + public static void clean(String project, boolean needAggressiveOpt) { + val projectInstance = NProjectManager.getInstance(KylinConfig.getInstanceFromEnv()).getProject(project); + if (projectInstance == null) { + return; + } + + List<MetadataCleaner> cleaners = initCleaners(project, needAggressiveOpt); + cleaners.forEach(MetadataCleaner::prepare); + EnhancedUnitOfWork.doInTransactionWithCheckAndRetry(() -> { + cleaners.forEach(MetadataCleaner::beforeExecute); + cleaners.forEach(MetadataCleaner::execute); + cleaners.forEach(MetadataCleaner::afterExecute); + return 0; + }, project); + + EventBusFactory.getInstance().postAsync(new SourceUsageUpdateNotifier()); + MetricsGroup.hostTagCounterInc(MetricsName.METADATA_CLEAN, MetricsCategory.PROJECT, project); + } + + private static List<MetadataCleaner> initCleaners(String project, boolean needAggressiveOpt) { + return Arrays.asList(new SnapshotCleaner(project), new IndexCleaner(project, needAggressiveOpt), + new ExecutableCleaner(project)); + } } diff --git a/src/tool/src/main/java/org/apache/kylin/tool/garbage/SnapshotCleaner.java b/src/tool/src/main/java/org/apache/kylin/tool/garbage/SnapshotCleaner.java index 3c1d897be8..09a63f334c 100644 --- a/src/tool/src/main/java/org/apache/kylin/tool/garbage/SnapshotCleaner.java +++ b/src/tool/src/main/java/org/apache/kylin/tool/garbage/SnapshotCleaner.java @@ -44,7 +44,7 @@ public class SnapshotCleaner extends MetadataCleaner { } @Override - public void beforeCleanup() { + public void beforeExecute() { // do nothing } @@ -74,7 +74,7 @@ public class SnapshotCleaner extends MetadataCleaner { } @Override - public void cleanup() { + public void execute() { logger.info("Start to clean snapshot in project {}", project); // remove stale snapshot path from tables NTableMetadataManager tblMgr = NTableMetadataManager.getInstance(KylinConfig.getInstanceFromEnv(), project); @@ -95,7 +95,7 @@ public class SnapshotCleaner extends MetadataCleaner { } @Override - public void afterCleanup() { + public void afterExecute() { // do nothing } } diff --git a/src/tool/src/main/java/org/apache/kylin/tool/garbage/SourceUsageCleaner.java b/src/tool/src/main/java/org/apache/kylin/tool/garbage/SourceUsageCleaner.java index c89e8a5ae4..782f68fa3e 100644 --- a/src/tool/src/main/java/org/apache/kylin/tool/garbage/SourceUsageCleaner.java +++ b/src/tool/src/main/java/org/apache/kylin/tool/garbage/SourceUsageCleaner.java @@ -26,12 +26,11 @@ import org.apache.kylin.common.KylinConfig; import org.apache.kylin.metadata.sourceusage.SourceUsageManager; import org.apache.kylin.metadata.sourceusage.SourceUsageRecord; -public class SourceUsageCleaner { - - public void cleanup() { +public class SourceUsageCleaner implements GarbageCleaner { + @Override + public void execute() { KylinConfig config = KylinConfig.getInstanceFromEnv(); - long expirationTime = config.getSourceUsageSurvivalTimeThreshold(); SourceUsageManager sourceUsageManager = SourceUsageManager.getInstance(config); diff --git a/src/tool/src/main/java/org/apache/kylin/tool/garbage/StorageCleaner.java b/src/tool/src/main/java/org/apache/kylin/tool/garbage/StorageCleaner.java index 30d052cf10..0e6ab4a14c 100644 --- a/src/tool/src/main/java/org/apache/kylin/tool/garbage/StorageCleaner.java +++ b/src/tool/src/main/java/org/apache/kylin/tool/garbage/StorageCleaner.java @@ -40,6 +40,7 @@ import java.util.stream.Collectors; import java.util.stream.Stream; import org.apache.commons.collections.CollectionUtils; +import org.apache.commons.lang3.StringUtils; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; @@ -57,6 +58,12 @@ import org.apache.kylin.common.util.HadoopUtil; import org.apache.kylin.common.util.JsonUtil; import org.apache.kylin.common.util.Pair; import org.apache.kylin.common.util.ShellException; +import org.apache.kylin.guava30.shaded.common.collect.Lists; +import org.apache.kylin.guava30.shaded.common.collect.Maps; +import org.apache.kylin.guava30.shaded.common.collect.Sets; +import org.apache.kylin.guava30.shaded.common.io.ByteSource; +import org.apache.kylin.guava30.shaded.common.util.concurrent.RateLimiter; +import org.apache.kylin.job.execution.AbstractExecutable; import org.apache.kylin.job.execution.ExecutableState; import org.apache.kylin.job.execution.NExecutableManager; import org.apache.kylin.metadata.cube.model.LayoutPartition; @@ -65,18 +72,16 @@ import org.apache.kylin.metadata.cube.model.NDataSegDetails; import org.apache.kylin.metadata.cube.model.NDataSegment; import org.apache.kylin.metadata.cube.model.NDataflow; import org.apache.kylin.metadata.cube.model.NDataflowManager; +import org.apache.kylin.metadata.epoch.EpochManager; import org.apache.kylin.metadata.model.NTableMetadataManager; import org.apache.kylin.metadata.project.EnhancedUnitOfWork; import org.apache.kylin.metadata.project.NProjectManager; import org.apache.kylin.metadata.project.ProjectInstance; +import org.apache.kylin.metadata.query.util.QueryHisStoreUtil; +import org.apache.kylin.query.util.ExtractFactory; +import org.apache.kylin.tool.constant.StringConstant; import org.apache.kylin.tool.util.ProjectTemporaryTableCleanerHelper; -import org.apache.kylin.guava30.shaded.common.collect.Lists; -import org.apache.kylin.guava30.shaded.common.collect.Maps; -import org.apache.kylin.guava30.shaded.common.collect.Sets; - -import org.apache.kylin.guava30.shaded.common.io.ByteSource; -import org.apache.kylin.guava30.shaded.common.util.concurrent.RateLimiter; import lombok.AllArgsConstructor; import lombok.Data; import lombok.EqualsAndHashCode; @@ -89,13 +94,7 @@ import lombok.val; import lombok.extern.slf4j.Slf4j; @Slf4j -public class StorageCleaner { - public static final String ANSI_RED = "\u001B[31m"; - public static final String ANSI_GREEN = "\u001B[32m"; - public static final String ANSI_YELLOW = "\u001B[33m"; - public static final String ANSI_BLUE = "\u001B[34m"; - public static final String ANSI_RESET = "\u001B[0m"; - +public class StorageCleaner implements GarbageCleaner { private final boolean cleanup; private final boolean timeMachineEnabled; @@ -165,7 +164,8 @@ public class StorageCleaner { private Set<StorageItem> allFileSystems = Sets.newHashSet(); - public void execute() throws Exception { + @Override + public void execute() throws InterruptedException { long start = System.currentTimeMillis(); val config = KylinConfig.getInstanceFromEnv(); long startTime = System.currentTimeMillis(); @@ -195,8 +195,8 @@ public class StorageCleaner { log.debug("start to collect HDFS from {}", allFileSystem.getPath()); try { collectFromHDFS(allFileSystem); - } catch (FileNotFoundException e) { - log.warn("No garbage files collected from {}", allFileSystem.getPath()); + } catch (IOException e) { + log.warn("No garbage files collected from {}", allFileSystem.getPath(), e); } log.debug("folder {} is collected,detailed -> {}", allFileSystem.getPath(), allFileSystems); } @@ -221,8 +221,8 @@ public class StorageCleaner { try { log.debug("start to add item {}", path); addItem(item.getFileSystemDecorator(), path, protectionTime); - } catch (FileNotFoundException e) { - log.warn("{} not found", path); + } catch (IOException e) { + log.warn("{} not found", path, e); } } } @@ -231,21 +231,25 @@ public class StorageCleaner { } public void printConsole(boolean success, long duration) { - System.out.println(ANSI_BLUE + "Kylin 5.0 garbage report: (cleanup=" + cleanup + ")" + ANSI_RESET); + System.out.println(StringConstant.ANSI_BLUE + "Kylin 5.0 garbage report: (cleanup=" + cleanup + ")" + + StringConstant.ANSI_RESET); for (StorageItem item : outdatedItems) { System.out.println(" Storage File: " + item.getPath()); } String jobName = "Storage GC cleanup job "; if (!cleanup) { - System.out.println(ANSI_BLUE + "Dry run mode, no data is deleted." + ANSI_RESET); + System.out.println( + StringConstant.ANSI_BLUE + "Dry run mode, no data is deleted." + StringConstant.ANSI_RESET); jobName = "Storage GC check job "; } if (!success) { - System.out.println(ANSI_RED + jobName + "FAILED." + ANSI_RESET); - System.out.println(ANSI_RED + jobName + "finished in " + duration + " ms." + ANSI_RESET); + System.out.println(StringConstant.ANSI_RED + jobName + "FAILED." + StringConstant.ANSI_RESET); + System.out.println( + StringConstant.ANSI_RED + jobName + "finished in " + duration + " ms." + StringConstant.ANSI_RESET); } else { - System.out.println(ANSI_GREEN + jobName + "SUCCEED." + ANSI_RESET); - System.out.println(ANSI_GREEN + jobName + "finished in " + duration + " ms." + ANSI_RESET); + System.out.println(StringConstant.ANSI_GREEN + jobName + "SUCCEED." + StringConstant.ANSI_RESET); + System.out.println(StringConstant.ANSI_GREEN + jobName + "finished in " + duration + " ms." + + StringConstant.ANSI_RESET); } } @@ -267,7 +271,7 @@ public class StorageCleaner { new ProjectTemporaryTableCleaner(project).execute(); } - public boolean cleanup() throws Exception { + public boolean cleanup() throws InterruptedException { boolean success = true; if (cleanup) { Stats stats = new Stats() { @@ -535,7 +539,7 @@ public class StorageCleaner { return getDataLayoutDir(dataLayout) + "/" + dataPartition.getBucketId(); } - private void collectFromHDFS(StorageItem item) throws Exception { + private void collectFromHDFS(StorageItem item) throws IOException { val projectFolders = item.getFileSystemDecorator().listStatus(new Path(item.getPath()), path -> !path.getName().startsWith("_") && (this.projectNames.isEmpty() || this.projectNames.contains(path.getName()))); @@ -575,7 +579,8 @@ public class StorageCleaner { .forEach(x -> slot.add(new FileTreeNode(x.getPath().getName(), node))); } } - projectNode.getBuckets().addAll(collectMultiPartitions(item, projectNode.getName(), projectNode.getLayouts())); + projectNode.getBuckets() + .addAll(collectMultiPartitions(item, projectNode.getName(), projectNode.getLayouts())); } } @@ -841,4 +846,135 @@ public class StorageCleaner { return !errorItems.isEmpty(); } } + + /** + * Sparder history dir hierarchy is + * + * /${kylin.storage.columnar.spark-conf.spark.eventLog.dir} + * |--/${hostName_port} + * |--/{eventlog_v2_${appid}#${timestamp}} + * | +--/${events_${fileindex}_${appid}_${starttime}_${endtime}} + */ + + @Slf4j + public static class EventLogCleaner { + private static final KylinConfig KYLIN_CONFIG = KylinConfig.getInstanceFromEnv(); + private static final FileSystem fs = HadoopUtil.getWorkingFileSystem(); + private static final String FIRST_EVENT_LOG_FILE_PREFIX = "events_1_"; + private static final String SPARK_EVENTLOG_DIR = "spark.eventLog.dir"; + private long queryExpirationTime; + private long buildExpirationTime; + private final boolean cleanup; + + private void init() { + long eventLogCleanStartTime = System.currentTimeMillis(); + Long minQueryHistoryTime = QueryHisStoreUtil.getQueryHistoryMinQueryTime(); + long cleanTime = eventLogCleanStartTime - KYLIN_CONFIG.getQueryHistorySurvivalThreshold(); + queryExpirationTime = Objects.isNull(minQueryHistoryTime) ? cleanTime + : Math.min(cleanTime, minQueryHistoryTime); + log.info( + "eventLogCleanStartTime is {}, queryHistorySurvivalThreshold is {}ms, minQueryHistoryTime is {}, queryExpirationTime is {}", + eventLogCleanStartTime, KYLIN_CONFIG.getQueryHistorySurvivalThreshold(), minQueryHistoryTime, + queryExpirationTime); + + long earliest = Long.MAX_VALUE; + NProjectManager projectManager = NProjectManager.getInstance(KYLIN_CONFIG); + for (ProjectInstance prj : projectManager.listAllProjects()) { + NExecutableManager executableManager = NExecutableManager.getInstance(KYLIN_CONFIG, prj.getName()); + for (AbstractExecutable executable : executableManager.getAllExecutables()) { + if (executable.getCreateTime() < earliest) { + earliest = executable.getCreateTime(); + } + } + } + + buildExpirationTime = Math.min(eventLogCleanStartTime - KYLIN_CONFIG.getExecutableSurvivalTimeThreshold(), + earliest); + log.info( + "eventLogCleanStartTime is {}, executableSurvivalTimeThreshold is {}ms, earliest executable's createTime is {}, buildExpirationTime is {}", + eventLogCleanStartTime, KYLIN_CONFIG.getExecutableSurvivalTimeThreshold(), earliest, + buildExpirationTime); + } + + public EventLogCleaner(boolean cleanup) { + this.cleanup = cleanup; + init(); + } + + public void execute() { + if (cleanup) { + cleanCurrentSparderEventLog(); + cleanSparkEventLogs(); + } + } + + public void cleanCurrentSparderEventLog() { + log.info("Start to clean sparder event log"); + String currentSparderEvenLogDir = ExtractFactory.create().getSparderEvenLogDir(); + clean(currentSparderEvenLogDir, queryExpirationTime); + log.info("End to clean sparder event log"); + } + + private void cleanSparkEventLogs() { + log.info("Start to clean spark event log"); + String allSparkEventLogDir = KYLIN_CONFIG.getSparkConfigOverride().get(SPARK_EVENTLOG_DIR).trim(); + clean(allSparkEventLogDir, buildExpirationTime); + + EpochManager epochManager = EpochManager.getInstance(); + NProjectManager prjManager = NProjectManager.getInstance(KYLIN_CONFIG); + prjManager.listAllProjects().forEach(project -> { + if (!epochManager.checkEpochOwner(project.getName())) { + return; + } + String sparkEventLogDir = project.getConfig().getExtendedOverrides() + .get("kylin.engine.spark-conf.spark.eventLog.dir"); + if (!StringUtils.isEmpty(sparkEventLogDir)) { + clean(sparkEventLogDir, buildExpirationTime); + } + }); + log.info("End to clean spark event log"); + } + + // for RoutineTool & FastRoutineTool + public void cleanAllEventLog() { + log.info("Start to clean all event log"); + String rootSparderEvenLogDir = KapConfig.wrap(KYLIN_CONFIG).getSparkConf().get(SPARK_EVENTLOG_DIR); + try { + Arrays.stream(fs.listStatus(new Path(rootSparderEvenLogDir))) + .forEach(fileStatus -> clean(fileStatus.getPath().toString(), queryExpirationTime)); + } catch (IOException e) { + log.warn("Failed to clean all sparder event log of [{}]", rootSparderEvenLogDir, e); + } + + cleanSparkEventLogs(); + log.info("End to clean all event log"); + } + + private void clean(String dir, long expirationTime) { + Path path = new Path(dir); + try { + FileStatus[] fileStatuses = fs.listStatus(path); + for (FileStatus fileStatus : fileStatuses) { + deleteEventLogFile(fileStatus, expirationTime); + } + } catch (Exception e) { + log.warn("Failed to clean the event log of [{}]", path.getName(), e); + } + } + + private void deleteEventLogFile(FileStatus fileStatus, long expirationTime) throws IOException { + if (fileStatus.getModificationTime() <= expirationTime + && !fileStatus.getPath().getName().startsWith(FIRST_EVENT_LOG_FILE_PREFIX)) { + log.info("Delete event log file: {}", fileStatus.getPath().toString()); + fs.delete(fileStatus.getPath(), true); + return; + } + + if (fileStatus.isFile()) { + return; + } + + clean(fileStatus.getPath().toString(), expirationTime); + } + } } diff --git a/src/tool/src/main/java/org/apache/kylin/tool/routine/FastRoutineTool.java b/src/tool/src/main/java/org/apache/kylin/tool/routine/FastRoutineTool.java index a9ef41094c..b9950c2d35 100644 --- a/src/tool/src/main/java/org/apache/kylin/tool/routine/FastRoutineTool.java +++ b/src/tool/src/main/java/org/apache/kylin/tool/routine/FastRoutineTool.java @@ -53,6 +53,7 @@ public class FastRoutineTool extends RoutineTool { } System.out.println("Start to fast cleanup hdfs"); cleanStorage(); + cleanEventLogs(); } catch (Exception e) { log.error("Failed to execute fast routintool", e); } diff --git a/src/tool/src/main/java/org/apache/kylin/tool/routine/RoutineTool.java b/src/tool/src/main/java/org/apache/kylin/tool/routine/RoutineTool.java index 0de65f1780..01c05f76c6 100644 --- a/src/tool/src/main/java/org/apache/kylin/tool/routine/RoutineTool.java +++ b/src/tool/src/main/java/org/apache/kylin/tool/routine/RoutineTool.java @@ -23,6 +23,7 @@ import java.util.stream.Collectors; import org.apache.commons.cli.Option; import org.apache.commons.cli.Options; +import org.apache.commons.lang.ArrayUtils; import org.apache.kylin.common.KylinConfig; import org.apache.kylin.common.util.ExecutableApplication; import org.apache.kylin.common.util.OptionsHelper; @@ -33,8 +34,8 @@ import org.apache.kylin.helper.RoutineToolHelper; import org.apache.kylin.metadata.project.NProjectManager; import org.apache.kylin.metadata.project.ProjectInstance; import org.apache.kylin.tool.MaintainModeTool; +import org.apache.kylin.tool.constant.StringConstant; import org.apache.kylin.tool.garbage.CleanTaskExecutorService; -import org.apache.kylin.tool.garbage.StorageCleaner; import org.apache.kylin.tool.util.ToolMainWrapper; import org.apache.kylin.metadata.epoch.EpochManager; @@ -132,6 +133,7 @@ public class RoutineTool extends ExecutableApplication { RoutineToolHelper.cleanMeta(projectsToCleanup); } cleanStorage(); + cleanEventLogs(); } catch (Exception e) { log.error("Failed to execute routintool", e); throw e; @@ -141,13 +143,30 @@ public class RoutineTool extends ExecutableApplication { public void cleanStorage() { try { System.out.println("Start to cleanup HDFS"); - CleanTaskExecutorService.getInstance().cleanStorageForRoutine( - storageCleanup, Arrays.asList(projects), requestFSRate, retryTimes); + CleanTaskExecutorService.getInstance().cleanStorageForRoutine(storageCleanup, Arrays.asList(projects), + requestFSRate, retryTimes); System.out.println("cleanup HDFS finished"); } catch (Exception e) { - System.out.println(StorageCleaner.ANSI_RED - + "cleanup HDFS failed. Detailed Message is at ${KYLIN_HOME}/logs/shell.stderr" - + StorageCleaner.ANSI_RESET); + System.out.println(StringConstant.ANSI_RED + + "cleanup HDFS failed. Detailed Message is at ${KYLIN_HOME}/logs/shell.stderr" + + StringConstant.ANSI_RESET); + } + } + + protected void cleanEventLogs() { + try { + if (!ArrayUtils.isEmpty(projects)) { + System.out.println("The event log will not be cleaned when the projects is specified"); + return; + } + + System.out.println("Start to clean all event logs"); + RoutineToolHelper.cleanEventLog(storageCleanup, true, true); + System.out.println("Clean all event logs finished"); + } catch (Exception e) { + System.out.println(StringConstant.ANSI_RED + + "Clean all event logs failed. Detailed Message is at ${KYLIN_HOME}/logs/shell.stderr" + + StringConstant.ANSI_RESET); } } diff --git a/src/tool/src/main/java/org/apache/kylin/tool/security/AdminUserInitCLI.java b/src/tool/src/main/java/org/apache/kylin/tool/security/AdminUserInitCLI.java index d1eec35e5b..eb5edfb5e7 100644 --- a/src/tool/src/main/java/org/apache/kylin/tool/security/AdminUserInitCLI.java +++ b/src/tool/src/main/java/org/apache/kylin/tool/security/AdminUserInitCLI.java @@ -28,20 +28,20 @@ import org.apache.commons.lang3.RandomStringUtils; import org.apache.kylin.common.KylinConfig; import org.apache.kylin.common.persistence.RawResource; import org.apache.kylin.common.persistence.ResourceStore; -import org.apache.kylin.common.util.JsonUtil; -import org.apache.kylin.common.util.RandomUtil; -import org.apache.kylin.rest.constant.Constant; -import org.apache.kylin.util.PasswordEncodeFactory; import org.apache.kylin.common.persistence.metadata.PersistException; import org.apache.kylin.common.persistence.transaction.UnitOfWork; +import org.apache.kylin.common.util.JsonUtil; +import org.apache.kylin.common.util.RandomUtil; import org.apache.kylin.common.util.Unsafe; +import org.apache.kylin.guava30.shaded.common.io.ByteSource; import org.apache.kylin.metadata.user.ManagedUser; import org.apache.kylin.metadata.user.NKylinUserManager; -import org.apache.kylin.tool.garbage.StorageCleaner; +import org.apache.kylin.rest.constant.Constant; +import org.apache.kylin.tool.constant.StringConstant; +import org.apache.kylin.util.PasswordEncodeFactory; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.apache.kylin.guava30.shaded.common.io.ByteSource; import lombok.val; public class AdminUserInitCLI { @@ -100,13 +100,13 @@ public class AdminUserInitCLI { ByteSource.wrap(JsonUtil.writeValueAsBytes(managedUser)), System.currentTimeMillis(), 0L); metaStore.putResource(rawResource, null, UnitOfWork.DEFAULT_EPOCH_ID); - String blackColorUsernameForPrint = StorageCleaner.ANSI_RESET + ADMIN_USER_NAME + StorageCleaner.ANSI_RED; - String blackColorPasswordForPrint = StorageCleaner.ANSI_RESET + password + StorageCleaner.ANSI_RED; + String blackColorUsernameForPrint = StringConstant.ANSI_RESET + ADMIN_USER_NAME + StringConstant.ANSI_RED; + String blackColorPasswordForPrint = StringConstant.ANSI_RESET + password + StringConstant.ANSI_RED; String info = String.format(Locale.ROOT, "Create default user finished. The username of initialized user is [%s], which password is [%s].\n" + "Please keep the password properly. And if you forget the password, you can reset it according to user manual.", blackColorUsernameForPrint, blackColorPasswordForPrint); - System.out.println(StorageCleaner.ANSI_RED + info + StorageCleaner.ANSI_RESET); + System.out.println(StringConstant.ANSI_RED + info + StringConstant.ANSI_RESET); } catch (PersistException e) { logger.warn("{} user has been created on another node.", ADMIN_USER_NAME); } diff --git a/src/tool/src/main/java/org/apache/kylin/tool/security/KylinPasswordResetCLI.java b/src/tool/src/main/java/org/apache/kylin/tool/security/KylinPasswordResetCLI.java index 0ebcd2d8c2..7c186cfdde 100644 --- a/src/tool/src/main/java/org/apache/kylin/tool/security/KylinPasswordResetCLI.java +++ b/src/tool/src/main/java/org/apache/kylin/tool/security/KylinPasswordResetCLI.java @@ -22,20 +22,20 @@ import java.util.Locale; import org.apache.kylin.common.KylinConfig; import org.apache.kylin.common.persistence.ResourceStore; -import org.apache.kylin.common.util.JsonUtil; -import org.apache.kylin.util.PasswordEncodeFactory; import org.apache.kylin.common.persistence.transaction.UnitOfWork; +import org.apache.kylin.common.util.JsonUtil; import org.apache.kylin.common.util.Unsafe; +import org.apache.kylin.guava30.shaded.common.io.ByteSource; import org.apache.kylin.metadata.epoch.EpochManager; import org.apache.kylin.metadata.user.NKylinUserManager; import org.apache.kylin.tool.MaintainModeTool; import org.apache.kylin.tool.MetadataTool; -import org.apache.kylin.tool.garbage.StorageCleaner; +import org.apache.kylin.tool.constant.StringConstant; +import org.apache.kylin.util.PasswordEncodeFactory; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.security.crypto.password.PasswordEncoder; -import org.apache.kylin.guava30.shaded.common.io.ByteSource; import lombok.val; public class KylinPasswordResetCLI { @@ -105,16 +105,16 @@ public class KylinPasswordResetCLI { } if (randomPasswordEnabled) { - String blackColorUsernameForPrint = StorageCleaner.ANSI_RESET + AdminUserInitCLI.ADMIN_USER_NAME - + StorageCleaner.ANSI_RED; - String blackColorPasswordForPrint = StorageCleaner.ANSI_RESET + password + StorageCleaner.ANSI_RED; + String blackColorUsernameForPrint = StringConstant.ANSI_RESET + AdminUserInitCLI.ADMIN_USER_NAME + + StringConstant.ANSI_RED; + String blackColorPasswordForPrint = StringConstant.ANSI_RESET + password + StringConstant.ANSI_RED; String info = String.format(Locale.ROOT, "Reset password of [%s] succeed. The password is [%s].\n" + "Please keep the password properly.", blackColorUsernameForPrint, blackColorPasswordForPrint); - System.out.println(StorageCleaner.ANSI_RED + info + StorageCleaner.ANSI_RESET); + System.out.println(StringConstant.ANSI_RED + info + StringConstant.ANSI_RESET); } else { System.out.println( - StorageCleaner.ANSI_YELLOW + "Reset the ADMIN password successfully." + StorageCleaner.ANSI_RESET); + StringConstant.ANSI_YELLOW + "Reset the ADMIN password successfully." + StringConstant.ANSI_RESET); } return true; diff --git a/src/tool/src/test/java/org/apache/kylin/tool/StorageCleanerTest.java b/src/tool/src/test/java/org/apache/kylin/tool/StorageCleanerTest.java index 48a4403e00..af10c230ad 100644 --- a/src/tool/src/test/java/org/apache/kylin/tool/StorageCleanerTest.java +++ b/src/tool/src/test/java/org/apache/kylin/tool/StorageCleanerTest.java @@ -17,8 +17,13 @@ */ package org.apache.kylin.tool; +import static org.apache.kylin.common.KylinConfigBase.WRITING_CLUSTER_WORKING_DIR; + import java.io.File; import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Paths; +import java.nio.file.attribute.FileTime; import java.util.Collection; import java.util.Collections; import java.util.Map; @@ -33,33 +38,35 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.permission.FsPermission; import org.apache.kylin.common.KapConfig; import org.apache.kylin.common.KylinConfig; +import org.apache.kylin.common.util.AddressUtil; import org.apache.kylin.common.util.HadoopUtil; +import org.apache.kylin.common.util.NLocalFileMetadataTestCase; +import org.apache.kylin.guava30.shaded.common.collect.Maps; import org.apache.kylin.job.common.ShellExecutable; import org.apache.kylin.job.execution.AbstractExecutable; import org.apache.kylin.job.execution.DefaultExecutable; import org.apache.kylin.job.execution.ExecutableState; import org.apache.kylin.job.execution.NExecutableManager; -import org.apache.kylin.metadata.project.ProjectInstance; -import org.apache.kylin.common.util.NLocalFileMetadataTestCase; import org.apache.kylin.metadata.cube.model.NDataflow; import org.apache.kylin.metadata.cube.model.NDataflowManager; import org.apache.kylin.metadata.cube.model.NIndexPlanManager; import org.apache.kylin.metadata.model.NTableMetadataManager; import org.apache.kylin.metadata.project.NProjectManager; +import org.apache.kylin.metadata.project.ProjectInstance; +import org.apache.kylin.metadata.query.util.QueryHisStoreUtil; import org.apache.kylin.tool.garbage.StorageCleaner; import org.junit.After; import org.junit.Assert; import org.junit.Before; import org.junit.Ignore; import org.junit.Test; - -import org.apache.kylin.guava30.shaded.common.collect.Maps; +import org.mockito.MockedStatic; +import org.mockito.Mockito; import lombok.val; +import lombok.var; import lombok.extern.slf4j.Slf4j; -import static org.apache.kylin.common.KylinConfigBase.WRITING_CLUSTER_WORKING_DIR; - @Slf4j public class StorageCleanerTest extends NLocalFileMetadataTestCase { @@ -93,6 +100,51 @@ public class StorageCleanerTest extends NLocalFileMetadataTestCase { } } + @Test + public void testEventLogClean() throws IOException { + prepareForEventLogClean(); + String allSparderEventLogDir = (KapConfig.wrap(getTestConfig()).getSparkConf().get("spark.eventLog.dir")) + .replace("file:", ""); + String currentSparderEventLogDir = (KapConfig.wrap(getTestConfig()).getSparkConf().get("spark.eventLog.dir") + + "/" + AddressUtil.getLocalServerInfo() + "/eventlog_v2_application_1677899901295_4823#1690192675042") + .replace("file:", ""); + String sparkEventLogDir = getTestConfig().getSparkConfigOverride().get("spark.eventLog.dir").replace("file:", + ""); + + try (MockedStatic<QueryHisStoreUtil> qhsuMocked = Mockito.mockStatic(QueryHisStoreUtil.class)) { + qhsuMocked.when(QueryHisStoreUtil::getQueryHistoryMinQueryTime).thenReturn(null); + + int fileSize = new File(currentSparderEventLogDir).listFiles().length; + Assert.assertEquals(3, fileSize); + var cleaner = new StorageCleaner.EventLogCleaner(false); + cleaner.cleanCurrentSparderEventLog(); + fileSize = new File(currentSparderEventLogDir).listFiles().length; + Assert.assertEquals(2, fileSize); + Assert.assertTrue( + new File(currentSparderEventLogDir + "/events_1_application_1677899901295_8490_1690953331329") + .exists()); + + int sparkEventLogFileSize = new File(sparkEventLogDir).listFiles().length; + Assert.assertEquals(5, sparkEventLogFileSize); + cleaner.execute(); + Assert.assertEquals(5, sparkEventLogFileSize); + + cleaner = new StorageCleaner.EventLogCleaner(true); + cleaner.execute(); + sparkEventLogFileSize = new File(sparkEventLogDir).listFiles().length; + Assert.assertEquals(4, sparkEventLogFileSize); + Assert.assertFalse(new File(sparkEventLogDir + "/application_1677899901295_8243").exists()); + + File otherSparderEventLogFile = new File( + allSparderEventLogDir + "/localhost_7071/eventlog_v2_application_1677899901295_4824#1690192771380"); + Assert.assertTrue(otherSparderEventLogFile.exists()); + cleaner.cleanAllEventLog(); + otherSparderEventLogFile = new File( + allSparderEventLogDir + "/localhost_7071/eventlog_v2_application_1677899901295_4824#1690192771380"); + Assert.assertFalse(otherSparderEventLogFile.exists()); + } + } + @Test public void testCleanupAfterTruncate() throws Exception { val cleaner = new StorageCleaner(); @@ -339,6 +391,68 @@ public class StorageCleanerTest extends NLocalFileMetadataTestCase { execMgr.addJob(job2); } + private void prepareForEventLogClean() throws IOException { + KylinConfig config = getTestConfig(); + String currentSparderEventLogDir = (KapConfig.wrap(config).getSparkConf().get("spark.eventLog.dir") + "/" + + AddressUtil.getLocalServerInfo()).replace("file:", ""); + Files.createDirectories(Paths.get(currentSparderEventLogDir)); + + String allSparderEventLogDir = (KapConfig.wrap(config).getSparkConf().get("spark.eventLog.dir")) + .replace("file:", ""); + String sparkEventLogDir = (config.getSparkConfigOverride().get("spark.eventLog.dir")).replace("file:", ""); + + FileUtils.copyDirectory(new File("src/test/resources/ut_storage/working-dir2/sparder-history/localhost_7070"), + new File(currentSparderEventLogDir)); + FileUtils.copyDirectoryToDirectory( + new File("src/test/resources/ut_storage/working-dir2/sparder-history/localhost_7071"), + new File(allSparderEventLogDir)); + FileUtils.copyFileToDirectory( + new File("src/test/resources/ut_storage/working-dir2/spark-history/application_1677899901295_0989"), + new File(sparkEventLogDir)); + FileUtils.copyFileToDirectory( + new File("src/test/resources/ut_storage/working-dir2/spark-history/application_1677899901295_8243"), + new File(sparkEventLogDir)); + + long currentTime = System.currentTimeMillis(); + long twoMonth = 2 * 30 * 24 * 60 * 60 * 1000L; + long oneDay = 24 * 60 * 60 * 1000L; + long expired = currentTime - twoMonth; + long notExpired = currentTime - oneDay; + + // Not expired + updateLastModified(currentSparderEventLogDir + "/eventlog_v2_application_1677899901295_4823#1690192675042", + notExpired); + + // Expired + updateLastModified(currentSparderEventLogDir + "/eventlog_v2_application_1677899901295_4823#1690192675042/" + + "appstatus_application_1677899901295_8490", expired); + + // Expired + updateLastModified(currentSparderEventLogDir + "/eventlog_v2_application_1677899901295_4823#1690192675042/" + + "events_1_application_1677899901295_8490_1690953331329", expired); + + // Not expired + updateLastModified(currentSparderEventLogDir + "/eventlog_v2_application_1677899901295_4823#1690192675042/" + + "events_2_application_1677899901295_8490_1690953331329", notExpired); + + // Expired + updateLastModified( + allSparderEventLogDir + "/localhost_7071/eventlog_v2_application_1677899901295_4824#1690192771380", + expired); + + // Expired: Less than the creation time of the earliest executable + updateLastModified(sparkEventLogDir + "/application_1677899901295_8243", 974600451000L); + + // Not expired + updateLastModified(sparkEventLogDir + "/application_1677899901295_0989", notExpired); + } + + public void updateLastModified(String file, long timeStamp) throws IOException { + java.nio.file.Path path = Paths.get(file); + FileTime newLastModifiedTime = FileTime.fromMillis(timeStamp); + Files.setLastModifiedTime(path, newLastModifiedTime); + } + private Set<String> normalizeGarbages(Set<StorageCleaner.StorageItem> items) { return items.stream().map(i -> i.getPath().replaceAll("file:", "").replaceAll("/keep", "")) .collect(Collectors.toSet()); diff --git a/src/tool/src/test/java/org/apache/kylin/tool/garbage/CleanTaskExecutorServiceTests.java b/src/tool/src/test/java/org/apache/kylin/tool/garbage/CleanTaskExecutorServiceTests.java index 1b0e368eca..f65a9d2715 100644 --- a/src/tool/src/test/java/org/apache/kylin/tool/garbage/CleanTaskExecutorServiceTests.java +++ b/src/tool/src/test/java/org/apache/kylin/tool/garbage/CleanTaskExecutorServiceTests.java @@ -72,7 +72,7 @@ class CleanTaskExecutorServiceTests extends NLocalFileMetadataTestCase { } @Override - public void execute() throws Exception { + public void execute() throws InterruptedException { if (runnable != null) { runnable.run(); } diff --git a/src/tool/src/test/java/org/apache/kylin/tool/garbage/ExecutableCleanerTest.java b/src/tool/src/test/java/org/apache/kylin/tool/garbage/ExecutableCleanerTest.java index 0181f4026b..9d9474eadd 100644 --- a/src/tool/src/test/java/org/apache/kylin/tool/garbage/ExecutableCleanerTest.java +++ b/src/tool/src/test/java/org/apache/kylin/tool/garbage/ExecutableCleanerTest.java @@ -18,12 +18,12 @@ package org.apache.kylin.tool.garbage; +import org.apache.kylin.common.util.NLocalFileMetadataTestCase; import org.apache.kylin.common.util.RandomUtil; import org.apache.kylin.job.dao.ExecutableOutputPO; import org.apache.kylin.job.dao.ExecutablePO; import org.apache.kylin.job.dao.NExecutableDao; import org.apache.kylin.job.execution.NExecutableManager; -import org.apache.kylin.common.util.NLocalFileMetadataTestCase; import org.junit.After; import org.junit.Assert; import org.junit.Before; @@ -54,7 +54,7 @@ public class ExecutableCleanerTest extends NLocalFileMetadataTestCase { createUnexpiredJob(jobId); Assert.assertEquals(1, manager.getJobs().size()); manager.discardJob(jobId); - new ExecutableCleaner(DEFAULT_PROJECT).cleanup(); + new ExecutableCleaner(DEFAULT_PROJECT).execute(); Assert.assertEquals(1, manager.getJobs().size()); } @@ -62,7 +62,7 @@ public class ExecutableCleanerTest extends NLocalFileMetadataTestCase { public void testCleanupWithRunningJob() { createExpiredJob(RandomUtil.randomUUIDStr()); Assert.assertEquals(1, manager.getJobs().size()); - new ExecutableCleaner(DEFAULT_PROJECT).cleanup(); + new ExecutableCleaner(DEFAULT_PROJECT).execute(); Assert.assertEquals(1, manager.getJobs().size()); } @@ -72,7 +72,7 @@ public class ExecutableCleanerTest extends NLocalFileMetadataTestCase { createExpiredJob(jobId); manager.discardJob(jobId); Assert.assertEquals(1, manager.getJobs().size()); - new ExecutableCleaner(DEFAULT_PROJECT).cleanup(); + new ExecutableCleaner(DEFAULT_PROJECT).execute(); Assert.assertEquals(0, manager.getJobs().size()); } diff --git a/src/tool/src/test/java/org/apache/kylin/tool/garbage/SnapshotCleanerTest.java b/src/tool/src/test/java/org/apache/kylin/tool/garbage/SnapshotCleanerTest.java index 90fd332091..0031051487 100644 --- a/src/tool/src/test/java/org/apache/kylin/tool/garbage/SnapshotCleanerTest.java +++ b/src/tool/src/test/java/org/apache/kylin/tool/garbage/SnapshotCleanerTest.java @@ -79,7 +79,7 @@ public class SnapshotCleanerTest extends NLocalFileMetadataTestCase { SnapshotCleaner snapshotCleaner = new SnapshotCleaner(DEFAULT_PROJECT); snapshotCleaner.prepare(); UnitOfWork.doInTransactionWithRetry(() -> { - snapshotCleaner.cleanup(); + snapshotCleaner.execute(); return 0; }, DEFAULT_PROJECT); diff --git a/src/tool/src/test/java/org/apache/kylin/tool/garbage/SourceUsageCleanerTest.java b/src/tool/src/test/java/org/apache/kylin/tool/garbage/SourceUsageCleanerTest.java index 983bc26d58..3f0dbe5d82 100644 --- a/src/tool/src/test/java/org/apache/kylin/tool/garbage/SourceUsageCleanerTest.java +++ b/src/tool/src/test/java/org/apache/kylin/tool/garbage/SourceUsageCleanerTest.java @@ -58,7 +58,7 @@ public class SourceUsageCleanerTest extends NLocalFileMetadataTestCase { manager.updateSourceUsage(record); List<SourceUsageRecord> allRecords = manager.getAllRecords(); Assert.assertEquals(1, allRecords.size()); - sourceUsageCleaner.cleanup(); + sourceUsageCleaner.execute(); allRecords = manager.getAllRecords(); Assert.assertEquals(1, allRecords.size()); } @@ -74,7 +74,7 @@ public class SourceUsageCleanerTest extends NLocalFileMetadataTestCase { manager.updateSourceUsage(record1); List<SourceUsageRecord> allRecords = manager.getAllRecords(); Assert.assertEquals(2, allRecords.size()); - sourceUsageCleaner.cleanup(); + sourceUsageCleaner.execute(); allRecords = manager.getAllRecords(); Assert.assertEquals(1, allRecords.size()); Assert.assertEquals(1, allRecords.get(0).getCreateTime()); @@ -87,7 +87,7 @@ public class SourceUsageCleanerTest extends NLocalFileMetadataTestCase { manager.updateSourceUsage(record); List<SourceUsageRecord> allRecords = manager.getAllRecords(); Assert.assertEquals(1, allRecords.size()); - sourceUsageCleaner.cleanup(); + sourceUsageCleaner.execute(); allRecords = manager.getAllRecords(); Assert.assertEquals(1, allRecords.size()); } @@ -95,7 +95,7 @@ public class SourceUsageCleanerTest extends NLocalFileMetadataTestCase { @Test public void testCleanupZeroSourceUsage() { List<SourceUsageRecord> allRecords = manager.getAllRecords(); - sourceUsageCleaner.cleanup(); + sourceUsageCleaner.execute(); Assert.assertEquals(0, allRecords.size()); } diff --git a/src/tool/src/test/java/org/apache/kylin/tool/security/AdminUserInitCLITest.java b/src/tool/src/test/java/org/apache/kylin/tool/security/AdminUserInitCLITest.java index 4551646ccb..62b43c2ce6 100644 --- a/src/tool/src/test/java/org/apache/kylin/tool/security/AdminUserInitCLITest.java +++ b/src/tool/src/test/java/org/apache/kylin/tool/security/AdminUserInitCLITest.java @@ -26,7 +26,7 @@ import org.apache.kylin.common.KylinConfig; import org.apache.kylin.common.persistence.ResourceStore; import org.apache.kylin.common.util.NLocalFileMetadataTestCase; import org.apache.kylin.metadata.user.NKylinUserManager; -import org.apache.kylin.tool.garbage.StorageCleaner; +import org.apache.kylin.tool.constant.StringConstant; import org.junit.After; import org.junit.Assert; import org.junit.Before; @@ -72,13 +72,13 @@ public class AdminUserInitCLITest extends NLocalFileMetadataTestCase { // assert output on console Assert.assertTrue(output.toString(Charset.defaultCharset().name()) - .startsWith(StorageCleaner.ANSI_RED + .startsWith(StringConstant.ANSI_RED + "Create default user finished. The username of initialized user is [" - + StorageCleaner.ANSI_RESET + "ADMIN" + StorageCleaner.ANSI_RED + "], which password is ")); + + StringConstant.ANSI_RESET + "ADMIN" + StringConstant.ANSI_RED + "], which password is ")); Assert.assertTrue(output.toString(Charset.defaultCharset().name()) .endsWith("Please keep the password properly. " + "And if you forget the password, you can reset it according to user manual." - + StorageCleaner.ANSI_RESET + "\n")); + + StringConstant.ANSI_RESET + "\n")); System.setOut(System.out); diff --git a/src/tool/src/test/java/org/apache/kylin/tool/security/KylinPasswordResetCLITest.java b/src/tool/src/test/java/org/apache/kylin/tool/security/KylinPasswordResetCLITest.java index e3c17825cc..6bf3445289 100644 --- a/src/tool/src/test/java/org/apache/kylin/tool/security/KylinPasswordResetCLITest.java +++ b/src/tool/src/test/java/org/apache/kylin/tool/security/KylinPasswordResetCLITest.java @@ -29,10 +29,10 @@ import org.apache.kylin.common.KylinConfig; import org.apache.kylin.common.persistence.ResourceStore; import org.apache.kylin.common.persistence.metadata.jdbc.AuditLogRowMapper; import org.apache.kylin.common.util.LogOutputTestCase; -import org.apache.kylin.rest.constant.Constant; import org.apache.kylin.metadata.user.ManagedUser; import org.apache.kylin.metadata.user.NKylinUserManager; -import org.apache.kylin.tool.garbage.StorageCleaner; +import org.apache.kylin.rest.constant.Constant; +import org.apache.kylin.tool.constant.StringConstant; import org.junit.After; import org.junit.Assert; import org.junit.Before; @@ -95,10 +95,10 @@ public class KylinPasswordResetCLITest extends LogOutputTestCase { Assert.assertFalse(pwdEncoder.matches("KYLIN", afterManager.get(user.getUsername()).getPassword())); Assert.assertTrue(output.toString(Charset.defaultCharset().name()).startsWith("The metadata backup path is")); Assert.assertTrue(output.toString(Charset.defaultCharset().name()) - .contains(StorageCleaner.ANSI_RED + "Reset password of [" + StorageCleaner.ANSI_RESET + "ADMIN" - + StorageCleaner.ANSI_RED + "] succeed. The password is ")); + .contains(StringConstant.ANSI_RED + "Reset password of [" + StringConstant.ANSI_RESET + "ADMIN" + + StringConstant.ANSI_RED + "] succeed. The password is ")); Assert.assertTrue(output.toString(Charset.defaultCharset().name()) - .endsWith("Please keep the password properly." + StorageCleaner.ANSI_RESET + "\n")); + .endsWith("Please keep the password properly." + StringConstant.ANSI_RESET + "\n")); val url = getTestConfig().getMetadataUrl(); val jdbcTemplate = getJdbcTemplate(); diff --git a/src/tool/src/test/resources/ut_storage/working-dir2/sparder-history/localhost_7070/eventlog_v2_application_1677899901295_4823#1690192675042/appstatus_application_1677899901295_8490 b/src/tool/src/test/resources/ut_storage/working-dir2/sparder-history/localhost_7070/eventlog_v2_application_1677899901295_4823#1690192675042/appstatus_application_1677899901295_8490 new file mode 100644 index 0000000000..e69de29bb2 diff --git a/src/tool/src/test/resources/ut_storage/working-dir2/sparder-history/localhost_7070/eventlog_v2_application_1677899901295_4823#1690192675042/events_1_application_1677899901295_8490_1690953331329 b/src/tool/src/test/resources/ut_storage/working-dir2/sparder-history/localhost_7070/eventlog_v2_application_1677899901295_4823#1690192675042/events_1_application_1677899901295_8490_1690953331329 new file mode 100644 index 0000000000..e69de29bb2 diff --git a/src/tool/src/test/resources/ut_storage/working-dir2/sparder-history/localhost_7070/eventlog_v2_application_1677899901295_4823#1690192675042/events_2_application_1677899901295_8490_1690953331329 b/src/tool/src/test/resources/ut_storage/working-dir2/sparder-history/localhost_7070/eventlog_v2_application_1677899901295_4823#1690192675042/events_2_application_1677899901295_8490_1690953331329 new file mode 100644 index 0000000000..e69de29bb2 diff --git a/src/tool/src/test/resources/ut_storage/working-dir2/sparder-history/localhost_7071/eventlog_v2_application_1677899901295_4824#1690192771380/events_1_application_1677899901295_5936_1690235831577 b/src/tool/src/test/resources/ut_storage/working-dir2/sparder-history/localhost_7071/eventlog_v2_application_1677899901295_4824#1690192771380/events_1_application_1677899901295_5936_1690235831577 new file mode 100644 index 0000000000..e69de29bb2 diff --git a/src/tool/src/test/resources/ut_storage/working-dir2/spark-history/application_1677899901295_0989 b/src/tool/src/test/resources/ut_storage/working-dir2/spark-history/application_1677899901295_0989 new file mode 100644 index 0000000000..e69de29bb2 diff --git a/src/tool/src/test/resources/ut_storage/working-dir2/spark-history/application_1677899901295_8243 b/src/tool/src/test/resources/ut_storage/working-dir2/spark-history/application_1677899901295_8243 new file mode 100644 index 0000000000..e69de29bb2