This is an automated email from the ASF dual-hosted git repository.

liyang pushed a commit to branch kylin5
in repository https://gitbox.apache.org/repos/asf/kylin.git

commit ca370d2dad5855941ebb699c1ff3d3eb343414f6
Author: sibingzhang <74443791+sibingzh...@users.noreply.github.com>
AuthorDate: Sun Aug 13 19:57:05 2023 +0800

    KYLIN-5789 Clean sparder history and spark history automatically
---
 .../src/main/resources/kylinSecurity.xml           |   1 +
 .../kylin/rest/controller/NSystemController.java   |  18 +-
 .../src/main/resources/kylinSecurity.xml           |   1 +
 .../apache/kylin/rest/service/ProjectService.java  |   6 +-
 .../apache/kylin/rest/service/SystemService.java   |   9 +
 .../src/test/resources/kylinSecurity.xml           |   1 +
 .../metadata/query/JdbcQueryHistoryStore.java      |  19 ++-
 .../kylin/metadata/query/QueryHistoryDAO.java      |   2 +
 .../kylin/metadata/query/RDBMSQueryHistoryDAO.java |   5 +
 .../metadata/query/util/QueryHisStoreUtil.java     |   5 +
 .../metadata/query/RDBMSQueryHistoryDaoTest.java   |  17 +-
 .../src/main/resources/kylinSecurity.xml           |   1 +
 .../apache/kylin/rest/service/ScheduleService.java |  60 +++++--
 .../kylin/rest/service/ScheduleServiceTest.java    |  25 ++-
 .../src/main/resources/kylinSecurity.xml           |   1 +
 .../org/apache/kylin/rest/QueryNodeFilter.java     |   2 +
 .../apache/kylin/helper/MetadataToolHelper.java    |   7 +-
 .../org/apache/kylin/helper/RoutineToolHelper.java |  41 ++++-
 .../java/org/apache/kylin/tool/AuditLogTool.java   |  18 +-
 .../org/apache/kylin/tool/MaintainModeTool.java    |  21 +--
 .../constant/StringConstant.java}                  |  16 +-
 .../tool/garbage/CleanTaskExecutorService.java     |  14 +-
 .../kylin/tool/garbage/ExecutableCleaner.java      |   6 +-
 .../apache/kylin/tool/garbage/GarbageCleaner.java  |  49 +-----
 .../apache/kylin/tool/garbage/IndexCleaner.java    |   6 +-
 .../apache/kylin/tool/garbage/MetadataCleaner.java |  47 ++++-
 .../apache/kylin/tool/garbage/SnapshotCleaner.java |   6 +-
 .../kylin/tool/garbage/SourceUsageCleaner.java     |   7 +-
 .../apache/kylin/tool/garbage/StorageCleaner.java  | 190 ++++++++++++++++++---
 .../apache/kylin/tool/routine/FastRoutineTool.java |   1 +
 .../org/apache/kylin/tool/routine/RoutineTool.java |  31 +++-
 .../kylin/tool/security/AdminUserInitCLI.java      |  18 +-
 .../kylin/tool/security/KylinPasswordResetCLI.java |  18 +-
 .../org/apache/kylin/tool/StorageCleanerTest.java  | 126 +++++++++++++-
 .../garbage/CleanTaskExecutorServiceTests.java     |   2 +-
 .../kylin/tool/garbage/ExecutableCleanerTest.java  |   8 +-
 .../kylin/tool/garbage/SnapshotCleanerTest.java    |   2 +-
 .../kylin/tool/garbage/SourceUsageCleanerTest.java |   8 +-
 .../kylin/tool/security/AdminUserInitCLITest.java  |   8 +-
 .../tool/security/KylinPasswordResetCLITest.java   |  10 +-
 .../appstatus_application_1677899901295_8490       |   0
 ..._1_application_1677899901295_8490_1690953331329 |   0
 ..._2_application_1677899901295_8490_1690953331329 |   0
 ..._1_application_1677899901295_5936_1690235831577 |   0
 .../spark-history/application_1677899901295_0989   |   0
 .../spark-history/application_1677899901295_8243   |   0
 46 files changed, 619 insertions(+), 214 deletions(-)

diff --git a/src/common-booter/src/main/resources/kylinSecurity.xml b/src/common-booter/src/main/resources/kylinSecurity.xml
index 37dcfc2214..bf3f7aae8e 100644
--- a/src/common-booter/src/main/resources/kylinSecurity.xml
+++ b/src/common-booter/src/main/resources/kylinSecurity.xml
@@ -269,6 +269,7 @@
             <scr:intercept-url pattern="/api/system/diag/progress" 
access="permitAll"/>
             <scr:intercept-url pattern="/api/system/roll_event_log" 
access="permitAll"/>
             <scr:intercept-url pattern="/api/system/broadcast_metadata_backup" 
access="permitAll"/>
+            <scr:intercept-url pattern="/api/system/clean_sparder_event_log" 
access="permitAll"/>
             <scr:intercept-url pattern="/api/system/metadata_backup_tmp_file" 
access="permitAll"/>
             <scr:intercept-url pattern="/api/user/authentication*/**" 
access="permitAll"/>
             <scr:intercept-url 
pattern="/api/query/history_queries/table_names" access="permitAll"/>
diff --git a/src/common-server/src/main/java/org/apache/kylin/rest/controller/NSystemController.java b/src/common-server/src/main/java/org/apache/kylin/rest/controller/NSystemController.java
index a8cdcda4c2..b29d4a4eaf 100644
--- a/src/common-server/src/main/java/org/apache/kylin/rest/controller/NSystemController.java
+++ b/src/common-server/src/main/java/org/apache/kylin/rest/controller/NSystemController.java
@@ -43,6 +43,7 @@ import 
org.apache.kylin.common.persistence.transaction.UnitOfWork;
 import org.apache.kylin.common.persistence.transaction.UnitOfWorkParams;
 import org.apache.kylin.common.scheduler.EventBusFactory;
 import org.apache.kylin.common.util.AddressUtil;
+import org.apache.kylin.guava30.shaded.common.annotations.VisibleForTesting;
 import org.apache.kylin.helper.MetadataToolHelper;
 import org.apache.kylin.metadata.project.EnhancedUnitOfWork;
 import org.apache.kylin.metadata.project.ProjectInstance;
@@ -79,8 +80,6 @@ import org.springframework.web.bind.annotation.RequestMapping;
 import org.springframework.web.bind.annotation.RequestParam;
 import org.springframework.web.bind.annotation.ResponseBody;
 
-import org.apache.kylin.guava30.shaded.common.annotations.VisibleForTesting;
-
 import io.swagger.annotations.ApiOperation;
 import lombok.val;
 import lombok.extern.slf4j.Slf4j;
