This is an automated email from the ASF dual-hosted git repository.

xxyu pushed a commit to branch kylin5
in repository https://gitbox.apache.org/repos/asf/kylin.git

commit 858d0ae6da67e5ba1d418df4043af91f28611190
Author: Guoliang Sun <guoliang....@kyligence.io>
AuthorDate: Tue Jan 3 13:39:27 2023 +0800

    KYLIN-5459 Partial Log Governance
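
    Wrap framework log statements in try-with-resources SetLogCategory
    blocks so Log4j2's Routing appender can split them into per-category
    files (schedule, metadata, query, build); fix the Routes pattern to
    use the runtime lookup $${ctx:logCategory}; add dedicated appenders
    for the Spark history server, build, and metadata logs; and demote
    several chatty INFO messages to DEBUG or WARN.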
---
 build/conf/kylin-server-log4j.xml                  |  48 +++++++--
 .../kylin/rest/aspect/SchedulerEnhancer.java       |   2 +-
 .../apache/kylin/rest/broadcaster/Broadcaster.java |   8 +-
 .../apache/kylin/rest/service/AuditLogService.java |  10 +-
 .../service/task/QueryHistoryTaskScheduler.java    |   8 +-
 .../apache/kylin/common/constant/LogConstant.java} |  18 ++--
 .../transaction/AbstractAuditLogReplayWorker.java  |  10 +-
 .../transaction/AuditLogReplayWorker.java          |  18 +++-
 .../common/persistence/transaction/UnitOfWork.java |  52 ++++++----
 .../kylin/common/scheduler/EventBusFactory.java    |  16 ++-
 .../kylin/job/execution/DefaultExecutable.java     |   2 +
 .../cube/storage/TotalStorageCollector.java        |   2 +-
 .../apache/kylin/metadata/epoch/EpochManager.java  | 108 ++++++++++++++-------
 .../metadata/recommendation/ref/OptRecV2.java      |   4 +-
 .../kylin/metadata/epoch/EpochManagerTest.java     |  22 +++++
 .../common/metrics/MetricsInfluxdbReporter.java    |   4 +-
 .../rest/scheduler/AutoRefreshSnapshotRunner.java  |   8 +-
 .../org/apache/kylin/rest/service/JobService.java  |  30 ++++--
 .../apache/kylin/rest/service/JobServiceTest.java  |  12 ++-
 .../kylin/rest/broadcaster/BroadcasterTest.java    |  35 +++++++
 .../kylin/rest/service/QueryCacheManager.java      |   8 +-
 .../apache/kylin/rest/service/QueryService.java    |   8 +-
 .../spark/source/NSparkMetadataExplorer.java       |   2 +-
 .../engine/spark/builder/SegmentFlatTable.scala    |  22 +++--
 .../engine/spark/job/RDSegmentBuildExec.scala      |   5 +-
 .../kylin/engine/spark/job/SegmentBuildJob.java    |   2 +
 .../kylin/engine/spark/job/exec/BuildExec.scala    |   8 +-
 .../kylin/engine/spark/job/exec/MergeExec.scala    |   2 +
 .../kylin/engine/spark/job/exec/SnapshotExec.scala |   2 +
 .../engine/spark/job/exec/TableAnalyzerExec.scala  |   2 +
 .../kylin/engine/spark/job/stage/StageExec.scala   |   2 +
 .../engine/spark/job/stage/WaiteForResource.scala  |   2 +
 .../engine/spark/job/stage/build/BuildDict.scala   |   2 +
 .../engine/spark/job/stage/build/BuildLayer.scala  |   2 +
 .../spark/job/stage/build/CostBasedPlanner.scala   |   2 +
 .../job/stage/build/FlatTableAndDictBase.scala     |   5 +
 .../job/stage/build/GatherFlatTableStats.scala     |   2 +
 .../spark/job/stage/build/GenerateFlatTable.scala  |   2 +
 .../stage/build/MaterializedFactTableView.scala    |   2 +
 .../spark/job/stage/build/RefreshColumnBytes.scala |   2 +
 .../spark/job/stage/build/RefreshSnapshots.scala   |   2 +
 .../stage/build/partition/PartitionBuildDict.scala |   2 +
 .../build/partition/PartitionBuildLayer.scala      |   2 +
 .../partition/PartitionCostBasedPlanner.scala      |   2 +
 .../partition/PartitionGatherFlatTableStats.scala  |   1 +
 .../partition/PartitionGenerateFlatTable.scala     |   2 +
 .../PartitionMaterializedFactTableView.scala       |   2 +
 .../partition/PartitionRefreshColumnBytes.scala    |   2 +
 .../spark/job/stage/merge/MergeColumnBytes.scala   |   2 +
 .../spark/job/stage/merge/MergeFlatTable.scala     |   2 +
 .../spark/job/stage/merge/MergeIndices.scala       |   2 +
 .../partition/PartitionMergeColumnBytes.scala      |   2 +
 .../merge/partition/PartitionMergeFlatTable.scala  |   2 +
 .../merge/partition/PartitionMergeIndices.scala    |   2 +
 .../spark/job/stage/snapshots/SnapshotsBuild.scala |   2 +
 .../job/stage/tablesampling/AnalyzerTable.scala    |   2 +
 .../spark/job/stage/WaiteForResourceTest.scala}    |  20 ++--
 .../job/stage/build/RefreshColumnBytesTest.scala}  |  37 +++----
 .../job/stage/build/RefreshSnapshotsTest.scala}    |  18 ++--
 .../PartitionRefreshColumnBytesTest.scala}         |  30 +++---
 .../job/stage/merge/MergeColumnBytesTest.scala}    |  18 +++-
 .../spark/job/stage/merge/MergeStageTest.scala     |   2 +
 .../partition/PartitionMergeColumnBytesTest.scala} |  18 +++-
 .../org/apache/spark/utils/TestResourceUtils.scala |   6 ++
 .../common/asyncprofiler/AsyncProfilerUtils.java   |   2 +-
 .../org/apache/kylin/common/CustomUtils.scala}     |  19 ++--
 .../java/org/apache/kylin/tool/KylinLogTool.java   |   2 +-
 67 files changed, 499 insertions(+), 205 deletions(-)

diff --git a/build/conf/kylin-server-log4j.xml 
b/build/conf/kylin-server-log4j.xml
index 701e8071fc..93e0b51299 100644
--- a/build/conf/kylin-server-log4j.xml
+++ b/build/conf/kylin-server-log4j.xml
@@ -26,7 +26,7 @@
             <PatternLayout pattern="%d{ISO8601} %-5p %X{request.project}[%t] 
%c{2} : %mask{%m}%n"/>
         </RollingRandomAccessFile>
         <Routing name="routing">
-            <Routes pattern="${ctx:logCategory}">
+            <Routes pattern="$${ctx:logCategory}">
                 <Route>
                     <RollingFile name="rolling-${ctx:logCategory}"
                                  
fileName="${env:KYLIN_HOME}/logs/kylin.${ctx:logCategory}.log"
@@ -39,7 +39,7 @@
                     </RollingFile>
                 </Route>
 
-                <Route ref="server" key="${ctx:logCategory}"/>
+                <Route ref="server" key="$${ctx:logCategory}"/>
             </Routes>
         </Routing>
         <RollingFile name="query-log-spark" 
fileName="${env:KYLIN_HOME}/logs/kylin.query.log" append="true"
@@ -50,20 +50,48 @@
             <DefaultRolloverStrategy max="10"/>
             <PatternLayout pattern="%d{ISO8601} %-5p %X{request.project}[%t] 
%c{2} : %mask{%m}%n"/>
         </RollingFile>
+        <RollingFile name="spark-history-server" 
fileName="${env:KYLIN_HOME}/logs/kylin.history_server.log"
+                     append="true"
+                     
filePattern="${env:KYLIN_HOME}/logs/kylin.history_server.log.%i">
+            <Policies>
+                <SizeBasedTriggeringPolicy size="268435456"/>
+            </Policies>
+            <DefaultRolloverStrategy max="10"/>
+            <PatternLayout pattern="%d{ISO8601} %-5p %X{request.project}[%t] 
%c{2} : %mask{%m}%n"/>
+        </RollingFile>
+        <RollingFile name="build-log-spark" 
fileName="${env:KYLIN_HOME}/logs/kylin.build.log" append="true"
+                     filePattern="${env:KYLIN_HOME}/logs/kylin.build.log.%i">
+            <Policies>
+                <SizeBasedTriggeringPolicy size="268435456"/>
+            </Policies>
+            <DefaultRolloverStrategy max="10"/>
+            <PatternLayout pattern="%d{ISO8601} %-5p %X{request.project}[%t] 
%c{2} : %mask{%m}%n"/>
+        </RollingFile>
+        <RollingFile name="metadata-log-spark" 
fileName="${env:KYLIN_HOME}/logs/kylin.metadata.log" append="true"
+                     
filePattern="${env:KYLIN_HOME}/logs/kylin.metadata.log.%i">
+            <Policies>
+                <SizeBasedTriggeringPolicy size="268435456"/>
+            </Policies>
+            <DefaultRolloverStrategy max="10"/>
+            <PatternLayout pattern="%d{ISO8601} %-5p %X{request.project}[%t] 
%c{2} : %mask{%m}%n"/>
+        </RollingFile>
     </Appenders>
     <Loggers>
         <Root level="INFO">
             <AppenderRef ref="routing"/>
         </Root>
-        <Logger name="org.apache.spark.scheduler.TaskSetManager" level="INFO" 
additivity="false">
+        <Logger name="org.apache.spark.scheduler.TaskSetManager" level="WARN" 
additivity="false">
             <AppenderRef ref="query-log-spark"/>
         </Logger>
-        <Logger name="org.apache.spark.scheduler.DAGScheduler" level="INFO" 
additivity="false">
+        <Logger name="org.apache.spark.scheduler.DAGScheduler" level="WARN" 
additivity="false">
             <AppenderRef ref="query-log-spark"/>
         </Logger>
-        <Logger name="org.apache.spark.scheduler.YarnScheduler" level="INFO" 
additivity="false">
+        <Logger name="org.apache.spark.scheduler.YarnScheduler" level="WARN" 
additivity="false">
             <AppenderRef ref="query-log-spark"/>
         </Logger>
+        <Logger name="org.apache.spark.deploy.history" level="INFO" 
additivity="false">
+            <AppenderRef ref="spark-history-server"/>
+        </Logger>
         <Logger name="io.kyligence" level="DEBUG"/>
         <Logger name="org.springframework" level="WARN"/>
         <Logger name="org.apache.kylin" level="DEBUG"/>
@@ -74,12 +102,20 @@
         <Logger name="org.apache.kylin.ext" level="INFO"/>
         <!--  Query log  -->
         <Logger name="org.apache.kylin.query" level="INFO"/>
-        <Logger name="org.apache.kylin.query" level="INFO"/>
         <Logger name="NDataflowCapabilityChecker" level="INFO" />
         <Logger name="org.apache.kylin.common.util.CheckUtil" level="INFO" />
         <Logger name="NQueryLayoutChooser" level="INFO" />
         <Logger name="org.apache.kylin.query.runtime.plan.ResultPlan" 
level="INFO" />
         <Logger name="org.apache.spark.sql.kylin.external.LogEx" level="INFO" 
/>
         <Logger name="org.apache.kylin.engine.spark.utils.LogEx" level="INFO" 
/>
+        <Logger name="org.apache.kylin.rest.service.QueryCacheManager" 
level="INFO"/>
+        <!-- Kerberos log -->
+        <Logger name="io.kyligence.kap.tool.kerberos" level="INFO"/>
+        <!-- Other log -->
+        <Logger 
name="org.apache.kylin.metadata.cube.storage.TotalStorageCollector" 
level="INFO" />
+        <Logger name="org.apache.kylin.common.metrics.MetricsInfluxdbReporter" 
level="INFO" />
+        <Logger name="io.kyligence.kap.metadata.recommendation.ref.OptRecV2" 
level="INFO" />
+        <Logger 
name="org.apache.kylin.rest.security.LdapAuthenticationProvider" level="INFO" />
+        <Logger name="org.apache.kylin.rest.aspect.SchedulerEnhancer" 
level="INFO" />
     </Loggers>
 </Configuration>
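
The single-dollar to double-dollar change in the Routes pattern is the
load-bearing fix here: Log4j2 resolves ${ctx:logCategory} once, while parsing
the configuration (when no ThreadContext value is set yet), whereas the escaped
$${ctx:logCategory} defers the lookup to each log event, letting the Routing
appender pick a route per event. A minimal sketch of how a route is selected at
runtime; the ThreadContext key name comes from the pattern above, the rest is
illustrative:

    import org.apache.logging.log4j.LogManager;
    import org.apache.logging.log4j.Logger;
    import org.apache.logging.log4j.ThreadContext;

    public class RoutingSketch {
        private static final Logger LOG = LogManager.getLogger(RoutingSketch.class);

        public static void main(String[] args) {
            LOG.info("no category");                      // key unset -> "server" route
            ThreadContext.put("logCategory", "metadata");
            try {
                LOG.info("metadata work");                // -> kylin.metadata.log
            } finally {
                ThreadContext.remove("logCategory");      // back to the default route
            }
        }
    }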
diff --git 
a/src/common-service/src/main/java/org/apache/kylin/rest/aspect/SchedulerEnhancer.java
 
b/src/common-service/src/main/java/org/apache/kylin/rest/aspect/SchedulerEnhancer.java
index b086f379d2..d39066c42c 100644
--- 
a/src/common-service/src/main/java/org/apache/kylin/rest/aspect/SchedulerEnhancer.java
+++ 
b/src/common-service/src/main/java/org/apache/kylin/rest/aspect/SchedulerEnhancer.java
@@ -35,7 +35,7 @@ public class SchedulerEnhancer {
     public void aroundScheduled(ProceedingJoinPoint pjp) throws Throwable {
         val config = KylinConfig.getInstanceFromEnv();
         if (!"query".equals(config.getServerMode())) {
-            log.info("schedule at job leader");
+            log.debug("schedule at job leader");
             pjp.proceed();
         }
     }
diff --git 
a/src/common-service/src/main/java/org/apache/kylin/rest/broadcaster/Broadcaster.java
 
b/src/common-service/src/main/java/org/apache/kylin/rest/broadcaster/Broadcaster.java
index c7b36369c1..92b9686557 100644
--- 
a/src/common-service/src/main/java/org/apache/kylin/rest/broadcaster/Broadcaster.java
+++ 
b/src/common-service/src/main/java/org/apache/kylin/rest/broadcaster/Broadcaster.java
@@ -39,6 +39,8 @@ import java.util.stream.Stream;
 
 import org.apache.commons.collections.CollectionUtils;
 import org.apache.commons.lang.ArrayUtils;
+import org.apache.kylin.common.constant.LogConstant;
+import org.apache.kylin.common.logging.SetLogCategory;
 import 
org.apache.kylin.common.persistence.transaction.BroadcastEventReadyNotifier;
 import org.apache.kylin.common.util.AddressUtil;
 import org.apache.kylin.common.util.DaemonThreadFactory;
@@ -91,7 +93,9 @@ public class Broadcaster implements Closeable {
 
     public void announce(BroadcastEventReadyNotifier event) {
         if (eventQueue.contains(event)) {
-            logger.debug("broadcast event queue has contain this event: {}", 
event);
+            try (SetLogCategory ignored = new 
SetLogCategory(LogConstant.SCHEDULE_CATEGORY)) {
+                logger.debug("broadcast event queue has contain this event: 
{}", event);
+            }
             return;
         }
         if (!eventQueue.offer(event)) {
@@ -100,7 +104,7 @@ public class Broadcaster implements Closeable {
     }
 
     public void consumeEvent() {
-        try {
+        try (SetLogCategory ignored = new 
SetLogCategory(LogConstant.SCHEDULE_CATEGORY)) {
             while (isRunning) {
                 BroadcastEventReadyNotifier notifier = eventQueue.take();
                 handleEvent(notifier);
diff --git 
a/src/common-service/src/main/java/org/apache/kylin/rest/service/AuditLogService.java
 
b/src/common-service/src/main/java/org/apache/kylin/rest/service/AuditLogService.java
index 1d79c2734f..1f60bfdd83 100644
--- 
a/src/common-service/src/main/java/org/apache/kylin/rest/service/AuditLogService.java
+++ 
b/src/common-service/src/main/java/org/apache/kylin/rest/service/AuditLogService.java
@@ -19,6 +19,8 @@
 package org.apache.kylin.rest.service;
 
 import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.constant.LogConstant;
+import org.apache.kylin.common.logging.SetLogCategory;
 import org.apache.kylin.common.persistence.ResourceStore;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -31,8 +33,10 @@ public class AuditLogService {
 
     public void notifyCatchUp() {
         ResourceStore store = 
ResourceStore.getKylinMetaStore(KylinConfig.getInstanceFromEnv());
-        logger.info("Start to catchup manually");
-        store.getAuditLogStore().catchup();
-        logger.info("End to catchup manually");
+        try (SetLogCategory ignored = new 
SetLogCategory(LogConstant.METADATA_CATEGORY)) {
+            logger.info("Start to catchup manually");
+            store.getAuditLogStore().catchup();
+            logger.info("End to catchup manually");
+        }
     }
 }
diff --git 
a/src/common-service/src/main/java/org/apache/kylin/rest/service/task/QueryHistoryTaskScheduler.java
 
b/src/common-service/src/main/java/org/apache/kylin/rest/service/task/QueryHistoryTaskScheduler.java
index eae1ed351b..07ceafe06c 100644
--- 
a/src/common-service/src/main/java/org/apache/kylin/rest/service/task/QueryHistoryTaskScheduler.java
+++ 
b/src/common-service/src/main/java/org/apache/kylin/rest/service/task/QueryHistoryTaskScheduler.java
@@ -33,6 +33,8 @@ import java.util.stream.Collectors;
 import org.apache.commons.collections.CollectionUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.constant.LogConstant;
+import org.apache.kylin.common.logging.SetLogCategory;
 import org.apache.kylin.common.util.ExecutorServiceUtil;
 import org.apache.kylin.common.util.NamedThreadFactory;
 import org.apache.kylin.common.util.Pair;
@@ -98,7 +100,9 @@ public class QueryHistoryTaskScheduler {
         if (querySmartSupporter == null && 
SpringContext.getApplicationContext() != null) {
             querySmartSupporter = 
SpringContext.getBean(QuerySmartSupporter.class);
         }
-        log.debug("New QueryHistoryAccelerateScheduler created by project {}", 
project);
+        try (SetLogCategory ignored = new 
SetLogCategory(LogConstant.SCHEDULE_CATEGORY)) {
+            log.debug("New QueryHistoryAccelerateScheduler created by project 
{}", project);
+        }
     }
 
     public static QueryHistoryTaskScheduler getInstance(String project) {
@@ -484,7 +488,7 @@ public class QueryHistoryTaskScheduler {
 
         @Override
         public void run() {
-            try {
+            try (SetLogCategory ignored = new 
SetLogCategory(LogConstant.SCHEDULE_CATEGORY)) {
                 work();
             } catch (Exception e) {
                 log.warn("QueryHistory {}  process failed of project({})", 
name(), project, e);
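
One subtlety of the try-with-resources pattern used throughout this commit: the
resource is closed before a catch or finally block runs, so the log.warn in the
catch above executes after the category has already been cleared and falls back
to the default route. A minimal illustration of the ordering, assuming
SetLogCategory removes the routing key in close():

    try (SetLogCategory ignored = new SetLogCategory(LogConstant.SCHEDULE_CATEGORY)) {
        work();                                   // logged under the schedule category
    } catch (Exception e) {
        // ignored.close() has already run at this point
        log.warn("QueryHistory task failed", e);  // default route
    }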
diff --git 
a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/stage/merge/MergeColumnBytes.scala
 
b/src/core-common/src/main/java/org/apache/kylin/common/constant/LogConstant.java
similarity index 69%
copy from 
src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/stage/merge/MergeColumnBytes.scala
copy to 
src/core-common/src/main/java/org/apache/kylin/common/constant/LogConstant.java
index 3091e92e0a..4a5293dffa 100644
--- 
a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/stage/merge/MergeColumnBytes.scala
+++ 
b/src/core-common/src/main/java/org/apache/kylin/common/constant/LogConstant.java
@@ -16,17 +16,15 @@
  * limitations under the License.
  */
 
-package org.apache.kylin.engine.spark.job.stage.merge
+package org.apache.kylin.common.constant;
 
-import org.apache.kylin.engine.spark.job.SegmentJob
-import org.apache.kylin.metadata.cube.model.NDataSegment
+public class LogConstant {
 
-class MergeColumnBytes(jobContext: SegmentJob, dataSegment: NDataSegment)
-  extends MergeStage(jobContext, dataSegment) {
+    private LogConstant() {
+    }
 
-  override def execute(): Unit = {
-    mergeColumnBytes()
-
-    cleanup()
-  }
+    public static final String SCHEDULE_CATEGORY = "schedule";
+    public static final String METADATA_CATEGORY = "metadata";
+    public static final String QUERY_CATEGORY = "query";
+    public static final String BUILD_CATEGORY = "build";
 }
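
LogConstant centralizes the category names consumed by the SetLogCategory
call sites across this commit. SetLogCategory itself is not part of the diff;
below is a hypothetical reconstruction of what it is assumed to do: publish the
category under the logCategory ThreadContext key that the Routing appender
reads, and remove it on close so try-with-resources restores the default route.

    import org.apache.logging.log4j.ThreadContext;

    // Hypothetical sketch, not the actual Kylin source.
    public class SetLogCategory implements AutoCloseable {
        private static final String KEY = "logCategory";

        public SetLogCategory(String category) {
            ThreadContext.put(KEY, category);  // activates the matching route
        }

        @Override
        public void close() {
            ThreadContext.remove(KEY);         // later events use the default route
        }
    }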
diff --git 
a/src/core-common/src/main/java/org/apache/kylin/common/persistence/transaction/AbstractAuditLogReplayWorker.java
 
b/src/core-common/src/main/java/org/apache/kylin/common/persistence/transaction/AbstractAuditLogReplayWorker.java
index afa66f885c..9bc177891a 100644
--- 
a/src/core-common/src/main/java/org/apache/kylin/common/persistence/transaction/AbstractAuditLogReplayWorker.java
+++ 
b/src/core-common/src/main/java/org/apache/kylin/common/persistence/transaction/AbstractAuditLogReplayWorker.java
@@ -30,7 +30,9 @@ import java.util.function.Predicate;
 
 import org.apache.commons.collections.CollectionUtils;
 import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.constant.LogConstant;
 import org.apache.kylin.common.exception.KylinException;
+import org.apache.kylin.common.logging.SetLogCategory;
 import org.apache.kylin.common.persistence.AuditLog;
 import org.apache.kylin.common.persistence.UnitMessages;
 import org.apache.kylin.common.persistence.event.Event;
@@ -116,9 +118,11 @@ public abstract class AbstractAuditLogReplayWorker {
             }
         }
 
-        for (UnitMessages message : messagesMap.values()) {
-            log.debug("replay {} event for project:{}", 
message.getMessages().size(), message.getKey());
-            replayer.replay(message);
+        try (SetLogCategory ignored = new 
SetLogCategory(LogConstant.METADATA_CATEGORY)) {
+            for (UnitMessages message : messagesMap.values()) {
+                log.debug("replay {} event for project:{}", 
message.getMessages().size(), message.getKey());
+                replayer.replay(message);
+            }
         }
     }
 
diff --git 
a/src/core-common/src/main/java/org/apache/kylin/common/persistence/transaction/AuditLogReplayWorker.java
 
b/src/core-common/src/main/java/org/apache/kylin/common/persistence/transaction/AuditLogReplayWorker.java
index 2ecb5041e1..63299c0aa4 100644
--- 
a/src/core-common/src/main/java/org/apache/kylin/common/persistence/transaction/AuditLogReplayWorker.java
+++ 
b/src/core-common/src/main/java/org/apache/kylin/common/persistence/transaction/AuditLogReplayWorker.java
@@ -33,6 +33,8 @@ import java.util.stream.LongStream;
 import org.apache.commons.collections.CollectionUtils;
 import org.apache.commons.lang.StringUtils;
 import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.constant.LogConstant;
+import org.apache.kylin.common.logging.SetLogCategory;
 import org.apache.kylin.common.persistence.ResourceStore;
 import org.apache.kylin.common.persistence.VersionConflictException;
 import org.apache.kylin.common.persistence.AuditLog;
@@ -103,7 +105,7 @@ public class AuditLogReplayWorker extends 
AbstractAuditLogReplayWorker {
             log.info("Catchup Already stopped");
             return;
         }
-        try {
+        try (SetLogCategory ignored = new 
SetLogCategory(LogConstant.METADATA_CATEGORY)) {
             catchupToMaxId(logOffset);
         } catch (TransactionException | DatabaseNotAvailableException e) {
             log.warn("cannot create transaction or auditlog database connect 
error, ignore it", e);
@@ -148,7 +150,9 @@ public class AuditLogReplayWorker extends 
AbstractAuditLogReplayWorker {
         }
 
         if (CollectionUtils.isEmpty(needReplayedIdList)) {
-            log.debug("needReplayedIdList is empty");
+            try (SetLogCategory ignored = new 
SetLogCategory(LogConstant.METADATA_CATEGORY)) {
+                log.debug("needReplayedIdList is empty");
+            }
             return Lists.newArrayList();
         }
 
@@ -160,7 +164,7 @@ public class AuditLogReplayWorker extends 
AbstractAuditLogReplayWorker {
             return;
         }
 
-        try {
+        try (SetLogCategory ignored = new 
SetLogCategory(LogConstant.METADATA_CATEGORY)) {
             val fetchAuditLog = auditLogStore.fetch(needReplayedIdList);
             if (CollectionUtils.isEmpty(fetchAuditLog)) {
                 return;
@@ -200,7 +204,9 @@ public class AuditLogReplayWorker extends 
AbstractAuditLogReplayWorker {
                 return -1L;
             }
 
-            log.debug("start restore from {}", currentWindow);
+            try (SetLogCategory ignored = new 
SetLogCategory(LogConstant.METADATA_CATEGORY)) {
+                log.debug("start restore from {}", currentWindow);
+            }
             val stepWin = new SlideWindow(currentWindow);
 
             while (stepWin.forwardRightStep(STEP)) {
@@ -211,7 +217,9 @@ public class AuditLogReplayWorker extends 
AbstractAuditLogReplayWorker {
                 }
                 stepWin.syncRightStep();
             }
-            log.debug("end restore from {}, delay queue:{}", currentWindow, 
delayIdQueue.size());
+            try (SetLogCategory ignored = new 
SetLogCategory(LogConstant.METADATA_CATEGORY)) {
+                log.debug("end restore from {}, delay queue:{}", 
currentWindow, delayIdQueue.size());
+            }
             return currentWindow.getEnd();
         });
 
diff --git 
a/src/core-common/src/main/java/org/apache/kylin/common/persistence/transaction/UnitOfWork.java
 
b/src/core-common/src/main/java/org/apache/kylin/common/persistence/transaction/UnitOfWork.java
index 4e576d4e4f..b256db1fdd 100644
--- 
a/src/core-common/src/main/java/org/apache/kylin/common/persistence/transaction/UnitOfWork.java
+++ 
b/src/core-common/src/main/java/org/apache/kylin/common/persistence/transaction/UnitOfWork.java
@@ -23,8 +23,10 @@ import java.util.function.Consumer;
 import java.util.stream.Collectors;
 
 import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.constant.LogConstant;
 import org.apache.kylin.common.exception.KylinException;
 import org.apache.kylin.common.exception.code.ErrorCodeSystem;
+import org.apache.kylin.common.logging.SetLogCategory;
 import org.apache.kylin.common.persistence.InMemResourceStore;
 import org.apache.kylin.common.persistence.RawResource;
 import org.apache.kylin.common.persistence.ResourceStore;
@@ -111,11 +113,13 @@ public class UnitOfWork {
         try {
             T ret;
 
-            if (retry != 1) {
-                log.debug("UnitOfWork {} in project {} is retrying for {}th 
time", traceId, params.getUnitName(),
-                        retry);
-            } else {
-                log.debug("UnitOfWork {} started on project {}", traceId, 
params.getUnitName());
+            try (SetLogCategory ignored = new 
SetLogCategory(LogConstant.METADATA_CATEGORY)) {
+                if (retry != 1) {
+                    log.debug("UnitOfWork {} in project {} is retrying for 
{}th time", traceId, params.getUnitName(),
+                            retry);
+                } else {
+                    log.debug("UnitOfWork {} started on project {}", traceId, 
params.getUnitName());
+                }
             }
 
             long startTime = System.currentTimeMillis();
@@ -124,7 +128,9 @@ public class UnitOfWork {
             long startTransactionTime = System.currentTimeMillis();
             val waitForLockTime = startTransactionTime - startTime;
             if (waitForLockTime > 3000) {
-                log.warn("UnitOfWork {} takes too long time {}ms to start", 
traceId, waitForLockTime);
+                try (SetLogCategory ignored = new 
SetLogCategory(LogConstant.METADATA_CATEGORY)) {
+                    log.warn("UnitOfWork {} takes too long time {}ms to 
start", traceId, waitForLockTime);
+                }
             }
 
             ret = params.getProcessor().process();
@@ -137,7 +143,7 @@ public class UnitOfWork {
             handleError(throwable, params, retry, traceId);
         } finally {
             if (isAlreadyInTransaction()) {
-                try {
+                try (SetLogCategory ignored = new 
SetLogCategory(LogConstant.METADATA_CATEGORY)) {
                     val unitOfWork = UnitOfWork.get();
                     unitOfWork.getCurrentLock().unlock();
                     unitOfWork.cleanResource();
@@ -161,13 +167,15 @@ public class UnitOfWork {
     }
 
     private static void logIfLongTransaction(long duration, String traceId) {
-        if (duration > 3000) {
-            log.warn("UnitOfWork {} takes too long time {}ms to complete", 
traceId, duration);
-            if (duration > 10000) {
-                log.warn("current stack: ", new Throwable());
+        try (SetLogCategory ignored = new 
SetLogCategory(LogConstant.METADATA_CATEGORY)) {
+            if (duration > 3000) {
+                log.warn("UnitOfWork {} takes too long time {}ms to complete", 
traceId, duration);
+                if (duration > 10000) {
+                    log.warn("current stack: ", new Throwable());
+                }
+            } else {
+                log.debug("UnitOfWork {} takes {}ms to complete", traceId, 
duration);
             }
-        } else {
-            log.debug("UnitOfWork {} takes {}ms to complete", traceId, 
duration);
         }
     }
 
@@ -178,7 +186,9 @@ public class UnitOfWork {
         val lock = params.getTempLockName() == null ? 
TransactionLock.getLock(project, readonly)
                 : TransactionLock.getLock(params.getTempLockName(), readonly);
 
-        log.trace("get lock for project {}, lock is held by current thread: 
{}", project, lock.isHeldByCurrentThread());
+        try (SetLogCategory ignored = new 
SetLogCategory(LogConstant.METADATA_CATEGORY)) {
+            log.trace("get lock for project {}, lock is held by current 
thread: {}", project, lock.isHeldByCurrentThread());
+        }
         //re-entry is not encouraged (because it indicates complex handling 
logic, bad smell), let's abandon it first
         Preconditions.checkState(!lock.isHeldByCurrentThread());
         lock.lock();
@@ -202,7 +212,9 @@ public class UnitOfWork {
         ResourceStore.setRS(configCopy, rs);
         
unitOfWork.setLocalConfig(KylinConfig.setAndUnsetThreadLocalConfig(configCopy));
 
-        log.trace("sandbox RS {} now takes place for main RS {}", rs, 
underlying);
+        try (SetLogCategory ignored = new 
SetLogCategory(LogConstant.METADATA_CATEGORY)) {
+            log.trace("sandbox RS {} now takes place for main RS {}", rs, 
underlying);
+        }
 
         return unitOfWork;
     }
@@ -248,14 +260,16 @@ public class UnitOfWork {
         val unitMessages = packageEvents(eventList, get().getProject(), 
traceId, writeInterceptor);
         long entitiesSize = unitMessages.getMessages().stream().filter(event 
-> event instanceof ResourceRelatedEvent)
                 .count();
-        log.debug("transaction {} updates {} metadata items", traceId, 
entitiesSize);
+        try (SetLogCategory ignored = new 
SetLogCategory(LogConstant.METADATA_CATEGORY)) {
+            log.debug("transaction {} updates {} metadata items", traceId, 
entitiesSize);
+        }
         checkEpoch(params);
         val unitName = params.getUnitName();
         metadataStore.batchUpdate(unitMessages, 
get().getParams().isSkipAuditLog(), unitName, params.getEpochId());
         if (entitiesSize != 0 && !params.isReadonly() && 
!params.isSkipAuditLog() && !config.isUTEnv()) {
             factory.postAsync(new AuditLogBroadcastEventNotifier());
         }
-        try {
+        try (SetLogCategory ignored = new 
SetLogCategory(LogConstant.METADATA_CATEGORY)) {
             // replayInTransaction in leader before release lock
             val replayer = MessageSynchronization.getInstance(originConfig);
             replayer.replayInTransaction(unitMessages);
@@ -284,7 +298,9 @@ public class UnitOfWork {
         }
 
         if (retry == 1) {
-            log.warn("transaction failed at first time, traceId:" + traceId, 
throwable);
+            try (SetLogCategory ignored = new 
SetLogCategory(LogConstant.METADATA_CATEGORY)) {
+                log.warn("transaction failed at first time, traceId:" + 
traceId, throwable);
+            }
         }
     }
 
diff --git 
a/src/core-common/src/main/java/org/apache/kylin/common/scheduler/EventBusFactory.java
 
b/src/core-common/src/main/java/org/apache/kylin/common/scheduler/EventBusFactory.java
index ca476ed474..92da48642f 100644
--- 
a/src/core-common/src/main/java/org/apache/kylin/common/scheduler/EventBusFactory.java
+++ 
b/src/core-common/src/main/java/org/apache/kylin/common/scheduler/EventBusFactory.java
@@ -27,6 +27,8 @@ import java.util.concurrent.TimeUnit;
 
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.Singletons;
+import org.apache.kylin.common.constant.LogConstant;
+import org.apache.kylin.common.logging.SetLogCategory;
 import org.apache.kylin.common.util.ExecutorServiceUtil;
 import org.apache.kylin.common.util.NamedThreadFactory;
 import 
org.apache.kylin.common.persistence.transaction.BroadcastEventReadyNotifier;
@@ -126,7 +128,9 @@ public class EventBusFactory {
     }
 
     public void postAsync(SchedulerEventNotifier event) {
-        log.debug("Post event {} async", event);
+        try (SetLogCategory ignored = new 
SetLogCategory(LogConstant.SCHEDULE_CATEGORY)) {
+            log.debug("Post event {} async", event);
+        }
         if (event instanceof BroadcastEventReadyNotifier) {
             broadcastEventBus.post(event);
         } else {
@@ -135,12 +139,16 @@ public class EventBusFactory {
     }
 
     public void postSync(Object event) {
-        log.debug("Post event {} sync", event);
+        try (SetLogCategory ignored = new 
SetLogCategory(LogConstant.SCHEDULE_CATEGORY)) {
+            log.debug("Post event {} sync", event);
+        }
         syncEventBus.post(event);
     }
 
     public void callService(Object event) {
-        log.debug("Post Service event {} sync", event);
+        try (SetLogCategory ignored = new 
SetLogCategory(LogConstant.SCHEDULE_CATEGORY)) {
+            log.debug("Post Service event {} sync", event);
+        }
         serviceEventBus.post(event);
     }
 
@@ -153,7 +161,7 @@ public class EventBusFactory {
 
     private void stopThreadPool(ExecutorService executor) {
         executor.shutdown();
-        try {
+        try (SetLogCategory ignored = new 
SetLogCategory(LogConstant.SCHEDULE_CATEGORY)) {
             if (!executor.awaitTermination(6000, TimeUnit.SECONDS)) {
                 ExecutorServiceUtil.forceShutdown(executor);
             }
diff --git 
a/src/core-job/src/main/java/org/apache/kylin/job/execution/DefaultExecutable.java
 
b/src/core-job/src/main/java/org/apache/kylin/job/execution/DefaultExecutable.java
index 3f22a1c502..5a4e92e58c 100644
--- 
a/src/core-job/src/main/java/org/apache/kylin/job/execution/DefaultExecutable.java
+++ 
b/src/core-job/src/main/java/org/apache/kylin/job/execution/DefaultExecutable.java
@@ -71,10 +71,12 @@ public class DefaultExecutable extends AbstractExecutable 
implements ChainedExec
         List<Executable> executables = 
getTasks().stream().map(Executable.class::cast).collect(Collectors.toList());
         switch (getJobSchedulerMode()) {
         case DAG:
+            logger.info("Execute in DAG mode.");
             dagSchedule(executables, context);
             break;
         case CHAIN:
         default:
+            logger.info("Execute in CHAIN mode.");
             chainedSchedule(executables, context);
             break;
         }
diff --git 
a/src/core-metadata/src/main/java/org/apache/kylin/metadata/cube/storage/TotalStorageCollector.java
 
b/src/core-metadata/src/main/java/org/apache/kylin/metadata/cube/storage/TotalStorageCollector.java
index f6b93383ea..74c30ab86a 100644
--- 
a/src/core-metadata/src/main/java/org/apache/kylin/metadata/cube/storage/TotalStorageCollector.java
+++ 
b/src/core-metadata/src/main/java/org/apache/kylin/metadata/cube/storage/TotalStorageCollector.java
@@ -36,7 +36,7 @@ public class TotalStorageCollector implements 
StorageInfoCollector {
     public void doCollect(KylinConfig config, String project, 
StorageVolumeInfo storageVolumeInfo) throws IOException {
         long totalStorageSize = 
hdfsCapacityMetrics.getHdfsCapacityByProject(project);
         if (totalStorageSize != -1L) {
-            log.info("Reuse workingDirCapacity by project {}, storageSize: 
{}", project, totalStorageSize);
+            log.debug("Reuse workingDirCapacity by project {}, storageSize: 
{}", project, totalStorageSize);
             storageVolumeInfo.setTotalStorageSize(totalStorageSize);
             return;
         }
diff --git 
a/src/core-metadata/src/main/java/org/apache/kylin/metadata/epoch/EpochManager.java
 
b/src/core-metadata/src/main/java/org/apache/kylin/metadata/epoch/EpochManager.java
index cbe1ba9590..e8d3acc29b 100644
--- 
a/src/core-metadata/src/main/java/org/apache/kylin/metadata/epoch/EpochManager.java
+++ 
b/src/core-metadata/src/main/java/org/apache/kylin/metadata/epoch/EpochManager.java
@@ -45,6 +45,8 @@ import org.apache.commons.collections.CollectionUtils;
 import org.apache.commons.lang.StringUtils;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.Singletons;
+import org.apache.kylin.common.constant.LogConstant;
+import org.apache.kylin.common.logging.SetLogCategory;
 import org.apache.kylin.common.persistence.metadata.Epoch;
 import org.apache.kylin.common.persistence.metadata.EpochStore;
 import org.apache.kylin.common.persistence.transaction.UnitOfWork;
@@ -72,7 +74,7 @@ import lombok.Synchronized;
 import lombok.val;
 
 public class EpochManager {
-    private static final Logger logger = 
LoggerFactory.getLogger(EpochManager.class);
+    private static final Logger logger = 
LoggerFactory.getLogger(LogConstant.METADATA_CATEGORY);
 
     public static EpochManager getInstance() {
         return Singletons.getInstance(EpochManager.class, clz -> {
@@ -174,13 +176,17 @@ public class EpochManager {
             if (CollectionUtils.isNotEmpty(outdatedProjects)) {
                 outdatedProjects.forEach(EpochManager.this::deleteEpoch);
                 notifierEscapedProject(outdatedProjects);
-                logger.warn("remove outdated epoch list :{}", String.join(",", 
outdatedProjects));
+                try (SetLogCategory ignored = new 
SetLogCategory(LogConstant.METADATA_CATEGORY)) {
+                    logger.warn("remove outdated epoch list :{}", 
String.join(",", outdatedProjects));
+                }
             }
         }
 
         @Synchronized("renewLock")
         public void tryRenewOwnedEpochs() {
-            logger.debug("Start renew owned epoch.........");
+            try (SetLogCategory ignored = new 
SetLogCategory(LogConstant.METADATA_CATEGORY)) {
+                logger.debug("Start renew owned epoch.........");
+            }
             long startTime = System.currentTimeMillis();
 
             //1.check and get project
@@ -197,7 +203,9 @@ public class EpochManager {
             }
 
             if (CollectionUtils.isEmpty(oriOwnedEpochSet)) {
-                logger.info("current node own none project, end renew...");
+                try (SetLogCategory ignored = new 
SetLogCategory(LogConstant.METADATA_CATEGORY)) {
+                    logger.info("current node own none project, end renew...");
+                }
                 return;
             }
 
@@ -208,8 +216,10 @@ public class EpochManager {
             notifierAfterUpdatedEpoch("renew", lastRenewEpochSet, 
afterRenewEpochSets);
             lastRenewEpochSet.clear();
             lastRenewEpochSet.addAll(afterRenewEpochSets);
-            logger.debug("End renew owned epoch,cost:{}.........",
-                    TimeUnit.MILLISECONDS.toSeconds(System.currentTimeMillis() 
- startTime));
+            try (SetLogCategory ignored = new 
SetLogCategory(LogConstant.METADATA_CATEGORY)) {
+                logger.debug("End renew owned epoch,cost:{}.........",
+                        
TimeUnit.MILLISECONDS.toSeconds(System.currentTimeMillis() - startTime));
+            }
         }
 
         private Set<String> innerRenewEpochWithRetry(Set<Epoch> oriEpochs) {
@@ -251,7 +261,7 @@ public class EpochManager {
             totalTask.forEach(taskEpochList -> {
                 val epochTargetList = 
taskEpochList.stream().map(Epoch::getEpochTarget).collect(Collectors.toList());
                 renewExecutor.submit(() -> {
-                    try {
+                    try (SetLogCategory ignored = new 
SetLogCategory(LogConstant.METADATA_CATEGORY)) {
                         if (CollectionUtils.isNotEmpty(epochTargetList)) {
                             batchRenewEpoch(taskEpochList);
                             newRenewEpochSets.addAll(epochTargetList);
@@ -264,7 +274,7 @@ public class EpochManager {
                 });
             });
 
-            try {
+            try (SetLogCategory ignored = new 
SetLogCategory(LogConstant.METADATA_CATEGORY)) {
                 if (!countDownLatch.await(epochRenewTimeout, 
TimeUnit.SECONDS)) {
                     logger.error("renew not finished,{}/{}...", 
newRenewEpochSets.size(), oriEpochs.size());
                 }
@@ -276,7 +286,9 @@ public class EpochManager {
 
         @Synchronized("updateLock")
         public void tryUpdateAllEpochs() {
-            logger.debug("Start update Epochs.........");
+            try (SetLogCategory ignored = new 
SetLogCategory(LogConstant.METADATA_CATEGORY)) {
+                logger.debug("Start update Epochs.........");
+            }
             long startTime = System.currentTimeMillis();
 
             //1.check and get project
@@ -294,7 +306,9 @@ public class EpochManager {
             }
 
             if (CollectionUtils.isEmpty(projects)) {
-                logger.debug("don't have more new project, end update...");
+                try (SetLogCategory ignored = new 
SetLogCategory(LogConstant.METADATA_CATEGORY)) {
+                    logger.debug("don't have more new project, end update...");
+                }
                 return;
             }
 
@@ -303,8 +317,10 @@ public class EpochManager {
 
             notifierAfterUpdatedEpoch("update", Collections.emptySet(), 
updatedMewEpochs);
 
-            logger.debug("End update Epochs,cost:{}:.........",
-                    TimeUnit.MILLISECONDS.toSeconds(System.currentTimeMillis() 
- startTime));
+            try (SetLogCategory ignored = new 
SetLogCategory(LogConstant.METADATA_CATEGORY)) {
+                logger.debug("End update Epochs,cost:{}:.........",
+                        
TimeUnit.MILLISECONDS.toSeconds(System.currentTimeMillis() - startTime));
+            }
         }
 
         private Set<String> tryUpdateEpochByProjects(final List<String> 
projects) {
@@ -345,18 +361,24 @@ public class EpochManager {
                 eventBusFactory.postAsync(new ProjectEscapedNotifier(project));
             }
 
-            logger.warn("notifier escaped project:{}", String.join(",", 
escapedProjects));
+            try (SetLogCategory ignored = new 
SetLogCategory(LogConstant.METADATA_CATEGORY)) {
+                logger.warn("notifier escaped project:{}", String.join(",", 
escapedProjects));
+            }
         }
 
         private void notifierAfterUpdatedEpoch(String updateTypeName, 
Set<String> oriEpochs, Set<String> newEpochs) {
-            logger.debug("after {} new epoch size:{}, Project {} owned by {}", 
updateTypeName, newEpochs.size(),
+            try (SetLogCategory ignored = new 
SetLogCategory(LogConstant.METADATA_CATEGORY)) {
+                logger.debug("after {} new epoch size:{}, Project {} owned by 
{}", updateTypeName, newEpochs.size(),
                     String.join(",", newEpochs), identity);
+            }
 
             if (CollectionUtils.isNotEmpty(newEpochs)) {
                 Collection<String> newControlledProjects = new 
HashSet<>(Sets.difference(newEpochs, oriEpochs));
                 if (CollectionUtils.isNotEmpty(newControlledProjects)) {
-                    logger.debug("after {} controlled projects: {}", 
updateTypeName,
+                    try (SetLogCategory ignored = new 
SetLogCategory(LogConstant.METADATA_CATEGORY)) {
+                        logger.debug("after {} controlled projects: {}", 
updateTypeName,
                             String.join(",", newControlledProjects));
+                    }
                     newControlledProjects.forEach(p -> 
eventBusFactory.postAsync(new ProjectControlledNotifier(p)));
                 }
             }
@@ -364,7 +386,9 @@ public class EpochManager {
             if (CollectionUtils.isNotEmpty(oriEpochs)) {
                 Collection<String> escapedProjects = new 
HashSet<>(Sets.difference(oriEpochs, newEpochs));
                 if (CollectionUtils.isNotEmpty(escapedProjects)) {
-                    logger.debug("after {} escaped projects: {}", 
updateTypeName, String.join(",", escapedProjects));
+                    try (SetLogCategory ignored = new 
SetLogCategory(LogConstant.METADATA_CATEGORY)) {
+                        logger.debug("after {} escaped projects: {}", 
updateTypeName, String.join(",", escapedProjects));
+                    }
                     notifierEscapedProject(escapedProjects);
                 }
             }
@@ -531,7 +555,7 @@ public class EpochManager {
             return false;
         }
         return EpochUpdateLockManager.executeEpochWithLock(epochTarget, () -> {
-            try {
+            try (SetLogCategory ignored = new 
SetLogCategory(LogConstant.METADATA_CATEGORY)) {
                 Epoch epoch = epochStore.getEpoch(epochTarget);
                 Pair<Epoch, Epoch> oldNewEpochPair = oldEpoch2NewEpoch(epoch, 
epochTarget, force, null);
 
@@ -672,23 +696,25 @@ public class EpochManager {
     }
 
     private boolean isEpochLegal(Epoch epoch) {
-        if (epoch == null) {
-            logger.debug("Get null epoch");
-            return false;
-        } else if (StringUtils.isEmpty(epoch.getCurrentEpochOwner())) {
-            logger.debug("Epoch {}'s owner is empty", epoch);
-            return false;
-        } else if (System.currentTimeMillis() - epoch.getLastEpochRenewTime() 
> epochExpiredTime * 1000) {
-            logger.warn("Epoch {}'s last renew time is expired. Current time 
is {}, expiredTime is {}", epoch,
-                    System.currentTimeMillis(), epochExpiredTime);
-            return false;
-        }
+        try (SetLogCategory ignored = new 
SetLogCategory(LogConstant.METADATA_CATEGORY)) {
+            if (epoch == null) {
+                logger.debug("Get null epoch");
+                return false;
+            } else if (StringUtils.isEmpty(epoch.getCurrentEpochOwner())) {
+                logger.debug("Epoch {}'s owner is empty", epoch);
+                return false;
+            } else if (System.currentTimeMillis() - 
epoch.getLastEpochRenewTime() > epochExpiredTime * 1000) {
+                logger.warn("Epoch {}'s last renew time is expired. Current 
time is {}, expiredTime is {}", epoch,
+                        System.currentTimeMillis(), epochExpiredTime);
+                return false;
+            }
 
-        ResourceGroupManager rgManager = 
ResourceGroupManager.getInstance(config);
-        String epochServer = getHostAndPort(epoch.getCurrentEpochOwner());
-        if 
(!rgManager.instanceHasPermissionToOwnEpochTarget(epoch.getEpochTarget(), 
epochServer)) {
-            logger.debug("Epoch {}'s owner is not in build request type 
resource group.", epoch);
-            return false;
+            ResourceGroupManager rgManager = 
ResourceGroupManager.getInstance(config);
+            String epochServer = getHostAndPort(epoch.getCurrentEpochOwner());
+            if 
(!rgManager.instanceHasPermissionToOwnEpochTarget(epoch.getEpochTarget(), 
epochServer)) {
+                logger.debug("Epoch {}'s owner is not in build request type 
resource group.", epoch);
+                return false;
+            }
         }
         return true;
     }
@@ -713,7 +739,9 @@ public class EpochManager {
         if (!isGlobalProject(epochTargetTemp)) {
             val targetProjectInstance = 
NProjectManager.getInstance(config).getProject(epochTargetTemp);
             if (Objects.isNull(targetProjectInstance)) {
-                logger.warn("get epoch failed, because the project:{} dose not 
exist", epochTargetTemp);
+                try (SetLogCategory ignored = new 
SetLogCategory(LogConstant.METADATA_CATEGORY)) {
+                    logger.warn("get epoch failed, because the project:{} dose 
not exist", epochTargetTemp);
+                }
                 return null;
             }
 
@@ -762,7 +790,9 @@ public class EpochManager {
     public void deleteEpoch(String epochTarget) {
         EpochUpdateLockManager.executeEpochWithLock(epochTarget, () -> {
             epochStore.delete(epochTarget);
-            logger.debug("delete epoch:{}", epochTarget);
+            try (SetLogCategory ignored = new 
SetLogCategory(LogConstant.METADATA_CATEGORY)) {
+                logger.debug("delete epoch:{}", epochTarget);
+            }
             return null;
         });
     }
@@ -790,7 +820,9 @@ public class EpochManager {
 
     private boolean checkInMaintenanceMode() {
         if (isMaintenanceMode()) {
-            logger.debug("System is currently undergoing maintenance. Abort 
updating Epochs");
+            try (SetLogCategory ignored = new 
SetLogCategory(LogConstant.METADATA_CATEGORY)) {
+                logger.debug("System is currently undergoing maintenance. 
Abort updating Epochs");
+            }
             return true;
         }
         return false;
@@ -802,7 +834,9 @@ public class EpochManager {
 
     // when shutdown or meta data is inconsistent
     public void releaseOwnedEpochs() {
-        logger.info("Release owned epochs");
+        try (SetLogCategory ignored = new 
SetLogCategory(LogConstant.METADATA_CATEGORY)) {
+            logger.info("Release owned epochs");
+        }
         epochStore.executeWithTransaction(() -> {
             val epochs = 
epochStore.list().stream().filter(this::checkEpochOwnerOnly).collect(Collectors.toList());
             epochs.forEach(epoch -> {
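
Note that EpochManager now combines two independent mechanisms: the logger is
obtained via LoggerFactory.getLogger(LogConstant.METADATA_CATEGORY), which only
changes the logger name printed by %c{2} in the PatternLayout, while the
SetLogCategory wrappers set the ThreadContext key that actually drives the
Routing appender. Renaming the logger alone would not redirect output to
kylin.metadata.log.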
diff --git 
a/src/core-metadata/src/main/java/org/apache/kylin/metadata/recommendation/ref/OptRecV2.java
 
b/src/core-metadata/src/main/java/org/apache/kylin/metadata/recommendation/ref/OptRecV2.java
index 41c9d6100f..44da2716cb 100644
--- 
a/src/core-metadata/src/main/java/org/apache/kylin/metadata/recommendation/ref/OptRecV2.java
+++ 
b/src/core-metadata/src/main/java/org/apache/kylin/metadata/recommendation/ref/OptRecV2.java
@@ -114,7 +114,7 @@ public class OptRecV2 {
     }
 
     public void initRecommendation() {
-        log.info("Start to initialize recommendation({}/{}}", project, 
getUuid());
+        log.debug("Start to initialize recommendation({}/{}}", project, 
getUuid());
 
         NDataModel dataModel = getModel();
         if (dataModel.isBroken()) {
@@ -133,7 +133,7 @@ public class OptRecV2 {
     }
 
     public List<RawRecItem> filterExcludedRecPatterns(List<RawRecItem> 
rawRecItems) {
-        log.info("Start to initialize recommendation patterns({}/{}}", 
project, getUuid());
+        log.debug("Start to initialize recommendation patterns({}/{}}", 
project, getUuid());
         NDataModel dataModel = getModel();
         if (dataModel.isBroken()) {
             log.warn("Discard all related recommendations for model({}/{}) is 
broken.", project, uuid);
diff --git 
a/src/core-metadata/src/test/java/org/apache/kylin/metadata/epoch/EpochManagerTest.java
 
b/src/core-metadata/src/test/java/org/apache/kylin/metadata/epoch/EpochManagerTest.java
index d488dfe67b..425d81697d 100644
--- 
a/src/core-metadata/src/test/java/org/apache/kylin/metadata/epoch/EpochManagerTest.java
+++ 
b/src/core-metadata/src/test/java/org/apache/kylin/metadata/epoch/EpochManagerTest.java
@@ -467,4 +467,26 @@ class EpochManagerTest {
         }
     }
 
+    @Test
+    void testUpdateAllEpochsSuccess() {
+        Epoch e1 = new Epoch();
+        e1.setEpochTarget("test1");
+        e1.setCurrentEpochOwner("owner1");
+        e1.setEpochId(1);
+        e1.setLastEpochRenewTime(System.currentTimeMillis());
+
+        Epoch e2 = new Epoch();
+        e2.setEpochTarget("test2");
+        e2.setCurrentEpochOwner("owner2");
+        e2.setEpochId(1);
+        e2.setLastEpochRenewTime(System.currentTimeMillis());
+
+        getEpochStore().insertBatch(Arrays.asList(e1, e2));
+
+        EpochManager epochManager = EpochManager.getInstance();
+        epochManager.tryUpdateEpoch(EpochManager.GLOBAL, false);
+        epochManager.updateAllEpochs();
+        Assertions.assertFalse(epochManager.getOwnedEpochs().isEmpty());
+    }
+
 }
diff --git 
a/src/core-metrics/src/main/java/org/apache/kylin/common/metrics/MetricsInfluxdbReporter.java
 
b/src/core-metrics/src/main/java/org/apache/kylin/common/metrics/MetricsInfluxdbReporter.java
index e33e4aafee..087555f533 100644
--- 
a/src/core-metrics/src/main/java/org/apache/kylin/common/metrics/MetricsInfluxdbReporter.java
+++ 
b/src/core-metrics/src/main/java/org/apache/kylin/common/metrics/MetricsInfluxdbReporter.java
@@ -114,7 +114,7 @@ public class MetricsInfluxdbReporter implements 
MetricsReporter {
         dailyInstance.init();
         Executors.newSingleThreadScheduledExecutor().scheduleWithFixedDelay(() 
-> {
             try {
-                logger.info("Start to aggregate daily metrics ...");
+                logger.debug("Start to aggregate daily metrics ...");
                 long now = System.currentTimeMillis();
                 long todayStart = TimeUtil.getDayStart(now);
 
@@ -141,7 +141,7 @@ public class MetricsInfluxdbReporter implements 
MetricsReporter {
                 updateDailyMetrics(todayStart, config);
 
                 retry.set(0);
-                logger.info("Aggregate daily metrics success ...");
+                logger.debug("Aggregate daily metrics success ...");
             } catch (Exception e) {
                 retry.incrementAndGet();
                 logger.error("Failed to aggregate daily metrics, retry: {}", 
retry.get(), e);
diff --git 
a/src/data-loading-service/src/main/java/org/apache/kylin/rest/scheduler/AutoRefreshSnapshotRunner.java
 
b/src/data-loading-service/src/main/java/org/apache/kylin/rest/scheduler/AutoRefreshSnapshotRunner.java
index 294b6ddb75..6bcbe1cfbd 100644
--- 
a/src/data-loading-service/src/main/java/org/apache/kylin/rest/scheduler/AutoRefreshSnapshotRunner.java
+++ 
b/src/data-loading-service/src/main/java/org/apache/kylin/rest/scheduler/AutoRefreshSnapshotRunner.java
@@ -42,6 +42,7 @@ import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.Path;
 import org.apache.http.HttpStatus;
 import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.constant.LogConstant;
 import org.apache.kylin.common.exception.KylinException;
 import org.apache.kylin.common.exception.KylinRuntimeException;
 import org.apache.kylin.common.logging.SetLogCategory;
@@ -73,7 +74,6 @@ import lombok.extern.slf4j.Slf4j;
 @Slf4j
 public class AutoRefreshSnapshotRunner implements Runnable {
     private static final String SNAPSHOT_VIEW_MAPPING_ERROR_MESSAGE = 
"Project[%s] Save View Mapping Failed";
-    private static final String SCHEDULE_LOG_CATEGORY = "schedule";
 
     private static final Map<String, AutoRefreshSnapshotRunner> INSTANCE_MAP = 
Maps.newConcurrentMap();
     @Setter
@@ -131,7 +131,7 @@ public class AutoRefreshSnapshotRunner implements Runnable {
     }
 
     public void doRun() {
-        try (SetLogCategory ignored = new 
SetLogCategory(SCHEDULE_LOG_CATEGORY)) {
+        try (SetLogCategory ignored = new 
SetLogCategory(LogConstant.SCHEDULE_CATEGORY)) {
             log.info("Project[{}] start check and refresh snapshot", project);
             if (log.isDebugEnabled()) {
                 val poolExecutor = (ThreadPoolExecutor) jobPool;
@@ -346,7 +346,7 @@ public class AutoRefreshSnapshotRunner implements Runnable {
 
     @Override
     public void run() {
-        try (SetLogCategory ignored = new 
SetLogCategory(SCHEDULE_LOG_CATEGORY)) {
+        try (SetLogCategory ignored = new 
SetLogCategory(LogConstant.SCHEDULE_CATEGORY)) {
             saveMarkFile();
             doRun();
         } catch (Exception e) {
@@ -357,7 +357,7 @@ public class AutoRefreshSnapshotRunner implements Runnable {
     }
 
     public void runWhenSchedulerInit() {
-        try (SetLogCategory ignored = new 
SetLogCategory(SCHEDULE_LOG_CATEGORY)) {
+        try (SetLogCategory ignored = new 
SetLogCategory(LogConstant.SCHEDULE_CATEGORY)) {
             doRun();
         } catch (Exception e) {
             log.error(e.getMessage(), e);
diff --git 
a/src/data-loading-service/src/main/java/org/apache/kylin/rest/service/JobService.java
 
b/src/data-loading-service/src/main/java/org/apache/kylin/rest/service/JobService.java
index c9305850a9..b3b902b4b6 100644
--- 
a/src/data-loading-service/src/main/java/org/apache/kylin/rest/service/JobService.java
+++ 
b/src/data-loading-service/src/main/java/org/apache/kylin/rest/service/JobService.java
@@ -54,6 +54,7 @@ import org.apache.commons.lang.StringUtils;
 import org.apache.kylin.cluster.ClusterManagerFactory;
 import org.apache.kylin.cluster.IClusterManager;
 import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.constant.LogConstant;
 import org.apache.kylin.common.exception.ErrorCode;
 import org.apache.kylin.common.exception.ExceptionReason;
 import org.apache.kylin.common.exception.ExceptionResolve;
@@ -61,6 +62,7 @@ import org.apache.kylin.common.exception.JobErrorCode;
 import org.apache.kylin.common.exception.JobExceptionReason;
 import org.apache.kylin.common.exception.JobExceptionResolve;
 import org.apache.kylin.common.exception.KylinException;
+import org.apache.kylin.common.logging.SetLogCategory;
 import org.apache.kylin.common.metrics.MetricsCategory;
 import org.apache.kylin.common.metrics.MetricsGroup;
 import org.apache.kylin.common.metrics.MetricsName;
@@ -156,7 +158,7 @@ public class JobService extends BasicService implements 
JobSupporter, ISmartAppl
     @Autowired
     private ModelService modelService;
 
-    private static final Logger logger = LoggerFactory.getLogger("schedule");
+    private static final Logger logger = 
LoggerFactory.getLogger(LogConstant.BUILD_CATEGORY);
 
     private static final Map<String, String> jobTypeMap = Maps.newHashMap();
     private static final String LAST_MODIFIED = "last_modified";
@@ -615,7 +617,7 @@ public class JobService extends BasicService implements 
JobSupporter, ISmartAppl
         // waite time in output
         Map<String, String> waiteTimeMap;
         val output = executable.getOutput();
-        try {
+        try (SetLogCategory ignored = new 
SetLogCategory(LogConstant.BUILD_CATEGORY)) {
             waiteTimeMap = 
JsonUtil.readValueAsMap(output.getExtra().getOrDefault(NBatchConstants.P_WAITE_TIME,
 "{}"));
         } catch (IOException e) {
             logger.error(e.getMessage(), e);
@@ -710,7 +712,7 @@ public class JobService extends BasicService implements 
JobSupporter, ISmartAppl
     }
 
     public void setExceptionResolveAndCodeAndReason(Output output, 
ExecutableStepResponse executableStepResponse) {
-        try {
+        try (SetLogCategory ignored = new 
SetLogCategory(LogConstant.BUILD_CATEGORY)) {
             val exceptionCode = getExceptionCode(output);
             
executableStepResponse.setFailedResolve(ExceptionResolve.getResolve(exceptionCode));
             
executableStepResponse.setFailedCode(ErrorCode.getLocalizedString(exceptionCode));
@@ -734,7 +736,7 @@ public class JobService extends BasicService implements 
JobSupporter, ISmartAppl
     }
 
     public String getExceptionCode(Output output) {
-        try {
+        try (SetLogCategory ignored = new 
SetLogCategory(LogConstant.BUILD_CATEGORY)) {
             var exceptionOrExceptionMessage = output.getFailedReason();
 
             if (StringUtils.isBlank(exceptionOrExceptionMessage)) {
@@ -889,7 +891,9 @@ public class JobService extends BasicService implements 
JobSupporter, ISmartAppl
         result.setSequenceID(stageBase.getStepId());
 
         if (stageOutput == null) {
-            logger.warn("Cannot found output for task: id={}", 
stageBase.getId());
+            try (SetLogCategory ignored = new 
SetLogCategory(LogConstant.BUILD_CATEGORY)) {
+                logger.warn("Cannot found output for task: id={}", 
stageBase.getId());
+            }
             return result;
         }
         for (Map.Entry<String, String> entry : 
stageOutput.getExtra().entrySet()) {
@@ -925,7 +929,9 @@ public class JobService extends BasicService implements 
JobSupporter, ISmartAppl
         result.setSequenceID(task.getStepId());
 
         if (stepOutput == null) {
-            logger.warn("Cannot found output for task: id={}", task.getId());
+            try (SetLogCategory ignored = new 
SetLogCategory(LogConstant.BUILD_CATEGORY)) {
+                logger.warn("Cannot found output for task: id={}", 
task.getId());
+            }
             return result;
         }
 
@@ -1018,7 +1024,9 @@ public class JobService extends BasicService implements 
JobSupporter, ISmartAppl
     }
 
     public void batchUpdateGlobalJobStatus(List<String> jobIds, String action, 
List<String> filterStatuses) {
-        logger.info("Owned projects is {}", projectService.getOwnedProjects());
+        try (SetLogCategory ignored = new 
SetLogCategory(LogConstant.BUILD_CATEGORY)) {
+            logger.info("Owned projects is {}", 
projectService.getOwnedProjects());
+        }
         for (String project : projectService.getOwnedProjects()) {
             aclEvaluate.checkProjectOperationPermission(project);
             batchUpdateJobStatus0(jobIds, project, action, filterStatuses);
@@ -1252,7 +1260,9 @@ public class JobService extends BasicService implements 
JobSupporter, ISmartAppl
                 project);
         FusionModel fusionModel = fusionModelManager.getFusionModel(modelId);
         if (!model.isFusionModel() || Objects.isNull(fusionModel)) {
-            logger.warn("model is not fusion model or fusion model is null, 
{}", modelId);
+            try (SetLogCategory ignored = new 
SetLogCategory(LogConstant.BUILD_CATEGORY)) {
+                logger.warn("model is not fusion model or fusion model is 
null, {}", modelId);
+            }
             return;
         }
 
@@ -1370,7 +1380,9 @@ public class JobService extends BasicService implements 
JobSupporter, ISmartAppl
     @Override
     public void onApplicationEvent(ApplicationEvent event) {
         if (event instanceof ContextClosedEvent) {
-            logger.info("Stop kyligence node, kill job on yarn for yarn 
cluster mode");
+            try (SetLogCategory ignored = new 
SetLogCategory(LogConstant.BUILD_CATEGORY)) {
+                logger.info("Stop kyligence node, kill job on yarn for yarn 
cluster mode");
+            }
             EpochManager epochManager = EpochManager.getInstance();
             KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv();
             List<Epoch> ownedEpochs = epochManager.getOwnedEpochs();
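
The JobService changes wrap individual log statements in a SetLogCategory
guard. A plausible shape for that guard, assuming it routes output through an
SLF4J MDC entry that a log4j2 RoutingAppender can key on (the real Kylin
implementation may differ):

    import org.slf4j.MDC;

    // Try-with-resources guard: logs emitted inside the block carry the
    // given category, and close() clears it again.
    public class SetLogCategory implements AutoCloseable {
        private static final String KEY = "logCategory"; // assumed MDC key

        public SetLogCategory(String category) {
            MDC.put(KEY, category);
        }

        @Override
        public void close() {
            MDC.remove(KEY);
        }
    }

This matches the call sites above: the guard is bound to a throwaway
variable ("ignored") purely so try-with-resources can close it.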
diff --git 
a/src/data-loading-service/src/test/java/org/apache/kylin/rest/service/JobServiceTest.java
 
b/src/data-loading-service/src/test/java/org/apache/kylin/rest/service/JobServiceTest.java
index 0de349d774..c51fc1c35e 100644
--- 
a/src/data-loading-service/src/test/java/org/apache/kylin/rest/service/JobServiceTest.java
+++ 
b/src/data-loading-service/src/test/java/org/apache/kylin/rest/service/JobServiceTest.java
@@ -21,6 +21,7 @@ package org.apache.kylin.rest.service;
 import static 
org.apache.kylin.common.exception.code.ErrorCodeServer.JOB_ACTION_ILLEGAL;
 import static 
org.apache.kylin.common.exception.code.ErrorCodeServer.JOB_NOT_EXIST;
 import static 
org.apache.kylin.common.exception.code.ErrorCodeServer.JOB_STATUS_ILLEGAL;
+import static org.apache.kylin.job.constant.JobStatusEnum.PENDING;
 import static org.apache.kylin.job.constant.JobStatusEnum.SKIP;
 import static org.awaitility.Awaitility.await;
 import static org.junit.Assert.assertEquals;
@@ -917,7 +918,7 @@ public class JobServiceTest extends 
NLocalFileMetadataTestCase {
         List<ExecutableStepResponse> stages2 = 
subStages.get(segmentId2).getStage();
         assertEquals(1, stages2.size());
         ExecutableStepResponse logicStepResponse2 = stages2.get(0);
-        checkResponse(logicStepResponse2, logicStep.getId(), 
JobStatusEnum.PENDING);
+        checkResponse(logicStepResponse2, logicStep.getId(), PENDING);
         assertEquals(0, logicStepResponse2.getExecStartTime());
         assertTrue(logicStepResponse2.getExecStartTime() < 
System.currentTimeMillis());
 
@@ -2034,4 +2035,13 @@ public class JobServiceTest extends 
NLocalFileMetadataTestCase {
                     .get(0).getOutput().getStatus(), jobStatus);
         }
     }
+
+    @Test
+    public void testParseToExecutableStepWithStepOutputNull() {
+        AbstractExecutable task = new FiveSecondSucceedTestExecutable();
+        task.setProject("default");
+        ExecutableState jobState = ExecutableState.RUNNING;
+        ExecutableStepResponse result = jobService.parseToExecutableStep(task, 
null, new HashMap<>(), jobState);
+        Assert.assertSame(PENDING, result.getStatus());
+    }
 }
diff --git 
a/src/kylin-it/src/test/java/org/apache/kylin/rest/broadcaster/BroadcasterTest.java
 
b/src/kylin-it/src/test/java/org/apache/kylin/rest/broadcaster/BroadcasterTest.java
index 1f267173ae..61fb0132b3 100644
--- 
a/src/kylin-it/src/test/java/org/apache/kylin/rest/broadcaster/BroadcasterTest.java
+++ 
b/src/kylin-it/src/test/java/org/apache/kylin/rest/broadcaster/BroadcasterTest.java
@@ -18,10 +18,12 @@
 
 package org.apache.kylin.rest.broadcaster;
 
+import java.io.IOException;
 import java.util.Arrays;
 
 import org.apache.kylin.common.KylinConfig;
 import 
org.apache.kylin.common.persistence.transaction.AddS3CredentialToSparkBroadcastEventNotifier;
+import 
org.apache.kylin.common.persistence.transaction.AuditLogBroadcastEventNotifier;
 import 
org.apache.kylin.common.persistence.transaction.BroadcastEventReadyNotifier;
 import org.apache.kylin.junit.annotation.MetadataInfo;
 import org.apache.kylin.rest.cluster.ClusterManager;
@@ -30,6 +32,7 @@ import 
org.apache.kylin.rest.config.initialize.BroadcastListener;
 import org.apache.kylin.rest.security.AclPermission;
 import org.apache.kylin.rest.security.AdminUserSyncEventNotifier;
 import org.apache.kylin.rest.security.UserAclManager;
+import org.apache.kylin.rest.service.AuditLogService;
 import org.apache.kylin.rest.service.UserAclService;
 import org.apache.kylin.rest.service.UserService;
 import org.apache.kylin.rest.util.SpringContext;
@@ -51,6 +54,8 @@ import org.apache.kylin.metadata.epoch.EpochManager;
 import lombok.val;
 import lombok.extern.slf4j.Slf4j;
 
+import static 
org.apache.kylin.common.persistence.transaction.BroadcastEventReadyNotifier.BroadcastScopeEnum.WHOLE_NODES;
+
 @Slf4j
 @MetadataInfo(onlyProps = true)
 class BroadcasterTest {
@@ -77,6 +82,22 @@ class BroadcasterTest {
         }
     }
 
+    @Test
+    void testBroadcastWithAnnounceContains() {
+        try (ConfigurableApplicationContext context = 
this.application.run("--kylin.server.mode=all")) {
+            SpringContext springContext = context.getBean(SpringContext.class);
+            ReflectionTestUtils.setField(springContext, "applicationContext", 
context);
+            Broadcaster broadcaster = context.getBean(Broadcaster.class);
+
+            BroadcastEventReadyNotifier eventReadyNotifier = new 
BroadcastEventReadyNotifier();
+            broadcaster.announce(eventReadyNotifier);
+            // announce the same notifier again to exercise the duplicate-announce check
+            broadcaster.announce(eventReadyNotifier);
+
+            Assertions.assertSame(WHOLE_NODES, 
eventReadyNotifier.getBroadcastScope());
+        }
+    }
+
     @Test
     void testBroadcastSyncAdminUserAcl() throws Exception {
         EpochManager epochManager = EpochManager.getInstance();
@@ -97,6 +118,20 @@ class BroadcasterTest {
         assert 
SparderEnv.getSparkSession().conf().contains(String.format("fs.s3a.bucket.%s.assumed.role.arn",
 "aa"));
     }
 
+    @Test
+    void testBroadcastWithAuditLog() {
+        BroadcastListener broadcastListener = new BroadcastListener();
+        val auditLogService = Mockito.spy(AuditLogService.class);
+        ReflectionTestUtils.setField(broadcastListener, "auditLogService", 
auditLogService);
+        String errorMsg = "";
+        try {
+            broadcastListener.handle(new AuditLogBroadcastEventNotifier());
+        } catch (IOException e) {
+            errorMsg = e.getMessage();
+        }
+        Assertions.assertTrue(errorMsg.isEmpty());
+    }
+
     @Configuration
     static class Config {
         @Bean
diff --git 
a/src/query-service/src/main/java/org/apache/kylin/rest/service/QueryCacheManager.java
 
b/src/query-service/src/main/java/org/apache/kylin/rest/service/QueryCacheManager.java
index bdbce8d893..5f68795339 100644
--- 
a/src/query-service/src/main/java/org/apache/kylin/rest/service/QueryCacheManager.java
+++ 
b/src/query-service/src/main/java/org/apache/kylin/rest/service/QueryCacheManager.java
@@ -162,7 +162,7 @@ public class QueryCacheManager implements 
CommonQueryCacheSupporter {
 
     public SQLResponse doSearchQuery(QueryCacheManager.Type type, SQLRequest 
sqlRequest) {
         Object response = kylinCache.get(type.rootCacheName, 
sqlRequest.getProject(), sqlRequest.getCacheKey());
-        logger.info("[query cache log] The cache key is: {}", 
sqlRequest.getCacheKey());
+        logger.debug("[query cache log] The cache key is: {}", 
sqlRequest.getCacheKey());
         if (response == null) {
             return null;
         }
@@ -178,7 +178,7 @@ public class QueryCacheManager implements 
CommonQueryCacheSupporter {
 
         // check signature for success query resp in case the datasource is 
changed
         if (QueryCacheSignatureUtil.checkCacheExpired(cached, 
sqlRequest.getProject())) {
-            logger.info("[query cache log] cache has expired, cache key is 
{}", sqlRequest.getCacheKey());
+            logger.debug("[query cache log] cache has expired, cache key is 
{}", sqlRequest.getCacheKey());
             clearQueryCache(sqlRequest);
             return null;
         }
@@ -315,10 +315,10 @@ public class QueryCacheManager implements 
CommonQueryCacheSupporter {
 
     public void clearProjectCache(String project) {
         if (project == null) {
-            logger.debug("[query cache log] clear query cache for all 
projects.");
+            logger.info("[query cache log] clear query cache for all 
projects.");
             kylinCache.clearAll();
         } else {
-            logger.debug("[query cache log] clear query cache for {}", 
project);
+            logger.info("[query cache log] clear query cache for {}", project);
             kylinCache.clearByType(Type.SUCCESS_QUERY_CACHE.rootCacheName, 
project);
             kylinCache.clearByType(Type.EXCEPTION_QUERY_CACHE.rootCacheName, 
project);
             kylinCache.clearByType(Type.SCHEMA_CACHE.rootCacheName, project);
diff --git 
a/src/query-service/src/main/java/org/apache/kylin/rest/service/QueryService.java
 
b/src/query-service/src/main/java/org/apache/kylin/rest/service/QueryService.java
index dcc26d6a51..04dad61a83 100644
--- 
a/src/query-service/src/main/java/org/apache/kylin/rest/service/QueryService.java
+++ 
b/src/query-service/src/main/java/org/apache/kylin/rest/service/QueryService.java
@@ -62,6 +62,7 @@ import org.apache.kylin.common.KapConfig;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.QueryContext;
 import org.apache.kylin.common.QueryTrace;
+import org.apache.kylin.common.constant.LogConstant;
 import org.apache.kylin.common.debug.BackdoorToggles;
 import org.apache.kylin.common.exception.KylinException;
 import org.apache.kylin.common.exception.KylinTimeoutException;
@@ -183,7 +184,7 @@ public class QueryService extends BasicService implements 
CacheSignatureQuerySup
 
     public static final String QUERY_STORE_PATH_PREFIX = "/query/";
     private static final String JDBC_METADATA_SCHEMA = "metadata";
-    private static final Logger logger = LoggerFactory.getLogger("query");
+    private static final Logger logger = 
LoggerFactory.getLogger(LogConstant.QUERY_CATEGORY);
     final SlowQueryDetector slowQueryDetector = new SlowQueryDetector();
 
     @Autowired
@@ -298,6 +299,7 @@ public class QueryService extends BasicService implements 
CacheSignatureQuerySup
 
             if (QueryContext.current().getQueryTagInfo().isAsyncQuery()
                     && 
NProjectManager.getProjectConfig(sqlRequest.getProject()).isUniqueAsyncQueryYarnQueue())
 {
+                logger.info("This query is an async query in project: {}", 
sqlRequest.getProject());
                 if (StringUtils.isNotEmpty(sqlRequest.getSparkQueue())) {
                     queryParams.setSparkQueue(sqlRequest.getSparkQueue());
                 }
@@ -479,7 +481,8 @@ public class QueryService extends BasicService implements 
CacheSignatureQuerySup
             
queryContext.setQueryId(UUID.fromString(sqlRequest.getQueryId()).toString());
         }
         try (SetThreadName ignored = new SetThreadName("Query %s", 
queryContext.getQueryId());
-                SetLogCategory ignored2 = new SetLogCategory("query")) {
+                SetLogCategory ignored2 = new 
SetLogCategory(LogConstant.QUERY_CATEGORY)) {
+            logger.info("Start query in project: {}", sqlRequest.getProject());
             if (sqlRequest.getExecuteAs() != null)
                 sqlRequest.setUsername(sqlRequest.getExecuteAs());
             else
@@ -606,6 +609,7 @@ public class QueryService extends BasicService implements 
CacheSignatureQuerySup
             
QueryUtils.updateQueryContextSQLMetrics(rawSql.getStatementString());
             
QueryContext.currentTrace().amendLast(QueryTrace.PREPARE_AND_SUBMIT_JOB, 
System.currentTimeMillis());
             QueryContext.currentTrace().endLastSpan();
+            QueryContext.current().record("update_metrics_time");
             
QueryContext.currentMetrics().setQueryEndTime(System.currentTimeMillis());
 
             sqlResponse.setServer(clusterManager.getLocalServer());
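
The query path stacks two AutoCloseable scopes, a thread name and a log
category, in a single try-with-resources; they unwind in reverse order when
the query finishes. A self-contained sketch of the thread-name half
(SetThreadName below is a stand-in, not Kylin's class; it composes with the
SetLogCategory sketch shown earlier):

    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    public class QueryScopeDemo {
        private static final Logger logger = LoggerFactory.getLogger("query");

        // Renames the current thread for the duration of the block and
        // restores the original name on close().
        static class SetThreadName implements AutoCloseable {
            private final String original = Thread.currentThread().getName();

            SetThreadName(String format, Object... args) {
                Thread.currentThread().setName(String.format(format, args));
            }

            @Override
            public void close() {
                Thread.currentThread().setName(original);
            }
        }

        public static void main(String[] args) {
            try (SetThreadName ignored = new SetThreadName("Query %s", "q-0001")) {
                logger.info("Start query in project: {}", "demo_project");
            } // thread name (and, in Kylin, the log category) restored here
        }
    }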
diff --git 
a/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/source/NSparkMetadataExplorer.java
 
b/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/source/NSparkMetadataExplorer.java
index 91cc8670bf..c9543e11e0 100644
--- 
a/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/source/NSparkMetadataExplorer.java
+++ 
b/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/source/NSparkMetadataExplorer.java
@@ -321,7 +321,7 @@ public class NSparkMetadataExplorer implements 
ISourceMetadataExplorer, ISampleD
                 Database db = 
SparderEnv.getSparkSession().catalog().getDatabase(database);
             } catch (AnalysisException e) {
                 UserGroupInformation ugi = 
UserGroupInformation.getCurrentUser();
-                logger.info("The current user: {} does not have permission to 
access database {}", ugi.getUserName(),
+                logger.error("The current user: {} does not have permission to 
access database {}", ugi.getUserName(),
                         database);
                 return false;
             }
diff --git 
a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/builder/SegmentFlatTable.scala
 
b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/builder/SegmentFlatTable.scala
index 29d38f641b..931570c6c9 100644
--- 
a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/builder/SegmentFlatTable.scala
+++ 
b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/builder/SegmentFlatTable.scala
@@ -18,12 +18,10 @@
 
 package org.apache.kylin.engine.spark.builder
 
-import java.util.concurrent.{CountDownLatch, TimeUnit}
-import java.util.{Locale, Objects, Timer, TimerTask}
-
+import com.google.common.collect.Sets
 import org.apache.commons.lang3.StringUtils
 import org.apache.kylin.common.util.HadoopUtil
-import org.apache.kylin.common.{KapConfig, KylinConfig}
+import org.apache.kylin.common.{CustomUtils, KapConfig, KylinConfig}
 import org.apache.kylin.engine.spark.builder.DFBuilderHelper._
 import org.apache.kylin.engine.spark.job.NSparkCubingUtil._
 import org.apache.kylin.engine.spark.job.{FiltersUtil, TableMetaManager}
@@ -38,6 +36,12 @@ import org.apache.spark.sql.functions.{col, expr}
 import org.apache.spark.sql.types.StructField
 import org.apache.spark.sql.util.SparderTypeUtil
 import org.apache.spark.utils.ProxyThreadUtils
+import java.util.concurrent.{CountDownLatch, TimeUnit}
+import java.util.{Locale, Objects, Timer, TimerTask}
+
+import org.apache.kylin.common.constant.LogConstant
+import org.apache.kylin.common.logging.SetLogCategory
+import org.apache.spark.util.Utils
 
 import scala.collection.JavaConverters._
 import scala.collection.mutable
@@ -46,8 +50,6 @@ import scala.concurrent.duration.{Duration, MILLISECONDS}
 import scala.concurrent.forkjoin.ForkJoinPool
 import scala.util.{Failure, Success, Try}
 
-import com.google.common.collect.Sets
-
 class SegmentFlatTable(private val sparkSession: SparkSession, //
                        private val tableDesc: SegmentFlatTableDesc) extends 
LogEx {
 
@@ -524,7 +526,9 @@ object SegmentFlatTable extends LogEx {
     val newFields = originDS.schema.fields.map(f =>
       convertFromDot("`" + alias + "`" + "." + "`" + f.name + "`")).toSeq
     val newDS = originDS.toDF(newFields: _*)
-    logInfo(s"Wrap ALIAS ${originDS.schema.treeString} TO 
${newDS.schema.treeString}")
+    CustomUtils.tryWithResourceIgnore(new 
SetLogCategory(LogConstant.BUILD_CATEGORY)) {
+      _ => logInfo(s"Wrap ALIAS ${originDS.schema.treeString} TO 
${newDS.schema.treeString}")
+    }
     newDS
   }
 
@@ -557,7 +561,9 @@ object SegmentFlatTable extends LogEx {
       val equiConditionColPairs = fk.zip(pk).map(joinKey =>
         col(convertFromDot(joinKey._1.getBackTickIdentity))
           .equalTo(col(convertFromDot(joinKey._2.getBackTickIdentity))))
-      logInfo(s"Lookup table schema ${lookupDataset.schema.treeString}")
+      CustomUtils.tryWithResourceIgnore(new 
SetLogCategory(LogConstant.BUILD_CATEGORY)) {
+        _ => logInfo(s"Lookup table schema ${lookupDataset.schema.treeString}")
+      }
 
       if (join.getNonEquiJoinCondition != null) {
         var condition = 
NonEquiJoinConditionBuilder.convert(join.getNonEquiJoinCondition)
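
The Scala call sites above use CustomUtils.tryWithResourceIgnore to scope the
build log category around a single logInfo. A hypothetical Java rendering of
such a helper, assuming "ignore" means that neither the scoped action nor
close() may propagate an exception into the build job:

    import java.util.function.Consumer;

    public final class TryWithResource {

        // Run an action against a resource, then close it; swallow any
        // failure so a best-effort log line can never fail the caller.
        public static <T extends AutoCloseable> void tryWithResourceIgnore(T resource, Consumer<T> action) {
            try (T r = resource) {
                action.accept(r);
            } catch (Exception e) {
                // deliberately ignored
            }
        }

        private TryWithResource() {
        }
    }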
diff --git 
a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/RDSegmentBuildExec.scala
 
b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/RDSegmentBuildExec.scala
index 269065599b..a2c272dec6 100644
--- 
a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/RDSegmentBuildExec.scala
+++ 
b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/RDSegmentBuildExec.scala
@@ -80,11 +80,10 @@ class RDSegmentBuildExec(private val jobContext: 
RDSegmentBuildJob, //
       val paths = 
ResourceDetectUtils.getPaths(execution.sparkPlan).map(_.toString).asJava
       logInfo(s"Detected source: $sourceName $leaves 
${paths.asScala.mkString(",")}")
       val startTime = System.currentTimeMillis()
-      logInfo(s"Detect source size start time is $startTime")
-      val resourceSize = 
ResourceDetectUtils.getResourceSize(SparderEnv.getHadoopConfiguration(),config.isConcurrencyFetchDataSourceSize,
+      val resourceSize = 
ResourceDetectUtils.getResourceSize(SparderEnv.getHadoopConfiguration(), 
config.isConcurrencyFetchDataSourceSize,
         paths.asScala.map(path => new Path(path)): _*)
       val endTime = System.currentTimeMillis()
-      logInfo(s"Detect source size end time is $endTime")
+      logInfo(s"Detect source size cost time is ${endTime - startTime} ms.")
 
       logInfo(s"Detect source size $resourceSize")
       sourceSize.put(sourceName, resourceSize)
diff --git 
a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/SegmentBuildJob.java
 
b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/SegmentBuildJob.java
index b9663c8d26..6bbf5217c7 100644
--- 
a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/SegmentBuildJob.java
+++ 
b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/SegmentBuildJob.java
@@ -95,7 +95,9 @@ public class SegmentBuildJob extends SegmentJob {
     @Override
     protected final void doExecute() throws Exception {
 
+        log.info("Start sub stage {}" + REFRESH_SNAPSHOTS.name());
         REFRESH_SNAPSHOTS.create(this, null, null).toWork();
+        log.info("End sub stage {}" + REFRESH_SNAPSHOTS.name());
 
         buildContext = new BuildContext(getSparkSession().sparkContext(), 
config);
         buildContext.appStatusTracker().startMonitorBuildResourceState();
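
A note on the log calls above: with SLF4J-style loggers the {} placeholder is
substituted from the argument list, so the value must be passed as an
argument rather than concatenated with +, which would leave the braces
literal in the output. A minimal demonstration:

    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    public class LogPlaceholderDemo {
        private static final Logger log = LoggerFactory.getLogger(LogPlaceholderDemo.class);

        public static void main(String[] args) {
            String stage = "REFRESH_SNAPSHOTS";
            // Placeholder form: prints "Start sub stage REFRESH_SNAPSHOTS"
            log.info("Start sub stage {}", stage);
            // Concatenation: prints "Start sub stage {}REFRESH_SNAPSHOTS"
            log.info("Start sub stage {}" + stage);
        }
    }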
diff --git 
a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/exec/BuildExec.scala
 
b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/exec/BuildExec.scala
index 70a01b35e4..91b2850c83 100644
--- 
a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/exec/BuildExec.scala
+++ 
b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/exec/BuildExec.scala
@@ -19,13 +19,15 @@
 package org.apache.kylin.engine.spark.job.exec
 
 import org.apache.kylin.engine.spark.job.stage.StageExec
-
 import java.io.IOException
 import java.util
 import java.util.Locale
+
+import org.apache.spark.internal.Logging
+
 import scala.collection.JavaConverters._
 
-class BuildExec(var id: String) {
+class BuildExec(var id: String) extends Logging {
   protected var subStages = new util.ArrayList[StageExec]
 
   def getId: String = {
@@ -35,7 +37,9 @@ class BuildExec(var id: String) {
   @throws(classOf[IOException])
   def buildSegment(): Unit = {
     for (stage <- subStages.asScala) {
+      logInfo(s"Start sub stage ${stage.getStageName}")
       stage.toWork()
+      logInfo(s"End sub stage ${stage.getStageName}")
     }
   }
 
diff --git 
a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/exec/MergeExec.scala
 
b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/exec/MergeExec.scala
index 498ef117d4..2b4bd57c94 100644
--- 
a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/exec/MergeExec.scala
+++ 
b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/exec/MergeExec.scala
@@ -29,7 +29,9 @@ class MergeExec(id: String) extends BuildExec(id) {
   @throws(classOf[IOException])
   def mergeSegment(): Unit = {
     for (stage <- subStages.asScala) {
+      logInfo(s"Start sub stage ${stage.getStageName}")
       stage.toWork()
+      logInfo(s"End sub stage ${stage.getStageName}")
     }
   }
 
diff --git 
a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/exec/SnapshotExec.scala
 
b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/exec/SnapshotExec.scala
index 4394a8615d..86dd500178 100644
--- 
a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/exec/SnapshotExec.scala
+++ 
b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/exec/SnapshotExec.scala
@@ -29,7 +29,9 @@ class SnapshotExec(id: String) extends BuildExec(id) {
   @throws(classOf[IOException])
   def buildSnapshot(): Unit = {
     for (stage <- subStages.asScala) {
+      logInfo(s"Start sub stage ${stage.getStageName}")
       stage.toWorkWithoutFinally()
+      logInfo(s"End sub stage ${stage.getStageName}")
     }
   }
 
diff --git 
a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/exec/TableAnalyzerExec.scala
 
b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/exec/TableAnalyzerExec.scala
index d95607398d..aa0bb0d780 100644
--- 
a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/exec/TableAnalyzerExec.scala
+++ 
b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/exec/TableAnalyzerExec.scala
@@ -27,7 +27,9 @@ class TableAnalyzerExec(id: String) extends BuildExec(id) {
 
   def analyzerTable(): Unit = {
     for (stage <- subStages.asScala) {
+      logInfo(s"Start sub stage ${stage.getStageName}")
       stage.toWorkWithoutFinally()
+      logInfo(s"End sub stage ${stage.getStageName}")
     }
   }
 
diff --git 
a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/stage/StageExec.scala
 
b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/stage/StageExec.scala
index 50de84001b..7f99feff77 100644
--- 
a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/stage/StageExec.scala
+++ 
b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/stage/StageExec.scala
@@ -32,6 +32,8 @@ import java.util
 trait StageExec extends Logging {
   protected var id: String = _
 
+  def getStageName: String
+
   def getJobContext: SparkApplication
 
   def getDataSegment: NDataSegment
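
getStageName gives every stage a stable, human-readable identifier so that
executors can bracket each stage with start/end log lines, as BuildExec and
its subclasses do above. The same pattern in a compact Java sketch (names
are illustrative, not Kylin's actual API):

    import java.util.List;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    interface Stage {
        String getStageName();

        void toWork() throws Exception;
    }

    class StageDriver {
        private static final Logger log = LoggerFactory.getLogger("build");

        // Bracket every stage with start/end logs so build progress can be
        // followed per stage in the category-routed log file.
        void run(List<Stage> stages) throws Exception {
            for (Stage stage : stages) {
                log.info("Start sub stage {}", stage.getStageName());
                stage.toWork();
                log.info("End sub stage {}", stage.getStageName());
            }
        }
    }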
diff --git 
a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/stage/WaiteForResource.scala
 
b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/stage/WaiteForResource.scala
index c7111bd852..fd05f59b8a 100644
--- 
a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/stage/WaiteForResource.scala
+++ 
b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/stage/WaiteForResource.scala
@@ -80,4 +80,6 @@ class WaiteForResource(jobContext: SparkApplication) extends 
StageExec {
       KylinBuildEnv.get().buildJobInfos.endWait()
     }
   }
+
+  override def getStageName: String = "WaiteForResource"
 }
diff --git 
a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/stage/build/BuildDict.scala
 
b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/stage/build/BuildDict.scala
index 84f8454bae..ca4aaa2998 100644
--- 
a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/stage/build/BuildDict.scala
+++ 
b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/stage/build/BuildDict.scala
@@ -31,4 +31,6 @@ class BuildDict(jobContext: SegmentJob, dataSegment: 
NDataSegment, buildParam: B
     val dict: Dataset[Row] = buildDictIfNeed()
     buildParam.setDict(dict)
   }
+
+  override def getStageName: String = "BuildDict"
 }
diff --git 
a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/stage/build/BuildLayer.scala
 
b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/stage/build/BuildLayer.scala
index 734a4361e2..a8cf9dc237 100644
--- 
a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/stage/build/BuildLayer.scala
+++ 
b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/stage/build/BuildLayer.scala
@@ -31,4 +31,6 @@ class BuildLayer(jobContext: SegmentJob, dataSegment: 
NDataSegment, buildParam:
     // Drain results immediately after building.
     drain()
   }
+
+  override def getStageName: String = "BuildLayer"
 }
diff --git 
a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/stage/build/CostBasedPlanner.scala
 
b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/stage/build/CostBasedPlanner.scala
index 08ba41e46d..cef881284c 100644
--- 
a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/stage/build/CostBasedPlanner.scala
+++ 
b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/stage/build/CostBasedPlanner.scala
@@ -50,4 +50,6 @@ class CostBasedPlanner(jobContext: SegmentJob, dataSegment: 
NDataSegment, buildP
       buildParam.setFlatTableDesc(flatTableDesc)
     }
   }
+
+  override def getStageName: String = "CostBasedPlanner"
 }
diff --git 
a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/stage/build/FlatTableAndDictBase.scala
 
b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/stage/build/FlatTableAndDictBase.scala
index 38a191d461..24391d28a7 100644
--- 
a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/stage/build/FlatTableAndDictBase.scala
+++ 
b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/stage/build/FlatTableAndDictBase.scala
@@ -48,6 +48,10 @@ import org.apache.spark.utils.ProxyThreadUtils
 import java.math.BigInteger
 import java.util.concurrent.{CountDownLatch, TimeUnit}
 import java.util.{Locale, Objects, Timer, TimerTask}
+
+import org.apache.kylin.common.constant.LogConstant
+import org.apache.kylin.common.logging.SetLogCategory
+
 import scala.collection.JavaConverters._
 import scala.collection.mutable
 import scala.collection.parallel.ForkJoinTaskSupport
@@ -607,6 +611,7 @@ abstract class FlatTableAndDictBase(private val jobContext: 
SegmentJob,
 
   private def buildDict(ds: Dataset[Row], dictCols: Set[TblColRef]): Unit = {
     if (config.isV2DictEnable) {
+      logInfo("Build v2 dict default.")
       var matchedCols = selectColumnsInTable(ds, dictCols)
       if (dataSegment.getIndexPlan.isSkipEncodeIntegerFamilyEnabled) {
         matchedCols = matchedCols.filterNot(_.getType.isIntegerFamily)
diff --git 
a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/stage/build/GatherFlatTableStats.scala
 
b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/stage/build/GatherFlatTableStats.scala
index 920e1b7bf6..6668593a42 100644
--- 
a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/stage/build/GatherFlatTableStats.scala
+++ 
b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/stage/build/GatherFlatTableStats.scala
@@ -48,4 +48,6 @@ class GatherFlatTableStats(jobContext: SegmentJob, 
dataSegment: NDataSegment, bu
     // Cleanup previous potentially left temp layout data.
     cleanupLayoutTempData(dataSegment, readOnlyLayouts.asScala.toSeq)
   }
+
+  override def getStageName: String = "GatherFlatTableStats"
 }
diff --git 
a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/stage/build/GenerateFlatTable.scala
 
b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/stage/build/GenerateFlatTable.scala
index f3b304a57d..ba62f0bfd9 100644
--- 
a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/stage/build/GenerateFlatTable.scala
+++ 
b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/stage/build/GenerateFlatTable.scala
@@ -38,4 +38,6 @@ class GenerateFlatTable(jobContext: SegmentJob, dataSegment: 
NDataSegment, build
       onStageSkipped()
     }
   }
+
+  override def getStageName: String = "GenerateFlatTable"
 }
diff --git 
a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/stage/build/MaterializedFactTableView.scala
 
b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/stage/build/MaterializedFactTableView.scala
index f467058559..632ae0cd1c 100644
--- 
a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/stage/build/MaterializedFactTableView.scala
+++ 
b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/stage/build/MaterializedFactTableView.scala
@@ -54,4 +54,6 @@ class MaterializedFactTableView(jobContext: SegmentJob, 
dataSegment: NDataSegmen
       onStageSkipped()
     }
   }
+
+  override def getStageName: String = "MaterializedFactTableView"
 }
diff --git 
a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/stage/build/RefreshColumnBytes.scala
 
b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/stage/build/RefreshColumnBytes.scala
index 328097df3e..48b63f122b 100644
--- 
a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/stage/build/RefreshColumnBytes.scala
+++ 
b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/stage/build/RefreshColumnBytes.scala
@@ -35,4 +35,6 @@ class RefreshColumnBytes(jobContext: SegmentJob, dataSegment: 
NDataSegment, buil
     cleanup()
     logInfo(s"Finished SEGMENT $segmentId")
   }
+
+  override def getStageName: String = "RefreshColumnBytes"
 }
diff --git 
a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/stage/build/RefreshSnapshots.scala
 
b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/stage/build/RefreshSnapshots.scala
index 4cfc8711b2..ef6518546b 100644
--- 
a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/stage/build/RefreshSnapshots.scala
+++ 
b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/stage/build/RefreshSnapshots.scala
@@ -44,4 +44,6 @@ class RefreshSnapshots(jobContext: SegmentJob) extends 
StageExec {
       case _ =>
     }
   }
+
+  override def getStageName: String = "RefreshSnapshots"
 }
diff --git 
a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/stage/build/partition/PartitionBuildDict.scala
 
b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/stage/build/partition/PartitionBuildDict.scala
index ab0d070b52..23e834f085 100644
--- 
a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/stage/build/partition/PartitionBuildDict.scala
+++ 
b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/stage/build/partition/PartitionBuildDict.scala
@@ -30,4 +30,6 @@ class PartitionBuildDict(jobContext: SegmentJob, dataSegment: 
NDataSegment, buil
     val dict: Dataset[Row] = buildDictIfNeed()
     buildParam.setDict(dict)
   }
+
+  override def getStageName: String = "PartitionBuildDict"
 }
diff --git 
a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/stage/build/partition/PartitionBuildLayer.scala
 
b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/stage/build/partition/PartitionBuildLayer.scala
index 7a43766be0..995bb588cc 100644
--- 
a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/stage/build/partition/PartitionBuildLayer.scala
+++ 
b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/stage/build/partition/PartitionBuildLayer.scala
@@ -30,4 +30,6 @@ class PartitionBuildLayer(jobContext: SegmentJob, 
dataSegment: NDataSegment, bui
     // Drain results immediately after building.
     drain()
   }
+
+  override def getStageName: String = "PartitionBuildLayer"
 }
diff --git 
a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/stage/build/partition/PartitionCostBasedPlanner.scala
 
b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/stage/build/partition/PartitionCostBasedPlanner.scala
index 3aac410594..7b3c1bf586 100644
--- 
a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/stage/build/partition/PartitionCostBasedPlanner.scala
+++ 
b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/stage/build/partition/PartitionCostBasedPlanner.scala
@@ -55,4 +55,6 @@ class PartitionCostBasedPlanner(jobContext: SegmentJob, 
dataSegment: NDataSegmen
       buildParam.setTableDesc(tableDesc)
     }
   }
+
+  override def getStageName: String = "PartitionCostBasedPlanner"
 }
diff --git 
a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/stage/build/partition/PartitionGatherFlatTableStats.scala
 
b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/stage/build/partition/PartitionGatherFlatTableStats.scala
index 43afbb100e..103c02c32b 100644
--- 
a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/stage/build/partition/PartitionGatherFlatTableStats.scala
+++ 
b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/stage/build/partition/PartitionGatherFlatTableStats.scala
@@ -75,4 +75,5 @@ class PartitionGatherFlatTableStats(jobContext: SegmentJob, 
dataSegment: NDataSe
     }
   }
 
+  override def getStageName: String = "PartitionGatherFlatTableStats"
 }
diff --git 
a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/stage/build/partition/PartitionGenerateFlatTable.scala
 
b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/stage/build/partition/PartitionGenerateFlatTable.scala
index b42b2a3956..bf406ac72e 100644
--- 
a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/stage/build/partition/PartitionGenerateFlatTable.scala
+++ 
b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/stage/build/partition/PartitionGenerateFlatTable.scala
@@ -37,4 +37,6 @@ class PartitionGenerateFlatTable(jobContext: SegmentJob, 
dataSegment: NDataSegme
       onStageSkipped()
     }
   }
+
+  override def getStageName: String = "PartitionGenerateFlatTable"
 }
diff --git 
a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/stage/build/partition/PartitionMaterializedFactTableView.scala
 
b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/stage/build/partition/PartitionMaterializedFactTableView.scala
index e8f2fca99f..deb1c604bb 100644
--- 
a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/stage/build/partition/PartitionMaterializedFactTableView.scala
+++ 
b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/stage/build/partition/PartitionMaterializedFactTableView.scala
@@ -61,4 +61,6 @@ class PartitionMaterializedFactTableView(jobContext: 
SegmentJob, dataSegment: ND
       onStageSkipped()
     }
   }
+
+  override def getStageName: String = "PartitionMaterializedFactTableView"
 }
diff --git 
a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/stage/build/partition/PartitionRefreshColumnBytes.scala
 
b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/stage/build/partition/PartitionRefreshColumnBytes.scala
index c8e26660e2..86ef005167 100644
--- 
a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/stage/build/partition/PartitionRefreshColumnBytes.scala
+++ 
b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/stage/build/partition/PartitionRefreshColumnBytes.scala
@@ -34,4 +34,6 @@ class PartitionRefreshColumnBytes(jobContext: SegmentJob, 
dataSegment: NDataSegm
     cleanup()
     logInfo(s"Finished SEGMENT $segmentId")
   }
+
+  override def getStageName: String = "PartitionRefreshColumnBytes"
 }
diff --git 
a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/stage/merge/MergeColumnBytes.scala
 
b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/stage/merge/MergeColumnBytes.scala
index 3091e92e0a..1aa160e8f7 100644
--- 
a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/stage/merge/MergeColumnBytes.scala
+++ 
b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/stage/merge/MergeColumnBytes.scala
@@ -29,4 +29,6 @@ class MergeColumnBytes(jobContext: SegmentJob, dataSegment: 
NDataSegment)
 
     cleanup()
   }
+
+  override def getStageName: String = "MergeColumnBytes"
 }
diff --git 
a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/stage/merge/MergeFlatTable.scala
 
b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/stage/merge/MergeFlatTable.scala
index 58c5edbcbf..f525cfec82 100644
--- 
a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/stage/merge/MergeFlatTable.scala
+++ 
b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/stage/merge/MergeFlatTable.scala
@@ -29,4 +29,6 @@ class MergeFlatTable(jobContext: SegmentJob, dataSegment: 
NDataSegment)
 
     mergeFlatTable()
   }
+
+  override def getStageName: String = "MergeFlatTable"
 }
diff --git 
a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/stage/merge/MergeIndices.scala
 
b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/stage/merge/MergeIndices.scala
index 66956fafe6..9812131e51 100644
--- 
a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/stage/merge/MergeIndices.scala
+++ 
b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/stage/merge/MergeIndices.scala
@@ -29,4 +29,6 @@ class MergeIndices(jobContext: SegmentJob, dataSegment: 
NDataSegment)
     // Drain results immediately after merging.
     drain()
   }
+
+  override def getStageName: String = "MergeIndices"
 }
diff --git 
a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/stage/merge/partition/PartitionMergeColumnBytes.scala
 
b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/stage/merge/partition/PartitionMergeColumnBytes.scala
index 4f52f70781..2de190be1d 100644
--- 
a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/stage/merge/partition/PartitionMergeColumnBytes.scala
+++ 
b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/stage/merge/partition/PartitionMergeColumnBytes.scala
@@ -29,4 +29,6 @@ class PartitionMergeColumnBytes(jobContext: SegmentJob, 
dataSegment: NDataSegmen
 
     cleanup()
   }
+
+  override def getStageName: String = "PartitionMergeColumnBytes"
 }
diff --git 
a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/stage/merge/partition/PartitionMergeFlatTable.scala
 
b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/stage/merge/partition/PartitionMergeFlatTable.scala
index 8fb98af061..5ca128dceb 100644
--- 
a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/stage/merge/partition/PartitionMergeFlatTable.scala
+++ 
b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/stage/merge/partition/PartitionMergeFlatTable.scala
@@ -29,4 +29,6 @@ class PartitionMergeFlatTable(jobContext: SegmentJob, 
dataSegment: NDataSegment)
 
     mergeFlatTable()
   }
+
+  override def getStageName: String = "PartitionMergeFlatTable"
 }
diff --git 
a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/stage/merge/partition/PartitionMergeIndices.scala
 
b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/stage/merge/partition/PartitionMergeIndices.scala
index f18f264aed..d51ff5c7ac 100644
--- 
a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/stage/merge/partition/PartitionMergeIndices.scala
+++ 
b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/stage/merge/partition/PartitionMergeIndices.scala
@@ -29,4 +29,6 @@ class PartitionMergeIndices(jobContext: SegmentJob, 
dataSegment: NDataSegment)
     // Drain results immediately after merging.
     drain()
   }
+
+  override def getStageName: String = "PartitionMergeIndices"
 }
diff --git 
a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/stage/snapshots/SnapshotsBuild.scala
 
b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/stage/snapshots/SnapshotsBuild.scala
index 77f8bb5bb9..de0d8dd1cc 100644
--- 
a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/stage/snapshots/SnapshotsBuild.scala
+++ 
b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/stage/snapshots/SnapshotsBuild.scala
@@ -33,4 +33,6 @@ class SnapshotsBuild(jobContext: SnapshotBuildJob) extends 
StageExec {
   override def execute(): Unit = {
     jobContext.buildSnapshot()
   }
+
+  override def getStageName: String = "SnapshotsBuild"
 }
diff --git 
a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/stage/tablesampling/AnalyzerTable.scala
 
b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/stage/tablesampling/AnalyzerTable.scala
index e12e44344a..f215383531 100644
--- 
a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/stage/tablesampling/AnalyzerTable.scala
+++ 
b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/stage/tablesampling/AnalyzerTable.scala
@@ -33,4 +33,6 @@ class AnalyzerTable(jobContext: TableAnalyzerJob) extends 
StageExec {
   override def execute(): Unit = {
     jobContext.analyzerTable()
   }
+
+  override def getStageName: String = "AnalyzerTable"
 }
diff --git 
a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/stage/snapshots/SnapshotsBuild.scala
 
b/src/spark-project/engine-spark/src/test/scala/org/apache/kylin/engine/spark/job/stage/WaiteForResourceTest.scala
similarity index 62%
copy from 
src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/stage/snapshots/SnapshotsBuild.scala
copy to 
src/spark-project/engine-spark/src/test/scala/org/apache/kylin/engine/spark/job/stage/WaiteForResourceTest.scala
index 77f8bb5bb9..7d06e5dfc0 100644
--- 
a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/stage/snapshots/SnapshotsBuild.scala
+++ 
b/src/spark-project/engine-spark/src/test/scala/org/apache/kylin/engine/spark/job/stage/WaiteForResourceTest.scala
@@ -16,21 +16,19 @@
  * limitations under the License.
  */
 
-package org.apache.kylin.engine.spark.job.stage.snapshots
+package org.apache.kylin.engine.spark.job.stage
 
 import org.apache.kylin.engine.spark.application.SparkApplication
-import org.apache.kylin.engine.spark.job.SnapshotBuildJob
-import org.apache.kylin.engine.spark.job.stage.StageExec
-import org.apache.kylin.metadata.cube.model.NDataSegment
+import org.junit.Assert
+import org.mockito.Mockito
+import org.scalatest.funsuite.AnyFunSuite
 
-class SnapshotsBuild(jobContext: SnapshotBuildJob) extends StageExec {
-  override def getJobContext: SparkApplication = jobContext
+class WaiteForResourceTest extends AnyFunSuite {
 
-  override def getDataSegment: NDataSegment = null
+  test("test WaiteForResource getStageName") {
+    val sparkApplication = Mockito.mock(classOf[SparkApplication])
 
-  override def getSegmentId: String = jobContext.getJobId
-
-  override def execute(): Unit = {
-    jobContext.buildSnapshot()
+    val waiteForResource = new WaiteForResource(sparkApplication)
+    Assert.assertEquals("WaiteForResource", waiteForResource.getStageName)
   }
 }
diff --git 
a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/stage/build/GatherFlatTableStats.scala
 
b/src/spark-project/engine-spark/src/test/scala/org/apache/kylin/engine/spark/job/stage/build/RefreshColumnBytesTest.scala
similarity index 52%
copy from 
src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/stage/build/GatherFlatTableStats.scala
copy to 
src/spark-project/engine-spark/src/test/scala/org/apache/kylin/engine/spark/job/stage/build/RefreshColumnBytesTest.scala
index 920e1b7bf6..d6f0b87c1a 100644
--- 
a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/stage/build/GatherFlatTableStats.scala
+++ 
b/src/spark-project/engine-spark/src/test/scala/org/apache/kylin/engine/spark/job/stage/build/RefreshColumnBytesTest.scala
@@ -21,31 +21,26 @@ package org.apache.kylin.engine.spark.job.stage.build
 import org.apache.kylin.engine.spark.job.SegmentJob
 import org.apache.kylin.engine.spark.job.stage.BuildParam
 import org.apache.kylin.metadata.cube.model.NDataSegment
+import org.apache.kylin.metadata.model.NDataModel
+import org.junit.Assert
+import org.mockito.Mockito
+import org.scalatest.funsuite.AnyFunSuite
 
-import scala.collection.JavaConverters._
+import com.google.common.collect.ImmutableBiMap
 
-class GatherFlatTableStats(jobContext: SegmentJob, dataSegment: NDataSegment, 
buildParam: BuildParam)
-  extends BuildStage(jobContext, dataSegment, buildParam) {
+class RefreshColumnBytesTest extends AnyFunSuite {
 
-  override def execute(): Unit = {
-    scheduleCheckpoint()
+  test("test RefreshColumnBytes getStageName") {
+    val segmentJob = Mockito.mock(classOf[SegmentJob])
+    val dataSegment = Mockito.mock(classOf[NDataSegment])
+    val buildParam = Mockito.mock(classOf[BuildParam])
 
-    // Build flat table?
-    if (buildParam.getSpanningTree.fromFlatTable()) {
-      // Collect statistics for flat table.
-      val statistics = buildStatistics()
-      buildParam.setFlatTableStatistics(statistics)
+    val dataModel = Mockito.mock(classOf[NDataModel])
+    Mockito.when(dataSegment.getModel).thenReturn(dataModel)
+    val builder: ImmutableBiMap.Builder[Integer, NDataModel.Measure] = ImmutableBiMap.builder()
+    
Mockito.when(dataSegment.getModel.getEffectiveMeasures).thenReturn(builder.build())
 
-      // Build inferior flat table.
-      if (config.isInferiorFlatTableEnabled) {
-        buildInferior()
-      }
-    }
-
-    // Build root node's layout sanity cache.
-    buildSanityCache()
-
-    // Cleanup previous potentially left temp layout data.
-    cleanupLayoutTempData(dataSegment, readOnlyLayouts.asScala.toSeq)
+    val refreshColumnBytes = new RefreshColumnBytes(segmentJob, dataSegment, buildParam)
+    Assert.assertEquals("RefreshColumnBytes", refreshColumnBytes.getStageName)
   }
 }
diff --git 
a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/stage/build/BuildLayer.scala
 
b/src/spark-project/engine-spark/src/test/scala/org/apache/kylin/engine/spark/job/stage/build/RefreshSnapshotsTest.scala
similarity index 69%
copy from 
src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/stage/build/BuildLayer.scala
copy to 
src/spark-project/engine-spark/src/test/scala/org/apache/kylin/engine/spark/job/stage/build/RefreshSnapshotsTest.scala
index 734a4361e2..82931d40ca 100644
--- 
a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/stage/build/BuildLayer.scala
+++ 
b/src/spark-project/engine-spark/src/test/scala/org/apache/kylin/engine/spark/job/stage/build/RefreshSnapshotsTest.scala
@@ -19,16 +19,16 @@
 package org.apache.kylin.engine.spark.job.stage.build
 
 import org.apache.kylin.engine.spark.job.SegmentJob
-import org.apache.kylin.engine.spark.job.stage.BuildParam
-import org.apache.kylin.metadata.cube.model.NDataSegment
+import org.junit.Assert
+import org.mockito.Mockito
+import org.scalatest.funsuite.AnyFunSuite
 
-class BuildLayer(jobContext: SegmentJob, dataSegment: NDataSegment, 
buildParam: BuildParam)
-  extends BuildStage(jobContext, dataSegment, buildParam) {
+class RefreshSnapshotsTest extends AnyFunSuite {
 
-  override def execute(): Unit = {
-    // Build layers.
-    buildLayouts()
-    // Drain results immediately after building.
-    drain()
+  test("test RefreshSnapshots getStageName") {
+    val segmentJob = Mockito.mock(classOf[SegmentJob])
+
+    val refreshSnapshots = new RefreshSnapshots(segmentJob)
+    Assert.assertEquals("RefreshSnapshots", refreshSnapshots.getStageName)
   }
 }
diff --git 
a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/stage/build/partition/PartitionGenerateFlatTable.scala
 
b/src/spark-project/engine-spark/src/test/scala/org/apache/kylin/engine/spark/job/stage/build/partition/PartitionRefreshColumnBytesTest.scala
similarity index 51%
copy from 
src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/stage/build/partition/PartitionGenerateFlatTable.scala
copy to 
src/spark-project/engine-spark/src/test/scala/org/apache/kylin/engine/spark/job/stage/build/partition/PartitionRefreshColumnBytesTest.scala
index b42b2a3956..ee95311a9d 100644
--- 
a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/stage/build/partition/PartitionGenerateFlatTable.scala
+++ 
b/src/spark-project/engine-spark/src/test/scala/org/apache/kylin/engine/spark/job/stage/build/partition/PartitionRefreshColumnBytesTest.scala
@@ -21,20 +21,26 @@ package 
org.apache.kylin.engine.spark.job.stage.build.partition
 import org.apache.kylin.engine.spark.job.SegmentJob
 import org.apache.kylin.engine.spark.job.stage.BuildParam
 import org.apache.kylin.metadata.cube.model.NDataSegment
-import org.apache.spark.sql.{Dataset, Row}
+import org.apache.kylin.metadata.model.NDataModel
+import org.junit.Assert
+import org.mockito.Mockito
+import org.scalatest.funsuite.AnyFunSuite
 
-class PartitionGenerateFlatTable(jobContext: SegmentJob, dataSegment: 
NDataSegment, buildParam: BuildParam)
-  extends PartitionFlatTableAndDictBase(jobContext, dataSegment, buildParam) {
-  override def execute(): Unit = {
-    val flatTable: Dataset[Row] = generateFlatTable()
-    buildParam.setFlatTable(flatTable)
-    val flatTablePart: Dataset[Row] = generateFlatTablePart()
-    buildParam.setFlatTablePart(flatTablePart)
+import com.google.common.collect.ImmutableBiMap
 
-    buildParam.setPartitionFlatTable(this)
+class PartitionRefreshColumnBytesTest extends AnyFunSuite {
 
-    if (buildParam.isSkipGenerateFlatTable) {
-      onStageSkipped()
-    }
+  test("test PartitionRefreshColumnBytes getStageName") {
+    val segmentJob = Mockito.mock(classOf[SegmentJob])
+    val dataSegment = Mockito.mock(classOf[NDataSegment])
+    val buildParam = Mockito.mock(classOf[BuildParam])
+
+    val dataModel = Mockito.mock(classOf[NDataModel])
+    Mockito.when(dataSegment.getModel).thenReturn(dataModel)
+    val builder: ImmutableBiMap.Builder[Integer, NDataModel.Measure] = ImmutableBiMap.builder()
+    
Mockito.when(dataSegment.getModel.getEffectiveMeasures).thenReturn(builder.build())
+
+    val partitionRefreshColumnBytes = new 
PartitionRefreshColumnBytes(segmentJob, dataSegment, buildParam)
+    Assert.assertEquals("PartitionRefreshColumnBytes", 
partitionRefreshColumnBytes.getStageName)
   }
 }
diff --git 
a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/stage/merge/MergeColumnBytes.scala
 
b/src/spark-project/engine-spark/src/test/scala/org/apache/kylin/engine/spark/job/stage/merge/MergeColumnBytesTest.scala
similarity index 61%
copy from 
src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/stage/merge/MergeColumnBytes.scala
copy to 
src/spark-project/engine-spark/src/test/scala/org/apache/kylin/engine/spark/job/stage/merge/MergeColumnBytesTest.scala
index 3091e92e0a..28c0b0ccba 100644
--- 
a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/stage/merge/MergeColumnBytes.scala
+++ 
b/src/spark-project/engine-spark/src/test/scala/org/apache/kylin/engine/spark/job/stage/merge/MergeColumnBytesTest.scala
@@ -20,13 +20,21 @@ package org.apache.kylin.engine.spark.job.stage.merge
 
 import org.apache.kylin.engine.spark.job.SegmentJob
 import org.apache.kylin.metadata.cube.model.NDataSegment
+import org.apache.kylin.metadata.model.NDataModel
+import org.junit.Assert
+import org.mockito.Mockito
+import org.scalatest.funsuite.AnyFunSuite
 
-class MergeColumnBytes(jobContext: SegmentJob, dataSegment: NDataSegment)
-  extends MergeStage(jobContext, dataSegment) {
+class MergeColumnBytesTest extends AnyFunSuite {
 
-  override def execute(): Unit = {
-    mergeColumnBytes()
+  test("test MergeColumnBytes getStageName") {
+    val segmentJob = Mockito.mock(classOf[SegmentJob])
+    val dataSegment = Mockito.mock(classOf[NDataSegment])
+    val dataModel = Mockito.mock(classOf[NDataModel])
 
-    cleanup()
+    Mockito.when(dataSegment.getModel).thenReturn(dataModel)
+
+    val mergeColumnBytes = new MergeColumnBytes(segmentJob, dataSegment)
+    Assert.assertEquals("MergeColumnBytes", mergeColumnBytes.getStageName)
   }
 }
diff --git a/src/spark-project/engine-spark/src/test/scala/org/apache/kylin/engine/spark/job/stage/merge/MergeStageTest.scala b/src/spark-project/engine-spark/src/test/scala/org/apache/kylin/engine/spark/job/stage/merge/MergeStageTest.scala
index 2e23a6f8c7..88b15ec3ee 100644
--- a/src/spark-project/engine-spark/src/test/scala/org/apache/kylin/engine/spark/job/stage/merge/MergeStageTest.scala
+++ b/src/spark-project/engine-spark/src/test/scala/org/apache/kylin/engine/spark/job/stage/merge/MergeStageTest.scala
@@ -51,6 +51,8 @@ class MergeStageTest extends AnyFunSuite with LocalMetadata {
     override def execute(): Unit = {}
 
     override def getUnmergedFTPaths: Seq[Path] = super.getUnmergedFTPaths
+
+    override def getStageName: String = "MergeStageMock"
   }
 
   def testGetUnmergedFTPaths(config: KylinConfig): Unit = {
diff --git a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/stage/merge/partition/PartitionMergeColumnBytes.scala b/src/spark-project/engine-spark/src/test/scala/org/apache/kylin/engine/spark/job/stage/merge/partition/PartitionMergeColumnBytesTest.scala
similarity index 59%
copy from src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/stage/merge/partition/PartitionMergeColumnBytes.scala
copy to src/spark-project/engine-spark/src/test/scala/org/apache/kylin/engine/spark/job/stage/merge/partition/PartitionMergeColumnBytesTest.scala
index 4f52f70781..cfaf90a59e 100644
--- a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/stage/merge/partition/PartitionMergeColumnBytes.scala
+++ b/src/spark-project/engine-spark/src/test/scala/org/apache/kylin/engine/spark/job/stage/merge/partition/PartitionMergeColumnBytesTest.scala
@@ -20,13 +20,21 @@ package org.apache.kylin.engine.spark.job.stage.merge.partition
 
 import org.apache.kylin.engine.spark.job.SegmentJob
 import org.apache.kylin.metadata.cube.model.NDataSegment
+import org.apache.kylin.metadata.model.NDataModel
+import org.junit.Assert
+import org.mockito.Mockito
+import org.scalatest.funsuite.AnyFunSuite
 
-class PartitionMergeColumnBytes(jobContext: SegmentJob, dataSegment: NDataSegment)
-  extends PartitionMergeStage(jobContext, dataSegment) {
+class PartitionMergeColumnBytesTest extends AnyFunSuite {
 
-  override def execute(): Unit = {
-    mergeColumnBytes()
+  test("test PartitionMergeColumnBytes getStageName") {
+    val segmentJob = Mockito.mock(classOf[SegmentJob])
+    val dataSegment = Mockito.mock(classOf[NDataSegment])
+    val dataModel = Mockito.mock(classOf[NDataModel])
 
-    cleanup()
+    Mockito.when(dataSegment.getModel).thenReturn(dataModel)
+
+    val partitionMergeColumnBytes = new PartitionMergeColumnBytes(segmentJob, dataSegment)
+    Assert.assertEquals("PartitionMergeColumnBytes", partitionMergeColumnBytes.getStageName)
   }
 }
diff --git a/src/spark-project/engine-spark/src/test/scala/org/apache/spark/utils/TestResourceUtils.scala b/src/spark-project/engine-spark/src/test/scala/org/apache/spark/utils/TestResourceUtils.scala
index 59bf9108b2..641f9ca70c 100644
--- a/src/spark-project/engine-spark/src/test/scala/org/apache/spark/utils/TestResourceUtils.scala
+++ b/src/spark-project/engine-spark/src/test/scala/org/apache/spark/utils/TestResourceUtils.scala
@@ -56,6 +56,12 @@ class TestResourceUtils extends SparderBaseFunSuite with BeforeAndAfterEach {
 
   // test case: available(10, 10)  executor(20, 10) driver(1, 1)
   test("checkResource return false when available memory does not meet 
acquirement") {
+    // Without this may cause NPE
+    KylinBuildEnv.clean()
+    val config: KylinConfig = Mockito.mock(classOf[KylinConfig])
+    Mockito.when(config.getMaxAllocationResourceProportion).thenReturn(0.9)
+    KylinBuildEnv.getOrCreate(config)
+    
Mockito.when(fetcher.fetchMaximumResourceAllocation).thenReturn(ResourceInfo(Integer.MAX_VALUE,
 Integer.MAX_VALUE))
     val conf = new SparkConf()
     conf.set(EXECUTOR_INSTANCES, "5")
     conf.set(EXECUTOR_MEMORY, "2MB")
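
Note: the added KylinBuildEnv.clean()/getOrCreate(config) pair resets the
process-wide build env so the code under test never reads a stale or missing
instance. The same guard, extracted into a reusable helper, could look like
the sketch below; withFreshBuildEnv is hypothetical and uses only the calls
shown in this hunk:

    def withFreshBuildEnv(testBody: => Unit): Unit = {
      KylinBuildEnv.clean() // drop any env left over from a previous test
      val config: KylinConfig = Mockito.mock(classOf[KylinConfig])
      Mockito.when(config.getMaxAllocationResourceProportion).thenReturn(0.9)
      KylinBuildEnv.getOrCreate(config) // recreate against the mocked config
      try testBody finally KylinBuildEnv.clean() // leave no state behind
    }
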
diff --git a/src/spark-project/spark-common/src/main/java/org/apache/kylin/common/asyncprofiler/AsyncProfilerUtils.java b/src/spark-project/spark-common/src/main/java/org/apache/kylin/common/asyncprofiler/AsyncProfilerUtils.java
index cd74a7cd33..490000d745 100644
--- a/src/spark-project/spark-common/src/main/java/org/apache/kylin/common/asyncprofiler/AsyncProfilerUtils.java
+++ b/src/spark-project/spark-common/src/main/java/org/apache/kylin/common/asyncprofiler/AsyncProfilerUtils.java
@@ -66,7 +66,7 @@ public class AsyncProfilerUtils {
         if (!cachedResult.await(resultCollectionTimeout, TimeUnit.MILLISECONDS)) {
             logger.warn("timeout while waiting for profile result");
         }
-        logger.debug("profiler stopped and result dumped to $localCacheDir");
+        logger.debug("profiler stopped and result dumped to {}", localCacheDir);
         ZipFileUtils.compressZipFile(localCacheDir.getAbsolutePath(), outStream);
     }
 
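
Note: the old message logged the literal text "$localCacheDir"; Scala-style
interpolation has no effect inside a plain Java string. The fix switches to
SLF4J's {} placeholder, which also avoids building the message when debug
logging is disabled. A minimal Scala sketch of the difference; the logger
name and path are illustrative only:

    import org.slf4j.LoggerFactory

    val logger = LoggerFactory.getLogger("AsyncProfilerUtilsDemo")
    val localCacheDir = new java.io.File("/tmp/profiler")
    logger.debug("result dumped to $localCacheDir")    // logs the literal text
    logger.debug("result dumped to {}", localCacheDir) // logs the actual path
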
diff --git a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/stage/merge/MergeColumnBytes.scala b/src/spark-project/spark-common/src/main/scala/org/apache/kylin/common/CustomUtils.scala
similarity index 69%
copy from src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/stage/merge/MergeColumnBytes.scala
copy to src/spark-project/spark-common/src/main/scala/org/apache/kylin/common/CustomUtils.scala
index 3091e92e0a..b60fca9912 100644
--- a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/stage/merge/MergeColumnBytes.scala
+++ b/src/spark-project/spark-common/src/main/scala/org/apache/kylin/common/CustomUtils.scala
@@ -16,17 +16,16 @@
  * limitations under the License.
  */
 
-package org.apache.kylin.engine.spark.job.stage.merge
+package org.apache.kylin.common
 
-import org.apache.kylin.engine.spark.job.SegmentJob
-import org.apache.kylin.metadata.cube.model.NDataSegment
+object CustomUtils {
 
-class MergeColumnBytes(jobContext: SegmentJob, dataSegment: NDataSegment)
-  extends MergeStage(jobContext, dataSegment) {
-
-  override def execute(): Unit = {
-    mergeColumnBytes()
-
-    cleanup()
+  // Roughly equivalent to Java's try-with-resources; handle must not be null
+  def tryWithResourceIgnore[T <: AutoCloseable](handle: T)(func: T => Any): Unit = {
+    try {
+      func(handle)
+    } finally {
+      handle.close()
+    }
   }
 }
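
Note: a minimal usage sketch of the new helper; the file path is
illustrative. The handle is closed in the finally block even when func
throws. Unlike Java's try-with-resources, an exception thrown by close() is
not attached as a suppressed exception and can mask the original failure:

    import java.io.FileInputStream
    import org.apache.kylin.common.CustomUtils

    CustomUtils.tryWithResourceIgnore(new FileInputStream("/tmp/sample.bin")) { in =>
      val first = in.read() // the stream is closed when this block exits
      println(s"first byte: $first")
    }
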
diff --git a/src/tool/src/main/java/org/apache/kylin/tool/KylinLogTool.java b/src/tool/src/main/java/org/apache/kylin/tool/KylinLogTool.java
index 54736982ed..612e6dc7ce 100644
--- a/src/tool/src/main/java/org/apache/kylin/tool/KylinLogTool.java
+++ b/src/tool/src/main/java/org/apache/kylin/tool/KylinLogTool.java
@@ -93,7 +93,7 @@ public class KylinLogTool {
     private static final String SYSTEM_PROPERTIES = "System Properties";
 
     private static final Set<String> kylinLogPrefix = Sets.newHashSet("kylin.log", "kylin.schedule.log",
-            "kylin.query.log", "kylin.smart.log", "kylin.build.log", "kylin.security.log");
+            "kylin.query.log", "kylin.smart.log", "kylin.build.log", "kylin.security.log", "kylin.metadata.log");
 
     private static final Set<String> queryDiagExcludedLogs = Sets.newHashSet("kylin.log", "kylin.schedule.log",
             "kylin.smart.log", "kylin.build.log", "kylin.security.log");