@@ -148,7 +147,8 @@ public class NSystemController extends NBasicController {
             diagPackageRequest.setStart("");
             diagPackageRequest.setEnd("");
         } else {
-            if (StringUtils.isBlank(diagPackageRequest.getStart()) || 
StringUtils.isBlank(diagPackageRequest.getEnd())) {
+            if (StringUtils.isBlank(diagPackageRequest.getStart())
+                    || StringUtils.isBlank(diagPackageRequest.getEnd())) {
                 throw new KylinException(TIME_INVALID_RANGE_NOT_CONSISTENT);
             }
         }
@@ -210,7 +210,7 @@ public class NSystemController extends NBasicController {
             return systemService.getExtractorStatus(id, project);
         } else {
             String url = host + "/kylin/api/system/diag/status?id=" + id;
-            if(StringUtils.isNotEmpty(project)){
+            if (StringUtils.isNotEmpty(project)) {
                 url = url + "&project=" + project;
             }
             return generateTaskForRemoteHost(request, url);
@@ -229,7 +229,7 @@ public class NSystemController extends NBasicController {
                     response);
         } else {
             String url = host + "/kylin/api/system/diag?id=" + id;
-            if(StringUtils.isNotEmpty(project)){
+            if (StringUtils.isNotEmpty(project)) {
                 url = url + "&project=" + project;
             }
             downloadFromRemoteHost(request, url, response);
@@ -284,11 +284,9 @@ public class NSystemController extends NBasicController {
         val servers = clusterManager.getServers();
         response.setStatus(maintenanceModeService.getMaintenanceMode());
         if (ext) {
-            response.setServers(
-                servers.stream().map(server ->
-                    new ServerExtInfoResponse()
-                    .setServer(server)
-                    
.setSecretName(encodeHost(server.getHost()))).collect(Collectors.toList()));
+            response.setServers(servers.stream().map(
+                    server -> new 
ServerExtInfoResponse().setServer(server).setSecretName(encodeHost(server.getHost())))
+                    .collect(Collectors.toList()));
         } else {
             
response.setServers(servers.stream().map(ServerInfoResponse::getHost).collect(Collectors.toList()));
         }
diff --git a/src/common-server/src/main/resources/kylinSecurity.xml b/src/common-server/src/main/resources/kylinSecurity.xml
index aa6e2154a8..313a14afe0 100644
--- a/src/common-server/src/main/resources/kylinSecurity.xml
+++ b/src/common-server/src/main/resources/kylinSecurity.xml
@@ -269,6 +269,7 @@
             <scr:intercept-url pattern="/api/system/diag/progress" 
access="permitAll"/>
             <scr:intercept-url pattern="/api/system/roll_event_log" 
access="permitAll"/>
             <scr:intercept-url pattern="/api/system/broadcast_metadata_backup" 
access="permitAll"/>
+            <scr:intercept-url pattern="/api/system/clean_sparder_event_log" 
access="permitAll"/>
             <scr:intercept-url pattern="/api/system/metadata_backup_tmp_file" 
access="permitAll"/>
             <scr:intercept-url pattern="/api/user/authentication*/**" 
access="permitAll"/>
             <scr:intercept-url 
pattern="/api/query/history_queries/table_names" access="permitAll"/>
diff --git a/src/common-service/src/main/java/org/apache/kylin/rest/service/ProjectService.java b/src/common-service/src/main/java/org/apache/kylin/rest/service/ProjectService.java
index aafde56676..4299f0d7e3 100644
--- a/src/common-service/src/main/java/org/apache/kylin/rest/service/ProjectService.java
+++ b/src/common-service/src/main/java/org/apache/kylin/rest/service/ProjectService.java
@@ -128,7 +128,7 @@ import org.apache.kylin.rest.security.AclPermissionEnum;
 import org.apache.kylin.rest.security.KerberosLoginManager;
 import org.apache.kylin.rest.util.AclEvaluate;
 import org.apache.kylin.streaming.manager.StreamingJobManager;
-import org.apache.kylin.tool.garbage.GarbageCleaner;
+import org.apache.kylin.tool.garbage.MetadataCleaner;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
@@ -361,7 +361,7 @@ public class ProjectService extends BasicService {
                     boolean needAggressiveOpt = 
Arrays.stream(config.getProjectsAggressiveOptimizationIndex())
                             
.map(StringUtils::lowerCase).collect(Collectors.toList())
                             
.contains(StringUtils.toRootLowerCase(project.getName()));
-                    GarbageCleaner.cleanMetadata(project.getName(), 
needAggressiveOpt);
+                    MetadataCleaner.clean(project.getName(), 
needAggressiveOpt);
                     EventBusFactory.getInstance().callService(new 
ProjectCleanOldQueryResultEvent(project.getName()));
                 } catch (Exception e) {
                     logger.warn("clean project<" + project.getName() + "> 
failed", e);
@@ -412,7 +412,7 @@ public class ProjectService extends BasicService {
     @PreAuthorize(Constant.ACCESS_HAS_ROLE_ADMIN + " or 
hasPermission(#project, 'ADMINISTRATION')")
     public void cleanupGarbage(String project, boolean needAggressiveOpt) 
throws Exception {
         projectSmartService.cleanupGarbage(project, 0);
-        GarbageCleaner.cleanMetadata(project, needAggressiveOpt);
+        MetadataCleaner.clean(project, needAggressiveOpt);
         asyncTaskService.cleanupStorage();
     }
 
diff --git a/src/common-service/src/main/java/org/apache/kylin/rest/service/SystemService.java b/src/common-service/src/main/java/org/apache/kylin/rest/service/SystemService.java
index 24505237e9..e983bcb46c 100644
--- a/src/common-service/src/main/java/org/apache/kylin/rest/service/SystemService.java
+++ b/src/common-service/src/main/java/org/apache/kylin/rest/service/SystemService.java
@@ -55,6 +55,7 @@ import org.apache.kylin.common.util.BufferedLogger;
 import org.apache.kylin.common.util.CliCommandExecutor;
 import org.apache.kylin.common.util.StringHelper;
 import org.apache.kylin.helper.MetadataToolHelper;
+import org.apache.kylin.helper.RoutineToolHelper;
 import org.apache.kylin.job.execution.AbstractExecutable;
 import org.apache.kylin.job.execution.NExecutableManager;
 import org.apache.kylin.metadata.cube.model.NIndexPlanManager;
@@ -386,4 +387,12 @@ public class SystemService extends BasicService {
         }
         return result;
     }
+
+    public void cleanSparderEventLog() {
+        val config = KylinConfig.getInstanceFromEnv();
+        if (config.isQueryNodeOnly()) {
+            logger.info("Clean current sparder event log for RPC");
+            RoutineToolHelper.cleanEventLog(true, true, false);
+        }
+    }
 }
diff --git a/src/common-service/src/test/resources/kylinSecurity.xml b/src/common-service/src/test/resources/kylinSecurity.xml
index 1db0da1bc3..c8bdfc9937 100644
--- a/src/common-service/src/test/resources/kylinSecurity.xml
+++ b/src/common-service/src/test/resources/kylinSecurity.xml
@@ -261,6 +261,7 @@
             <scr:intercept-url pattern="/api/system/diag/progress" 
access="permitAll"/>
             <scr:intercept-url pattern="/api/system/roll_event_log" 
access="permitAll"/>
             <scr:intercept-url pattern="/api/system/broadcast_metadata_backup" 
access="permitAll"/>
+            <scr:intercept-url pattern="/api/system/clean_sparder_event_log" 
access="permitAll"/>
             <scr:intercept-url pattern="/api/system/metadata_backup_tmp_file" 
access="permitAll"/>
             <scr:intercept-url pattern="/api/user/authentication*/**" 
access="permitAll"/>
             <scr:intercept-url 
pattern="/api/query/history_queries/table_names" access="permitAll"/>
diff --git a/src/core-metadata/src/main/java/org/apache/kylin/metadata/query/JdbcQueryHistoryStore.java b/src/core-metadata/src/main/java/org/apache/kylin/metadata/query/JdbcQueryHistoryStore.java
index 6ffbd9eac0..e35aae28f5 100644
--- a/src/core-metadata/src/main/java/org/apache/kylin/metadata/query/JdbcQueryHistoryStore.java
+++ b/src/core-metadata/src/main/java/org/apache/kylin/metadata/query/JdbcQueryHistoryStore.java
@@ -31,6 +31,7 @@ import static 
org.mybatis.dynamic.sql.SqlBuilder.isLikeCaseInsensitive;
 import static org.mybatis.dynamic.sql.SqlBuilder.isNotEqualTo;
 import static org.mybatis.dynamic.sql.SqlBuilder.isNotIn;
 import static org.mybatis.dynamic.sql.SqlBuilder.max;
+import static org.mybatis.dynamic.sql.SqlBuilder.min;
 import static org.mybatis.dynamic.sql.SqlBuilder.or;
 import static org.mybatis.dynamic.sql.SqlBuilder.select;
 import static org.mybatis.dynamic.sql.SqlBuilder.selectDistinct;
@@ -63,6 +64,8 @@ import org.apache.kylin.common.logging.LogOutputStream;
 import org.apache.kylin.common.persistence.metadata.JdbcDataSource;
 import org.apache.kylin.common.persistence.metadata.jdbc.JdbcUtil;
 import org.apache.kylin.common.util.Pair;
+import org.apache.kylin.guava30.shaded.common.annotations.VisibleForTesting;
+import org.apache.kylin.guava30.shaded.common.collect.Lists;
 import org.apache.kylin.metadata.query.util.QueryHisStoreUtil;
 import org.mybatis.dynamic.sql.BasicColumn;
 import org.mybatis.dynamic.sql.SqlBuilder;
@@ -71,14 +74,11 @@ import 
org.mybatis.dynamic.sql.insert.render.InsertStatementProvider;
 import org.mybatis.dynamic.sql.render.RenderingStrategies;
 import org.mybatis.dynamic.sql.select.QueryExpressionDSL;
 import org.mybatis.dynamic.sql.select.SelectModel;
-import org.mybatis.dynamic.sql.select.join.EqualTo;
 import org.mybatis.dynamic.sql.select.aggregate.Count;
+import org.mybatis.dynamic.sql.select.join.EqualTo;
 import org.mybatis.dynamic.sql.select.render.SelectStatementProvider;
 import org.mybatis.dynamic.sql.update.render.UpdateStatementProvider;
 
-import org.apache.kylin.guava30.shaded.common.annotations.VisibleForTesting;
-import org.apache.kylin.guava30.shaded.common.collect.Lists;
-
 import lombok.Getter;
 import lombok.extern.slf4j.Slf4j;
 
@@ -395,6 +395,17 @@ public class JdbcQueryHistoryStore {
         }
     }
 
+    protected Long queryQueryHistoryMinQueryTime() {
+        try (SqlSession session = sqlSessionFactory.openSession()) {
+            QueryHistoryMapper mapper = 
session.getMapper(QueryHistoryMapper.class);
+            SelectStatementProvider statementProvider = 
select(queryHistoryTable.queryTime) //
+                    .from(queryHistoryTable) //
+                    .where(queryHistoryTable.id, 
isEqualTo(select(min(queryHistoryTable.id)).from(queryHistoryTable)))
+                    .build().render(RenderingStrategies.MYBATIS3);
+            return mapper.selectAsLong(statementProvider);
+        }
+    }
+
     public QueryStatistics queryRecentQueryCount(long startTime, long endTime, 
String project) {
         try (SqlSession session = sqlSessionFactory.openSession()) {
             QueryStatisticsMapper mapper = 
session.getMapper(QueryStatisticsMapper.class);
diff --git a/src/core-metadata/src/main/java/org/apache/kylin/metadata/query/QueryHistoryDAO.java b/src/core-metadata/src/main/java/org/apache/kylin/metadata/query/QueryHistoryDAO.java
index 2921098c70..6754cc5ae2 100644
--- a/src/core-metadata/src/main/java/org/apache/kylin/metadata/query/QueryHistoryDAO.java
+++ b/src/core-metadata/src/main/java/org/apache/kylin/metadata/query/QueryHistoryDAO.java
@@ -61,6 +61,8 @@ public interface QueryHistoryDAO {
 
     QueryHistory getByQueryId(String queryId);
 
+    Long getQueryHistoryMinQueryTime();
+
     List<QueryHistory> getQueryHistoriesSubmitters(QueryHistoryRequest 
request, int size);
 
     List<QueryStatistics> getQueryHistoriesModelIds(QueryHistoryRequest 
request);
diff --git a/src/core-metadata/src/main/java/org/apache/kylin/metadata/query/RDBMSQueryHistoryDAO.java b/src/core-metadata/src/main/java/org/apache/kylin/metadata/query/RDBMSQueryHistoryDAO.java
index a6ed94675f..f70ff1ab7c 100644
--- a/src/core-metadata/src/main/java/org/apache/kylin/metadata/query/RDBMSQueryHistoryDAO.java
+++ b/src/core-metadata/src/main/java/org/apache/kylin/metadata/query/RDBMSQueryHistoryDAO.java
@@ -109,6 +109,11 @@ public class RDBMSQueryHistoryDAO implements 
QueryHistoryDAO {
         return jdbcQueryHisStore.queryByQueryId(queryId);
     }
 
+    @Override
+    public Long getQueryHistoryMinQueryTime() {
+        return jdbcQueryHisStore.queryQueryHistoryMinQueryTime();
+    }
+
     public void deleteQueryHistoriesIfMaxSizeReached() throws 
InterruptedException {
         long maxSize = 
KylinConfig.getInstanceFromEnv().getQueryHistoryMaxSize();
         long totalCount = jdbcQueryHisStore.getCountOnQueryHistory();
diff --git a/src/core-metadata/src/main/java/org/apache/kylin/metadata/query/util/QueryHisStoreUtil.java b/src/core-metadata/src/main/java/org/apache/kylin/metadata/query/util/QueryHisStoreUtil.java
index d8b2c05613..82c631c6ca 100644
--- a/src/core-metadata/src/main/java/org/apache/kylin/metadata/query/util/QueryHisStoreUtil.java
+++ b/src/core-metadata/src/main/java/org/apache/kylin/metadata/query/util/QueryHisStoreUtil.java
@@ -201,6 +201,11 @@ public class QueryHisStoreUtil {
         }
     }
 
+    @SneakyThrows
+    public static Long getQueryHistoryMinQueryTime() {
+        return getQueryHistoryDao().getQueryHistoryMinQueryTime();
+    }
+
     public static void cleanQueryHistory(String projectName, long 
historyCount) {
         long projectMaxSize = 
KylinConfig.getInstanceFromEnv().getQueryHistoryProjectMaxSize();
         if (historyCount <= projectMaxSize) {
diff --git a/src/core-metadata/src/test/java/org/apache/kylin/metadata/query/RDBMSQueryHistoryDaoTest.java b/src/core-metadata/src/test/java/org/apache/kylin/metadata/query/RDBMSQueryHistoryDaoTest.java
index 2842fd111c..8e72346048 100644
--- a/src/core-metadata/src/test/java/org/apache/kylin/metadata/query/RDBMSQueryHistoryDaoTest.java
+++ b/src/core-metadata/src/test/java/org/apache/kylin/metadata/query/RDBMSQueryHistoryDaoTest.java
@@ -27,6 +27,7 @@ import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.kylin.common.util.NLocalFileMetadataTestCase;
 import org.apache.kylin.common.util.Pair;
 import org.apache.kylin.common.util.TimeUtil;
+import org.apache.kylin.guava30.shaded.common.collect.Lists;
 import org.apache.kylin.junit.TimeZoneTestRunner;
 import org.apache.kylin.metadata.query.util.QueryHisStoreUtil;
 import org.junit.After;
@@ -35,8 +36,6 @@ import org.junit.Before;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 
-import org.apache.kylin.guava30.shaded.common.collect.Lists;
-
 @RunWith(TimeZoneTestRunner.class)
 public class RDBMSQueryHistoryDaoTest extends NLocalFileMetadataTestCase {
 
@@ -575,6 +574,17 @@ public class RDBMSQueryHistoryDaoTest extends 
NLocalFileMetadataTestCase {
         Assert.assertEquals(queryMetrics.id, queryHistory.getId());
     }
 
+    @Test
+    public void testGetQueryHistoryMinQueryTime() {
+        QueryMetrics queryMetrics1 = createQueryMetrics(1580311512000L, 1L, 
true, PROJECT, true);
+        QueryMetrics queryMetrics2 = createQueryMetrics(1580397912000L, 2L, 
false, PROJECT, true);
+        queryHistoryDAO.insert(queryMetrics1);
+        queryHistoryDAO.insert(queryMetrics2);
+
+        Long queryHistoryMinQueryTime = 
QueryHisStoreUtil.getQueryHistoryMinQueryTime();
+        Assert.assertEquals(1580311512000L, 
queryHistoryMinQueryTime.longValue());
+    }
+
     @Test
     public void testGetRetainTime() throws Exception {
         long retainTime = RDBMSQueryHistoryDAO.getRetainTime();
@@ -883,8 +893,7 @@ public class RDBMSQueryHistoryDaoTest extends 
NLocalFileMetadataTestCase {
             realizationMetrics.setProjectName(project);
             
realizationMetrics.setModelId("82fa7671-a935-45f5-8779-85703601f49a.json");
 
-            realizationMetrics.setSnapshots(
-                    Lists.newArrayList("DEFAULT.TEST_KYLIN_ACCOUNT", 
"DEFAULT.TEST_COUNTRY"));
+            
realizationMetrics.setSnapshots(Lists.newArrayList("DEFAULT.TEST_KYLIN_ACCOUNT",
 "DEFAULT.TEST_COUNTRY"));
 
             List<QueryMetrics.RealizationMetrics> realizationMetricsList = 
Lists.newArrayList();
             realizationMetricsList.add(realizationMetrics);
diff --git a/src/data-loading-booter/src/main/resources/kylinSecurity.xml b/src/data-loading-booter/src/main/resources/kylinSecurity.xml
index 153eb4d978..047be47d8f 100644
--- a/src/data-loading-booter/src/main/resources/kylinSecurity.xml
+++ b/src/data-loading-booter/src/main/resources/kylinSecurity.xml
@@ -269,6 +269,7 @@
             <scr:intercept-url pattern="/api/system/diag/progress" 
access="permitAll"/>
             <scr:intercept-url pattern="/api/system/roll_event_log" 
access="permitAll"/>
             <scr:intercept-url pattern="/api/system/broadcast_metadata_backup" 
access="permitAll"/>
+            <scr:intercept-url pattern="/api/system/clean_sparder_event_log" 
access="permitAll"/>
             <scr:intercept-url pattern="/api/system/metadata_backup_tmp_file" 
access="permitAll"/>
             <scr:intercept-url pattern="/api/user/authentication*/**" 
access="permitAll"/>
             <scr:intercept-url 
pattern="/api/query/history_queries/table_names" access="permitAll"/>
diff --git a/src/job-service/src/main/java/org/apache/kylin/rest/service/ScheduleService.java b/src/job-service/src/main/java/org/apache/kylin/rest/service/ScheduleService.java
index af489e9933..71d7da9e6c 100644
--- a/src/job-service/src/main/java/org/apache/kylin/rest/service/ScheduleService.java
+++ b/src/job-service/src/main/java/org/apache/kylin/rest/service/ScheduleService.java
@@ -47,6 +47,7 @@ import org.apache.kylin.common.metrics.MetricsGroup;
 import org.apache.kylin.common.metrics.MetricsName;
 import org.apache.kylin.common.response.RestResponse;
 import org.apache.kylin.common.util.AddressUtil;
+import org.apache.kylin.common.util.ClusterConstant;
 import org.apache.kylin.common.util.JsonUtil;
 import org.apache.kylin.common.util.NamedThreadFactory;
 import org.apache.kylin.common.util.Pair;
@@ -59,11 +60,14 @@ import 
org.apache.kylin.metadata.resourcegroup.KylinInstance;
 import org.apache.kylin.metadata.resourcegroup.RequestTypeEnum;
 import org.apache.kylin.metadata.resourcegroup.ResourceGroupManager;
 import org.apache.kylin.metadata.resourcegroup.ResourceGroupMappingInfo;
+import org.apache.kylin.rest.cluster.ClusterManager;
+import org.apache.kylin.rest.response.ServerInfoResponse;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.beans.factory.annotation.Qualifier;
 import org.springframework.http.HttpEntity;
 import org.springframework.http.HttpHeaders;
 import org.springframework.http.HttpMethod;
+import org.springframework.http.ResponseEntity;
 import org.springframework.scheduling.annotation.Scheduled;
 import org.springframework.stereotype.Service;
 import org.springframework.web.client.RestTemplate;
@@ -80,6 +84,8 @@ public class ScheduleService {
 
     private static final String GLOBAL = "global";
 
+    private static final String CLEAN_SPARDER_EVENT_LOG = "http://%s/kylin/api/system/clean_sparder_event_log";
+
     @Autowired
     @Qualifier("normalRestTemplate")
     RestTemplate restTemplate;
@@ -96,6 +102,9 @@ public class ScheduleService {
     @Autowired(required = false)
     ProjectSmartSupporter projectSmartSupporter;
 
+    @Autowired
+    private ClusterManager clusterManager;
+
     private final ExecutorService executors = Executors
             .newSingleThreadExecutor(new 
NamedThreadFactory("RoutineTaskScheduler"));
     private final ExecutorService asyncExecutors = new ThreadPoolExecutor(20, 
20, 30, TimeUnit.MINUTES,
@@ -122,6 +131,7 @@ public class ScheduleService {
             try (SetThreadName ignored = new 
SetThreadName("RoutineOpsWorker")) {
                 if (epochManager.checkEpochOwner(EpochManager.GLOBAL)) {
                     AtomicReference<Pair<String, String>> backupFolder = new 
AtomicReference<>(null);
+                    broadcastCleanSparderEventLogToQueryNodes();
                     executeTask(() -> 
backupFolder.set(backupService.backupAll()), "MetadataBackup", startTime);
                     executeMetadataBackupInTenantMode(kylinConfig, startTime, 
backupFolder);
                     executeTask(() -> 
RoutineToolHelper.cleanQueryHistoriesAsync(getRemainingTime(startTime),
@@ -134,6 +144,7 @@ public class ScheduleService {
                 executeTask(() -> 
projectService.garbageCleanup(getRemainingTime(startTime)), 
"ProjectGarbageCleanup",
                         startTime);
                 executeTask(RoutineToolHelper::cleanStorageForRoutine, 
"HdfsCleanup", startTime);
+                executeTask(() -> RoutineToolHelper.cleanEventLog(true, false, 
false), "EventLogCleanup", startTime);
                 log.info("Finish to work, cost {}ms", 
System.currentTimeMillis() - startTime);
             }
         } catch (InterruptedException e) {
@@ -223,6 +234,43 @@ public class ScheduleService {
         }
     }
 
+    private void broadcastCleanSparderEventLogToQueryNodes() {
+        List<ServerInfoResponse> queryNodes = clusterManager.getQueryServers();
+
+        try {
+            for (ServerInfoResponse node : queryNodes) {
+                if (ClusterConstant.ALL.equals(node.getMode())) {
+                    continue;
+                }
+
+                val url = String.format(Locale.ROOT, CLEAN_SPARDER_EVENT_LOG, 
node.getHost());
+                log.info("Start broadcasting to clean the sparder event log of 
{}", url);
+
+                val httpHeaders = new HttpHeaders();
+                httpHeaders.add(HttpHeaders.CONTENT_TYPE, 
HTTP_VND_APACHE_KYLIN_V4_PUBLIC_JSON);
+                val response = restTemplate.exchange(url, HttpMethod.DELETE, 
new HttpEntity<>(httpHeaders),
+                        String.class);
+                receive(response, "noticeToQueryNode");
+            }
+        } catch (Exception e) {
+            log.error("Broadcast cleaning sparder event log failed!", e);
+        }
+    }
+
+    private void receive(ResponseEntity<String> response, String msg) throws 
IOException {
+        val responseStatus = response.getStatusCodeValue();
+        if (responseStatus != HttpStatus.SC_OK) {
+            log.error("{} failed, HttpStatus is {}", msg, responseStatus);
+        }
+
+        val responseBody = Optional.ofNullable(response.getBody()).orElse("");
+        val responseJson = JsonUtil.readValue(responseBody, new 
TypeReference<RestResponse<Boolean>>() {
+        });
+        if (!StringUtils.equals(responseJson.getCode(), 
KylinException.CODE_SUCCESS)) {
+            log.error("{} failed, response code is {}", msg, 
responseJson.getCode());
+        }
+    }
+
     public void broadcastToTenantNode(String resourceGroupId, String 
backupDir, String tmpFilePath, long tmpFileLength,
             String host) {
         try {
@@ -236,17 +284,7 @@ public class ScheduleService {
             httpHeaders.add(HttpHeaders.CONTENT_TYPE, 
HTTP_VND_APACHE_KYLIN_V4_PUBLIC_JSON);
             val exchange = restTemplate.exchange(url, HttpMethod.POST,
                     new HttpEntity<>(JsonUtil.writeValueAsBytes(req), 
httpHeaders), String.class);
-            val responseStatus = exchange.getStatusCodeValue();
-            if (responseStatus != HttpStatus.SC_OK) {
-                log.error("noticeToTenantNode failed, HttpStatus is {}", 
responseStatus);
-                return;
-            }
-            val responseBody = 
Optional.ofNullable(exchange.getBody()).orElse("");
-            val response = JsonUtil.readValue(responseBody, new 
TypeReference<RestResponse<Boolean>>() {
-            });
-            if (!StringUtils.equals(response.getCode(), 
KylinException.CODE_SUCCESS)) {
-                log.error("noticeToTenantNode failed, response code is {}", 
response.getCode());
-            }
+            receive(exchange, "noticeToTenantNode");
         } catch (IOException e) {
             log.error(e.getMessage(), e);
         }
diff --git a/src/job-service/src/test/java/org/apache/kylin/rest/service/ScheduleServiceTest.java b/src/job-service/src/test/java/org/apache/kylin/rest/service/ScheduleServiceTest.java
index 1eb020345f..b217b57cae 100644
--- a/src/job-service/src/test/java/org/apache/kylin/rest/service/ScheduleServiceTest.java
+++ b/src/job-service/src/test/java/org/apache/kylin/rest/service/ScheduleServiceTest.java
@@ -27,23 +27,36 @@ import static org.mockito.Mockito.doThrow;
 import java.io.IOException;
 import java.util.concurrent.TimeoutException;
 
+import org.apache.kylin.common.response.RestResponse;
+import org.apache.kylin.common.util.JsonUtil;
 import org.apache.kylin.common.util.NLocalFileMetadataTestCase;
 import org.apache.kylin.common.util.Pair;
 import org.apache.kylin.junit.annotation.OverwriteProp;
+import org.apache.kylin.rest.cluster.MockClusterManager;
 import org.apache.kylin.rest.constant.Constant;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.ExpectedException;
+import org.mockito.ArgumentMatchers;
 import org.mockito.Mock;
 import org.mockito.Mockito;
+import org.springframework.http.HttpEntity;
+import org.springframework.http.HttpMethod;
+import org.springframework.http.HttpStatus;
+import org.springframework.http.ResponseEntity;
 import org.springframework.security.authentication.TestingAuthenticationToken;
 import org.springframework.security.core.context.SecurityContextHolder;
 import org.springframework.test.util.ReflectionTestUtils;
+import org.springframework.web.client.RestTemplate;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
 
 import org.apache.kylin.metadata.epoch.EpochManager;
 import lombok.SneakyThrows;
+import lombok.val;
+import lombok.var;
 import lombok.extern.slf4j.Slf4j;
 
 @Slf4j
@@ -58,17 +71,27 @@ public class ScheduleServiceTest extends NLocalFileMetadataTestCase {
     @Mock
     private ScheduleService scheduleService = Mockito.spy(ScheduleService.class);
 
+    @Mock
+    private RestTemplate restTemplate = Mockito.mock(RestTemplate.class);
+
     @Rule
     public ExpectedException thrown = ExpectedException.none();
 
     @Before
-    public void setup() {
+    public void setup() throws JsonProcessingException {
         overwriteSystemProp("HADOOP_USER_NAME", "root");
         createTestMetadata();
         SecurityContextHolder.getContext()
                 .setAuthentication(new TestingAuthenticationToken("ADMIN", "ADMIN", Constant.ROLE_ADMIN));
         ReflectionTestUtils.setField(scheduleService, "projectService", projectService);
         ReflectionTestUtils.setField(scheduleService, "backupService", backupService);
+        ReflectionTestUtils.setField(scheduleService, "clusterManager", new MockClusterManager());
+        ReflectionTestUtils.setField(scheduleService, "restTemplate", restTemplate);
+
+        val restResult = JsonUtil.writeValueAsString(RestResponse.ok());
+        var resp = new ResponseEntity<>(restResult, HttpStatus.OK);
+        Mockito.doReturn(resp).when(restTemplate).exchange(anyString(), ArgumentMatchers.any(HttpMethod.class),
+                ArgumentMatchers.any(HttpEntity.class), ArgumentMatchers.<Class<String>> any());
     }
 
     @After
diff --git a/src/query-booter/src/main/resources/kylinSecurity.xml b/src/query-booter/src/main/resources/kylinSecurity.xml
index 37dcfc2214..bf3f7aae8e 100644
--- a/src/query-booter/src/main/resources/kylinSecurity.xml
+++ b/src/query-booter/src/main/resources/kylinSecurity.xml
@@ -269,6 +269,7 @@
             <scr:intercept-url pattern="/api/system/diag/progress" access="permitAll"/>
             <scr:intercept-url pattern="/api/system/roll_event_log" access="permitAll"/>
             <scr:intercept-url pattern="/api/system/broadcast_metadata_backup" access="permitAll"/>
+            <scr:intercept-url pattern="/api/system/clean_sparder_event_log" access="permitAll"/>
             <scr:intercept-url pattern="/api/system/metadata_backup_tmp_file" access="permitAll"/>
             <scr:intercept-url pattern="/api/user/authentication*/**" access="permitAll"/>
             <scr:intercept-url pattern="/api/query/history_queries/table_names" access="permitAll"/>
diff --git a/src/server/src/main/java/org/apache/kylin/rest/QueryNodeFilter.java b/src/server/src/main/java/org/apache/kylin/rest/QueryNodeFilter.java
index 68f0b5c9ae..3e41fa541d 100644
--- a/src/server/src/main/java/org/apache/kylin/rest/QueryNodeFilter.java
+++ b/src/server/src/main/java/org/apache/kylin/rest/QueryNodeFilter.java
@@ -144,6 +144,8 @@ public class QueryNodeFilter extends BaseFilter {
         notRoutePostApiSet.add("/kylin/api/metastore/cleanup_storage/tenant_node");
         notRoutePostApiSet.add("/kylin/api/metastore/cleanup_storage");
 
+        notRouteDeleteApiSet.add("/kylin/api/system/clean_sparder_event_log");
+
         notRouteDeleteApiSet.add("/kylin/api/async_query/tenant_node");
 
         routeMultiTenantModeFilterApiSet.add("/kylin/api/jobs/{jobId}/resume");
diff --git 
a/src/tool/src/main/java/org/apache/kylin/helper/MetadataToolHelper.java 
b/src/tool/src/main/java/org/apache/kylin/helper/MetadataToolHelper.java
index ad72f1e6b9..f000e766a6 100644
--- a/src/tool/src/main/java/org/apache/kylin/helper/MetadataToolHelper.java
+++ b/src/tool/src/main/java/org/apache/kylin/helper/MetadataToolHelper.java
@@ -62,6 +62,7 @@ import org.apache.kylin.guava30.shaded.common.collect.Sets;
 import org.apache.kylin.guava30.shaded.common.io.ByteSource;
 import org.apache.kylin.metadata.project.ProjectInstance;
 import org.apache.kylin.tool.HDFSMetadataTool;
+import org.apache.kylin.tool.constant.StringConstant;
 import org.apache.kylin.tool.garbage.StorageCleaner;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -74,7 +75,7 @@ import lombok.var;
 */
 public class MetadataToolHelper {
 
-    public static final DateTimeFormatter DATE_TIME_FORMATTER = HelperConstants.DATE_TIME_FORMATTER;
+    public static final DateTimeFormatter DATE_TIME_FORMATTER = StringConstant.DATE_TIME_FORMATTER;
     private static final String GLOBAL = "global";
     private static final String HDFS_METADATA_URL_FORMATTER = "kylin_metadata@hdfs,path=%s";
 
@@ -380,9 +381,9 @@ public class MetadataToolHelper {
             System.out.println("cleanup HDFS finished");
         } catch (Exception e) {
             logger.error("cleanup HDFS failed", e);
-            System.out.println(StorageCleaner.ANSI_RED
+            System.out.println(StringConstant.ANSI_RED
                     + "cleanup HDFS failed. Detailed Message is at ${KYLIN_HOME}/logs/shell.stderr"
-                    + StorageCleaner.ANSI_RESET);
+                    + StringConstant.ANSI_RESET);
         }
     }
 
diff --git 
a/src/tool/src/main/java/org/apache/kylin/helper/RoutineToolHelper.java 
b/src/tool/src/main/java/org/apache/kylin/helper/RoutineToolHelper.java
index 9bdbd5a3ff..7492e22e29 100644
--- a/src/tool/src/main/java/org/apache/kylin/helper/RoutineToolHelper.java
+++ b/src/tool/src/main/java/org/apache/kylin/helper/RoutineToolHelper.java
@@ -36,9 +36,10 @@ import 
org.apache.kylin.metadata.query.util.QueryHisStoreUtil;
 import org.apache.kylin.metadata.recommendation.candidate.JdbcRawRecStore;
 import org.apache.kylin.metadata.streaming.util.StreamingJobRecordStoreUtil;
 import org.apache.kylin.metadata.streaming.util.StreamingJobStatsStoreUtil;
+import org.apache.kylin.tool.constant.StringConstant;
 import org.apache.kylin.tool.garbage.AbstractComparableCleanTask;
 import org.apache.kylin.tool.garbage.CleanTaskExecutorService;
-import org.apache.kylin.tool.garbage.GarbageCleaner;
+import org.apache.kylin.tool.garbage.MetadataCleaner;
 import org.apache.kylin.tool.garbage.PriorityExecutor;
 import org.apache.kylin.tool.garbage.SourceUsageCleaner;
 import org.apache.kylin.tool.garbage.StorageCleaner;
@@ -80,6 +81,35 @@ public class RoutineToolHelper {
                 TimeUnit.MILLISECONDS);
     }
 
+    public static CompletableFuture<Void> cleanEventLog(boolean cleanUp, boolean cleanCurrentSparder,
+            boolean cleanAll) {
+        tryInitCleanTaskExecutorService();
+        return CleanTaskExecutorService.getInstance().submit(new AbstractComparableCleanTask() {
+            @Override
+            public String getName() {
+                return "cleanEventLog";
+            }
+
+            @Override
+            protected void doRun() {
+                StorageCleaner.EventLogCleaner eventLogCleaner = new StorageCleaner.EventLogCleaner(cleanUp);
+                if (cleanAll) {
+                    eventLogCleaner.cleanAllEventLog();
+                } else if (cleanCurrentSparder) {
+                    eventLogCleaner.cleanCurrentSparderEventLog();
+                } else {
+                    // clean current sparder event log and spark event log
+                    eventLogCleaner.execute();
+                }
+            }
+
+            @Override
+            public StorageCleaner.CleanerTag getCleanerTag() {
+                return StorageCleaner.CleanerTag.ROUTINE;
+            }
+        }, KylinConfig.getInstanceFromEnv().getStorageCleanTaskTimeout(), TimeUnit.MILLISECONDS);
+    }
+
     public static void cleanStorageForRoutine() {
         tryInitCleanTaskExecutorService();
         CleanTaskExecutorService.getInstance().cleanStorageForRoutine(true, 
Collections.emptyList(), 0, 0);
@@ -117,7 +147,7 @@ public class RoutineToolHelper {
         log.info("Start to clean up global meta");
         try {
             EnhancedUnitOfWork.doInTransactionWithCheckAndRetry(() -> {
-                new SourceUsageCleaner().cleanup();
+                new SourceUsageCleaner().execute();
                 return null;
             }, UnitOfWork.GLOBAL_UNIT);
         } catch (Exception e) {
@@ -134,7 +164,7 @@ public class RoutineToolHelper {
                     
.stream(KylinConfig.getInstanceFromEnv().getProjectsAggressiveOptimizationIndex())
                     .map(StringUtils::lowerCase).collect(Collectors.toList())
                     .contains(StringUtils.toRootLowerCase(projectName));
-            GarbageCleaner.cleanMetadata(projectName, needAggressiveOpt);
+            MetadataCleaner.clean(projectName, needAggressiveOpt);
         } catch (Exception e) {
             log.error("Project[{}] cleanup Metadata failed", projectName, e);
         }
@@ -153,11 +183,10 @@ public class RoutineToolHelper {
             System.out.println("Metadata cleanup finished");
         } catch (Exception e) {
             log.error("Metadata cleanup failed", e);
-            System.out.println(StorageCleaner.ANSI_RED
+            System.out.println(StringConstant.ANSI_RED
                     + "Metadata cleanup failed. Detailed Message is at ${KYLIN_HOME}/logs/shell.stderr"
-                    + StorageCleaner.ANSI_RESET);
+                    + StringConstant.ANSI_RESET);
         }
 
     }
-
 }
diff --git a/src/tool/src/main/java/org/apache/kylin/tool/AuditLogTool.java 
b/src/tool/src/main/java/org/apache/kylin/tool/AuditLogTool.java
index b25e8ae603..ba13cd5b82 100644
--- a/src/tool/src/main/java/org/apache/kylin/tool/AuditLogTool.java
+++ b/src/tool/src/main/java/org/apache/kylin/tool/AuditLogTool.java
@@ -23,8 +23,6 @@ import static 
org.apache.kylin.common.exception.code.ErrorCodeTool.PARAMETER_NOT
 import static 
org.apache.kylin.common.exception.code.ErrorCodeTool.PARAMETER_TIMESTAMP_NOT_SPECIFY;
 import static 
org.apache.kylin.common.exception.code.ErrorCodeTool.PATH_NOT_EXISTS;
 import static 
org.apache.kylin.common.persistence.metadata.jdbc.JdbcUtil.datasourceParameters;
-import static org.apache.kylin.tool.garbage.StorageCleaner.ANSI_RED;
-import static org.apache.kylin.tool.garbage.StorageCleaner.ANSI_RESET;
 
 import java.io.BufferedReader;
 import java.io.BufferedWriter;
@@ -47,24 +45,24 @@ import org.apache.commons.collections.CollectionUtils;
 import org.apache.commons.dbcp2.BasicDataSourceFactory;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.constant.Constant;
 import org.apache.kylin.common.exception.KylinException;
+import org.apache.kylin.common.persistence.AuditLog;
+import org.apache.kylin.common.persistence.metadata.JdbcAuditLogStore;
 import org.apache.kylin.common.util.ExecutableApplication;
 import org.apache.kylin.common.util.JsonUtil;
+import org.apache.kylin.common.util.OptionBuilder;
 import org.apache.kylin.common.util.OptionsHelper;
+import org.apache.kylin.common.util.Unsafe;
+import org.apache.kylin.guava30.shaded.common.collect.Lists;
 import org.apache.kylin.job.execution.AbstractExecutable;
 import org.apache.kylin.job.execution.NExecutableManager;
-import org.apache.kylin.common.constant.Constant;
-import org.apache.kylin.common.persistence.AuditLog;
-import org.apache.kylin.common.persistence.metadata.JdbcAuditLogStore;
-import org.apache.kylin.common.util.OptionBuilder;
-import org.apache.kylin.common.util.Unsafe;
+import org.apache.kylin.tool.constant.StringConstant;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.jdbc.core.JdbcTemplate;
 import org.springframework.jdbc.datasource.DataSourceTransactionManager;
 
-import org.apache.kylin.guava30.shaded.common.collect.Lists;
-
 import lombok.val;
 
 public class AuditLogTool extends ExecutableApplication {
@@ -116,7 +114,7 @@ public class AuditLogTool extends ExecutableApplication {
             val tool = new AuditLogTool();
             tool.execute(args);
         } catch (Exception e) {
-            System.out.println(ANSI_RED + "Audit log task failed." + ANSI_RESET);
+            System.out.println(StringConstant.ANSI_RED + "Audit log task failed." + StringConstant.ANSI_RESET);
             logger.error("fail execute audit log tool: ", e);
             Unsafe.systemExit(1);
         }
diff --git a/src/tool/src/main/java/org/apache/kylin/tool/MaintainModeTool.java 
b/src/tool/src/main/java/org/apache/kylin/tool/MaintainModeTool.java
index d6fc66c92f..29ac7bfe78 100644
--- a/src/tool/src/main/java/org/apache/kylin/tool/MaintainModeTool.java
+++ b/src/tool/src/main/java/org/apache/kylin/tool/MaintainModeTool.java
@@ -26,21 +26,20 @@ import org.apache.commons.cli.Option;
 import org.apache.commons.cli.Options;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.persistence.ResourceStore;
-import org.apache.kylin.common.util.ExecutableApplication;
-import org.apache.kylin.common.util.OptionsHelper;
-import org.apache.kylin.metadata.project.ProjectInstance;
 import org.apache.kylin.common.persistence.transaction.UnitOfWork;
 import org.apache.kylin.common.util.AddressUtil;
+import org.apache.kylin.common.util.ExecutableApplication;
+import org.apache.kylin.common.util.OptionsHelper;
 import org.apache.kylin.common.util.Unsafe;
+import org.apache.kylin.guava30.shaded.common.collect.Lists;
 import org.apache.kylin.metadata.epoch.EpochManager;
 import org.apache.kylin.metadata.project.NProjectManager;
-import org.apache.kylin.tool.garbage.StorageCleaner;
+import org.apache.kylin.metadata.project.ProjectInstance;
+import org.apache.kylin.tool.constant.StringConstant;
 import org.apache.kylin.tool.util.ToolMainWrapper;
 import org.springframework.util.CollectionUtils;
 import org.springframework.util.StringUtils;
 
-import org.apache.kylin.guava30.shaded.common.collect.Lists;
-
 import lombok.Setter;
 import lombok.extern.slf4j.Slf4j;
 
@@ -67,8 +66,10 @@ public class MaintainModeTool extends ExecutableApplication {
     private EpochManager epochManager;
     private boolean hiddenOutput;
     private boolean forceToTurnOff;
+
     public MaintainModeTool() {
     }
+
     public MaintainModeTool(String reason) {
         this.reason = reason;
         this.hiddenOutput = true;
@@ -153,9 +154,9 @@ public class MaintainModeTool extends ExecutableApplication {
             enterMaintenanceModeWithRetry(config.getTurnMaintainModeRetryTimes(), reason, projects);
         } catch (Exception e) {
             log.error("Mark epoch failed", e);
-            System.out.println(StorageCleaner.ANSI_RED
+            System.out.println(StringConstant.ANSI_RED
                     + "Turn on maintain mode failed. Detailed Message is at ${KYLIN_HOME}/logs/shell.stderr"
-                    + StorageCleaner.ANSI_RESET);
+                    + StringConstant.ANSI_RESET);
             Unsafe.systemExit(1);
         } finally {
             Unsafe.clearProperty(LEADER_RACE_KEY);
@@ -219,9 +220,9 @@ public class MaintainModeTool extends ExecutableApplication {
             exitMaintenanceModeWithRetry(config.getTurnMaintainModeRetryTimes(), null, projects, forceToTurnOff);
         } catch (Exception e) {
             log.error("Release epoch failed, try to turn off maintain mode manually.", e);
-            System.out.println(StorageCleaner.ANSI_RED
+            System.out.println(StringConstant.ANSI_RED
                     + "Turn off maintain mode failed. Detailed Message is at ${KYLIN_HOME}/logs/shell.stderr"
-                    + StorageCleaner.ANSI_RESET);
+                    + StringConstant.ANSI_RESET);
             throw new IllegalStateException("Turn off maintain mode failed.");
         } finally {
             Unsafe.clearProperty(LEADER_RACE_KEY);
diff --git 
a/src/tool/src/main/java/org/apache/kylin/helper/HelperConstants.java 
b/src/tool/src/main/java/org/apache/kylin/tool/constant/StringConstant.java
similarity index 67%
rename from src/tool/src/main/java/org/apache/kylin/helper/HelperConstants.java
rename to 
src/tool/src/main/java/org/apache/kylin/tool/constant/StringConstant.java
index 9133d37013..0b5036ca3e 100644
--- a/src/tool/src/main/java/org/apache/kylin/helper/HelperConstants.java
+++ b/src/tool/src/main/java/org/apache/kylin/tool/constant/StringConstant.java
@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 
-package org.apache.kylin.helper;
+package org.apache.kylin.tool.constant;
 
 import java.time.format.DateTimeFormatter;
 import java.util.Locale;
@@ -24,10 +24,18 @@ import java.util.Locale;
 /*
  * this class is only for removing dependency of kylin-tool module, and should be refactor later
  */
-class HelperConstants {
+public class StringConstant {
 
-    private HelperConstants() {}
+    private StringConstant() {
+    }
 
-    static final DateTimeFormatter DATE_TIME_FORMATTER = DateTimeFormatter.ofPattern("yyyy-MM-dd-HH-mm-ss",
+    public static final String ANSI_RED = "\u001B[31m";
+    public static final String ANSI_GREEN = "\u001B[32m";
+    public static final String ANSI_YELLOW = "\u001B[33m";
+    public static final String ANSI_BLUE = "\u001B[34m";
+
+    public static final String ANSI_RESET = "\u001B[0m";
+
+    public static final DateTimeFormatter DATE_TIME_FORMATTER = DateTimeFormatter.ofPattern("yyyy-MM-dd-HH-mm-ss",
             Locale.getDefault(Locale.Category.FORMAT));
 }
diff --git 
a/src/tool/src/main/java/org/apache/kylin/tool/garbage/CleanTaskExecutorService.java
 
b/src/tool/src/main/java/org/apache/kylin/tool/garbage/CleanTaskExecutorService.java
index da12dd15bb..a75b904c91 100644
--- 
a/src/tool/src/main/java/org/apache/kylin/tool/garbage/CleanTaskExecutorService.java
+++ 
b/src/tool/src/main/java/org/apache/kylin/tool/garbage/CleanTaskExecutorService.java
@@ -37,10 +37,10 @@ import java.util.function.Supplier;
 import javax.annotation.concurrent.ThreadSafe;
 
 import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.common.exception.KylinRuntimeException;
 import org.apache.kylin.common.util.DaemonThreadFactory;
 import org.apache.kylin.common.util.RandomUtil;
 import org.apache.kylin.guava30.shaded.common.base.Preconditions;
+import org.apache.kylin.tool.constant.StringConstant;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -125,9 +125,7 @@ public class CleanTaskExecutorService implements Closeable {
             storageCleaner.withTag(tag);
             storageCleaner.withTraceId(traceId);
         } catch (Exception e) {
-            LOGGER.error(
-                    "Failed to create storage cleaner for projects: {}. TraceId: {}", projects, traceId,
-                    e);
+            LOGGER.error("Failed to create storage cleaner for projects: {}. TraceId: {}", projects, traceId, e);
         }
         return storageCleaner;
     }
@@ -141,9 +139,9 @@ public class CleanTaskExecutorService implements Closeable {
                 LOGGER.info("HDFS files cleaning task has successfully completed. TraceId: {}", cleaner.getTraceId());
                 return;
             }
-            LOGGER.error(StorageCleaner.ANSI_RED
+            LOGGER.error(StringConstant.ANSI_RED
                     + "cleanup HDFS failed. Detailed Message is at ${KYLIN_HOME}/logs/shell.stderr"
-                    + StorageCleaner.ANSI_RESET + ". TraceId: " + cleaner.getTraceId(), t);
+                    + StringConstant.ANSI_RESET + ". TraceId: " + cleaner.getTraceId(), t);
         });
         return f;
     }
@@ -170,8 +168,8 @@ public class CleanTaskExecutorService implements Closeable {
             protected void doRun() {
                 try {
                     cleaner.execute();
-                } catch (Exception e) {
-                    throw new KylinRuntimeException(e);
+                } catch (InterruptedException e) {
+                    Thread.currentThread().interrupt();
                 }
             }
         }, timeout, timeUnit);
diff --git 
a/src/tool/src/main/java/org/apache/kylin/tool/garbage/ExecutableCleaner.java 
b/src/tool/src/main/java/org/apache/kylin/tool/garbage/ExecutableCleaner.java
index 04313ee7a5..663fffc6c4 100644
--- 
a/src/tool/src/main/java/org/apache/kylin/tool/garbage/ExecutableCleaner.java
+++ 
b/src/tool/src/main/java/org/apache/kylin/tool/garbage/ExecutableCleaner.java
@@ -37,12 +37,12 @@ public class ExecutableCleaner extends MetadataCleaner {
     }
 
     @Override
-    public void beforeCleanup() {
+    public void beforeExecute() {
         // do nothing
     }
 
     @Override
-    public void cleanup() {
+    public void execute() {
 
         logger.info("Start to clean executable in project {}", project);
 
@@ -68,7 +68,7 @@ public class ExecutableCleaner extends MetadataCleaner {
     }
 
     @Override
-    public void afterCleanup() {
+    public void afterExecute() {
         // do nothing
     }
 
diff --git 
a/src/tool/src/main/java/org/apache/kylin/tool/garbage/GarbageCleaner.java 
b/src/tool/src/main/java/org/apache/kylin/tool/garbage/GarbageCleaner.java
index 527f25527a..a30661bc37 100644
--- a/src/tool/src/main/java/org/apache/kylin/tool/garbage/GarbageCleaner.java
+++ b/src/tool/src/main/java/org/apache/kylin/tool/garbage/GarbageCleaner.java
@@ -18,51 +18,6 @@
 
 package org.apache.kylin.tool.garbage;
 
-import java.util.Arrays;
-import java.util.List;
-
-import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.common.metrics.MetricsCategory;
-import org.apache.kylin.common.metrics.MetricsGroup;
-import org.apache.kylin.common.metrics.MetricsName;
-import org.apache.kylin.common.scheduler.EventBusFactory;
-import org.apache.kylin.common.scheduler.SourceUsageUpdateNotifier;
-import org.apache.kylin.metadata.project.EnhancedUnitOfWork;
-import org.apache.kylin.metadata.project.NProjectManager;
-
-import lombok.val;
-
-public class GarbageCleaner {
-
-    private GarbageCleaner() {
-    }
-
-    /**
-     * Clean up metadata
-     * @param project
-     */
-    public static void cleanMetadata(String project, boolean 
needAggressiveOpt) {
-        val projectInstance = 
NProjectManager.getInstance(KylinConfig.getInstanceFromEnv()).getProject(project);
-        if (projectInstance == null) {
-            return;
-        }
-
-        List<MetadataCleaner> cleaners = initCleaners(project, 
needAggressiveOpt);
-        cleaners.forEach(MetadataCleaner::prepare);
-        EnhancedUnitOfWork.doInTransactionWithCheckAndRetry(() -> {
-            cleaners.forEach(MetadataCleaner::beforeCleanup);
-            cleaners.forEach(MetadataCleaner::cleanup);
-            cleaners.forEach(MetadataCleaner::afterCleanup);
-            return 0;
-        }, project);
-
-        EventBusFactory.getInstance().postAsync(new 
SourceUsageUpdateNotifier());
-        MetricsGroup.hostTagCounterInc(MetricsName.METADATA_CLEAN, 
MetricsCategory.PROJECT, project);
-    }
-
-    private static List<MetadataCleaner> initCleaners(String project, boolean 
needAggressiveOpt) {
-        return Arrays.asList(new SnapshotCleaner(project), new 
IndexCleaner(project, needAggressiveOpt),
-                new ExecutableCleaner(project));
-    }
-
+public interface GarbageCleaner {
+    void execute() throws InterruptedException;
 }
diff --git 
a/src/tool/src/main/java/org/apache/kylin/tool/garbage/IndexCleaner.java 
b/src/tool/src/main/java/org/apache/kylin/tool/garbage/IndexCleaner.java
index 7036648383..a1f1d7a69d 100644
--- a/src/tool/src/main/java/org/apache/kylin/tool/garbage/IndexCleaner.java
+++ b/src/tool/src/main/java/org/apache/kylin/tool/garbage/IndexCleaner.java
@@ -103,7 +103,7 @@ public class IndexCleaner extends MetadataCleaner {
     }
 
     @Override
-    public void beforeCleanup() {
+    public void beforeExecute() {
         if (MapUtils.isEmpty(needOptAggressivelyModels)) {
             return;
         }
@@ -113,7 +113,7 @@ public class IndexCleaner extends MetadataCleaner {
     }
 
     @Override
-    public void cleanup() {
+    public void execute() {
         if (MapUtils.isNotEmpty(needOptAggressivelyModels)) {
             cleanUpIndexAggressively();
         }
@@ -135,7 +135,7 @@ public class IndexCleaner extends MetadataCleaner {
     }
 
     @Override
-    public void afterCleanup() {
+    public void afterExecute() {
         if (MapUtils.isEmpty(needOptAggressivelyModels)) {
             return;
         }
diff --git 
a/src/tool/src/main/java/org/apache/kylin/tool/garbage/MetadataCleaner.java 
b/src/tool/src/main/java/org/apache/kylin/tool/garbage/MetadataCleaner.java
index 2f96bfa281..4a7f30f7fb 100644
--- a/src/tool/src/main/java/org/apache/kylin/tool/garbage/MetadataCleaner.java
+++ b/src/tool/src/main/java/org/apache/kylin/tool/garbage/MetadataCleaner.java
@@ -18,7 +18,21 @@
 
 package org.apache.kylin.tool.garbage;
 
-public abstract class MetadataCleaner {
+import java.util.Arrays;
+import java.util.List;
+
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.metrics.MetricsCategory;
+import org.apache.kylin.common.metrics.MetricsGroup;
+import org.apache.kylin.common.metrics.MetricsName;
+import org.apache.kylin.common.scheduler.EventBusFactory;
+import org.apache.kylin.common.scheduler.SourceUsageUpdateNotifier;
+import org.apache.kylin.metadata.project.EnhancedUnitOfWork;
+import org.apache.kylin.metadata.project.NProjectManager;
+
+import lombok.val;
+
+public abstract class MetadataCleaner implements GarbageCleaner {
     protected final String project;
 
     protected MetadataCleaner(String project) {
@@ -26,15 +40,40 @@ public abstract class MetadataCleaner {
     }
 
     // do in transaction
-    public abstract void beforeCleanup();
+    public abstract void beforeExecute();
 
     // do in transaction
-    public abstract void cleanup();
+    @Override
+    public abstract void execute();
 
     // do in transaction
-    public abstract void afterCleanup();
+    public abstract void afterExecute();
 
     public void prepare() {
         // default do nothing
     }
+
+    public static void clean(String project, boolean needAggressiveOpt) {
+        val projectInstance = NProjectManager.getInstance(KylinConfig.getInstanceFromEnv()).getProject(project);
+        if (projectInstance == null) {
+            return;
+        }
+
+        List<MetadataCleaner> cleaners = initCleaners(project, needAggressiveOpt);
+        cleaners.forEach(MetadataCleaner::prepare);
+        EnhancedUnitOfWork.doInTransactionWithCheckAndRetry(() -> {
+            cleaners.forEach(MetadataCleaner::beforeExecute);
+            cleaners.forEach(MetadataCleaner::execute);
+            cleaners.forEach(MetadataCleaner::afterExecute);
+            return 0;
+        }, project);
+
+        EventBusFactory.getInstance().postAsync(new SourceUsageUpdateNotifier());
+        MetricsGroup.hostTagCounterInc(MetricsName.METADATA_CLEAN, MetricsCategory.PROJECT, project);
+    }
+
+    private static List<MetadataCleaner> initCleaners(String project, boolean needAggressiveOpt) {
+        return Arrays.asList(new SnapshotCleaner(project), new IndexCleaner(project, needAggressiveOpt),
+                new ExecutableCleaner(project));
+    }
 }
diff --git 
a/src/tool/src/main/java/org/apache/kylin/tool/garbage/SnapshotCleaner.java 
b/src/tool/src/main/java/org/apache/kylin/tool/garbage/SnapshotCleaner.java
index 3c1d897be8..09a63f334c 100644
--- a/src/tool/src/main/java/org/apache/kylin/tool/garbage/SnapshotCleaner.java
+++ b/src/tool/src/main/java/org/apache/kylin/tool/garbage/SnapshotCleaner.java
@@ -44,7 +44,7 @@ public class SnapshotCleaner extends MetadataCleaner {
     }
 
     @Override
-    public void beforeCleanup() {
+    public void beforeExecute() {
         // do nothing
     }
 
@@ -74,7 +74,7 @@ public class SnapshotCleaner extends MetadataCleaner {
     }
 
     @Override
-    public void cleanup() {
+    public void execute() {
         logger.info("Start to clean snapshot in project {}", project);
         // remove stale snapshot path from tables
         NTableMetadataManager tblMgr = 
NTableMetadataManager.getInstance(KylinConfig.getInstanceFromEnv(), project);
@@ -95,7 +95,7 @@ public class SnapshotCleaner extends MetadataCleaner {
     }
 
     @Override
-    public void afterCleanup() {
+    public void afterExecute() {
         // do nothing
     }
 }
diff --git 
a/src/tool/src/main/java/org/apache/kylin/tool/garbage/SourceUsageCleaner.java 
b/src/tool/src/main/java/org/apache/kylin/tool/garbage/SourceUsageCleaner.java
index c89e8a5ae4..782f68fa3e 100644
--- 
a/src/tool/src/main/java/org/apache/kylin/tool/garbage/SourceUsageCleaner.java
+++ 
b/src/tool/src/main/java/org/apache/kylin/tool/garbage/SourceUsageCleaner.java
@@ -26,12 +26,11 @@ import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.metadata.sourceusage.SourceUsageManager;
 import org.apache.kylin.metadata.sourceusage.SourceUsageRecord;
 
-public class SourceUsageCleaner {
-
-    public void cleanup() {
+public class SourceUsageCleaner implements GarbageCleaner {
 
+    @Override
+    public void execute() {
         KylinConfig config = KylinConfig.getInstanceFromEnv();
-
         long expirationTime = config.getSourceUsageSurvivalTimeThreshold();
 
         SourceUsageManager sourceUsageManager = 
SourceUsageManager.getInstance(config);
diff --git 
a/src/tool/src/main/java/org/apache/kylin/tool/garbage/StorageCleaner.java 
b/src/tool/src/main/java/org/apache/kylin/tool/garbage/StorageCleaner.java
index 30d052cf10..0e6ab4a14c 100644
--- a/src/tool/src/main/java/org/apache/kylin/tool/garbage/StorageCleaner.java
+++ b/src/tool/src/main/java/org/apache/kylin/tool/garbage/StorageCleaner.java
@@ -40,6 +40,7 @@ import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
 import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -57,6 +58,12 @@ import org.apache.kylin.common.util.HadoopUtil;
 import org.apache.kylin.common.util.JsonUtil;
 import org.apache.kylin.common.util.Pair;
 import org.apache.kylin.common.util.ShellException;
+import org.apache.kylin.guava30.shaded.common.collect.Lists;
+import org.apache.kylin.guava30.shaded.common.collect.Maps;
+import org.apache.kylin.guava30.shaded.common.collect.Sets;
+import org.apache.kylin.guava30.shaded.common.io.ByteSource;
+import org.apache.kylin.guava30.shaded.common.util.concurrent.RateLimiter;
+import org.apache.kylin.job.execution.AbstractExecutable;
 import org.apache.kylin.job.execution.ExecutableState;
 import org.apache.kylin.job.execution.NExecutableManager;
 import org.apache.kylin.metadata.cube.model.LayoutPartition;
@@ -65,18 +72,16 @@ import org.apache.kylin.metadata.cube.model.NDataSegDetails;
 import org.apache.kylin.metadata.cube.model.NDataSegment;
 import org.apache.kylin.metadata.cube.model.NDataflow;
 import org.apache.kylin.metadata.cube.model.NDataflowManager;
+import org.apache.kylin.metadata.epoch.EpochManager;
 import org.apache.kylin.metadata.model.NTableMetadataManager;
 import org.apache.kylin.metadata.project.EnhancedUnitOfWork;
 import org.apache.kylin.metadata.project.NProjectManager;
 import org.apache.kylin.metadata.project.ProjectInstance;
+import org.apache.kylin.metadata.query.util.QueryHisStoreUtil;
+import org.apache.kylin.query.util.ExtractFactory;
+import org.apache.kylin.tool.constant.StringConstant;
 import org.apache.kylin.tool.util.ProjectTemporaryTableCleanerHelper;
 
-import org.apache.kylin.guava30.shaded.common.collect.Lists;
-import org.apache.kylin.guava30.shaded.common.collect.Maps;
-import org.apache.kylin.guava30.shaded.common.collect.Sets;
-
-import org.apache.kylin.guava30.shaded.common.io.ByteSource;
-import org.apache.kylin.guava30.shaded.common.util.concurrent.RateLimiter;
 import lombok.AllArgsConstructor;
 import lombok.Data;
 import lombok.EqualsAndHashCode;
@@ -89,13 +94,7 @@ import lombok.val;
 import lombok.extern.slf4j.Slf4j;
 
 @Slf4j
-public class StorageCleaner {
-    public static final String ANSI_RED = "\u001B[31m";
-    public static final String ANSI_GREEN = "\u001B[32m";
-    public static final String ANSI_YELLOW = "\u001B[33m";
-    public static final String ANSI_BLUE = "\u001B[34m";
-    public static final String ANSI_RESET = "\u001B[0m";
-
+public class StorageCleaner implements GarbageCleaner {
     private final boolean cleanup;
     private final boolean timeMachineEnabled;
 
@@ -165,7 +164,8 @@ public class StorageCleaner {
 
     private Set<StorageItem> allFileSystems = Sets.newHashSet();
 
-    public void execute() throws Exception {
+    @Override
+    public void execute() throws InterruptedException {
         long start = System.currentTimeMillis();
         val config = KylinConfig.getInstanceFromEnv();
         long startTime = System.currentTimeMillis();
@@ -195,8 +195,8 @@ public class StorageCleaner {
             log.debug("start to collect HDFS from {}", 
allFileSystem.getPath());
             try {
                 collectFromHDFS(allFileSystem);
-            } catch (FileNotFoundException e) {
-                log.warn("No garbage files collected from {}", 
allFileSystem.getPath());
+            } catch (IOException e) {
+                log.warn("No garbage files collected from {}", 
allFileSystem.getPath(), e);
             }
             log.debug("folder {} is collected,detailed -> {}", 
allFileSystem.getPath(), allFileSystems);
         }
@@ -221,8 +221,8 @@ public class StorageCleaner {
                 try {
                     log.debug("start to add item {}", path);
                     addItem(item.getFileSystemDecorator(), path, 
protectionTime);
-                } catch (FileNotFoundException e) {
-                    log.warn("{} not found", path);
+                } catch (IOException e) {
+                    log.warn("{} not found", path, e);
                 }
             }
         }
@@ -231,21 +231,25 @@ public class StorageCleaner {
     }
 
     public void printConsole(boolean success, long duration) {
-        System.out.println(ANSI_BLUE + "Kylin 5.0 garbage report: (cleanup=" + 
cleanup + ")" + ANSI_RESET);
+        System.out.println(StringConstant.ANSI_BLUE + "Kylin 5.0 garbage 
report: (cleanup=" + cleanup + ")"
+                + StringConstant.ANSI_RESET);
         for (StorageItem item : outdatedItems) {
             System.out.println("  Storage File: " + item.getPath());
         }
         String jobName = "Storage GC cleanup job ";
         if (!cleanup) {
-            System.out.println(ANSI_BLUE + "Dry run mode, no data is deleted." 
+ ANSI_RESET);
+            System.out.println(
+                    StringConstant.ANSI_BLUE + "Dry run mode, no data is 
deleted." + StringConstant.ANSI_RESET);
             jobName = "Storage GC check job ";
         }
         if (!success) {
-            System.out.println(ANSI_RED + jobName + "FAILED." + ANSI_RESET);
-            System.out.println(ANSI_RED + jobName + "finished in " + duration 
+ " ms." + ANSI_RESET);
+            System.out.println(StringConstant.ANSI_RED + jobName + "FAILED." + 
StringConstant.ANSI_RESET);
+            System.out.println(
+                    StringConstant.ANSI_RED + jobName + "finished in " + 
duration + " ms." + StringConstant.ANSI_RESET);
         } else {
-            System.out.println(ANSI_GREEN + jobName + "SUCCEED." + ANSI_RESET);
-            System.out.println(ANSI_GREEN + jobName + "finished in " + 
duration + " ms." + ANSI_RESET);
+            System.out.println(StringConstant.ANSI_GREEN + jobName + 
"SUCCEED." + StringConstant.ANSI_RESET);
+            System.out.println(StringConstant.ANSI_GREEN + jobName + "finished 
in " + duration + " ms."
+                    + StringConstant.ANSI_RESET);
         }
 
     }
@@ -267,7 +271,7 @@ public class StorageCleaner {
         new ProjectTemporaryTableCleaner(project).execute();
     }
 
-    public boolean cleanup() throws Exception {
+    public boolean cleanup() throws InterruptedException {
         boolean success = true;
         if (cleanup) {
             Stats stats = new Stats() {
@@ -535,7 +539,7 @@ public class StorageCleaner {
         return getDataLayoutDir(dataLayout) + "/" + 
dataPartition.getBucketId();
     }
 
-    private void collectFromHDFS(StorageItem item) throws Exception {
+    private void collectFromHDFS(StorageItem item) throws IOException {
         val projectFolders = item.getFileSystemDecorator().listStatus(new 
Path(item.getPath()),
                 path -> !path.getName().startsWith("_")
                         && (this.projectNames.isEmpty() || 
this.projectNames.contains(path.getName())));
@@ -575,7 +579,8 @@ public class StorageCleaner {
                             .forEach(x -> slot.add(new 
FileTreeNode(x.getPath().getName(), node)));
                 }
             }
-            projectNode.getBuckets().addAll(collectMultiPartitions(item, 
projectNode.getName(), projectNode.getLayouts()));
+            projectNode.getBuckets()
+                    .addAll(collectMultiPartitions(item, 
projectNode.getName(), projectNode.getLayouts()));
         }
 
     }
@@ -841,4 +846,161 @@ public class StorageCleaner {
             return !errorItems.isEmpty();
         }
     }
+
+    /**
+     * Removes expired Spark event logs written by the query engine (sparder) and by build jobs.
+     * <p>
+     * Sparder history dir hierarchy is
+     * <pre>
+     * /${kylin.storage.columnar.spark-conf.spark.eventLog.dir}
+     * |--/${hostName_port}
+     *    |--/{eventlog_v2_${appid}#${timestamp}}
+     *    |  +--/${events_${fileindex}_${appid}_${starttime}_${endtime}}
+     * </pre>
+     * An entry whose modification time is not later than the expiration time is deleted recursively, unless
+     * its name starts with {@code events_1_}; directories that are kept are scanned for expired children.
+     */
+    @Slf4j
+    public static class EventLogCleaner {
+        private static final String FIRST_EVENT_LOG_FILE_PREFIX = "events_1_";
+        private static final String SPARK_EVENTLOG_DIR = "spark.eventLog.dir";
+        // Resolved per instance instead of once per class load, so that a cleaner created after the
+        // environment changed does not keep working on a stale config or file system.
+        private final KylinConfig config = KylinConfig.getInstanceFromEnv();
+        private final FileSystem fs = HadoopUtil.getWorkingFileSystem();
+        private final boolean cleanup;
+        private long queryExpirationTime;
+        private long buildExpirationTime;
+
+        /**
+         * @param cleanup whether {@link #execute()} deletes anything; the explicit clean methods ignore it
+         */
+        public EventLogCleaner(boolean cleanup) {
+            this.cleanup = cleanup;
+            init();
+        }
+
+        /**
+         * Computes both expiration times: the survival threshold counted back from now, capped by the
+         * earliest query history time (sparder logs) or the earliest executable create time (build logs).
+         */
+        private void init() {
+            long eventLogCleanStartTime = System.currentTimeMillis();
+            Long minQueryHistoryTime = QueryHisStoreUtil.getQueryHistoryMinQueryTime();
+            long cleanTime = eventLogCleanStartTime - config.getQueryHistorySurvivalThreshold();
+            queryExpirationTime = Objects.isNull(minQueryHistoryTime) ? cleanTime
+                    : Math.min(cleanTime, minQueryHistoryTime);
+            log.info(
+                    "eventLogCleanStartTime is {}, queryHistorySurvivalThreshold is {}ms, minQueryHistoryTime is {}, queryExpirationTime is {}",
+                    eventLogCleanStartTime, config.getQueryHistorySurvivalThreshold(), minQueryHistoryTime,
+                    queryExpirationTime);
+
+            long earliest = Long.MAX_VALUE;
+            NProjectManager projectManager = NProjectManager.getInstance(config);
+            for (ProjectInstance prj : projectManager.listAllProjects()) {
+                NExecutableManager executableManager = NExecutableManager.getInstance(config, prj.getName());
+                for (AbstractExecutable executable : executableManager.getAllExecutables()) {
+                    if (executable.getCreateTime() < earliest) {
+                        earliest = executable.getCreateTime();
+                    }
+                }
+            }
+
+            buildExpirationTime = Math.min(eventLogCleanStartTime - config.getExecutableSurvivalTimeThreshold(),
+                    earliest);
+            log.info(
+                    "eventLogCleanStartTime is {}, executableSurvivalTimeThreshold is {}ms, earliest executable's createTime is {}, buildExpirationTime is {}",
+                    eventLogCleanStartTime, config.getExecutableSurvivalTimeThreshold(), earliest,
+                    buildExpirationTime);
+        }
+
+        /** Cleans the current sparder event log and the spark event logs; does nothing unless cleanup is set. */
+        public void execute() {
+            if (cleanup) {
+                cleanCurrentSparderEventLog();
+                cleanSparkEventLogs();
+            }
+        }
+
+        /** Cleans the dir returned by {@code getSparderEvenLogDir()}; the cleanup flag is not consulted. */
+        public void cleanCurrentSparderEventLog() {
+            log.info("Start to clean sparder event log");
+            String currentSparderEvenLogDir = ExtractFactory.create().getSparderEvenLogDir();
+            clean(currentSparderEvenLogDir, queryExpirationTime);
+            log.info("End to clean sparder event log");
+        }
+
+        private void cleanSparkEventLogs() {
+            log.info("Start to clean spark event log");
+            // A missing override must not abort the method: the per-project dirs below still need cleaning.
+            String allSparkEventLogDir = config.getSparkConfigOverride().get(SPARK_EVENTLOG_DIR);
+            if (StringUtils.isBlank(allSparkEventLogDir)) {
+                log.warn("[{}] is not configured, skip cleaning the global spark event log", SPARK_EVENTLOG_DIR);
+            } else {
+                clean(allSparkEventLogDir.trim(), buildExpirationTime);
+            }
+
+            EpochManager epochManager = EpochManager.getInstance();
+            NProjectManager prjManager = NProjectManager.getInstance(config);
+            prjManager.listAllProjects().forEach(project -> {
+                if (!epochManager.checkEpochOwner(project.getName())) {
+                    return;
+                }
+                String sparkEventLogDir = project.getConfig().getExtendedOverrides()
+                        .get("kylin.engine.spark-conf.spark.eventLog.dir");
+                if (!StringUtils.isEmpty(sparkEventLogDir)) {
+                    clean(sparkEventLogDir, buildExpirationTime);
+                }
+            });
+            log.info("End to clean spark event log");
+        }
+
+        // for RoutineTool & FastRoutineTool
+        public void cleanAllEventLog() {
+            log.info("Start to clean all event log");
+            String rootSparderEvenLogDir = KapConfig.wrap(config).getSparkConf().get(SPARK_EVENTLOG_DIR);
+            if (StringUtils.isBlank(rootSparderEvenLogDir)) {
+                log.warn("[{}] is not configured, skip cleaning the sparder event log", SPARK_EVENTLOG_DIR);
+            } else {
+                try {
+                    Arrays.stream(fs.listStatus(new Path(rootSparderEvenLogDir)))
+                            .forEach(fileStatus -> clean(fileStatus.getPath().toString(), queryExpirationTime));
+                } catch (IOException e) {
+                    log.warn("Failed to clean all sparder event log of [{}]", rootSparderEvenLogDir, e);
+                }
+            }
+
+            cleanSparkEventLogs();
+            log.info("End to clean all event log");
+        }
+
+        private void clean(String dir, long expirationTime) {
+            try {
+                // The Path is built inside the try so that a null or empty dir is reported, not thrown.
+                for (FileStatus fileStatus : fs.listStatus(new Path(dir))) {
+                    deleteEventLogFile(fileStatus, expirationTime);
+                }
+            } catch (Exception e) {
+                log.warn("Failed to clean the event log of [{}]", dir, e);
+            }
+        }
+
+        private void deleteEventLogFile(FileStatus fileStatus, long expirationTime) throws IOException {
+            Path path = fileStatus.getPath();
+            // Entries named events_1_* are never deleted on their own, only together with an expired parent.
+            if (fileStatus.getModificationTime() <= expirationTime
+                    && !path.getName().startsWith(FIRST_EVENT_LOG_FILE_PREFIX)) {
+                log.info("Delete event log file: {}", path);
+                if (!fs.delete(path, true)) {
+                    log.warn("Failed to delete event log file: {}", path);
+                }
+                return;
+            }
+
+            // Anything that is kept and is not a plain file may still contain expired children.
+            if (!fileStatus.isFile()) {
+                clean(path.toString(), expirationTime);
+            }
+        }
+    }
 }
diff --git 
a/src/tool/src/main/java/org/apache/kylin/tool/routine/FastRoutineTool.java 
b/src/tool/src/main/java/org/apache/kylin/tool/routine/FastRoutineTool.java
index a9ef41094c..b9950c2d35 100644
--- a/src/tool/src/main/java/org/apache/kylin/tool/routine/FastRoutineTool.java
+++ b/src/tool/src/main/java/org/apache/kylin/tool/routine/FastRoutineTool.java
@@ -53,6 +53,7 @@ public class FastRoutineTool extends RoutineTool {
             }
             System.out.println("Start to fast cleanup hdfs");
             cleanStorage();
+            cleanEventLogs();
         } catch (Exception e) {
             log.error("Failed to execute fast routintool", e);
         }
diff --git 
a/src/tool/src/main/java/org/apache/kylin/tool/routine/RoutineTool.java 
b/src/tool/src/main/java/org/apache/kylin/tool/routine/RoutineTool.java
index 0de65f1780..01c05f76c6 100644
--- a/src/tool/src/main/java/org/apache/kylin/tool/routine/RoutineTool.java
+++ b/src/tool/src/main/java/org/apache/kylin/tool/routine/RoutineTool.java
@@ -23,6 +23,7 @@ import java.util.stream.Collectors;
 
 import org.apache.commons.cli.Option;
 import org.apache.commons.cli.Options;
+import org.apache.commons.lang.ArrayUtils;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.util.ExecutableApplication;
 import org.apache.kylin.common.util.OptionsHelper;
@@ -33,8 +34,8 @@ import org.apache.kylin.helper.RoutineToolHelper;
 import org.apache.kylin.metadata.project.NProjectManager;
 import org.apache.kylin.metadata.project.ProjectInstance;
 import org.apache.kylin.tool.MaintainModeTool;
+import org.apache.kylin.tool.constant.StringConstant;
 import org.apache.kylin.tool.garbage.CleanTaskExecutorService;
-import org.apache.kylin.tool.garbage.StorageCleaner;
 import org.apache.kylin.tool.util.ToolMainWrapper;
 import org.apache.kylin.metadata.epoch.EpochManager;
 
@@ -132,6 +133,7 @@ public class RoutineTool extends ExecutableApplication {
                 RoutineToolHelper.cleanMeta(projectsToCleanup);
             }
             cleanStorage();
+            cleanEventLogs();
         } catch (Exception e) {
             log.error("Failed to execute routintool", e);
             throw e;
@@ -141,13 +143,36 @@ public class RoutineTool extends ExecutableApplication {
     public void cleanStorage() {
         try {
             System.out.println("Start to cleanup HDFS");
-            CleanTaskExecutorService.getInstance().cleanStorageForRoutine(
-                storageCleanup, Arrays.asList(projects), requestFSRate, retryTimes);
+            CleanTaskExecutorService.getInstance().cleanStorageForRoutine(storageCleanup, Arrays.asList(projects),
+                    requestFSRate, retryTimes);
             System.out.println("cleanup HDFS finished");
         } catch (Exception e) {
-            System.out.println(StorageCleaner.ANSI_RED
-                + "cleanup HDFS failed. Detailed Message is at ${KYLIN_HOME}/logs/shell.stderr"
-                + StorageCleaner.ANSI_RESET);
+            log.error("Failed to cleanup HDFS", e);
+            System.out.println(StringConstant.ANSI_RED
+                    + "cleanup HDFS failed. Detailed Message is at ${KYLIN_HOME}/logs/shell.stderr"
+                    + StringConstant.ANSI_RESET);
+        }
+    }
+
+    /**
+     * Cleans all event logs through {@code RoutineToolHelper.cleanEventLog}. Nothing is cleaned when
+     * specific projects were given. A failure is logged and reported on the console, never rethrown.
+     */
+    protected void cleanEventLogs() {
+        try {
+            if (!ArrayUtils.isEmpty(projects)) {
+                System.out.println("The event log will not be cleaned when the projects is specified");
+                return;
+            }
+
+            System.out.println("Start to clean all event logs");
+            RoutineToolHelper.cleanEventLog(storageCleanup, true, true);
+            System.out.println("Clean all event logs finished");
+        } catch (Exception e) {
+            log.error("Failed to clean all event logs", e);
+            System.out.println(StringConstant.ANSI_RED
+                    + "Clean all event logs failed. Detailed Message is at ${KYLIN_HOME}/logs/shell.stderr"
+                    + StringConstant.ANSI_RESET);
         }
     }
 
diff --git 
a/src/tool/src/main/java/org/apache/kylin/tool/security/AdminUserInitCLI.java 
b/src/tool/src/main/java/org/apache/kylin/tool/security/AdminUserInitCLI.java
index d1eec35e5b..eb5edfb5e7 100644
--- 
a/src/tool/src/main/java/org/apache/kylin/tool/security/AdminUserInitCLI.java
+++ 
b/src/tool/src/main/java/org/apache/kylin/tool/security/AdminUserInitCLI.java
@@ -28,20 +28,20 @@ import org.apache.commons.lang3.RandomStringUtils;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.persistence.RawResource;
 import org.apache.kylin.common.persistence.ResourceStore;
-import org.apache.kylin.common.util.JsonUtil;
-import org.apache.kylin.common.util.RandomUtil;
-import org.apache.kylin.rest.constant.Constant;
-import org.apache.kylin.util.PasswordEncodeFactory;
 import org.apache.kylin.common.persistence.metadata.PersistException;
 import org.apache.kylin.common.persistence.transaction.UnitOfWork;
+import org.apache.kylin.common.util.JsonUtil;
+import org.apache.kylin.common.util.RandomUtil;
 import org.apache.kylin.common.util.Unsafe;
+import org.apache.kylin.guava30.shaded.common.io.ByteSource;
 import org.apache.kylin.metadata.user.ManagedUser;
 import org.apache.kylin.metadata.user.NKylinUserManager;
-import org.apache.kylin.tool.garbage.StorageCleaner;
+import org.apache.kylin.rest.constant.Constant;
+import org.apache.kylin.tool.constant.StringConstant;
+import org.apache.kylin.util.PasswordEncodeFactory;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import org.apache.kylin.guava30.shaded.common.io.ByteSource;
 import lombok.val;
 
 public class AdminUserInitCLI {
@@ -100,13 +100,13 @@ public class AdminUserInitCLI {
                     ByteSource.wrap(JsonUtil.writeValueAsBytes(managedUser)), 
System.currentTimeMillis(), 0L);
             metaStore.putResource(rawResource, null, 
UnitOfWork.DEFAULT_EPOCH_ID);
 
-            String blackColorUsernameForPrint = StorageCleaner.ANSI_RESET + 
ADMIN_USER_NAME + StorageCleaner.ANSI_RED;
-            String blackColorPasswordForPrint = StorageCleaner.ANSI_RESET + 
password + StorageCleaner.ANSI_RED;
+            String blackColorUsernameForPrint = StringConstant.ANSI_RESET + 
ADMIN_USER_NAME + StringConstant.ANSI_RED;
+            String blackColorPasswordForPrint = StringConstant.ANSI_RESET + 
password + StringConstant.ANSI_RED;
             String info = String.format(Locale.ROOT,
                     "Create default user finished. The username of initialized 
user is [%s], which password is [%s].\n"
                             + "Please keep the password properly. And if you 
forget the password, you can reset it according to user manual.",
                     blackColorUsernameForPrint, blackColorPasswordForPrint);
-            System.out.println(StorageCleaner.ANSI_RED + info + 
StorageCleaner.ANSI_RESET);
+            System.out.println(StringConstant.ANSI_RED + info + 
StringConstant.ANSI_RESET);
         } catch (PersistException e) {
             logger.warn("{} user has been created on another node.", 
ADMIN_USER_NAME);
         }
diff --git 
a/src/tool/src/main/java/org/apache/kylin/tool/security/KylinPasswordResetCLI.java
 
b/src/tool/src/main/java/org/apache/kylin/tool/security/KylinPasswordResetCLI.java
index 0ebcd2d8c2..7c186cfdde 100644
--- 
a/src/tool/src/main/java/org/apache/kylin/tool/security/KylinPasswordResetCLI.java
+++ 
b/src/tool/src/main/java/org/apache/kylin/tool/security/KylinPasswordResetCLI.java
@@ -22,20 +22,20 @@ import java.util.Locale;
 
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.persistence.ResourceStore;
-import org.apache.kylin.common.util.JsonUtil;
-import org.apache.kylin.util.PasswordEncodeFactory;
 import org.apache.kylin.common.persistence.transaction.UnitOfWork;
+import org.apache.kylin.common.util.JsonUtil;
 import org.apache.kylin.common.util.Unsafe;
+import org.apache.kylin.guava30.shaded.common.io.ByteSource;
 import org.apache.kylin.metadata.epoch.EpochManager;
 import org.apache.kylin.metadata.user.NKylinUserManager;
 import org.apache.kylin.tool.MaintainModeTool;
 import org.apache.kylin.tool.MetadataTool;
-import org.apache.kylin.tool.garbage.StorageCleaner;
+import org.apache.kylin.tool.constant.StringConstant;
+import org.apache.kylin.util.PasswordEncodeFactory;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.security.crypto.password.PasswordEncoder;
 
-import org.apache.kylin.guava30.shaded.common.io.ByteSource;
 import lombok.val;
 
 public class KylinPasswordResetCLI {
@@ -105,16 +105,16 @@ public class KylinPasswordResetCLI {
         }
 
         if (randomPasswordEnabled) {
-            String blackColorUsernameForPrint = StorageCleaner.ANSI_RESET + 
AdminUserInitCLI.ADMIN_USER_NAME
-                    + StorageCleaner.ANSI_RED;
-            String blackColorPasswordForPrint = StorageCleaner.ANSI_RESET + 
password + StorageCleaner.ANSI_RED;
+            String blackColorUsernameForPrint = StringConstant.ANSI_RESET + 
AdminUserInitCLI.ADMIN_USER_NAME
+                    + StringConstant.ANSI_RED;
+            String blackColorPasswordForPrint = StringConstant.ANSI_RESET + 
password + StringConstant.ANSI_RED;
             String info = String.format(Locale.ROOT,
                     "Reset password of [%s] succeed. The password is [%s].\n" 
+ "Please keep the password properly.",
                     blackColorUsernameForPrint, blackColorPasswordForPrint);
-            System.out.println(StorageCleaner.ANSI_RED + info + 
StorageCleaner.ANSI_RESET);
+            System.out.println(StringConstant.ANSI_RED + info + 
StringConstant.ANSI_RESET);
         } else {
             System.out.println(
-                    StorageCleaner.ANSI_YELLOW + "Reset the ADMIN password 
successfully." + StorageCleaner.ANSI_RESET);
+                    StringConstant.ANSI_YELLOW + "Reset the ADMIN password 
successfully." + StringConstant.ANSI_RESET);
         }
 
         return true;
diff --git 
a/src/tool/src/test/java/org/apache/kylin/tool/StorageCleanerTest.java 
b/src/tool/src/test/java/org/apache/kylin/tool/StorageCleanerTest.java
index 48a4403e00..af10c230ad 100644
--- a/src/tool/src/test/java/org/apache/kylin/tool/StorageCleanerTest.java
+++ b/src/tool/src/test/java/org/apache/kylin/tool/StorageCleanerTest.java
@@ -17,8 +17,13 @@
  */
 package org.apache.kylin.tool;
 
+import static 
org.apache.kylin.common.KylinConfigBase.WRITING_CLUSTER_WORKING_DIR;
+
 import java.io.File;
 import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.nio.file.attribute.FileTime;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.Map;
@@ -33,33 +38,35 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.kylin.common.KapConfig;
 import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.util.AddressUtil;
 import org.apache.kylin.common.util.HadoopUtil;
+import org.apache.kylin.common.util.NLocalFileMetadataTestCase;
+import org.apache.kylin.guava30.shaded.common.collect.Maps;
 import org.apache.kylin.job.common.ShellExecutable;
 import org.apache.kylin.job.execution.AbstractExecutable;
 import org.apache.kylin.job.execution.DefaultExecutable;
 import org.apache.kylin.job.execution.ExecutableState;
 import org.apache.kylin.job.execution.NExecutableManager;
-import org.apache.kylin.metadata.project.ProjectInstance;
-import org.apache.kylin.common.util.NLocalFileMetadataTestCase;
 import org.apache.kylin.metadata.cube.model.NDataflow;
 import org.apache.kylin.metadata.cube.model.NDataflowManager;
 import org.apache.kylin.metadata.cube.model.NIndexPlanManager;
 import org.apache.kylin.metadata.model.NTableMetadataManager;
 import org.apache.kylin.metadata.project.NProjectManager;
+import org.apache.kylin.metadata.project.ProjectInstance;
+import org.apache.kylin.metadata.query.util.QueryHisStoreUtil;
 import org.apache.kylin.tool.garbage.StorageCleaner;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Ignore;
 import org.junit.Test;
-
-import org.apache.kylin.guava30.shaded.common.collect.Maps;
+import org.mockito.MockedStatic;
+import org.mockito.Mockito;
 
 import lombok.val;
+import lombok.var;
 import lombok.extern.slf4j.Slf4j;
 
-import static 
org.apache.kylin.common.KylinConfigBase.WRITING_CLUSTER_WORKING_DIR;
-
 @Slf4j
 public class StorageCleanerTest extends NLocalFileMetadataTestCase {
 
@@ -93,6 +100,51 @@ public class StorageCleanerTest extends NLocalFileMetadataTestCase {
         }
     }
 
+    @Test
+    public void testEventLogClean() throws IOException {
+        prepareForEventLogClean();
+        String allSparderEventLogDir = (KapConfig.wrap(getTestConfig()).getSparkConf().get("spark.eventLog.dir"))
+                .replace("file:", "");
+        String currentSparderEventLogDir = (KapConfig.wrap(getTestConfig()).getSparkConf().get("spark.eventLog.dir")
+                + "/" + AddressUtil.getLocalServerInfo() + "/eventlog_v2_application_1677899901295_4823#1690192675042")
+                        .replace("file:", "");
+        String sparkEventLogDir = getTestConfig().getSparkConfigOverride().get("spark.eventLog.dir").replace("file:",
+                "");
+
+        try (MockedStatic<QueryHisStoreUtil> qhsuMocked = Mockito.mockStatic(QueryHisStoreUtil.class)) {
+            qhsuMocked.when(QueryHisStoreUtil::getQueryHistoryMinQueryTime).thenReturn(null);
+
+            int fileSize = new File(currentSparderEventLogDir).listFiles().length;
+            Assert.assertEquals(3, fileSize);
+            var cleaner = new StorageCleaner.EventLogCleaner(false);
+            cleaner.cleanCurrentSparderEventLog();
+            fileSize = new File(currentSparderEventLogDir).listFiles().length;
+            Assert.assertEquals(2, fileSize);
+            Assert.assertTrue(
+                    new File(currentSparderEventLogDir + "/events_1_application_1677899901295_8490_1690953331329")
+                            .exists());
+
+            int sparkEventLogFileSize = new File(sparkEventLogDir).listFiles().length;
+            Assert.assertEquals(5, sparkEventLogFileSize);
+            cleaner.execute(); // cleanup=false, so the directory has to be re-listed to prove nothing was deleted
+            Assert.assertEquals(5, new File(sparkEventLogDir).listFiles().length);
+
+            cleaner = new StorageCleaner.EventLogCleaner(true);
+            cleaner.execute();
+            sparkEventLogFileSize = new File(sparkEventLogDir).listFiles().length;
+            Assert.assertEquals(4, sparkEventLogFileSize);
+            Assert.assertFalse(new File(sparkEventLogDir + "/application_1677899901295_8243").exists());
+
+            File otherSparderEventLogFile = new File(
+                    allSparderEventLogDir + "/localhost_7071/eventlog_v2_application_1677899901295_4824#1690192771380");
+            Assert.assertTrue(otherSparderEventLogFile.exists());
+            cleaner.cleanAllEventLog();
+            otherSparderEventLogFile = new File(
+                    allSparderEventLogDir + "/localhost_7071/eventlog_v2_application_1677899901295_4824#1690192771380");
+            Assert.assertFalse(otherSparderEventLogFile.exists());
+        }
+    }
+
     @Test
     public void testCleanupAfterTruncate() throws Exception {
         val cleaner = new StorageCleaner();
@@ -339,6 +391,65 @@ public class StorageCleanerTest extends NLocalFileMetadataTestCase {
         execMgr.addJob(job2);
     }
 
+    /**
+     * Copies the event log fixtures into the configured event log dirs and back-dates their modification
+     * times, so that some entries are expired and others are not.
+     */
+    private void prepareForEventLogClean() throws IOException {
+        KylinConfig config = getTestConfig();
+        String sparderEventLogConf = KapConfig.wrap(config).getSparkConf().get("spark.eventLog.dir");
+        String allSparderEventLogDir = sparderEventLogConf.replace("file:", "");
+        String currentSparderEventLogDir = (sparderEventLogConf + "/" + AddressUtil.getLocalServerInfo())
+                .replace("file:", "");
+        String sparkEventLogDir = config.getSparkConfigOverride().get("spark.eventLog.dir").replace("file:", "");
+        Files.createDirectories(Paths.get(currentSparderEventLogDir));
+
+        File sparkEventLogTarget = new File(sparkEventLogDir);
+        FileUtils.copyDirectory(
+                new File("src/test/resources/ut_storage/working-dir2/sparder-history/localhost_7070"),
+                new File(currentSparderEventLogDir));
+        FileUtils.copyDirectoryToDirectory(
+                new File("src/test/resources/ut_storage/working-dir2/sparder-history/localhost_7071"),
+                new File(allSparderEventLogDir));
+        FileUtils.copyFileToDirectory(
+                new File("src/test/resources/ut_storage/working-dir2/spark-history/application_1677899901295_0989"),
+                sparkEventLogTarget);
+        FileUtils.copyFileToDirectory(
+                new File("src/test/resources/ut_storage/working-dir2/spark-history/application_1677899901295_8243"),
+                sparkEventLogTarget);
+
+        long now = System.currentTimeMillis();
+        long oneDay = 24 * 60 * 60 * 1000L;
+        long expired = now - 2 * 30 * oneDay;
+        long notExpired = now - oneDay;
+        String localAppLogDir = currentSparderEventLogDir
+                + "/eventlog_v2_application_1677899901295_4823#1690192675042";
+
+        // Not expired
+        updateLastModified(localAppLogDir, notExpired);
+        // Expired
+        updateLastModified(localAppLogDir + "/appstatus_application_1677899901295_8490", expired);
+        // Expired
+        updateLastModified(localAppLogDir + "/events_1_application_1677899901295_8490_1690953331329", expired);
+        // Not expired
+        updateLastModified(localAppLogDir + "/events_2_application_1677899901295_8490_1690953331329", notExpired);
+
+        // Expired
+        updateLastModified(
+                allSparderEventLogDir + "/localhost_7071/eventlog_v2_application_1677899901295_4824#1690192771380",
+                expired);
+
+        // Expired: Less than the creation time of the earliest executable
+        updateLastModified(sparkEventLogDir + "/application_1677899901295_8243", 974600451000L);
+        // Not expired
+        updateLastModified(sparkEventLogDir + "/application_1677899901295_0989", notExpired);
+    }
+
+    /** Sets the last-modified time of {@code file} to {@code timeStamp}, given in epoch milliseconds. */
+    public void updateLastModified(String file, long timeStamp) throws IOException {
+        Files.setLastModifiedTime(Paths.get(file), FileTime.fromMillis(timeStamp));
+    }
+
     private Set<String> normalizeGarbages(Set<StorageCleaner.StorageItem> items) {
         return items.stream().map(i -> i.getPath().replaceAll("file:", "").replaceAll("/keep", ""))
                 .collect(Collectors.toSet());
diff --git 
a/src/tool/src/test/java/org/apache/kylin/tool/garbage/CleanTaskExecutorServiceTests.java
 
b/src/tool/src/test/java/org/apache/kylin/tool/garbage/CleanTaskExecutorServiceTests.java
index 1b0e368eca..f65a9d2715 100644
--- 
a/src/tool/src/test/java/org/apache/kylin/tool/garbage/CleanTaskExecutorServiceTests.java
+++ 
b/src/tool/src/test/java/org/apache/kylin/tool/garbage/CleanTaskExecutorServiceTests.java
@@ -72,7 +72,7 @@ class CleanTaskExecutorServiceTests extends 
NLocalFileMetadataTestCase {
         }
 
         @Override
-        public void execute() throws Exception {
+        public void execute() throws InterruptedException {
             if (runnable != null) {
                 runnable.run();
             }
diff --git 
a/src/tool/src/test/java/org/apache/kylin/tool/garbage/ExecutableCleanerTest.java
 
b/src/tool/src/test/java/org/apache/kylin/tool/garbage/ExecutableCleanerTest.java
index 0181f4026b..9d9474eadd 100644
--- 
a/src/tool/src/test/java/org/apache/kylin/tool/garbage/ExecutableCleanerTest.java
+++ 
b/src/tool/src/test/java/org/apache/kylin/tool/garbage/ExecutableCleanerTest.java
@@ -18,12 +18,12 @@
 
 package org.apache.kylin.tool.garbage;
 
+import org.apache.kylin.common.util.NLocalFileMetadataTestCase;
 import org.apache.kylin.common.util.RandomUtil;
 import org.apache.kylin.job.dao.ExecutableOutputPO;
 import org.apache.kylin.job.dao.ExecutablePO;
 import org.apache.kylin.job.dao.NExecutableDao;
 import org.apache.kylin.job.execution.NExecutableManager;
-import org.apache.kylin.common.util.NLocalFileMetadataTestCase;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
@@ -54,7 +54,7 @@ public class ExecutableCleanerTest extends 
NLocalFileMetadataTestCase {
         createUnexpiredJob(jobId);
         Assert.assertEquals(1, manager.getJobs().size());
         manager.discardJob(jobId);
-        new ExecutableCleaner(DEFAULT_PROJECT).cleanup();
+        new ExecutableCleaner(DEFAULT_PROJECT).execute();
         Assert.assertEquals(1, manager.getJobs().size());
     }
 
@@ -62,7 +62,7 @@ public class ExecutableCleanerTest extends NLocalFileMetadataTestCase {
     public void testCleanupWithRunningJob() {
         createExpiredJob(RandomUtil.randomUUIDStr());
         Assert.assertEquals(1, manager.getJobs().size());
-        new ExecutableCleaner(DEFAULT_PROJECT).cleanup();
+        new ExecutableCleaner(DEFAULT_PROJECT).execute();
         Assert.assertEquals(1, manager.getJobs().size());
     }
 
@@ -72,7 +72,7 @@ public class ExecutableCleanerTest extends NLocalFileMetadataTestCase {
         createExpiredJob(jobId);
         manager.discardJob(jobId);
         Assert.assertEquals(1, manager.getJobs().size());
-        new ExecutableCleaner(DEFAULT_PROJECT).cleanup();
+        new ExecutableCleaner(DEFAULT_PROJECT).execute();
         Assert.assertEquals(0, manager.getJobs().size());
     }
 
diff --git a/src/tool/src/test/java/org/apache/kylin/tool/garbage/SnapshotCleanerTest.java b/src/tool/src/test/java/org/apache/kylin/tool/garbage/SnapshotCleanerTest.java
index 90fd332091..0031051487 100644
--- a/src/tool/src/test/java/org/apache/kylin/tool/garbage/SnapshotCleanerTest.java
+++ b/src/tool/src/test/java/org/apache/kylin/tool/garbage/SnapshotCleanerTest.java
@@ -79,7 +79,7 @@ public class SnapshotCleanerTest extends NLocalFileMetadataTestCase {
         SnapshotCleaner snapshotCleaner = new SnapshotCleaner(DEFAULT_PROJECT);
         snapshotCleaner.prepare();
         UnitOfWork.doInTransactionWithRetry(() -> {
-            snapshotCleaner.cleanup();
+            snapshotCleaner.execute();
             return 0;
         }, DEFAULT_PROJECT);
 
diff --git a/src/tool/src/test/java/org/apache/kylin/tool/garbage/SourceUsageCleanerTest.java b/src/tool/src/test/java/org/apache/kylin/tool/garbage/SourceUsageCleanerTest.java
index 983bc26d58..3f0dbe5d82 100644
--- a/src/tool/src/test/java/org/apache/kylin/tool/garbage/SourceUsageCleanerTest.java
+++ b/src/tool/src/test/java/org/apache/kylin/tool/garbage/SourceUsageCleanerTest.java
@@ -58,7 +58,7 @@ public class SourceUsageCleanerTest extends NLocalFileMetadataTestCase {
         manager.updateSourceUsage(record);
         List<SourceUsageRecord> allRecords = manager.getAllRecords();
         Assert.assertEquals(1, allRecords.size());
-        sourceUsageCleaner.cleanup();
+        sourceUsageCleaner.execute();
         allRecords = manager.getAllRecords();
         Assert.assertEquals(1, allRecords.size());
     }
@@ -74,7 +74,7 @@ public class SourceUsageCleanerTest extends NLocalFileMetadataTestCase {
         manager.updateSourceUsage(record1);
         List<SourceUsageRecord> allRecords = manager.getAllRecords();
         Assert.assertEquals(2, allRecords.size());
-        sourceUsageCleaner.cleanup();
+        sourceUsageCleaner.execute();
         allRecords = manager.getAllRecords();
         Assert.assertEquals(1, allRecords.size());
         Assert.assertEquals(1, allRecords.get(0).getCreateTime());
@@ -87,7 +87,7 @@ public class SourceUsageCleanerTest extends NLocalFileMetadataTestCase {
         manager.updateSourceUsage(record);
         List<SourceUsageRecord> allRecords = manager.getAllRecords();
         Assert.assertEquals(1, allRecords.size());
-        sourceUsageCleaner.cleanup();
+        sourceUsageCleaner.execute();
         allRecords = manager.getAllRecords();
         Assert.assertEquals(1, allRecords.size());
     }
@@ -95,7 +95,7 @@ public class SourceUsageCleanerTest extends NLocalFileMetadataTestCase {
     @Test
     public void testCleanupZeroSourceUsage() {
         List<SourceUsageRecord> allRecords = manager.getAllRecords();
-        sourceUsageCleaner.cleanup();
+        sourceUsageCleaner.execute();
         Assert.assertEquals(0, allRecords.size());
     }
 
diff --git a/src/tool/src/test/java/org/apache/kylin/tool/security/AdminUserInitCLITest.java b/src/tool/src/test/java/org/apache/kylin/tool/security/AdminUserInitCLITest.java
index 4551646ccb..62b43c2ce6 100644
--- a/src/tool/src/test/java/org/apache/kylin/tool/security/AdminUserInitCLITest.java
+++ b/src/tool/src/test/java/org/apache/kylin/tool/security/AdminUserInitCLITest.java
@@ -26,7 +26,7 @@ import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.persistence.ResourceStore;
 import org.apache.kylin.common.util.NLocalFileMetadataTestCase;
 import org.apache.kylin.metadata.user.NKylinUserManager;
-import org.apache.kylin.tool.garbage.StorageCleaner;
+import org.apache.kylin.tool.constant.StringConstant;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
@@ -72,13 +72,13 @@ public class AdminUserInitCLITest extends NLocalFileMetadataTestCase {
 
         // assert output on console
         Assert.assertTrue(output.toString(Charset.defaultCharset().name())
-                .startsWith(StorageCleaner.ANSI_RED
+                .startsWith(StringConstant.ANSI_RED
                         + "Create default user finished. The username of initialized user is ["
-                        + StorageCleaner.ANSI_RESET + "ADMIN" + StorageCleaner.ANSI_RED + "], which password is "));
+                        + StringConstant.ANSI_RESET + "ADMIN" + StringConstant.ANSI_RED + "], which password is "));
         Assert.assertTrue(output.toString(Charset.defaultCharset().name())
                 .endsWith("Please keep the password properly. "
                         + "And if you forget the password, you can reset it according to user manual."
-                        + StorageCleaner.ANSI_RESET + "\n"));
+                        + StringConstant.ANSI_RESET + "\n"));
 
         System.setOut(System.out);
 
diff --git a/src/tool/src/test/java/org/apache/kylin/tool/security/KylinPasswordResetCLITest.java b/src/tool/src/test/java/org/apache/kylin/tool/security/KylinPasswordResetCLITest.java
index e3c17825cc..6bf3445289 100644
--- a/src/tool/src/test/java/org/apache/kylin/tool/security/KylinPasswordResetCLITest.java
+++ b/src/tool/src/test/java/org/apache/kylin/tool/security/KylinPasswordResetCLITest.java
@@ -29,10 +29,10 @@ import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.persistence.ResourceStore;
 import org.apache.kylin.common.persistence.metadata.jdbc.AuditLogRowMapper;
 import org.apache.kylin.common.util.LogOutputTestCase;
-import org.apache.kylin.rest.constant.Constant;
 import org.apache.kylin.metadata.user.ManagedUser;
 import org.apache.kylin.metadata.user.NKylinUserManager;
-import org.apache.kylin.tool.garbage.StorageCleaner;
+import org.apache.kylin.rest.constant.Constant;
+import org.apache.kylin.tool.constant.StringConstant;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
@@ -95,10 +95,10 @@ public class KylinPasswordResetCLITest extends LogOutputTestCase {
         Assert.assertFalse(pwdEncoder.matches("KYLIN", afterManager.get(user.getUsername()).getPassword()));
         Assert.assertTrue(output.toString(Charset.defaultCharset().name()).startsWith("The metadata backup path is"));
         Assert.assertTrue(output.toString(Charset.defaultCharset().name())
-            .contains(StorageCleaner.ANSI_RED + "Reset password of [" + StorageCleaner.ANSI_RESET + "ADMIN"
-                + StorageCleaner.ANSI_RED + "] succeed. The password is "));
+                .contains(StringConstant.ANSI_RED + "Reset password of [" + StringConstant.ANSI_RESET + "ADMIN"
+                        + StringConstant.ANSI_RED + "] succeed. The password is "));
         Assert.assertTrue(output.toString(Charset.defaultCharset().name())
-                .endsWith("Please keep the password properly." + StorageCleaner.ANSI_RESET + "\n"));
+                .endsWith("Please keep the password properly." + StringConstant.ANSI_RESET + "\n"));
 
         val url = getTestConfig().getMetadataUrl();
         val jdbcTemplate = getJdbcTemplate();
diff --git a/src/tool/src/test/resources/ut_storage/working-dir2/sparder-history/localhost_7070/eventlog_v2_application_1677899901295_4823#1690192675042/appstatus_application_1677899901295_8490 b/src/tool/src/test/resources/ut_storage/working-dir2/sparder-history/localhost_7070/eventlog_v2_application_1677899901295_4823#1690192675042/appstatus_application_1677899901295_8490
new file mode 100644
index 0000000000..e69de29bb2
diff --git a/src/tool/src/test/resources/ut_storage/working-dir2/sparder-history/localhost_7070/eventlog_v2_application_1677899901295_4823#1690192675042/events_1_application_1677899901295_8490_1690953331329 b/src/tool/src/test/resources/ut_storage/working-dir2/sparder-history/localhost_7070/eventlog_v2_application_1677899901295_4823#1690192675042/events_1_application_1677899901295_8490_1690953331329
new file mode 100644
index 0000000000..e69de29bb2
diff --git a/src/tool/src/test/resources/ut_storage/working-dir2/sparder-history/localhost_7070/eventlog_v2_application_1677899901295_4823#1690192675042/events_2_application_1677899901295_8490_1690953331329 b/src/tool/src/test/resources/ut_storage/working-dir2/sparder-history/localhost_7070/eventlog_v2_application_1677899901295_4823#1690192675042/events_2_application_1677899901295_8490_1690953331329
new file mode 100644
index 0000000000..e69de29bb2
diff --git a/src/tool/src/test/resources/ut_storage/working-dir2/sparder-history/localhost_7071/eventlog_v2_application_1677899901295_4824#1690192771380/events_1_application_1677899901295_5936_1690235831577 b/src/tool/src/test/resources/ut_storage/working-dir2/sparder-history/localhost_7071/eventlog_v2_application_1677899901295_4824#1690192771380/events_1_application_1677899901295_5936_1690235831577
new file mode 100644
index 0000000000..e69de29bb2
diff --git a/src/tool/src/test/resources/ut_storage/working-dir2/spark-history/application_1677899901295_0989 b/src/tool/src/test/resources/ut_storage/working-dir2/spark-history/application_1677899901295_0989
new file mode 100644
index 0000000000..e69de29bb2
diff --git a/src/tool/src/test/resources/ut_storage/working-dir2/spark-history/application_1677899901295_8243 b/src/tool/src/test/resources/ut_storage/working-dir2/spark-history/application_1677899901295_8243
new file mode 100644
index 0000000000..e69de29bb2

Reply via email to